1
0
mirror of https://github.com/rclone/rclone.git synced 2025-12-06 00:03:32 +00:00

Compare commits

...

1 Commits

Author SHA1 Message Date
Nick Craig-Wood
7d97ba0a0f pool: fix deadlock with --max-memory and multipart transfers
Because multipart transfers can need more than one buffer to complete,
if transfers was set very high, it was possible for lots of multipart
transfers to start, grab fewer buffers than chunk size, then deadlock
because no more memory was available.

This fixes the problem by introducing a reservation system which the
multipart transfer uses to ensure it can reserve all the memory for
one chunk before starting.
2025-04-22 17:25:34 +01:00
4 changed files with 70 additions and 7 deletions

View File

@@ -92,7 +92,7 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer
rs = rc
} else {
// Read the chunk into buffered reader
rw := multipart.NewRW()
rw := multipart.NewRW().Reserve(size)
defer fs.CheckClose(rw, &err)
_, err = io.CopyN(rw, rc, size)
if err != nil {

View File

@@ -93,7 +93,7 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U
for partNum := int64(0); !finished; partNum++ {
// Get a block of memory from the pool and token which limits concurrency.
tokens.Get()
rw := NewRW()
rw := NewRW().Reserve(chunkSize)
if acc != nil {
rw.SetAccounting(acc.AccountRead)
}

View File

@@ -182,8 +182,34 @@ func (bp *Pool) release(mem int64) {
totalMemory.Release(mem)
}
// Reserve buffers for use. Blocks until they are free.
//
// Doesn't allocate any memory.
//
// Must be released by calling GetReserved() which releases 1 buffer or
// Release() to release any number of buffers.
func (bp *Pool) Reserve(buffers int) {
waitTime := time.Millisecond
for {
err := bp.acquire(int64(buffers) * int64(bp.bufferSize))
if err == nil {
break
}
fs.Logf(nil, "Failed to get reservation for buffer, waiting for %v: %v", waitTime, err)
time.Sleep(waitTime)
waitTime *= 2
}
}
// Release previously Reserved buffers.
//
// Doesn't free any memory.
func (bp *Pool) Release(buffers int) {
bp.release(int64(buffers) * int64(bp.bufferSize))
}
// Get a buffer from the pool or allocate one
func (bp *Pool) Get() []byte {
func (bp *Pool) getBlock(reserved bool) []byte {
bp.mu.Lock()
var buf []byte
waitTime := time.Millisecond
@@ -193,9 +219,11 @@ func (bp *Pool) Get() []byte {
break
} else {
var err error
bp.mu.Unlock()
err = bp.acquire(int64(bp.bufferSize))
bp.mu.Lock()
if !reserved {
bp.mu.Unlock()
err = bp.acquire(int64(bp.bufferSize))
bp.mu.Lock()
}
if err == nil {
buf, err = bp.alloc(bp.bufferSize)
if err == nil {
@@ -217,6 +245,16 @@ func (bp *Pool) Get() []byte {
return buf
}
// Get a buffer from the pool or allocate one
func (bp *Pool) Get() []byte {
return bp.getBlock(false)
}
// GetReserved gets a reserved buffer from the pool or allocates one.
func (bp *Pool) GetReserved() []byte {
return bp.getBlock(true)
}
// freeBuffer returns mem to the os if required - call with lock held
func (bp *Pool) freeBuffer(mem []byte) {
err := bp.free(mem)

View File

@@ -35,6 +35,8 @@ type RW struct {
// Read side Variables
out int // offset we are reading from
reads int // count how many times the data has been read
reserved int // number of buffers reserved
}
var (
@@ -59,6 +61,20 @@ func NewRW(pool *Pool) *RW {
return rw
}
// Reserve bytes of memory.
//
// Reserve, but don't allocation n bytes of memory.
//
// This is rounded up to the nearest buffer page size.
func (rw *RW) Reserve(n int64) *RW {
rw.mu.Lock()
defer rw.mu.Unlock()
buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize))
rw.pool.Reserve(buffers)
rw.reserved += buffers
return rw
}
// SetAccounting should be provided with a function which will be
// called after every read from the RW.
//
@@ -200,7 +216,12 @@ func (rw *RW) writePage() (page []byte) {
if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize {
return rw.pages[len(rw.pages)-1][rw.lastOffset:]
}
page = rw.pool.Get()
if rw.reserved > 0 {
page = rw.pool.GetReserved()
rw.reserved--
} else {
page = rw.pool.Get()
}
rw.pages = append(rw.pages, page)
rw.lastOffset = 0
return page
@@ -321,6 +342,10 @@ func (rw *RW) Close() error {
rw.pool.Put(page)
}
rw.pages = nil
if rw.reserved > 0 {
rw.pool.Release(rw.reserved)
rw.reserved = 0
}
return nil
}