mirror of https://github.com/rclone/rclone.git synced 2026-02-16 00:24:42 +00:00

Compare commits


1 Commit

Author SHA1 Message Date
Nick Craig-Wood
ef6b133710 rc: implement operations/discard to read and discard a file 2025-09-23 11:25:02 +01:00
9 changed files with 215 additions and 140 deletions

View File

@@ -4,19 +4,15 @@ package bilib
import (
"bytes"
"log/slog"
"sync"
"github.com/rclone/rclone/fs/log"
)
// CaptureOutput runs a function capturing its output at log level INFO.
func CaptureOutput(fun func()) []byte {
-var mu sync.Mutex
buf := &bytes.Buffer{}
oldLevel := log.Handler.SetLevel(slog.LevelInfo)
log.Handler.SetOutput(func(level slog.Level, text string) {
-mu.Lock()
-defer mu.Unlock()
buf.WriteString(text)
})
defer func() {
@@ -24,7 +20,5 @@ func CaptureOutput(fun func()) []byte {
log.Handler.SetLevel(oldLevel)
}()
fun()
-mu.Lock()
-defer mu.Unlock()
return buf.Bytes()
}
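A minimal usage sketch for CaptureOutput (the fs.Infof call is an assumed stand-in for whatever the captured function logs; it is not part of this diff):

package main

import (
	"fmt"

	"github.com/rclone/rclone/cmd/bisync/bilib"
	"github.com/rclone/rclone/fs"
)

func main() {
	// CaptureOutput temporarily sets the log level to INFO and routes
	// handler output into a buffer while the function runs.
	out := bilib.CaptureOutput(func() {
		fs.Infof(nil, "hello from the captured function")
	})
	fmt.Printf("captured: %s", out)
}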

View File

@@ -330,7 +330,7 @@ func testBisync(ctx context.Context, t *testing.T, path1, path2 string) {
baseDir, err := os.Getwd()
require.NoError(t, err, "get current directory")
randName := time.Now().Format("150405") + random.String(2) // some bucket backends don't like dots, keep this short to avoid linux errors
-tempDir := filepath.Join(t.TempDir(), randName)
+tempDir := filepath.Join(os.TempDir(), randName)
workDir := filepath.Join(tempDir, "workdir")
b := &bisyncTest{

View File

@@ -707,7 +707,8 @@ func (b *bisyncRun) modifyListing(ctx context.Context, src fs.Fs, dst fs.Fs, res
prettyprint(dstList.list, "dstList", fs.LogLevelDebug)
// clear stats so we only do this once
-accounting.Stats(ctx).RemoveDoneTransfers()
+accounting.MaxCompletedTransfers = 0
+accounting.Stats(ctx).PruneTransfers()
}
if b.DebugName != "" {

View File

@@ -245,8 +245,10 @@ func (b *bisyncRun) fastCopy(ctx context.Context, fsrc, fdst fs.Fs, files bilib.
}
}
-b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
-accounting.Stats(ctxCopy).SetMaxCompletedTransfers(-1) // we need a complete list in the event of graceful shutdown
+b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
+if accounting.MaxCompletedTransfers != -1 {
+accounting.MaxCompletedTransfers = -1 // we need a complete list in the event of graceful shutdown
+}
ctxCopy, b.CancelSync = context.WithCancel(ctxCopy)
b.testFn()
err := sync.Sync(ctxCopy, fdst, fsrc, b.opt.CreateEmptySrcDirs)

View File

@@ -22,52 +22,48 @@ const (
averageStopAfter = time.Minute
)
-// MaxCompletedTransfers specifies the default maximum number of
-// completed transfers in startedTransfers list. This can be adjusted
-// for a given StatsInfo by calling the SetMaxCompletedTransfers
-// method.
+// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
var MaxCompletedTransfers = 100
// StatsInfo accounts all transfers
// N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
// to correctly count the updated fields
type StatsInfo struct {
-mu sync.RWMutex
-ctx context.Context
-ci *fs.ConfigInfo
-bytes int64
-errors int64
-lastError error
-fatalError bool
-retryError bool
-retryAfter time.Time
-checks int64
-checking *transferMap
-checkQueue int
-checkQueueSize int64
-transfers int64
-transferring *transferMap
-transferQueue int
-transferQueueSize int64
-listed int64
-renames int64
-renameQueue int
-renameQueueSize int64
-deletes int64
-deletesSize int64
-deletedDirs int64
-inProgress *inProgress
-startedTransfers []*Transfer // currently active transfers
-oldTimeRanges timeRanges // a merged list of time ranges for the transfers
-oldDuration time.Duration // duration of transfers we have culled
-group string
-startTime time.Time // the moment these stats were initialized or reset
-average averageValues
-serverSideCopies int64
-serverSideCopyBytes int64
-serverSideMoves int64
-serverSideMoveBytes int64
-maxCompletedTransfers int
+mu sync.RWMutex
+ctx context.Context
+ci *fs.ConfigInfo
+bytes int64
+errors int64
+lastError error
+fatalError bool
+retryError bool
+retryAfter time.Time
+checks int64
+checking *transferMap
+checkQueue int
+checkQueueSize int64
+transfers int64
+transferring *transferMap
+transferQueue int
+transferQueueSize int64
+listed int64
+renames int64
+renameQueue int
+renameQueueSize int64
+deletes int64
+deletesSize int64
+deletedDirs int64
+inProgress *inProgress
+startedTransfers []*Transfer // currently active transfers
+oldTimeRanges timeRanges // a merged list of time ranges for the transfers
+oldDuration time.Duration // duration of transfers we have culled
+group string
+startTime time.Time // the moment these stats were initialized or reset
+average averageValues
+serverSideCopies int64
+serverSideCopyBytes int64
+serverSideMoves int64
+serverSideMoveBytes int64
}
type averageValues struct {
@@ -85,26 +81,17 @@ type averageValues struct {
func NewStats(ctx context.Context) *StatsInfo {
ci := fs.GetConfig(ctx)
s := &StatsInfo{
-ctx: ctx,
-ci: ci,
-checking: newTransferMap(ci.Checkers, "checking"),
-transferring: newTransferMap(ci.Transfers, "transferring"),
-inProgress: newInProgress(ctx),
-startTime: time.Now(),
-average: averageValues{},
-maxCompletedTransfers: MaxCompletedTransfers,
+ctx: ctx,
+ci: ci,
+checking: newTransferMap(ci.Checkers, "checking"),
+transferring: newTransferMap(ci.Transfers, "transferring"),
+inProgress: newInProgress(ctx),
+startTime: time.Now(),
+average: averageValues{},
}
return s
}
-// SetMaxCompletedTransfers sets the maximum number of completed transfers to keep.
-func (s *StatsInfo) SetMaxCompletedTransfers(n int) *StatsInfo {
-s.mu.Lock()
-s.maxCompletedTransfers = n
-s.mu.Unlock()
-return s
-}
// RemoteStats returns stats for rc
//
// If short is true then the transfers and checkers won't be added.
@@ -925,31 +912,22 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
}
// PruneTransfers makes sure there aren't too many old transfers by removing
-// a single finished transfer. Returns true if it removed a transfer.
-func (s *StatsInfo) PruneTransfers() bool {
-s.mu.Lock()
-defer s.mu.Unlock()
-if s.maxCompletedTransfers < 0 {
-return false
+// single finished transfer.
+func (s *StatsInfo) PruneTransfers() {
+if MaxCompletedTransfers < 0 {
+return
}
-removed := false
+s.mu.Lock()
// remove a transfer from the start if we are over quota
-if len(s.startedTransfers) > s.maxCompletedTransfers+s.ci.Transfers {
+if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
for i, tr := range s.startedTransfers {
if tr.IsDone() {
s._removeTransfer(tr, i)
-removed = true
break
}
}
}
-return removed
-}
-// RemoveDoneTransfers removes all Done transfers.
-func (s *StatsInfo) RemoveDoneTransfers() {
-for s.PruneTransfers() {
-}
+s.mu.Unlock()
}
// AddServerSideMove counts a server side move
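A rough sketch of the pruning quota after this change (the concrete numbers are illustrative assumptions, not taken from the diff): with the global cap at its default of 100 and ci.Transfers at 4, nothing is pruned until more than 104 transfers are tracked, and each PruneTransfers call removes at most one finished transfer from the front of the list. Setting accounting.MaxCompletedTransfers to -1, as fastCopy does above, disables pruning so a complete list survives a graceful shutdown.

package main

import "fmt"

const maxCompletedTransfers = 100 // assumed default of accounting.MaxCompletedTransfers
const inFlight = 4                // assumed ci.Transfers value

// overQuota mirrors the condition in PruneTransfers:
// len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers
func overQuota(tracked int) bool {
	return tracked > maxCompletedTransfers+inFlight
}

func main() {
	fmt.Println(overQuota(104)) // false: at the quota, nothing is pruned
	fmt.Println(overQuota(105)) // true: one finished transfer is removed
}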

View File

@@ -465,27 +465,3 @@ func TestPruneTransfers(t *testing.T) {
})
}
}
-func TestRemoveDoneTransfers(t *testing.T) {
-ctx := context.Background()
-s := NewStats(ctx)
-const transfers = 10
-for i := int64(1); i <= int64(transfers); i++ {
-s.AddTransfer(&Transfer{
-startedAt: time.Unix(i, 0),
-completedAt: time.Unix(i+1, 0),
-})
-}
-s.mu.Lock()
-assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
-assert.Equal(t, transfers, len(s.startedTransfers))
-s.mu.Unlock()
-s.RemoveDoneTransfers()
-s.mu.Lock()
-assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
-assert.Equal(t, transfers, len(s.startedTransfers))
-s.mu.Unlock()
-}

View File

@@ -1281,6 +1281,48 @@ type readCloser struct {
io.Closer
}
+// CatFile outputs the file to the io.Writer
+//
+// if offset == 0 it will be ignored
+// if offset > 0 then the file will be seeked to that offset
+// if offset < 0 then the file will be seeked that far from the end
+//
+// if count < 0 then it will be ignored
+// if count >= 0 then only that many characters will be output
+func CatFile(ctx context.Context, o fs.Object, offset, count int64, w io.Writer) (err error) {
+ci := fs.GetConfig(ctx)
+tr := accounting.Stats(ctx).NewTransfer(o, nil)
+defer func() {
+tr.Done(ctx, err)
+}()
+opt := fs.RangeOption{Start: offset, End: -1}
+size := o.Size()
+if opt.Start < 0 {
+opt.Start += size
+}
+if count >= 0 {
+opt.End = opt.Start + count - 1
+}
+var options []fs.OpenOption
+if opt.Start > 0 || opt.End >= 0 {
+options = append(options, &opt)
+}
+for _, option := range ci.DownloadHeaders {
+options = append(options, option)
+}
+var in io.ReadCloser
+in, err = Open(ctx, o, options...)
+if err != nil {
+return err
+}
+if count >= 0 {
+in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
+}
+in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
+_, err = io.Copy(w, in)
+return err
+}
// Cat any files to the io.Writer
//
// if offset == 0 it will be ignored
@@ -1291,46 +1333,14 @@ type readCloser struct {
// if count >= 0 then only that many characters will be output
func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []byte) error {
var mu sync.Mutex
-ci := fs.GetConfig(ctx)
return ListFn(ctx, f, func(o fs.Object) {
-var err error
-tr := accounting.Stats(ctx).NewTransfer(o, nil)
-defer func() {
-tr.Done(ctx, err)
-}()
-opt := fs.RangeOption{Start: offset, End: -1}
-size := o.Size()
-if opt.Start < 0 {
-opt.Start += size
-}
-if count >= 0 {
-opt.End = opt.Start + count - 1
-}
-var options []fs.OpenOption
-if opt.Start > 0 || opt.End >= 0 {
-options = append(options, &opt)
-}
-for _, option := range ci.DownloadHeaders {
-options = append(options, option)
-}
-var in io.ReadCloser
-in, err = Open(ctx, o, options...)
-if err != nil {
-err = fs.CountError(ctx, err)
-fs.Errorf(o, "Failed to open: %v", err)
-return
-}
-if count >= 0 {
-in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
-}
-in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
// take the lock just before we output stuff, so at the last possible moment
mu.Lock()
defer mu.Unlock()
-_, err = io.Copy(w, in)
+err := CatFile(ctx, o, offset, count, w)
if err != nil {
err = fs.CountError(ctx, err)
fs.Errorf(o, "Failed to send to output: %v", err)
return
}
if len(sep) > 0 {
_, err = w.Write(sep)
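A worked sketch of CatFile's offset/count arithmetic (the 29-byte size matches the test file used further down; everything else simply replays the logic above):

package main

import "fmt"

func main() {
	size := int64(29)                    // length of "file contents to be discarded"
	offset, count := int64(-1), int64(4) // the "end" case in the test below
	start := offset
	if start < 0 {
		start += size // -1 from the end becomes byte 28
	}
	end := int64(-1)
	if count >= 0 {
		end = start + count - 1 // inclusive RangeOption end: byte 31
	}
	// The range 28..31 runs past EOF, so only 1 byte is actually read,
	// which is why the test expects want == 1 for this case.
	fmt.Println(start, end)
}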

View File

@@ -948,3 +948,52 @@ func rcHashsum(ctx context.Context, in rc.Params) (out rc.Params, err error) {
}
return out, err
}
+func init() {
+rc.Add(rc.Call{
+Path: "operations/discard",
+AuthRequired: true,
+Fn: rcDiscard,
+Title: "Read and discard bytes from a file",
+Help: `This takes the following parameters:
+- fs - a remote name string e.g. "drive:"
+- remote - a file within that remote e.g. "file.txt"
+- offset - offset to start reading from, start if unset, from end if -ve
+- count - bytes to read, all if unset
+This is similar to the [cat](/commands/rclone_cat/) command with the --discard flag.
+It can be used for reading files into the VFS cache.
+`,
+})
+}
+// Cat a file with --discard
+func rcDiscard(ctx context.Context, in rc.Params) (out rc.Params, err error) {
+f, remote, err := rc.GetFsAndRemote(ctx, in)
+if err != nil {
+return nil, err
+}
+o, err := f.NewObject(ctx, remote)
+if err != nil {
+return nil, err
+}
+offset, err := in.GetInt64("offset")
+if rc.IsErrParamNotFound(err) {
+offset = 0
+} else if err != nil {
+return nil, err
+}
+count, err := in.GetInt64("count")
+if rc.IsErrParamNotFound(err) {
+count = -1
+} else if err != nil {
+return nil, err
+}
+err = CatFile(ctx, o, offset, count, io.Discard)
+if err != nil {
+return nil, err
+}
+return nil, nil
+}
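A minimal sketch of invoking the new rc call in-process, resolving it from the registry the same way the test harness below does ("remote:" and "file.txt" are assumed placeholders):

package main

import (
	"context"

	_ "github.com/rclone/rclone/fs/operations" // registers operations/discard via init()
	"github.com/rclone/rclone/fs/rc"
)

func main() {
	call := rc.Calls.Get("operations/discard")
	in := rc.Params{
		"fs":     "remote:",
		"remote": "file.txt",
		"count":  int64(1024), // read and discard the first 1 KiB
	}
	if _, err := call.Fn(context.Background(), in); err != nil {
		panic(err)
	}
}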

View File

@@ -14,12 +14,14 @@ import (
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/lib/diskusage"
"github.com/rclone/rclone/lib/random"
"github.com/rclone/rclone/lib/rest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -866,3 +868,66 @@ func TestRcHashsumFile(t *testing.T) {
assert.Equal(t, "md5", out["hashType"])
assert.Equal(t, []string{"0ef726ce9b1a7692357ff70dd321d595 hashsum-file1"}, out["hashsum"])
}
+// operations/discard: read and discard the contents of a file
+func TestRcDiscard(t *testing.T) {
+ctx := context.Background()
+r, call := rcNewRun(t, "operations/discard")
+r.Mkdir(ctx, r.Fremote)
+fileContents := "file contents to be discarded"
+file := r.WriteBoth(ctx, "discard-file", fileContents, t1)
+r.CheckLocalItems(t, file)
+r.CheckRemoteItems(t, file)
+for _, tt := range []struct {
+name string
+in rc.Params
+want int64
+}{{
+name: "full read",
+in: rc.Params{
+"fs": r.FremoteName,
+"remote": file.Path,
+},
+want: int64(len(fileContents)),
+}, {
+name: "start",
+in: rc.Params{
+"fs": r.FremoteName,
+"remote": file.Path,
+"count": 2,
+},
+want: 2,
+}, {
+name: "offset",
+in: rc.Params{
+"fs": r.FremoteName,
+"remote": file.Path,
+"offset": 1,
+"count": 3,
+},
+want: 3,
+}, {
+name: "end",
+in: rc.Params{
+"fs": r.FremoteName,
+"remote": file.Path,
+"offset": -1,
+"count": 4,
+},
+want: 1,
+}} {
+t.Run(tt.name, func(t *testing.T) {
+group := random.String(8)
+ctx := accounting.WithStatsGroup(ctx, group)
+out, err := call.Fn(ctx, tt.in)
+require.NoError(t, err)
+assert.Equal(t, rc.Params(nil), out)
+stats := accounting.Stats(ctx)
+assert.Equal(t, tt.want, stats.GetBytes())
+})
+}
+}