1
0
mirror of https://github.com/rclone/rclone.git synced 2026-02-12 22:43:27 +00:00

azureblob: add --azureblob-copy-total-concurrency to limit total multipart copy concurrency

This commit is contained in:
Duncan F
2026-02-10 18:09:35 -08:00
committed by Nick Craig-Wood
parent 83d0c186a7
commit d17425eb1f

View File

@@ -343,6 +343,16 @@ In tests, copy speed increases almost linearly with copy
concurrency.`,
Default: 512,
Advanced: true,
}, {
Name: "copy_total_concurrency",
Help: `Global concurrency limit for multipart copy chunks.
This limits the total number of multipart copy chunks running at once
across all files.
Set to 0 to disable this limiter.`,
Default: 0,
Advanced: true,
}, {
Name: "use_copy_blob",
Help: `Whether to use the Copy Blob API when copying to the same storage account.
@@ -526,6 +536,7 @@ type Options struct {
ChunkSize fs.SizeSuffix `config:"chunk_size"`
CopyCutoff fs.SizeSuffix `config:"copy_cutoff"`
CopyConcurrency int `config:"copy_concurrency"`
CopyTotalConcurrency int `config:"copy_total_concurrency"`
UseCopyBlob bool `config:"use_copy_blob"`
UploadConcurrency int `config:"upload_concurrency"`
ListChunkSize uint `config:"list_chunk"`
@@ -560,6 +571,7 @@ type Fs struct {
cache *bucket.Cache // cache for container creation status
pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
copyToken *pacer.TokenDispenser // global multipart copy concurrency limiter
publicAccess container.PublicAccessType // Container Public Access Level
// user delegation cache
@@ -802,6 +814,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ci: ci,
pacer: fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(ci.Transfers),
copyToken: pacer.NewTokenDispenser(opt.CopyTotalConcurrency),
cache: bucket.NewCache(),
cntSVCcache: make(map[string]*container.Client, 1),
}
@@ -1875,8 +1888,14 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st
if gCtx.Err() != nil {
break
}
if f.opt.CopyTotalConcurrency > 0 {
f.copyToken.Get()
}
partNum := partNum // for closure
g.Go(func() error {
if f.opt.CopyTotalConcurrency > 0 {
defer f.copyToken.Put()
}
blockID := bic.newBlockID(partNum)
options := blockblob.StageBlockFromURLOptions{
Range: blob.HTTPRange{