From b0981ce0412c011287adfaebcc2d9feb5fed0af5 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Mon, 16 Feb 2026 17:26:02 +0000 Subject: [PATCH] operations: multithread copy: grab memory before making go routines This reduces the number of go routines which can get out of hand when using large --transfers and --multi-thread-streams from potentially --multi-thread-streams * --transfers Go routines to --max-memory / --multi-thread-chunk-size It serializes the memory allocator in each transfer which should be good for performance and reduce lock contention. --- fs/operations/multithread.go | 37 ++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 9abd5f743..c106a4565 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -64,25 +64,15 @@ type multiThreadCopyState struct { } // Copy a single chunk into place -func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer fs.ChunkWriter) (err error) { +func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer fs.ChunkWriter, start, end, size int64, rw *pool.RW) (err error) { defer func() { + if !mc.noBuffering { + fs.CheckClose(rw, &err) + } if err != nil { fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d failed: %v", chunk+1, mc.numChunks, err) } }() - start := int64(chunk) * mc.partSize - if start >= mc.size { - return nil - } - end := min(start+mc.partSize, mc.size) - size := end - start - - // Reserve the memory first so we don't open the source and wait for memory buffers for ages - var rw *pool.RW - if !mc.noBuffering { - rw = multipart.NewRW().Reserve(size) - defer fs.CheckClose(rw, &err) - } fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d (%d-%d) size %v starting", chunk+1, mc.numChunks, start, end, fs.SizeSuffix(size)) @@ -226,9 +216,24 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, if gCtx.Err() != nil { break } - chunk := chunk + + // Work out how big and where the chunk is + start := int64(chunk) * mc.partSize + if start >= mc.size { + continue + } + end := min(start+mc.partSize, mc.size) + size := end - start + + // Reserve the memory first so we don't open the source and wait for memory buffers for ages + // This also avoids creating an excess of goroutines all waiting on memory. + var rw *pool.RW + if !mc.noBuffering { + rw = multipart.NewRW().Reserve(size) + } + g.Go(func() error { - return mc.copyChunk(gCtx, chunk, chunkWriter) + return mc.copyChunk(gCtx, chunk, chunkWriter, start, end, size, rw) }) }