mirror of
https://github.com/rclone/rclone.git
synced 2026-02-11 14:03:46 +00:00
Compare commits
4 Commits
fix-rc-dis
...
fix-8815-b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1e207173d9 | ||
|
|
05279e3918 | ||
|
|
42a601fbf2 | ||
|
|
9b5e6a7e91 |
@@ -4,15 +4,19 @@ package bilib
|
||||
import (
|
||||
"bytes"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/rclone/rclone/fs/log"
|
||||
)
|
||||
|
||||
// CaptureOutput runs a function capturing its output at log level INFO.
|
||||
func CaptureOutput(fun func()) []byte {
|
||||
var mu sync.Mutex
|
||||
buf := &bytes.Buffer{}
|
||||
oldLevel := log.Handler.SetLevel(slog.LevelInfo)
|
||||
log.Handler.SetOutput(func(level slog.Level, text string) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
buf.WriteString(text)
|
||||
})
|
||||
defer func() {
|
||||
@@ -20,5 +24,7 @@ func CaptureOutput(fun func()) []byte {
|
||||
log.Handler.SetLevel(oldLevel)
|
||||
}()
|
||||
fun()
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
@@ -330,7 +330,7 @@ func testBisync(ctx context.Context, t *testing.T, path1, path2 string) {
|
||||
baseDir, err := os.Getwd()
|
||||
require.NoError(t, err, "get current directory")
|
||||
randName := time.Now().Format("150405") + random.String(2) // some bucket backends don't like dots, keep this short to avoid linux errors
|
||||
tempDir := filepath.Join(os.TempDir(), randName)
|
||||
tempDir := filepath.Join(t.TempDir(), randName)
|
||||
workDir := filepath.Join(tempDir, "workdir")
|
||||
|
||||
b := &bisyncTest{
|
||||
|
||||
@@ -707,8 +707,7 @@ func (b *bisyncRun) modifyListing(ctx context.Context, src fs.Fs, dst fs.Fs, res
|
||||
prettyprint(dstList.list, "dstList", fs.LogLevelDebug)
|
||||
|
||||
// clear stats so we only do this once
|
||||
accounting.MaxCompletedTransfers = 0
|
||||
accounting.Stats(ctx).PruneTransfers()
|
||||
accounting.Stats(ctx).RemoveDoneTransfers()
|
||||
}
|
||||
|
||||
if b.DebugName != "" {
|
||||
|
||||
@@ -245,10 +245,8 @@ func (b *bisyncRun) fastCopy(ctx context.Context, fsrc, fdst fs.Fs, files bilib.
|
||||
}
|
||||
}
|
||||
|
||||
b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
|
||||
if accounting.MaxCompletedTransfers != -1 {
|
||||
accounting.MaxCompletedTransfers = -1 // we need a complete list in the event of graceful shutdown
|
||||
}
|
||||
b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
|
||||
accounting.Stats(ctxCopy).SetMaxCompletedTransfers(-1) // we need a complete list in the event of graceful shutdown
|
||||
ctxCopy, b.CancelSync = context.WithCancel(ctxCopy)
|
||||
b.testFn()
|
||||
err := sync.Sync(ctxCopy, fdst, fsrc, b.opt.CreateEmptySrcDirs)
|
||||
|
||||
@@ -22,48 +22,52 @@ const (
|
||||
averageStopAfter = time.Minute
|
||||
)
|
||||
|
||||
// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
|
||||
// MaxCompletedTransfers specifies the default maximum number of
|
||||
// completed transfers in startedTransfers list. This can be adjusted
|
||||
// for a given StatsInfo by calling the SetMaxCompletedTransfers
|
||||
// method.
|
||||
var MaxCompletedTransfers = 100
|
||||
|
||||
// StatsInfo accounts all transfers
|
||||
// N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
|
||||
// to correctly count the updated fields
|
||||
type StatsInfo struct {
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
ci *fs.ConfigInfo
|
||||
bytes int64
|
||||
errors int64
|
||||
lastError error
|
||||
fatalError bool
|
||||
retryError bool
|
||||
retryAfter time.Time
|
||||
checks int64
|
||||
checking *transferMap
|
||||
checkQueue int
|
||||
checkQueueSize int64
|
||||
transfers int64
|
||||
transferring *transferMap
|
||||
transferQueue int
|
||||
transferQueueSize int64
|
||||
listed int64
|
||||
renames int64
|
||||
renameQueue int
|
||||
renameQueueSize int64
|
||||
deletes int64
|
||||
deletesSize int64
|
||||
deletedDirs int64
|
||||
inProgress *inProgress
|
||||
startedTransfers []*Transfer // currently active transfers
|
||||
oldTimeRanges timeRanges // a merged list of time ranges for the transfers
|
||||
oldDuration time.Duration // duration of transfers we have culled
|
||||
group string
|
||||
startTime time.Time // the moment these stats were initialized or reset
|
||||
average averageValues
|
||||
serverSideCopies int64
|
||||
serverSideCopyBytes int64
|
||||
serverSideMoves int64
|
||||
serverSideMoveBytes int64
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
ci *fs.ConfigInfo
|
||||
bytes int64
|
||||
errors int64
|
||||
lastError error
|
||||
fatalError bool
|
||||
retryError bool
|
||||
retryAfter time.Time
|
||||
checks int64
|
||||
checking *transferMap
|
||||
checkQueue int
|
||||
checkQueueSize int64
|
||||
transfers int64
|
||||
transferring *transferMap
|
||||
transferQueue int
|
||||
transferQueueSize int64
|
||||
listed int64
|
||||
renames int64
|
||||
renameQueue int
|
||||
renameQueueSize int64
|
||||
deletes int64
|
||||
deletesSize int64
|
||||
deletedDirs int64
|
||||
inProgress *inProgress
|
||||
startedTransfers []*Transfer // currently active transfers
|
||||
oldTimeRanges timeRanges // a merged list of time ranges for the transfers
|
||||
oldDuration time.Duration // duration of transfers we have culled
|
||||
group string
|
||||
startTime time.Time // the moment these stats were initialized or reset
|
||||
average averageValues
|
||||
serverSideCopies int64
|
||||
serverSideCopyBytes int64
|
||||
serverSideMoves int64
|
||||
serverSideMoveBytes int64
|
||||
maxCompletedTransfers int
|
||||
}
|
||||
|
||||
type averageValues struct {
|
||||
@@ -81,17 +85,26 @@ type averageValues struct {
|
||||
func NewStats(ctx context.Context) *StatsInfo {
|
||||
ci := fs.GetConfig(ctx)
|
||||
s := &StatsInfo{
|
||||
ctx: ctx,
|
||||
ci: ci,
|
||||
checking: newTransferMap(ci.Checkers, "checking"),
|
||||
transferring: newTransferMap(ci.Transfers, "transferring"),
|
||||
inProgress: newInProgress(ctx),
|
||||
startTime: time.Now(),
|
||||
average: averageValues{},
|
||||
ctx: ctx,
|
||||
ci: ci,
|
||||
checking: newTransferMap(ci.Checkers, "checking"),
|
||||
transferring: newTransferMap(ci.Transfers, "transferring"),
|
||||
inProgress: newInProgress(ctx),
|
||||
startTime: time.Now(),
|
||||
average: averageValues{},
|
||||
maxCompletedTransfers: MaxCompletedTransfers,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// SetMaxCompletedTransfers sets the maximum number of completed transfers to keep.
|
||||
func (s *StatsInfo) SetMaxCompletedTransfers(n int) *StatsInfo {
|
||||
s.mu.Lock()
|
||||
s.maxCompletedTransfers = n
|
||||
s.mu.Unlock()
|
||||
return s
|
||||
}
|
||||
|
||||
// RemoteStats returns stats for rc
|
||||
//
|
||||
// If short is true then the transfers and checkers won't be added.
|
||||
@@ -912,22 +925,31 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
|
||||
}
|
||||
|
||||
// PruneTransfers makes sure there aren't too many old transfers by removing
|
||||
// single finished transfer.
|
||||
func (s *StatsInfo) PruneTransfers() {
|
||||
if MaxCompletedTransfers < 0 {
|
||||
return
|
||||
}
|
||||
// a single finished transfer. Returns true if it removed a transfer.
|
||||
func (s *StatsInfo) PruneTransfers() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.maxCompletedTransfers < 0 {
|
||||
return false
|
||||
}
|
||||
removed := false
|
||||
// remove a transfer from the start if we are over quota
|
||||
if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
|
||||
if len(s.startedTransfers) > s.maxCompletedTransfers+s.ci.Transfers {
|
||||
for i, tr := range s.startedTransfers {
|
||||
if tr.IsDone() {
|
||||
s._removeTransfer(tr, i)
|
||||
removed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return removed
|
||||
}
|
||||
|
||||
// RemoveDoneTransfers removes all Done transfers.
|
||||
func (s *StatsInfo) RemoveDoneTransfers() {
|
||||
for s.PruneTransfers() {
|
||||
}
|
||||
}
|
||||
|
||||
// AddServerSideMove counts a server side move
|
||||
|
||||
@@ -465,3 +465,27 @@ func TestPruneTransfers(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveDoneTransfers(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
s := NewStats(ctx)
|
||||
const transfers = 10
|
||||
for i := int64(1); i <= int64(transfers); i++ {
|
||||
s.AddTransfer(&Transfer{
|
||||
startedAt: time.Unix(i, 0),
|
||||
completedAt: time.Unix(i+1, 0),
|
||||
})
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
|
||||
assert.Equal(t, transfers, len(s.startedTransfers))
|
||||
s.mu.Unlock()
|
||||
|
||||
s.RemoveDoneTransfers()
|
||||
|
||||
s.mu.Lock()
|
||||
assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
|
||||
assert.Equal(t, transfers, len(s.startedTransfers))
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -1281,48 +1281,6 @@ type readCloser struct {
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// CatFile outputs the file to the io.Writer
|
||||
//
|
||||
// if offset == 0 it will be ignored
|
||||
// if offset > 0 then the file will be seeked to that offset
|
||||
// if offset < 0 then the file will be seeked that far from the end
|
||||
//
|
||||
// if count < 0 then it will be ignored
|
||||
// if count >= 0 then only that many characters will be output
|
||||
func CatFile(ctx context.Context, o fs.Object, offset, count int64, w io.Writer) (err error) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
tr := accounting.Stats(ctx).NewTransfer(o, nil)
|
||||
defer func() {
|
||||
tr.Done(ctx, err)
|
||||
}()
|
||||
opt := fs.RangeOption{Start: offset, End: -1}
|
||||
size := o.Size()
|
||||
if opt.Start < 0 {
|
||||
opt.Start += size
|
||||
}
|
||||
if count >= 0 {
|
||||
opt.End = opt.Start + count - 1
|
||||
}
|
||||
var options []fs.OpenOption
|
||||
if opt.Start > 0 || opt.End >= 0 {
|
||||
options = append(options, &opt)
|
||||
}
|
||||
for _, option := range ci.DownloadHeaders {
|
||||
options = append(options, option)
|
||||
}
|
||||
var in io.ReadCloser
|
||||
in, err = Open(ctx, o, options...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if count >= 0 {
|
||||
in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
|
||||
}
|
||||
in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
|
||||
_, err = io.Copy(w, in)
|
||||
return err
|
||||
}
|
||||
|
||||
// Cat any files to the io.Writer
|
||||
//
|
||||
// if offset == 0 it will be ignored
|
||||
@@ -1333,14 +1291,46 @@ func CatFile(ctx context.Context, o fs.Object, offset, count int64, w io.Writer)
|
||||
// if count >= 0 then only that many characters will be output
|
||||
func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []byte) error {
|
||||
var mu sync.Mutex
|
||||
ci := fs.GetConfig(ctx)
|
||||
return ListFn(ctx, f, func(o fs.Object) {
|
||||
var err error
|
||||
tr := accounting.Stats(ctx).NewTransfer(o, nil)
|
||||
defer func() {
|
||||
tr.Done(ctx, err)
|
||||
}()
|
||||
opt := fs.RangeOption{Start: offset, End: -1}
|
||||
size := o.Size()
|
||||
if opt.Start < 0 {
|
||||
opt.Start += size
|
||||
}
|
||||
if count >= 0 {
|
||||
opt.End = opt.Start + count - 1
|
||||
}
|
||||
var options []fs.OpenOption
|
||||
if opt.Start > 0 || opt.End >= 0 {
|
||||
options = append(options, &opt)
|
||||
}
|
||||
for _, option := range ci.DownloadHeaders {
|
||||
options = append(options, option)
|
||||
}
|
||||
var in io.ReadCloser
|
||||
in, err = Open(ctx, o, options...)
|
||||
if err != nil {
|
||||
err = fs.CountError(ctx, err)
|
||||
fs.Errorf(o, "Failed to open: %v", err)
|
||||
return
|
||||
}
|
||||
if count >= 0 {
|
||||
in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
|
||||
}
|
||||
in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
|
||||
// take the lock just before we output stuff, so at the last possible moment
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
err := CatFile(ctx, o, offset, count, w)
|
||||
_, err = io.Copy(w, in)
|
||||
if err != nil {
|
||||
err = fs.CountError(ctx, err)
|
||||
fs.Errorf(o, "Failed to send to output: %v", err)
|
||||
return
|
||||
}
|
||||
if len(sep) > 0 {
|
||||
_, err = w.Write(sep)
|
||||
|
||||
@@ -948,52 +948,3 @@ func rcHashsum(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
}
|
||||
return out, err
|
||||
}
|
||||
|
||||
func init() {
|
||||
rc.Add(rc.Call{
|
||||
Path: "operations/discard",
|
||||
AuthRequired: true,
|
||||
Fn: rcDiscard,
|
||||
Title: "Read and discard bytes from a file",
|
||||
Help: `This takes the following parameters:
|
||||
|
||||
- fs - a remote name string e.g. "drive:"
|
||||
- remote - a file within that remote e.g. "file.txt"
|
||||
- offset - offset to start reading from, start if unset, from end if -ve
|
||||
- count - bytes to read, all if unset
|
||||
|
||||
This is similar to the [cat](/commands/rclone_cat/) with the --discard flag.
|
||||
|
||||
It can be used for reading files into the VFS cache.
|
||||
`,
|
||||
})
|
||||
}
|
||||
|
||||
// Cat a file with --discard
|
||||
func rcDiscard(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
f, remote, err := rc.GetFsAndRemote(ctx, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o, err := f.NewObject(ctx, remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset, err := in.GetInt64("offset")
|
||||
if rc.IsErrParamNotFound(err) {
|
||||
offset = 0
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
count, err := in.GetInt64("count")
|
||||
if rc.IsErrParamNotFound(err) {
|
||||
count = -1
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = CatFile(ctx, o, offset, count, io.Discard)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -14,14 +14,12 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/accounting"
|
||||
"github.com/rclone/rclone/fs/cache"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
"github.com/rclone/rclone/fs/rc"
|
||||
"github.com/rclone/rclone/fstest"
|
||||
"github.com/rclone/rclone/lib/diskusage"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
"github.com/rclone/rclone/lib/rest"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -868,66 +866,3 @@ func TestRcHashsumFile(t *testing.T) {
|
||||
assert.Equal(t, "md5", out["hashType"])
|
||||
assert.Equal(t, []string{"0ef726ce9b1a7692357ff70dd321d595 hashsum-file1"}, out["hashsum"])
|
||||
}
|
||||
|
||||
// operations/discard: read and discard the contents of a file
|
||||
func TestRcDiscard(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
r, call := rcNewRun(t, "operations/discard")
|
||||
r.Mkdir(ctx, r.Fremote)
|
||||
|
||||
fileContents := "file contents to be discarded"
|
||||
file := r.WriteBoth(ctx, "discard-file", fileContents, t1)
|
||||
r.CheckLocalItems(t, file)
|
||||
r.CheckRemoteItems(t, file)
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
in rc.Params
|
||||
want int64
|
||||
}{{
|
||||
name: "full read",
|
||||
in: rc.Params{
|
||||
"fs": r.FremoteName,
|
||||
"remote": file.Path,
|
||||
},
|
||||
want: int64(len(fileContents)),
|
||||
}, {
|
||||
name: "start",
|
||||
in: rc.Params{
|
||||
"fs": r.FremoteName,
|
||||
"remote": file.Path,
|
||||
"count": 2,
|
||||
},
|
||||
want: 2,
|
||||
}, {
|
||||
name: "offset",
|
||||
in: rc.Params{
|
||||
"fs": r.FremoteName,
|
||||
"remote": file.Path,
|
||||
"offset": 1,
|
||||
"count": 3,
|
||||
},
|
||||
want: 3,
|
||||
}, {
|
||||
name: "end",
|
||||
in: rc.Params{
|
||||
"fs": r.FremoteName,
|
||||
"remote": file.Path,
|
||||
"offset": -1,
|
||||
"count": 4,
|
||||
},
|
||||
want: 1,
|
||||
}} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
group := random.String(8)
|
||||
ctx := accounting.WithStatsGroup(ctx, group)
|
||||
|
||||
out, err := call.Fn(ctx, tt.in)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, rc.Params(nil), out)
|
||||
|
||||
stats := accounting.Stats(ctx)
|
||||
assert.Equal(t, tt.want, stats.GetBytes())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user