mirror of
https://github.com/rclone/rclone.git
synced 2026-02-03 18:23:30 +00:00
Compare commits
3 Commits
test
...
fix-azureb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2887806f33 | ||
|
|
9ed4295e34 | ||
|
|
2fa1a52f22 |
@@ -2765,8 +2765,6 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
|
||||
blockList blockblob.GetBlockListResponse
|
||||
properties *blob.GetPropertiesResponse
|
||||
options *blockblob.CommitBlockListOptions
|
||||
// Use temporary pacer as this can be called recursively which can cause a deadlock with --max-connections
|
||||
pacer = fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant)))
|
||||
)
|
||||
|
||||
properties, err = o.readMetaDataAlways(ctx)
|
||||
@@ -2778,7 +2776,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
|
||||
|
||||
if objectExists {
|
||||
// Get the committed block list
|
||||
err = pacer.Call(func() (bool, error) {
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
blockList, err = blockBlobSVC.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
})
|
||||
@@ -2820,7 +2818,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
|
||||
|
||||
// Commit only the committed blocks
|
||||
fs.Debugf(o, "Committing %d blocks to remove uncommitted blocks", len(blockIDs))
|
||||
err = pacer.Call(func() (bool, error) {
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
_, err := blockBlobSVC.CommitBlockList(ctx, blockIDs, options)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
})
|
||||
|
||||
@@ -4,6 +4,8 @@ package pacer
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -153,31 +155,37 @@ func (p *Pacer) ModifyCalculator(f func(Calculator)) {
|
||||
// This must be called as a pair with endCall.
|
||||
//
|
||||
// This waits for the pacer token
|
||||
func (p *Pacer) beginCall() {
|
||||
func (p *Pacer) beginCall(limitConnections bool) {
|
||||
// pacer starts with a token in and whenever we take one out
|
||||
// XXX ms later we put another in. We could do this with a
|
||||
// Ticker more accurately, but then we'd have to work out how
|
||||
// not to run it when it wasn't needed
|
||||
<-p.pacer
|
||||
if p.maxConnections > 0 {
|
||||
<-p.connTokens
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
// Restart the timer
|
||||
go func(t time.Duration) {
|
||||
time.Sleep(t)
|
||||
p.pacer <- struct{}{}
|
||||
}(p.state.SleepTime)
|
||||
sleepTime := p.state.SleepTime
|
||||
p.mu.Unlock()
|
||||
|
||||
if sleepTime > 0 {
|
||||
<-p.pacer
|
||||
|
||||
// Restart the timer
|
||||
go func(t time.Duration) {
|
||||
time.Sleep(t)
|
||||
p.pacer <- struct{}{}
|
||||
}(sleepTime)
|
||||
}
|
||||
|
||||
if limitConnections {
|
||||
<-p.connTokens
|
||||
}
|
||||
}
|
||||
|
||||
// endCall implements the pacing algorithm
|
||||
//
|
||||
// This should calculate a new sleepTime. It takes a boolean as to
|
||||
// whether the operation should be retried or not.
|
||||
func (p *Pacer) endCall(retry bool, err error) {
|
||||
if p.maxConnections > 0 {
|
||||
func (p *Pacer) endCall(retry bool, err error, limitConnections bool) {
|
||||
if limitConnections {
|
||||
p.connTokens <- struct{}{}
|
||||
}
|
||||
p.mu.Lock()
|
||||
@@ -191,13 +199,44 @@ func (p *Pacer) endCall(retry bool, err error) {
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
// Detect the pacer being called reentrantly.
|
||||
//
|
||||
// This looks for Pacer.call in the call stack and returns true if it
|
||||
// is found.
|
||||
//
|
||||
// Ideally we would do this by passing a context about but there are
|
||||
// an awful lot of Pacer calls!
|
||||
//
|
||||
// This is only needed when p.maxConnections > 0 which isn't a common
|
||||
// configuration so adding a bit of extra slowdown here is not a
|
||||
// problem.
|
||||
func pacerReentered() bool {
|
||||
var pcs [48]uintptr
|
||||
n := runtime.Callers(3, pcs[:]) // skip runtime.Callers, pacerReentered and call
|
||||
frames := runtime.CallersFrames(pcs[:n])
|
||||
for {
|
||||
f, more := frames.Next()
|
||||
if strings.HasSuffix(f.Function, "(*Pacer).call") {
|
||||
return true
|
||||
}
|
||||
if !more {
|
||||
break
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// call implements Call but with settable retries
|
||||
func (p *Pacer) call(fn Paced, retries int) (err error) {
|
||||
var retry bool
|
||||
limitConnections := false
|
||||
if p.maxConnections > 0 && !pacerReentered() {
|
||||
limitConnections = true
|
||||
}
|
||||
for i := 1; i <= retries; i++ {
|
||||
p.beginCall()
|
||||
p.beginCall(limitConnections)
|
||||
retry, err = p.invoker(i, retries, fn)
|
||||
p.endCall(retry, err)
|
||||
p.endCall(retry, err, limitConnections)
|
||||
if !retry {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -108,7 +108,7 @@ func waitForPace(p *Pacer, duration time.Duration) (when time.Time) {
|
||||
func TestBeginCall(t *testing.T) {
|
||||
p := New(MaxConnectionsOption(10), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
|
||||
emptyTokens(p)
|
||||
go p.beginCall()
|
||||
go p.beginCall(true)
|
||||
if !waitForPace(p, 10*time.Millisecond).IsZero() {
|
||||
t.Errorf("beginSleep fired too early #1")
|
||||
}
|
||||
@@ -131,7 +131,7 @@ func TestBeginCall(t *testing.T) {
|
||||
func TestBeginCallZeroConnections(t *testing.T) {
|
||||
p := New(MaxConnectionsOption(0), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
|
||||
emptyTokens(p)
|
||||
go p.beginCall()
|
||||
go p.beginCall(false)
|
||||
if !waitForPace(p, 10*time.Millisecond).IsZero() {
|
||||
t.Errorf("beginSleep fired too early #1")
|
||||
}
|
||||
@@ -257,7 +257,7 @@ func TestEndCall(t *testing.T) {
|
||||
p := New(MaxConnectionsOption(5))
|
||||
emptyTokens(p)
|
||||
p.state.ConsecutiveRetries = 1
|
||||
p.endCall(true, nil)
|
||||
p.endCall(true, nil, true)
|
||||
assert.Equal(t, 1, len(p.connTokens))
|
||||
assert.Equal(t, 2, p.state.ConsecutiveRetries)
|
||||
}
|
||||
@@ -266,7 +266,7 @@ func TestEndCallZeroConnections(t *testing.T) {
|
||||
p := New(MaxConnectionsOption(0))
|
||||
emptyTokens(p)
|
||||
p.state.ConsecutiveRetries = 1
|
||||
p.endCall(false, nil)
|
||||
p.endCall(false, nil, false)
|
||||
assert.Equal(t, 0, len(p.connTokens))
|
||||
assert.Equal(t, 0, p.state.ConsecutiveRetries)
|
||||
}
|
||||
@@ -353,6 +353,78 @@ func TestCallParallel(t *testing.T) {
|
||||
wait.Broadcast()
|
||||
}
|
||||
|
||||
func BenchmarkPacerReentered(b *testing.B) {
|
||||
for b.Loop() {
|
||||
_ = pacerReentered()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPacerReentered100(b *testing.B) {
|
||||
var fn func(level int)
|
||||
fn = func(level int) {
|
||||
if level > 0 {
|
||||
fn(level - 1)
|
||||
return
|
||||
}
|
||||
for b.Loop() {
|
||||
_ = pacerReentered()
|
||||
}
|
||||
|
||||
}
|
||||
fn(100)
|
||||
}
|
||||
|
||||
func TestCallMaxConnectionsRecursiveDeadlock(t *testing.T) {
|
||||
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
|
||||
p.SetMaxConnections(1)
|
||||
dp := &dummyPaced{retry: false}
|
||||
err := p.Call(func() (bool, error) {
|
||||
// check we have taken the connection token
|
||||
// no tokens left means deadlock on the recursive call
|
||||
assert.Equal(t, 0, len(p.connTokens))
|
||||
return false, p.Call(dp.fn)
|
||||
})
|
||||
assert.Equal(t, 1, dp.called)
|
||||
assert.Equal(t, errFoo, err)
|
||||
}
|
||||
|
||||
func TestCallMaxConnectionsRecursiveDeadlock2(t *testing.T) {
|
||||
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
|
||||
p.SetMaxConnections(1)
|
||||
dp := &dummyPaced{retry: false}
|
||||
wg := new(sync.WaitGroup)
|
||||
|
||||
// Normal
|
||||
for range 100 {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := p.Call(func() (bool, error) {
|
||||
// check we have taken the connection token
|
||||
assert.Equal(t, 0, len(p.connTokens))
|
||||
return false, nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
// Now attempt a recursive call
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := p.Call(func() (bool, error) {
|
||||
// check we have taken the connection token
|
||||
assert.Equal(t, 0, len(p.connTokens))
|
||||
// Do recursive call
|
||||
return false, p.Call(dp.fn)
|
||||
})
|
||||
assert.Equal(t, errFoo, err)
|
||||
}()
|
||||
}
|
||||
|
||||
// Tidy up
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestRetryAfterError_NonNilErr(t *testing.T) {
|
||||
orig := errors.New("test failure")
|
||||
dur := 2 * time.Second
|
||||
|
||||
Reference in New Issue
Block a user