mirror of https://github.com/rclone/rclone.git synced 2026-02-12 22:43:27 +00:00

Compare commits


7 Commits

Author SHA1 Message Date
Nick Craig-Wood
983a4c3686 azureblob: add server side copy real time accounting 2026-02-12 16:44:50 +00:00
Nick Craig-Wood
d516515dfe operations: add method to real time account server side copy
Before this change, server side copies would show at 0% until they were
done, then jump to 100%.

With support from the backend, server side copies can now be accounted
in real time. This will only work for backends which have been modified
and which themselves get feedback about how the copy is progressing.
2026-02-12 16:44:50 +00:00
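A minimal sketch, assuming nothing beyond what the diffs below show, of the pattern a modified backend follows: fetch the TransferAccounter from the context, call Start() once, then Add() each chunk's bytes as it completes. The copyChunk callback and package name are purely illustrative.

package example // hypothetical package for illustration

import (
	"context"

	"github.com/rclone/rclone/lib/transferaccounter"
)

// copyInParts sketches real time accounting of a server side copy.
func copyInParts(ctx context.Context, partSizes []int64, copyChunk func(int64) error) error {
	account := transferaccounter.Get(ctx) // returns a do-nothing accounter if none is in the ctx
	account.Start()                       // signals that real time accounting is active
	for _, size := range partSizes {
		if err := copyChunk(size); err != nil {
			return err
		}
		account.Add(size) // accounted as each chunk finishes rather than all at the end
	}
	return nil
}
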
Duncan F
d17425eb1f azureblob: add --azureblob-copy-total-concurrency to limit total multipart copy concurrency 2026-02-11 13:36:47 +00:00
Nick Craig-Wood
83d0c186a7 pacer: re-read the sleep time as it may be stale
Before this change we read sleepTime before acquiring the pacer token
and used that possibly stale value to schedule the token return. When
many goroutines enter while sleepTime is high (e.g., 10s), each
goroutine caches this 10s value. Even if successful calls rapidly
decay the pacer state to 0, the queued goroutines still schedule 10s
token returns, so the queue drains at 1 req/10s for the entire herd.
This can create multi-minute delays even after the pacer has dropped
to 0.

After this change we refresh the sleep time after getting the token.

This problem was introduced by the desire to skip reading the pacer
token entirely when sleepTime is 0 in high performance backends (eg
s3, azure blob).
2026-02-06 13:03:18 +00:00
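A stripped-down, self-contained model of the fix (names here are illustrative, not rclone's): the sleep time is only read once the pacer token has been acquired, so queued callers pick up the decayed value instead of the one current when they entered.

package main

import (
	"sync"
	"time"
)

// miniPacer models just the token channel and sleep time.
type miniPacer struct {
	mu        sync.Mutex
	sleepTime time.Duration
	token     chan struct{}
}

// beginCall with the fix applied: read sleepTime only after holding the token.
func (p *miniPacer) beginCall() {
	<-p.token
	p.mu.Lock()
	sleepTime := p.sleepTime // refreshed here, not before the wait
	p.mu.Unlock()
	go func() {
		time.Sleep(sleepTime)
		p.token <- struct{}{}
	}()
}

func main() {
	p := &miniPacer{sleepTime: 10 * time.Millisecond, token: make(chan struct{}, 1)}
	p.token <- struct{}{}
	// Simulate successful calls decaying the pacer to 0 while callers queue up.
	go func() {
		time.Sleep(5 * time.Millisecond)
		p.mu.Lock()
		p.sleepTime = 0
		p.mu.Unlock()
	}()
	for i := 0; i < 5; i++ {
		p.beginCall() // after the decay, waiters drain immediately instead of at 1 per 10ms
	}
}
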
Nick Craig-Wood
2887806f33 pacer: fix deadlock between pacer token and --max-connections
It was possible in the presence of --max-connections and recursive
calls to the pacer to deadlock it, leaving all connections waiting on
either a max connection token or a pacer token.

This fixes the problem by making sure we return the pacer token on
schedule if we take it.

This also short circuits the pacer token if sleepTime is 0.
2026-02-03 16:40:54 +00:00
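A minimal sketch of the ordering the fix establishes (channel names are illustrative, not rclone's): once the pacer token is taken, its return is scheduled immediately, before blocking on a --max-connections slot, and the pacer token is skipped entirely when the sleep time is 0.

package main

import "time"

// acquire models the fixed beginCall ordering with plain channels.
func acquire(pacerToken, connToken chan struct{}, sleepTime time.Duration) {
	if sleepTime > 0 { // short circuit the pacer token when there is no sleep
		<-pacerToken
		go func() { // the token is returned on schedule regardless of what follows
			time.Sleep(sleepTime)
			pacerToken <- struct{}{}
		}()
	}
	<-connToken // only now wait for a connection slot
}

func main() {
	pacerToken := make(chan struct{}, 1)
	connToken := make(chan struct{}, 1)
	pacerToken <- struct{}{}
	connToken <- struct{}{}
	acquire(pacerToken, connToken, time.Millisecond)
}
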
Nick Craig-Wood
9ed4295e34 pacer: fix deadlock with --max-connections
If the pacer was used recursively and --max-connections was in use
then it could deadlock if all the connections were in use at the time
of the recursive call (which is likely).

This affected the azureblob backend because when it receives an
InvalidBlockOrBlob error it attempts to clear the condition before
retrying. This in turn involves recursively calling the pacer.

This fixes the problem by skipping the --max-connections check if the
pacer is called recursively.

The recursive detection is done by stack inspection, which isn't ideal,
but the alternative would be to add ctx to all >1,000 pacer calls. The
benchmark reveals stack inspection takes about 55ns per stack level, so
it is relatively cheap.
2025-09-01 15:37:49 +01:00
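A stripped-down, self-contained illustration of the stack-inspection technique; the real pacerReentered, which looks for "(*Pacer).call", is in the pacer diff below.

package main

import (
	"fmt"
	"runtime"
	"strings"
)

// inStack reports whether a function whose name ends in suffix is on the call stack.
func inStack(suffix string) bool {
	var pcs [48]uintptr
	n := runtime.Callers(2, pcs[:]) // skip runtime.Callers and inStack itself
	frames := runtime.CallersFrames(pcs[:n])
	for {
		f, more := frames.Next()
		if strings.HasSuffix(f.Function, suffix) {
			return true
		}
		if !more {
			break
		}
	}
	return false
}

func outer() bool { return inner() }
func inner() bool { return inStack("main.outer") }

func main() {
	fmt.Println(outer()) // true: inner was reached via outer
	fmt.Println(inner()) // false: outer is not on the stack
}
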
Nick Craig-Wood
2fa1a52f22 Revert "azureblob: fix deadlock with --max-connections with InvalidBlockOrBlob errors"
This reverts commit 0c1902cc6037d81eaf95e931172879517a25d529.

This turns out not to be sufficient, so we need a better approach.
2025-08-30 12:03:50 +01:00
8 changed files with 341 additions and 24 deletions

View File

@@ -52,6 +52,7 @@ import (
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/transferaccounter"
"golang.org/x/sync/errgroup"
)
@@ -343,6 +344,16 @@ In tests, copy speed increases almost linearly with copy
concurrency.`,
Default: 512,
Advanced: true,
}, {
Name: "copy_total_concurrency",
Help: `Global concurrency limit for multipart copy chunks.
This limits the total number of multipart copy chunks running at once
across all files.
Set to 0 to disable this limiter.`,
Default: 0,
Advanced: true,
}, {
Name: "use_copy_blob",
Help: `Whether to use the Copy Blob API when copying to the same storage account.
@@ -526,6 +537,7 @@ type Options struct {
ChunkSize fs.SizeSuffix `config:"chunk_size"`
CopyCutoff fs.SizeSuffix `config:"copy_cutoff"`
CopyConcurrency int `config:"copy_concurrency"`
CopyTotalConcurrency int `config:"copy_total_concurrency"`
UseCopyBlob bool `config:"use_copy_blob"`
UploadConcurrency int `config:"upload_concurrency"`
ListChunkSize uint `config:"list_chunk"`
@@ -560,6 +572,7 @@ type Fs struct {
cache *bucket.Cache // cache for container creation status
pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
copyToken *pacer.TokenDispenser // global multipart copy concurrency limiter
publicAccess container.PublicAccessType // Container Public Access Level
// user delegation cache
@@ -802,6 +815,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ci: ci,
pacer: fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(ci.Transfers),
copyToken: pacer.NewTokenDispenser(opt.CopyTotalConcurrency),
cache: bucket.NewCache(),
cntSVCcache: make(map[string]*container.Client, 1),
}
@@ -1865,18 +1879,26 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st
blockIDs = make([]string, numParts) // list of blocks for finalize
g, gCtx = errgroup.WithContext(ctx)
checker = newCheckForInvalidBlockOrBlob("copy", o)
account = transferaccounter.Get(ctx)
)
g.SetLimit(f.opt.CopyConcurrency)
fs.Debugf(o, "Starting multipart copy with %d parts of size %v", numParts, fs.SizeSuffix(partSize))
account.Start()
for partNum := uint64(0); partNum < uint64(numParts); partNum++ {
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
break
}
if f.opt.CopyTotalConcurrency > 0 {
f.copyToken.Get()
}
partNum := partNum // for closure
g.Go(func() error {
if f.opt.CopyTotalConcurrency > 0 {
defer f.copyToken.Put()
}
blockID := bic.newBlockID(partNum)
options := blockblob.StageBlockFromURLOptions{
Range: blob.HTTPRange{
@@ -1910,6 +1932,7 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st
return fmt.Errorf("multipart copy: failed to copy chunk %d with %v bytes: %w", partNum+1, -1, err)
}
blockIDs[partNum] = blockID
account.Add(partSize)
return nil
})
}
@@ -2765,8 +2788,6 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
blockList blockblob.GetBlockListResponse
properties *blob.GetPropertiesResponse
options *blockblob.CommitBlockListOptions
// Use temporary pacer as this can be called recursively which can cause a deadlock with --max-connections
pacer = fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant)))
)
properties, err = o.readMetaDataAlways(ctx)
@@ -2778,7 +2799,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
if objectExists {
// Get the committed block list
err = pacer.Call(func() (bool, error) {
err = o.fs.pacer.Call(func() (bool, error) {
blockList, err = blockBlobSVC.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
return o.fs.shouldRetry(ctx, err)
})
@@ -2820,7 +2841,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
// Commit only the committed blocks
fs.Debugf(o, "Committing %d blocks to remove uncommitted blocks", len(blockIDs))
err = pacer.Call(func() (bool, error) {
err = o.fs.pacer.Call(func() (bool, error) {
_, err := blockBlobSVC.CommitBlockList(ctx, blockIDs, options)
return o.fs.shouldRetry(ctx, err)
})
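
A minimal sketch of the token dispenser pattern the new copyToken field uses to cap total multipart copy concurrency across all files; the worker body and the limit of 4 are illustrative only.

package main

import (
	"fmt"
	"sync"

	"github.com/rclone/rclone/lib/pacer"
)

func main() {
	const limit = 4
	copyToken := pacer.NewTokenDispenser(limit) // at most `limit` chunk copies run at once
	var wg sync.WaitGroup
	for part := 0; part < 16; part++ {
		copyToken.Get() // blocks until a token is free
		wg.Add(1)
		go func(part int) {
			defer wg.Done()
			defer copyToken.Put() // return the token when the chunk is done
			fmt.Println("copying chunk", part)
		}(part)
	}
	wg.Wait()
}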

View File

@@ -11,6 +11,7 @@ import (
"unicode/utf8"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/lib/transferaccounter"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/asyncreader"
@@ -312,6 +313,15 @@ func (acc *Account) serverSideEnd(n int64) {
}
}
// NewServerSideCopyAccounter returns a TransferAccounter for a server
// side copy and a new ctx with it embedded
func (acc *Account) NewServerSideCopyAccounter(ctx context.Context) (context.Context, *transferaccounter.TransferAccounter) {
return transferaccounter.New(ctx, func(n int64) {
acc.stats.AddServerSideCopyBytes(n)
acc.accountReadNoNetwork(n)
})
}
// ServerSideCopyEnd accounts for a read of n bytes in a server-side copy
func (acc *Account) ServerSideCopyEnd(n int64) {
acc.stats.AddServerSideCopy(n)
@@ -358,6 +368,17 @@ func (acc *Account) accountRead(n int) {
acc.limitPerFileBandwidth(n)
}
// Account the read if not using network (eg for server side copies)
func (acc *Account) accountReadNoNetwork(n int64) {
// Update Stats
acc.values.mu.Lock()
acc.values.lpBytes += int(n)
acc.values.bytes += n
acc.values.mu.Unlock()
acc.stats.BytesNoNetwork(n)
}
// read bytes from the io.Reader passed in and account them
func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
bytesUntilLimit, err := acc.checkReadBefore()

View File

@@ -938,6 +938,13 @@ func (s *StatsInfo) AddServerSideMove(n int64) {
s.mu.Unlock()
}
// AddServerSideCopyBytes adds bytes for a server side copy
func (s *StatsInfo) AddServerSideCopyBytes(n int64) {
s.mu.Lock()
s.serverSideCopyBytes += n
s.mu.Unlock()
}
// AddServerSideCopy counts a server side copy
func (s *StatsInfo) AddServerSideCopy(n int64) {
s.mu.Lock()

View File

@@ -148,9 +148,14 @@ func (c *copy) serverSideCopy(ctx context.Context) (actionTaken string, newDst f
}
in := c.tr.Account(ctx, nil) // account the transfer
in.ServerSideTransferStart()
newDst, err = doCopy(ctx, c.src, c.remoteForCopy)
newCtx, ta := in.NewServerSideCopyAccounter(ctx)
newDst, err = doCopy(newCtx, c.src, c.remoteForCopy)
if err == nil {
in.ServerSideCopyEnd(newDst.Size()) // account the bytes for the server-side transfer
var n int64
if !ta.Started() {
n = newDst.Size()
}
in.ServerSideCopyEnd(n) // account the bytes for the server-side transfer
}
_ = in.Close()
if errors.Is(err, fs.ErrorCantCopy) {

View File

@@ -4,6 +4,8 @@ package pacer
import (
"errors"
"fmt"
"runtime"
"strings"
"sync"
"time"
@@ -153,31 +155,43 @@ func (p *Pacer) ModifyCalculator(f func(Calculator)) {
// This must be called as a pair with endCall.
//
// This waits for the pacer token
func (p *Pacer) beginCall() {
func (p *Pacer) beginCall(limitConnections bool) {
// pacer starts with a token in and whenever we take one out
// XXX ms later we put another in. We could do this with a
// Ticker more accurately, but then we'd have to work out how
// not to run it when it wasn't needed
<-p.pacer
if p.maxConnections > 0 {
<-p.connTokens
}
p.mu.Lock()
// Restart the timer
go func(t time.Duration) {
time.Sleep(t)
p.pacer <- struct{}{}
}(p.state.SleepTime)
sleepTime := p.state.SleepTime
p.mu.Unlock()
if sleepTime > 0 {
<-p.pacer
// Re-read the sleep time as it may be stale
// after waiting for the pacer token
p.mu.Lock()
sleepTime = p.state.SleepTime
p.mu.Unlock()
// Restart the timer
go func(t time.Duration) {
time.Sleep(t)
p.pacer <- struct{}{}
}(sleepTime)
}
if limitConnections {
<-p.connTokens
}
}
// endCall implements the pacing algorithm
//
// This should calculate a new sleepTime. It takes a boolean as to
// whether the operation should be retried or not.
func (p *Pacer) endCall(retry bool, err error) {
if p.maxConnections > 0 {
func (p *Pacer) endCall(retry bool, err error, limitConnections bool) {
if limitConnections {
p.connTokens <- struct{}{}
}
p.mu.Lock()
@@ -191,13 +205,44 @@ func (p *Pacer) endCall(retry bool, err error) {
p.mu.Unlock()
}
// Detect the pacer being called reentrantly.
//
// This looks for Pacer.call in the call stack and returns true if it
// is found.
//
// Ideally we would do this by passing a context about but there are
// an awful lot of Pacer calls!
//
// This is only needed when p.maxConnections > 0 which isn't a common
// configuration so adding a bit of extra slowdown here is not a
// problem.
func pacerReentered() bool {
var pcs [48]uintptr
n := runtime.Callers(3, pcs[:]) // skip runtime.Callers, pacerReentered and call
frames := runtime.CallersFrames(pcs[:n])
for {
f, more := frames.Next()
if strings.HasSuffix(f.Function, "(*Pacer).call") {
return true
}
if !more {
break
}
}
return false
}
// call implements Call but with settable retries
func (p *Pacer) call(fn Paced, retries int) (err error) {
var retry bool
limitConnections := false
if p.maxConnections > 0 && !pacerReentered() {
limitConnections = true
}
for i := 1; i <= retries; i++ {
p.beginCall()
p.beginCall(limitConnections)
retry, err = p.invoker(i, retries, fn)
p.endCall(retry, err)
p.endCall(retry, err, limitConnections)
if !retry {
break
}

View File

@@ -108,7 +108,7 @@ func waitForPace(p *Pacer, duration time.Duration) (when time.Time) {
func TestBeginCall(t *testing.T) {
p := New(MaxConnectionsOption(10), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
emptyTokens(p)
go p.beginCall()
go p.beginCall(true)
if !waitForPace(p, 10*time.Millisecond).IsZero() {
t.Errorf("beginSleep fired too early #1")
}
@@ -131,7 +131,7 @@ func TestBeginCall(t *testing.T) {
func TestBeginCallZeroConnections(t *testing.T) {
p := New(MaxConnectionsOption(0), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
emptyTokens(p)
go p.beginCall()
go p.beginCall(false)
if !waitForPace(p, 10*time.Millisecond).IsZero() {
t.Errorf("beginSleep fired too early #1")
}
@@ -257,7 +257,7 @@ func TestEndCall(t *testing.T) {
p := New(MaxConnectionsOption(5))
emptyTokens(p)
p.state.ConsecutiveRetries = 1
p.endCall(true, nil)
p.endCall(true, nil, true)
assert.Equal(t, 1, len(p.connTokens))
assert.Equal(t, 2, p.state.ConsecutiveRetries)
}
@@ -266,7 +266,7 @@ func TestEndCallZeroConnections(t *testing.T) {
p := New(MaxConnectionsOption(0))
emptyTokens(p)
p.state.ConsecutiveRetries = 1
p.endCall(false, nil)
p.endCall(false, nil, false)
assert.Equal(t, 0, len(p.connTokens))
assert.Equal(t, 0, p.state.ConsecutiveRetries)
}
@@ -353,6 +353,78 @@ func TestCallParallel(t *testing.T) {
wait.Broadcast()
}
func BenchmarkPacerReentered(b *testing.B) {
for b.Loop() {
_ = pacerReentered()
}
}
func BenchmarkPacerReentered100(b *testing.B) {
var fn func(level int)
fn = func(level int) {
if level > 0 {
fn(level - 1)
return
}
for b.Loop() {
_ = pacerReentered()
}
}
fn(100)
}
func TestCallMaxConnectionsRecursiveDeadlock(t *testing.T) {
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
p.SetMaxConnections(1)
dp := &dummyPaced{retry: false}
err := p.Call(func() (bool, error) {
// check we have taken the connection token
// no tokens left means deadlock on the recursive call
assert.Equal(t, 0, len(p.connTokens))
return false, p.Call(dp.fn)
})
assert.Equal(t, 1, dp.called)
assert.Equal(t, errFoo, err)
}
func TestCallMaxConnectionsRecursiveDeadlock2(t *testing.T) {
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
p.SetMaxConnections(1)
dp := &dummyPaced{retry: false}
wg := new(sync.WaitGroup)
// Normal
for range 100 {
wg.Add(1)
go func() {
defer wg.Done()
err := p.Call(func() (bool, error) {
// check we have taken the connection token
assert.Equal(t, 0, len(p.connTokens))
return false, nil
})
assert.NoError(t, err)
}()
// Now attempt a recursive call
wg.Add(1)
go func() {
defer wg.Done()
err := p.Call(func() (bool, error) {
// check we have taken the connection token
assert.Equal(t, 0, len(p.connTokens))
// Do recursive call
return false, p.Call(dp.fn)
})
assert.Equal(t, errFoo, err)
}()
}
// Tidy up
wg.Wait()
}
func TestRetryAfterError_NonNilErr(t *testing.T) {
orig := errors.New("test failure")
dur := 2 * time.Second

View File

@@ -0,0 +1,63 @@
// Package transferaccounter provides utilities for accounting server side transfers.
package transferaccounter
import "context"
// Context key type for accounter
type accounterContextKeyType struct{}
// Context key for accounter
var accounterContextKey = accounterContextKeyType{}
// TransferAccounter is used to account server side and other transfers.
type TransferAccounter struct {
add func(n int64)
started bool
}
// New creates a TransferAccounter using the add function passed in.
//
// Note that the add function should be goroutine safe.
//
// It adds the new TransferAccounter to the context.
func New(ctx context.Context, add func(n int64)) (context.Context, *TransferAccounter) {
ta := &TransferAccounter{
add: add,
}
newCtx := context.WithValue(ctx, accounterContextKey, ta)
return newCtx, ta
}
// Start the transfer. Call this before calling Add().
func (ta *TransferAccounter) Start() {
ta.started = true
}
// Started returns if the transfer has had Start() called or not.
func (ta *TransferAccounter) Started() bool {
return ta.started
}
// Add n bytes to the transfer
func (ta *TransferAccounter) Add(n int64) {
ta.add(n)
}
// A transfer accounter which does nothing
var nullAccounter = &TransferAccounter{
add: func(n int64) {},
}
// Get returns a *TransferAccounter from the ctx.
//
// If none is found it will return a dummy one to keep the code simple.
func Get(ctx context.Context) *TransferAccounter {
if ctx == nil {
return nullAccounter
}
c := ctx.Value(accounterContextKey)
if c == nil {
return nullAccounter
}
return c.(*TransferAccounter)
}

View File

@@ -0,0 +1,83 @@
package transferaccounter
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestNew(t *testing.T) {
// Dummy add function
var totalBytes int64
addFn := func(n int64) {
totalBytes += n
}
// Create the accounter
ctx := context.Background()
_, ta := New(ctx, addFn)
// Verify object creation
assert.NotNil(t, ta)
assert.False(t, ta.Started(), "New accounter should not be started by default")
// Test Start()
ta.Start()
assert.True(t, ta.Started(), "Accounter should be started after calling Start()")
// Test Add() logic
ta.Add(100)
ta.Add(50)
assert.Equal(t, int64(150), totalBytes, "The add function should have been called with cumulative values")
}
func TestGet(t *testing.T) {
t.Run("Retrieve existing accounter", func(t *testing.T) {
// Create a specific accounter to identify later
expectedTotal := int64(0)
ctx, originalTa := New(context.Background(), func(n int64) { expectedTotal += n })
// Retrieve it
retrievedTa := Get(ctx)
// Assert it is the exact same pointer
assert.Equal(t, originalTa, retrievedTa)
// Verify functionality passes through
retrievedTa.Add(10)
assert.Equal(t, int64(10), expectedTotal)
})
t.Run("Context does not contain accounter", func(t *testing.T) {
ctx := context.Background()
ta := Get(ctx)
assert.NotNil(t, ta, "Get should never return nil")
assert.Equal(t, nullAccounter, ta, "Should return the global nullAccounter")
})
t.Run("Context is nil", func(t *testing.T) {
ta := Get(nil) //nolint:staticcheck // we want to test this
assert.NotNil(t, ta, "Get should never return nil")
assert.Equal(t, nullAccounter, ta, "Should return the global nullAccounter")
})
}
func TestNullAccounterBehavior(t *testing.T) {
// Ensure the null accounter (returned when context is missing/nil)
// can be called without panicking.
ta := Get(nil) //nolint:staticcheck // we want to test this
assert.NotPanics(t, func() {
ta.Start()
})
// Even after start, it acts as a valid object
assert.True(t, ta.Started())
assert.NotPanics(t, func() {
ta.Add(1000)
})
}