diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 9abd5f743..c106a4565 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -64,25 +64,15 @@ type multiThreadCopyState struct { } // Copy a single chunk into place -func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer fs.ChunkWriter) (err error) { +func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer fs.ChunkWriter, start, end, size int64, rw *pool.RW) (err error) { defer func() { + if !mc.noBuffering { + fs.CheckClose(rw, &err) + } if err != nil { fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d failed: %v", chunk+1, mc.numChunks, err) } }() - start := int64(chunk) * mc.partSize - if start >= mc.size { - return nil - } - end := min(start+mc.partSize, mc.size) - size := end - start - - // Reserve the memory first so we don't open the source and wait for memory buffers for ages - var rw *pool.RW - if !mc.noBuffering { - rw = multipart.NewRW().Reserve(size) - defer fs.CheckClose(rw, &err) - } fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d (%d-%d) size %v starting", chunk+1, mc.numChunks, start, end, fs.SizeSuffix(size)) @@ -226,9 +216,24 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, if gCtx.Err() != nil { break } - chunk := chunk + + // Work out how big and where the chunk is + start := int64(chunk) * mc.partSize + if start >= mc.size { + continue + } + end := min(start+mc.partSize, mc.size) + size := end - start + + // Reserve the memory first so we don't open the source and wait for memory buffers for ages + // This also avoids creating an excess of goroutines all waiting on memory. + var rw *pool.RW + if !mc.noBuffering { + rw = multipart.NewRW().Reserve(size) + } + g.Go(func() error { - return mc.copyChunk(gCtx, chunk, chunkWriter) + return mc.copyChunk(gCtx, chunk, chunkWriter, start, end, size, rw) }) }