From 7d97ba0a0f470354036042e60ef337c5805d7424 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 22 Apr 2025 17:20:03 +0100 Subject: [PATCH] pool: fix deadlock with --max-memory and multipart transfers Because multipart transfers can need more than one buffer to complete, if transfers was set very high, it was possible for lots of multipart transfers to start, grab fewer buffers than chunk size, then deadlock because no more memory was available. This fixes the problem by introducing a reservation system which the multipart transfer uses to ensure it can reserve all the memory for one chunk before starting. --- fs/operations/multithread.go | 2 +- lib/multipart/multipart.go | 2 +- lib/pool/pool.go | 46 ++++++++++++++++++++++++++++++++---- lib/pool/reader_writer.go | 27 ++++++++++++++++++++- 4 files changed, 70 insertions(+), 7 deletions(-) diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index d6753eb14..8eb9a1ad7 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -92,7 +92,7 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer rs = rc } else { // Read the chunk into buffered reader - rw := multipart.NewRW() + rw := multipart.NewRW().Reserve(size) defer fs.CheckClose(rw, &err) _, err = io.CopyN(rw, rc, size) if err != nil { diff --git a/lib/multipart/multipart.go b/lib/multipart/multipart.go index 3e5cb4de5..2bbcd85d3 100644 --- a/lib/multipart/multipart.go +++ b/lib/multipart/multipart.go @@ -93,7 +93,7 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U for partNum := int64(0); !finished; partNum++ { // Get a block of memory from the pool and token which limits concurrency. 
tokens.Get() - rw := NewRW() + rw := NewRW().Reserve(chunkSize) if acc != nil { rw.SetAccounting(acc.AccountRead) } diff --git a/lib/pool/pool.go b/lib/pool/pool.go index a4a5dc752..6bafe8886 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -182,8 +182,34 @@ func (bp *Pool) release(mem int64) { totalMemory.Release(mem) } +// Reserve buffers for use. Blocks until they are free. +// +// Doesn't allocate any memory. +// +// Must be released by calling GetReserved() which releases 1 buffer or +// Release() to release any number of buffers. +func (bp *Pool) Reserve(buffers int) { + waitTime := time.Millisecond + for { + err := bp.acquire(int64(buffers) * int64(bp.bufferSize)) + if err == nil { + break + } + fs.Logf(nil, "Failed to get reservation for buffer, waiting for %v: %v", waitTime, err) + time.Sleep(waitTime) + waitTime *= 2 + } +} + +// Release previously Reserved buffers. +// +// Doesn't free any memory. +func (bp *Pool) Release(buffers int) { + bp.release(int64(buffers) * int64(bp.bufferSize)) +} + // Get a buffer from the pool or allocate one -func (bp *Pool) Get() []byte { +func (bp *Pool) getBlock(reserved bool) []byte { bp.mu.Lock() var buf []byte waitTime := time.Millisecond @@ -193,9 +219,11 @@ func (bp *Pool) Get() []byte { break } else { var err error - bp.mu.Unlock() - err = bp.acquire(int64(bp.bufferSize)) - bp.mu.Lock() + if !reserved { + bp.mu.Unlock() + err = bp.acquire(int64(bp.bufferSize)) + bp.mu.Lock() + } if err == nil { buf, err = bp.alloc(bp.bufferSize) if err == nil { @@ -217,6 +245,16 @@ func (bp *Pool) Get() []byte { return buf } +// Get a buffer from the pool or allocate one +func (bp *Pool) Get() []byte { + return bp.getBlock(false) +} + +// GetReserved gets a reserved buffer from the pool or allocates one. 
+func (bp *Pool) GetReserved() []byte { + return bp.getBlock(true) +} + // freeBuffer returns mem to the os if required - call with lock held func (bp *Pool) freeBuffer(mem []byte) { err := bp.free(mem) diff --git a/lib/pool/reader_writer.go b/lib/pool/reader_writer.go index f7c263bd1..54172689d 100644 --- a/lib/pool/reader_writer.go +++ b/lib/pool/reader_writer.go @@ -35,6 +35,8 @@ type RW struct { // Read side Variables out int // offset we are reading from reads int // count how many times the data has been read + + reserved int // number of buffers reserved } var ( @@ -59,6 +61,20 @@ func NewRW(pool *Pool) *RW { return rw } +// Reserve bytes of memory. +// +// Reserve, but don't allocate n bytes of memory. +// +// This is rounded up to the nearest buffer page size. +func (rw *RW) Reserve(n int64) *RW { + rw.mu.Lock() + defer rw.mu.Unlock() + buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize)) + rw.pool.Reserve(buffers) + rw.reserved += buffers + return rw +} + // SetAccounting should be provided with a function which will be // called after every read from the RW. // @@ -200,7 +216,12 @@ func (rw *RW) writePage() (page []byte) { if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize { return rw.pages[len(rw.pages)-1][rw.lastOffset:] } - page = rw.pool.Get() + if rw.reserved > 0 { + page = rw.pool.GetReserved() + rw.reserved-- + } else { + page = rw.pool.Get() + } rw.pages = append(rw.pages, page) rw.lastOffset = 0 return page @@ -321,6 +342,10 @@ func (rw *RW) Close() error { rw.pool.Put(page) } rw.pages = nil + if rw.reserved > 0 { + rw.pool.Release(rw.reserved) + rw.reserved = 0 + } return nil }