mirror of
https://github.com/rclone/rclone.git
synced 2026-02-17 18:09:03 +00:00
pacer: fix deadlock between pacer token and --max-connections
It was possible in the presence of --max-connections and recursive calls to the pacer to deadlock it leaving all connections waiting on either a max connection token or a pacer token. This fixes the problem by making sure we return the pacer token on schedule if we take it. This also short circuits the pacer token if sleepTime is 0.
This commit is contained in:
@@ -159,18 +159,24 @@ func (p *Pacer) beginCall(limitConnections bool) {
|
||||
// XXX ms later we put another in. We could do this with a
|
||||
// Ticker more accurately, but then we'd have to work out how
|
||||
// not to run it when it wasn't needed
|
||||
<-p.pacer
|
||||
|
||||
p.mu.Lock()
|
||||
sleepTime := p.state.SleepTime
|
||||
p.mu.Unlock()
|
||||
|
||||
if sleepTime > 0 {
|
||||
<-p.pacer
|
||||
|
||||
// Restart the timer
|
||||
go func(t time.Duration) {
|
||||
time.Sleep(t)
|
||||
p.pacer <- struct{}{}
|
||||
}(sleepTime)
|
||||
}
|
||||
|
||||
if limitConnections {
|
||||
<-p.connTokens
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
// Restart the timer
|
||||
go func(t time.Duration) {
|
||||
time.Sleep(t)
|
||||
p.pacer <- struct{}{}
|
||||
}(p.state.SleepTime)
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
// endCall implements the pacing algorithm
|
||||
|
||||
@@ -367,6 +367,43 @@ func TestCallMaxConnectionsRecursiveDeadlock(t *testing.T) {
|
||||
assert.Equal(t, errFoo, err)
|
||||
}
|
||||
|
||||
func TestCallMaxConnectionsRecursiveDeadlock2(t *testing.T) {
|
||||
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
|
||||
p.SetMaxConnections(1)
|
||||
dp := &dummyPaced{retry: false}
|
||||
wg := new(sync.WaitGroup)
|
||||
|
||||
// Normal
|
||||
for range 100 {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := p.Call(func() (bool, error) {
|
||||
// check we have taken the connection token
|
||||
assert.Equal(t, 0, len(p.connTokens))
|
||||
return false, nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
// Now attempt a recursive call
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := p.Call(func() (bool, error) {
|
||||
// check we have taken the connection token
|
||||
assert.Equal(t, 0, len(p.connTokens))
|
||||
// Do recursive call
|
||||
return false, p.Call(dp.fn)
|
||||
})
|
||||
assert.Equal(t, errFoo, err)
|
||||
}()
|
||||
}
|
||||
|
||||
// Tidy up
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestRetryAfterError_NonNilErr(t *testing.T) {
|
||||
orig := errors.New("test failure")
|
||||
dur := 2 * time.Second
|
||||
|
||||
Reference in New Issue
Block a user