mirror of
https://github.com/rclone/rclone.git
synced 2025-12-12 06:13:20 +00:00
Compare commits
10 Commits
copilot/fi
...
cluster
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab60a77aba | ||
|
|
09535a06f7 | ||
|
|
173b720173 | ||
|
|
6660e6ec7c | ||
|
|
14c604335e | ||
|
|
bfcb23b7b2 | ||
|
|
46dbdb8cb7 | ||
|
|
17932fcc38 | ||
|
|
77faa787e1 | ||
|
|
0701dd55cd |
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -291,7 +291,7 @@ jobs:
|
||||
README.md
|
||||
RELEASE.md
|
||||
CODE_OF_CONDUCT.md
|
||||
docs/content/{authors,bugs,changelog,docs,downloads,faq,filtering,gui,install,licence,overview,privacy}.md
|
||||
docs/content/{authors,bugs,changelog,cluster,docs,downloads,faq,filtering,gui,install,licence,overview,privacy}.md
|
||||
|
||||
- name: Scan edits of autogenerated files
|
||||
run: bin/check_autogenerated_edits.py 'origin/${{ github.base_ref }}'
|
||||
|
||||
17
cmd/cmd.go
17
cmd/cmd.go
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/accounting"
|
||||
"github.com/rclone/rclone/fs/cache"
|
||||
"github.com/rclone/rclone/fs/cluster"
|
||||
"github.com/rclone/rclone/fs/config/configfile"
|
||||
"github.com/rclone/rclone/fs/config/configflags"
|
||||
"github.com/rclone/rclone/fs/config/flags"
|
||||
@@ -481,6 +482,22 @@ func initConfig() {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Run as a cluster worker if configured, otherwise ignoring
|
||||
// the command given on the command line
|
||||
if ci.Cluster != "" {
|
||||
if ci.ClusterID == "" || ci.ClusterID == "0" {
|
||||
fs.Infof(nil, "Running in cluster mode %q as controller", ci.ClusterID)
|
||||
} else {
|
||||
fs.Infof(nil, "Running in cluster mode %q as worker with id %q", ci.ClusterID, ci.ClusterID)
|
||||
worker, err := cluster.NewWorker(ctx)
|
||||
if err != nil || worker == nil {
|
||||
fs.Fatalf(nil, "Failed to start cluster worker: %v", err)
|
||||
}
|
||||
// Do not continue with the main thread
|
||||
select {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func resolveExitCode(err error) {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/rclone/rclone/cmd"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/cluster"
|
||||
"github.com/rclone/rclone/fs/rc"
|
||||
"github.com/rclone/rclone/fs/rc/rcflags"
|
||||
"github.com/rclone/rclone/fs/rc/rcserver"
|
||||
@@ -38,6 +39,8 @@ See the [rc documentation](/rc/) for more info on the rc flags.
|
||||
"groups": "RC",
|
||||
},
|
||||
Run: func(command *cobra.Command, args []string) {
|
||||
ctx := context.Background()
|
||||
|
||||
cmd.CheckArgs(0, 1, command, args)
|
||||
if rc.Opt.Enabled {
|
||||
fs.Fatalf(nil, "Don't supply --rc flag when using rcd")
|
||||
@@ -49,6 +52,12 @@ See the [rc documentation](/rc/) for more info on the rc flags.
|
||||
rc.Opt.Files = args[0]
|
||||
}
|
||||
|
||||
// Start the cluster worker if configured
|
||||
_, err := cluster.NewWorker(ctx)
|
||||
if err != nil {
|
||||
fs.Fatalf(nil, "Failed to start cluster worker: %v", err)
|
||||
}
|
||||
|
||||
s, err := rcserver.Start(context.Background(), &rc.Opt)
|
||||
if err != nil {
|
||||
fs.Fatalf(nil, "Failed to start remote control: %v", err)
|
||||
|
||||
217
docs/content/cluster.md
Normal file
217
docs/content/cluster.md
Normal file
@@ -0,0 +1,217 @@
|
||||
---
|
||||
title: "Cluster"
|
||||
description: "Clustering rclone"
|
||||
versionIntroduced: "v1.72"
|
||||
---
|
||||
|
||||
# Cluster
|
||||
|
||||
Rclone has a cluster mode invoked with the `--cluster` flag. This
|
||||
enables a group of rclone instances to work together on doing a sync.
|
||||
|
||||
This is controlled by a group of flags starting with `--cluster-` and
|
||||
enabled with the `--cluster` flag.
|
||||
|
||||
```text
|
||||
--cluster string Enable cluster mode with remote to use as shared storage
|
||||
--cluster-batch-files int Max number of files for a cluster batch (default 1000)
|
||||
--cluster-batch-size SizeSuffix Max size of files for a cluster batch (default 1Ti)
|
||||
--cluster-cleanup ClusterCleanup Control which cluster files get cleaned up (default full)
|
||||
--cluster-id string Set to an ID for the cluster. An ID of 0 or empty becomes the controller
|
||||
--cluster-quit-workers Set to cause the controller to quit the workers when it finished
|
||||
```
|
||||
|
||||
The command might look something like this which is a normal rclone
|
||||
command but with a new `--cluster` flag which points at an rclone
|
||||
remote defining the cluster storage. This is the signal to rclone that
|
||||
it should engage the cluster mode with a controller and workers.
|
||||
|
||||
```sh
|
||||
rclone copy source: destination: --flags --cluster /work
|
||||
rclone copy source: destination: --flags --cluster s3:bucket
|
||||
```
|
||||
|
||||
This works only with the `rclone sync`, `copy` and `move` commands.
|
||||
|
||||
If the remote specified by the `--cluster` command is inside the
|
||||
`source:` or `destination:` it must be excluded with the filter flags.
|
||||
|
||||
Any rclone remotes used in the transfer must be defined in all cluster
|
||||
nodes. Defining remotes with connection strings will get around that
|
||||
problem.
|
||||
|
||||
## Terminology
|
||||
|
||||
The cluster has two logical groups, the controller and the workers.
|
||||
There is one controller and many workers.
|
||||
|
||||
The controller and the workers will communicate with each other by
|
||||
creating files in the remote pointed to by the `--cluster` flag. This
|
||||
could be for example an S3 bucket or a Kubernetes PVC.
|
||||
|
||||
The files are JSON serialized rc commands. Multiple commands are sent
|
||||
using `rc/batch`. The commands flow `pending` →`processing` → `done` →
|
||||
`finished`
|
||||
|
||||
```text
|
||||
└── queue
|
||||
├── pending ← pending task files created by the controller
|
||||
├── processing ← claimed tasks being executed by a worker
|
||||
├── done ← finished tasks awaiting the controller to read the result
|
||||
└── finished ← completed task files
|
||||
```
|
||||
|
||||
The cluster can be set up in two ways as a persistent cluster or as a
|
||||
transient cluster.
|
||||
|
||||
### Persistent cluster
|
||||
|
||||
Run a cluster of workers using
|
||||
|
||||
```sh
|
||||
rclone rcd --cluster /work
|
||||
```
|
||||
|
||||
Then run rclone commands when required on the cluster:
|
||||
|
||||
```sh
|
||||
rclone copy source: destination: --flags --cluster /work
|
||||
```
|
||||
|
||||
In this mode there can be many rclone commands executing at once.
|
||||
|
||||
### Transient cluster
|
||||
|
||||
Run many copies of rclone simultaneously, for example in a Kubernetes
|
||||
indexed job.
|
||||
|
||||
The rclone with `--cluster-id 0` becomes the controller and the others
|
||||
become the workers. For a Kubernetes indexed job, setting
|
||||
`--cluster-id $(JOB_COMPLETION_INDEX)` would work well.
|
||||
|
||||
Add the `--cluster-quit-workers` flag - this will cause the controller
|
||||
to make sure the workers exit when it has finished.
|
||||
|
||||
All instances of rclone run a command like this so the whole cluster
|
||||
can only run one rclone command:
|
||||
|
||||
```sh
|
||||
rclone copy source: destination: --flags --cluster /work --cluster-id $(JOB_COMPLETION_INDEX) --cluster-quit-workers
|
||||
```
|
||||
|
||||
## Controller
|
||||
|
||||
The controller runs the sync and work distribution.
|
||||
|
||||
- It does the listing of the source and destination directories
|
||||
comparing files in order to find files which need to be transferred.
|
||||
- Files which need to be transferred are then batched into jobs of
|
||||
`--cluster-batch-files` files to transfer or `--cluster-batch-size`
|
||||
max size in `queue/pending` for the workers to pick up.
|
||||
- It watches `queue/done` for finished jobs and updates the transfer
|
||||
statistics and logs any errors, accordingly moving the job to
|
||||
`queue/finished`.
|
||||
|
||||
Once the sync is complete, if `--cluster-quit-workers` is set, then it
|
||||
sends the workers a special command which causes them all to exit.
|
||||
|
||||
The controller only sends transfer jobs to the workers. All the other
|
||||
tasks (eg listing, comparing) are done by the controller. The
|
||||
controller does not execute any transfer tasks itself.
|
||||
|
||||
The controller reads worker status as written to `queue/status` and
|
||||
will detect workers which have stopped. If it detects a failed worker
|
||||
then it will re-assign any outstanding work.
|
||||
|
||||
## Workers
|
||||
|
||||
The workers job is entirely to act as API endpoints that receive their
|
||||
work via files in `/work`. Then
|
||||
|
||||
- Read work in `queue/pending`
|
||||
- Attempt to rename into `queue/processing`
|
||||
- If the cluster work directory supports atomic renames, then use
|
||||
those, otherwise read the file, write the copy, delete the original.
|
||||
If the delete fails then the rename was not successful (possible on
|
||||
s3 backends).
|
||||
- If successful then do that item of work. If not successful another
|
||||
worker got there first and sleep for a bit then retry.
|
||||
- After the copy is complete then remove the `queue/processing` file
|
||||
or rename it into `queue/finished` if the `--cluster-cleanup` flag
|
||||
allows it.
|
||||
- Repeat
|
||||
|
||||
Every second the worker will write a status file in `queue/status` to
|
||||
be read by the controller.
|
||||
|
||||
## Layout of the work directory
|
||||
|
||||
The format of the files in this directory may change without notice
|
||||
but the layout is documented here as it can help debugging.
|
||||
|
||||
```text
|
||||
/work - root of the work directory
|
||||
└── queue - files to control the queue
|
||||
├── done - job files that are finished and read
|
||||
├── finished - job files that are finished but not yet read
|
||||
├── pending - job files that are not started yet
|
||||
├── processing - job files that are running
|
||||
└── status - worker status files
|
||||
```
|
||||
|
||||
If debugging use `--cluster-cleanup none` to leave the completed files
|
||||
in the directory layout.
|
||||
|
||||
## Flags
|
||||
|
||||
### --cluster string
|
||||
|
||||
This enables the cluster mode. Without this flag, all the other
|
||||
cluster flags are ignored. This should be given a remote which can be
|
||||
a local directory, eg `/work` or a remote directory, eg `s3:bucket`.
|
||||
|
||||
### --cluster-batch-files int
|
||||
|
||||
This controls the number of files copied in a cluster batch. Setting
|
||||
this larger may be more efficient but it means the statistics will be
|
||||
less accurate on the controller (default 1000).
|
||||
|
||||
### --cluster-batch-size SizeSuffix
|
||||
|
||||
This controls the total size of files in a cluster batch. If the size
|
||||
of the files in a batch exceeds this number then the batch will be
|
||||
sent to the workers. Setting this larger may be more efficient but it
|
||||
means the statistics will be less accurate on the controller. (default
|
||||
1TiB)
|
||||
|
||||
### --cluster-cleanup ClusterCleanup
|
||||
|
||||
Controls which cluster files get cleaned up.
|
||||
|
||||
- `full` - clean all work files (default)
|
||||
- `completed` - clean completed work files but leave the errors and status
|
||||
- `none` - leave all the file (useful for debugging)
|
||||
|
||||
### --cluster-id string
|
||||
|
||||
Set an ID for the rclone instance. This can be a string or a number.
|
||||
An ID of 0 will become the controller otherwise the instance will
|
||||
become a worker. If this flag isn't supplied or the value is empty,
|
||||
then a random string will be used instead.
|
||||
|
||||
### --cluster-quit-workers
|
||||
|
||||
If this flag is set, then when the controller finishes its sync task
|
||||
it will quit all the workers before it exits.
|
||||
|
||||
## Not implemented
|
||||
|
||||
Here are some features from the original design which are not
|
||||
implemented yet:
|
||||
|
||||
- the controller will not notice if workers die or fail to complete
|
||||
their tasks
|
||||
- the controller does not re-assign the workers work if necessary
|
||||
- the controller does not restart the sync
|
||||
- the workers do not write any status files (but the stats are
|
||||
correctly accounted)
|
||||
@@ -3315,6 +3315,15 @@ For the remote control options and for instructions on how to remote control rcl
|
||||
|
||||
See [the remote control section](/rc/).
|
||||
|
||||
## Cluster
|
||||
|
||||
For the cluster options and for instructions on how to cluster rclone:
|
||||
|
||||
- `--cluster`
|
||||
- Anything starting with `--cluster-`
|
||||
|
||||
See the [cluster section](/cluster/).
|
||||
|
||||
## Logging
|
||||
|
||||
rclone has 4 levels of logging, `ERROR`, `NOTICE`, `INFO` and `DEBUG`.
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
<a class="dropdown-item" href="/gui/"><i class="fa fa-book fa-fw"></i> GUI</a>
|
||||
<a class="dropdown-item" href="/rc/"><i class="fa fa-book fa-fw"></i> Remote Control</a>
|
||||
<a class="dropdown-item" href="/remote_setup/"><i class="fa fa-book fa-fw"></i> Remote Setup</a>
|
||||
<a class="dropdown-item" href="/cluster/"><i class="fa fa-book fa-fw"></i> Cluster</a>
|
||||
<a class="dropdown-item" href="/changelog/"><i class="fa fa-book fa-fw"></i> Changelog</a>
|
||||
<a class="dropdown-item" href="/bugs/"><i class="fa fa-book fa-fw"></i> Bugs</a>
|
||||
<a class="dropdown-item" href="/faq/"><i class="fa fa-book fa-fw"></i> FAQ</a>
|
||||
|
||||
@@ -82,7 +82,7 @@ type accountValues struct {
|
||||
max int64 // if >=0 the max number of bytes to transfer
|
||||
start time.Time // Start time of first read
|
||||
lpTime time.Time // Time of last average measurement
|
||||
lpBytes int // Number of bytes read since last measurement
|
||||
lpBytes int64 // Number of bytes read since last measurement
|
||||
avg float64 // Moving average of last few measurements in Byte/s
|
||||
}
|
||||
|
||||
@@ -344,15 +344,20 @@ func (acc *Account) limitPerFileBandwidth(n int) {
|
||||
}
|
||||
}
|
||||
|
||||
// Account the read and limit bandwidth
|
||||
func (acc *Account) accountRead(n int) {
|
||||
// Account the read
|
||||
func (acc *Account) accountReadN(n int64) {
|
||||
// Update Stats
|
||||
acc.values.mu.Lock()
|
||||
acc.values.lpBytes += n
|
||||
acc.values.bytes += int64(n)
|
||||
acc.values.bytes += n
|
||||
acc.values.mu.Unlock()
|
||||
|
||||
acc.stats.Bytes(int64(n))
|
||||
acc.stats.Bytes(n)
|
||||
}
|
||||
|
||||
// Account the read and limit bandwidth
|
||||
func (acc *Account) accountRead(n int) {
|
||||
acc.accountReadN(int64(n))
|
||||
|
||||
TokenBucket.LimitBandwidth(TokenBucketSlotAccounting, n)
|
||||
acc.limitPerFileBandwidth(n)
|
||||
@@ -427,6 +432,15 @@ func (acc *Account) AccountRead(n int) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// AccountReadN account having read n bytes
|
||||
//
|
||||
// Does not obey any transfer limits, bandwidth limits, etc.
|
||||
func (acc *Account) AccountReadN(n int64) {
|
||||
acc.mu.Lock()
|
||||
defer acc.mu.Unlock()
|
||||
acc.accountReadN(n)
|
||||
}
|
||||
|
||||
// Close the object
|
||||
func (acc *Account) Close() error {
|
||||
acc.mu.Lock()
|
||||
|
||||
@@ -100,7 +100,7 @@ func TestAccountRead(t *testing.T) {
|
||||
|
||||
assert.True(t, acc.values.start.IsZero())
|
||||
acc.values.mu.Lock()
|
||||
assert.Equal(t, 0, acc.values.lpBytes)
|
||||
assert.Equal(t, int64(0), acc.values.lpBytes)
|
||||
assert.Equal(t, int64(0), acc.values.bytes)
|
||||
acc.values.mu.Unlock()
|
||||
assert.Equal(t, int64(0), stats.bytes)
|
||||
@@ -113,7 +113,7 @@ func TestAccountRead(t *testing.T) {
|
||||
|
||||
assert.False(t, acc.values.start.IsZero())
|
||||
acc.values.mu.Lock()
|
||||
assert.Equal(t, 2, acc.values.lpBytes)
|
||||
assert.Equal(t, int64(2), acc.values.lpBytes)
|
||||
assert.Equal(t, int64(2), acc.values.bytes)
|
||||
acc.values.mu.Unlock()
|
||||
assert.Equal(t, int64(2), stats.bytes)
|
||||
@@ -145,7 +145,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
|
||||
|
||||
assert.True(t, acc.values.start.IsZero())
|
||||
acc.values.mu.Lock()
|
||||
assert.Equal(t, 0, acc.values.lpBytes)
|
||||
assert.Equal(t, int64(0), acc.values.lpBytes)
|
||||
assert.Equal(t, int64(0), acc.values.bytes)
|
||||
acc.values.mu.Unlock()
|
||||
assert.Equal(t, int64(0), stats.bytes)
|
||||
@@ -159,7 +159,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
|
||||
|
||||
assert.False(t, acc.values.start.IsZero())
|
||||
acc.values.mu.Lock()
|
||||
assert.Equal(t, len(buf), acc.values.lpBytes)
|
||||
assert.Equal(t, int64(len(buf)), acc.values.lpBytes)
|
||||
assert.Equal(t, int64(len(buf)), acc.values.bytes)
|
||||
acc.values.mu.Unlock()
|
||||
assert.Equal(t, int64(len(buf)), stats.bytes)
|
||||
|
||||
598
fs/cluster/cluster.go
Normal file
598
fs/cluster/cluster.go
Normal file
@@ -0,0 +1,598 @@
|
||||
// Package cluster implements a machanism to distribute work over a
|
||||
// cluster of rclone instances.
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/accounting"
|
||||
"github.com/rclone/rclone/fs/filter"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
"github.com/rclone/rclone/fs/rc"
|
||||
"github.com/rclone/rclone/lib/atexit"
|
||||
"github.com/rclone/rclone/lib/errcount"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// ErrClusterNotConfigured is returned from creation functions.
|
||||
var ErrClusterNotConfigured = errors.New("cluster is not configured")
|
||||
|
||||
// If we don't hear from workers in this time we assume they have timed out
|
||||
// and re-assign their jobs.
|
||||
const workerTimeout = 2 * time.Second
|
||||
|
||||
// Cluster describes the workings of the current cluster.
|
||||
type Cluster struct {
|
||||
jobs *Jobs
|
||||
id string
|
||||
batchFiles int
|
||||
batchSize fs.SizeSuffix
|
||||
cleanup fs.ClusterCleanup // how we cleanup cluster files
|
||||
_config rc.Params // for rc
|
||||
_filter rc.Params // for rc
|
||||
cancel func() // stop bg job
|
||||
wg sync.WaitGroup // bg job finished
|
||||
quit chan struct{} // signal graceful stop
|
||||
sync chan chan<- struct{} // sync the current jobs
|
||||
quitWorkers bool // if set, send workers a stop signal on Shutdown
|
||||
|
||||
workers map[string]*WorkerStatus // worker ID => status
|
||||
deadWorkers map[string]struct{}
|
||||
|
||||
mu sync.Mutex
|
||||
currentBatch Batch
|
||||
inflight map[string]Batch
|
||||
shutdown bool
|
||||
}
|
||||
|
||||
// Batch is a collection of rc tasks to do
|
||||
type Batch struct {
|
||||
size int64 // size in batch
|
||||
Path string `json:"_path"`
|
||||
Inputs []rc.Params `json:"inputs"`
|
||||
Config rc.Params `json:"_config,omitempty"`
|
||||
Filter rc.Params `json:"_filter,omitempty"`
|
||||
|
||||
trs []*accounting.Transfer // transfer for each Input
|
||||
sizes []int64 // sizes for each Input
|
||||
}
|
||||
|
||||
// BatchResult has the results of the batch as received.
|
||||
type BatchResult struct {
|
||||
Results []rc.Params `json:"results"`
|
||||
|
||||
// Error returns
|
||||
Error string `json:"error"`
|
||||
Status int `json:"status"`
|
||||
Input string `json:"input"`
|
||||
Path string `json:"path"`
|
||||
}
|
||||
|
||||
// NewCluster creates a new cluster from the config in ctx.
|
||||
//
|
||||
// It may return nil for no cluster is configured.
|
||||
func NewCluster(ctx context.Context) (*Cluster, error) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
if ci.Cluster == "" {
|
||||
return nil, nil
|
||||
}
|
||||
jobs, err := NewJobs(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c := &Cluster{
|
||||
jobs: jobs,
|
||||
id: ci.ClusterID,
|
||||
quitWorkers: ci.ClusterQuitWorkers,
|
||||
batchFiles: ci.ClusterBatchFiles,
|
||||
batchSize: ci.ClusterBatchSize,
|
||||
cleanup: ci.ClusterCleanup,
|
||||
quit: make(chan struct{}),
|
||||
sync: make(chan chan<- struct{}),
|
||||
inflight: make(map[string]Batch),
|
||||
workers: make(map[string]*WorkerStatus),
|
||||
deadWorkers: make(map[string]struct{}),
|
||||
}
|
||||
|
||||
// Configure _config
|
||||
configParams, err := fs.ConfigOptionsInfo.NonDefaultRC(ci)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read global config: %w", err)
|
||||
}
|
||||
// Remove any global cluster config
|
||||
for k := range configParams {
|
||||
if strings.HasPrefix(k, "Cluster") {
|
||||
delete(configParams, k)
|
||||
}
|
||||
}
|
||||
if len(configParams) != 0 {
|
||||
fs.Debugf(nil, "Overridden global config: %#v", configParams)
|
||||
}
|
||||
c._config = rc.Params(configParams)
|
||||
|
||||
// Configure _filter
|
||||
fi := filter.GetConfig(ctx)
|
||||
if !fi.InActive() {
|
||||
filterParams, err := filter.OptionsInfo.NonDefaultRC(fi)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read filter config: %w", err)
|
||||
}
|
||||
fs.Debugf(nil, "Overridden filter config: %#v", filterParams)
|
||||
c._filter = rc.Params(filterParams)
|
||||
}
|
||||
|
||||
err = c.jobs.createDirectoryStructure(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start the background worker
|
||||
bgCtx, cancel := context.WithCancel(context.Background())
|
||||
c.cancel = cancel
|
||||
c.wg.Add(1)
|
||||
go c.run(bgCtx)
|
||||
|
||||
fs.Logf(c.jobs.f, "Started cluster master")
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
var (
|
||||
globalClusterMu sync.Mutex
|
||||
globalCluster *Cluster
|
||||
)
|
||||
|
||||
// GetCluster starts or gets a cluster.
|
||||
//
|
||||
// If no cluster is configured or the cluster can't be started then it
|
||||
// returns nil.
|
||||
func GetCluster(ctx context.Context) *Cluster {
|
||||
globalClusterMu.Lock()
|
||||
defer globalClusterMu.Unlock()
|
||||
|
||||
if globalCluster != nil {
|
||||
return globalCluster
|
||||
}
|
||||
|
||||
cluster, err := NewCluster(ctx)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "Failed to start cluster: %v", err)
|
||||
return nil
|
||||
}
|
||||
if cluster != nil {
|
||||
atexit.Register(func() {
|
||||
err := cluster.Shutdown(context.Background())
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "Failed to shutdown cluster: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
globalCluster = cluster
|
||||
return globalCluster
|
||||
}
|
||||
|
||||
// Send the current batch for processing
|
||||
//
|
||||
// call with c.mu held
|
||||
func (c *Cluster) sendBatch(ctx context.Context) (err error) {
|
||||
// Do nothing if the batch is empty
|
||||
if len(c.currentBatch.Inputs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get and reset current batch
|
||||
b := c.currentBatch
|
||||
c.currentBatch = Batch{}
|
||||
|
||||
b.Path = "job/batch"
|
||||
b.Config = c._config
|
||||
b.Filter = c._filter
|
||||
|
||||
// write the pending job
|
||||
name, err := c.jobs.writeJob(ctx, clusterPending, &b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fs.Infof(name, "written cluster batch file")
|
||||
c.inflight[name] = b
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add the command to the current batch
|
||||
func (c *Cluster) addToBatch(ctx context.Context, obj fs.Object, in rc.Params, size int64, tr *accounting.Transfer) (err error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.shutdown {
|
||||
return errors.New("internal error: can't add file to Shutdown cluster")
|
||||
}
|
||||
|
||||
c.currentBatch.Inputs = append(c.currentBatch.Inputs, in)
|
||||
c.currentBatch.size += size
|
||||
c.currentBatch.trs = append(c.currentBatch.trs, tr)
|
||||
c.currentBatch.sizes = append(c.currentBatch.sizes, size)
|
||||
|
||||
if c.currentBatch.size >= int64(c.batchSize) || len(c.currentBatch.Inputs) >= c.batchFiles {
|
||||
err = c.sendBatch(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Move does operations.Move via the cluster.
|
||||
//
|
||||
// Move src object to dst or fdst if nil. If dst is nil then it uses
|
||||
// remote as the name of the new object.
|
||||
func (c *Cluster) Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
|
||||
tr := accounting.Stats(ctx).NewTransfer(src, fdst)
|
||||
if operations.SkipDestructive(ctx, src, "cluster move") {
|
||||
in := tr.Account(ctx, nil)
|
||||
in.DryRun(src.Size())
|
||||
tr.Done(ctx, nil)
|
||||
return nil
|
||||
}
|
||||
fsrc, ok := src.Fs().(fs.Fs)
|
||||
if !ok {
|
||||
err = errors.New("internal error: cluster move: can't cast src.Fs() to fs.Fs")
|
||||
tr.Done(ctx, err)
|
||||
return err
|
||||
}
|
||||
in := rc.Params{
|
||||
"_path": "operations/movefile",
|
||||
"dstFs": fs.ConfigStringFull(fdst),
|
||||
"dstRemote": remote,
|
||||
"srcFs": fs.ConfigStringFull(fsrc),
|
||||
"srcRemote": src.Remote(),
|
||||
}
|
||||
if dst != nil {
|
||||
in["dstRemote"] = dst.Remote()
|
||||
}
|
||||
return c.addToBatch(ctx, src, in, src.Size(), tr)
|
||||
}
|
||||
|
||||
// Copy does operations.Copy via the cluster.
|
||||
//
|
||||
// Copy src object to dst or fdst if nil. If dst is nil then it uses
|
||||
// remote as the name of the new object.
|
||||
func (c *Cluster) Copy(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
|
||||
tr := accounting.Stats(ctx).NewTransfer(src, fdst)
|
||||
if operations.SkipDestructive(ctx, src, "cluster copy") {
|
||||
in := tr.Account(ctx, nil)
|
||||
in.DryRun(src.Size())
|
||||
tr.Done(ctx, nil)
|
||||
return nil
|
||||
}
|
||||
fsrc, ok := src.Fs().(fs.Fs)
|
||||
if !ok {
|
||||
err = errors.New("internal error: cluster copy: can't cast src.Fs() to fs.Fs")
|
||||
tr.Done(ctx, err)
|
||||
return err
|
||||
}
|
||||
in := rc.Params{
|
||||
"_path": "operations/copyfile",
|
||||
"dstFs": fs.ConfigStringFull(fdst),
|
||||
"dstRemote": remote,
|
||||
"srcFs": fs.ConfigStringFull(fsrc),
|
||||
"srcRemote": src.Remote(),
|
||||
}
|
||||
if dst != nil {
|
||||
in["dstRemote"] = dst.Remote()
|
||||
}
|
||||
return c.addToBatch(ctx, src, in, src.Size(), tr)
|
||||
}
|
||||
|
||||
// DeleteFile does operations.DeleteFile via the cluster
|
||||
//
|
||||
// If useBackupDir is set and --backup-dir is in effect then it moves
|
||||
// the file to there instead of deleting
|
||||
func (c *Cluster) DeleteFile(ctx context.Context, dst fs.Object) (err error) {
|
||||
tr := accounting.Stats(ctx).NewCheckingTransfer(dst, "deleting")
|
||||
err = accounting.Stats(ctx).DeleteFile(ctx, dst.Size())
|
||||
if err != nil {
|
||||
tr.Done(ctx, err)
|
||||
return err
|
||||
}
|
||||
if operations.SkipDestructive(ctx, dst, "cluster delete") {
|
||||
tr.Done(ctx, nil)
|
||||
return
|
||||
}
|
||||
fdst, ok := dst.Fs().(fs.Fs)
|
||||
if !ok {
|
||||
err = errors.New("internal error: cluster delete: can't cast dst.Fs() to fs.Fs")
|
||||
tr.Done(ctx, nil)
|
||||
return err
|
||||
}
|
||||
in := rc.Params{
|
||||
"_path": "operations/deletefile",
|
||||
"fs": fs.ConfigStringFull(fdst),
|
||||
"remote": dst.Remote(),
|
||||
}
|
||||
return c.addToBatch(ctx, dst, in, 0, tr)
|
||||
}
|
||||
|
||||
// processCompletedJob loads the job and checks it off
|
||||
func (c *Cluster) processCompletedJob(ctx context.Context, obj fs.Object) error {
|
||||
name := path.Base(obj.Remote())
|
||||
name, _ = strings.CutSuffix(name, ".json")
|
||||
fs.Debugf(nil, "cluster: processing completed job %q", name)
|
||||
|
||||
var output BatchResult
|
||||
err := c.jobs.readJob(ctx, obj, &output)
|
||||
if err != nil {
|
||||
return fmt.Errorf("check jobs read: %w", err)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
input, ok := c.inflight[name]
|
||||
// FIXME delete or save job
|
||||
if !ok {
|
||||
for k := range c.inflight {
|
||||
fs.Debugf(nil, "key %q", k)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return fmt.Errorf("check jobs: job %q not found", name)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
// Delete the inflight entry when batch is processed
|
||||
defer func() {
|
||||
c.mu.Lock()
|
||||
delete(c.inflight, name)
|
||||
c.mu.Unlock()
|
||||
}()
|
||||
|
||||
// Check job
|
||||
if output.Error != "" {
|
||||
return fmt.Errorf("cluster: failed to run batch job: %s (%d)", output.Error, output.Status)
|
||||
}
|
||||
if len(input.Inputs) != len(output.Results) {
|
||||
return fmt.Errorf("cluster: input had %d jobs but output had %d", len(input.Inputs), len(output.Results))
|
||||
}
|
||||
|
||||
// Run through the batch and mark operations as successful or not
|
||||
for i := range input.Inputs {
|
||||
in := input.Inputs[i]
|
||||
tr := input.trs[i]
|
||||
size := input.sizes[i]
|
||||
out := output.Results[i]
|
||||
errorString, hasError := out["error"]
|
||||
var err error
|
||||
if hasError && errorString != "" {
|
||||
err = fmt.Errorf("cluster: worker error: %s (%v)", errorString, out["status"])
|
||||
}
|
||||
if err == nil && in["_path"] == "operations/movefile" {
|
||||
accounting.Stats(ctx).Renames(1)
|
||||
}
|
||||
acc := tr.Account(ctx, nil)
|
||||
acc.AccountReadN(size)
|
||||
tr.Done(ctx, err)
|
||||
remote, ok := in["dstRemote"]
|
||||
if !ok {
|
||||
remote = in["remote"]
|
||||
}
|
||||
if err == nil {
|
||||
fs.Infof(remote, "cluster %s successful", in["_path"])
|
||||
} else {
|
||||
fs.Errorf(remote, "cluster %s failed: %v", in["_path"], err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadWorkerStatus updates the worker status
|
||||
func (c *Cluster) loadWorkerStatus(ctx context.Context) error {
|
||||
objs, err := c.jobs.listDir(ctx, clusterStatus)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster: get job status list failed: %w", err)
|
||||
}
|
||||
ec := errcount.New()
|
||||
g, gCtx := errgroup.WithContext(ctx)
|
||||
var mu sync.Mutex
|
||||
for _, obj := range objs {
|
||||
g.Go(func() error {
|
||||
buf, err := c.jobs.readFile(gCtx, obj)
|
||||
if err != nil {
|
||||
ec.Add(fmt.Errorf("read object: %w", err))
|
||||
return nil
|
||||
}
|
||||
workerStatus := new(WorkerStatus)
|
||||
err = json.Unmarshal(buf, workerStatus)
|
||||
if err != nil {
|
||||
ec.Add(fmt.Errorf("status json: %w", err))
|
||||
return nil
|
||||
}
|
||||
mu.Lock()
|
||||
c.workers[workerStatus.ID] = workerStatus
|
||||
mu.Unlock()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return ec.Err("cluster: load status")
|
||||
}
|
||||
|
||||
// checkWorkers loads the worker status
|
||||
func (c *Cluster) checkWorkers(ctx context.Context) {
|
||||
err := c.loadWorkerStatus(ctx)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "failed to read some worker status: %v", err)
|
||||
}
|
||||
for workerID, status := range c.workers {
|
||||
timeSinceUpdated := time.Since(status.Updated)
|
||||
if timeSinceUpdated > workerTimeout {
|
||||
if _, isDead := c.deadWorkers[workerID]; isDead {
|
||||
continue
|
||||
}
|
||||
fs.Errorf(nil, "cluster: haven't heard from worker %q for %v - assuming dead", workerID, timeSinceUpdated)
|
||||
// Find any jobs claimed by worker and restart
|
||||
objs, err := c.jobs.listDir(ctx, clusterProcessing)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "cluster: failed to find pending jobs: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, obj := range objs {
|
||||
fs.Errorf(obj, "cluster: checking job")
|
||||
// Jobs are named {jobID}-{workerID}.json
|
||||
name := strings.TrimSuffix(path.Base(obj.Remote()), ".json")
|
||||
dash := strings.LastIndex(name, "-")
|
||||
if dash < 0 {
|
||||
fs.Errorf(nil, "cluster: failed to find dash in job %q", name)
|
||||
continue
|
||||
}
|
||||
jobID, jobWorkerID := name[:dash], name[dash+1:]
|
||||
fs.Errorf(obj, "cluster: checking jobID %q, workerID %q", jobID, jobWorkerID)
|
||||
if workerID != jobWorkerID {
|
||||
fs.Debugf(nil, "cluster: job %q doesn't match %q", jobWorkerID, workerID)
|
||||
continue
|
||||
}
|
||||
// Found a job running on worker - rename it back to Pending
|
||||
newRemote := path.Join(clusterPending, jobID+".json")
|
||||
_, err = c.jobs.rename(ctx, obj, newRemote)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "cluster: failed to restart job %q: %v", jobID, err)
|
||||
continue
|
||||
}
|
||||
fs.Errorf(nil, "cluster: restarted job %q", jobID)
|
||||
}
|
||||
c.deadWorkers[workerID] = struct{}{}
|
||||
} else {
|
||||
if _, isDead := c.deadWorkers[workerID]; isDead {
|
||||
fs.Errorf(nil, "cluster: dead worker %q came back to life!", workerID)
|
||||
delete(c.deadWorkers, workerID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkJobs sees if there are any completed jobs
|
||||
func (c *Cluster) checkJobs(ctx context.Context) {
|
||||
objs, err := c.jobs.listDir(ctx, clusterDone)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "cluster: get completed job list failed: %v", err)
|
||||
return
|
||||
}
|
||||
for _, obj := range objs {
|
||||
err := c.processCompletedJob(ctx, obj)
|
||||
status := "output-ok"
|
||||
ok := true
|
||||
if err != nil {
|
||||
status = "output-failed"
|
||||
ok = false
|
||||
fs.Errorf(nil, "cluster: process completed job failed: %v", err)
|
||||
}
|
||||
c.jobs.finish(ctx, obj, status, ok)
|
||||
}
|
||||
}
|
||||
|
||||
// Run the background process
|
||||
func (c *Cluster) run(ctx context.Context) {
|
||||
defer c.wg.Done()
|
||||
checkJobs := time.NewTicker(clusterCheckJobsInterval)
|
||||
defer checkJobs.Stop()
|
||||
checkWorkers := time.NewTicker(clusterCheckWorkersInterval)
|
||||
defer checkWorkers.Stop()
|
||||
var syncedChans []chan<- struct{}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-c.quit:
|
||||
fs.Debugf(nil, "cluster: quit request received")
|
||||
return
|
||||
case synced := <-c.sync:
|
||||
syncedChans = append(syncedChans, synced)
|
||||
fs.Debugf(nil, "cluster: sync request received")
|
||||
case <-checkWorkers.C:
|
||||
c.checkWorkers(ctx)
|
||||
case <-checkJobs.C:
|
||||
}
|
||||
c.checkJobs(ctx)
|
||||
if len(syncedChans) > 0 {
|
||||
c.mu.Lock()
|
||||
n := len(c.inflight)
|
||||
c.mu.Unlock()
|
||||
if n == 0 {
|
||||
fs.Debugf(nil, "cluster: synced")
|
||||
for _, synced := range syncedChans {
|
||||
synced <- struct{}{}
|
||||
}
|
||||
syncedChans = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sync the cluster.
|
||||
//
|
||||
// Call this when all job items have been added to the cluster.
|
||||
//
|
||||
// This will wait for any outstanding jobs to finish regardless of who
|
||||
// put them in
|
||||
func (c *Cluster) Sync(ctx context.Context) error {
|
||||
// Flush any outstanding
|
||||
c.mu.Lock()
|
||||
err := c.sendBatch(ctx)
|
||||
c.mu.Unlock()
|
||||
|
||||
// Wait for the cluster to be empty
|
||||
done := make(chan struct{})
|
||||
c.sync <- done
|
||||
<-done
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Shutdown the cluster.
|
||||
//
|
||||
// Call this when all job items have been added to the cluster.
|
||||
//
|
||||
// This will wait for any outstanding jobs to finish.
|
||||
func (c *Cluster) Shutdown(ctx context.Context) (err error) {
|
||||
c.mu.Lock()
|
||||
inBatch := len(c.currentBatch.Inputs)
|
||||
inFlight := len(c.inflight)
|
||||
shutdown := c.shutdown
|
||||
c.shutdown = true
|
||||
c.mu.Unlock()
|
||||
|
||||
if inBatch > 0 {
|
||||
err = errors.Join(nil, fmt.Errorf("%d items batched on cluster shutdown", inBatch))
|
||||
}
|
||||
if inFlight > 0 {
|
||||
err = errors.Join(nil, fmt.Errorf("%d items in flight on cluster shutdown", inFlight))
|
||||
}
|
||||
if shutdown {
|
||||
fs.Debugf(nil, "cluster: already shutdown")
|
||||
return nil
|
||||
}
|
||||
c.quit <- struct{}{}
|
||||
fs.Debugf(nil, "Waiting for cluster to finish")
|
||||
c.wg.Wait()
|
||||
|
||||
// Send a quit job
|
||||
if c.quitWorkers {
|
||||
fs.Logf(nil, "Sending quit to workers")
|
||||
quitErr := c.jobs.writeQuitJob(ctx, clusterPending)
|
||||
if quitErr != nil {
|
||||
err = errors.Join(err, fmt.Errorf("shutdown quit: %w", quitErr))
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Abort the cluster and any outstanding jobs.
|
||||
func (c *Cluster) Abort() {
|
||||
c.cancel()
|
||||
c.wg.Wait()
|
||||
}
|
||||
311
fs/cluster/jobs.go
Normal file
311
fs/cluster/jobs.go
Normal file
@@ -0,0 +1,311 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/cache"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
"github.com/rclone/rclone/lib/atexit"
|
||||
"github.com/rclone/rclone/lib/pacer"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
)
|
||||
|
||||
// Batches flow from queue/pending to queue/processing/
|
||||
const (
|
||||
clusterQueue = "queue"
|
||||
clusterPending = clusterQueue + "/pending"
|
||||
clusterProcessing = clusterQueue + "/processing"
|
||||
clusterDone = clusterQueue + "/done"
|
||||
clusterFinished = clusterQueue + "/finished"
|
||||
clusterStatus = clusterQueue + "/status"
|
||||
|
||||
minSleep = 10 * time.Millisecond
|
||||
maxSleep = 2 * time.Second
|
||||
decayConstant = 2 // bigger for slower decay, exponential
|
||||
|
||||
// Read the queue this often
|
||||
clusterCheckJobsInterval = time.Second
|
||||
|
||||
// Write the worker status this often
|
||||
clusterWriteStatusInterval = time.Second
|
||||
|
||||
// Read the worker status this often
|
||||
clusterCheckWorkersInterval = time.Second
|
||||
|
||||
// Name of job which signals to the workers to quit
|
||||
quitJob = "QUIT"
|
||||
)
|
||||
|
||||
// Jobs is a container for sending and receiving jobs to the cluster.
|
||||
type Jobs struct {
|
||||
remote string // remote for job storage
|
||||
f fs.Fs // cluster remote storage
|
||||
partial bool // do we need to write and rename
|
||||
hasMove bool // set if f has server side move otherwise has server side copy
|
||||
cleanup fs.ClusterCleanup // how we cleanup the cluster files
|
||||
pacer *fs.Pacer // To pace the API calls
|
||||
}
|
||||
|
||||
// NewJobs creates a Jobs source from the config in ctx.
|
||||
//
|
||||
// It may return nil for no cluster is configured.
|
||||
func NewJobs(ctx context.Context) (*Jobs, error) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
if ci.Cluster == "" {
|
||||
return nil, nil
|
||||
}
|
||||
f, err := cache.Get(ctx, ci.Cluster)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cluster remote creation: %w", err)
|
||||
}
|
||||
features := f.Features()
|
||||
if features.Move == nil && features.Copy == nil {
|
||||
return nil, fmt.Errorf("cluster remote must have server side move and %q doesn't", ci.Cluster)
|
||||
}
|
||||
jobs := &Jobs{
|
||||
remote: ci.Cluster,
|
||||
f: f,
|
||||
partial: features.PartialUploads,
|
||||
hasMove: features.Move != nil,
|
||||
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
||||
cleanup: ci.ClusterCleanup,
|
||||
}
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// Create the cluster directory structure
|
||||
func (jobs *Jobs) createDirectoryStructure(ctx context.Context) (err error) {
|
||||
for _, dir := range []string{clusterPending, clusterProcessing, clusterDone, clusterFinished, clusterStatus} {
|
||||
err = jobs.f.Mkdir(ctx, dir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster mkdir %q: %w", dir, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// rename a file
|
||||
//
|
||||
// if this returns fs.ErrorObjectNotFound then the file has already been renamed.
|
||||
func (jobs *Jobs) rename(ctx context.Context, src fs.Object, dstRemote string) (dst fs.Object, err error) {
|
||||
features := jobs.f.Features()
|
||||
if jobs.hasMove {
|
||||
dst, err = features.Move(ctx, src, dstRemote)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cluster: failed to rename job file: %w", err)
|
||||
}
|
||||
} else {
|
||||
dst, err = features.Copy(ctx, src, dstRemote)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cluster: failed to rename (copy phase) job file: %w", err)
|
||||
}
|
||||
err = src.Remove(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cluster: failed to rename (delete phase) job file: %w", err)
|
||||
}
|
||||
}
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
// Finish with a jobs file
|
||||
func (jobs *Jobs) finish(ctx context.Context, obj fs.Object, status string, ok bool) {
|
||||
var err error
|
||||
if (ok && jobs.cleanup == fs.ClusterCleanupCompleted) || jobs.cleanup == fs.ClusterCleanupFull {
|
||||
err = obj.Remove(ctx)
|
||||
} else {
|
||||
name := path.Join(clusterFinished, status, path.Base(obj.Remote()))
|
||||
_, err = jobs.rename(ctx, obj, name)
|
||||
}
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "cluster: removing completed job failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// write buf into remote
|
||||
func (jobs *Jobs) writeFile(ctx context.Context, remote string, modTime time.Time, buf []byte) error {
|
||||
partialRemote := remote
|
||||
if jobs.partial {
|
||||
partialRemote = remote + ".partial"
|
||||
}
|
||||
// Calculate hashes
|
||||
w, err := hash.NewMultiHasherTypes(jobs.f.Hashes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = w.Write(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
obji := object.NewStaticObjectInfo(partialRemote, modTime, int64(len(buf)), true, w.Sums(), jobs.f)
|
||||
var obj fs.Object
|
||||
err = jobs.pacer.Call(func() (bool, error) {
|
||||
in := bytes.NewBuffer(buf)
|
||||
obj, err = jobs.f.Put(ctx, in, obji)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("cluster: failed to write %q: %q", remote, err)
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if jobs.partial {
|
||||
obj, err = jobs.rename(ctx, obj, remote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove the file if it exists
|
||||
func (jobs *Jobs) removeFile(ctx context.Context, remote string) error {
|
||||
obj, err := jobs.f.NewObject(ctx, remote)
|
||||
if errors.Is(err, fs.ErrorObjectNotFound) || errors.Is(err, fs.ErrorDirNotFound) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
return obj.Remove(ctx)
|
||||
}
|
||||
|
||||
// write a job to a file returning the name
|
||||
func (jobs *Jobs) writeJob(ctx context.Context, where string, job any) (name string, err error) {
|
||||
now := time.Now().UTC()
|
||||
name = fmt.Sprintf("%s-%s", now.Format(time.RFC3339Nano), random.String(20))
|
||||
remote := path.Join(where, name+".json")
|
||||
buf, err := json.MarshalIndent(job, "", "\t")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cluster: job json: %w", err)
|
||||
}
|
||||
err = jobs.writeFile(ctx, remote, now, buf)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cluster: job write: %w", err)
|
||||
}
|
||||
return name, nil
|
||||
}
|
||||
|
||||
// write a quit job to a file
|
||||
func (jobs *Jobs) writeQuitJob(ctx context.Context, where string) (err error) {
|
||||
now := time.Now().UTC()
|
||||
remote := path.Join(where, quitJob+".json")
|
||||
err = jobs.writeFile(ctx, remote, now, []byte("{}"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster: quit job write: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// read buf from object
|
||||
func (jobs *Jobs) readFile(ctx context.Context, o fs.Object) (buf []byte, err error) {
|
||||
err = jobs.pacer.Call(func() (bool, error) {
|
||||
in, err := operations.Open(ctx, o)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("cluster: failed to open %q: %w", o, err)
|
||||
}
|
||||
buf, err = io.ReadAll(in)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("cluster: failed to read %q: %w", o, err)
|
||||
}
|
||||
err = in.Close()
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("cluster: failed to close %q: %w", o, err)
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// read a job from a file
|
||||
//
|
||||
// job should be a pointer to something to be unmarshalled
|
||||
func (jobs *Jobs) readJob(ctx context.Context, obj fs.Object, job any) error {
|
||||
buf, err := jobs.readFile(ctx, obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster: job read: %w", err)
|
||||
}
|
||||
err = json.Unmarshal(buf, job)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster: job read json: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// lists the json files in a cluster directory
|
||||
func (jobs *Jobs) listDir(ctx context.Context, dir string) (objects []fs.Object, err error) {
|
||||
entries, err := jobs.f.List(ctx, dir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cluster: failed to list %q: %w", dir, err)
|
||||
}
|
||||
entries.ForObject(func(o fs.Object) {
|
||||
if strings.HasSuffix(o.Remote(), ".json") {
|
||||
objects = append(objects, o)
|
||||
}
|
||||
})
|
||||
slices.SortStableFunc(objects, func(a, b fs.Object) int {
|
||||
return cmp.Compare(a.Remote(), b.Remote())
|
||||
})
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
// get a job from pending if there is one available.
|
||||
//
|
||||
// Returns a nil object if no jobs are available.
|
||||
//
|
||||
// FIXME should mark jobs as error jobs in here if they can't be read properly?
|
||||
func (jobs *Jobs) getJob(ctx context.Context, id string) (name string, obj fs.Object, err error) {
|
||||
objs, err := jobs.listDir(ctx, clusterPending)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("get job list: %w", err)
|
||||
}
|
||||
quit := false
|
||||
for len(objs) > 0 {
|
||||
obj = objs[0]
|
||||
objs = objs[1:]
|
||||
name = path.Base(obj.Remote())
|
||||
name, _ = strings.CutSuffix(name, ".json")
|
||||
|
||||
// See if we have been asked to quit
|
||||
if name == quitJob {
|
||||
quit = true
|
||||
continue
|
||||
}
|
||||
|
||||
// claim the job
|
||||
newName := fmt.Sprintf("%s-%s.json", name, id)
|
||||
newRemote := path.Join(clusterProcessing, newName)
|
||||
obj, err = jobs.rename(ctx, obj, newRemote)
|
||||
if errors.Is(err, fs.ErrorObjectNotFound) {
|
||||
// claim failed - try again
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("get job claim: %w", err)
|
||||
}
|
||||
return name, obj, nil
|
||||
}
|
||||
// No jobs found
|
||||
if quit {
|
||||
fs.Logf(nil, "Exiting cluster worker on command")
|
||||
atexit.Run()
|
||||
os.Exit(0)
|
||||
}
|
||||
return "", nil, nil
|
||||
}
|
||||
211
fs/cluster/worker.go
Normal file
211
fs/cluster/worker.go
Normal file
@@ -0,0 +1,211 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/accounting"
|
||||
"github.com/rclone/rclone/fs/rc"
|
||||
"github.com/rclone/rclone/fs/rc/jobs"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
)
|
||||
|
||||
const maxWorkersDone = 16 // maximum jobs in the done list
|
||||
|
||||
// Worker describes a single instance of a cluster worker.
|
||||
type Worker struct {
|
||||
jobs *Jobs
|
||||
cancel func() // stop bg job
|
||||
wg sync.WaitGroup // bg job finished
|
||||
id string // id of this worker
|
||||
status string // place it stores it status
|
||||
|
||||
jobsMu sync.Mutex
|
||||
running map[string]struct{} // IDs of the jobs being processed
|
||||
done []string // IDs of finished jobs
|
||||
}
|
||||
|
||||
// WorkerStatus shows the status of this worker including jobs
|
||||
// running.
|
||||
type WorkerStatus struct {
|
||||
ID string `json:"id"`
|
||||
Running map[string]rc.Params `json:"running"` // Job ID => accounting.RemoteStats
|
||||
Done map[string]bool `json:"done"` // Job ID => finished status
|
||||
Updated time.Time `json:"updated"`
|
||||
}
|
||||
|
||||
// NewWorker creates a new cluster from the config in ctx.
|
||||
//
|
||||
// It may return nil for no cluster is configured.
|
||||
func NewWorker(ctx context.Context) (*Worker, error) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
if ci.Cluster == "" {
|
||||
return nil, nil
|
||||
}
|
||||
jobs, err := NewJobs(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w := &Worker{
|
||||
jobs: jobs,
|
||||
id: ci.ClusterID,
|
||||
running: make(map[string]struct{}),
|
||||
}
|
||||
if w.id == "" {
|
||||
w.id = random.String(10)
|
||||
}
|
||||
w.status = path.Join(clusterStatus, w.id+".json")
|
||||
|
||||
// Start the background workers
|
||||
bgCtx, cancel := context.WithCancel(context.Background())
|
||||
w.cancel = cancel
|
||||
w.wg.Add(1)
|
||||
go w.runJobs(bgCtx)
|
||||
w.wg.Add(1)
|
||||
go w.runStatus(bgCtx)
|
||||
|
||||
fs.Logf(w.jobs.f, "Started cluster worker")
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Check to see if a job exists and run it if available
|
||||
func (w *Worker) checkJobs(ctx context.Context) {
|
||||
name, obj, err := w.jobs.getJob(ctx, w.id)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "check jobs get: %v", err)
|
||||
return
|
||||
}
|
||||
if obj == nil {
|
||||
return // no jobs available
|
||||
}
|
||||
|
||||
// make a stats group for this job
|
||||
ctx = accounting.WithStatsGroup(ctx, name)
|
||||
|
||||
// Add job ID
|
||||
w.jobsMu.Lock()
|
||||
w.running[name] = struct{}{}
|
||||
w.jobsMu.Unlock()
|
||||
fs.Infof(nil, "write jobID %q", name)
|
||||
|
||||
// Remove job ID on exit
|
||||
defer func() {
|
||||
w.jobsMu.Lock()
|
||||
delete(w.running, name)
|
||||
w.done = append(w.done, name)
|
||||
if len(w.done) > maxWorkersDone {
|
||||
w.done = w.done[len(w.done)-maxWorkersDone : len(w.done)]
|
||||
}
|
||||
w.jobsMu.Unlock()
|
||||
}()
|
||||
|
||||
fs.Debugf(nil, "cluster: processing pending job %q", name)
|
||||
inBuf, err := w.jobs.readFile(ctx, obj)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "check jobs read: %v", err)
|
||||
w.jobs.finish(ctx, obj, "input-error", false)
|
||||
return
|
||||
}
|
||||
outBuf := jobs.NewJobFromBytes(ctx, inBuf)
|
||||
remote := path.Join(clusterDone, name+".json")
|
||||
err = w.jobs.writeFile(ctx, remote, time.Now(), outBuf)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "check jobs failed to write output: %v", err)
|
||||
return
|
||||
}
|
||||
w.jobs.finish(ctx, obj, "input-ok", true)
|
||||
fs.Debugf(nil, "cluster: processed pending job %q", name)
|
||||
}
|
||||
|
||||
// Run the background process to pick up jobs
|
||||
func (w *Worker) runJobs(ctx context.Context) {
|
||||
defer w.wg.Done()
|
||||
checkJobs := time.NewTicker(clusterCheckJobsInterval)
|
||||
defer checkJobs.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-checkJobs.C:
|
||||
w.checkJobs(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write the worker status
|
||||
func (w *Worker) writeStatus(ctx context.Context) {
|
||||
// Create the worker status from the jobIDs and the short stats
|
||||
status := WorkerStatus{
|
||||
ID: w.id,
|
||||
Running: make(map[string]rc.Params),
|
||||
Updated: time.Now(),
|
||||
Done: make(map[string]bool),
|
||||
}
|
||||
w.jobsMu.Lock()
|
||||
for _, jobID := range w.done {
|
||||
status.Done[jobID] = true
|
||||
}
|
||||
for jobID := range w.running {
|
||||
fs.Infof(nil, "read jobID %q", jobID)
|
||||
si := accounting.StatsGroup(ctx, jobID)
|
||||
out, err := si.RemoteStats(true)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "cluster: write status: stats: %v", err)
|
||||
status.Running[jobID] = rc.Params{}
|
||||
} else {
|
||||
status.Running[jobID] = out
|
||||
}
|
||||
status.Done[jobID] = false
|
||||
}
|
||||
w.jobsMu.Unlock()
|
||||
|
||||
// Write the stats to a file
|
||||
buf, err := json.MarshalIndent(status, "", "\t")
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "cluster: write status: json: %w", err)
|
||||
return
|
||||
}
|
||||
err = w.jobs.writeFile(ctx, w.status, status.Updated, buf)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "cluster: write status: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the worker status
|
||||
func (w *Worker) clearStatus(ctx context.Context) {
|
||||
err := w.jobs.removeFile(ctx, w.status)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "cluster: clear status: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Run the background process to write status
|
||||
func (w *Worker) runStatus(ctx context.Context) {
|
||||
defer w.wg.Done()
|
||||
w.writeStatus(ctx)
|
||||
defer w.clearStatus(ctx)
|
||||
writeStatus := time.NewTicker(clusterWriteStatusInterval)
|
||||
defer writeStatus.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-writeStatus.C:
|
||||
t0 := time.Now()
|
||||
w.writeStatus(ctx)
|
||||
fs.Debugf(nil, "write status took %v at %v", time.Since(t0), t0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown the worker regardless of whether it has work to process or not.
|
||||
func (w *Worker) Shutdown(ctx context.Context) error {
|
||||
w.cancel()
|
||||
w.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
64
fs/config.go
64
fs/config.go
@@ -50,6 +50,34 @@ var (
|
||||
ConfigEdit = "config_fs_edit"
|
||||
)
|
||||
|
||||
// ClusterCleanup describes the cluster cleanup choices.
|
||||
type ClusterCleanup = Enum[clusterCleanupChoices]
|
||||
|
||||
// Cluster cleanup choices.
|
||||
//
|
||||
// ClusterCleanupNone don't remove any cluster files
|
||||
// ClusterCleanupCompleted remove successfully completed jobs
|
||||
// ClusterCleanupFull remove everything on exit
|
||||
const (
|
||||
ClusterCleanupNone ClusterCleanup = iota
|
||||
ClusterCleanupCompleted
|
||||
ClusterCleanupFull
|
||||
)
|
||||
|
||||
type clusterCleanupChoices struct{}
|
||||
|
||||
func (clusterCleanupChoices) Choices() []string {
|
||||
return []string{
|
||||
ClusterCleanupNone: "none",
|
||||
ClusterCleanupCompleted: "completed",
|
||||
ClusterCleanupFull: "full",
|
||||
}
|
||||
}
|
||||
|
||||
func (clusterCleanupChoices) Type() string {
|
||||
return "ClusterCleanup"
|
||||
}
|
||||
|
||||
// ConfigOptionsInfo describes the Options in use
|
||||
var ConfigOptionsInfo = Options{{
|
||||
Name: "modify_window",
|
||||
@@ -566,6 +594,36 @@ var ConfigOptionsInfo = Options{{
|
||||
Default: "",
|
||||
Help: "HTTP proxy URL.",
|
||||
Groups: "Networking",
|
||||
}, {
|
||||
Name: "cluster",
|
||||
Default: "",
|
||||
Help: "Enable cluster mode with remote to use as shared storage.",
|
||||
Groups: "Networking",
|
||||
}, {
|
||||
Name: "cluster_id",
|
||||
Default: "",
|
||||
Help: "Set to an ID for the cluster. An ID of 0 or empty becomes the controller.",
|
||||
Groups: "Networking",
|
||||
}, {
|
||||
Name: "cluster_quit_workers",
|
||||
Default: false,
|
||||
Help: "Set to cause the controller to quit the workers when it finished.",
|
||||
Groups: "Networking",
|
||||
}, {
|
||||
Name: "cluster_batch_files",
|
||||
Default: 1000,
|
||||
Help: "Max number of files for a cluster batch.",
|
||||
Groups: "Networking",
|
||||
}, {
|
||||
Name: "cluster_batch_size",
|
||||
Default: Tebi,
|
||||
Help: "Max size of files for a cluster batch.",
|
||||
Groups: "Networking",
|
||||
}, {
|
||||
Name: "cluster_cleanup",
|
||||
Default: ClusterCleanupFull,
|
||||
Help: "Control which cluster files get cleaned up.",
|
||||
Groups: "Networking",
|
||||
}}
|
||||
|
||||
// ConfigInfo is filesystem config options
|
||||
@@ -680,6 +738,12 @@ type ConfigInfo struct {
|
||||
MaxConnections int `config:"max_connections"`
|
||||
NameTransform []string `config:"name_transform"`
|
||||
HTTPProxy string `config:"http_proxy"`
|
||||
Cluster string `config:"cluster"`
|
||||
ClusterID string `config:"cluster_id"`
|
||||
ClusterQuitWorkers bool `config:"cluster_quit_workers"`
|
||||
ClusterBatchFiles int `config:"cluster_batch_files"`
|
||||
ClusterBatchSize SizeSuffix `config:"cluster_batch_size"`
|
||||
ClusterCleanup ClusterCleanup `config:"cluster_cleanup"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
324
fs/fs_test.go
324
fs/fs_test.go
@@ -2,21 +2,9 @@ package fs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/rclone/rclone/fs/config/configmap"
|
||||
"github.com/rclone/rclone/fs/fserrors"
|
||||
"github.com/rclone/rclone/lib/pacer"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@@ -90,315 +78,3 @@ func TestFeaturesDisableList(t *testing.T) {
|
||||
assert.False(t, ft.CaseInsensitive)
|
||||
assert.False(t, ft.DuplicateFiles)
|
||||
}
|
||||
|
||||
// Check it satisfies the interface
|
||||
var _ pflag.Value = (*Option)(nil)
|
||||
|
||||
func TestOption(t *testing.T) {
|
||||
d := &Option{
|
||||
Name: "potato",
|
||||
Value: SizeSuffix(17 << 20),
|
||||
}
|
||||
assert.Equal(t, "17Mi", d.String())
|
||||
assert.Equal(t, "SizeSuffix", d.Type())
|
||||
err := d.Set("18M")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, SizeSuffix(18<<20), d.Value)
|
||||
err = d.Set("sdfsdf")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
var errFoo = errors.New("foo")
|
||||
|
||||
type dummyPaced struct {
|
||||
retry bool
|
||||
called int
|
||||
wait *sync.Cond
|
||||
}
|
||||
|
||||
func (dp *dummyPaced) fn() (bool, error) {
|
||||
if dp.wait != nil {
|
||||
dp.wait.L.Lock()
|
||||
dp.wait.Wait()
|
||||
dp.wait.L.Unlock()
|
||||
}
|
||||
dp.called++
|
||||
return dp.retry, errFoo
|
||||
}
|
||||
|
||||
func TestPacerCall(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
config := GetConfig(ctx)
|
||||
expectedCalled := config.LowLevelRetries
|
||||
if expectedCalled == 0 {
|
||||
ctx, config = AddConfig(ctx)
|
||||
expectedCalled = 20
|
||||
config.LowLevelRetries = expectedCalled
|
||||
}
|
||||
p := NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
|
||||
|
||||
dp := &dummyPaced{retry: true}
|
||||
err := p.Call(dp.fn)
|
||||
require.Equal(t, expectedCalled, dp.called)
|
||||
require.Implements(t, (*fserrors.Retrier)(nil), err)
|
||||
}
|
||||
|
||||
func TestPacerCallNoRetry(t *testing.T) {
|
||||
p := NewPacer(context.Background(), pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
|
||||
|
||||
dp := &dummyPaced{retry: true}
|
||||
err := p.CallNoRetry(dp.fn)
|
||||
require.Equal(t, 1, dp.called)
|
||||
require.Implements(t, (*fserrors.Retrier)(nil), err)
|
||||
}
|
||||
|
||||
// Test options
|
||||
var (
|
||||
nouncOption = Option{
|
||||
Name: "nounc",
|
||||
}
|
||||
copyLinksOption = Option{
|
||||
Name: "copy_links",
|
||||
Default: false,
|
||||
NoPrefix: true,
|
||||
ShortOpt: "L",
|
||||
Advanced: true,
|
||||
}
|
||||
caseInsensitiveOption = Option{
|
||||
Name: "case_insensitive",
|
||||
Default: false,
|
||||
Value: true,
|
||||
Advanced: true,
|
||||
}
|
||||
testOptions = Options{nouncOption, copyLinksOption, caseInsensitiveOption}
|
||||
)
|
||||
|
||||
func TestOptionsSetValues(t *testing.T) {
|
||||
assert.Nil(t, testOptions[0].Default)
|
||||
assert.Equal(t, false, testOptions[1].Default)
|
||||
assert.Equal(t, false, testOptions[2].Default)
|
||||
testOptions.setValues()
|
||||
assert.Equal(t, "", testOptions[0].Default)
|
||||
assert.Equal(t, false, testOptions[1].Default)
|
||||
assert.Equal(t, false, testOptions[2].Default)
|
||||
}
|
||||
|
||||
func TestOptionsGet(t *testing.T) {
|
||||
opt := testOptions.Get("copy_links")
|
||||
assert.Equal(t, ©LinksOption, opt)
|
||||
opt = testOptions.Get("not_found")
|
||||
assert.Nil(t, opt)
|
||||
}
|
||||
|
||||
func TestOptionsOveridden(t *testing.T) {
|
||||
m := configmap.New()
|
||||
m1 := configmap.Simple{
|
||||
"nounc": "m1",
|
||||
"copy_links": "m1",
|
||||
}
|
||||
m.AddGetter(m1, configmap.PriorityNormal)
|
||||
m2 := configmap.Simple{
|
||||
"nounc": "m2",
|
||||
"case_insensitive": "m2",
|
||||
}
|
||||
m.AddGetter(m2, configmap.PriorityConfig)
|
||||
m3 := configmap.Simple{
|
||||
"nounc": "m3",
|
||||
}
|
||||
m.AddGetter(m3, configmap.PriorityDefault)
|
||||
got := testOptions.Overridden(m)
|
||||
assert.Equal(t, configmap.Simple{
|
||||
"copy_links": "m1",
|
||||
"nounc": "m1",
|
||||
}, got)
|
||||
}
|
||||
|
||||
func TestOptionsNonDefault(t *testing.T) {
|
||||
m := configmap.Simple{}
|
||||
got := testOptions.NonDefault(m)
|
||||
assert.Equal(t, configmap.Simple{}, got)
|
||||
|
||||
m["case_insensitive"] = "false"
|
||||
got = testOptions.NonDefault(m)
|
||||
assert.Equal(t, configmap.Simple{}, got)
|
||||
|
||||
m["case_insensitive"] = "true"
|
||||
got = testOptions.NonDefault(m)
|
||||
assert.Equal(t, configmap.Simple{"case_insensitive": "true"}, got)
|
||||
}
|
||||
|
||||
func TestOptionMarshalJSON(t *testing.T) {
|
||||
out, err := json.MarshalIndent(&caseInsensitiveOption, "", "")
|
||||
assert.NoError(t, err)
|
||||
require.Equal(t, `{
|
||||
"Name": "case_insensitive",
|
||||
"FieldName": "",
|
||||
"Help": "",
|
||||
"Default": false,
|
||||
"Value": true,
|
||||
"Hide": 0,
|
||||
"Required": false,
|
||||
"IsPassword": false,
|
||||
"NoPrefix": false,
|
||||
"Advanced": true,
|
||||
"Exclusive": false,
|
||||
"Sensitive": false,
|
||||
"DefaultStr": "false",
|
||||
"ValueStr": "true",
|
||||
"Type": "bool"
|
||||
}`, string(out))
|
||||
}
|
||||
|
||||
func TestOptionGetValue(t *testing.T) {
|
||||
assert.Equal(t, "", nouncOption.GetValue())
|
||||
assert.Equal(t, false, copyLinksOption.GetValue())
|
||||
assert.Equal(t, true, caseInsensitiveOption.GetValue())
|
||||
}
|
||||
|
||||
func TestOptionString(t *testing.T) {
|
||||
assert.Equal(t, "", nouncOption.String())
|
||||
assert.Equal(t, "false", copyLinksOption.String())
|
||||
assert.Equal(t, "true", caseInsensitiveOption.String())
|
||||
}
|
||||
|
||||
func TestOptionStringStringArray(t *testing.T) {
|
||||
opt := Option{
|
||||
Name: "string_array",
|
||||
Default: []string(nil),
|
||||
}
|
||||
assert.Equal(t, "", opt.String())
|
||||
opt.Default = []string{}
|
||||
assert.Equal(t, "", opt.String())
|
||||
opt.Default = []string{"a", "b"}
|
||||
assert.Equal(t, "a,b", opt.String())
|
||||
opt.Default = []string{"hello, world!", "goodbye, world!"}
|
||||
assert.Equal(t, `"hello, world!","goodbye, world!"`, opt.String())
|
||||
}
|
||||
|
||||
func TestOptionStringSizeSuffix(t *testing.T) {
|
||||
opt := Option{
|
||||
Name: "size_suffix",
|
||||
Default: SizeSuffix(0),
|
||||
}
|
||||
assert.Equal(t, "0", opt.String())
|
||||
opt.Default = SizeSuffix(-1)
|
||||
assert.Equal(t, "off", opt.String())
|
||||
opt.Default = SizeSuffix(100)
|
||||
assert.Equal(t, "100B", opt.String())
|
||||
opt.Default = SizeSuffix(1024)
|
||||
assert.Equal(t, "1Ki", opt.String())
|
||||
}
|
||||
|
||||
func TestOptionSet(t *testing.T) {
|
||||
o := caseInsensitiveOption
|
||||
assert.Equal(t, true, o.Value)
|
||||
err := o.Set("FALSE")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, false, o.Value)
|
||||
|
||||
o = copyLinksOption
|
||||
assert.Equal(t, nil, o.Value)
|
||||
err = o.Set("True")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, o.Value)
|
||||
|
||||
err = o.Set("INVALID")
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, true, o.Value)
|
||||
}
|
||||
|
||||
func TestOptionType(t *testing.T) {
|
||||
assert.Equal(t, "string", nouncOption.Type())
|
||||
assert.Equal(t, "bool", copyLinksOption.Type())
|
||||
assert.Equal(t, "bool", caseInsensitiveOption.Type())
|
||||
}
|
||||
|
||||
func TestOptionFlagName(t *testing.T) {
|
||||
assert.Equal(t, "local-nounc", nouncOption.FlagName("local"))
|
||||
assert.Equal(t, "copy-links", copyLinksOption.FlagName("local"))
|
||||
assert.Equal(t, "local-case-insensitive", caseInsensitiveOption.FlagName("local"))
|
||||
}
|
||||
|
||||
func TestOptionEnvVarName(t *testing.T) {
|
||||
assert.Equal(t, "RCLONE_LOCAL_NOUNC", nouncOption.EnvVarName("local"))
|
||||
assert.Equal(t, "RCLONE_LOCAL_COPY_LINKS", copyLinksOption.EnvVarName("local"))
|
||||
assert.Equal(t, "RCLONE_LOCAL_CASE_INSENSITIVE", caseInsensitiveOption.EnvVarName("local"))
|
||||
}
|
||||
|
||||
func TestOptionGetters(t *testing.T) {
|
||||
// Set up env vars
|
||||
envVars := [][2]string{
|
||||
{"RCLONE_CONFIG_LOCAL_POTATO_PIE", "yes"},
|
||||
{"RCLONE_COPY_LINKS", "TRUE"},
|
||||
{"RCLONE_LOCAL_NOUNC", "NOUNC"},
|
||||
}
|
||||
for _, ev := range envVars {
|
||||
assert.NoError(t, os.Setenv(ev[0], ev[1]))
|
||||
}
|
||||
defer func() {
|
||||
for _, ev := range envVars {
|
||||
assert.NoError(t, os.Unsetenv(ev[0]))
|
||||
}
|
||||
}()
|
||||
|
||||
oldConfigFileGet := ConfigFileGet
|
||||
ConfigFileGet = func(section, key string) (string, bool) {
|
||||
if section == "sausage" && key == "key1" {
|
||||
return "value1", true
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
defer func() {
|
||||
ConfigFileGet = oldConfigFileGet
|
||||
}()
|
||||
|
||||
// set up getters
|
||||
|
||||
// A configmap.Getter to read from the environment RCLONE_CONFIG_backend_option_name
|
||||
configEnvVarsGetter := configEnvVars("local")
|
||||
|
||||
// A configmap.Getter to read from the environment RCLONE_option_name
|
||||
optionEnvVarsGetter := optionEnvVars{"local", testOptions}
|
||||
|
||||
// A configmap.Getter to read either the default value or the set
|
||||
// value from the RegInfo.Options
|
||||
regInfoValuesGetterFalse := ®InfoValues{
|
||||
options: testOptions,
|
||||
useDefault: false,
|
||||
}
|
||||
regInfoValuesGetterTrue := ®InfoValues{
|
||||
options: testOptions,
|
||||
useDefault: true,
|
||||
}
|
||||
|
||||
// A configmap.Setter to read from the config file
|
||||
configFileGetter := getConfigFile("sausage")
|
||||
|
||||
for i, test := range []struct {
|
||||
get configmap.Getter
|
||||
key string
|
||||
wantValue string
|
||||
wantOk bool
|
||||
}{
|
||||
{configEnvVarsGetter, "not_found", "", false},
|
||||
{configEnvVarsGetter, "potato_pie", "yes", true},
|
||||
{optionEnvVarsGetter, "not_found", "", false},
|
||||
{optionEnvVarsGetter, "copy_links", "TRUE", true},
|
||||
{optionEnvVarsGetter, "nounc", "NOUNC", true},
|
||||
{optionEnvVarsGetter, "case_insensitive", "", false},
|
||||
{regInfoValuesGetterFalse, "not_found", "", false},
|
||||
{regInfoValuesGetterFalse, "case_insensitive", "true", true},
|
||||
{regInfoValuesGetterFalse, "copy_links", "", false},
|
||||
{regInfoValuesGetterTrue, "not_found", "", false},
|
||||
{regInfoValuesGetterTrue, "case_insensitive", "true", true},
|
||||
{regInfoValuesGetterTrue, "copy_links", "false", true},
|
||||
{configFileGetter, "not_found", "", false},
|
||||
{configFileGetter, "key1", "value1", true},
|
||||
} {
|
||||
what := fmt.Sprintf("%d: %+v: %q", i, test.get, test.key)
|
||||
gotValue, gotOk := test.get.Get(test.key)
|
||||
assert.Equal(t, test.wantValue, gotValue, what)
|
||||
assert.Equal(t, test.wantOk, gotOk, what)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
57
fs/pacer_test.go
Normal file
57
fs/pacer_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs/fserrors"
|
||||
"github.com/rclone/rclone/lib/pacer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var errFoo = errors.New("foo")
|
||||
|
||||
type dummyPaced struct {
|
||||
retry bool
|
||||
called int
|
||||
wait *sync.Cond
|
||||
}
|
||||
|
||||
func (dp *dummyPaced) fn() (bool, error) {
|
||||
if dp.wait != nil {
|
||||
dp.wait.L.Lock()
|
||||
dp.wait.Wait()
|
||||
dp.wait.L.Unlock()
|
||||
}
|
||||
dp.called++
|
||||
return dp.retry, errFoo
|
||||
}
|
||||
|
||||
func TestPacerCall(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
config := GetConfig(ctx)
|
||||
expectedCalled := config.LowLevelRetries
|
||||
if expectedCalled == 0 {
|
||||
ctx, config = AddConfig(ctx)
|
||||
expectedCalled = 20
|
||||
config.LowLevelRetries = expectedCalled
|
||||
}
|
||||
p := NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
|
||||
|
||||
dp := &dummyPaced{retry: true}
|
||||
err := p.Call(dp.fn)
|
||||
require.Equal(t, expectedCalled, dp.called)
|
||||
require.Implements(t, (*fserrors.Retrier)(nil), err)
|
||||
}
|
||||
|
||||
func TestPacerCallNoRetry(t *testing.T) {
|
||||
p := NewPacer(context.Background(), pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
|
||||
|
||||
dp := &dummyPaced{retry: true}
|
||||
err := p.CallNoRetry(dp.fn)
|
||||
require.Equal(t, 1, dp.called)
|
||||
require.Implements(t, (*fserrors.Retrier)(nil), err)
|
||||
}
|
||||
@@ -2,11 +2,15 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"runtime/debug"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -17,6 +21,7 @@ import (
|
||||
"github.com/rclone/rclone/fs/cache"
|
||||
"github.com/rclone/rclone/fs/filter"
|
||||
"github.com/rclone/rclone/fs/rc"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// Fill in these to avoid circular dependencies
|
||||
@@ -475,3 +480,249 @@ func rcGroupStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
out = make(rc.Params)
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// NewJobFromParams creates an rc job rc.Params.
|
||||
//
|
||||
// The JSON blob should contain a _path entry.
|
||||
//
|
||||
// It returns a rc.Params as output which may be an error.
|
||||
func NewJobFromParams(ctx context.Context, in rc.Params) (out rc.Params) {
|
||||
path := "unknown"
|
||||
|
||||
// Return an rc error blob
|
||||
rcError := func(err error, status int) rc.Params {
|
||||
fs.Errorf(nil, "rc: %q: error: %v", path, err)
|
||||
out, _ = rc.Error(path, in, err, status)
|
||||
return out
|
||||
}
|
||||
|
||||
// Find the call
|
||||
path, err := in.GetString("_path")
|
||||
if err != nil {
|
||||
return rcError(err, http.StatusNotFound)
|
||||
}
|
||||
delete(in, "_path")
|
||||
call := rc.Calls.Get(path)
|
||||
if call == nil {
|
||||
return rcError(fmt.Errorf("couldn't find path %q", path), http.StatusNotFound)
|
||||
}
|
||||
if call.NeedsRequest {
|
||||
return rcError(fmt.Errorf("can't run path %q as it needs the request", path), http.StatusBadRequest)
|
||||
}
|
||||
if call.NeedsResponse {
|
||||
return rcError(fmt.Errorf("can't run path %q as it needs the response", path), http.StatusBadRequest)
|
||||
}
|
||||
|
||||
// Pass on the group if one is set in the context and it isn't set in the input.
|
||||
if _, found := in["_group"]; !found {
|
||||
group, ok := accounting.StatsGroupFromContext(ctx)
|
||||
if ok {
|
||||
in["_group"] = group
|
||||
}
|
||||
}
|
||||
|
||||
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
|
||||
_, out, err = NewJob(ctx, call.Fn, in)
|
||||
if err != nil {
|
||||
return rcError(err, http.StatusInternalServerError)
|
||||
}
|
||||
if out == nil {
|
||||
out = make(rc.Params)
|
||||
}
|
||||
|
||||
fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err)
|
||||
return out
|
||||
}
|
||||
|
||||
// NewJobFromBytes creates an rc job from a JSON blob as bytes.
|
||||
//
|
||||
// The JSON blob should contain a _path entry.
|
||||
//
|
||||
// It returns a JSON blob as output which may be an error.
|
||||
func NewJobFromBytes(ctx context.Context, inBuf []byte) (outBuf []byte) {
|
||||
var in rc.Params
|
||||
var out rc.Params
|
||||
|
||||
// Parse a JSON blob from the input
|
||||
err := json.Unmarshal(inBuf, &in)
|
||||
if err != nil {
|
||||
out, _ = rc.Error("unknown", in, err, http.StatusBadRequest)
|
||||
} else {
|
||||
out = NewJobFromParams(ctx, in)
|
||||
}
|
||||
|
||||
var w bytes.Buffer
|
||||
err = rc.WriteJSON(&w, out)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "rc: NewJobFromBytes: failed to write JSON output: %v", err)
|
||||
return []byte(`{"error":"failed to write JSON output"}`)
|
||||
}
|
||||
return w.Bytes()
|
||||
}
|
||||
|
||||
func init() {
|
||||
rc.Add(rc.Call{
|
||||
Path: "job/batch",
|
||||
AuthRequired: true, // require auth always since sub commands may require it
|
||||
Fn: rcBatch,
|
||||
Title: "Run a batch of rclone rc commands concurrently.",
|
||||
Help: strings.ReplaceAll(`
|
||||
This takes the following parameters:
|
||||
|
||||
- concurrency - int - do this many commands concurrently. Defaults to |--transfers| if not set.
|
||||
- inputs - an list of inputs to the commands with an extra |_path| parameter
|
||||
|
||||
|||json
|
||||
{
|
||||
"_path": "rc/path",
|
||||
"param1": "parameter for the path as documented",
|
||||
"param2": "parameter for the path as documented, etc",
|
||||
}
|
||||
|||json
|
||||
|
||||
The inputs may use |_async|, |_group|, |_config| and |_filter| as normal when using the rc.
|
||||
|
||||
Returns:
|
||||
|
||||
- results - a list of results from the commands with one entry for each in inputs.
|
||||
|
||||
For example:
|
||||
|
||||
|||sh
|
||||
rclone rc job/batch --json '{
|
||||
"inputs": [
|
||||
{
|
||||
"_path": "rc/noop",
|
||||
"parameter": "OK"
|
||||
},
|
||||
{
|
||||
"_path": "rc/error",
|
||||
"parameter": "BAD"
|
||||
}
|
||||
]
|
||||
}
|
||||
'
|
||||
|||
|
||||
|
||||
Gives the result:
|
||||
|
||||
|||json
|
||||
{
|
||||
"results": [
|
||||
{
|
||||
"parameter": "OK"
|
||||
},
|
||||
{
|
||||
"error": "arbitrary error on input map[parameter:BAD]",
|
||||
"input": {
|
||||
"parameter": "BAD"
|
||||
},
|
||||
"path": "rc/error",
|
||||
"status": 500
|
||||
}
|
||||
]
|
||||
}
|
||||
|||
|
||||
`, "|", "`"),
|
||||
})
|
||||
}
|
||||
|
||||
/*
|
||||
// Run a single batch job
|
||||
func runBatchJob(ctx context.Context, inputAny any) (out rc.Params, err error) {
|
||||
var in rc.Params
|
||||
path := "unknown"
|
||||
defer func() {
|
||||
if err != nil {
|
||||
out, _ = rc.Error(path, in, err, http.StatusInternalServerError)
|
||||
}
|
||||
}()
|
||||
|
||||
// get the inputs to the job
|
||||
input, ok := inputAny.(map[string]any)
|
||||
if !ok {
|
||||
return nil, rc.NewErrParamInvalid(fmt.Errorf("\"inputs\" items must be objects not %T", inputAny))
|
||||
}
|
||||
in = rc.Params(input)
|
||||
path, err = in.GetString("_path")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
delete(in, "_path")
|
||||
call := rc.Calls.Get(path)
|
||||
|
||||
// Check call
|
||||
if call == nil {
|
||||
return nil, rc.NewErrParamInvalid(fmt.Errorf("path %q does not exist", path))
|
||||
}
|
||||
path = call.Path
|
||||
if call.NeedsRequest {
|
||||
return nil, rc.NewErrParamInvalid(fmt.Errorf("can't run path %q as it needs the request", path))
|
||||
}
|
||||
if call.NeedsResponse {
|
||||
return nil, rc.NewErrParamInvalid(fmt.Errorf("can't run path %q as it needs the response", path))
|
||||
}
|
||||
|
||||
// Run the job
|
||||
_, out, err = NewJob(ctx, call.Fn, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reshape (serialize then deserialize) the data so it is in the form expected
|
||||
err = rc.Reshape(&out, out)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
*/
|
||||
|
||||
// Batch the registered commands
|
||||
func rcBatch(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
out = make(rc.Params)
|
||||
|
||||
// Read inputs
|
||||
inputsAny, err := in.Get("inputs")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
inputs, ok := inputsAny.([]any)
|
||||
if !ok {
|
||||
return nil, rc.NewErrParamInvalid(fmt.Errorf("expecting list key %q (was %T)", "inputs", inputsAny))
|
||||
}
|
||||
|
||||
// Read concurrency
|
||||
concurrency, err := in.GetInt64("concurrency")
|
||||
if rc.IsErrParamNotFound(err) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
concurrency = int64(ci.Transfers)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Prepare outputs
|
||||
results := make([]rc.Params, len(inputs))
|
||||
out["results"] = results
|
||||
|
||||
g, gCtx := errgroup.WithContext(ctx)
|
||||
g.SetLimit(int(concurrency))
|
||||
for i, inputAny := range inputs {
|
||||
input, ok := inputAny.(map[string]any)
|
||||
if !ok {
|
||||
results[i], _ = rc.Error("unknown", nil, fmt.Errorf("\"inputs\" items must be objects not %T", inputAny), http.StatusBadRequest)
|
||||
continue
|
||||
}
|
||||
in := rc.Params(input)
|
||||
if concurrency <= 1 {
|
||||
results[i] = NewJobFromParams(ctx, in)
|
||||
} else {
|
||||
g.Go(func() error {
|
||||
results[i] = NewJobFromParams(gCtx, in)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
_ = g.Wait()
|
||||
return out, nil
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"runtime"
|
||||
"testing"
|
||||
@@ -602,3 +603,294 @@ func TestOnFinishDataRace(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Register some test rc calls
|
||||
func init() {
|
||||
rc.Add(rc.Call{
|
||||
Path: "test/needs_request",
|
||||
NeedsRequest: true,
|
||||
})
|
||||
rc.Add(rc.Call{
|
||||
Path: "test/needs_response",
|
||||
NeedsResponse: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestNewJobFromParams(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
for _, test := range []struct {
|
||||
in rc.Params
|
||||
want rc.Params
|
||||
}{{
|
||||
in: rc.Params{
|
||||
"_path": "rc/noop",
|
||||
"a": "potato",
|
||||
},
|
||||
want: rc.Params{
|
||||
"a": "potato",
|
||||
},
|
||||
}, {
|
||||
in: rc.Params{
|
||||
"_path": "rc/noop",
|
||||
"b": "sausage",
|
||||
},
|
||||
want: rc.Params{
|
||||
"b": "sausage",
|
||||
},
|
||||
}, {
|
||||
in: rc.Params{
|
||||
"_path": "rc/error",
|
||||
"e": "sausage",
|
||||
},
|
||||
want: rc.Params{
|
||||
"error": "arbitrary error on input map[e:sausage]",
|
||||
"input": rc.Params{
|
||||
"e": "sausage",
|
||||
},
|
||||
"path": "rc/error",
|
||||
"status": 500,
|
||||
},
|
||||
}, {
|
||||
in: rc.Params{
|
||||
"_path": "bad/path",
|
||||
"param": "sausage",
|
||||
},
|
||||
want: rc.Params{
|
||||
"error": "couldn't find path \"bad/path\"",
|
||||
"input": rc.Params{
|
||||
"param": "sausage",
|
||||
},
|
||||
"path": "bad/path",
|
||||
"status": 404,
|
||||
},
|
||||
}, {
|
||||
in: rc.Params{
|
||||
"_path": "test/needs_request",
|
||||
},
|
||||
want: rc.Params{
|
||||
"error": "can't run path \"test/needs_request\" as it needs the request",
|
||||
"input": rc.Params{},
|
||||
"path": "test/needs_request",
|
||||
"status": 400,
|
||||
},
|
||||
}, {
|
||||
in: rc.Params{
|
||||
"_path": "test/needs_response",
|
||||
},
|
||||
want: rc.Params{
|
||||
"error": "can't run path \"test/needs_response\" as it needs the response",
|
||||
"input": rc.Params{},
|
||||
"path": "test/needs_response",
|
||||
"status": 400,
|
||||
},
|
||||
}, {
|
||||
in: rc.Params{
|
||||
"nopath": "BOOM",
|
||||
},
|
||||
want: rc.Params{
|
||||
"error": "Didn't find key \"_path\" in input",
|
||||
"input": rc.Params{
|
||||
"nopath": "BOOM",
|
||||
},
|
||||
"path": "",
|
||||
"status": 400,
|
||||
},
|
||||
}} {
|
||||
got := NewJobFromParams(ctx, test.in)
|
||||
assert.Equal(t, test.want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewJobFromBytes(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
for _, test := range []struct {
|
||||
in string
|
||||
want string
|
||||
}{{
|
||||
in: `{
|
||||
"_path": "rc/noop",
|
||||
"a": "potato"
|
||||
}`,
|
||||
want: `{
|
||||
"a": "potato"
|
||||
}
|
||||
`,
|
||||
}, {
|
||||
in: `{
|
||||
"_path": "rc/error",
|
||||
"e": "sausage"
|
||||
}`,
|
||||
want: `{
|
||||
"error": "arbitrary error on input map[e:sausage]",
|
||||
"input": {
|
||||
"e": "sausage"
|
||||
},
|
||||
"path": "rc/error",
|
||||
"status": 500
|
||||
}
|
||||
`,
|
||||
}, {
|
||||
in: `parse error`,
|
||||
want: `{
|
||||
"error": "invalid character 'p' looking for beginning of value",
|
||||
"input": null,
|
||||
"path": "unknown",
|
||||
"status": 400
|
||||
}
|
||||
`,
|
||||
}, {
|
||||
in: `"just a string"`,
|
||||
want: `{
|
||||
"error": "json: cannot unmarshal string into Go value of type rc.Params",
|
||||
"input": null,
|
||||
"path": "unknown",
|
||||
"status": 400
|
||||
}
|
||||
`,
|
||||
}} {
|
||||
got := NewJobFromBytes(ctx, []byte(test.in))
|
||||
assert.Equal(t, test.want, string(got))
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobsBatch(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
call := rc.Calls.Get("job/batch")
|
||||
assert.NotNil(t, call)
|
||||
|
||||
inJSON := `{
|
||||
"inputs": [
|
||||
{
|
||||
"_path": "rc/noop",
|
||||
"a": "potato"
|
||||
},
|
||||
"bad string",
|
||||
{
|
||||
"_path": "rc/noop",
|
||||
"b": "sausage"
|
||||
},
|
||||
{
|
||||
"_path": "rc/error",
|
||||
"e": "sausage"
|
||||
},
|
||||
{
|
||||
"_path": "bad/path",
|
||||
"param": "sausage"
|
||||
},
|
||||
{
|
||||
"_path": "test/needs_request"
|
||||
},
|
||||
{
|
||||
"_path": "test/needs_response"
|
||||
},
|
||||
{
|
||||
"nopath": "BOOM"
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
var in rc.Params
|
||||
require.NoError(t, json.Unmarshal([]byte(inJSON), &in))
|
||||
|
||||
wantJSON := `{
|
||||
"results": [
|
||||
{
|
||||
"a": "potato"
|
||||
},
|
||||
{
|
||||
"error": "\"inputs\" items must be objects not string",
|
||||
"input": null,
|
||||
"path": "unknown",
|
||||
"status": 400
|
||||
},
|
||||
{
|
||||
"b": "sausage"
|
||||
},
|
||||
{
|
||||
"error": "arbitrary error on input map[e:sausage]",
|
||||
"input": {
|
||||
"e": "sausage"
|
||||
},
|
||||
"path": "rc/error",
|
||||
"status": 500
|
||||
},
|
||||
{
|
||||
"error": "couldn't find path \"bad/path\"",
|
||||
"input": {
|
||||
"param": "sausage"
|
||||
},
|
||||
"path": "bad/path",
|
||||
"status": 404
|
||||
},
|
||||
{
|
||||
"error": "can't run path \"test/needs_request\" as it needs the request",
|
||||
"input": {},
|
||||
"path": "test/needs_request",
|
||||
"status": 400
|
||||
},
|
||||
{
|
||||
"error": "can't run path \"test/needs_response\" as it needs the response",
|
||||
"input": {},
|
||||
"path": "test/needs_response",
|
||||
"status": 400
|
||||
},
|
||||
{
|
||||
"error": "Didn't find key \"_path\" in input",
|
||||
"input": {
|
||||
"nopath": "BOOM"
|
||||
},
|
||||
"path": "",
|
||||
"status": 400
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
|
||||
var want rc.Params
|
||||
require.NoError(t, json.Unmarshal([]byte(wantJSON), &want))
|
||||
|
||||
out, err := call.Fn(ctx, in)
|
||||
require.NoError(t, err)
|
||||
|
||||
var got rc.Params
|
||||
require.NoError(t, rc.Reshape(&got, out))
|
||||
|
||||
assert.Equal(t, want, got)
|
||||
}
|
||||
|
||||
func TestJobsBatchConcurrent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
for concurrency := range 10 {
|
||||
in := rc.Params{}
|
||||
var inputs []any
|
||||
var results []rc.Params
|
||||
for i := range 100 {
|
||||
in := map[string]any{
|
||||
"_path": "rc/noop",
|
||||
"i": i,
|
||||
}
|
||||
inputs = append(inputs, in)
|
||||
results = append(results, rc.Params{
|
||||
"i": i,
|
||||
})
|
||||
}
|
||||
in["inputs"] = inputs
|
||||
want := rc.Params{
|
||||
"results": results,
|
||||
}
|
||||
|
||||
if concurrency > 0 {
|
||||
in["concurrency"] = concurrency
|
||||
}
|
||||
call := rc.Calls.Get("job/batch")
|
||||
assert.NotNil(t, call)
|
||||
|
||||
got, err := call.Fn(ctx, in)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, want, got)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -154,6 +154,37 @@ func (os Options) NonDefault(m configmap.Getter) configmap.Simple {
|
||||
return nonDefault
|
||||
}
|
||||
|
||||
// NonDefaultRC discovers which config values aren't at their default
|
||||
//
|
||||
// It expects a pointer to the current config struct in opts.
|
||||
//
|
||||
// It returns the overridden config in rc config format.
|
||||
func (os Options) NonDefaultRC(opts any) (map[string]any, error) {
|
||||
items, err := configstruct.Items(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
itemsByName := map[string]*configstruct.Item{}
|
||||
for i := range items {
|
||||
item := &items[i]
|
||||
itemsByName[item.Name] = item
|
||||
}
|
||||
var nonDefault = map[string]any{}
|
||||
for i := range os {
|
||||
opt := &os[i]
|
||||
item, found := itemsByName[opt.Name]
|
||||
if !found {
|
||||
return nil, fmt.Errorf("key %q in OptionsInfo not found in Options struct", opt.Name)
|
||||
}
|
||||
value := fmt.Sprint(item.Value)
|
||||
defaultValue := fmt.Sprint(opt.Default)
|
||||
if value != defaultValue {
|
||||
nonDefault[item.Field] = item.Value
|
||||
}
|
||||
}
|
||||
return nonDefault, nil
|
||||
}
|
||||
|
||||
// HasAdvanced discovers if any options have an Advanced setting
|
||||
func (os Options) HasAdvanced() bool {
|
||||
for i := range os {
|
||||
|
||||
308
fs/registry_test.go
Normal file
308
fs/registry_test.go
Normal file
@@ -0,0 +1,308 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/rclone/rclone/fs/config/configmap"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Check it satisfies the interface
|
||||
var _ pflag.Value = (*Option)(nil)
|
||||
|
||||
func TestOption(t *testing.T) {
|
||||
d := &Option{
|
||||
Name: "potato",
|
||||
Value: SizeSuffix(17 << 20),
|
||||
}
|
||||
assert.Equal(t, "17Mi", d.String())
|
||||
assert.Equal(t, "SizeSuffix", d.Type())
|
||||
err := d.Set("18M")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, SizeSuffix(18<<20), d.Value)
|
||||
err = d.Set("sdfsdf")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
// Test options
|
||||
var (
|
||||
nouncOption = Option{
|
||||
Name: "nounc",
|
||||
}
|
||||
copyLinksOption = Option{
|
||||
Name: "copy_links",
|
||||
Default: false,
|
||||
NoPrefix: true,
|
||||
ShortOpt: "L",
|
||||
Advanced: true,
|
||||
}
|
||||
caseInsensitiveOption = Option{
|
||||
Name: "case_insensitive",
|
||||
Default: false,
|
||||
Value: true,
|
||||
Advanced: true,
|
||||
}
|
||||
testOptions = Options{nouncOption, copyLinksOption, caseInsensitiveOption}
|
||||
)
|
||||
|
||||
func TestOptionsSetValues(t *testing.T) {
|
||||
assert.Nil(t, testOptions[0].Default)
|
||||
assert.Equal(t, false, testOptions[1].Default)
|
||||
assert.Equal(t, false, testOptions[2].Default)
|
||||
testOptions.setValues()
|
||||
assert.Equal(t, "", testOptions[0].Default)
|
||||
assert.Equal(t, false, testOptions[1].Default)
|
||||
assert.Equal(t, false, testOptions[2].Default)
|
||||
}
|
||||
|
||||
func TestOptionsGet(t *testing.T) {
|
||||
opt := testOptions.Get("copy_links")
|
||||
assert.Equal(t, ©LinksOption, opt)
|
||||
opt = testOptions.Get("not_found")
|
||||
assert.Nil(t, opt)
|
||||
}
|
||||
|
||||
func TestOptionsOveridden(t *testing.T) {
|
||||
m := configmap.New()
|
||||
m1 := configmap.Simple{
|
||||
"nounc": "m1",
|
||||
"copy_links": "m1",
|
||||
}
|
||||
m.AddGetter(m1, configmap.PriorityNormal)
|
||||
m2 := configmap.Simple{
|
||||
"nounc": "m2",
|
||||
"case_insensitive": "m2",
|
||||
}
|
||||
m.AddGetter(m2, configmap.PriorityConfig)
|
||||
m3 := configmap.Simple{
|
||||
"nounc": "m3",
|
||||
}
|
||||
m.AddGetter(m3, configmap.PriorityDefault)
|
||||
got := testOptions.Overridden(m)
|
||||
assert.Equal(t, configmap.Simple{
|
||||
"copy_links": "m1",
|
||||
"nounc": "m1",
|
||||
}, got)
|
||||
}
|
||||
|
||||
func TestOptionsNonDefault(t *testing.T) {
|
||||
m := configmap.Simple{}
|
||||
got := testOptions.NonDefault(m)
|
||||
assert.Equal(t, configmap.Simple{}, got)
|
||||
|
||||
m["case_insensitive"] = "false"
|
||||
got = testOptions.NonDefault(m)
|
||||
assert.Equal(t, configmap.Simple{}, got)
|
||||
|
||||
m["case_insensitive"] = "true"
|
||||
got = testOptions.NonDefault(m)
|
||||
assert.Equal(t, configmap.Simple{"case_insensitive": "true"}, got)
|
||||
}
|
||||
|
||||
func TestOptionMarshalJSON(t *testing.T) {
|
||||
out, err := json.MarshalIndent(&caseInsensitiveOption, "", "")
|
||||
assert.NoError(t, err)
|
||||
require.Equal(t, `{
|
||||
"Name": "case_insensitive",
|
||||
"FieldName": "",
|
||||
"Help": "",
|
||||
"Default": false,
|
||||
"Value": true,
|
||||
"Hide": 0,
|
||||
"Required": false,
|
||||
"IsPassword": false,
|
||||
"NoPrefix": false,
|
||||
"Advanced": true,
|
||||
"Exclusive": false,
|
||||
"Sensitive": false,
|
||||
"DefaultStr": "false",
|
||||
"ValueStr": "true",
|
||||
"Type": "bool"
|
||||
}`, string(out))
|
||||
}
|
||||
|
||||
func TestOptionGetValue(t *testing.T) {
|
||||
assert.Equal(t, "", nouncOption.GetValue())
|
||||
assert.Equal(t, false, copyLinksOption.GetValue())
|
||||
assert.Equal(t, true, caseInsensitiveOption.GetValue())
|
||||
}
|
||||
|
||||
func TestOptionString(t *testing.T) {
|
||||
assert.Equal(t, "", nouncOption.String())
|
||||
assert.Equal(t, "false", copyLinksOption.String())
|
||||
assert.Equal(t, "true", caseInsensitiveOption.String())
|
||||
}
|
||||
|
||||
func TestOptionStringStringArray(t *testing.T) {
|
||||
opt := Option{
|
||||
Name: "string_array",
|
||||
Default: []string(nil),
|
||||
}
|
||||
assert.Equal(t, "", opt.String())
|
||||
opt.Default = []string{}
|
||||
assert.Equal(t, "", opt.String())
|
||||
opt.Default = []string{"a", "b"}
|
||||
assert.Equal(t, "a,b", opt.String())
|
||||
opt.Default = []string{"hello, world!", "goodbye, world!"}
|
||||
assert.Equal(t, `"hello, world!","goodbye, world!"`, opt.String())
|
||||
}
|
||||
|
||||
func TestOptionStringSizeSuffix(t *testing.T) {
|
||||
opt := Option{
|
||||
Name: "size_suffix",
|
||||
Default: SizeSuffix(0),
|
||||
}
|
||||
assert.Equal(t, "0", opt.String())
|
||||
opt.Default = SizeSuffix(-1)
|
||||
assert.Equal(t, "off", opt.String())
|
||||
opt.Default = SizeSuffix(100)
|
||||
assert.Equal(t, "100B", opt.String())
|
||||
opt.Default = SizeSuffix(1024)
|
||||
assert.Equal(t, "1Ki", opt.String())
|
||||
}
|
||||
|
||||
func TestOptionSet(t *testing.T) {
|
||||
o := caseInsensitiveOption
|
||||
assert.Equal(t, true, o.Value)
|
||||
err := o.Set("FALSE")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, false, o.Value)
|
||||
|
||||
o = copyLinksOption
|
||||
assert.Equal(t, nil, o.Value)
|
||||
err = o.Set("True")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, o.Value)
|
||||
|
||||
err = o.Set("INVALID")
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, true, o.Value)
|
||||
}
|
||||
|
||||
func TestOptionType(t *testing.T) {
|
||||
assert.Equal(t, "string", nouncOption.Type())
|
||||
assert.Equal(t, "bool", copyLinksOption.Type())
|
||||
assert.Equal(t, "bool", caseInsensitiveOption.Type())
|
||||
}
|
||||
|
||||
func TestOptionFlagName(t *testing.T) {
|
||||
assert.Equal(t, "local-nounc", nouncOption.FlagName("local"))
|
||||
assert.Equal(t, "copy-links", copyLinksOption.FlagName("local"))
|
||||
assert.Equal(t, "local-case-insensitive", caseInsensitiveOption.FlagName("local"))
|
||||
}
|
||||
|
||||
func TestOptionEnvVarName(t *testing.T) {
|
||||
assert.Equal(t, "RCLONE_LOCAL_NOUNC", nouncOption.EnvVarName("local"))
|
||||
assert.Equal(t, "RCLONE_LOCAL_COPY_LINKS", copyLinksOption.EnvVarName("local"))
|
||||
assert.Equal(t, "RCLONE_LOCAL_CASE_INSENSITIVE", caseInsensitiveOption.EnvVarName("local"))
|
||||
}
|
||||
|
||||
func TestOptionGetters(t *testing.T) {
|
||||
// Set up env vars
|
||||
envVars := [][2]string{
|
||||
{"RCLONE_CONFIG_LOCAL_POTATO_PIE", "yes"},
|
||||
{"RCLONE_COPY_LINKS", "TRUE"},
|
||||
{"RCLONE_LOCAL_NOUNC", "NOUNC"},
|
||||
}
|
||||
for _, ev := range envVars {
|
||||
assert.NoError(t, os.Setenv(ev[0], ev[1]))
|
||||
}
|
||||
defer func() {
|
||||
for _, ev := range envVars {
|
||||
assert.NoError(t, os.Unsetenv(ev[0]))
|
||||
}
|
||||
}()
|
||||
|
||||
oldConfigFileGet := ConfigFileGet
|
||||
ConfigFileGet = func(section, key string) (string, bool) {
|
||||
if section == "sausage" && key == "key1" {
|
||||
return "value1", true
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
defer func() {
|
||||
ConfigFileGet = oldConfigFileGet
|
||||
}()
|
||||
|
||||
// set up getters
|
||||
|
||||
// A configmap.Getter to read from the environment RCLONE_CONFIG_backend_option_name
|
||||
configEnvVarsGetter := configEnvVars("local")
|
||||
|
||||
// A configmap.Getter to read from the environment RCLONE_option_name
|
||||
optionEnvVarsGetter := optionEnvVars{"local", testOptions}
|
||||
|
||||
// A configmap.Getter to read either the default value or the set
|
||||
// value from the RegInfo.Options
|
||||
regInfoValuesGetterFalse := ®InfoValues{
|
||||
options: testOptions,
|
||||
useDefault: false,
|
||||
}
|
||||
regInfoValuesGetterTrue := ®InfoValues{
|
||||
options: testOptions,
|
||||
useDefault: true,
|
||||
}
|
||||
|
||||
// A configmap.Setter to read from the config file
|
||||
configFileGetter := getConfigFile("sausage")
|
||||
|
||||
for i, test := range []struct {
|
||||
get configmap.Getter
|
||||
key string
|
||||
wantValue string
|
||||
wantOk bool
|
||||
}{
|
||||
{configEnvVarsGetter, "not_found", "", false},
|
||||
{configEnvVarsGetter, "potato_pie", "yes", true},
|
||||
{optionEnvVarsGetter, "not_found", "", false},
|
||||
{optionEnvVarsGetter, "copy_links", "TRUE", true},
|
||||
{optionEnvVarsGetter, "nounc", "NOUNC", true},
|
||||
{optionEnvVarsGetter, "case_insensitive", "", false},
|
||||
{regInfoValuesGetterFalse, "not_found", "", false},
|
||||
{regInfoValuesGetterFalse, "case_insensitive", "true", true},
|
||||
{regInfoValuesGetterFalse, "copy_links", "", false},
|
||||
{regInfoValuesGetterTrue, "not_found", "", false},
|
||||
{regInfoValuesGetterTrue, "case_insensitive", "true", true},
|
||||
{regInfoValuesGetterTrue, "copy_links", "false", true},
|
||||
{configFileGetter, "not_found", "", false},
|
||||
{configFileGetter, "key1", "value1", true},
|
||||
} {
|
||||
what := fmt.Sprintf("%d: %+v: %q", i, test.get, test.key)
|
||||
gotValue, gotOk := test.get.Get(test.key)
|
||||
assert.Equal(t, test.wantValue, gotValue, what)
|
||||
assert.Equal(t, test.wantOk, gotOk, what)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestOptionsNonDefaultRC(t *testing.T) {
|
||||
type cfg struct {
|
||||
X string `config:"x"`
|
||||
Y int `config:"y"`
|
||||
}
|
||||
c := &cfg{X: "a", Y: 6}
|
||||
opts := Options{
|
||||
{Name: "x", Default: "a"}, // at default, should be omitted
|
||||
{Name: "y", Default: 5}, // non-default, should be included
|
||||
}
|
||||
|
||||
got, err := opts.NonDefaultRC(c)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, map[string]any{"Y": 6}, got)
|
||||
}
|
||||
|
||||
func TestOptionsNonDefaultRCMissingKey(t *testing.T) {
|
||||
type cfg struct {
|
||||
X string `config:"x"`
|
||||
}
|
||||
c := &cfg{X: "a"}
|
||||
// Options refers to a key not present in the struct -> expect error
|
||||
opts := Options{{Name: "missing", Default: ""}}
|
||||
_, err := opts.NonDefaultRC(c)
|
||||
assert.ErrorContains(t, err, "not found")
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/accounting"
|
||||
"github.com/rclone/rclone/fs/cluster"
|
||||
"github.com/rclone/rclone/fs/filter"
|
||||
"github.com/rclone/rclone/fs/fserrors"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
@@ -97,6 +98,7 @@ type syncCopyMove struct {
|
||||
setDirModTimesMaxLevel int // max level of the directories to set
|
||||
modifiedDirs map[string]struct{} // dirs with changed contents (if s.setDirModTimeAfter)
|
||||
allowOverlap bool // whether we allow src and dst to overlap (i.e. for convmv)
|
||||
cluster *cluster.Cluster // non-nil to run sync via cluster
|
||||
}
|
||||
|
||||
// For keeping track of delayed modtime sets
|
||||
@@ -164,6 +166,7 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
|
||||
setDirModTimeAfter: !ci.NoUpdateDirModTime && (!copyEmptySrcDirs || fsrc.Features().CanHaveEmptyDirectories && fdst.Features().DirModTimeUpdatesOnWrite),
|
||||
modifiedDirs: make(map[string]struct{}),
|
||||
allowOverlap: allowOverlap,
|
||||
cluster: cluster.GetCluster(ctx),
|
||||
}
|
||||
|
||||
s.logger, s.usingLogger = operations.GetLogger(ctx)
|
||||
@@ -496,14 +499,26 @@ func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs,
|
||||
dst := pair.Dst
|
||||
if s.DoMove {
|
||||
if src != dst {
|
||||
_, err = operations.MoveTransfer(ctx, fdst, dst, src.Remote(), src)
|
||||
if s.cluster != nil {
|
||||
err = s.cluster.Move(ctx, fdst, dst, src.Remote(), src)
|
||||
} else {
|
||||
// src == dst signals delete the src
|
||||
err = operations.DeleteFile(ctx, src)
|
||||
_, err = operations.MoveTransfer(ctx, fdst, dst, src.Remote(), src)
|
||||
}
|
||||
} else {
|
||||
// src == dst signals delete the src
|
||||
if s.cluster != nil {
|
||||
err = s.cluster.DeleteFile(ctx, src)
|
||||
} else {
|
||||
err = operations.DeleteFile(ctx, src)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if s.cluster != nil {
|
||||
err = s.cluster.Copy(ctx, fdst, dst, src.Remote(), src)
|
||||
} else {
|
||||
_, err = operations.Copy(ctx, fdst, dst, src.Remote(), src)
|
||||
}
|
||||
}
|
||||
s.processError(err)
|
||||
if err != nil {
|
||||
s.logger(ctx, operations.TransferError, src, dst, err)
|
||||
@@ -539,8 +554,13 @@ func (s *syncCopyMove) startTransfers() {
|
||||
// This stops the background transfers
|
||||
func (s *syncCopyMove) stopTransfers() {
|
||||
s.toBeUploaded.Close()
|
||||
fs.Debugf(s.fdst, "Waiting for transfers to finish")
|
||||
s.transfersWg.Wait()
|
||||
fs.Debugf(s.fdst, "Waiting for transfers to finish")
|
||||
if s.cluster != nil {
|
||||
fs.Debugf(s.fdst, "Waiting for cluster transfers to finish")
|
||||
s.processError(s.cluster.Sync(s.ctx))
|
||||
fs.Debugf(s.fdst, "Cluster transfers finished")
|
||||
}
|
||||
}
|
||||
|
||||
// This starts the background renamers.
|
||||
|
||||
Reference in New Issue
Block a user