mirror of
https://github.com/rclone/rclone.git
synced 2026-02-17 09:59:10 +00:00
operations: multithread copy: grab memory before making go routines
This reduces the number of go routines which can get out of hand when using large --transfers and --multi-thread-streams from potentially --multi-thread-streams * --transfers Go routines to --max-memory / --multi-thread-chunk-size It serializes the memory allocator in each transfer which should be good for performance and reduce lock contention.
This commit is contained in:
@@ -64,25 +64,15 @@ type multiThreadCopyState struct {
|
||||
}
|
||||
|
||||
// Copy a single chunk into place
|
||||
func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer fs.ChunkWriter) (err error) {
|
||||
func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer fs.ChunkWriter, start, end, size int64, rw *pool.RW) (err error) {
|
||||
defer func() {
|
||||
if !mc.noBuffering {
|
||||
fs.CheckClose(rw, &err)
|
||||
}
|
||||
if err != nil {
|
||||
fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d failed: %v", chunk+1, mc.numChunks, err)
|
||||
}
|
||||
}()
|
||||
start := int64(chunk) * mc.partSize
|
||||
if start >= mc.size {
|
||||
return nil
|
||||
}
|
||||
end := min(start+mc.partSize, mc.size)
|
||||
size := end - start
|
||||
|
||||
// Reserve the memory first so we don't open the source and wait for memory buffers for ages
|
||||
var rw *pool.RW
|
||||
if !mc.noBuffering {
|
||||
rw = multipart.NewRW().Reserve(size)
|
||||
defer fs.CheckClose(rw, &err)
|
||||
}
|
||||
|
||||
fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d (%d-%d) size %v starting", chunk+1, mc.numChunks, start, end, fs.SizeSuffix(size))
|
||||
|
||||
@@ -226,9 +216,24 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
|
||||
if gCtx.Err() != nil {
|
||||
break
|
||||
}
|
||||
chunk := chunk
|
||||
|
||||
// Work out how big and where the chunk is
|
||||
start := int64(chunk) * mc.partSize
|
||||
if start >= mc.size {
|
||||
continue
|
||||
}
|
||||
end := min(start+mc.partSize, mc.size)
|
||||
size := end - start
|
||||
|
||||
// Reserve the memory first so we don't open the source and wait for memory buffers for ages
|
||||
// This also avoids creating an excess of goroutines all waiting on memory.
|
||||
var rw *pool.RW
|
||||
if !mc.noBuffering {
|
||||
rw = multipart.NewRW().Reserve(size)
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
return mc.copyChunk(gCtx, chunk, chunkWriter)
|
||||
return mc.copyChunk(gCtx, chunk, chunkWriter, start, end, size, rw)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user