1
0
mirror of https://github.com/rclone/rclone.git synced 2026-02-03 18:23:30 +00:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Nick Craig-Wood
2887806f33 pacer: fix deadlock between pacer token and --max-connections
It was possible in the presence of --max-connections and recursive
calls to the pacer to deadlock it leaving all connections waiting on
either a max connection token or a pacer token.

This fixes the problem by making sure we return the pacer token on
schedule if we take it.

This also short circuits the pacer token if sleepTime is 0.
2026-02-03 16:40:54 +00:00
Nick Craig-Wood
9ed4295e34 pacer: fix deadlock with --max-connections
If the pacer was used recursively and --max-connections was in use
then it could deadlock if all the connections were in use at the time
of recursive call (likely).

This affected the azureblob backend because when it receives an
InvalidBlockOrBlob error it attempts to clear the condition before
retrying. This in turn involves recursively calling the pacer.

This fixes the problem by skipping the --max-connections check if the
pacer is called recursively.

The recursive detection is done by stack inspection which isn't ideal,
but the alternative would be to add ctx to all >1,000 pacer calls. The
benchmark reveals stack inspection takes about 55nS per stack level so
it is relatively cheap.
2025-09-01 15:37:49 +01:00
Nick Craig-Wood
2fa1a52f22 Revert "azureblob: fix deadlock with --max-connections with InvalidBlockOrBlob errors"
This reverts commit 0c1902cc6037d81eaf95e931172879517a25d529.

This turns out not to be sufficient so we need a better approach
2025-08-30 12:03:50 +01:00
3 changed files with 131 additions and 22 deletions

View File

