mirror of
https://github.com/rclone/rclone.git
synced 2025-12-26 13:13:31 +00:00
Before this change we used an overcomplicated method of memory reservations in the pool.RW which caused deadlocks. This changes it to use a much simpler reservation system where we actually reserve the memory and store it in the pool.RW. This allows us to use the semaphore.Weighted to count the actually memory in use (rather than the memory in use and in the cache). This in turn allows accurate use of the semaphore by users wanting memory.
371 lines
9.1 KiB
Go
371 lines
9.1 KiB
Go
package pool
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// RWAccount is a function which will be called after every read
|
|
// from the RW.
|
|
//
|
|
// It may return an error which will be passed back to the user.
|
|
type RWAccount func(n int) error
|
|
|
|
// RW contains the state for the read/writer
|
|
//
|
|
// It can be used as a FIFO to read data from a source and write it out again.
|
|
type RW struct {
|
|
// Written once variables in initialization
|
|
pool *Pool // pool to get pages from
|
|
account RWAccount // account for a read
|
|
accountOn int // only account on or after this read
|
|
|
|
// Shared variables between Read and Write
|
|
// Write updates these but Read reads from them
|
|
// They must all stay in sync together
|
|
mu sync.Mutex // protect the shared variables
|
|
pages [][]byte // backing store
|
|
size int // size written
|
|
lastOffset int // size in last page
|
|
written chan struct{} // signalled when a write happens
|
|
|
|
// Read side Variables
|
|
out int // offset we are reading from
|
|
reads int // count how many times the data has been read
|
|
|
|
reserved [][]byte // reserved buffers
|
|
}
|
|
|
|
var (
|
|
errInvalidWhence = errors.New("pool.RW Seek: invalid whence")
|
|
errNegativeSeek = errors.New("pool.RW Seek: negative position")
|
|
errSeekPastEnd = errors.New("pool.RW Seek: attempt to seek past end of data")
|
|
)
|
|
|
|
// NewRW returns a reader / writer which is backed from pages from the
|
|
// pool passed in.
|
|
//
|
|
// Data can be stored in it by calling Write and read from it by
|
|
// calling Read.
|
|
//
|
|
// When writing it only appends data. Seek only applies to reading.
|
|
func NewRW(pool *Pool) *RW {
|
|
rw := &RW{
|
|
pool: pool,
|
|
pages: make([][]byte, 0, 16),
|
|
written: make(chan struct{}, 1),
|
|
}
|
|
return rw
|
|
}
|
|
|
|
// Reserve bytes of memory.
|
|
//
|
|
// This allocates n bytes of memory for later use.
|
|
//
|
|
// This is rounded up to the nearest buffer page size.
|
|
//
|
|
// Only safe to call once.
|
|
func (rw *RW) Reserve(n int64) *RW {
|
|
rw.mu.Lock()
|
|
defer rw.mu.Unlock()
|
|
buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize))
|
|
rw.reserved = rw.pool.GetN(buffers)
|
|
return rw
|
|
}
|
|
|
|
// SetAccounting should be provided with a function which will be
|
|
// called after every read from the RW.
|
|
//
|
|
// It may return an error which will be passed back to the user.
|
|
//
|
|
// Not thread safe - call in initialization only.
|
|
func (rw *RW) SetAccounting(account RWAccount) *RW {
|
|
rw.account = account
|
|
return rw
|
|
}
|
|
|
|
// DelayAccountinger enables an accounting delay
|
|
type DelayAccountinger interface {
|
|
// DelayAccounting makes sure the accounting function only
|
|
// gets called on the i-th or later read of the data from this
|
|
// point (counting from 1).
|
|
//
|
|
// This is useful so that we don't account initial reads of
|
|
// the data e.g. when calculating hashes.
|
|
//
|
|
// Set this to 0 to account everything.
|
|
DelayAccounting(i int)
|
|
}
|
|
|
|
// DelayAccounting makes sure the accounting function only gets called
|
|
// on the i-th or later read of the data from this point (counting
|
|
// from 1).
|
|
//
|
|
// This is useful so that we don't account initial reads of the data
|
|
// e.g. when calculating hashes.
|
|
//
|
|
// Set this to 0 to account everything.
|
|
//
|
|
// Not thread safe - call in initialization only.
|
|
func (rw *RW) DelayAccounting(i int) {
|
|
rw.accountOn = i
|
|
rw.reads = 0
|
|
}
|
|
|
|
// Returns the page and offset of i for reading.
|
|
//
|
|
// Ensure there are pages before calling this.
|
|
func (rw *RW) readPage(i int) (page []byte) {
|
|
rw.mu.Lock()
|
|
defer rw.mu.Unlock()
|
|
// Count a read of the data if we read the first page
|
|
if i == 0 {
|
|
rw.reads++
|
|
}
|
|
pageNumber := i / rw.pool.bufferSize
|
|
offset := i % rw.pool.bufferSize
|
|
page = rw.pages[pageNumber]
|
|
// Clip the last page to the amount written
|
|
if pageNumber == len(rw.pages)-1 {
|
|
page = page[:rw.lastOffset]
|
|
}
|
|
return page[offset:]
|
|
}
|
|
|
|
// account for n bytes being read
|
|
func (rw *RW) accountRead(n int) error {
|
|
if rw.account == nil {
|
|
return nil
|
|
}
|
|
// Don't start accounting until we've reached this many reads
|
|
//
|
|
// rw.reads will be 1 the first time this is called
|
|
// rw.accountOn 2 means start accounting on the 2nd read through
|
|
if rw.reads >= rw.accountOn {
|
|
return rw.account(n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Returns true if we have read to EOF
|
|
func (rw *RW) eof() bool {
|
|
rw.mu.Lock()
|
|
defer rw.mu.Unlock()
|
|
return rw.out >= rw.size
|
|
}
|
|
|
|
// Read reads up to len(p) bytes into p. It returns the number of
|
|
// bytes read (0 <= n <= len(p)) and any error encountered. If some
|
|
// data is available but not len(p) bytes, Read returns what is
|
|
// available instead of waiting for more.
|
|
func (rw *RW) Read(p []byte) (n int, err error) {
|
|
var (
|
|
nn int
|
|
page []byte
|
|
)
|
|
for len(p) > 0 {
|
|
if rw.eof() {
|
|
return n, io.EOF
|
|
}
|
|
page = rw.readPage(rw.out)
|
|
nn = copy(p, page)
|
|
p = p[nn:]
|
|
n += nn
|
|
rw.out += nn
|
|
err = rw.accountRead(nn)
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// WriteTo writes data to w until there's no more data to write or
|
|
// when an error occurs. The return value n is the number of bytes
|
|
// written. Any error encountered during the write is also returned.
|
|
//
|
|
// The Copy function uses WriteTo if available. This avoids an
|
|
// allocation and a copy.
|
|
func (rw *RW) WriteTo(w io.Writer) (n int64, err error) {
|
|
var (
|
|
nn int
|
|
page []byte
|
|
)
|
|
for !rw.eof() {
|
|
page = rw.readPage(rw.out)
|
|
nn, err = w.Write(page)
|
|
n += int64(nn)
|
|
rw.out += nn
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
err = rw.accountRead(nn)
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// Get the page we are writing to
|
|
func (rw *RW) writePage() (page []byte) {
|
|
rw.mu.Lock()
|
|
defer rw.mu.Unlock()
|
|
if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize {
|
|
return rw.pages[len(rw.pages)-1][rw.lastOffset:]
|
|
}
|
|
if len(rw.reserved) > 0 {
|
|
// Get reserved pages if available
|
|
i := len(rw.reserved) - 1
|
|
page = rw.reserved[i]
|
|
rw.reserved[i] = nil
|
|
rw.reserved = rw.reserved[:i]
|
|
} else {
|
|
page = rw.pool.Get()
|
|
}
|
|
rw.pages = append(rw.pages, page)
|
|
rw.lastOffset = 0
|
|
return page
|
|
}
|
|
|
|
// Write writes len(p) bytes from p to the underlying data stream. It returns
|
|
// the number of bytes written len(p). It cannot return an error.
|
|
func (rw *RW) Write(p []byte) (n int, err error) {
|
|
var (
|
|
nn int
|
|
page []byte
|
|
)
|
|
for len(p) > 0 {
|
|
page = rw.writePage()
|
|
nn = copy(page, p)
|
|
p = p[nn:]
|
|
n += nn
|
|
rw.mu.Lock()
|
|
rw.size += nn
|
|
rw.lastOffset += nn
|
|
rw.mu.Unlock()
|
|
rw.signalWrite() // signal more data available
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// ReadFrom reads data from r until EOF or error. The return value n is the
|
|
// number of bytes read. Any error except EOF encountered during the read is
|
|
// also returned.
|
|
//
|
|
// The Copy function uses ReadFrom if available. This avoids an
|
|
// allocation and a copy.
|
|
func (rw *RW) ReadFrom(r io.Reader) (n int64, err error) {
|
|
var (
|
|
nn int
|
|
page []byte
|
|
)
|
|
for err == nil {
|
|
page = rw.writePage()
|
|
nn, err = r.Read(page)
|
|
n += int64(nn)
|
|
rw.mu.Lock()
|
|
rw.size += nn
|
|
rw.lastOffset += nn
|
|
rw.mu.Unlock()
|
|
rw.signalWrite() // signal more data available
|
|
}
|
|
if err == io.EOF {
|
|
err = nil
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// signal that a write has happened
|
|
func (rw *RW) signalWrite() {
|
|
select {
|
|
case rw.written <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// WaitWrite sleeps until a data is written to the RW or Close is
|
|
// called or the context is cancelled occurs or for a maximum of 1
|
|
// Second then returns.
|
|
//
|
|
// This can be used when calling Read while the buffer is filling up.
|
|
func (rw *RW) WaitWrite(ctx context.Context) {
|
|
timer := time.NewTimer(time.Second)
|
|
select {
|
|
case <-timer.C:
|
|
case <-ctx.Done():
|
|
case <-rw.written:
|
|
}
|
|
timer.Stop()
|
|
}
|
|
|
|
// Seek sets the offset for the next Read (not Write - this is always
|
|
// appended) to offset, interpreted according to whence: SeekStart
|
|
// means relative to the start of the file, SeekCurrent means relative
|
|
// to the current offset, and SeekEnd means relative to the end (for
|
|
// example, offset = -2 specifies the penultimate byte of the file).
|
|
// Seek returns the new offset relative to the start of the file or an
|
|
// error, if any.
|
|
//
|
|
// Seeking to an offset before the start of the file is an error. Seeking
|
|
// beyond the end of the written data is an error.
|
|
func (rw *RW) Seek(offset int64, whence int) (int64, error) {
|
|
var abs int64
|
|
rw.mu.Lock()
|
|
size := int64(rw.size)
|
|
rw.mu.Unlock()
|
|
switch whence {
|
|
case io.SeekStart:
|
|
abs = offset
|
|
case io.SeekCurrent:
|
|
abs = int64(rw.out) + offset
|
|
case io.SeekEnd:
|
|
abs = size + offset
|
|
default:
|
|
return 0, errInvalidWhence
|
|
}
|
|
if abs < 0 {
|
|
return 0, errNegativeSeek
|
|
}
|
|
if abs > size {
|
|
return offset - (abs - size), errSeekPastEnd
|
|
}
|
|
rw.out = int(abs)
|
|
return abs, nil
|
|
}
|
|
|
|
// Close the buffer returning memory to the pool
|
|
func (rw *RW) Close() error {
|
|
rw.mu.Lock()
|
|
defer rw.mu.Unlock()
|
|
rw.signalWrite() // signal more data available
|
|
rw.pool.PutN(rw.pages)
|
|
clear(rw.pages)
|
|
rw.pages = nil
|
|
rw.pool.PutN(rw.reserved)
|
|
clear(rw.reserved)
|
|
rw.reserved = nil
|
|
return nil
|
|
}
|
|
|
|
// Size returns the number of bytes in the buffer
|
|
func (rw *RW) Size() int64 {
|
|
rw.mu.Lock()
|
|
defer rw.mu.Unlock()
|
|
return int64(rw.size)
|
|
}
|
|
|
|
// Check interfaces
|
|
var (
|
|
_ io.Reader = (*RW)(nil)
|
|
_ io.ReaderFrom = (*RW)(nil)
|
|
_ io.Writer = (*RW)(nil)
|
|
_ io.WriterTo = (*RW)(nil)
|
|
_ io.Seeker = (*RW)(nil)
|
|
_ io.Closer = (*RW)(nil)
|
|
_ DelayAccountinger = (*RW)(nil)
|
|
)
|