Mirror of https://github.com/rclone/rclone.git (synced 2025-12-11 22:03:17 +00:00)

Compare commits


4 Commits

Author SHA1 Message Date
Nick Craig-Wood
1e207173d9 accounting: add SetMaxCompletedTransfers method to fix bisync race #8815
Before this change bisync adjusted the global MaxCompletedTransfers
variable, which caused races.

This adds a SetMaxCompletedTransfers method and uses it in bisync.

The MaxCompletedTransfers global becomes the default. It can still be
changed externally if rclone is used as a library, and the commit
history indicates that MaxCompletedTransfers was added for exactly
this purpose, so we try not to break it here.
2025-09-24 10:26:07 +01:00
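As a rough illustration of that split, assuming rclone is being driven as a library (the specific values are made up for the example):

package main

import (
	"context"

	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	// The package-level default still works as before: stats groups
	// created afterwards pick it up in NewStats.
	accounting.MaxCompletedTransfers = 200

	// The new per-StatsInfo setter changes only one stats group and
	// avoids racing on the global.
	ctx := context.Background()
	accounting.Stats(ctx).SetMaxCompletedTransfers(10)
}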
Nick Craig-Wood
05279e3918 accounting: add RemoveDoneTransfers method to fix bisync race #8815
Before this change bisync was adjusting MaxCompletedTransfers in order
to clear the done transfers from the stats.

This wasn't working (because it was only clearing one transfer) and
was part of a race adjusting MaxCompletedTransfers.

This fixes the problem by introducing a new method RemoveDoneTransfers
to clear the done transfers explicitly and calling it in bisync.
2025-09-24 10:25:29 +01:00
nielash
42a601fbf2 bisync: use t.TempDir() in tests to fix sporadic race #8815 2025-09-24 10:24:30 +01:00
Nick Craig-Wood
9b5e6a7e91 bisync: fix race when CaptureOutput is used concurrently #8815
Before this change CaptureOutput could trip the race detector when
used concurrently, in particular when goroutines using the logging
outlast the return from `fun()`.

This fixes the problem with a mutex.
2025-09-24 10:24:30 +01:00
6 changed files with 106 additions and 57 deletions

View File

@@ -4,15 +4,19 @@ package bilib
 import (
 	"bytes"
 	"log/slog"
+	"sync"

 	"github.com/rclone/rclone/fs/log"
 )

 // CaptureOutput runs a function capturing its output at log level INFO.
 func CaptureOutput(fun func()) []byte {
+	var mu sync.Mutex
 	buf := &bytes.Buffer{}
 	oldLevel := log.Handler.SetLevel(slog.LevelInfo)
 	log.Handler.SetOutput(func(level slog.Level, text string) {
+		mu.Lock()
+		defer mu.Unlock()
 		buf.WriteString(text)
 	})
 	defer func() {
@@ -20,5 +24,7 @@ func CaptureOutput(fun func()) []byte {
 		log.Handler.SetLevel(oldLevel)
 	}()
 	fun()
+	mu.Lock()
+	defer mu.Unlock()
 	return buf.Bytes()
 }
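For context, a minimal sketch of the pattern the commit message describes, assuming the usual import path for the bilib package and a caller shaped roughly like this (the goroutine is deliberately allowed to outlive fun(), which is what made the unlocked buffer writes race with reading the result):

package main

import (
	"github.com/rclone/rclone/cmd/bisync/bilib"
	"github.com/rclone/rclone/fs"
)

func main() {
	out := bilib.CaptureOutput(func() {
		// A goroutine started inside fun() may still be logging after
		// fun() returns; with the mutex both the write into the capture
		// buffer and the final buf.Bytes() read are serialised.
		go fs.Infof(nil, "log line that may arrive late")
	})
	_ = out
}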

View File

@@ -330,7 +330,7 @@ func testBisync(ctx context.Context, t *testing.T, path1, path2 string) {
 	baseDir, err := os.Getwd()
 	require.NoError(t, err, "get current directory")
 	randName := time.Now().Format("150405") + random.String(2) // some bucket backends don't like dots, keep this short to avoid linux errors
-	tempDir := filepath.Join(os.TempDir(), randName)
+	tempDir := filepath.Join(t.TempDir(), randName)
 	workDir := filepath.Join(tempDir, "workdir")
 	b := &bisyncTest{
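For reference, t.TempDir() gives every test its own directory and removes it automatically when the test finishes, whereas names built under the shared os.TempDir() can collide between tests running in parallel; a minimal standalone illustration (not taken from the rclone test suite):

package example

import (
	"path/filepath"
	"testing"
)

func TestTempDirIsolation(t *testing.T) {
	// Unique to this test and cleaned up by the testing framework, so
	// parallel tests cannot race on a shared path.
	workDir := filepath.Join(t.TempDir(), "workdir")
	t.Log(workDir)
}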

View File

@@ -707,8 +707,7 @@ func (b *bisyncRun) modifyListing(ctx context.Context, src fs.Fs, dst fs.Fs, res
 		prettyprint(dstList.list, "dstList", fs.LogLevelDebug)
 		// clear stats so we only do this once
-		accounting.MaxCompletedTransfers = 0
-		accounting.Stats(ctx).PruneTransfers()
+		accounting.Stats(ctx).RemoveDoneTransfers()
 	}
 	if b.DebugName != "" {
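For library users wanting the same effect as this bisync change, clearing the finished transfers is now a single call on the stats group rather than a write to the package global plus a one-shot prune; a short sketch under that assumption:

package main

import (
	"context"

	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	ctx := context.Background()
	// Previously: set accounting.MaxCompletedTransfers = 0 and call
	// PruneTransfers once, which removed at most one transfer and raced
	// with other users of the global.
	// Now: drop every finished transfer from this stats group only.
	accounting.Stats(ctx).RemoveDoneTransfers()
}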

View File

@@ -245,10 +245,8 @@ func (b *bisyncRun) fastCopy(ctx context.Context, fsrc, fdst fs.Fs, files bilib.
 		}
 	}
 	b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
-	if accounting.MaxCompletedTransfers != -1 {
-		accounting.MaxCompletedTransfers = -1 // we need a complete list in the event of graceful shutdown
-	}
+	accounting.Stats(ctxCopy).SetMaxCompletedTransfers(-1) // we need a complete list in the event of graceful shutdown
 	ctxCopy, b.CancelSync = context.WithCancel(ctxCopy)
 	b.testFn()
 	err := sync.Sync(ctxCopy, fdst, fsrc, b.opt.CreateEmptySrcDirs)
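The old guard around the global is no longer needed because the setter is scoped to the copy context's stats group; a negative limit makes PruneTransfers a no-op for that group, so the complete transfer list survives for graceful-shutdown bookkeeping. A small sketch of the same idea outside bisync:

package main

import (
	"context"

	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	ctx := context.Background()
	// Disable pruning for this stats group only; every other group keeps
	// the MaxCompletedTransfers default. The setter returns the
	// *StatsInfo, so further stats calls can be chained if wanted.
	s := accounting.Stats(ctx).SetMaxCompletedTransfers(-1)
	_ = s
}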

View File

@@ -22,48 +22,52 @@ const (
 	averageStopAfter = time.Minute
 )

-// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
+// MaxCompletedTransfers specifies the default maximum number of
+// completed transfers in startedTransfers list. This can be adjusted
+// for a given StatsInfo by calling the SetMaxCompletedTransfers
+// method.
 var MaxCompletedTransfers = 100

 // StatsInfo accounts all transfers
 // N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
 // to correctly count the updated fields
 type StatsInfo struct {
 	mu                  sync.RWMutex
 	ctx                 context.Context
 	ci                  *fs.ConfigInfo
 	bytes               int64
 	errors              int64
 	lastError           error
 	fatalError          bool
 	retryError          bool
 	retryAfter          time.Time
 	checks              int64
 	checking            *transferMap
 	checkQueue          int
 	checkQueueSize      int64
 	transfers           int64
 	transferring        *transferMap
 	transferQueue       int
 	transferQueueSize   int64
 	listed              int64
 	renames             int64
 	renameQueue         int
 	renameQueueSize     int64
 	deletes             int64
 	deletesSize         int64
 	deletedDirs         int64
 	inProgress          *inProgress
 	startedTransfers    []*Transfer   // currently active transfers
 	oldTimeRanges       timeRanges    // a merged list of time ranges for the transfers
 	oldDuration         time.Duration // duration of transfers we have culled
 	group               string
 	startTime           time.Time // the moment these stats were initialized or reset
 	average             averageValues
 	serverSideCopies    int64
 	serverSideCopyBytes int64
 	serverSideMoves     int64
 	serverSideMoveBytes int64
+	maxCompletedTransfers int
 }

 type averageValues struct {

@@ -81,17 +85,26 @@ type averageValues struct {
 func NewStats(ctx context.Context) *StatsInfo {
 	ci := fs.GetConfig(ctx)
 	s := &StatsInfo{
 		ctx:          ctx,
 		ci:           ci,
 		checking:     newTransferMap(ci.Checkers, "checking"),
 		transferring: newTransferMap(ci.Transfers, "transferring"),
 		inProgress:   newInProgress(ctx),
 		startTime:    time.Now(),
 		average:      averageValues{},
+		maxCompletedTransfers: MaxCompletedTransfers,
 	}
 	return s
 }

+// SetMaxCompletedTransfers sets the maximum number of completed transfers to keep.
+func (s *StatsInfo) SetMaxCompletedTransfers(n int) *StatsInfo {
+	s.mu.Lock()
+	s.maxCompletedTransfers = n
+	s.mu.Unlock()
+	return s
+}
+
 // RemoteStats returns stats for rc
 //
 // If short is true then the transfers and checkers won't be added.

@@ -912,22 +925,31 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
 }

 // PruneTransfers makes sure there aren't too many old transfers by removing
-// single finished transfer.
-func (s *StatsInfo) PruneTransfers() {
-	if MaxCompletedTransfers < 0 {
-		return
-	}
+// a single finished transfer. Returns true if it removed a transfer.
+func (s *StatsInfo) PruneTransfers() bool {
 	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.maxCompletedTransfers < 0 {
+		return false
+	}
+	removed := false
 	// remove a transfer from the start if we are over quota
-	if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
+	if len(s.startedTransfers) > s.maxCompletedTransfers+s.ci.Transfers {
 		for i, tr := range s.startedTransfers {
 			if tr.IsDone() {
 				s._removeTransfer(tr, i)
+				removed = true
 				break
 			}
 		}
 	}
-	s.mu.Unlock()
+	return removed
+}
+
+// RemoveDoneTransfers removes all Done transfers.
+func (s *StatsInfo) RemoveDoneTransfers() {
+	for s.PruneTransfers() {
+	}
 }

 // AddServerSideMove counts a server side move
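Putting the new accounting surface together, a caller-level sketch (illustrative only; in real use the transfers are registered by rclone's sync machinery rather than by hand):

package main

import (
	"context"
	"fmt"

	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	ctx := context.Background()
	s := accounting.NewStats(ctx) // starts from the MaxCompletedTransfers default

	// Per-group quota: keep at most 10 completed transfers here.
	s.SetMaxCompletedTransfers(10)

	// PruneTransfers now reports whether it removed a finished transfer,
	// which is what lets callers loop until the group is back under quota.
	for s.PruneTransfers() {
	}

	// Or use the helper added above, which wraps that loop.
	s.RemoveDoneTransfers()

	fmt.Println("done transfers pruned")
}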

View File

@@ -465,3 +465,27 @@ func TestPruneTransfers(t *testing.T) {
 		})
 	}
 }
+
+func TestRemoveDoneTransfers(t *testing.T) {
+	ctx := context.Background()
+	s := NewStats(ctx)
+	const transfers = 10
+	for i := int64(1); i <= int64(transfers); i++ {
+		s.AddTransfer(&Transfer{
+			startedAt:   time.Unix(i, 0),
+			completedAt: time.Unix(i+1, 0),
+		})
+	}
+	s.mu.Lock()
+	assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
+	assert.Equal(t, transfers, len(s.startedTransfers))
+	s.mu.Unlock()
+	s.RemoveDoneTransfers()
+	s.mu.Lock()
+	assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
+	assert.Equal(t, transfers, len(s.startedTransfers))
+	s.mu.Unlock()
+}