@@ -2765,8 +2765,6 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
blockList blockblob.GetBlockListResponse
properties *blob.GetPropertiesResponse
options *blockblob.CommitBlockListOptions
// Use temporary pacer as this can be called recursively which can cause a deadlock with --max-connections
pacer = fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant)))
)
properties, err = o.readMetaDataAlways(ctx)
@@ -2778,7 +2776,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
if objectExists {
// Get the committed block list
err = pacer.Call(func() (bool, error) {
err = o.fs.pacer.Call(func() (bool, error) {
blockList, err = blockBlobSVC.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
return o.fs.shouldRetry(ctx, err)
})
@@ -2820,7 +2818,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
// Commit only the committed blocks
fs.Debugf(o, "Committing %d blocks to remove uncommitted blocks", len(blockIDs))
err = pacer.Call(func() (bool, error) {
err = o.fs.pacer.Call(func() (bool, error) {
_, err := blockBlobSVC.CommitBlockList(ctx, blockIDs, options)
return o.fs.shouldRetry(ctx, err)
})

View File

@@ -4,6 +4,8 @@ package pacer
import (
"errors"
"fmt"
"runtime"
"strings"
"sync"
"time"
@@ -153,31 +155,37 @@ func (p *Pacer) ModifyCalculator(f func(Calculator)) {
// This must be called as a pair with endCall.
//
// This waits for the pacer token
func (p *Pacer) beginCall() {
func (p *Pacer) beginCall(limitConnections bool) {
// pacer starts with a token in and whenever we take one out
// XXX ms later we put another in. We could do this with a
// Ticker more accurately, but then we'd have to work out how
// not to run it when it wasn't needed
<-p.pacer
if p.maxConnections > 0 {
<-p.connTokens
}
p.mu.Lock()
// Restart the timer
go func(t time.Duration) {
time.Sleep(t)
p.pacer <- struct{}{}
}(p.state.SleepTime)
sleepTime := p.state.SleepTime
p.mu.Unlock()
if sleepTime > 0 {
<-p.pacer
// Restart the timer
go func(t time.Duration) {
time.Sleep(t)
p.pacer <- struct{}{}
}(sleepTime)
}
if limitConnections {
<-p.connTokens
}
}
// endCall implements the pacing algorithm
//
// This should calculate a new sleepTime. It takes a boolean as to
// whether the operation should be retried or not.
func (p *Pacer) endCall(retry bool, err error) {
if p.maxConnections > 0 {
func (p *Pacer) endCall(retry bool, err error, limitConnections bool) {
if limitConnections {
p.connTokens <- struct{}{}
}
p.mu.Lock()
@@ -191,13 +199,44 @@ func (p *Pacer) endCall(retry bool, err error) {
p.mu.Unlock()
}
// Detect the pacer being called reentrantly.
//
// This looks for Pacer.call in the call stack and returns true if it
// is found.
//
// Ideally we would do this by passing a context about but there are
// an awful lot of Pacer calls!
//
// This is only needed when p.maxConnections > 0 which isn't a common
// configuration so adding a bit of extra slowdown here is not a
// problem.
func pacerReentered() bool {
var pcs [48]uintptr
n := runtime.Callers(3, pcs[:]) // skip runtime.Callers, pacerReentered and call
frames := runtime.CallersFrames(pcs[:n])
for {
f, more := frames.Next()
if strings.HasSuffix(f.Function, "(*Pacer).call") {
return true
}
if !more {
break
}
}
return false
}
// call implements Call but with settable retries
func (p *Pacer) call(fn Paced, retries int) (err error) {
var retry bool
limitConnections := false
if p.maxConnections > 0 && !pacerReentered() {
limitConnections = true
}
for i := 1; i <= retries; i++ {
p.beginCall()
p.beginCall(limitConnections)
retry, err = p.invoker(i, retries, fn)
p.endCall(retry, err)
p.endCall(retry, err, limitConnections)
if !retry {
break
}

View File

@@ -108,7 +108,7 @@ func waitForPace(p *Pacer, duration time.Duration) (when time.Time) {
func TestBeginCall(t *testing.T) {
p := New(MaxConnectionsOption(10), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
emptyTokens(p)
go p.beginCall()
go p.beginCall(true)
if !waitForPace(p, 10*time.Millisecond).IsZero() {
t.Errorf("beginSleep fired too early #1")
}
@@ -131,7 +131,7 @@ func TestBeginCall(t *testing.T) {
func TestBeginCallZeroConnections(t *testing.T) {
p := New(MaxConnectionsOption(0), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
emptyTokens(p)
go p.beginCall()
go p.beginCall(false)
if !waitForPace(p, 10*time.Millisecond).IsZero() {
t.Errorf("beginSleep fired too early #1")
}
@@ -257,7 +257,7 @@ func TestEndCall(t *testing.T) {
p := New(MaxConnectionsOption(5))
emptyTokens(p)
p.state.ConsecutiveRetries = 1
p.endCall(true, nil)
p.endCall(true, nil, true)
assert.Equal(t, 1, len(p.connTokens))
assert.Equal(t, 2, p.state.ConsecutiveRetries)
}
@@ -266,7 +266,7 @@ func TestEndCallZeroConnections(t *testing.T) {
p := New(MaxConnectionsOption(0))
emptyTokens(p)
p.state.ConsecutiveRetries = 1
p.endCall(false, nil)
p.endCall(false, nil, false)
assert.Equal(t, 0, len(p.connTokens))
assert.Equal(t, 0, p.state.ConsecutiveRetries)
}
@@ -353,6 +353,78 @@ func TestCallParallel(t *testing.T) {
wait.Broadcast()
}
func BenchmarkPacerReentered(b *testing.B) {
for b.Loop() {
_ = pacerReentered()
}
}
func BenchmarkPacerReentered100(b *testing.B) {
var fn func(level int)
fn = func(level int) {
if level > 0 {
fn(level - 1)
return
}
for b.Loop() {
_ = pacerReentered()
}
}
fn(100)
}
func TestCallMaxConnectionsRecursiveDeadlock(t *testing.T) {
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
p.SetMaxConnections(1)
dp := &dummyPaced{retry: false}
err := p.Call(func() (bool, error) {
// check we have taken the connection token
// no tokens left means deadlock on the recursive call
assert.Equal(t, 0, len(p.connTokens))
return false, p.Call(dp.fn)
})
assert.Equal(t, 1, dp.called)
assert.Equal(t, errFoo, err)
}
func TestCallMaxConnectionsRecursiveDeadlock2(t *testing.T) {
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
p.SetMaxConnections(1)
dp := &dummyPaced{retry: false}
wg := new(sync.WaitGroup)
// Normal
for range 100 {
wg.Add(1)
go func() {
defer wg.Done()
err := p.Call(func() (bool, error) {
// check we have taken the connection token
assert.Equal(t, 0, len(p.connTokens))
return false, nil
})
assert.NoError(t, err)
}()
// Now attempt a recursive call
wg.Add(1)
go func() {
defer wg.Done()
err := p.Call(func() (bool, error) {
// check we have taken the connection token
assert.Equal(t, 0, len(p.connTokens))
// Do recursive call
return false, p.Call(dp.fn)
})
assert.Equal(t, errFoo, err)
}()
}
// Tidy up
wg.Wait()
}
func TestRetryAfterError_NonNilErr(t *testing.T) {
orig := errors.New("test failure")
dur := 2 * time.Second