Mirror of https://github.com/rclone/rclone.git, synced 2025-12-11 22:03:17 +00:00

Compare commits (4 commits): copilot/fi...fix-8815-b
| Author | SHA1 | Date |
|---|---|---|
| | 1e207173d9 | |
| | 05279e3918 | |
| | 42a601fbf2 | |
| | 9b5e6a7e91 | |
```diff
@@ -4,15 +4,19 @@ package bilib
 import (
 	"bytes"
 	"log/slog"
+	"sync"
 
 	"github.com/rclone/rclone/fs/log"
 )
 
 // CaptureOutput runs a function capturing its output at log level INFO.
 func CaptureOutput(fun func()) []byte {
+	var mu sync.Mutex
 	buf := &bytes.Buffer{}
 	oldLevel := log.Handler.SetLevel(slog.LevelInfo)
 	log.Handler.SetOutput(func(level slog.Level, text string) {
+		mu.Lock()
+		defer mu.Unlock()
 		buf.WriteString(text)
 	})
 	defer func() {
@@ -20,5 +24,7 @@ func CaptureOutput(fun func()) []byte {
 		log.Handler.SetLevel(oldLevel)
 	}()
 	fun()
+	mu.Lock()
+	defer mu.Unlock()
 	return buf.Bytes()
 }
```
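The mutex matters because the callback registered with `log.Handler.SetOutput` can fire from whatever goroutine happens to log, and `bytes.Buffer` is not safe for concurrent writes. A minimal sketch of the kind of caller this protects — assuming the package lives at rclone's usual `cmd/bisync/bilib` import path and that `fs.Infof` routes through `log.Handler`:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/rclone/rclone/cmd/bisync/bilib" // assumed import path for package bilib
	"github.com/rclone/rclone/fs"
)

func main() {
	// Several goroutines log while CaptureOutput is recording; without the
	// mutex added above, their writes would race on the shared buffer.
	out := bilib.CaptureOutput(func() {
		var wg sync.WaitGroup
		for i := 0; i < 4; i++ {
			wg.Add(1)
			go func(n int) {
				defer wg.Done()
				fs.Infof(nil, "worker %d finished", n)
			}(i)
		}
		wg.Wait()
	})
	fmt.Printf("captured %d bytes of log output\n", len(out))
}
```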
```diff
@@ -330,7 +330,7 @@ func testBisync(ctx context.Context, t *testing.T, path1, path2 string) {
 	baseDir, err := os.Getwd()
 	require.NoError(t, err, "get current directory")
 	randName := time.Now().Format("150405") + random.String(2) // some bucket backends don't like dots, keep this short to avoid linux errors
-	tempDir := filepath.Join(os.TempDir(), randName)
+	tempDir := filepath.Join(t.TempDir(), randName)
 	workDir := filepath.Join(tempDir, "workdir")
 
 	b := &bisyncTest{
```
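Switching from `os.TempDir` to `t.TempDir` puts the scratch directory under the test framework's control: it is unique per test and removed automatically when the test finishes. A small illustration (hypothetical test, not from the commit):

```go
package example

import (
	"os"
	"path/filepath"
	"testing"
)

func TestTempDirCleanup(t *testing.T) {
	// os.TempDir returns the shared system temp root and cleans up nothing.
	shared := os.TempDir()

	// t.TempDir creates a fresh directory and registers automatic removal
	// when this test (and its subtests) complete.
	scratch := filepath.Join(t.TempDir(), "workdir")
	if err := os.MkdirAll(scratch, 0o777); err != nil {
		t.Fatal(err)
	}
	t.Logf("shared root: %s, auto-cleaned scratch: %s", shared, scratch)
}
```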
```diff
@@ -707,8 +707,7 @@ func (b *bisyncRun) modifyListing(ctx context.Context, src fs.Fs, dst fs.Fs, res
 		prettyprint(dstList.list, "dstList", fs.LogLevelDebug)
 
 		// clear stats so we only do this once
-		accounting.MaxCompletedTransfers = 0
-		accounting.Stats(ctx).PruneTransfers()
+		accounting.Stats(ctx).RemoveDoneTransfers()
 	}
 
 	if b.DebugName != "" {
```
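A sketch of the behavioural difference, using the identifiers from the diffs on this page (hypothetical driver program, not from the commit). The old code mutated the package-level `MaxCompletedTransfers`, which every stats group in the process consults, and a single `PruneTransfers` call removes at most one finished transfer; the new call drains them all and touches only this stats group:

```go
package main

import (
	"context"

	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	ctx := context.Background()

	// Before: flip a process-wide knob, then prune once. This races with
	// any other user of the global and removes at most one transfer.
	accounting.MaxCompletedTransfers = 0
	accounting.Stats(ctx).PruneTransfers()

	// After: drain every finished transfer, scoped to this stats group.
	accounting.Stats(ctx).RemoveDoneTransfers()
}
```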
```diff
@@ -245,10 +245,8 @@ func (b *bisyncRun) fastCopy(ctx context.Context, fsrc, fdst fs.Fs, files bilib.
 		}
 	}
 
 	b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
-	if accounting.MaxCompletedTransfers != -1 {
-		accounting.MaxCompletedTransfers = -1 // we need a complete list in the event of graceful shutdown
-	}
+	accounting.Stats(ctxCopy).SetMaxCompletedTransfers(-1) // we need a complete list in the event of graceful shutdown
 	ctxCopy, b.CancelSync = context.WithCancel(ctxCopy)
 	b.testFn()
 	err := sync.Sync(ctxCopy, fdst, fsrc, b.opt.CreateEmptySrcDirs)
```
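As with the listing change, the point is scoping: the old code set the package-level `MaxCompletedTransfers` to -1 for the whole process, while the new setter requests unlimited transfer history only for this run's stats group. A minimal sketch, assuming rclone's `fs/accounting` package:

```go
package main

import (
	"context"

	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	ctx := context.Background()

	// Old approach (shown removed above): a process-wide knob that every
	// StatsInfo consults, so concurrent runs could clobber each other.
	// accounting.MaxCompletedTransfers = -1

	// New approach: keep a complete transfer list for this stats group only,
	// e.g. so a graceful shutdown can still report every transfer.
	accounting.Stats(ctx).SetMaxCompletedTransfers(-1)
}
```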
```diff
@@ -22,48 +22,52 @@ const (
 	averageStopAfter = time.Minute
 )
 
-// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
+// MaxCompletedTransfers specifies the default maximum number of
+// completed transfers in startedTransfers list. This can be adjusted
+// for a given StatsInfo by calling the SetMaxCompletedTransfers
+// method.
 var MaxCompletedTransfers = 100
 
 // StatsInfo accounts all transfers
 // N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
 // to correctly count the updated fields
 type StatsInfo struct {
 	mu                    sync.RWMutex
 	ctx                   context.Context
 	ci                    *fs.ConfigInfo
 	bytes                 int64
 	errors                int64
 	lastError             error
 	fatalError            bool
 	retryError            bool
 	retryAfter            time.Time
 	checks                int64
 	checking              *transferMap
 	checkQueue            int
 	checkQueueSize        int64
 	transfers             int64
 	transferring          *transferMap
 	transferQueue         int
 	transferQueueSize     int64
 	listed                int64
 	renames               int64
 	renameQueue           int
 	renameQueueSize       int64
 	deletes               int64
 	deletesSize           int64
 	deletedDirs           int64
 	inProgress            *inProgress
 	startedTransfers      []*Transfer   // currently active transfers
 	oldTimeRanges         timeRanges    // a merged list of time ranges for the transfers
 	oldDuration           time.Duration // duration of transfers we have culled
 	group                 string
 	startTime             time.Time // the moment these stats were initialized or reset
 	average               averageValues
 	serverSideCopies      int64
 	serverSideCopyBytes   int64
 	serverSideMoves       int64
 	serverSideMoveBytes   int64
+	maxCompletedTransfers int
 }
 
 type averageValues struct {
```
```diff
@@ -81,17 +85,26 @@ type averageValues struct {
 func NewStats(ctx context.Context) *StatsInfo {
 	ci := fs.GetConfig(ctx)
 	s := &StatsInfo{
 		ctx:                   ctx,
 		ci:                    ci,
 		checking:              newTransferMap(ci.Checkers, "checking"),
 		transferring:          newTransferMap(ci.Transfers, "transferring"),
 		inProgress:            newInProgress(ctx),
 		startTime:             time.Now(),
 		average:               averageValues{},
+		maxCompletedTransfers: MaxCompletedTransfers,
 	}
 	return s
 }
 
+// SetMaxCompletedTransfers sets the maximum number of completed transfers to keep.
+func (s *StatsInfo) SetMaxCompletedTransfers(n int) *StatsInfo {
+	s.mu.Lock()
+	s.maxCompletedTransfers = n
+	s.mu.Unlock()
+	return s
+}
+
 // RemoteStats returns stats for rc
 //
 // If short is true then the transfers and checkers won't be added.
```
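Returning `*StatsInfo` from the setter allows call chaining at construction time. A short usage sketch (hypothetical, not from the commit):

```go
package main

import (
	"context"
	"fmt"

	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	ctx := context.Background()
	// New stats start from the package default (100) and can be tuned per
	// instance; the setter returns the StatsInfo, so it chains.
	s := accounting.NewStats(ctx).SetMaxCompletedTransfers(20)
	fmt.Println(s.String()) // use the stats as usual afterwards
}
```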
```diff
@@ -912,22 +925,31 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
 }
 
 // PruneTransfers makes sure there aren't too many old transfers by removing
-// single finished transfer.
-func (s *StatsInfo) PruneTransfers() {
-	if MaxCompletedTransfers < 0 {
-		return
-	}
+// a single finished transfer. Returns true if it removed a transfer.
+func (s *StatsInfo) PruneTransfers() bool {
 	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.maxCompletedTransfers < 0 {
+		return false
+	}
+	removed := false
 	// remove a transfer from the start if we are over quota
-	if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
+	if len(s.startedTransfers) > s.maxCompletedTransfers+s.ci.Transfers {
 		for i, tr := range s.startedTransfers {
 			if tr.IsDone() {
 				s._removeTransfer(tr, i)
+				removed = true
 				break
 			}
 		}
 	}
-	s.mu.Unlock()
+	return removed
+}
+
+// RemoveDoneTransfers removes all Done transfers.
+func (s *StatsInfo) RemoveDoneTransfers() {
+	for s.PruneTransfers() {
+	}
 }
 
 // AddServerSideMove counts a server side move
```
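A usage sketch of the new draining call, assuming rclone's `fs/accounting` package. Note the quota interaction visible in the code above: `PruneTransfers` only removes anything while the list is longer than `maxCompletedTransfers` plus the configured number of transfers, so `RemoveDoneTransfers` leaves a short list untouched:

```go
package main

import (
	"context"

	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	ctx := context.Background()
	s := accounting.NewStats(ctx)
	// Internally this loops PruneTransfers until a pass removes nothing,
	// dropping one finished transfer per iteration while over quota.
	s.RemoveDoneTransfers()
}
```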
```diff
@@ -465,3 +465,27 @@ func TestPruneTransfers(t *testing.T) {
 		})
 	}
 }
+
+func TestRemoveDoneTransfers(t *testing.T) {
+	ctx := context.Background()
+	s := NewStats(ctx)
+	const transfers = 10
+	for i := int64(1); i <= int64(transfers); i++ {
+		s.AddTransfer(&Transfer{
+			startedAt:   time.Unix(i, 0),
+			completedAt: time.Unix(i+1, 0),
+		})
+	}
+
+	s.mu.Lock()
+	assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
+	assert.Equal(t, transfers, len(s.startedTransfers))
+	s.mu.Unlock()
+
+	s.RemoveDoneTransfers()
+
+	s.mu.Lock()
+	assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
+	assert.Equal(t, transfers, len(s.startedTransfers))
+	s.mu.Unlock()
+}
```
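The asserts expect no change because ten transfers sit well under the default quota of 100 plus the configured transfer count, so the drain loop exits on its first pass. A follow-on sketch (hypothetical, not part of the commit) of how a test inside the accounting package could force actual removal by shrinking the per-stats quota first:

```go
func TestRemoveDoneTransfersPrunes(t *testing.T) {
	ctx := context.Background()
	s := NewStats(ctx).SetMaxCompletedTransfers(0)
	for i := int64(1); i <= 10; i++ {
		s.AddTransfer(&Transfer{
			startedAt:   time.Unix(i, 0),
			completedAt: time.Unix(i+1, 0),
		})
	}
	s.RemoveDoneTransfers()
	s.mu.Lock()
	// Pruning stops once len(startedTransfers) <= 0 + s.ci.Transfers, so
	// s.ci.Transfers entries survive even though all of them are done.
	assert.Equal(t, s.ci.Transfers, len(s.startedTransfers))
	s.mu.Unlock()
}
```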