mirror of https://github.com/rclone/rclone.git synced 2025-12-12 06:13:20 +00:00

Compare commits


10 Commits

Author SHA1 Message Date
Nick Craig-Wood
ab60a77aba cluster: make workers write status and controller read the status
The controller will retry the batches if it loses contact with the
worker.
2025-10-16 15:50:52 +01:00
Nick Craig-Wood
09535a06f7 cluster: add docs 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
173b720173 rcd: obey --cluster if set to run as a cluster server 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
6660e6ec7c sync,move,copy: add --cluster support 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
14c604335e cluster: implement --cluster and related flags FIXME WIP
Needs
- tests
2025-10-16 15:48:32 +01:00
Nick Craig-Wood
bfcb23b7b2 accounting: add AccountReadN for use in cluster 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
46dbdb8cb7 fs: add NonDefaultRC for discovering options in use
This enables us to send rc messages with the config in use.
2025-10-16 15:48:32 +01:00
Nick Craig-Wood
17932fcc38 fs: move tests into correct files 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
77faa787e1 rc: add NewJobFromBytes for reading jobs from non HTTP transactions 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
0701dd55cd rc: add job/batch for sending batches of rc commands to run concurrently 2025-10-16 15:48:32 +01:00
19 changed files with 2424 additions and 338 deletions


@@ -291,7 +291,7 @@ jobs:
README.md
RELEASE.md
CODE_OF_CONDUCT.md
docs/content/{authors,bugs,changelog,docs,downloads,faq,filtering,gui,install,licence,overview,privacy}.md
docs/content/{authors,bugs,changelog,cluster,docs,downloads,faq,filtering,gui,install,licence,overview,privacy}.md
- name: Scan edits of autogenerated files
run: bin/check_autogenerated_edits.py 'origin/${{ github.base_ref }}'


@@ -23,6 +23,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/cluster"
"github.com/rclone/rclone/fs/config/configfile"
"github.com/rclone/rclone/fs/config/configflags"
"github.com/rclone/rclone/fs/config/flags"
@@ -481,6 +482,22 @@ func initConfig() {
}
})
}
// Run as a cluster worker if configured, ignoring the
// command given on the command line
if ci.Cluster != "" {
if ci.ClusterID == "" || ci.ClusterID == "0" {
fs.Infof(nil, "Running in cluster mode %q as controller", ci.Cluster)
} else {
fs.Infof(nil, "Running in cluster mode %q as worker with id %q", ci.Cluster, ci.ClusterID)
worker, err := cluster.NewWorker(ctx)
if err != nil || worker == nil {
fs.Fatalf(nil, "Failed to start cluster worker: %v", err)
}
// Do not continue with the main thread
select {}
}
}
}
func resolveExitCode(err error) {


@@ -7,6 +7,7 @@ import (
"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cluster"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fs/rc/rcflags"
"github.com/rclone/rclone/fs/rc/rcserver"
@@ -38,6 +39,8 @@ See the [rc documentation](/rc/) for more info on the rc flags.
"groups": "RC",
},
Run: func(command *cobra.Command, args []string) {
ctx := context.Background()
cmd.CheckArgs(0, 1, command, args)
if rc.Opt.Enabled {
fs.Fatalf(nil, "Don't supply --rc flag when using rcd")
@@ -49,6 +52,12 @@ See the [rc documentation](/rc/) for more info on the rc flags.
rc.Opt.Files = args[0]
}
// Start the cluster worker if configured
_, err := cluster.NewWorker(ctx)
if err != nil {
fs.Fatalf(nil, "Failed to start cluster worker: %v", err)
}
s, err := rcserver.Start(context.Background(), &rc.Opt)
if err != nil {
fs.Fatalf(nil, "Failed to start remote control: %v", err)

docs/content/cluster.md (new file, 217 lines)

@@ -0,0 +1,217 @@
---
title: "Cluster"
description: "Clustering rclone"
versionIntroduced: "v1.72"
---
# Cluster
Rclone has a cluster mode, enabled with the `--cluster` flag, which
allows a group of rclone instances to work together on a sync. It is
controlled by a group of flags starting with `--cluster-`:
```text
--cluster string Enable cluster mode with remote to use as shared storage
--cluster-batch-files int Max number of files for a cluster batch (default 1000)
--cluster-batch-size SizeSuffix Max size of files for a cluster batch (default 1Ti)
--cluster-cleanup ClusterCleanup Control which cluster files get cleaned up (default full)
--cluster-id string Set to an ID for the cluster. An ID of 0 or empty becomes the controller
--cluster-quit-workers Set to cause the controller to quit the workers when it has finished
```
The command might look something like the following: a normal rclone
command, but with the new `--cluster` flag pointing at an rclone remote
which defines the cluster storage. This is the signal to rclone that it
should engage cluster mode with a controller and workers.
```sh
rclone copy source: destination: --flags --cluster /work
rclone copy source: destination: --flags --cluster s3:bucket
```
This works only with the `rclone sync`, `copy` and `move` commands.
If the remote specified by the `--cluster` flag is inside `source:` or
`destination:` it must be excluded with the filter flags.
Any rclone remotes used in the transfer must be defined on all cluster
nodes. Defining remotes with connection strings (eg `:s3,env_auth:bucket`
instead of a named remote) gets around that problem.
## Terminology
The cluster has two logical groups, the controller and the workers.
There is one controller and many workers.
The controller and the workers will communicate with each other by
creating files in the remote pointed to by the `--cluster` flag. This
could be for example an S3 bucket or a Kubernetes PVC.
The files are JSON-serialized rc commands. Multiple commands are sent
using `job/batch`. The commands flow `pending` → `processing` → `done` →
`finished`:
```text
└── queue
    ├── pending    ← pending task files created by the controller
    ├── processing ← claimed tasks being executed by a worker
    ├── done       ← finished tasks awaiting the controller to read the result
    └── finished   ← completed task files
```
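As a concrete illustration, here is a minimal sketch (assuming the
`fs/cluster` package added in this series, with a made-up copy entry)
which prints the JSON shape of a batch file as written to
`queue/pending`:
```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/rclone/rclone/fs/cluster"
	"github.com/rclone/rclone/fs/rc"
)

func main() {
	// One batch file bundles several rc commands for a worker to run.
	b := cluster.Batch{
		Path: "job/batch", // the rc command the worker runs on the batch
		Inputs: []rc.Params{{
			"_path":     "operations/copyfile", // hypothetical example entry
			"srcFs":     "s3:src-bucket",
			"srcRemote": "file.txt",
			"dstFs":     "s3:dst-bucket",
			"dstRemote": "file.txt",
		}},
	}
	out, err := json.MarshalIndent(&b, "", "\t")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```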
The cluster can be set up in two ways: as a persistent cluster or as a
transient cluster.
### Persistent cluster
Run a cluster of workers using
```sh
rclone rcd --cluster /work
```
Then run rclone commands when required on the cluster:
```sh
rclone copy source: destination: --flags --cluster /work
```
In this mode there can be many rclone commands executing at once.
### Transient cluster
Run many copies of rclone simultaneously, for example in a Kubernetes
indexed job.
The rclone instance with `--cluster-id 0` becomes the controller and the
others become the workers. For a Kubernetes indexed job, setting
`--cluster-id $(JOB_COMPLETION_INDEX)` would work well.
Add the `--cluster-quit-workers` flag - this will cause the controller
to make sure the workers exit when it has finished.
All instances of rclone run a command like this, so the whole cluster
runs just this one rclone command:
```sh
rclone copy source: destination: --flags --cluster /work --cluster-id $(JOB_COMPLETION_INDEX) --cluster-quit-workers
```
## Controller
The controller runs the sync and work distribution.
- It lists the source and destination directories, comparing files to
find those which need to be transferred.
- Files which need to be transferred are batched into jobs of at most
`--cluster-batch-files` files or `--cluster-batch-size` total size and
written to `queue/pending` for the workers to pick up.
- It watches `queue/done` for finished jobs, updates the transfer
statistics, logs any errors, and moves each job to `queue/finished`.
Once the sync is complete, if `--cluster-quit-workers` is set, then it
sends the workers a special command which causes them all to exit.
The controller only sends transfer jobs to the workers. All the other
tasks (eg listing, comparing) are done by the controller. The
controller does not execute any transfer tasks itself.
The controller reads worker status as written to `queue/status` and
will detect workers which have stopped. If it detects a failed worker
then it will re-assign any outstanding work.
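A minimal sketch of the controller side (a hypothetical caller, not code
from this series; `fdst` and `srcs` are assumed to come from the calling
sync) showing how transfers are queued on the cluster and then drained:
```go
package clusterexample

import (
	"context"
	"errors"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/cluster"
)

// clusterCopy queues copies on the cluster, then waits for the workers
// to drain them.
func clusterCopy(ctx context.Context, fdst fs.Fs, srcs []fs.Object) error {
	c := cluster.GetCluster(ctx) // nil unless --cluster is configured
	if c == nil {
		return errors.New("cluster not configured")
	}
	for _, src := range srcs {
		// Each file joins the current batch; batches are flushed to
		// queue/pending when they reach --cluster-batch-files or
		// --cluster-batch-size.
		if err := c.Copy(ctx, fdst, nil, src.Remote(), src); err != nil {
			return err
		}
	}
	// Flush any partial batch and wait for in-flight jobs to finish.
	return c.Sync(ctx)
}
```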
## Workers
The workers' job is entirely to act as API endpoints which receive their
work via files in the cluster remote (eg `/work`). Each worker will:
- Read work in `queue/pending`
- Attempt to rename it into `queue/processing` (see the sketch after
this list). If the cluster work directory supports atomic renames then
use those, otherwise read the file, write the copy and delete the
original. If the delete fails then the rename was not successful
(possible on S3 backends).
- If the rename succeeded then do that item of work. If not, another
worker got there first, so sleep for a bit and retry.
- After the copy is complete, remove the `queue/processing` file or
rename it into `queue/finished` if the `--cluster-cleanup` flag allows
it.
- Repeat
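Here is a condensed sketch of the claim-by-rename step (the full
version, with the copy-write-delete fallback for remotes without
server-side move, is in `fs/cluster/jobs.go` in this series):
```go
package clusterexample

import (
	"context"
	"errors"
	"path"

	"github.com/rclone/rclone/fs"
)

// claimJob tries to claim a pending job by renaming it into
// queue/processing as {jobID}-{workerID}.json. It assumes the backend
// supports server-side Move. If another worker wins the race the rename
// fails with fs.ErrorObjectNotFound and the job is skipped.
func claimJob(ctx context.Context, f fs.Fs, obj fs.Object, jobID, workerID string) (fs.Object, bool, error) {
	newRemote := path.Join("queue/processing", jobID+"-"+workerID+".json")
	dst, err := f.Features().Move(ctx, obj, newRemote)
	if errors.Is(err, fs.ErrorObjectNotFound) {
		return nil, false, nil // another worker claimed it first
	}
	if err != nil {
		return nil, false, err
	}
	return dst, true, nil
}
```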
Every second the worker will write a status file in `queue/status` to
be read by the controller.
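For reference when debugging, a minimal sketch (using the `WorkerStatus`
type from `fs/cluster/worker.go` in this series, with made-up job and
worker IDs) which prints the JSON shape of a status file:
```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/rclone/rclone/fs/cluster"
	"github.com/rclone/rclone/fs/rc"
)

func main() {
	// Made-up IDs; Running maps job IDs to their live transfer stats.
	status := cluster.WorkerStatus{
		ID:      "worker-1",
		Running: map[string]rc.Params{"2025-10-16T14:00:00Z-abcdef": {"bytes": 1234}},
		Done:    map[string]bool{"2025-10-16T13:59:00Z-ghijkl": true},
		Updated: time.Now(),
	}
	out, err := json.MarshalIndent(&status, "", "\t")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```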
## Layout of the work directory
The format of the files in this directory may change without notice
but the layout is documented here as it can help debugging.
```text
/work - root of the work directory
└── queue - files to control the queue
    ├── done - job files that are finished and read
    ├── finished - job files that are finished but not yet read
    ├── pending - job files that are not started yet
    ├── processing - job files that are running
    └── status - worker status files
```
When debugging, use `--cluster-cleanup none` to leave the completed files
in the directory layout.
## Flags
### --cluster string
This enables the cluster mode. Without this flag, all the other cluster
flags are ignored. This should be given a remote, which can be a local
directory, eg `/work`, or a remote directory, eg `s3:bucket`.
### --cluster-batch-files int
This controls the maximum number of files in a cluster batch. Setting
this larger may be more efficient but it means the statistics will be
less accurate on the controller (default 1000).
### --cluster-batch-size SizeSuffix
This controls the total size of files in a cluster batch. If the size
of the files in a batch exceeds this number then the batch will be
sent to the workers. Setting this larger may be more efficient but it
means the statistics will be less accurate on the controller. (default
1TiB)
### --cluster-cleanup ClusterCleanup
Controls which cluster files get cleaned up.
- `full` - clean all work files (default)
- `completed` - clean completed work files but leave the errors and status
- `none` - leave all the files (useful for debugging)
### --cluster-id string
Set an ID for the rclone instance. This can be a string or a number. An
instance with an ID of 0 becomes the controller, otherwise the instance
becomes a worker. If this flag isn't supplied or the value is empty,
then a random string will be used instead.
### --cluster-quit-workers
If this flag is set, then when the controller finishes its sync task
it will quit all the workers before it exits.
## Not implemented
Here are some features from the original design which are not
implemented yet:
- the controller does not restart the sync


@@ -3315,6 +3315,15 @@ For the remote control options and for instructions on how to remote control rcl
See [the remote control section](/rc/).
## Cluster
For the cluster options and for instructions on how to cluster rclone:
- `--cluster`
- Anything starting with `--cluster-`
See the [cluster section](/cluster/).
## Logging
rclone has 4 levels of logging, `ERROR`, `NOTICE`, `INFO` and `DEBUG`.


@@ -20,6 +20,7 @@
<a class="dropdown-item" href="/gui/"><i class="fa fa-book fa-fw"></i> GUI</a>
<a class="dropdown-item" href="/rc/"><i class="fa fa-book fa-fw"></i> Remote Control</a>
<a class="dropdown-item" href="/remote_setup/"><i class="fa fa-book fa-fw"></i> Remote Setup</a>
<a class="dropdown-item" href="/cluster/"><i class="fa fa-book fa-fw"></i> Cluster</a>
<a class="dropdown-item" href="/changelog/"><i class="fa fa-book fa-fw"></i> Changelog</a>
<a class="dropdown-item" href="/bugs/"><i class="fa fa-book fa-fw"></i> Bugs</a>
<a class="dropdown-item" href="/faq/"><i class="fa fa-book fa-fw"></i> FAQ</a>


@@ -82,7 +82,7 @@ type accountValues struct {
max int64 // if >=0 the max number of bytes to transfer
start time.Time // Start time of first read
lpTime time.Time // Time of last average measurement
lpBytes int // Number of bytes read since last measurement
lpBytes int64 // Number of bytes read since last measurement
avg float64 // Moving average of last few measurements in Byte/s
}
@@ -344,15 +344,20 @@ func (acc *Account) limitPerFileBandwidth(n int) {
}
}
// Account the read and limit bandwidth
func (acc *Account) accountRead(n int) {
// Account the read
func (acc *Account) accountReadN(n int64) {
// Update Stats
acc.values.mu.Lock()
acc.values.lpBytes += n
acc.values.bytes += int64(n)
acc.values.bytes += n
acc.values.mu.Unlock()
acc.stats.Bytes(int64(n))
acc.stats.Bytes(n)
}
// Account the read and limit bandwidth
func (acc *Account) accountRead(n int) {
acc.accountReadN(int64(n))
TokenBucket.LimitBandwidth(TokenBucketSlotAccounting, n)
acc.limitPerFileBandwidth(n)
@@ -427,6 +432,15 @@ func (acc *Account) AccountRead(n int) (err error) {
return err
}
// AccountReadN account having read n bytes
//
// Does not obey any transfer limits, bandwidth limits, etc.
func (acc *Account) AccountReadN(n int64) {
acc.mu.Lock()
defer acc.mu.Unlock()
acc.accountReadN(n)
}
// Close the object
func (acc *Account) Close() error {
acc.mu.Lock()
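The cluster controller uses `AccountReadN` to credit bytes that were
transferred on another machine, where the local bandwidth limiter must
not apply. A minimal sketch (a hypothetical helper mirroring the use in
`fs/cluster/cluster.go` below):
```go
package clusterexample

import (
	"context"

	"github.com/rclone/rclone/fs/accounting"
)

// accountRemote credits bytes transferred by a remote worker against an
// existing transfer. AccountReadN updates the stats without applying
// the local token-bucket bandwidth limits, which is what the controller
// wants since the bytes moved on another machine.
func accountRemote(ctx context.Context, tr *accounting.Transfer, size int64, err error) {
	acc := tr.Account(ctx, nil) // Account attached to this transfer
	acc.AccountReadN(size)      // count the bytes, no bandwidth limiting
	tr.Done(ctx, err)           // record success or failure in the stats
}
```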


@@ -100,7 +100,7 @@ func TestAccountRead(t *testing.T) {
assert.True(t, acc.values.start.IsZero())
acc.values.mu.Lock()
assert.Equal(t, 0, acc.values.lpBytes)
assert.Equal(t, int64(0), acc.values.lpBytes)
assert.Equal(t, int64(0), acc.values.bytes)
acc.values.mu.Unlock()
assert.Equal(t, int64(0), stats.bytes)
@@ -113,7 +113,7 @@ func TestAccountRead(t *testing.T) {
assert.False(t, acc.values.start.IsZero())
acc.values.mu.Lock()
assert.Equal(t, 2, acc.values.lpBytes)
assert.Equal(t, int64(2), acc.values.lpBytes)
assert.Equal(t, int64(2), acc.values.bytes)
acc.values.mu.Unlock()
assert.Equal(t, int64(2), stats.bytes)
@@ -145,7 +145,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
assert.True(t, acc.values.start.IsZero())
acc.values.mu.Lock()
assert.Equal(t, 0, acc.values.lpBytes)
assert.Equal(t, int64(0), acc.values.lpBytes)
assert.Equal(t, int64(0), acc.values.bytes)
acc.values.mu.Unlock()
assert.Equal(t, int64(0), stats.bytes)
@@ -159,7 +159,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
assert.False(t, acc.values.start.IsZero())
acc.values.mu.Lock()
assert.Equal(t, len(buf), acc.values.lpBytes)
assert.Equal(t, int64(len(buf)), acc.values.lpBytes)
assert.Equal(t, int64(len(buf)), acc.values.bytes)
acc.values.mu.Unlock()
assert.Equal(t, int64(len(buf)), stats.bytes)

fs/cluster/cluster.go (new file, 598 lines)

@@ -0,0 +1,598 @@
// Package cluster implements a mechanism to distribute work over a
// cluster of rclone instances.
package cluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"strings"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/errcount"
"golang.org/x/sync/errgroup"
)
// ErrClusterNotConfigured is returned from creation functions.
var ErrClusterNotConfigured = errors.New("cluster is not configured")
// If we don't hear from workers in this time we assume they have timed out
// and re-assign their jobs.
const workerTimeout = 2 * time.Second
// Cluster describes the workings of the current cluster.
type Cluster struct {
jobs *Jobs
id string
batchFiles int
batchSize fs.SizeSuffix
cleanup fs.ClusterCleanup // how we cleanup cluster files
_config rc.Params // for rc
_filter rc.Params // for rc
cancel func() // stop bg job
wg sync.WaitGroup // bg job finished
quit chan struct{} // signal graceful stop
sync chan chan<- struct{} // sync the current jobs
quitWorkers bool // if set, send workers a stop signal on Shutdown
workers map[string]*WorkerStatus // worker ID => status
deadWorkers map[string]struct{}
mu sync.Mutex
currentBatch Batch
inflight map[string]Batch
shutdown bool
}
// Batch is a collection of rc tasks to do
type Batch struct {
size int64 // size in batch
Path string `json:"_path"`
Inputs []rc.Params `json:"inputs"`
Config rc.Params `json:"_config,omitempty"`
Filter rc.Params `json:"_filter,omitempty"`
trs []*accounting.Transfer // transfer for each Input
sizes []int64 // sizes for each Input
}
// BatchResult has the results of the batch as received.
type BatchResult struct {
Results []rc.Params `json:"results"`
// Error returns
Error string `json:"error"`
Status int `json:"status"`
Input string `json:"input"`
Path string `json:"path"`
}
// NewCluster creates a new cluster from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewCluster(ctx context.Context) (*Cluster, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
jobs, err := NewJobs(ctx)
if err != nil {
return nil, err
}
c := &Cluster{
jobs: jobs,
id: ci.ClusterID,
quitWorkers: ci.ClusterQuitWorkers,
batchFiles: ci.ClusterBatchFiles,
batchSize: ci.ClusterBatchSize,
cleanup: ci.ClusterCleanup,
quit: make(chan struct{}),
sync: make(chan chan<- struct{}),
inflight: make(map[string]Batch),
workers: make(map[string]*WorkerStatus),
deadWorkers: make(map[string]struct{}),
}
// Configure _config
configParams, err := fs.ConfigOptionsInfo.NonDefaultRC(ci)
if err != nil {
return nil, fmt.Errorf("failed to read global config: %w", err)
}
// Remove any global cluster config
for k := range configParams {
if strings.HasPrefix(k, "Cluster") {
delete(configParams, k)
}
}
if len(configParams) != 0 {
fs.Debugf(nil, "Overridden global config: %#v", configParams)
}
c._config = rc.Params(configParams)
// Configure _filter
fi := filter.GetConfig(ctx)
if !fi.InActive() {
filterParams, err := filter.OptionsInfo.NonDefaultRC(fi)
if err != nil {
return nil, fmt.Errorf("failed to read filter config: %w", err)
}
fs.Debugf(nil, "Overridden filter config: %#v", filterParams)
c._filter = rc.Params(filterParams)
}
err = c.jobs.createDirectoryStructure(ctx)
if err != nil {
return nil, err
}
// Start the background worker
bgCtx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
c.wg.Add(1)
go c.run(bgCtx)
fs.Logf(c.jobs.f, "Started cluster controller")
return c, nil
}
var (
globalClusterMu sync.Mutex
globalCluster *Cluster
)
// GetCluster starts or gets a cluster.
//
// If no cluster is configured or the cluster can't be started then it
// returns nil.
func GetCluster(ctx context.Context) *Cluster {
globalClusterMu.Lock()
defer globalClusterMu.Unlock()
if globalCluster != nil {
return globalCluster
}
cluster, err := NewCluster(ctx)
if err != nil {
fs.Errorf(nil, "Failed to start cluster: %v", err)
return nil
}
if cluster != nil {
atexit.Register(func() {
err := cluster.Shutdown(context.Background())
if err != nil {
fs.Errorf(nil, "Failed to shutdown cluster: %v", err)
}
})
}
globalCluster = cluster
return globalCluster
}
// Send the current batch for processing
//
// call with c.mu held
func (c *Cluster) sendBatch(ctx context.Context) (err error) {
// Do nothing if the batch is empty
if len(c.currentBatch.Inputs) == 0 {
return nil
}
// Get and reset current batch
b := c.currentBatch
c.currentBatch = Batch{}
b.Path = "job/batch"
b.Config = c._config
b.Filter = c._filter
// write the pending job
name, err := c.jobs.writeJob(ctx, clusterPending, &b)
if err != nil {
return err
}
fs.Infof(name, "written cluster batch file")
c.inflight[name] = b
return nil
}
// Add the command to the current batch
func (c *Cluster) addToBatch(ctx context.Context, obj fs.Object, in rc.Params, size int64, tr *accounting.Transfer) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.shutdown {
return errors.New("internal error: can't add file to Shutdown cluster")
}
c.currentBatch.Inputs = append(c.currentBatch.Inputs, in)
c.currentBatch.size += size
c.currentBatch.trs = append(c.currentBatch.trs, tr)
c.currentBatch.sizes = append(c.currentBatch.sizes, size)
if c.currentBatch.size >= int64(c.batchSize) || len(c.currentBatch.Inputs) >= c.batchFiles {
err = c.sendBatch(ctx)
if err != nil {
return err
}
}
return nil
}
// Move does operations.Move via the cluster.
//
// Move src object to dst or fdst if nil. If dst is nil then it uses
// remote as the name of the new object.
func (c *Cluster) Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
tr := accounting.Stats(ctx).NewTransfer(src, fdst)
if operations.SkipDestructive(ctx, src, "cluster move") {
in := tr.Account(ctx, nil)
in.DryRun(src.Size())
tr.Done(ctx, nil)
return nil
}
fsrc, ok := src.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster move: can't cast src.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/movefile",
"dstFs": fs.ConfigStringFull(fdst),
"dstRemote": remote,
"srcFs": fs.ConfigStringFull(fsrc),
"srcRemote": src.Remote(),
}
if dst != nil {
in["dstRemote"] = dst.Remote()
}
return c.addToBatch(ctx, src, in, src.Size(), tr)
}
// Copy does operations.Copy via the cluster.
//
// Copy src object to dst or fdst if nil. If dst is nil then it uses
// remote as the name of the new object.
func (c *Cluster) Copy(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
tr := accounting.Stats(ctx).NewTransfer(src, fdst)
if operations.SkipDestructive(ctx, src, "cluster copy") {
in := tr.Account(ctx, nil)
in.DryRun(src.Size())
tr.Done(ctx, nil)
return nil
}
fsrc, ok := src.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster copy: can't cast src.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/copyfile",
"dstFs": fs.ConfigStringFull(fdst),
"dstRemote": remote,
"srcFs": fs.ConfigStringFull(fsrc),
"srcRemote": src.Remote(),
}
if dst != nil {
in["dstRemote"] = dst.Remote()
}
return c.addToBatch(ctx, src, in, src.Size(), tr)
}
// DeleteFile does operations.DeleteFile via the cluster
func (c *Cluster) DeleteFile(ctx context.Context, dst fs.Object) (err error) {
tr := accounting.Stats(ctx).NewCheckingTransfer(dst, "deleting")
err = accounting.Stats(ctx).DeleteFile(ctx, dst.Size())
if err != nil {
tr.Done(ctx, err)
return err
}
if operations.SkipDestructive(ctx, dst, "cluster delete") {
tr.Done(ctx, nil)
return
}
fdst, ok := dst.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster delete: can't cast dst.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/deletefile",
"fs": fs.ConfigStringFull(fdst),
"remote": dst.Remote(),
}
return c.addToBatch(ctx, dst, in, 0, tr)
}
// processCompletedJob loads the job and checks it off
func (c *Cluster) processCompletedJob(ctx context.Context, obj fs.Object) error {
name := path.Base(obj.Remote())
name, _ = strings.CutSuffix(name, ".json")
fs.Debugf(nil, "cluster: processing completed job %q", name)
var output BatchResult
err := c.jobs.readJob(ctx, obj, &output)
if err != nil {
return fmt.Errorf("check jobs read: %w", err)
}
c.mu.Lock()
input, ok := c.inflight[name]
// FIXME delete or save job
if !ok {
for k := range c.inflight {
fs.Debugf(nil, "key %q", k)
}
c.mu.Unlock()
return fmt.Errorf("check jobs: job %q not found", name)
}
c.mu.Unlock()
// Delete the inflight entry when batch is processed
defer func() {
c.mu.Lock()
delete(c.inflight, name)
c.mu.Unlock()
}()
// Check job
if output.Error != "" {
return fmt.Errorf("cluster: failed to run batch job: %s (%d)", output.Error, output.Status)
}
if len(input.Inputs) != len(output.Results) {
return fmt.Errorf("cluster: input had %d jobs but output had %d", len(input.Inputs), len(output.Results))
}
// Run through the batch and mark operations as successful or not
for i := range input.Inputs {
in := input.Inputs[i]
tr := input.trs[i]
size := input.sizes[i]
out := output.Results[i]
errorString, hasError := out["error"]
var err error
if hasError && errorString != "" {
err = fmt.Errorf("cluster: worker error: %s (%v)", errorString, out["status"])
}
if err == nil && in["_path"] == "operations/movefile" {
accounting.Stats(ctx).Renames(1)
}
acc := tr.Account(ctx, nil)
acc.AccountReadN(size)
tr.Done(ctx, err)
remote, ok := in["dstRemote"]
if !ok {
remote = in["remote"]
}
if err == nil {
fs.Infof(remote, "cluster %s successful", in["_path"])
} else {
fs.Errorf(remote, "cluster %s failed: %v", in["_path"], err)
}
}
return nil
}
// loadWorkerStatus updates the worker status
func (c *Cluster) loadWorkerStatus(ctx context.Context) error {
objs, err := c.jobs.listDir(ctx, clusterStatus)
if err != nil {
return fmt.Errorf("cluster: get job status list failed: %w", err)
}
ec := errcount.New()
g, gCtx := errgroup.WithContext(ctx)
var mu sync.Mutex
for _, obj := range objs {
g.Go(func() error {
buf, err := c.jobs.readFile(gCtx, obj)
if err != nil {
ec.Add(fmt.Errorf("read object: %w", err))
return nil
}
workerStatus := new(WorkerStatus)
err = json.Unmarshal(buf, workerStatus)
if err != nil {
ec.Add(fmt.Errorf("status json: %w", err))
return nil
}
mu.Lock()
c.workers[workerStatus.ID] = workerStatus
mu.Unlock()
return nil
})
}
return ec.Err("cluster: load status")
}
// checkWorkers loads the worker status
func (c *Cluster) checkWorkers(ctx context.Context) {
err := c.loadWorkerStatus(ctx)
if err != nil {
fs.Errorf(nil, "failed to read some worker status: %v", err)
}
for workerID, status := range c.workers {
timeSinceUpdated := time.Since(status.Updated)
if timeSinceUpdated > workerTimeout {
if _, isDead := c.deadWorkers[workerID]; isDead {
continue
}
fs.Errorf(nil, "cluster: haven't heard from worker %q for %v - assuming dead", workerID, timeSinceUpdated)
// Find any jobs claimed by worker and restart
objs, err := c.jobs.listDir(ctx, clusterProcessing)
if err != nil {
fs.Errorf(nil, "cluster: failed to find pending jobs: %v", err)
continue
}
for _, obj := range objs {
fs.Errorf(obj, "cluster: checking job")
// Jobs are named {jobID}-{workerID}.json
name := strings.TrimSuffix(path.Base(obj.Remote()), ".json")
dash := strings.LastIndex(name, "-")
if dash < 0 {
fs.Errorf(nil, "cluster: failed to find dash in job %q", name)
continue
}
jobID, jobWorkerID := name[:dash], name[dash+1:]
fs.Errorf(obj, "cluster: checking jobID %q, workerID %q", jobID, jobWorkerID)
if workerID != jobWorkerID {
fs.Debugf(nil, "cluster: job %q doesn't match %q", jobWorkerID, workerID)
continue
}
// Found a job running on worker - rename it back to Pending
newRemote := path.Join(clusterPending, jobID+".json")
_, err = c.jobs.rename(ctx, obj, newRemote)
if err != nil {
fs.Errorf(nil, "cluster: failed to restart job %q: %v", jobID, err)
continue
}
fs.Errorf(nil, "cluster: restarted job %q", jobID)
}
c.deadWorkers[workerID] = struct{}{}
} else {
if _, isDead := c.deadWorkers[workerID]; isDead {
fs.Errorf(nil, "cluster: dead worker %q came back to life!", workerID)
delete(c.deadWorkers, workerID)
}
}
}
}
// checkJobs sees if there are any completed jobs
func (c *Cluster) checkJobs(ctx context.Context) {
objs, err := c.jobs.listDir(ctx, clusterDone)
if err != nil {
fs.Errorf(nil, "cluster: get completed job list failed: %v", err)
return
}
for _, obj := range objs {
err := c.processCompletedJob(ctx, obj)
status := "output-ok"
ok := true
if err != nil {
status = "output-failed"
ok = false
fs.Errorf(nil, "cluster: process completed job failed: %v", err)
}
c.jobs.finish(ctx, obj, status, ok)
}
}
// Run the background process
func (c *Cluster) run(ctx context.Context) {
defer c.wg.Done()
checkJobs := time.NewTicker(clusterCheckJobsInterval)
defer checkJobs.Stop()
checkWorkers := time.NewTicker(clusterCheckWorkersInterval)
defer checkWorkers.Stop()
var syncedChans []chan<- struct{}
for {
select {
case <-ctx.Done():
return
case <-c.quit:
fs.Debugf(nil, "cluster: quit request received")
return
case synced := <-c.sync:
syncedChans = append(syncedChans, synced)
fs.Debugf(nil, "cluster: sync request received")
case <-checkWorkers.C:
c.checkWorkers(ctx)
case <-checkJobs.C:
}
c.checkJobs(ctx)
if len(syncedChans) > 0 {
c.mu.Lock()
n := len(c.inflight)
c.mu.Unlock()
if n == 0 {
fs.Debugf(nil, "cluster: synced")
for _, synced := range syncedChans {
synced <- struct{}{}
}
syncedChans = nil
}
}
}
}
// Sync the cluster.
//
// Call this when all job items have been added to the cluster.
//
// This will wait for any outstanding jobs to finish regardless of who
// put them in
func (c *Cluster) Sync(ctx context.Context) error {
// Flush any outstanding
c.mu.Lock()
err := c.sendBatch(ctx)
c.mu.Unlock()
// Wait for the cluster to be empty
done := make(chan struct{})
c.sync <- done
<-done
return err
}
// Shutdown the cluster.
//
// Call this when all job items have been added to the cluster.
//
// This will wait for any outstanding jobs to finish.
func (c *Cluster) Shutdown(ctx context.Context) (err error) {
c.mu.Lock()
inBatch := len(c.currentBatch.Inputs)
inFlight := len(c.inflight)
shutdown := c.shutdown
c.shutdown = true
c.mu.Unlock()
if inBatch > 0 {
err = errors.Join(err, fmt.Errorf("%d items batched on cluster shutdown", inBatch))
}
if inFlight > 0 {
err = errors.Join(err, fmt.Errorf("%d items in flight on cluster shutdown", inFlight))
}
if shutdown {
fs.Debugf(nil, "cluster: already shutdown")
return nil
}
c.quit <- struct{}{}
fs.Debugf(nil, "Waiting for cluster to finish")
c.wg.Wait()
// Send a quit job
if c.quitWorkers {
fs.Logf(nil, "Sending quit to workers")
quitErr := c.jobs.writeQuitJob(ctx, clusterPending)
if quitErr != nil {
err = errors.Join(err, fmt.Errorf("shutdown quit: %w", quitErr))
}
}
return err
}
// Abort the cluster and any outstanding jobs.
func (c *Cluster) Abort() {
c.cancel()
c.wg.Wait()
}

fs/cluster/jobs.go (new file, 311 lines)

@@ -0,0 +1,311 @@
package cluster
import (
"bytes"
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"slices"
"strings"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
)
// Batches flow from queue/pending to queue/processing
const (
clusterQueue = "queue"
clusterPending = clusterQueue + "/pending"
clusterProcessing = clusterQueue + "/processing"
clusterDone = clusterQueue + "/done"
clusterFinished = clusterQueue + "/finished"
clusterStatus = clusterQueue + "/status"
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
// Read the queue this often
clusterCheckJobsInterval = time.Second
// Write the worker status this often
clusterWriteStatusInterval = time.Second
// Read the worker status this often
clusterCheckWorkersInterval = time.Second
// Name of job which signals to the workers to quit
quitJob = "QUIT"
)
// Jobs is a container for sending and receiving jobs to the cluster.
type Jobs struct {
remote string // remote for job storage
f fs.Fs // cluster remote storage
partial bool // do we need to write and rename
hasMove bool // set if f has server side move otherwise has server side copy
cleanup fs.ClusterCleanup // how we cleanup the cluster files
pacer *fs.Pacer // To pace the API calls
}
// NewJobs creates a Jobs source from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewJobs(ctx context.Context) (*Jobs, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
f, err := cache.Get(ctx, ci.Cluster)
if err != nil {
return nil, fmt.Errorf("cluster remote creation: %w", err)
}
features := f.Features()
if features.Move == nil && features.Copy == nil {
return nil, fmt.Errorf("cluster remote must have server side move and %q doesn't", ci.Cluster)
}
jobs := &Jobs{
remote: ci.Cluster,
f: f,
partial: features.PartialUploads,
hasMove: features.Move != nil,
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
cleanup: ci.ClusterCleanup,
}
return jobs, nil
}
// Create the cluster directory structure
func (jobs *Jobs) createDirectoryStructure(ctx context.Context) (err error) {
for _, dir := range []string{clusterPending, clusterProcessing, clusterDone, clusterFinished, clusterStatus} {
err = jobs.f.Mkdir(ctx, dir)
if err != nil {
return fmt.Errorf("cluster mkdir %q: %w", dir, err)
}
}
return nil
}
// rename a file
//
// if this returns fs.ErrorObjectNotFound then the file has already been renamed.
func (jobs *Jobs) rename(ctx context.Context, src fs.Object, dstRemote string) (dst fs.Object, err error) {
features := jobs.f.Features()
if jobs.hasMove {
dst, err = features.Move(ctx, src, dstRemote)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename job file: %w", err)
}
} else {
dst, err = features.Copy(ctx, src, dstRemote)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename (copy phase) job file: %w", err)
}
err = src.Remove(ctx)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename (delete phase) job file: %w", err)
}
}
return dst, nil
}
// Finish with a jobs file
func (jobs *Jobs) finish(ctx context.Context, obj fs.Object, status string, ok bool) {
var err error
if (ok && jobs.cleanup == fs.ClusterCleanupCompleted) || jobs.cleanup == fs.ClusterCleanupFull {
err = obj.Remove(ctx)
} else {
name := path.Join(clusterFinished, status, path.Base(obj.Remote()))
_, err = jobs.rename(ctx, obj, name)
}
if err != nil {
fs.Errorf(nil, "cluster: removing completed job failed: %v", err)
}
}
// write buf into remote
func (jobs *Jobs) writeFile(ctx context.Context, remote string, modTime time.Time, buf []byte) error {
partialRemote := remote
if jobs.partial {
partialRemote = remote + ".partial"
}
// Calculate hashes
w, err := hash.NewMultiHasherTypes(jobs.f.Hashes())
if err != nil {
return err
}
_, err = w.Write(buf)
if err != nil {
return err
}
obji := object.NewStaticObjectInfo(partialRemote, modTime, int64(len(buf)), true, w.Sums(), jobs.f)
var obj fs.Object
err = jobs.pacer.Call(func() (bool, error) {
in := bytes.NewBuffer(buf)
obj, err = jobs.f.Put(ctx, in, obji)
if err != nil {
return true, fmt.Errorf("cluster: failed to write %q: %q", remote, err)
}
return false, nil
})
if err != nil {
return err
}
if jobs.partial {
obj, err = jobs.rename(ctx, obj, remote)
if err != nil {
return err
}
}
return nil
}
// Remove the file if it exists
func (jobs *Jobs) removeFile(ctx context.Context, remote string) error {
obj, err := jobs.f.NewObject(ctx, remote)
if errors.Is(err, fs.ErrorObjectNotFound) || errors.Is(err, fs.ErrorDirNotFound) {
return nil
} else if err != nil {
return err
}
return obj.Remove(ctx)
}
// write a job to a file returning the name
func (jobs *Jobs) writeJob(ctx context.Context, where string, job any) (name string, err error) {
now := time.Now().UTC()
name = fmt.Sprintf("%s-%s", now.Format(time.RFC3339Nano), random.String(20))
remote := path.Join(where, name+".json")
buf, err := json.MarshalIndent(job, "", "\t")
if err != nil {
return "", fmt.Errorf("cluster: job json: %w", err)
}
err = jobs.writeFile(ctx, remote, now, buf)
if err != nil {
return "", fmt.Errorf("cluster: job write: %w", err)
}
return name, nil
}
// write a quit job to a file
func (jobs *Jobs) writeQuitJob(ctx context.Context, where string) (err error) {
now := time.Now().UTC()
remote := path.Join(where, quitJob+".json")
err = jobs.writeFile(ctx, remote, now, []byte("{}"))
if err != nil {
return fmt.Errorf("cluster: quit job write: %w", err)
}
return nil
}
// read buf from object
func (jobs *Jobs) readFile(ctx context.Context, o fs.Object) (buf []byte, err error) {
err = jobs.pacer.Call(func() (bool, error) {
in, err := operations.Open(ctx, o)
if err != nil {
return true, fmt.Errorf("cluster: failed to open %q: %w", o, err)
}
buf, err = io.ReadAll(in)
if err != nil {
return true, fmt.Errorf("cluster: failed to read %q: %w", o, err)
}
err = in.Close()
if err != nil {
return true, fmt.Errorf("cluster: failed to close %q: %w", o, err)
}
return false, nil
})
if err != nil {
return nil, err
}
return buf, nil
}
// read a job from a file
//
// job should be a pointer to something to be unmarshalled
func (jobs *Jobs) readJob(ctx context.Context, obj fs.Object, job any) error {
buf, err := jobs.readFile(ctx, obj)
if err != nil {
return fmt.Errorf("cluster: job read: %w", err)
}
err = json.Unmarshal(buf, job)
if err != nil {
return fmt.Errorf("cluster: job read json: %w", err)
}
return nil
}
// lists the json files in a cluster directory
func (jobs *Jobs) listDir(ctx context.Context, dir string) (objects []fs.Object, err error) {
entries, err := jobs.f.List(ctx, dir)
if err != nil {
return nil, fmt.Errorf("cluster: failed to list %q: %w", dir, err)
}
entries.ForObject(func(o fs.Object) {
if strings.HasSuffix(o.Remote(), ".json") {
objects = append(objects, o)
}
})
slices.SortStableFunc(objects, func(a, b fs.Object) int {
return cmp.Compare(a.Remote(), b.Remote())
})
return objects, nil
}
// get a job from pending if there is one available.
//
// Returns a nil object if no jobs are available.
//
// FIXME should mark jobs as error jobs in here if they can't be read properly?
func (jobs *Jobs) getJob(ctx context.Context, id string) (name string, obj fs.Object, err error) {
objs, err := jobs.listDir(ctx, clusterPending)
if err != nil {
return "", nil, fmt.Errorf("get job list: %w", err)
}
quit := false
for len(objs) > 0 {
obj = objs[0]
objs = objs[1:]
name = path.Base(obj.Remote())
name, _ = strings.CutSuffix(name, ".json")
// See if we have been asked to quit
if name == quitJob {
quit = true
continue
}
// claim the job
newName := fmt.Sprintf("%s-%s.json", name, id)
newRemote := path.Join(clusterProcessing, newName)
obj, err = jobs.rename(ctx, obj, newRemote)
if errors.Is(err, fs.ErrorObjectNotFound) {
// claim failed - try again
continue
}
if err != nil {
return "", nil, fmt.Errorf("get job claim: %w", err)
}
return name, obj, nil
}
// No jobs found
if quit {
fs.Logf(nil, "Exiting cluster worker on command")
atexit.Run()
os.Exit(0)
}
return "", nil, nil
}

fs/cluster/worker.go (new file, 211 lines)

@@ -0,0 +1,211 @@
package cluster
import (
"context"
"encoding/json"
"path"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fs/rc/jobs"
"github.com/rclone/rclone/lib/random"
)
const maxWorkersDone = 16 // maximum jobs in the done list
// Worker describes a single instance of a cluster worker.
type Worker struct {
jobs *Jobs
cancel func() // stop bg job
wg sync.WaitGroup // bg job finished
id string // id of this worker
status string // where it stores its status
jobsMu sync.Mutex
running map[string]struct{} // IDs of the jobs being processed
done []string // IDs of finished jobs
}
// WorkerStatus shows the status of this worker including jobs
// running.
type WorkerStatus struct {
ID string `json:"id"`
Running map[string]rc.Params `json:"running"` // Job ID => accounting.RemoteStats
Done map[string]bool `json:"done"` // Job ID => finished status
Updated time.Time `json:"updated"`
}
// NewWorker creates a new cluster worker from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewWorker(ctx context.Context) (*Worker, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
jobs, err := NewJobs(ctx)
if err != nil {
return nil, err
}
w := &Worker{
jobs: jobs,
id: ci.ClusterID,
running: make(map[string]struct{}),
}
if w.id == "" {
w.id = random.String(10)
}
w.status = path.Join(clusterStatus, w.id+".json")
// Start the background workers
bgCtx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
w.wg.Add(1)
go w.runJobs(bgCtx)
w.wg.Add(1)
go w.runStatus(bgCtx)
fs.Logf(w.jobs.f, "Started cluster worker")
return w, nil
}
// Check to see if a job exists and run it if available
func (w *Worker) checkJobs(ctx context.Context) {
name, obj, err := w.jobs.getJob(ctx, w.id)
if err != nil {
fs.Errorf(nil, "check jobs get: %v", err)
return
}
if obj == nil {
return // no jobs available
}
// make a stats group for this job
ctx = accounting.WithStatsGroup(ctx, name)
// Add job ID
w.jobsMu.Lock()
w.running[name] = struct{}{}
w.jobsMu.Unlock()
fs.Infof(nil, "write jobID %q", name)
// Remove job ID on exit
defer func() {
w.jobsMu.Lock()
delete(w.running, name)
w.done = append(w.done, name)
if len(w.done) > maxWorkersDone {
w.done = w.done[len(w.done)-maxWorkersDone:]
}
w.jobsMu.Unlock()
}()
fs.Debugf(nil, "cluster: processing pending job %q", name)
inBuf, err := w.jobs.readFile(ctx, obj)
if err != nil {
fs.Errorf(nil, "check jobs read: %v", err)
w.jobs.finish(ctx, obj, "input-error", false)
return
}
outBuf := jobs.NewJobFromBytes(ctx, inBuf)
remote := path.Join(clusterDone, name+".json")
err = w.jobs.writeFile(ctx, remote, time.Now(), outBuf)
if err != nil {
fs.Errorf(nil, "check jobs failed to write output: %v", err)
return
}
w.jobs.finish(ctx, obj, "input-ok", true)
fs.Debugf(nil, "cluster: processed pending job %q", name)
}
// Run the background process to pick up jobs
func (w *Worker) runJobs(ctx context.Context) {
defer w.wg.Done()
checkJobs := time.NewTicker(clusterCheckJobsInterval)
defer checkJobs.Stop()
for {
select {
case <-ctx.Done():
return
case <-checkJobs.C:
w.checkJobs(ctx)
}
}
}
// Write the worker status
func (w *Worker) writeStatus(ctx context.Context) {
// Create the worker status from the jobIDs and the short stats
status := WorkerStatus{
ID: w.id,
Running: make(map[string]rc.Params),
Updated: time.Now(),
Done: make(map[string]bool),
}
w.jobsMu.Lock()
for _, jobID := range w.done {
status.Done[jobID] = true
}
for jobID := range w.running {
fs.Infof(nil, "read jobID %q", jobID)
si := accounting.StatsGroup(ctx, jobID)
out, err := si.RemoteStats(true)
if err != nil {
fs.Errorf(nil, "cluster: write status: stats: %v", err)
status.Running[jobID] = rc.Params{}
} else {
status.Running[jobID] = out
}
status.Done[jobID] = false
}
w.jobsMu.Unlock()
// Write the stats to a file
buf, err := json.MarshalIndent(status, "", "\t")
if err != nil {
fs.Errorf(nil, "cluster: write status: json: %w", err)
return
}
err = w.jobs.writeFile(ctx, w.status, status.Updated, buf)
if err != nil {
fs.Errorf(nil, "cluster: write status: %w", err)
}
}
// Remove the worker status
func (w *Worker) clearStatus(ctx context.Context) {
err := w.jobs.removeFile(ctx, w.status)
if err != nil {
fs.Errorf(nil, "cluster: clear status: %w", err)
}
}
// Run the background process to write status
func (w *Worker) runStatus(ctx context.Context) {
defer w.wg.Done()
w.writeStatus(ctx)
defer w.clearStatus(ctx)
writeStatus := time.NewTicker(clusterWriteStatusInterval)
defer writeStatus.Stop()
for {
select {
case <-ctx.Done():
return
case <-writeStatus.C:
t0 := time.Now()
w.writeStatus(ctx)
fs.Debugf(nil, "write status took %v at %v", time.Since(t0), t0)
}
}
}
// Shutdown the worker regardless of whether it has work to process or not.
func (w *Worker) Shutdown(ctx context.Context) error {
w.cancel()
w.wg.Wait()
return nil
}


@@ -50,6 +50,34 @@ var (
ConfigEdit = "config_fs_edit"
)
// ClusterCleanup describes the cluster cleanup choices.
type ClusterCleanup = Enum[clusterCleanupChoices]
// Cluster cleanup choices.
//
// ClusterCleanupNone don't remove any cluster files
// ClusterCleanupCompleted remove successfully completed jobs
// ClusterCleanupFull remove everything on exit
const (
ClusterCleanupNone ClusterCleanup = iota
ClusterCleanupCompleted
ClusterCleanupFull
)
type clusterCleanupChoices struct{}
func (clusterCleanupChoices) Choices() []string {
return []string{
ClusterCleanupNone: "none",
ClusterCleanupCompleted: "completed",
ClusterCleanupFull: "full",
}
}
func (clusterCleanupChoices) Type() string {
return "ClusterCleanup"
}
// ConfigOptionsInfo describes the Options in use
var ConfigOptionsInfo = Options{{
Name: "modify_window",
@@ -566,6 +594,36 @@ var ConfigOptionsInfo = Options{{
Default: "",
Help: "HTTP proxy URL.",
Groups: "Networking",
}, {
Name: "cluster",
Default: "",
Help: "Enable cluster mode with remote to use as shared storage.",
Groups: "Networking",
}, {
Name: "cluster_id",
Default: "",
Help: "Set to an ID for the cluster. An ID of 0 or empty becomes the controller.",
Groups: "Networking",
}, {
Name: "cluster_quit_workers",
Default: false,
Help: "Set to cause the controller to quit the workers when it finished.",
Groups: "Networking",
}, {
Name: "cluster_batch_files",
Default: 1000,
Help: "Max number of files for a cluster batch.",
Groups: "Networking",
}, {
Name: "cluster_batch_size",
Default: Tebi,
Help: "Max size of files for a cluster batch.",
Groups: "Networking",
}, {
Name: "cluster_cleanup",
Default: ClusterCleanupFull,
Help: "Control which cluster files get cleaned up.",
Groups: "Networking",
}}
// ConfigInfo is filesystem config options
@@ -680,6 +738,12 @@ type ConfigInfo struct {
MaxConnections int `config:"max_connections"`
NameTransform []string `config:"name_transform"`
HTTPProxy string `config:"http_proxy"`
Cluster string `config:"cluster"`
ClusterID string `config:"cluster_id"`
ClusterQuitWorkers bool `config:"cluster_quit_workers"`
ClusterBatchFiles int `config:"cluster_batch_files"`
ClusterBatchSize SizeSuffix `config:"cluster_batch_size"`
ClusterCleanup ClusterCleanup `config:"cluster_cleanup"`
}
func init() {


@@ -2,21 +2,9 @@ package fs
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/pacer"
"github.com/stretchr/testify/assert"
)
@@ -90,315 +78,3 @@ func TestFeaturesDisableList(t *testing.T) {
assert.False(t, ft.CaseInsensitive)
assert.False(t, ft.DuplicateFiles)
}
// Check it satisfies the interface
var _ pflag.Value = (*Option)(nil)
func TestOption(t *testing.T) {
d := &Option{
Name: "potato",
Value: SizeSuffix(17 << 20),
}
assert.Equal(t, "17Mi", d.String())
assert.Equal(t, "SizeSuffix", d.Type())
err := d.Set("18M")
assert.NoError(t, err)
assert.Equal(t, SizeSuffix(18<<20), d.Value)
err = d.Set("sdfsdf")
assert.Error(t, err)
}
var errFoo = errors.New("foo")
type dummyPaced struct {
retry bool
called int
wait *sync.Cond
}
func (dp *dummyPaced) fn() (bool, error) {
if dp.wait != nil {
dp.wait.L.Lock()
dp.wait.Wait()
dp.wait.L.Unlock()
}
dp.called++
return dp.retry, errFoo
}
func TestPacerCall(t *testing.T) {
ctx := context.Background()
config := GetConfig(ctx)
expectedCalled := config.LowLevelRetries
if expectedCalled == 0 {
ctx, config = AddConfig(ctx)
expectedCalled = 20
config.LowLevelRetries = expectedCalled
}
p := NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.Call(dp.fn)
require.Equal(t, expectedCalled, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}
func TestPacerCallNoRetry(t *testing.T) {
p := NewPacer(context.Background(), pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.CallNoRetry(dp.fn)
require.Equal(t, 1, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}
// Test options
var (
nouncOption = Option{
Name: "nounc",
}
copyLinksOption = Option{
Name: "copy_links",
Default: false,
NoPrefix: true,
ShortOpt: "L",
Advanced: true,
}
caseInsensitiveOption = Option{
Name: "case_insensitive",
Default: false,
Value: true,
Advanced: true,
}
testOptions = Options{nouncOption, copyLinksOption, caseInsensitiveOption}
)
func TestOptionsSetValues(t *testing.T) {
assert.Nil(t, testOptions[0].Default)
assert.Equal(t, false, testOptions[1].Default)
assert.Equal(t, false, testOptions[2].Default)
testOptions.setValues()
assert.Equal(t, "", testOptions[0].Default)
assert.Equal(t, false, testOptions[1].Default)
assert.Equal(t, false, testOptions[2].Default)
}
func TestOptionsGet(t *testing.T) {
opt := testOptions.Get("copy_links")
assert.Equal(t, &copyLinksOption, opt)
opt = testOptions.Get("not_found")
assert.Nil(t, opt)
}
func TestOptionsOveridden(t *testing.T) {
m := configmap.New()
m1 := configmap.Simple{
"nounc": "m1",
"copy_links": "m1",
}
m.AddGetter(m1, configmap.PriorityNormal)
m2 := configmap.Simple{
"nounc": "m2",
"case_insensitive": "m2",
}
m.AddGetter(m2, configmap.PriorityConfig)
m3 := configmap.Simple{
"nounc": "m3",
}
m.AddGetter(m3, configmap.PriorityDefault)
got := testOptions.Overridden(m)
assert.Equal(t, configmap.Simple{
"copy_links": "m1",
"nounc": "m1",
}, got)
}
func TestOptionsNonDefault(t *testing.T) {
m := configmap.Simple{}
got := testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{}, got)
m["case_insensitive"] = "false"
got = testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{}, got)
m["case_insensitive"] = "true"
got = testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{"case_insensitive": "true"}, got)
}
func TestOptionMarshalJSON(t *testing.T) {
out, err := json.MarshalIndent(&caseInsensitiveOption, "", "")
assert.NoError(t, err)
require.Equal(t, `{
"Name": "case_insensitive",
"FieldName": "",
"Help": "",
"Default": false,
"Value": true,
"Hide": 0,
"Required": false,
"IsPassword": false,
"NoPrefix": false,
"Advanced": true,
"Exclusive": false,
"Sensitive": false,
"DefaultStr": "false",
"ValueStr": "true",
"Type": "bool"
}`, string(out))
}
func TestOptionGetValue(t *testing.T) {
assert.Equal(t, "", nouncOption.GetValue())
assert.Equal(t, false, copyLinksOption.GetValue())
assert.Equal(t, true, caseInsensitiveOption.GetValue())
}
func TestOptionString(t *testing.T) {
assert.Equal(t, "", nouncOption.String())
assert.Equal(t, "false", copyLinksOption.String())
assert.Equal(t, "true", caseInsensitiveOption.String())
}
func TestOptionStringStringArray(t *testing.T) {
opt := Option{
Name: "string_array",
Default: []string(nil),
}
assert.Equal(t, "", opt.String())
opt.Default = []string{}
assert.Equal(t, "", opt.String())
opt.Default = []string{"a", "b"}
assert.Equal(t, "a,b", opt.String())
opt.Default = []string{"hello, world!", "goodbye, world!"}
assert.Equal(t, `"hello, world!","goodbye, world!"`, opt.String())
}
func TestOptionStringSizeSuffix(t *testing.T) {
opt := Option{
Name: "size_suffix",
Default: SizeSuffix(0),
}
assert.Equal(t, "0", opt.String())
opt.Default = SizeSuffix(-1)
assert.Equal(t, "off", opt.String())
opt.Default = SizeSuffix(100)
assert.Equal(t, "100B", opt.String())
opt.Default = SizeSuffix(1024)
assert.Equal(t, "1Ki", opt.String())
}
func TestOptionSet(t *testing.T) {
o := caseInsensitiveOption
assert.Equal(t, true, o.Value)
err := o.Set("FALSE")
assert.NoError(t, err)
assert.Equal(t, false, o.Value)
o = copyLinksOption
assert.Equal(t, nil, o.Value)
err = o.Set("True")
assert.NoError(t, err)
assert.Equal(t, true, o.Value)
err = o.Set("INVALID")
assert.Error(t, err)
assert.Equal(t, true, o.Value)
}
func TestOptionType(t *testing.T) {
assert.Equal(t, "string", nouncOption.Type())
assert.Equal(t, "bool", copyLinksOption.Type())
assert.Equal(t, "bool", caseInsensitiveOption.Type())
}
func TestOptionFlagName(t *testing.T) {
assert.Equal(t, "local-nounc", nouncOption.FlagName("local"))
assert.Equal(t, "copy-links", copyLinksOption.FlagName("local"))
assert.Equal(t, "local-case-insensitive", caseInsensitiveOption.FlagName("local"))
}
func TestOptionEnvVarName(t *testing.T) {
assert.Equal(t, "RCLONE_LOCAL_NOUNC", nouncOption.EnvVarName("local"))
assert.Equal(t, "RCLONE_LOCAL_COPY_LINKS", copyLinksOption.EnvVarName("local"))
assert.Equal(t, "RCLONE_LOCAL_CASE_INSENSITIVE", caseInsensitiveOption.EnvVarName("local"))
}
func TestOptionGetters(t *testing.T) {
// Set up env vars
envVars := [][2]string{
{"RCLONE_CONFIG_LOCAL_POTATO_PIE", "yes"},
{"RCLONE_COPY_LINKS", "TRUE"},
{"RCLONE_LOCAL_NOUNC", "NOUNC"},
}
for _, ev := range envVars {
assert.NoError(t, os.Setenv(ev[0], ev[1]))
}
defer func() {
for _, ev := range envVars {
assert.NoError(t, os.Unsetenv(ev[0]))
}
}()
oldConfigFileGet := ConfigFileGet
ConfigFileGet = func(section, key string) (string, bool) {
if section == "sausage" && key == "key1" {
return "value1", true
}
return "", false
}
defer func() {
ConfigFileGet = oldConfigFileGet
}()
// set up getters
// A configmap.Getter to read from the environment RCLONE_CONFIG_backend_option_name
configEnvVarsGetter := configEnvVars("local")
// A configmap.Getter to read from the environment RCLONE_option_name
optionEnvVarsGetter := optionEnvVars{"local", testOptions}
// A configmap.Getter to read either the default value or the set
// value from the RegInfo.Options
regInfoValuesGetterFalse := &regInfoValues{
options: testOptions,
useDefault: false,
}
regInfoValuesGetterTrue := &regInfoValues{
options: testOptions,
useDefault: true,
}
// A configmap.Setter to read from the config file
configFileGetter := getConfigFile("sausage")
for i, test := range []struct {
get configmap.Getter
key string
wantValue string
wantOk bool
}{
{configEnvVarsGetter, "not_found", "", false},
{configEnvVarsGetter, "potato_pie", "yes", true},
{optionEnvVarsGetter, "not_found", "", false},
{optionEnvVarsGetter, "copy_links", "TRUE", true},
{optionEnvVarsGetter, "nounc", "NOUNC", true},
{optionEnvVarsGetter, "case_insensitive", "", false},
{regInfoValuesGetterFalse, "not_found", "", false},
{regInfoValuesGetterFalse, "case_insensitive", "true", true},
{regInfoValuesGetterFalse, "copy_links", "", false},
{regInfoValuesGetterTrue, "not_found", "", false},
{regInfoValuesGetterTrue, "case_insensitive", "true", true},
{regInfoValuesGetterTrue, "copy_links", "false", true},
{configFileGetter, "not_found", "", false},
{configFileGetter, "key1", "value1", true},
} {
what := fmt.Sprintf("%d: %+v: %q", i, test.get, test.key)
gotValue, gotOk := test.get.Get(test.key)
assert.Equal(t, test.wantValue, gotValue, what)
assert.Equal(t, test.wantOk, gotOk, what)
}
}

fs/pacer_test.go (new file, 57 lines)

@@ -0,0 +1,57 @@
package fs
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/pacer"
"github.com/stretchr/testify/require"
)
var errFoo = errors.New("foo")
type dummyPaced struct {
retry bool
called int
wait *sync.Cond
}
func (dp *dummyPaced) fn() (bool, error) {
if dp.wait != nil {
dp.wait.L.Lock()
dp.wait.Wait()
dp.wait.L.Unlock()
}
dp.called++
return dp.retry, errFoo
}
func TestPacerCall(t *testing.T) {
ctx := context.Background()
config := GetConfig(ctx)
expectedCalled := config.LowLevelRetries
if expectedCalled == 0 {
ctx, config = AddConfig(ctx)
expectedCalled = 20
config.LowLevelRetries = expectedCalled
}
p := NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.Call(dp.fn)
require.Equal(t, expectedCalled, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}
func TestPacerCallNoRetry(t *testing.T) {
p := NewPacer(context.Background(), pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.CallNoRetry(dp.fn)
require.Equal(t, 1, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}


@@ -2,11 +2,15 @@
package jobs
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"runtime/debug"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
@@ -17,6 +21,7 @@ import (
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/rc"
"golang.org/x/sync/errgroup"
)
// Fill in these to avoid circular dependencies
@@ -475,3 +480,249 @@ func rcGroupStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
out = make(rc.Params)
return out, nil
}
// NewJobFromParams creates an rc job from rc.Params.
//
// The input params should contain a _path entry.
//
// It returns an rc.Params as output which may be an error.
func NewJobFromParams(ctx context.Context, in rc.Params) (out rc.Params) {
path := "unknown"
// Return an rc error blob
rcError := func(err error, status int) rc.Params {
fs.Errorf(nil, "rc: %q: error: %v", path, err)
out, _ = rc.Error(path, in, err, status)
return out
}
// Find the call
path, err := in.GetString("_path")
if err != nil {
return rcError(err, http.StatusNotFound)
}
delete(in, "_path")
call := rc.Calls.Get(path)
if call == nil {
return rcError(fmt.Errorf("couldn't find path %q", path), http.StatusNotFound)
}
if call.NeedsRequest {
return rcError(fmt.Errorf("can't run path %q as it needs the request", path), http.StatusBadRequest)
}
if call.NeedsResponse {
return rcError(fmt.Errorf("can't run path %q as it needs the response", path), http.StatusBadRequest)
}
// Pass on the group if one is set in the context and it isn't set in the input.
if _, found := in["_group"]; !found {
group, ok := accounting.StatsGroupFromContext(ctx)
if ok {
in["_group"] = group
}
}
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
_, out, err = NewJob(ctx, call.Fn, in)
if err != nil {
return rcError(err, http.StatusInternalServerError)
}
if out == nil {
out = make(rc.Params)
}
fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err)
return out
}
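// An illustrative sketch of driving NewJobFromParams in-process, mirroring
// the tests: rc/noop echoes its input, so the reply repeats the parameters;
// failures come back as a params blob with error/input/path/status keys.
//
//	out := NewJobFromParams(context.Background(), rc.Params{
//		"_path": "rc/noop",
//		"a":     "potato",
//	})
//	// out == rc.Params{"a": "potato"}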
// NewJobFromBytes creates an rc job from a JSON blob as bytes.
//
// The JSON blob should contain a _path entry.
//
// It returns a JSON blob as output which may be an error.
func NewJobFromBytes(ctx context.Context, inBuf []byte) (outBuf []byte) {
var in rc.Params
var out rc.Params
// Parse a JSON blob from the input
err := json.Unmarshal(inBuf, &in)
if err != nil {
out, _ = rc.Error("unknown", in, err, http.StatusBadRequest)
} else {
out = NewJobFromParams(ctx, in)
}
var w bytes.Buffer
err = rc.WriteJSON(&w, out)
if err != nil {
fs.Errorf(nil, "rc: NewJobFromBytes: failed to write JSON output: %v", err)
return []byte(`{"error":"failed to write JSON output"}`)
}
return w.Bytes()
}
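// An illustrative sketch of the bytes round trip; parse failures are
// returned as a JSON error blob rather than as a Go error.
//
//	reply := NewJobFromBytes(ctx, []byte(`{"_path":"rc/noop","a":"potato"}`))
//	// reply holds the indented JSON encoding of {"a": "potato"}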
func init() {
rc.Add(rc.Call{
Path: "job/batch",
AuthRequired: true, // require auth always since sub commands may require it
Fn: rcBatch,
Title: "Run a batch of rclone rc commands concurrently.",
Help: strings.ReplaceAll(`
This takes the following parameters:
- concurrency - int - do this many commands concurrently. Defaults to |--transfers| if not set.
- inputs - a list of inputs to the commands, each with an extra |_path| parameter
|||json
{
"_path": "rc/path",
"param1": "parameter for the path as documented",
"param2": "parameter for the path as documented, etc",
}
|||json
The inputs may use |_async|, |_group|, |_config| and |_filter| as normal when using the rc.
Returns:
- results - a list of results from the commands with one entry for each in inputs.
For example:
|||sh
rclone rc job/batch --json '{
"inputs": [
{
"_path": "rc/noop",
"parameter": "OK"
},
{
"_path": "rc/error",
"parameter": "BAD"
}
]
}
'
|||
Gives the result:
|||json
{
"results": [
{
"parameter": "OK"
},
{
"error": "arbitrary error on input map[parameter:BAD]",
"input": {
"parameter": "BAD"
},
"path": "rc/error",
"status": 500
}
]
}
|||
`, "|", "`"),
})
}
/*
// Run a single batch job
func runBatchJob(ctx context.Context, inputAny any) (out rc.Params, err error) {
var in rc.Params
path := "unknown"
defer func() {
if err != nil {
out, _ = rc.Error(path, in, err, http.StatusInternalServerError)
}
}()
// get the inputs to the job
input, ok := inputAny.(map[string]any)
if !ok {
return nil, rc.NewErrParamInvalid(fmt.Errorf("\"inputs\" items must be objects not %T", inputAny))
}
in = rc.Params(input)
path, err = in.GetString("_path")
if err != nil {
return nil, err
}
delete(in, "_path")
call := rc.Calls.Get(path)
// Check call
if call == nil {
return nil, rc.NewErrParamInvalid(fmt.Errorf("path %q does not exist", path))
}
path = call.Path
if call.NeedsRequest {
return nil, rc.NewErrParamInvalid(fmt.Errorf("can't run path %q as it needs the request", path))
}
if call.NeedsResponse {
return nil, rc.NewErrParamInvalid(fmt.Errorf("can't run path %q as it needs the response", path))
}
// Run the job
_, out, err = NewJob(ctx, call.Fn, in)
if err != nil {
return nil, err
}
// Reshape (serialize then deserialize) the data so it is in the form expected
err = rc.Reshape(&out, out)
if err != nil {
return nil, err
}
return out, nil
}
*/
// Batch the registered commands
func rcBatch(ctx context.Context, in rc.Params) (out rc.Params, err error) {
out = make(rc.Params)
// Read inputs
inputsAny, err := in.Get("inputs")
if err != nil {
return nil, err
}
inputs, ok := inputsAny.([]any)
if !ok {
return nil, rc.NewErrParamInvalid(fmt.Errorf("expecting list key %q (was %T)", "inputs", inputsAny))
}
// Read concurrency
concurrency, err := in.GetInt64("concurrency")
if rc.IsErrParamNotFound(err) {
ci := fs.GetConfig(ctx)
concurrency = int64(ci.Transfers)
} else if err != nil {
return nil, err
}
// Prepare outputs
results := make([]rc.Params, len(inputs))
out["results"] = results
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(concurrency))
for i, inputAny := range inputs {
input, ok := inputAny.(map[string]any)
if !ok {
results[i], _ = rc.Error("unknown", nil, fmt.Errorf("\"inputs\" items must be objects not %T", inputAny), http.StatusBadRequest)
continue
}
in := rc.Params(input)
if concurrency <= 1 {
results[i] = NewJobFromParams(ctx, in)
} else {
g.Go(func() error {
results[i] = NewJobFromParams(gCtx, in)
return nil
})
}
}
_ = g.Wait()
return out, nil
}
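// An illustrative sketch of invoking job/batch from Go, mirroring the tests
// in the next file: results come back in input order regardless of the
// concurrency chosen.
//
//	call := rc.Calls.Get("job/batch")
//	out, err := call.Fn(ctx, rc.Params{
//		"concurrency": 4,
//		"inputs": []any{
//			map[string]any{"_path": "rc/noop", "n": 1},
//			map[string]any{"_path": "rc/noop", "n": 2},
//		},
//	})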


@@ -2,6 +2,7 @@ package jobs
import (
"context"
"encoding/json"
"errors"
"runtime"
"testing"
@@ -602,3 +603,294 @@ func TestOnFinishDataRace(t *testing.T) {
}
}
}
// Register some test rc calls
func init() {
rc.Add(rc.Call{
Path: "test/needs_request",
NeedsRequest: true,
})
rc.Add(rc.Call{
Path: "test/needs_response",
NeedsResponse: true,
})
}
func TestNewJobFromParams(t *testing.T) {
ctx := context.Background()
for _, test := range []struct {
in rc.Params
want rc.Params
}{{
in: rc.Params{
"_path": "rc/noop",
"a": "potato",
},
want: rc.Params{
"a": "potato",
},
}, {
in: rc.Params{
"_path": "rc/noop",
"b": "sausage",
},
want: rc.Params{
"b": "sausage",
},
}, {
in: rc.Params{
"_path": "rc/error",
"e": "sausage",
},
want: rc.Params{
"error": "arbitrary error on input map[e:sausage]",
"input": rc.Params{
"e": "sausage",
},
"path": "rc/error",
"status": 500,
},
}, {
in: rc.Params{
"_path": "bad/path",
"param": "sausage",
},
want: rc.Params{
"error": "couldn't find path \"bad/path\"",
"input": rc.Params{
"param": "sausage",
},
"path": "bad/path",
"status": 404,
},
}, {
in: rc.Params{
"_path": "test/needs_request",
},
want: rc.Params{
"error": "can't run path \"test/needs_request\" as it needs the request",
"input": rc.Params{},
"path": "test/needs_request",
"status": 400,
},
}, {
in: rc.Params{
"_path": "test/needs_response",
},
want: rc.Params{
"error": "can't run path \"test/needs_response\" as it needs the response",
"input": rc.Params{},
"path": "test/needs_response",
"status": 400,
},
}, {
in: rc.Params{
"nopath": "BOOM",
},
want: rc.Params{
"error": "Didn't find key \"_path\" in input",
"input": rc.Params{
"nopath": "BOOM",
},
"path": "",
"status": 400,
},
}} {
got := NewJobFromParams(ctx, test.in)
assert.Equal(t, test.want, got)
}
}
func TestNewJobFromBytes(t *testing.T) {
ctx := context.Background()
for _, test := range []struct {
in string
want string
}{{
in: `{
"_path": "rc/noop",
"a": "potato"
}`,
want: `{
"a": "potato"
}
`,
}, {
in: `{
"_path": "rc/error",
"e": "sausage"
}`,
want: `{
"error": "arbitrary error on input map[e:sausage]",
"input": {
"e": "sausage"
},
"path": "rc/error",
"status": 500
}
`,
}, {
in: `parse error`,
want: `{
"error": "invalid character 'p' looking for beginning of value",
"input": null,
"path": "unknown",
"status": 400
}
`,
}, {
in: `"just a string"`,
want: `{
"error": "json: cannot unmarshal string into Go value of type rc.Params",
"input": null,
"path": "unknown",
"status": 400
}
`,
}} {
got := NewJobFromBytes(ctx, []byte(test.in))
assert.Equal(t, test.want, string(got))
}
}
func TestJobsBatch(t *testing.T) {
ctx := context.Background()
call := rc.Calls.Get("job/batch")
assert.NotNil(t, call)
inJSON := `{
"inputs": [
{
"_path": "rc/noop",
"a": "potato"
},
"bad string",
{
"_path": "rc/noop",
"b": "sausage"
},
{
"_path": "rc/error",
"e": "sausage"
},
{
"_path": "bad/path",
"param": "sausage"
},
{
"_path": "test/needs_request"
},
{
"_path": "test/needs_response"
},
{
"nopath": "BOOM"
}
]
}
`
var in rc.Params
require.NoError(t, json.Unmarshal([]byte(inJSON), &in))
wantJSON := `{
"results": [
{
"a": "potato"
},
{
"error": "\"inputs\" items must be objects not string",
"input": null,
"path": "unknown",
"status": 400
},
{
"b": "sausage"
},
{
"error": "arbitrary error on input map[e:sausage]",
"input": {
"e": "sausage"
},
"path": "rc/error",
"status": 500
},
{
"error": "couldn't find path \"bad/path\"",
"input": {
"param": "sausage"
},
"path": "bad/path",
"status": 404
},
{
"error": "can't run path \"test/needs_request\" as it needs the request",
"input": {},
"path": "test/needs_request",
"status": 400
},
{
"error": "can't run path \"test/needs_response\" as it needs the response",
"input": {},
"path": "test/needs_response",
"status": 400
},
{
"error": "Didn't find key \"_path\" in input",
"input": {
"nopath": "BOOM"
},
"path": "",
"status": 400
}
]
}
`
var want rc.Params
require.NoError(t, json.Unmarshal([]byte(wantJSON), &want))
out, err := call.Fn(ctx, in)
require.NoError(t, err)
var got rc.Params
require.NoError(t, rc.Reshape(&got, out))
assert.Equal(t, want, got)
}
func TestJobsBatchConcurrent(t *testing.T) {
ctx := context.Background()
for concurrency := range 10 {
in := rc.Params{}
var inputs []any
var results []rc.Params
for i := range 100 {
in := map[string]any{
"_path": "rc/noop",
"i": i,
}
inputs = append(inputs, in)
results = append(results, rc.Params{
"i": i,
})
}
in["inputs"] = inputs
want := rc.Params{
"results": results,
}
if concurrency > 0 {
in["concurrency"] = concurrency
}
call := rc.Calls.Get("job/batch")
assert.NotNil(t, call)
got, err := call.Fn(ctx, in)
require.NoError(t, err)
assert.Equal(t, want, got)
}
}


@@ -154,6 +154,37 @@ func (os Options) NonDefault(m configmap.Getter) configmap.Simple {
return nonDefault
}
// NonDefaultRC discovers which config values aren't at their default
//
// It expects a pointer to the current config struct in opts.
//
// It returns the overridden config in rc config format.
func (os Options) NonDefaultRC(opts any) (map[string]any, error) {
items, err := configstruct.Items(opts)
if err != nil {
return nil, err
}
itemsByName := map[string]*configstruct.Item{}
for i := range items {
item := &items[i]
itemsByName[item.Name] = item
}
var nonDefault = map[string]any{}
for i := range os {
opt := &os[i]
item, found := itemsByName[opt.Name]
if !found {
return nil, fmt.Errorf("key %q in OptionsInfo not found in Options struct", opt.Name)
}
value := fmt.Sprint(item.Value)
defaultValue := fmt.Sprint(opt.Default)
if value != defaultValue {
nonDefault[item.Field] = item.Value
}
}
return nonDefault, nil
}
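// An illustrative sketch, mirroring TestOptionsNonDefaultRC below: only
// fields whose current value differs from the registered default are
// reported, keyed by struct field name.
//
//	type cfg struct {
//		Level int `config:"level"`
//	}
//	opts := Options{{Name: "level", Default: 0}}
//	nonDefault, _ := opts.NonDefaultRC(&cfg{Level: 3})
//	// nonDefault == map[string]any{"Level": 3}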
// HasAdvanced discovers if any options have an Advanced setting
func (os Options) HasAdvanced() bool {
for i := range os {

fs/registry_test.go (new file, 308 lines)

@@ -0,0 +1,308 @@
package fs
import (
"encoding/json"
"fmt"
"os"
"testing"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Check it satisfies the interface
var _ pflag.Value = (*Option)(nil)
func TestOption(t *testing.T) {
d := &Option{
Name: "potato",
Value: SizeSuffix(17 << 20),
}
assert.Equal(t, "17Mi", d.String())
assert.Equal(t, "SizeSuffix", d.Type())
err := d.Set("18M")
assert.NoError(t, err)
assert.Equal(t, SizeSuffix(18<<20), d.Value)
err = d.Set("sdfsdf")
assert.Error(t, err)
}
// Test options
var (
nouncOption = Option{
Name: "nounc",
}
copyLinksOption = Option{
Name: "copy_links",
Default: false,
NoPrefix: true,
ShortOpt: "L",
Advanced: true,
}
caseInsensitiveOption = Option{
Name: "case_insensitive",
Default: false,
Value: true,
Advanced: true,
}
testOptions = Options{nouncOption, copyLinksOption, caseInsensitiveOption}
)
func TestOptionsSetValues(t *testing.T) {
assert.Nil(t, testOptions[0].Default)
assert.Equal(t, false, testOptions[1].Default)
assert.Equal(t, false, testOptions[2].Default)
testOptions.setValues()
assert.Equal(t, "", testOptions[0].Default)
assert.Equal(t, false, testOptions[1].Default)
assert.Equal(t, false, testOptions[2].Default)
}
func TestOptionsGet(t *testing.T) {
opt := testOptions.Get("copy_links")
assert.Equal(t, &copyLinksOption, opt)
opt = testOptions.Get("not_found")
assert.Nil(t, opt)
}
func TestOptionsOverridden(t *testing.T) {
m := configmap.New()
m1 := configmap.Simple{
"nounc": "m1",
"copy_links": "m1",
}
m.AddGetter(m1, configmap.PriorityNormal)
m2 := configmap.Simple{
"nounc": "m2",
"case_insensitive": "m2",
}
m.AddGetter(m2, configmap.PriorityConfig)
m3 := configmap.Simple{
"nounc": "m3",
}
m.AddGetter(m3, configmap.PriorityDefault)
got := testOptions.Overridden(m)
assert.Equal(t, configmap.Simple{
"copy_links": "m1",
"nounc": "m1",
}, got)
}
func TestOptionsNonDefault(t *testing.T) {
m := configmap.Simple{}
got := testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{}, got)
m["case_insensitive"] = "false"
got = testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{}, got)
m["case_insensitive"] = "true"
got = testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{"case_insensitive": "true"}, got)
}
func TestOptionMarshalJSON(t *testing.T) {
out, err := json.MarshalIndent(&caseInsensitiveOption, "", "")
assert.NoError(t, err)
require.Equal(t, `{
"Name": "case_insensitive",
"FieldName": "",
"Help": "",
"Default": false,
"Value": true,
"Hide": 0,
"Required": false,
"IsPassword": false,
"NoPrefix": false,
"Advanced": true,
"Exclusive": false,
"Sensitive": false,
"DefaultStr": "false",
"ValueStr": "true",
"Type": "bool"
}`, string(out))
}
func TestOptionGetValue(t *testing.T) {
assert.Equal(t, "", nouncOption.GetValue())
assert.Equal(t, false, copyLinksOption.GetValue())
assert.Equal(t, true, caseInsensitiveOption.GetValue())
}
func TestOptionString(t *testing.T) {
assert.Equal(t, "", nouncOption.String())
assert.Equal(t, "false", copyLinksOption.String())
assert.Equal(t, "true", caseInsensitiveOption.String())
}
func TestOptionStringStringArray(t *testing.T) {
opt := Option{
Name: "string_array",
Default: []string(nil),
}
assert.Equal(t, "", opt.String())
opt.Default = []string{}
assert.Equal(t, "", opt.String())
opt.Default = []string{"a", "b"}
assert.Equal(t, "a,b", opt.String())
opt.Default = []string{"hello, world!", "goodbye, world!"}
assert.Equal(t, `"hello, world!","goodbye, world!"`, opt.String())
}
func TestOptionStringSizeSuffix(t *testing.T) {
opt := Option{
Name: "size_suffix",
Default: SizeSuffix(0),
}
assert.Equal(t, "0", opt.String())
opt.Default = SizeSuffix(-1)
assert.Equal(t, "off", opt.String())
opt.Default = SizeSuffix(100)
assert.Equal(t, "100B", opt.String())
opt.Default = SizeSuffix(1024)
assert.Equal(t, "1Ki", opt.String())
}
func TestOptionSet(t *testing.T) {
o := caseInsensitiveOption
assert.Equal(t, true, o.Value)
err := o.Set("FALSE")
assert.NoError(t, err)
assert.Equal(t, false, o.Value)
o = copyLinksOption
assert.Equal(t, nil, o.Value)
err = o.Set("True")
assert.NoError(t, err)
assert.Equal(t, true, o.Value)
err = o.Set("INVALID")
assert.Error(t, err)
assert.Equal(t, true, o.Value)
}
func TestOptionType(t *testing.T) {
assert.Equal(t, "string", nouncOption.Type())
assert.Equal(t, "bool", copyLinksOption.Type())
assert.Equal(t, "bool", caseInsensitiveOption.Type())
}
func TestOptionFlagName(t *testing.T) {
assert.Equal(t, "local-nounc", nouncOption.FlagName("local"))
assert.Equal(t, "copy-links", copyLinksOption.FlagName("local"))
assert.Equal(t, "local-case-insensitive", caseInsensitiveOption.FlagName("local"))
}
func TestOptionEnvVarName(t *testing.T) {
assert.Equal(t, "RCLONE_LOCAL_NOUNC", nouncOption.EnvVarName("local"))
assert.Equal(t, "RCLONE_LOCAL_COPY_LINKS", copyLinksOption.EnvVarName("local"))
assert.Equal(t, "RCLONE_LOCAL_CASE_INSENSITIVE", caseInsensitiveOption.EnvVarName("local"))
}
func TestOptionGetters(t *testing.T) {
// Set up env vars
envVars := [][2]string{
{"RCLONE_CONFIG_LOCAL_POTATO_PIE", "yes"},
{"RCLONE_COPY_LINKS", "TRUE"},
{"RCLONE_LOCAL_NOUNC", "NOUNC"},
}
for _, ev := range envVars {
assert.NoError(t, os.Setenv(ev[0], ev[1]))
}
defer func() {
for _, ev := range envVars {
assert.NoError(t, os.Unsetenv(ev[0]))
}
}()
oldConfigFileGet := ConfigFileGet
ConfigFileGet = func(section, key string) (string, bool) {
if section == "sausage" && key == "key1" {
return "value1", true
}
return "", false
}
defer func() {
ConfigFileGet = oldConfigFileGet
}()
// set up getters
// A configmap.Getter to read from the environment RCLONE_CONFIG_backend_option_name
configEnvVarsGetter := configEnvVars("local")
// A configmap.Getter to read from the environment RCLONE_option_name
optionEnvVarsGetter := optionEnvVars{"local", testOptions}
// A configmap.Getter to read either the default value or the set
// value from the RegInfo.Options
regInfoValuesGetterFalse := &regInfoValues{
options: testOptions,
useDefault: false,
}
regInfoValuesGetterTrue := &regInfoValues{
options: testOptions,
useDefault: true,
}
// A configmap.Setter to read from the config file
configFileGetter := getConfigFile("sausage")
for i, test := range []struct {
get configmap.Getter
key string
wantValue string
wantOk bool
}{
{configEnvVarsGetter, "not_found", "", false},
{configEnvVarsGetter, "potato_pie", "yes", true},
{optionEnvVarsGetter, "not_found", "", false},
{optionEnvVarsGetter, "copy_links", "TRUE", true},
{optionEnvVarsGetter, "nounc", "NOUNC", true},
{optionEnvVarsGetter, "case_insensitive", "", false},
{regInfoValuesGetterFalse, "not_found", "", false},
{regInfoValuesGetterFalse, "case_insensitive", "true", true},
{regInfoValuesGetterFalse, "copy_links", "", false},
{regInfoValuesGetterTrue, "not_found", "", false},
{regInfoValuesGetterTrue, "case_insensitive", "true", true},
{regInfoValuesGetterTrue, "copy_links", "false", true},
{configFileGetter, "not_found", "", false},
{configFileGetter, "key1", "value1", true},
} {
what := fmt.Sprintf("%d: %+v: %q", i, test.get, test.key)
gotValue, gotOk := test.get.Get(test.key)
assert.Equal(t, test.wantValue, gotValue, what)
assert.Equal(t, test.wantOk, gotOk, what)
}
}
func TestOptionsNonDefaultRC(t *testing.T) {
type cfg struct {
X string `config:"x"`
Y int `config:"y"`
}
c := &cfg{X: "a", Y: 6}
opts := Options{
{Name: "x", Default: "a"}, // at default, should be omitted
{Name: "y", Default: 5}, // non-default, should be included
}
got, err := opts.NonDefaultRC(c)
require.NoError(t, err)
require.Equal(t, map[string]any{"Y": 6}, got)
}
func TestOptionsNonDefaultRCMissingKey(t *testing.T) {
type cfg struct {
X string `config:"x"`
}
c := &cfg{X: "a"}
// Options refers to a key not present in the struct -> expect error
opts := Options{{Name: "missing", Default: ""}}
_, err := opts.NonDefaultRC(c)
assert.ErrorContains(t, err, "not found")
}


@@ -14,6 +14,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/cluster"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
@@ -97,6 +98,7 @@ type syncCopyMove struct {
setDirModTimesMaxLevel int // max level of the directories to set
modifiedDirs map[string]struct{} // dirs with changed contents (if s.setDirModTimeAfter)
allowOverlap bool // whether we allow src and dst to overlap (i.e. for convmv)
cluster *cluster.Cluster // non-nil to run sync via cluster
}
// For keeping track of delayed modtime sets
@@ -164,6 +166,7 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
setDirModTimeAfter: !ci.NoUpdateDirModTime && (!copyEmptySrcDirs || fsrc.Features().CanHaveEmptyDirectories && fdst.Features().DirModTimeUpdatesOnWrite),
modifiedDirs: make(map[string]struct{}),
allowOverlap: allowOverlap,
cluster: cluster.GetCluster(ctx),
}
s.logger, s.usingLogger = operations.GetLogger(ctx)
@@ -496,13 +499,25 @@ func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs,
dst := pair.Dst
if s.DoMove {
if src != dst {
if s.cluster != nil {
err = s.cluster.Move(ctx, fdst, dst, src.Remote(), src)
} else {
_, err = operations.MoveTransfer(ctx, fdst, dst, src.Remote(), src)
}
} else {
// src == dst signals delete the src
if s.cluster != nil {
err = s.cluster.DeleteFile(ctx, src)
} else {
err = operations.DeleteFile(ctx, src)
}
}
} else {
if s.cluster != nil {
err = s.cluster.Copy(ctx, fdst, dst, src.Remote(), src)
} else {
_, err = operations.Copy(ctx, fdst, dst, src.Remote(), src)
}
}
s.processError(err)
if err != nil {
@@ -539,8 +554,13 @@ func (s *syncCopyMove) startTransfers() {
// This stops the background transfers
func (s *syncCopyMove) stopTransfers() {
s.toBeUploaded.Close()
fs.Debugf(s.fdst, "Waiting for transfers to finish")
s.transfersWg.Wait()
if s.cluster != nil {
fs.Debugf(s.fdst, "Waiting for cluster transfers to finish")
s.processError(s.cluster.Sync(s.ctx))
fs.Debugf(s.fdst, "Cluster transfers finished")
}
}
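// An illustrative sketch of the dispatch pattern introduced above, using
// only the calls shown in this diff: prefer the cluster when one is attached
// to the context, fall back to local operations, then drain the queued work
// with Sync before declaring the run finished.
//
//	if c := cluster.GetCluster(ctx); c != nil {
//		err = c.Copy(ctx, fdst, dst, src.Remote(), src)
//	} else {
//		_, err = operations.Copy(ctx, fdst, dst, src.Remote(), src)
//	}
//	// later, once all pairs have been queued:
//	err = c.Sync(ctx)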
// This starts the background renamers.