diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index 430b72778..c4fdb827a 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -343,6 +343,16 @@ In tests, copy speed increases almost linearly with copy
 concurrency.`,
 			Default:  512,
 			Advanced: true,
+		}, {
+			Name: "copy_total_concurrency",
+			Help: `Global concurrency limit for multipart copy chunks.
+
+This limits the total number of multipart copy chunks running at once
+across all files.
+
+Set to 0 to disable this limiter.`,
+			Default:  0,
+			Advanced: true,
 		}, {
 			Name: "use_copy_blob",
 			Help: `Whether to use the Copy Blob API when copying to the same storage account.
@@ -526,6 +536,7 @@ type Options struct {
 	ChunkSize            fs.SizeSuffix `config:"chunk_size"`
 	CopyCutoff           fs.SizeSuffix `config:"copy_cutoff"`
 	CopyConcurrency      int           `config:"copy_concurrency"`
+	CopyTotalConcurrency int           `config:"copy_total_concurrency"`
 	UseCopyBlob          bool          `config:"use_copy_blob"`
 	UploadConcurrency    int           `config:"upload_concurrency"`
 	ListChunkSize        uint          `config:"list_chunk"`
@@ -560,6 +571,7 @@ type Fs struct {
 	cache        *bucket.Cache              // cache for container creation status
 	pacer        *fs.Pacer                  // To pace and retry the API calls
 	uploadToken  *pacer.TokenDispenser      // control concurrency
+	copyToken    *pacer.TokenDispenser      // global multipart copy concurrency limiter
 	publicAccess container.PublicAccessType // Container Public Access Level
 
 	// user delegation cache
@@ -802,6 +814,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		ci:          ci,
 		pacer:       fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
 		uploadToken: pacer.NewTokenDispenser(ci.Transfers),
+		copyToken:   pacer.NewTokenDispenser(opt.CopyTotalConcurrency),
 		cache:       bucket.NewCache(),
 		cntSVCcache: make(map[string]*container.Client, 1),
 	}
@@ -1875,8 +1888,14 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st
 		if gCtx.Err() != nil {
 			break
 		}
+		if f.opt.CopyTotalConcurrency > 0 {
+			f.copyToken.Get()
+		}
 		partNum := partNum // for closure
 		g.Go(func() error {
+			if f.opt.CopyTotalConcurrency > 0 {
+				defer f.copyToken.Put()
+			}
 			blockID := bic.newBlockID(partNum)
 			options := blockblob.StageBlockFromURLOptions{
 				Range: blob.HTTPRange{
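
Note (not part of the patch): the new limiter reuses the token-dispenser pattern the backend already uses for `uploadToken`. Below is a minimal, self-contained sketch of that pattern, assuming a hand-rolled `tokenDispenser` rather than rclone's real `pacer.TokenDispenser`: a buffered channel pre-filled with N tokens acts as a counting semaphore, `Get` blocks until a token is free, and `Put` returns one.

```go
package main

import (
	"fmt"
	"sync"
)

// tokenDispenser is an illustrative stand-in for the dispenser the patch uses.
type tokenDispenser struct {
	tokens chan struct{}
}

func newTokenDispenser(n int) *tokenDispenser {
	td := &tokenDispenser{tokens: make(chan struct{}, n)}
	for i := 0; i < n; i++ {
		td.tokens <- struct{}{} // pre-fill the pool with n tokens
	}
	return td
}

// Get blocks until a token is available.
func (td *tokenDispenser) Get() { <-td.tokens }

// Put returns a token to the pool.
func (td *tokenDispenser) Put() { td.tokens <- struct{}{} }

func main() {
	const copyTotalConcurrency = 4 // stands in for the new copy_total_concurrency option
	copyToken := newTokenDispenser(copyTotalConcurrency)

	var wg sync.WaitGroup
	for part := 0; part < 16; part++ {
		// Acquire before launching the goroutine, as the patch does: once the
		// global limit is reached, this loop blocks instead of spawning more work.
		copyToken.Get()
		wg.Add(1)
		go func(part int) {
			defer wg.Done()
			defer copyToken.Put() // release when this chunk copy finishes
			fmt.Println("copying chunk", part)
		}(part)
	}
	wg.Wait()
}
```

Taking the token before `g.Go` rather than inside the goroutine means the part-spawning loop itself applies backpressure once the limit is hit, so no more than `copy_total_concurrency` chunk copies are ever in flight across all transfers.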