From 5050f42b8b8f15b459b02033b3dfe2cf0db97ef6 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 24 Apr 2025 16:50:03 +0100 Subject: [PATCH] pool: unify memory between multipart and asyncreader to use one pool Before this the multipart code and asyncreader used separate pools which is inefficient on memory use. --- fs/asyncreader/asyncreader.go | 23 ++++++------------ lib/multipart/multipart.go | 26 +++----------------- lib/pool/pool.go | 46 ++++++++++++++++++++++++++--------- lib/pool/pool_test.go | 3 +-- 4 files changed, 46 insertions(+), 52 deletions(-) diff --git a/fs/asyncreader/asyncreader.go b/fs/asyncreader/asyncreader.go index b645fa46f..808419f9a 100644 --- a/fs/asyncreader/asyncreader.go +++ b/fs/asyncreader/asyncreader.go @@ -7,7 +7,6 @@ import ( "errors" "io" "sync" - "time" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/lib/pool" @@ -16,10 +15,8 @@ import ( const ( // BufferSize is the default size of the async buffer - BufferSize = 1024 * 1024 - softStartInitial = 4 * 1024 - bufferCacheSize = 64 // max number of buffers to keep in cache - bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long + BufferSize = pool.BufferSize + softStartInitial = 4 * 1024 ) // ErrorStreamAbandoned is returned when the input is closed before the end of the stream @@ -42,6 +39,7 @@ type AsyncReader struct { closed bool // whether we have closed the underlying stream mu sync.Mutex // lock for Read/WriteTo/Abandon/Close ci *fs.ConfigInfo // for reading config + pool *pool.Pool // pool to get memory from } // New returns a reader that will asynchronously read from @@ -58,7 +56,8 @@ func New(ctx context.Context, rd io.ReadCloser, buffers int) (*AsyncReader, erro return nil, errors.New("nil reader supplied") } a := &AsyncReader{ - ci: fs.GetConfig(ctx), + ci: fs.GetConfig(ctx), + pool: pool.Global(), } a.init(rd, buffers) return a, nil @@ -104,24 +103,16 @@ func (a *AsyncReader) init(rd io.ReadCloser, buffers int) { }() } -// bufferPool is a global pool of buffers -var bufferPool *pool.Pool -var bufferPoolOnce sync.Once - // return the buffer to the pool (clearing it) func (a *AsyncReader) putBuffer(b *buffer) { - bufferPool.Put(b.buf) + a.pool.Put(b.buf) b.buf = nil } // get a buffer from the pool func (a *AsyncReader) getBuffer() *buffer { - bufferPoolOnce.Do(func() { - // Initialise the buffer pool when used - bufferPool = pool.New(bufferCacheFlushTime, BufferSize, bufferCacheSize, a.ci.UseMmap) - }) return &buffer{ - buf: bufferPool.Get(), + buf: a.pool.Get(), } } diff --git a/lib/multipart/multipart.go b/lib/multipart/multipart.go index 3e5cb4de5..f3b959ce5 100644 --- a/lib/multipart/multipart.go +++ b/lib/multipart/multipart.go @@ -5,8 +5,6 @@ import ( "context" "fmt" "io" - "sync" - "time" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" @@ -18,30 +16,12 @@ import ( const ( // BufferSize is the default size of the pages used in the reader - BufferSize = 1024 * 1024 - bufferCacheSize = 64 // max number of buffers to keep in cache - bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long + BufferSize = pool.BufferSize ) -// bufferPool is a global pool of buffers -var ( - bufferPool *pool.Pool - bufferPoolOnce sync.Once -) - -// get a buffer pool -func getPool() *pool.Pool { - bufferPoolOnce.Do(func() { - ci := fs.GetConfig(context.Background()) - // Initialise the buffer pool when used - bufferPool = pool.New(bufferCacheFlushTime, BufferSize, bufferCacheSize, ci.UseMmap) - }) - return bufferPool -} - -// NewRW gets a pool.RW using the multipart pool +// NewRW gets a pool.RW using the global pool func NewRW() *pool.RW { - return pool.NewRW(getPool()) + return pool.NewRW(pool.Global()) } // UploadMultipartOptions options for the generic multipart upload diff --git a/lib/pool/pool.go b/lib/pool/pool.go index a4a5dc752..6a94bb814 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -13,6 +13,15 @@ import ( "golang.org/x/sync/semaphore" ) +const ( + // BufferSize is the page size of the Global() pool + BufferSize = 1024 * 1024 + // BufferCacheSize is the max number of buffers to keep in the cache for the Global() pool + BufferCacheSize = 64 + // BufferCacheFlushTime is the max time to keep buffers in the Global() pool + BufferCacheFlushTime = 5 * time.Second +) + // Pool of internal buffers // // We hold buffers in cache. Every time we Get or Put we update @@ -67,6 +76,17 @@ func New(flushTime time.Duration, bufferSize, poolSize int, useMmap bool) *Pool return nil } } + + // Initialise total memory limit if required + totalMemoryInit.Do(func() { + ci := fs.GetConfig(context.Background()) + + // Set max buffer memory limiter + if ci.MaxBufferMemory > 0 { + totalMemory = semaphore.NewWeighted(int64(ci.MaxBufferMemory)) + } + }) + bp.timer = time.AfterFunc(flushTime, bp.flushAged) return bp } @@ -157,20 +177,10 @@ func (bp *Pool) updateMinFill() { // acquire mem bytes of memory func (bp *Pool) acquire(mem int64) error { - ctx := context.Background() - - totalMemoryInit.Do(func() { - ci := fs.GetConfig(ctx) - - // Set max buffer memory limiter - if ci.MaxBufferMemory > 0 { - totalMemory = semaphore.NewWeighted(int64(ci.MaxBufferMemory)) - } - }) - if totalMemory == nil { return nil } + ctx := context.Background() return totalMemory.Acquire(ctx, mem) } @@ -248,3 +258,17 @@ func (bp *Pool) Put(buf []byte) { bp.updateMinFill() bp.kickFlusher() } + +// bufferPool is a global pool of buffers +var bufferPool *Pool +var bufferPoolOnce sync.Once + +// Global gets a global pool of BufferSize, BufferCacheSize, BufferCacheFlushTime. +func Global() *Pool { + bufferPoolOnce.Do(func() { + // Initialise the buffer pool when used + ci := fs.GetConfig(context.Background()) + bufferPool = New(BufferCacheFlushTime, BufferSize, BufferCacheSize, ci.UseMmap) + }) + return bufferPool +} diff --git a/lib/pool/pool_test.go b/lib/pool/pool_test.go index 23618194a..425acbd7a 100644 --- a/lib/pool/pool_test.go +++ b/lib/pool/pool_test.go @@ -240,11 +240,10 @@ func TestPoolMaxBufferMemory(t *testing.T) { totalMemoryInit = sync.Once{} // reset the sync.Once as it likely has been used totalMemory = nil bp := New(60*time.Second, 4096, 2, true) + assert.NotNil(t, totalMemory) assert.Equal(t, bp.alloced, 0) - assert.Nil(t, totalMemory) buf := bp.Get() - assert.NotNil(t, totalMemory) bp.Put(buf) assert.Equal(t, bp.alloced, 1)