mirror of https://github.com/rclone/rclone.git synced 2026-02-01 17:23:39 +00:00

Compare commits


10 Commits

Author SHA1 Message Date
Nick Craig-Wood
ab60a77aba cluster: make workers write status and controller read the status
The controller will retry the batches if it loses contact with the
worker.
2025-10-16 15:50:52 +01:00
Nick Craig-Wood
09535a06f7 cluster: add docs 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
173b720173 rcd: obey --cluster if set to run as a cluster server 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
6660e6ec7c sync,move,copy: add --cluster support 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
14c604335e cluster: implement --cluster and related flags FIXME WIP
Needs
- tests
2025-10-16 15:48:32 +01:00
Nick Craig-Wood
bfcb23b7b2 accounting: add AccountReadN for use in cluster 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
46dbdb8cb7 fs: add NonDefaultRC for discovering options in use
This enables us to send rc messages with the config in use.
2025-10-16 15:48:32 +01:00
Nick Craig-Wood
17932fcc38 fs: move tests into correct files 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
77faa787e1 rc: add NewJobFromBytes for reading jobs from non HTTP transactions 2025-10-16 15:48:32 +01:00
Nick Craig-Wood
0701dd55cd rc: add job/batch for sending batches of rc commands to run concurrently 2025-10-16 15:48:32 +01:00
30 changed files with 2441 additions and 655 deletions

View File

@@ -291,7 +291,7 @@ jobs:
README.md
RELEASE.md
CODE_OF_CONDUCT.md
docs/content/{authors,bugs,changelog,docs,downloads,faq,filtering,gui,install,licence,overview,privacy}.md
docs/content/{authors,bugs,changelog,cluster,docs,downloads,faq,filtering,gui,install,licence,overview,privacy}.md
- name: Scan edits of autogenerated files
run: bin/check_autogenerated_edits.py 'origin/${{ github.base_ref }}'

View File

@@ -34,7 +34,6 @@ directories to and from different cloud storage providers.
- China Mobile Ecloud Elastic Object Storage (EOS) [:page_facing_up:](https://rclone.org/s3/#china-mobile-ecloud-eos)
- Cloudflare R2 [:page_facing_up:](https://rclone.org/s3/#cloudflare-r2)
- Citrix ShareFile [:page_facing_up:](https://rclone.org/sharefile/)
- Cubbit DS3 [:page_facing_up:](https://rclone.org/s3/#Cubbit)
- DigitalOcean Spaces [:page_facing_up:](https://rclone.org/s3/#digitalocean-spaces)
- Digi Storage [:page_facing_up:](https://rclone.org/koofr/#digi-storage)
- Dreamhost [:page_facing_up:](https://rclone.org/s3/#dreamhost)
@@ -107,7 +106,6 @@ directories to and from different cloud storage providers.
- Seagate Lyve Cloud [:page_facing_up:](https://rclone.org/s3/#lyve)
- SeaweedFS [:page_facing_up:](https://rclone.org/s3/#seaweedfs)
- Selectel Object Storage [:page_facing_up:](https://rclone.org/s3/#selectel)
- Servercore Object Storage [:page_facing_up:](https://rclone.org/s3/#servercore)
- SFTP [:page_facing_up:](https://rclone.org/sftp/)
- SMB / CIFS [:page_facing_up:](https://rclone.org/smb/)
- Spectra Logic [:page_facing_up:](https://rclone.org/s3/#spectralogic)

View File

@@ -95,9 +95,6 @@ var providerOption = fs.Option{
}, {
Value: "Cloudflare",
Help: "Cloudflare R2 Storage",
}, {
Value: "Cubbit",
Help: "Cubbit DS3 Object Storage",
}, {
Value: "DigitalOcean",
Help: "DigitalOcean Spaces",
@@ -185,9 +182,6 @@ var providerOption = fs.Option{
}, {
Value: "Selectel",
Help: "Selectel Object Storage",
}, {
Value: "Servercore",
Help: "Servercore Object Storage",
}, {
Value: "SpectraLogic",
Help: "Spectra Logic Black Pearl",
@@ -363,14 +357,6 @@ func init() {
Value: "auto",
Help: "R2 buckets are automatically distributed across Cloudflare's data centers for low latency.",
}},
}, {
Name: "region",
Help: "Region to connect to.",
Provider: "Cubbit",
Examples: []fs.OptionExample{{
Value: "eu-west-1",
Help: "Europe West",
}},
}, {
Name: "region",
Help: "Region to connect to for FileLu S5.",
@@ -697,26 +683,6 @@ func init() {
Value: "ru-1",
Help: "St. Petersburg",
}},
}, {
Name: "region",
Help: "Region where your is data stored.\n",
Provider: "Servercore",
Examples: []fs.OptionExample{{
Value: "ru-1",
Help: "St. Petersburg",
}, {
Value: "gis-1",
Help: "Moscow",
}, {
Value: "ru-7",
Help: "Moscow",
}, {
Value: "uz-2",
Help: "Tashkent, Uzbekistan",
}, {
Value: "kz-1",
Help: "Almaty, Kazakhstan",
}},
}, {
Name: "region",
Help: "Region where your data stored.\n",
@@ -748,7 +714,7 @@ func init() {
}, {
Name: "region",
Help: "Region to connect to.\n\nLeave blank if you are using an S3 clone and you don't have a region.",
Provider: "!AWS,Alibaba,ArvanCloud,ChinaMobile,Cloudflare,Cubbit,FlashBlade,FileLu,Hetzner,HuaweiOBS,IDrive,Intercolo,IONOS,Liara,Linode,Magalu,Mega,OVHcloud,Petabox,Qiniu,Rabata,RackCorp,Scaleway,Selectel,Servercore,SpectraLogic,Storj,Synology,TencentCOS,Zata",
Provider: "!AWS,Alibaba,ArvanCloud,ChinaMobile,Cloudflare,FlashBlade,FileLu,Hetzner,HuaweiOBS,IDrive,Intercolo,IONOS,Liara,Linode,Magalu,Mega,OVHcloud,Petabox,Qiniu,Rabata,RackCorp,Scaleway,Selectel,SpectraLogic,Storj,Synology,TencentCOS,Zata",
Examples: []fs.OptionExample{{
Value: "",
Help: "Use this if unsure.\nWill use v4 signatures and an empty region.",
@@ -949,14 +915,6 @@ func init() {
Value: "eos-anhui-1.cmecloud.cn",
Help: "Anhui China (Huainan)",
}},
}, {
Name: "endpoint",
Help: "Endpoint for Cubbit DS3 Object Storage.",
Provider: "Cubbit",
Examples: []fs.OptionExample{{
Value: "s3.cubbit.eu",
Help: "Cubbit DS3 Object Storage endpoint",
}},
}, {
Name: "endpoint",
Help: "Endpoint for FileLu S5 Object Storage.\nRequired when using FileLu S5.",
@@ -1559,26 +1517,6 @@ func init() {
Value: "s3.ru-1.storage.selcloud.ru",
Help: "Saint Petersburg",
}},
}, {
Name: "endpoint",
Help: "Endpoint for Servercore Object Storage.",
Provider: "Servercore",
Examples: []fs.OptionExample{{
Value: "s3.ru-1.storage.selcloud.ru",
Help: "Saint Petersburg",
}, {
Value: "s3.gis-1.storage.selcloud.ru",
Help: "Moscow",
}, {
Value: "s3.ru-7.storage.selcloud.ru",
Help: "Moscow",
}, {
Value: "s3.uz-2.srvstorage.uz",
Help: "Tashkent, Uzbekistan",
}, {
Value: "s3.kz-1.srvstorage.kz",
Help: "Almaty, Kazakhstan",
}},
}, {
Name: "endpoint",
Help: "Endpoint for Scaleway Object Storage.",
@@ -1709,7 +1647,7 @@ func init() {
}, {
Name: "endpoint",
Help: "Endpoint for S3 API.\n\nRequired when using an S3 clone.",
Provider: "!AWS,Alibaba,ArvanCloud,ChinaMobile,Cubbit,GCS,Hetzner,HuaweiOBS,IBMCOS,IDrive,Intercolo,IONOS,Liara,Linode,LyveCloud,Magalu,OVHcloud,Petabox,Qiniu,Rabata,RackCorp,Scaleway,Selectel,Servercore,StackPath,Storj,Synology,TencentCOS,Zata",
Provider: "!AWS,Alibaba,ArvanCloud,ChinaMobile,GCS,Hetzner,HuaweiOBS,IBMCOS,IDrive,Intercolo,IONOS,Liara,Linode,LyveCloud,Magalu,OVHcloud,Petabox,Qiniu,Rabata,RackCorp,Scaleway,Selectel,StackPath,Storj,Synology,TencentCOS,Zata",
Examples: []fs.OptionExample{{
Value: "objects-us-east-1.dream.io",
Help: "Dream Objects endpoint",
@@ -2258,7 +2196,7 @@ func init() {
}, {
Name: "location_constraint",
Help: "Location constraint - must be set to match the Region.\n\nLeave blank if not sure. Used when creating buckets only.",
Provider: "!AWS,Alibaba,ArvanCloud,ChinaMobile,Cloudflare,Cubbit,FlashBlade,FileLu,HuaweiOBS,IBMCOS,IDrive,Intercolo,IONOS,Leviia,Liara,Linode,Magalu,Mega,Outscale,OVHcloud,Petabox,Qiniu,Rabata,RackCorp,Scaleway,Selectel,Servercore,SpectraLogic,StackPath,Storj,TencentCOS",
Provider: "!AWS,Alibaba,ArvanCloud,ChinaMobile,Cloudflare,FlashBlade,FileLu,HuaweiOBS,IBMCOS,IDrive,Intercolo,IONOS,Leviia,Liara,Linode,Magalu,Mega,Outscale,OVHcloud,Petabox,Qiniu,Rabata,RackCorp,Scaleway,Selectel,SpectraLogic,StackPath,Storj,TencentCOS",
}, {
Name: "acl",
Help: `Canned ACL used when creating buckets and storing or copying objects.
@@ -2273,7 +2211,7 @@ doesn't copy the ACL from the source but rather writes a fresh one.
If the acl is an empty string then no X-Amz-Acl: header is added and
the default (private) will be used.
`,
Provider: "!Cloudflare,FlashBlade,Mega,Rabata,Selectel,Servercore,SpectraLogic,Storj,Synology",
Provider: "!Cloudflare,FlashBlade,Mega,Rabata,Selectel,SpectraLogic,Storj,Synology",
Examples: []fs.OptionExample{{
Value: "default",
Help: "Owner gets Full_CONTROL.\nNo one else has access rights (default).",
@@ -3847,9 +3785,6 @@ func setQuirks(opt *Options) {
case "Cloudflare":
virtualHostStyle = false
useMultipartEtag = false // currently multipart Etags are random
case "Cubbit":
// no quirks
useMultipartEtag = false // Cubbit calculates multipart Etags differently from AWS
case "ArvanCloud":
listObjectsV2 = false
virtualHostStyle = false
@@ -3949,8 +3884,6 @@ func setQuirks(opt *Options) {
useAlreadyExists = true
case "Selectel":
urlEncodeListings = false
case "Servercore":
urlEncodeListings = false
case "SeaweedFS":
listObjectsV2 = false // untested
virtualHostStyle = false

View File

@@ -14,4 +14,4 @@ if [ -z "$globs" ]; then
exit 1
fi
docker run --rm -v $PWD:/workdir --user $(id -u):$(id -g) davidanson/markdownlint-cli2 $globs
docker run -v $PWD:/workdir --user $(id -u):$(id -g) davidanson/markdownlint-cli2 $globs

View File

@@ -23,6 +23,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/cluster"
"github.com/rclone/rclone/fs/config/configfile"
"github.com/rclone/rclone/fs/config/configflags"
"github.com/rclone/rclone/fs/config/flags"
@@ -481,6 +482,22 @@ func initConfig() {
}
})
}
// Run as a cluster worker if configured as one, ignoring the
// command given on the command line
if ci.Cluster != "" {
if ci.ClusterID == "" || ci.ClusterID == "0" {
fs.Infof(nil, "Running in cluster mode %q as controller", ci.ClusterID)
} else {
fs.Infof(nil, "Running in cluster mode %q as worker with id %q", ci.ClusterID, ci.ClusterID)
worker, err := cluster.NewWorker(ctx)
if err != nil || worker == nil {
fs.Fatalf(nil, "Failed to start cluster worker: %v", err)
}
// Do not continue with the main thread
select {}
}
}
}
func resolveExitCode(err error) {

View File

@@ -7,6 +7,7 @@ import (
"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cluster"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fs/rc/rcflags"
"github.com/rclone/rclone/fs/rc/rcserver"
@@ -38,6 +39,8 @@ See the [rc documentation](/rc/) for more info on the rc flags.
"groups": "RC",
},
Run: func(command *cobra.Command, args []string) {
ctx := context.Background()
cmd.CheckArgs(0, 1, command, args)
if rc.Opt.Enabled {
fs.Fatalf(nil, "Don't supply --rc flag when using rcd")
@@ -49,6 +52,12 @@ See the [rc documentation](/rc/) for more info on the rc flags.
rc.Opt.Files = args[0]
}
// Start the cluster worker if configured
_, err := cluster.NewWorker(ctx)
if err != nil {
fs.Fatalf(nil, "Failed to start cluster worker: %v", err)
}
s, err := rcserver.Start(context.Background(), &rc.Opt)
if err != nil {
fs.Fatalf(nil, "Failed to start remote control: %v", err)

View File

@@ -117,7 +117,6 @@ WebDAV or S3, that work out of the box.)
{{< provider name="Citrix ShareFile" home="http://sharefile.com/" config="/sharefile/" >}}
{{< provider name="Cloudflare R2" home="https://blog.cloudflare.com/r2-open-beta/" config="/s3/#cloudflare-r2" >}}
{{< provider name="Cloudinary" home="https://cloudinary.com/" config="/cloudinary/" >}}
{{< provider name="Cubbit DS3" home="https://cubbit.io/ds3-cloud" config="/s3/#Cubbit" >}}
{{< provider name="DigitalOcean Spaces" home="https://www.digitalocean.com/products/object-storage/" config="/s3/#digitalocean-spaces" >}}
{{< provider name="Digi Storage" home="https://storage.rcs-rds.ro/" config="/koofr/#digi-storage" >}}
{{< provider name="Dreamhost" home="https://www.dreamhost.com/cloud/storage/" config="/s3/#dreamhost" >}}
@@ -193,7 +192,6 @@ WebDAV or S3, that work out of the box.)
{{< provider name="Seagate Lyve Cloud" home="https://www.seagate.com/gb/en/services/cloud/storage/" config="/s3/#lyve" >}}
{{< provider name="SeaweedFS" home="https://github.com/chrislusf/seaweedfs/" config="/s3/#seaweedfs" >}}
{{< provider name="Selectel" home="https://selectel.ru/services/cloud/storage/" config="/s3/#selectel" >}}
{{< provider name="Servercore Object Storage" home="https://servercore.com/services/object-storage/" config="/s3/#servercore" >}}
{{< provider name="SFTP" home="https://en.wikipedia.org/wiki/SSH_File_Transfer_Protocol" config="/sftp/" >}}
{{< provider name="Sia" home="https://sia.tech/" config="/sia/" >}}
{{< provider name="SMB / CIFS" home="https://en.wikipedia.org/wiki/Server_Message_Block" config="/smb/" >}}

View File

@@ -1021,4 +1021,3 @@ put them back in again.` >}}
- Andrew Ruthven <andrew@etc.gen.nz>
- spiffytech <git@spiffy.tech>
- Dulani Woods <Dulani@gmail.com>
- Marco Ferretti <mferretti93@gmail.com>

docs/content/cluster.md Normal file
View File

@@ -0,0 +1,217 @@
---
title: "Cluster"
description: "Clustering rclone"
versionIntroduced: "v1.72"
---
# Cluster
Rclone has a cluster mode, invoked with the `--cluster` flag, which
enables a group of rclone instances to work together on a sync. It is
controlled by a group of flags starting with `--cluster-`.
```text
--cluster string Enable cluster mode with remote to use as shared storage
--cluster-batch-files int Max number of files for a cluster batch (default 1000)
--cluster-batch-size SizeSuffix Max size of files for a cluster batch (default 1Ti)
--cluster-cleanup ClusterCleanup Control which cluster files get cleaned up (default full)
--cluster-id string Set to an ID for the cluster. An ID of 0 or empty becomes the controller
--cluster-quit-workers Set to cause the controller to quit the workers when it finished
```
The command looks like a normal rclone command but with the new
`--cluster` flag, which points at an rclone remote defining the
cluster storage. This is the signal to rclone that it should engage
cluster mode with a controller and workers.
```sh
rclone copy source: destination: --flags --cluster /work
rclone copy source: destination: --flags --cluster s3:bucket
```
This works only with the `rclone sync`, `copy` and `move` commands.
If the remote specified by the `--cluster` flag is inside the
`source:` or `destination:` it must be excluded with the filter flags.
Any rclone remotes used in the transfer must be defined on all cluster
nodes. Defining remotes with connection strings gets around that
problem.
## Terminology
The cluster has two logical groups, the controller and the workers.
There is one controller and many workers.
The controller and the workers will communicate with each other by
creating files in the remote pointed to by the `--cluster` flag. This
could be for example an S3 bucket or a Kubernetes PVC.
The files are JSON serialized rc commands. Multiple commands are sent
using `job/batch`. The commands flow `pending` → `processing` →
`done` → `finished`:
```text
└── queue
├── pending ← pending task files created by the controller
├── processing ← claimed tasks being executed by a worker
├── done ← finished tasks awaiting the controller to read the result
└── finished ← completed task files
```
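For illustration, here is a minimal Go sketch of the shape of a batch
file, mirroring the `Batch` struct in `fs/cluster/cluster.go` later in
this diff (the struct shape and JSON keys come from that file; the
exact on-disk format may change without notice):
```go
package main

import (
	"encoding/json"
	"fmt"
)

// batch mirrors the JSON shape of fs/cluster.Batch (see cluster.go below).
type batch struct {
	Path   string           `json:"_path"`
	Inputs []map[string]any `json:"inputs"`
}

func main() {
	b := batch{
		Path: "job/batch",
		Inputs: []map[string]any{{
			"_path":     "operations/copyfile",
			"srcFs":     "source:",
			"srcRemote": "file.txt",
			"dstFs":     "destination:",
			"dstRemote": "file.txt",
		}},
	}
	out, _ := json.MarshalIndent(b, "", "\t")
	fmt.Println(string(out)) // roughly what lands in queue/pending/<name>.json
}
```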
The cluster can be set up in two ways: as a persistent cluster or as
a transient cluster.
### Persistent cluster
Run a cluster of workers using
```sh
rclone rcd --cluster /work
```
Then run rclone commands when required on the cluster:
```sh
rclone copy source: destination: --flags --cluster /work
```
In this mode there can be many rclone commands executing at once.
### Transient cluster
Run many copies of rclone simultaneously, for example in a Kubernetes
indexed job.
The rclone instance with `--cluster-id 0` becomes the controller and
the others become the workers. For a Kubernetes indexed job, setting
`--cluster-id $(JOB_COMPLETION_INDEX)` would work well.
Add the `--cluster-quit-workers` flag - this will cause the controller
to make sure the workers exit when it has finished.
All instances of rclone run a command like this, so the whole cluster
can only run one rclone command:
```sh
rclone copy source: destination: --flags --cluster /work --cluster-id $(JOB_COMPLETION_INDEX) --cluster-quit-workers
```
## Controller
The controller runs the sync and work distribution.
- It lists the source and destination directories, comparing files to
find those which need to be transferred.
- Files which need to be transferred are batched into jobs of at most
`--cluster-batch-files` files or `--cluster-batch-size` total size
and written to `queue/pending` for the workers to pick up.
- It watches `queue/done` for finished jobs, updates the transfer
statistics, logs any errors, and then moves each job to
`queue/finished`.
Once the sync is complete, if `--cluster-quit-workers` is set, then it
sends the workers a special command which causes them all to exit.
The controller only sends transfer jobs to the workers. All the other
tasks (eg listing, comparing) are done by the controller. The
controller does not execute any transfer tasks itself.
The controller reads worker status as written to `queue/status` and
will detect workers which have stopped. If it detects a failed worker
then it will re-assign any outstanding work.
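A minimal sketch of that liveness check, assuming a status record like
the `WorkerStatus` struct in `fs/cluster/worker.go` below (the 2 second
timeout matches `workerTimeout` in `cluster.go`):
```go
package main

import (
	"fmt"
	"time"
)

// workerStatus mirrors the fields of fs/cluster.WorkerStatus that the
// liveness check needs (see worker.go below).
type workerStatus struct {
	ID      string    `json:"id"`
	Updated time.Time `json:"updated"`
}

// If we don't hear from a worker in this time, assume it has died.
const workerTimeout = 2 * time.Second

// dead reports whether a worker should be treated as failed, in which
// case its queue/processing jobs are renamed back to queue/pending
// for another worker to claim.
func dead(ws workerStatus) bool {
	return time.Since(ws.Updated) > workerTimeout
}

func main() {
	ws := workerStatus{ID: "worker-1", Updated: time.Now().Add(-5 * time.Second)}
	fmt.Println(dead(ws)) // true - re-assign this worker's jobs
}
```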
## Workers
The workers' job is entirely to act as API endpoints that receive
their work via files in `/work`. Each worker will:
- Read work in `queue/pending`
- Attempt to rename into `queue/processing`
- If the cluster work directory supports atomic renames, then use
those, otherwise read the file, write the copy, delete the original.
If the delete fails then the rename was not successful (possible on
s3 backends).
- If successful then do that item of work. If not, another worker got
there first, so sleep for a bit and then retry (see the sketch below).
- After the work is complete, write the results to `queue/done`, then
remove the `queue/processing` file or rename it into `queue/finished`
if the `--cluster-cleanup` flag allows it.
- Repeat
Every second the worker will write a status file in `queue/status` to
be read by the controller.
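A standalone sketch of the claim-by-rename step, using local files and
the os package as stand-ins for the backend operations (the real
implementation is `Jobs.rename` in `fs/cluster/jobs.go` below):
```go
package main

import (
	"fmt"
	"io"
	"os"
)

// claim attempts to take ownership of a pending job file by renaming
// it into queue/processing. With atomic renames a single move is
// enough; otherwise fall back to copy-then-delete, where a failed
// delete means another worker claimed the job first.
func claim(src, dst string, hasAtomicRename bool) error {
	if hasAtomicRename {
		return os.Rename(src, dst) // stands in for a server-side move
	}
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()
	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, in); err != nil {
		out.Close()
		return err
	}
	if err := out.Close(); err != nil {
		return err
	}
	if err := os.Remove(src); err != nil {
		// The delete failed, so the rename was not successful
		// (possible on s3 backends) - treat the claim as lost.
		return fmt.Errorf("claim lost: %w", err)
	}
	return nil
}

func main() {
	_ = os.MkdirAll("queue/processing", 0o755)
	_ = os.WriteFile("queue/pending-job.json", []byte("{}"), 0o644)
	fmt.Println(claim("queue/pending-job.json", "queue/processing/job-worker1.json", true))
}
```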
## Layout of the work directory
The format of the files in this directory may change without notice
but the layout is documented here as it can help with debugging.
```text
/work - root of the work directory
└── queue - files to control the queue
├── done - job files that are finished and read
├── finished - job files that are finished but not yet read
├── pending - job files that are not started yet
├── processing - job files that are running
└── status - worker status files
```
When debugging, use `--cluster-cleanup none` to leave the completed
files in the directory layout.
## Flags
### --cluster string
This enables the cluster mode. Without this flag, all the other
cluster flags are ignored. This should be given a remote which can be
a local directory, eg `/work`, or a remote directory, eg `s3:bucket`.
### --cluster-batch-files int
This controls the number of files copied in a cluster batch. Setting
this larger may be more efficient but it means the statistics will be
less accurate on the controller (default 1000).
### --cluster-batch-size SizeSuffix
This controls the total size of files in a cluster batch. If the size
of the files in a batch exceeds this number then the batch will be
sent to the workers. Setting this larger may be more efficient but it
means the statistics will be less accurate on the controller. (default
1TiB)
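The two limits combine as in this sketch of the flush test (mirroring
the check in `Cluster.addToBatch` in `fs/cluster/cluster.go` below):
```go
package main

import "fmt"

// shouldFlush reports whether the current batch must be written to
// queue/pending: either it holds --cluster-batch-files inputs or its
// total size has reached --cluster-batch-size, whichever comes first.
func shouldFlush(size, maxSize int64, files, maxFiles int) bool {
	return size >= maxSize || files >= maxFiles
}

func main() {
	const tib = int64(1) << 40
	fmt.Println(shouldFlush(2*tib, tib, 10, 1000))   // true: batch over 1 TiB
	fmt.Println(shouldFlush(1<<20, tib, 1000, 1000)) // true: 1000 files queued
}
```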
### --cluster-cleanup ClusterCleanup
Controls which cluster files get cleaned up.
- `full` - clean all work files (default)
- `completed` - clean completed work files but leave the errors and status
- `none` - leave all the files (useful for debugging)
### --cluster-id string
Set an ID for the rclone instance. This can be a string or a number.
An instance with an ID of 0 becomes the controller, otherwise the
instance becomes a worker. If this flag isn't supplied or the value
is empty, then a random string will be used instead.
### --cluster-quit-workers
If this flag is set, then when the controller finishes its sync task
it will quit all the workers before it exits.
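The quit signal is itself just a job file: the controller writes
`QUIT.json` into `queue/pending` and each worker exits when it sees it
while polling. A sketch with local files (names mirror `writeQuitJob`
and `getJob` in `fs/cluster/jobs.go` below):
```go
package main

import (
	"fmt"
	"os"
	"path"
)

const (
	pendingDir = "queue/pending"
	quitJob    = "QUIT" // a pending job named QUIT.json tells workers to exit
)

// sendQuit writes the special quit job into queue/pending, as the
// controller does on shutdown when --cluster-quit-workers is set.
func sendQuit() error {
	return os.WriteFile(path.Join(pendingDir, quitJob+".json"), []byte("{}"), 0o644)
}

// isQuit reports whether a pending job file is the quit signal.
func isQuit(remote string) bool {
	return path.Base(remote) == quitJob+".json"
}

func main() {
	_ = os.MkdirAll(pendingDir, 0o755)
	if err := sendQuit(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(isQuit("queue/pending/QUIT.json")) // true - worker exits
}
```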
## Not implemented
Here are some features from the original design which are not
implemented yet:
- the controller will not notice if workers die or fail to complete
their tasks
- the controller does not re-assign the workers work if necessary
- the controller does not restart the sync
- the workers do not write any status files (but the stats are
correctly accounted)

View File

@@ -852,32 +852,6 @@ the binary units, e.g. 1, 2\*\*10, 2\*\*20, 2\*\*30 respectively.
See also [--human-readable](#human-readable).
### --assume-listings-sorted
This flag can be used when the source and destination backends are
guaranteed to return the items in the same sorted order and in that
case it will speed up the sync.
Not all backends are guaranteed to return sorted entries (eg local)
but s3 should, so an s3 to s3 sync could benefit from this flag.
If rclone finds an out of order directory entry then it will cancel
the sync with the error:
```console
out of order listing in source (remote:dir)
```
In this case you should remove the `--assume-listings-sorted` flag.
If you are using `--assume-listings-sorted` then rclone will assume
`--no-unicode-normalization` and it will compare file names in a case
sensitive way.
Normally sorting directory entries is not a bottleneck, but it can
become so with syncs of millions of items in a single directory as the
sync will not start until the directory listing is complete.
## Main options
### --backup-dir string
@@ -3341,6 +3315,15 @@ For the remote control options and for instructions on how to remote control rcl
See [the remote control section](/rc/).
## Cluster
For the cluster options and for instructions on how to cluster rclone:
- `--cluster`
- Anything starting with `--cluster-`
See the [cluster section](/cluster/).
## Logging
rclone has 4 levels of logging, `ERROR`, `NOTICE`, `INFO` and `DEBUG`.

View File

@@ -18,7 +18,6 @@ The S3 backend can be used with a number of different providers:
{{< provider name="China Mobile Ecloud Elastic Object Storage (EOS)" home="https://ecloud.10086.cn/home/product-introduction/eos/" config="/s3/#china-mobile-ecloud-eos" >}}
{{< provider name="Cloudflare R2" home="https://blog.cloudflare.com/r2-open-beta/" config="/s3/#cloudflare-r2" >}}
{{< provider name="Arvan Cloud Object Storage (AOS)" home="https://www.arvancloud.com/en/products/cloud-storage" config="/s3/#arvan-cloud" >}}
{{< provider name="Cubbit DS3" home="https://cubbit.io/ds3-cloud" config="/s3/#Cubbit" >}}
{{< provider name="DigitalOcean Spaces" home="https://www.digitalocean.com/products/object-storage/" config="/s3/#digitalocean-spaces" >}}
{{< provider name="Dreamhost" home="https://www.dreamhost.com/cloud/storage/" config="/s3/#dreamhost" >}}
{{< provider name="Exaba" home="https://exaba.com/" config="/s3/#exaba" >}}
@@ -48,7 +47,6 @@ The S3 backend can be used with a number of different providers:
{{< provider name="Seagate Lyve Cloud" home="https://www.seagate.com/gb/en/services/cloud/storage/" config="/s3/#lyve" >}}
{{< provider name="SeaweedFS" home="https://github.com/chrislusf/seaweedfs/" config="/s3/#seaweedfs" >}}
{{< provider name="Selectel" home="https://selectel.ru/services/cloud/storage/" config="/s3/#selectel" >}}
{{< provider name="Servercore Object Storage" home="https://servercore.com/services/object-storage/" config="/s3/#servercore" >}}
{{< provider name="Spectra Logic" home="https://spectralogic.com/blackpearl-nearline-object-gateway" config="/s3/#spectralogic" >}}
{{< provider name="StackPath" home="https://www.stackpath.com/products/object-storage/" config="/s3/#stackpath" >}}
{{< provider name="Storj" home="https://storj.io/" config="/s3/#storj" >}}
@@ -3236,47 +3234,6 @@ does. If this is causing a problem then upload the files with
A consequence of this is that `Content-Encoding: gzip` will never
appear in the metadata on Cloudflare.
### Cubbit DS3 {#Cubbit}
[Cubbit Object Storage](https://www.cubbit.io/ds3-cloud) is a geo-distributed cloud object storage platform.
To connect to Cubbit DS3 you will need an access key and secret key pair. You can follow this [guide](https://docs.cubbit.io/getting-started/quickstart#api-keys) to retrieve these keys. They will be needed when prompted by `rclone config`.
Default region will correspond to `eu-west-1` and the endpoint has to be specified as `s3.cubbit.eu`.
Going through the whole process of creating a new remote by running `rclone config`, each prompt should be answered as shown below:
```
name> cubbit-ds3 (or any name you like)
Storage> s3
provider> Cubbit
env_auth> false
access_key_id> YOUR_ACCESS_KEY
secret_access_key> YOUR_SECRET_KEY
region> eu-west-1 (or leave empty)
endpoint> s3.cubbit.eu
acl>
```
The resulting configuration file should look like:
```
[cubbit-ds3]
type = s3
provider = Cubbit
access_key_id = ACCESS_KEY
secret_access_key = SECRET_KEY
region = eu-west-1
endpoint = s3.cubbit.eu
```
You can then start using Cubbit DS3 with rclone. For example, to create a new bucket and copy files into it, you can run:
```
rclone mkdir cubbit-ds3:my-bucket
rclone copy /path/to/files cubbit-ds3:my-bucket
```
### DigitalOcean Spaces
[Spaces](https://www.digitalocean.com/products/object-storage/) is an [S3-interoperable](https://developers.digitalocean.com/documentation/spaces/) object storage service from cloud provider DigitalOcean.
@@ -6447,117 +6404,6 @@ region = ru-1
endpoint = s3.ru-1.storage.selcloud.ru
```
### Servercore {#servercore}
[Servercore Object Storage](https://servercore.io/object-storage/) is an S3
compatible object storage system that provides scalable and secure storage
solutions for businesses of all sizes.
rclone config example:
```
No remotes found, make a new one\?
n) New remote
s) Set configuration password
q) Quit config
n/s/q> n
Enter name for new remote.
name> servercore
Option Storage.
Type of storage to configure.
Choose a number from below, or type in your own value.
[snip]
XX / Amazon S3 Compliant Storage Providers including ..., Servercore, ...
\ (s3)
[snip]
Storage> s3
Option provider.
Choose your S3 provider.
Choose a number from below, or type in your own value.
Press Enter to leave empty.
[snip]
XX / Servercore Object Storage
\ (Servercore)
[snip]
provider> Servercore
Option env_auth.
Get AWS credentials from runtime (environment variables or EC2/ECS meta data if no env vars).
Only applies if access_key_id and secret_access_key is blank.
Choose a number from below, or type in your own boolean value (true or false).
Press Enter for the default (false).
1 / Enter AWS credentials in the next step.
\ (false)
2 / Get AWS credentials from the environment (env vars or IAM).
\ (true)
env_auth> 1
Option access_key_id.
AWS Access Key ID.
Leave blank for anonymous access or runtime credentials.
Enter a value. Press Enter to leave empty.
access_key_id> ACCESS_KEY
Option secret_access_key.
AWS Secret Access Key (password).
Leave blank for anonymous access or runtime credentials.
Enter a value. Press Enter to leave empty.
secret_access_key> SECRET_ACCESS_KEY
Option region.
Region where your is data stored.
Choose a number from below, or type in your own value.
Press Enter to leave empty.
1 / St. Petersburg
\ (ru-1)
2 / Moscow
\ (gis-1)
3 / Moscow
\ (ru-7)
4 / Tashkent, Uzbekistan
\ (uz-2)
5 / Almaty, Kazakhstan
\ (kz-1)
region> 1
Option endpoint.
Endpoint for Servercore Object Storage.
Choose a number from below, or type in your own value.
Press Enter to leave empty.
1 / Saint Petersburg
\ (s3.ru-1.storage.selcloud.ru)
2 / Moscow
\ (s3.gis-1.storage.selcloud.ru)
3 / Moscow
\ (s3.ru-7.storage.selcloud.ru)
4 / Tashkent, Uzbekistan
\ (s3.uz-2.srvstorage.uz)
5 / Almaty, Kazakhstan
\ (s3.kz-1.srvstorage.kz)
endpoint> 1
Edit advanced config?
y) Yes
n) No (default)
y/n> n
Configuration complete.
Options:
- type: s3
- provider: Servercore
- access_key_id: ACCESS_KEY
- secret_access_key: SECRET_ACCESS_KEY
- region: ru-1
- endpoint: s3.ru-1.storage.selcloud.ru
Keep this "servercore" remote?
y) Yes this is OK (default)
e) Edit this remote
d) Delete this remote
y/e/d> y
```
### Spectra Logic {#spectralogic}
[Spectra Logic](https://www.spectralogic.com/blackpearl-nearline-object-gateway)

View File

@@ -59,6 +59,7 @@ Thank you very much to our sponsors:
<!-- markdownlint-capture -->
<!-- markdownlint-disable line-length no-bare-urls -->
{{< sponsor src="/img/logos/backblaze.svg" width="300" height="200" title="Visit our sponsor Backblaze" link="https://www.backblaze.com/cloud-storage-rclonead?utm_source=rclone&utm_medium=paid&utm_campaign=rclone-website-20250715">}}
{{< sponsor src="/img/logos/idrive_e2.svg" width="300" height="200" title="Visit our sponsor IDrive e2" link="https://www.idrive.com/e2/?refer=rclone">}}
{{< sponsor src="/img/logos/filescom-enterprise-grade-workflows.png" width="300" height="200" title="Start Your Free Trial Today" link="https://files.com/?utm_source=rclone&utm_medium=referral&utm_campaign=banner&utm_term=rclone">}}
{{< sponsor src="/img/logos/mega-s4.svg" width="300" height="200" title="MEGA S4: New S3 compatible object storage. High scale. Low cost. Free egress." link="https://mega.io/objectstorage?utm_source=rclone&utm_medium=referral&utm_campaign=rclone-mega-s4&mct=rclonepromo">}}
@@ -66,7 +67,7 @@ Thank you very much to our sponsors:
{{< sponsor src="/img/logos/route4me.svg" width="400" height="200" title="Visit our sponsor Route4Me" link="https://route4me.com/">}}
{{< sponsor src="/img/logos/rcloneview.svg" width="300" height="200" title="Visit our sponsor RcloneView" link="https://rcloneview.com/">}}
{{< sponsor src="/img/logos/rcloneui.svg" width="300" height="200" title="Visit our sponsor RcloneUI" link="https://github.com/rclone-ui/rclone-ui">}}
{{< sponsor src="/img/logos/filelu-rclone.svg" width="300" height="200" title="Visit our sponsor FileLu" link="https://filelu.com/">}}
{{< sponsor src="/img/logos/filelu-rclone.svg" width="250" height="200" title="Visit our sponsor FileLu" link="https://filelu.com/">}}
{{< sponsor src="/img/logos/torbox.png" width="200" height="200" title="Visit our sponsor TORBOX" link="https://www.torbox.app/">}}
<!-- markdownlint-restore -->

View File

@@ -9,6 +9,15 @@
</div>
{{end}}
<div class="card">
<div class="card-header">
Platinum Sponsor
</div>
<div class="card-body">
<a href="https://www.backblaze.com/cloud-storage-rclonead?utm_source=rclone&utm_medium=paid&utm_campaign=rclone-website-20250715" target="_blank" rel="noopener" title="Visit rclone's sponsor Backblaze"><img src="/img/logos/backblaze.svg"></a><br />
</div>
</div>
<div class="card">
<div class="card-header">
Gold Sponsor

View File

@@ -20,6 +20,7 @@
<a class="dropdown-item" href="/gui/"><i class="fa fa-book fa-fw"></i> GUI</a>
<a class="dropdown-item" href="/rc/"><i class="fa fa-book fa-fw"></i> Remote Control</a>
<a class="dropdown-item" href="/remote_setup/"><i class="fa fa-book fa-fw"></i> Remote Setup</a>
<a class="dropdown-item" href="/cluster/"><i class="fa fa-book fa-fw"></i> Cluster</a>
<a class="dropdown-item" href="/changelog/"><i class="fa fa-book fa-fw"></i> Changelog</a>
<a class="dropdown-item" href="/bugs/"><i class="fa fa-book fa-fw"></i> Bugs</a>
<a class="dropdown-item" href="/faq/"><i class="fa fa-book fa-fw"></i> FAQ</a>

View File

@@ -82,7 +82,7 @@ type accountValues struct {
max int64 // if >=0 the max number of bytes to transfer
start time.Time // Start time of first read
lpTime time.Time // Time of last average measurement
lpBytes int // Number of bytes read since last measurement
lpBytes int64 // Number of bytes read since last measurement
avg float64 // Moving average of last few measurements in Byte/s
}
@@ -344,15 +344,20 @@ func (acc *Account) limitPerFileBandwidth(n int) {
}
}
// Account the read and limit bandwidth
func (acc *Account) accountRead(n int) {
// Account the read
func (acc *Account) accountReadN(n int64) {
// Update Stats
acc.values.mu.Lock()
acc.values.lpBytes += n
acc.values.bytes += int64(n)
acc.values.bytes += n
acc.values.mu.Unlock()
acc.stats.Bytes(int64(n))
acc.stats.Bytes(n)
}
// Account the read and limit bandwidth
func (acc *Account) accountRead(n int) {
acc.accountReadN(int64(n))
TokenBucket.LimitBandwidth(TokenBucketSlotAccounting, n)
acc.limitPerFileBandwidth(n)
@@ -427,6 +432,15 @@ func (acc *Account) AccountRead(n int) (err error) {
return err
}
// AccountReadN account having read n bytes
//
// Does not obey any transfer limits, bandwidth limits, etc.
func (acc *Account) AccountReadN(n int64) {
acc.mu.Lock()
defer acc.mu.Unlock()
acc.accountReadN(n)
}
// Close the object
func (acc *Account) Close() error {
acc.mu.Lock()
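Why the new int64 entry point: when a worker finishes a batch, the
controller accounts whole files at once and must not apply its own
bandwidth limits to bytes that were moved remotely. A standalone toy
version of the same accumulate-without-throttling pattern (plain Go,
not the rclone API):
```go
package main

import (
	"fmt"
	"sync"
)

// account is a toy version of accounting.Account's byte counters.
type account struct {
	mu      sync.Mutex
	bytes   int64 // total bytes accounted
	lpBytes int64 // bytes since the last average measurement
}

// accountReadN records n bytes without any bandwidth limiting,
// matching the semantics of the new Account.AccountReadN: the bytes
// were transferred by a remote worker, so throttling here would be wrong.
func (a *account) accountReadN(n int64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.lpBytes += n
	a.bytes += n
}

func main() {
	var a account
	a.accountReadN(5 << 30) // a worker reports a finished 5 GiB file
	fmt.Println(a.bytes)
}
```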

View File

@@ -100,7 +100,7 @@ func TestAccountRead(t *testing.T) {
assert.True(t, acc.values.start.IsZero())
acc.values.mu.Lock()
assert.Equal(t, 0, acc.values.lpBytes)
assert.Equal(t, int64(0), acc.values.lpBytes)
assert.Equal(t, int64(0), acc.values.bytes)
acc.values.mu.Unlock()
assert.Equal(t, int64(0), stats.bytes)
@@ -113,7 +113,7 @@ func TestAccountRead(t *testing.T) {
assert.False(t, acc.values.start.IsZero())
acc.values.mu.Lock()
assert.Equal(t, 2, acc.values.lpBytes)
assert.Equal(t, int64(2), acc.values.lpBytes)
assert.Equal(t, int64(2), acc.values.bytes)
acc.values.mu.Unlock()
assert.Equal(t, int64(2), stats.bytes)
@@ -145,7 +145,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
assert.True(t, acc.values.start.IsZero())
acc.values.mu.Lock()
assert.Equal(t, 0, acc.values.lpBytes)
assert.Equal(t, int64(0), acc.values.lpBytes)
assert.Equal(t, int64(0), acc.values.bytes)
acc.values.mu.Unlock()
assert.Equal(t, int64(0), stats.bytes)
@@ -159,7 +159,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
assert.False(t, acc.values.start.IsZero())
acc.values.mu.Lock()
assert.Equal(t, len(buf), acc.values.lpBytes)
assert.Equal(t, int64(len(buf)), acc.values.lpBytes)
assert.Equal(t, int64(len(buf)), acc.values.bytes)
acc.values.mu.Unlock()
assert.Equal(t, int64(len(buf)), stats.bytes)

fs/cluster/cluster.go Normal file
View File

@@ -0,0 +1,598 @@
// Package cluster implements a mechanism to distribute work over a
// cluster of rclone instances.
package cluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"strings"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/errcount"
"golang.org/x/sync/errgroup"
)
// ErrClusterNotConfigured is returned from creation functions.
var ErrClusterNotConfigured = errors.New("cluster is not configured")
// If we don't hear from workers in this time we assume they have timed out
// and re-assign their jobs.
const workerTimeout = 2 * time.Second
// Cluster describes the workings of the current cluster.
type Cluster struct {
jobs *Jobs
id string
batchFiles int
batchSize fs.SizeSuffix
cleanup fs.ClusterCleanup // how we cleanup cluster files
_config rc.Params // for rc
_filter rc.Params // for rc
cancel func() // stop bg job
wg sync.WaitGroup // bg job finished
quit chan struct{} // signal graceful stop
sync chan chan<- struct{} // sync the current jobs
quitWorkers bool // if set, send workers a stop signal on Shutdown
workers map[string]*WorkerStatus // worker ID => status
deadWorkers map[string]struct{}
mu sync.Mutex
currentBatch Batch
inflight map[string]Batch
shutdown bool
}
// Batch is a collection of rc tasks to do
type Batch struct {
size int64 // size in batch
Path string `json:"_path"`
Inputs []rc.Params `json:"inputs"`
Config rc.Params `json:"_config,omitempty"`
Filter rc.Params `json:"_filter,omitempty"`
trs []*accounting.Transfer // transfer for each Input
sizes []int64 // sizes for each Input
}
// BatchResult has the results of the batch as received.
type BatchResult struct {
Results []rc.Params `json:"results"`
// Error returns
Error string `json:"error"`
Status int `json:"status"`
Input string `json:"input"`
Path string `json:"path"`
}
// NewCluster creates a new cluster from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewCluster(ctx context.Context) (*Cluster, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
jobs, err := NewJobs(ctx)
if err != nil {
return nil, err
}
c := &Cluster{
jobs: jobs,
id: ci.ClusterID,
quitWorkers: ci.ClusterQuitWorkers,
batchFiles: ci.ClusterBatchFiles,
batchSize: ci.ClusterBatchSize,
cleanup: ci.ClusterCleanup,
quit: make(chan struct{}),
sync: make(chan chan<- struct{}),
inflight: make(map[string]Batch),
workers: make(map[string]*WorkerStatus),
deadWorkers: make(map[string]struct{}),
}
// Configure _config
configParams, err := fs.ConfigOptionsInfo.NonDefaultRC(ci)
if err != nil {
return nil, fmt.Errorf("failed to read global config: %w", err)
}
// Remove any global cluster config
for k := range configParams {
if strings.HasPrefix(k, "Cluster") {
delete(configParams, k)
}
}
if len(configParams) != 0 {
fs.Debugf(nil, "Overridden global config: %#v", configParams)
}
c._config = rc.Params(configParams)
// Configure _filter
fi := filter.GetConfig(ctx)
if !fi.InActive() {
filterParams, err := filter.OptionsInfo.NonDefaultRC(fi)
if err != nil {
return nil, fmt.Errorf("failed to read filter config: %w", err)
}
fs.Debugf(nil, "Overridden filter config: %#v", filterParams)
c._filter = rc.Params(filterParams)
}
err = c.jobs.createDirectoryStructure(ctx)
if err != nil {
return nil, err
}
// Start the background worker
bgCtx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
c.wg.Add(1)
go c.run(bgCtx)
fs.Logf(c.jobs.f, "Started cluster controller")
return c, nil
}
var (
globalClusterMu sync.Mutex
globalCluster *Cluster
)
// GetCluster starts or gets a cluster.
//
// If no cluster is configured or the cluster can't be started then it
// returns nil.
func GetCluster(ctx context.Context) *Cluster {
globalClusterMu.Lock()
defer globalClusterMu.Unlock()
if globalCluster != nil {
return globalCluster
}
cluster, err := NewCluster(ctx)
if err != nil {
fs.Errorf(nil, "Failed to start cluster: %v", err)
return nil
}
if cluster != nil {
atexit.Register(func() {
err := cluster.Shutdown(context.Background())
if err != nil {
fs.Errorf(nil, "Failed to shutdown cluster: %v", err)
}
})
}
globalCluster = cluster
return globalCluster
}
// Send the current batch for processing
//
// call with c.mu held
func (c *Cluster) sendBatch(ctx context.Context) (err error) {
// Do nothing if the batch is empty
if len(c.currentBatch.Inputs) == 0 {
return nil
}
// Get and reset current batch
b := c.currentBatch
c.currentBatch = Batch{}
b.Path = "job/batch"
b.Config = c._config
b.Filter = c._filter
// write the pending job
name, err := c.jobs.writeJob(ctx, clusterPending, &b)
if err != nil {
return err
}
fs.Infof(name, "written cluster batch file")
c.inflight[name] = b
return nil
}
// Add the command to the current batch
func (c *Cluster) addToBatch(ctx context.Context, obj fs.Object, in rc.Params, size int64, tr *accounting.Transfer) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.shutdown {
return errors.New("internal error: can't add file to Shutdown cluster")
}
c.currentBatch.Inputs = append(c.currentBatch.Inputs, in)
c.currentBatch.size += size
c.currentBatch.trs = append(c.currentBatch.trs, tr)
c.currentBatch.sizes = append(c.currentBatch.sizes, size)
if c.currentBatch.size >= int64(c.batchSize) || len(c.currentBatch.Inputs) >= c.batchFiles {
err = c.sendBatch(ctx)
if err != nil {
return err
}
}
return nil
}
// Move does operations.Move via the cluster.
//
// Move src object to dst, or into fdst if dst is nil, in which case
// it uses remote as the name of the new object.
func (c *Cluster) Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
tr := accounting.Stats(ctx).NewTransfer(src, fdst)
if operations.SkipDestructive(ctx, src, "cluster move") {
in := tr.Account(ctx, nil)
in.DryRun(src.Size())
tr.Done(ctx, nil)
return nil
}
fsrc, ok := src.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster move: can't cast src.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/movefile",
"dstFs": fs.ConfigStringFull(fdst),
"dstRemote": remote,
"srcFs": fs.ConfigStringFull(fsrc),
"srcRemote": src.Remote(),
}
if dst != nil {
in["dstRemote"] = dst.Remote()
}
return c.addToBatch(ctx, src, in, src.Size(), tr)
}
// Copy does operations.Copy via the cluster.
//
// Copy src object to dst, or into fdst if dst is nil, in which case
// it uses remote as the name of the new object.
func (c *Cluster) Copy(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
tr := accounting.Stats(ctx).NewTransfer(src, fdst)
if operations.SkipDestructive(ctx, src, "cluster copy") {
in := tr.Account(ctx, nil)
in.DryRun(src.Size())
tr.Done(ctx, nil)
return nil
}
fsrc, ok := src.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster copy: can't cast src.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/copyfile",
"dstFs": fs.ConfigStringFull(fdst),
"dstRemote": remote,
"srcFs": fs.ConfigStringFull(fsrc),
"srcRemote": src.Remote(),
}
if dst != nil {
in["dstRemote"] = dst.Remote()
}
return c.addToBatch(ctx, src, in, src.Size(), tr)
}
// DeleteFile does operations.DeleteFile via the cluster.
func (c *Cluster) DeleteFile(ctx context.Context, dst fs.Object) (err error) {
tr := accounting.Stats(ctx).NewCheckingTransfer(dst, "deleting")
err = accounting.Stats(ctx).DeleteFile(ctx, dst.Size())
if err != nil {
tr.Done(ctx, err)
return err
}
if operations.SkipDestructive(ctx, dst, "cluster delete") {
tr.Done(ctx, nil)
return
}
fdst, ok := dst.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster delete: can't cast dst.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/deletefile",
"fs": fs.ConfigStringFull(fdst),
"remote": dst.Remote(),
}
return c.addToBatch(ctx, dst, in, 0, tr)
}
// processCompletedJob loads the job and checks it off
func (c *Cluster) processCompletedJob(ctx context.Context, obj fs.Object) error {
name := path.Base(obj.Remote())
name, _ = strings.CutSuffix(name, ".json")
fs.Debugf(nil, "cluster: processing completed job %q", name)
var output BatchResult
err := c.jobs.readJob(ctx, obj, &output)
if err != nil {
return fmt.Errorf("check jobs read: %w", err)
}
c.mu.Lock()
input, ok := c.inflight[name]
// FIXME delete or save job
if !ok {
for k := range c.inflight {
fs.Debugf(nil, "key %q", k)
}
c.mu.Unlock()
return fmt.Errorf("check jobs: job %q not found", name)
}
c.mu.Unlock()
// Delete the inflight entry when batch is processed
defer func() {
c.mu.Lock()
delete(c.inflight, name)
c.mu.Unlock()
}()
// Check job
if output.Error != "" {
return fmt.Errorf("cluster: failed to run batch job: %s (%d)", output.Error, output.Status)
}
if len(input.Inputs) != len(output.Results) {
return fmt.Errorf("cluster: input had %d jobs but output had %d", len(input.Inputs), len(output.Results))
}
// Run through the batch and mark operations as successful or not
for i := range input.Inputs {
in := input.Inputs[i]
tr := input.trs[i]
size := input.sizes[i]
out := output.Results[i]
errorString, hasError := out["error"]
var err error
if hasError && errorString != "" {
err = fmt.Errorf("cluster: worker error: %s (%v)", errorString, out["status"])
}
if err == nil && in["_path"] == "operations/movefile" {
accounting.Stats(ctx).Renames(1)
}
acc := tr.Account(ctx, nil)
acc.AccountReadN(size)
tr.Done(ctx, err)
remote, ok := in["dstRemote"]
if !ok {
remote = in["remote"]
}
if err == nil {
fs.Infof(remote, "cluster %s successful", in["_path"])
} else {
fs.Errorf(remote, "cluster %s failed: %v", in["_path"], err)
}
}
return nil
}
// loadWorkerStatus updates the worker status
func (c *Cluster) loadWorkerStatus(ctx context.Context) error {
objs, err := c.jobs.listDir(ctx, clusterStatus)
if err != nil {
return fmt.Errorf("cluster: get job status list failed: %w", err)
}
ec := errcount.New()
g, gCtx := errgroup.WithContext(ctx)
var mu sync.Mutex
for _, obj := range objs {
g.Go(func() error {
buf, err := c.jobs.readFile(gCtx, obj)
if err != nil {
ec.Add(fmt.Errorf("read object: %w", err))
return nil
}
workerStatus := new(WorkerStatus)
err = json.Unmarshal(buf, workerStatus)
if err != nil {
ec.Add(fmt.Errorf("status json: %w", err))
return nil
}
mu.Lock()
c.workers[workerStatus.ID] = workerStatus
mu.Unlock()
return nil
})
}
return ec.Err("cluster: load status")
}
// checkWorkers loads the worker status
func (c *Cluster) checkWorkers(ctx context.Context) {
err := c.loadWorkerStatus(ctx)
if err != nil {
fs.Errorf(nil, "failed to read some worker status: %v", err)
}
for workerID, status := range c.workers {
timeSinceUpdated := time.Since(status.Updated)
if timeSinceUpdated > workerTimeout {
if _, isDead := c.deadWorkers[workerID]; isDead {
continue
}
fs.Errorf(nil, "cluster: haven't heard from worker %q for %v - assuming dead", workerID, timeSinceUpdated)
// Find any jobs claimed by worker and restart
objs, err := c.jobs.listDir(ctx, clusterProcessing)
if err != nil {
fs.Errorf(nil, "cluster: failed to find pending jobs: %v", err)
continue
}
for _, obj := range objs {
fs.Errorf(obj, "cluster: checking job")
// Jobs are named {jobID}-{workerID}.json
name := strings.TrimSuffix(path.Base(obj.Remote()), ".json")
dash := strings.LastIndex(name, "-")
if dash < 0 {
fs.Errorf(nil, "cluster: failed to find dash in job %q", name)
continue
}
jobID, jobWorkerID := name[:dash], name[dash+1:]
fs.Errorf(obj, "cluster: checking jobID %q, workerID %q", jobID, jobWorkerID)
if workerID != jobWorkerID {
fs.Debugf(nil, "cluster: job %q doesn't match %q", jobWorkerID, workerID)
continue
}
// Found a job running on worker - rename it back to Pending
newRemote := path.Join(clusterPending, jobID+".json")
_, err = c.jobs.rename(ctx, obj, newRemote)
if err != nil {
fs.Errorf(nil, "cluster: failed to restart job %q: %v", jobID, err)
continue
}
fs.Errorf(nil, "cluster: restarted job %q", jobID)
}
c.deadWorkers[workerID] = struct{}{}
} else {
if _, isDead := c.deadWorkers[workerID]; isDead {
fs.Errorf(nil, "cluster: dead worker %q came back to life!", workerID)
delete(c.deadWorkers, workerID)
}
}
}
}
// checkJobs sees if there are any completed jobs
func (c *Cluster) checkJobs(ctx context.Context) {
objs, err := c.jobs.listDir(ctx, clusterDone)
if err != nil {
fs.Errorf(nil, "cluster: get completed job list failed: %v", err)
return
}
for _, obj := range objs {
err := c.processCompletedJob(ctx, obj)
status := "output-ok"
ok := true
if err != nil {
status = "output-failed"
ok = false
fs.Errorf(nil, "cluster: process completed job failed: %v", err)
}
c.jobs.finish(ctx, obj, status, ok)
}
}
// Run the background process
func (c *Cluster) run(ctx context.Context) {
defer c.wg.Done()
checkJobs := time.NewTicker(clusterCheckJobsInterval)
defer checkJobs.Stop()
checkWorkers := time.NewTicker(clusterCheckWorkersInterval)
defer checkWorkers.Stop()
var syncedChans []chan<- struct{}
for {
select {
case <-ctx.Done():
return
case <-c.quit:
fs.Debugf(nil, "cluster: quit request received")
return
case synced := <-c.sync:
syncedChans = append(syncedChans, synced)
fs.Debugf(nil, "cluster: sync request received")
case <-checkWorkers.C:
c.checkWorkers(ctx)
case <-checkJobs.C:
}
c.checkJobs(ctx)
if len(syncedChans) > 0 {
c.mu.Lock()
n := len(c.inflight)
c.mu.Unlock()
if n == 0 {
fs.Debugf(nil, "cluster: synced")
for _, synced := range syncedChans {
synced <- struct{}{}
}
syncedChans = nil
}
}
}
}
// Sync the cluster.
//
// Call this when all job items have been added to the cluster.
//
// This will wait for any outstanding jobs to finish regardless of who
// put them in
func (c *Cluster) Sync(ctx context.Context) error {
// Flush any outstanding
c.mu.Lock()
err := c.sendBatch(ctx)
c.mu.Unlock()
// Wait for the cluster to be empty
done := make(chan struct{})
c.sync <- done
<-done
return err
}
// Shutdown the cluster.
//
// Call this when all job items have been added to the cluster.
//
// This will wait for any outstanding jobs to finish.
func (c *Cluster) Shutdown(ctx context.Context) (err error) {
c.mu.Lock()
inBatch := len(c.currentBatch.Inputs)
inFlight := len(c.inflight)
shutdown := c.shutdown
c.shutdown = true
c.mu.Unlock()
if inBatch > 0 {
err = errors.Join(nil, fmt.Errorf("%d items batched on cluster shutdown", inBatch))
}
if inFlight > 0 {
err = errors.Join(nil, fmt.Errorf("%d items in flight on cluster shutdown", inFlight))
}
if shutdown {
fs.Debugf(nil, "cluster: already shutdown")
return nil
}
c.quit <- struct{}{}
fs.Debugf(nil, "Waiting for cluster to finish")
c.wg.Wait()
// Send a quit job
if c.quitWorkers {
fs.Logf(nil, "Sending quit to workers")
quitErr := c.jobs.writeQuitJob(ctx, clusterPending)
if quitErr != nil {
err = errors.Join(err, fmt.Errorf("shutdown quit: %w", quitErr))
}
}
return err
}
// Abort the cluster and any outstanding jobs.
func (c *Cluster) Abort() {
c.cancel()
c.wg.Wait()
}
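For orientation, a hedged sketch of how a sync might drive this API
from the controller side, using only the methods defined above
(`GetCluster`, `Copy`, `Sync`); the real wiring in the sync package
may differ:
```go
package main

import (
	"context"
	"errors"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/cluster"
)

// clusterCopy queues each source object onto the cluster and waits
// for the workers to drain the queue. Batches are flushed
// automatically once --cluster-batch-files or --cluster-batch-size
// is reached; Sync flushes the final partial batch.
func clusterCopy(ctx context.Context, fdst fs.Fs, srcs []fs.Object) error {
	c := cluster.GetCluster(ctx)
	if c == nil {
		return errors.New("cluster not configured") // caller falls back to a local copy
	}
	for _, src := range srcs {
		if err := c.Copy(ctx, fdst, nil, src.Remote(), src); err != nil {
			return err
		}
	}
	return c.Sync(ctx) // Shutdown is registered via atexit in GetCluster
}

func main() {
	// Wiring sketch only - a real caller passes a configured context,
	// a destination Fs and the source objects found by the sync.
}
```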

fs/cluster/jobs.go Normal file
View File

@@ -0,0 +1,311 @@
package cluster
import (
"bytes"
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"slices"
"strings"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
)
// Batches flow from queue/pending through queue/processing to
// queue/done and queue/finished.
const (
clusterQueue = "queue"
clusterPending = clusterQueue + "/pending"
clusterProcessing = clusterQueue + "/processing"
clusterDone = clusterQueue + "/done"
clusterFinished = clusterQueue + "/finished"
clusterStatus = clusterQueue + "/status"
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
// Read the queue this often
clusterCheckJobsInterval = time.Second
// Write the worker status this often
clusterWriteStatusInterval = time.Second
// Read the worker status this often
clusterCheckWorkersInterval = time.Second
// Name of job which signals to the workers to quit
quitJob = "QUIT"
)
// Jobs is a container for sending and receiving jobs to the cluster.
type Jobs struct {
remote string // remote for job storage
f fs.Fs // cluster remote storage
partial bool // do we need to write and rename
hasMove bool // set if f has server side move otherwise has server side copy
cleanup fs.ClusterCleanup // how we cleanup the cluster files
pacer *fs.Pacer // To pace the API calls
}
// NewJobs creates a Jobs source from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewJobs(ctx context.Context) (*Jobs, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
f, err := cache.Get(ctx, ci.Cluster)
if err != nil {
return nil, fmt.Errorf("cluster remote creation: %w", err)
}
features := f.Features()
if features.Move == nil && features.Copy == nil {
return nil, fmt.Errorf("cluster remote must have server side move and %q doesn't", ci.Cluster)
}
jobs := &Jobs{
remote: ci.Cluster,
f: f,
partial: features.PartialUploads,
hasMove: features.Move != nil,
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
cleanup: ci.ClusterCleanup,
}
return jobs, nil
}
// Create the cluster directory structure
func (jobs *Jobs) createDirectoryStructure(ctx context.Context) (err error) {
for _, dir := range []string{clusterPending, clusterProcessing, clusterDone, clusterFinished, clusterStatus} {
err = jobs.f.Mkdir(ctx, dir)
if err != nil {
return fmt.Errorf("cluster mkdir %q: %w", dir, err)
}
}
return nil
}
// rename a file
//
// if this returns fs.ErrorObjectNotFound then the file has already been renamed.
func (jobs *Jobs) rename(ctx context.Context, src fs.Object, dstRemote string) (dst fs.Object, err error) {
features := jobs.f.Features()
if jobs.hasMove {
dst, err = features.Move(ctx, src, dstRemote)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename job file: %w", err)
}
} else {
dst, err = features.Copy(ctx, src, dstRemote)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename (copy phase) job file: %w", err)
}
err = src.Remove(ctx)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename (delete phase) job file: %w", err)
}
}
return dst, nil
}
// Finish with a jobs file
func (jobs *Jobs) finish(ctx context.Context, obj fs.Object, status string, ok bool) {
var err error
if (ok && jobs.cleanup == fs.ClusterCleanupCompleted) || jobs.cleanup == fs.ClusterCleanupFull {
err = obj.Remove(ctx)
} else {
name := path.Join(clusterFinished, status, path.Base(obj.Remote()))
_, err = jobs.rename(ctx, obj, name)
}
if err != nil {
fs.Errorf(nil, "cluster: removing completed job failed: %v", err)
}
}
// write buf into remote
func (jobs *Jobs) writeFile(ctx context.Context, remote string, modTime time.Time, buf []byte) error {
partialRemote := remote
if jobs.partial {
partialRemote = remote + ".partial"
}
// Calculate hashes
w, err := hash.NewMultiHasherTypes(jobs.f.Hashes())
if err != nil {
return err
}
_, err = w.Write(buf)
if err != nil {
return err
}
obji := object.NewStaticObjectInfo(partialRemote, modTime, int64(len(buf)), true, w.Sums(), jobs.f)
var obj fs.Object
err = jobs.pacer.Call(func() (bool, error) {
in := bytes.NewBuffer(buf)
obj, err = jobs.f.Put(ctx, in, obji)
if err != nil {
return true, fmt.Errorf("cluster: failed to write %q: %q", remote, err)
}
return false, nil
})
if err != nil {
return err
}
if jobs.partial {
obj, err = jobs.rename(ctx, obj, remote)
if err != nil {
return err
}
}
return nil
}
// Remove the file if it exists
func (jobs *Jobs) removeFile(ctx context.Context, remote string) error {
obj, err := jobs.f.NewObject(ctx, remote)
if errors.Is(err, fs.ErrorObjectNotFound) || errors.Is(err, fs.ErrorDirNotFound) {
return nil
} else if err != nil {
return err
}
return obj.Remove(ctx)
}
// write a job to a file returning the name
func (jobs *Jobs) writeJob(ctx context.Context, where string, job any) (name string, err error) {
now := time.Now().UTC()
name = fmt.Sprintf("%s-%s", now.Format(time.RFC3339Nano), random.String(20))
remote := path.Join(where, name+".json")
buf, err := json.MarshalIndent(job, "", "\t")
if err != nil {
return "", fmt.Errorf("cluster: job json: %w", err)
}
err = jobs.writeFile(ctx, remote, now, buf)
if err != nil {
return "", fmt.Errorf("cluster: job write: %w", err)
}
return name, nil
}
// write a quit job to a file
func (jobs *Jobs) writeQuitJob(ctx context.Context, where string) (err error) {
now := time.Now().UTC()
remote := path.Join(where, quitJob+".json")
err = jobs.writeFile(ctx, remote, now, []byte("{}"))
if err != nil {
return fmt.Errorf("cluster: quit job write: %w", err)
}
return nil
}
// read buf from object
func (jobs *Jobs) readFile(ctx context.Context, o fs.Object) (buf []byte, err error) {
err = jobs.pacer.Call(func() (bool, error) {
in, err := operations.Open(ctx, o)
if err != nil {
return true, fmt.Errorf("cluster: failed to open %q: %w", o, err)
}
buf, err = io.ReadAll(in)
if err != nil {
return true, fmt.Errorf("cluster: failed to read %q: %w", o, err)
}
err = in.Close()
if err != nil {
return true, fmt.Errorf("cluster: failed to close %q: %w", o, err)
}
return false, nil
})
if err != nil {
return nil, err
}
return buf, nil
}
// read a job from a file
//
// job should be a pointer to something to be unmarshalled
func (jobs *Jobs) readJob(ctx context.Context, obj fs.Object, job any) error {
buf, err := jobs.readFile(ctx, obj)
if err != nil {
return fmt.Errorf("cluster: job read: %w", err)
}
err = json.Unmarshal(buf, job)
if err != nil {
return fmt.Errorf("cluster: job read json: %w", err)
}
return nil
}
// lists the json files in a cluster directory
func (jobs *Jobs) listDir(ctx context.Context, dir string) (objects []fs.Object, err error) {
entries, err := jobs.f.List(ctx, dir)
if err != nil {
return nil, fmt.Errorf("cluster: failed to list %q: %w", dir, err)
}
entries.ForObject(func(o fs.Object) {
if strings.HasSuffix(o.Remote(), ".json") {
objects = append(objects, o)
}
})
slices.SortStableFunc(objects, func(a, b fs.Object) int {
return cmp.Compare(a.Remote(), b.Remote())
})
return objects, nil
}
// get a job from pending if there is one available.
//
// Returns a nil object if no jobs are available.
//
// FIXME should mark jobs as error jobs in here if they can't be read properly?
func (jobs *Jobs) getJob(ctx context.Context, id string) (name string, obj fs.Object, err error) {
objs, err := jobs.listDir(ctx, clusterPending)
if err != nil {
return "", nil, fmt.Errorf("get job list: %w", err)
}
quit := false
for len(objs) > 0 {
obj = objs[0]
objs = objs[1:]
name = path.Base(obj.Remote())
name, _ = strings.CutSuffix(name, ".json")
// See if we have been asked to quit
if name == quitJob {
quit = true
continue
}
// claim the job
newName := fmt.Sprintf("%s-%s.json", name, id)
newRemote := path.Join(clusterProcessing, newName)
obj, err = jobs.rename(ctx, obj, newRemote)
if errors.Is(err, fs.ErrorObjectNotFound) {
// claim failed - try again
continue
}
if err != nil {
return "", nil, fmt.Errorf("get job claim: %w", err)
}
return name, obj, nil
}
// No jobs found
if quit {
fs.Logf(nil, "Exiting cluster worker on command")
atexit.Run()
os.Exit(0)
}
return "", nil, nil
}
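// Illustrative layout of the shared cluster remote, using the directory
// constants referenced above (status files are written by the worker):
//
//     clusterPending/     <timestamp>-<random>.json - jobs waiting to be claimed
//     clusterProcessing/  <name>-<workerID>.json    - jobs claimed by a worker
//     clusterDone/        <name>.json               - results written back by workers
//     clusterStatus/      <workerID>.json           - per-worker status files
//
// A worker claims a pending job by renaming it into the processing
// directory with its own ID appended. If the rename fails with
// fs.ErrorObjectNotFound another worker claimed the job first, so the
// next pending job is tried.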

fs/cluster/worker.go Normal file

@@ -0,0 +1,211 @@
package cluster
import (
"context"
"encoding/json"
"path"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fs/rc/jobs"
"github.com/rclone/rclone/lib/random"
)
const maxWorkersDone = 16 // maximum jobs in the done list
// Worker describes a single instance of a cluster worker.
type Worker struct {
jobs *Jobs
cancel func() // stop bg job
wg sync.WaitGroup // bg job finished
id string // id of this worker
status string // place it stores its status
jobsMu sync.Mutex
running map[string]struct{} // IDs of the jobs being processed
done []string // IDs of finished jobs
}
// WorkerStatus shows the status of this worker including jobs
// running.
type WorkerStatus struct {
ID string `json:"id"`
Running map[string]rc.Params `json:"running"` // Job ID => accounting.RemoteStats
Done map[string]bool `json:"done"` // Job ID => finished status
Updated time.Time `json:"updated"`
}
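// An illustrative status file as written to clusterStatus/<id>.json -
// the stats fields come from accounting.RemoteStats and will vary:
//
// {
//     "id": "a1b2c3d4e5",
//     "running": {
//         "2025-10-16T14:00:00.1Z-abcdefghij0123456789": {
//             "bytes": 1048576,
//             "transfers": 1
//         }
//     },
//     "done": {
//         "2025-10-16T13:55:00.1Z-abcdefghij0123456789": true,
//         "2025-10-16T14:00:00.1Z-abcdefghij0123456789": false
//     },
//     "updated": "2025-10-16T14:00:05Z"
// }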
// NewWorker creates a new cluster worker from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewWorker(ctx context.Context) (*Worker, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
jobs, err := NewJobs(ctx)
if err != nil {
return nil, err
}
w := &Worker{
jobs: jobs,
id: ci.ClusterID,
running: make(map[string]struct{}),
}
if w.id == "" {
w.id = random.String(10)
}
w.status = path.Join(clusterStatus, w.id+".json")
// Start the background workers
bgCtx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
w.wg.Add(1)
go w.runJobs(bgCtx)
w.wg.Add(1)
go w.runStatus(bgCtx)
fs.Logf(w.jobs.f, "Started cluster worker")
return w, nil
}
// Check to see if a job exists and run it if available
func (w *Worker) checkJobs(ctx context.Context) {
name, obj, err := w.jobs.getJob(ctx, w.id)
if err != nil {
fs.Errorf(nil, "check jobs get: %v", err)
return
}
if obj == nil {
return // no jobs available
}
// make a stats group for this job
ctx = accounting.WithStatsGroup(ctx, name)
// Add job ID
w.jobsMu.Lock()
w.running[name] = struct{}{}
w.jobsMu.Unlock()
fs.Infof(nil, "write jobID %q", name)
// Remove job ID on exit
defer func() {
w.jobsMu.Lock()
delete(w.running, name)
w.done = append(w.done, name)
if len(w.done) > maxWorkersDone {
w.done = w.done[len(w.done)-maxWorkersDone:]
}
w.jobsMu.Unlock()
}()
fs.Debugf(nil, "cluster: processing pending job %q", name)
inBuf, err := w.jobs.readFile(ctx, obj)
if err != nil {
fs.Errorf(nil, "check jobs read: %v", err)
w.jobs.finish(ctx, obj, "input-error", false)
return
}
outBuf := jobs.NewJobFromBytes(ctx, inBuf)
remote := path.Join(clusterDone, name+".json")
err = w.jobs.writeFile(ctx, remote, time.Now(), outBuf)
if err != nil {
fs.Errorf(nil, "check jobs failed to write output: %v", err)
return
}
w.jobs.finish(ctx, obj, "input-ok", true)
fs.Debugf(nil, "cluster: processed pending job %q", name)
}
// Run the background process to pick up jobs
func (w *Worker) runJobs(ctx context.Context) {
defer w.wg.Done()
checkJobs := time.NewTicker(clusterCheckJobsInterval)
defer checkJobs.Stop()
for {
select {
case <-ctx.Done():
return
case <-checkJobs.C:
w.checkJobs(ctx)
}
}
}
// Write the worker status
func (w *Worker) writeStatus(ctx context.Context) {
// Create the worker status from the jobIDs and the short stats
status := WorkerStatus{
ID: w.id,
Running: make(map[string]rc.Params),
Updated: time.Now(),
Done: make(map[string]bool),
}
w.jobsMu.Lock()
for _, jobID := range w.done {
status.Done[jobID] = true
}
for jobID := range w.running {
fs.Infof(nil, "read jobID %q", jobID)
si := accounting.StatsGroup(ctx, jobID)
out, err := si.RemoteStats(true)
if err != nil {
fs.Errorf(nil, "cluster: write status: stats: %v", err)
status.Running[jobID] = rc.Params{}
} else {
status.Running[jobID] = out
}
status.Done[jobID] = false
}
w.jobsMu.Unlock()
// Write the stats to a file
buf, err := json.MarshalIndent(status, "", "\t")
if err != nil {
fs.Errorf(nil, "cluster: write status: json: %w", err)
return
}
err = w.jobs.writeFile(ctx, w.status, status.Updated, buf)
if err != nil {
fs.Errorf(nil, "cluster: write status: %w", err)
}
}
// Remove the worker status
func (w *Worker) clearStatus(ctx context.Context) {
err := w.jobs.removeFile(ctx, w.status)
if err != nil {
fs.Errorf(nil, "cluster: clear status: %w", err)
}
}
// Run the background process to write status
func (w *Worker) runStatus(ctx context.Context) {
defer w.wg.Done()
w.writeStatus(ctx)
defer w.clearStatus(ctx)
writeStatus := time.NewTicker(clusterWriteStatusInterval)
defer writeStatus.Stop()
for {
select {
case <-ctx.Done():
return
case <-writeStatus.C:
t0 := time.Now()
w.writeStatus(ctx)
fs.Debugf(nil, "write status took %v at %v", time.Since(t0), t0)
}
}
}
// Shutdown the worker regardless of whether it has work to process or not.
func (w *Worker) Shutdown(ctx context.Context) error {
w.cancel()
w.wg.Wait()
return nil
}


@@ -50,6 +50,34 @@ var (
ConfigEdit = "config_fs_edit"
)
// ClusterCleanup describes the cluster cleanup choices.
type ClusterCleanup = Enum[clusterCleanupChoices]
// Cluster cleanup choices.
//
// ClusterCleanupNone - don't remove any cluster files
// ClusterCleanupCompleted - remove successfully completed jobs
// ClusterCleanupFull - remove everything on exit
const (
ClusterCleanupNone ClusterCleanup = iota
ClusterCleanupCompleted
ClusterCleanupFull
)
type clusterCleanupChoices struct{}
func (clusterCleanupChoices) Choices() []string {
return []string{
ClusterCleanupNone: "none",
ClusterCleanupCompleted: "completed",
ClusterCleanupFull: "full",
}
}
func (clusterCleanupChoices) Type() string {
return "ClusterCleanup"
}
// ConfigOptionsInfo describes the Options in use
var ConfigOptionsInfo = Options{{
Name: "modify_window",
@@ -567,11 +595,35 @@ var ConfigOptionsInfo = Options{{
Help: "HTTP proxy URL.",
Groups: "Networking",
}, {
Name: "assume_listings_sorted",
Default: false,
Advanced: true,
Help: "If set will not sort listings. If listings aren't sorted the sync may go wrong.",
Groups: "Copy",
Name: "cluster",
Default: "",
Help: "Enable cluster mode with remote to use as shared storage.",
Groups: "Networking",
}, {
Name: "cluster_id",
Default: "",
Help: "Set to an ID for the cluster. An ID of 0 or empty becomes the controller.",
Groups: "Networking",
}, {
Name: "cluster_quit_workers",
Default: false,
Help: "Set to cause the controller to quit the workers when it finished.",
Groups: "Networking",
}, {
Name: "cluster_batch_files",
Default: 1000,
Help: "Max number of files for a cluster batch.",
Groups: "Networking",
}, {
Name: "cluster_batch_size",
Default: Tebi,
Help: "Max size of files for a cluster batch.",
Groups: "Networking",
}, {
Name: "cluster_cleanup",
Default: ClusterCleanupFull,
Help: "Control which cluster files get cleaned up.",
Groups: "Networking",
}}
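// As an illustrative example (flag names are the option names above with
// underscores swapped for hyphens), a small cluster sharing state on
// remote:cluster might be run as:
//
//     # on each worker
//     rclone rcd --cluster remote:cluster --cluster-id worker1
//
//     # on the controller (an empty --cluster-id makes it the controller)
//     rclone sync src: dst: --cluster remote:cluster --cluster-quit-workers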
// ConfigInfo is filesystem config options
@@ -686,7 +738,12 @@ type ConfigInfo struct {
MaxConnections int `config:"max_connections"`
NameTransform []string `config:"name_transform"`
HTTPProxy string `config:"http_proxy"`
AssumeListingsSorted bool `config:"assume_listings_sorted"`
Cluster string `config:"cluster"`
ClusterID string `config:"cluster_id"`
ClusterQuitWorkers bool `config:"cluster_quit_workers"`
ClusterBatchFiles int `config:"cluster_batch_files"`
ClusterBatchSize SizeSuffix `config:"cluster_batch_size"`
ClusterCleanup ClusterCleanup `config:"cluster_cleanup"`
}
func init() {


@@ -2,21 +2,9 @@ package fs
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/pacer"
"github.com/stretchr/testify/assert"
)
@@ -90,315 +78,3 @@ func TestFeaturesDisableList(t *testing.T) {
assert.False(t, ft.CaseInsensitive)
assert.False(t, ft.DuplicateFiles)
}
// Check it satisfies the interface
var _ pflag.Value = (*Option)(nil)
func TestOption(t *testing.T) {
d := &Option{
Name: "potato",
Value: SizeSuffix(17 << 20),
}
assert.Equal(t, "17Mi", d.String())
assert.Equal(t, "SizeSuffix", d.Type())
err := d.Set("18M")
assert.NoError(t, err)
assert.Equal(t, SizeSuffix(18<<20), d.Value)
err = d.Set("sdfsdf")
assert.Error(t, err)
}
var errFoo = errors.New("foo")
type dummyPaced struct {
retry bool
called int
wait *sync.Cond
}
func (dp *dummyPaced) fn() (bool, error) {
if dp.wait != nil {
dp.wait.L.Lock()
dp.wait.Wait()
dp.wait.L.Unlock()
}
dp.called++
return dp.retry, errFoo
}
func TestPacerCall(t *testing.T) {
ctx := context.Background()
config := GetConfig(ctx)
expectedCalled := config.LowLevelRetries
if expectedCalled == 0 {
ctx, config = AddConfig(ctx)
expectedCalled = 20
config.LowLevelRetries = expectedCalled
}
p := NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.Call(dp.fn)
require.Equal(t, expectedCalled, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}
func TestPacerCallNoRetry(t *testing.T) {
p := NewPacer(context.Background(), pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.CallNoRetry(dp.fn)
require.Equal(t, 1, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}
// Test options
var (
nouncOption = Option{
Name: "nounc",
}
copyLinksOption = Option{
Name: "copy_links",
Default: false,
NoPrefix: true,
ShortOpt: "L",
Advanced: true,
}
caseInsensitiveOption = Option{
Name: "case_insensitive",
Default: false,
Value: true,
Advanced: true,
}
testOptions = Options{nouncOption, copyLinksOption, caseInsensitiveOption}
)
func TestOptionsSetValues(t *testing.T) {
assert.Nil(t, testOptions[0].Default)
assert.Equal(t, false, testOptions[1].Default)
assert.Equal(t, false, testOptions[2].Default)
testOptions.setValues()
assert.Equal(t, "", testOptions[0].Default)
assert.Equal(t, false, testOptions[1].Default)
assert.Equal(t, false, testOptions[2].Default)
}
func TestOptionsGet(t *testing.T) {
opt := testOptions.Get("copy_links")
assert.Equal(t, &copyLinksOption, opt)
opt = testOptions.Get("not_found")
assert.Nil(t, opt)
}
func TestOptionsOveridden(t *testing.T) {
m := configmap.New()
m1 := configmap.Simple{
"nounc": "m1",
"copy_links": "m1",
}
m.AddGetter(m1, configmap.PriorityNormal)
m2 := configmap.Simple{
"nounc": "m2",
"case_insensitive": "m2",
}
m.AddGetter(m2, configmap.PriorityConfig)
m3 := configmap.Simple{
"nounc": "m3",
}
m.AddGetter(m3, configmap.PriorityDefault)
got := testOptions.Overridden(m)
assert.Equal(t, configmap.Simple{
"copy_links": "m1",
"nounc": "m1",
}, got)
}
func TestOptionsNonDefault(t *testing.T) {
m := configmap.Simple{}
got := testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{}, got)
m["case_insensitive"] = "false"
got = testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{}, got)
m["case_insensitive"] = "true"
got = testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{"case_insensitive": "true"}, got)
}
func TestOptionMarshalJSON(t *testing.T) {
out, err := json.MarshalIndent(&caseInsensitiveOption, "", "")
assert.NoError(t, err)
require.Equal(t, `{
"Name": "case_insensitive",
"FieldName": "",
"Help": "",
"Default": false,
"Value": true,
"Hide": 0,
"Required": false,
"IsPassword": false,
"NoPrefix": false,
"Advanced": true,
"Exclusive": false,
"Sensitive": false,
"DefaultStr": "false",
"ValueStr": "true",
"Type": "bool"
}`, string(out))
}
func TestOptionGetValue(t *testing.T) {
assert.Equal(t, "", nouncOption.GetValue())
assert.Equal(t, false, copyLinksOption.GetValue())
assert.Equal(t, true, caseInsensitiveOption.GetValue())
}
func TestOptionString(t *testing.T) {
assert.Equal(t, "", nouncOption.String())
assert.Equal(t, "false", copyLinksOption.String())
assert.Equal(t, "true", caseInsensitiveOption.String())
}
func TestOptionStringStringArray(t *testing.T) {
opt := Option{
Name: "string_array",
Default: []string(nil),
}
assert.Equal(t, "", opt.String())
opt.Default = []string{}
assert.Equal(t, "", opt.String())
opt.Default = []string{"a", "b"}
assert.Equal(t, "a,b", opt.String())
opt.Default = []string{"hello, world!", "goodbye, world!"}
assert.Equal(t, `"hello, world!","goodbye, world!"`, opt.String())
}
func TestOptionStringSizeSuffix(t *testing.T) {
opt := Option{
Name: "size_suffix",
Default: SizeSuffix(0),
}
assert.Equal(t, "0", opt.String())
opt.Default = SizeSuffix(-1)
assert.Equal(t, "off", opt.String())
opt.Default = SizeSuffix(100)
assert.Equal(t, "100B", opt.String())
opt.Default = SizeSuffix(1024)
assert.Equal(t, "1Ki", opt.String())
}
func TestOptionSet(t *testing.T) {
o := caseInsensitiveOption
assert.Equal(t, true, o.Value)
err := o.Set("FALSE")
assert.NoError(t, err)
assert.Equal(t, false, o.Value)
o = copyLinksOption
assert.Equal(t, nil, o.Value)
err = o.Set("True")
assert.NoError(t, err)
assert.Equal(t, true, o.Value)
err = o.Set("INVALID")
assert.Error(t, err)
assert.Equal(t, true, o.Value)
}
func TestOptionType(t *testing.T) {
assert.Equal(t, "string", nouncOption.Type())
assert.Equal(t, "bool", copyLinksOption.Type())
assert.Equal(t, "bool", caseInsensitiveOption.Type())
}
func TestOptionFlagName(t *testing.T) {
assert.Equal(t, "local-nounc", nouncOption.FlagName("local"))
assert.Equal(t, "copy-links", copyLinksOption.FlagName("local"))
assert.Equal(t, "local-case-insensitive", caseInsensitiveOption.FlagName("local"))
}
func TestOptionEnvVarName(t *testing.T) {
assert.Equal(t, "RCLONE_LOCAL_NOUNC", nouncOption.EnvVarName("local"))
assert.Equal(t, "RCLONE_LOCAL_COPY_LINKS", copyLinksOption.EnvVarName("local"))
assert.Equal(t, "RCLONE_LOCAL_CASE_INSENSITIVE", caseInsensitiveOption.EnvVarName("local"))
}
func TestOptionGetters(t *testing.T) {
// Set up env vars
envVars := [][2]string{
{"RCLONE_CONFIG_LOCAL_POTATO_PIE", "yes"},
{"RCLONE_COPY_LINKS", "TRUE"},
{"RCLONE_LOCAL_NOUNC", "NOUNC"},
}
for _, ev := range envVars {
assert.NoError(t, os.Setenv(ev[0], ev[1]))
}
defer func() {
for _, ev := range envVars {
assert.NoError(t, os.Unsetenv(ev[0]))
}
}()
oldConfigFileGet := ConfigFileGet
ConfigFileGet = func(section, key string) (string, bool) {
if section == "sausage" && key == "key1" {
return "value1", true
}
return "", false
}
defer func() {
ConfigFileGet = oldConfigFileGet
}()
// set up getters
// A configmap.Getter to read from the environment RCLONE_CONFIG_backend_option_name
configEnvVarsGetter := configEnvVars("local")
// A configmap.Getter to read from the environment RCLONE_option_name
optionEnvVarsGetter := optionEnvVars{"local", testOptions}
// A configmap.Getter to read either the default value or the set
// value from the RegInfo.Options
regInfoValuesGetterFalse := &regInfoValues{
options: testOptions,
useDefault: false,
}
regInfoValuesGetterTrue := &regInfoValues{
options: testOptions,
useDefault: true,
}
// A configmap.Setter to read from the config file
configFileGetter := getConfigFile("sausage")
for i, test := range []struct {
get configmap.Getter
key string
wantValue string
wantOk bool
}{
{configEnvVarsGetter, "not_found", "", false},
{configEnvVarsGetter, "potato_pie", "yes", true},
{optionEnvVarsGetter, "not_found", "", false},
{optionEnvVarsGetter, "copy_links", "TRUE", true},
{optionEnvVarsGetter, "nounc", "NOUNC", true},
{optionEnvVarsGetter, "case_insensitive", "", false},
{regInfoValuesGetterFalse, "not_found", "", false},
{regInfoValuesGetterFalse, "case_insensitive", "true", true},
{regInfoValuesGetterFalse, "copy_links", "", false},
{regInfoValuesGetterTrue, "not_found", "", false},
{regInfoValuesGetterTrue, "case_insensitive", "true", true},
{regInfoValuesGetterTrue, "copy_links", "false", true},
{configFileGetter, "not_found", "", false},
{configFileGetter, "key1", "value1", true},
} {
what := fmt.Sprintf("%d: %+v: %q", i, test.get, test.key)
gotValue, gotOk := test.get.Get(test.key)
assert.Equal(t, test.wantValue, gotValue, what)
assert.Equal(t, test.wantOk, gotOk, what)
}
}


@@ -45,7 +45,6 @@ type Sorter struct {
keyFn KeyFn // transform an entry into a sort key
cutoff int // number of entries above which we start extsort
extSort bool // true if we are ext sorting
noSort bool // true if we aren't sorting
inputChan chan string // for sending data to the ext sort
outputChan <-chan string // for receiving data from the ext sort
errChan <-chan error // for getting errors from the ext sort
@@ -79,7 +78,6 @@ func NewSorter(ctx context.Context, f NewObjecter, callback fs.ListRCallback, ke
keyFn: keyFn,
cutoff: ci.ListCutoff,
errs: errcount.New(),
noSort: ci.AssumeListingsSorted,
}, nil
}
@@ -174,9 +172,6 @@ func (ls *Sorter) startExtSort() (err error) {
//
// Safe to call from concurrent go routines
func (ls *Sorter) Add(entries fs.DirEntries) error {
if ls.noSort {
return ls.callback(entries)
}
ls.mu.Lock()
defer ls.mu.Unlock()
if ls.extSort {
@@ -272,9 +267,6 @@ func (lh *listHelper) Flush() error {
// Send the sorted entries to the callback.
func (ls *Sorter) Send() (err error) {
if ls.noSort {
return nil
}
ls.mu.Lock()
defer ls.mu.Unlock()


@@ -46,46 +46,6 @@ func TestSorter(t *testing.T) {
assert.Equal(t, fs.DirEntries(nil), ls.entries)
}
func TestSorterAssumeSorted(t *testing.T) {
ctx, ci := fs.AddConfig(context.Background())
ci.AssumeListingsSorted = true
gotEntry := 0
wantEntries := fs.DirEntries{
mockdir.New("c"),
mockobject.Object("C"),
mockdir.New("b"),
mockobject.Object("B"),
mockdir.New("a"),
mockobject.Object("A"),
}
callback := func(entries fs.DirEntries) error {
for _, entry := range entries {
require.Equal(t, wantEntries[gotEntry], entry)
gotEntry++
}
return nil
}
ls, err := NewSorter(ctx, nil, callback, nil)
require.NoError(t, err)
// Test Add
require.NoError(t, ls.Add(wantEntries[0:2]))
require.NoError(t, ls.Add(wantEntries[2:6]))
assert.Equal(t, 6, gotEntry)
assert.Equal(t, fs.DirEntries(nil), ls.entries)
// Test Send
err = ls.Send()
require.NoError(t, err)
assert.Equal(t, 6, gotEntry)
// Test Cleanup
ls.CleanUp()
assert.Equal(t, 6, gotEntry)
assert.Equal(t, fs.DirEntries(nil), ls.entries)
}
func TestSorterIdentity(t *testing.T) {
ctx := context.Background()
cmpFn := func(a, b fs.DirEntry) int {


@@ -13,7 +13,6 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/dirtree"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/list"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/transform"
@@ -331,8 +330,7 @@ func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstO
continue
} else if srcName < srcPrevName {
// this should never happen since we sort the listings
// however the user may be using the --assume-listings-sorted flag
return fserrors.FatalError(fmt.Errorf("out of order listing in source (%v)", src.Fs()))
panic("Out of order listing in source")
}
}
if dst != nil && dstPrev != nil {
@@ -342,8 +340,7 @@ func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstO
continue
} else if dstName < dstPrevName {
// this should never happen since we sort the listings
// however the user may be using the --assume-listings-sorted flag
return fserrors.FatalError(fmt.Errorf("out of order listing in destination (%v)", dst.Fs()))
panic("Out of order listing in destination")
}
}
switch {

fs/pacer_test.go Normal file

@@ -0,0 +1,57 @@
package fs
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/pacer"
"github.com/stretchr/testify/require"
)
var errFoo = errors.New("foo")
type dummyPaced struct {
retry bool
called int
wait *sync.Cond
}
func (dp *dummyPaced) fn() (bool, error) {
if dp.wait != nil {
dp.wait.L.Lock()
dp.wait.Wait()
dp.wait.L.Unlock()
}
dp.called++
return dp.retry, errFoo
}
func TestPacerCall(t *testing.T) {
ctx := context.Background()
config := GetConfig(ctx)
expectedCalled := config.LowLevelRetries
if expectedCalled == 0 {
ctx, config = AddConfig(ctx)
expectedCalled = 20
config.LowLevelRetries = expectedCalled
}
p := NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.Call(dp.fn)
require.Equal(t, expectedCalled, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}
func TestPacerCallNoRetry(t *testing.T) {
p := NewPacer(context.Background(), pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.CallNoRetry(dp.fn)
require.Equal(t, 1, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}


@@ -2,11 +2,15 @@
package jobs
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"runtime/debug"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
@@ -17,6 +21,7 @@ import (
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/rc"
"golang.org/x/sync/errgroup"
)
// Fill in these to avoid circular dependencies
@@ -475,3 +480,249 @@ func rcGroupStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
out = make(rc.Params)
return out, nil
}
// NewJobFromParams creates an rc job from rc.Params.
//
// The params should contain a _path entry.
//
// It returns an rc.Params as output which may be an error.
func NewJobFromParams(ctx context.Context, in rc.Params) (out rc.Params) {
path := "unknown"
// Return an rc error blob
rcError := func(err error, status int) rc.Params {
fs.Errorf(nil, "rc: %q: error: %v", path, err)
out, _ = rc.Error(path, in, err, status)
return out
}
// Find the call
path, err := in.GetString("_path")
if err != nil {
return rcError(err, http.StatusNotFound)
}
delete(in, "_path")
call := rc.Calls.Get(path)
if call == nil {
return rcError(fmt.Errorf("couldn't find path %q", path), http.StatusNotFound)
}
if call.NeedsRequest {
return rcError(fmt.Errorf("can't run path %q as it needs the request", path), http.StatusBadRequest)
}
if call.NeedsResponse {
return rcError(fmt.Errorf("can't run path %q as it needs the response", path), http.StatusBadRequest)
}
// Pass on the group if one is set in the context and it isn't set in the input.
if _, found := in["_group"]; !found {
group, ok := accounting.StatsGroupFromContext(ctx)
if ok {
in["_group"] = group
}
}
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
_, out, err = NewJob(ctx, call.Fn, in)
if err != nil {
return rcError(err, http.StatusInternalServerError)
}
if out == nil {
out = make(rc.Params)
}
fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err)
return out
}
// NewJobFromBytes creates an rc job from a JSON blob as bytes.
//
// The JSON blob should contain a _path entry.
//
// It returns a JSON blob as output which may be an error.
func NewJobFromBytes(ctx context.Context, inBuf []byte) (outBuf []byte) {
var in rc.Params
var out rc.Params
// Parse a JSON blob from the input
err := json.Unmarshal(inBuf, &in)
if err != nil {
out, _ = rc.Error("unknown", in, err, http.StatusBadRequest)
} else {
out = NewJobFromParams(ctx, in)
}
var w bytes.Buffer
err = rc.WriteJSON(&w, out)
if err != nil {
fs.Errorf(nil, "rc: NewJobFromBytes: failed to write JSON output: %v", err)
return []byte(`{"error":"failed to write JSON output"}`)
}
return w.Bytes()
}
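// Illustrative usage - rc/noop is a registered rc call which echoes its
// input parameters:
//
//     out := NewJobFromBytes(ctx, []byte(`{"_path": "rc/noop", "a": "b"}`))
//     // out is the JSON blob {"a": "b"} followed by a newline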
func init() {
rc.Add(rc.Call{
Path: "job/batch",
AuthRequired: true, // require auth always since sub commands may require it
Fn: rcBatch,
Title: "Run a batch of rclone rc commands concurrently.",
Help: strings.ReplaceAll(`
This takes the following parameters:
- concurrency - int - do this many commands concurrently. Defaults to |--transfers| if not set.
- inputs - a list of inputs to the commands with an extra |_path| parameter
|||json
{
"_path": "rc/path",
"param1": "parameter for the path as documented",
"param2": "parameter for the path as documented, etc",
}
|||
The inputs may use |_async|, |_group|, |_config| and |_filter| as normal when using the rc.
Returns:
- results - a list of results from the commands with one entry for each in inputs.
For example:
|||sh
rclone rc job/batch --json '{
"inputs": [
{
"_path": "rc/noop",
"parameter": "OK"
},
{
"_path": "rc/error",
"parameter": "BAD"
}
]
}
'
|||
Gives the result:
|||json
{
"results": [
{
"parameter": "OK"
},
{
"error": "arbitrary error on input map[parameter:BAD]",
"input": {
"parameter": "BAD"
},
"path": "rc/error",
"status": 500
}
]
}
|||
`, "|", "`"),
})
}
/*
// Run a single batch job
func runBatchJob(ctx context.Context, inputAny any) (out rc.Params, err error) {
var in rc.Params
path := "unknown"
defer func() {
if err != nil {
out, _ = rc.Error(path, in, err, http.StatusInternalServerError)
}
}()
// get the inputs to the job
input, ok := inputAny.(map[string]any)
if !ok {
return nil, rc.NewErrParamInvalid(fmt.Errorf("\"inputs\" items must be objects not %T", inputAny))
}
in = rc.Params(input)
path, err = in.GetString("_path")
if err != nil {
return nil, err
}
delete(in, "_path")
call := rc.Calls.Get(path)
// Check call
if call == nil {
return nil, rc.NewErrParamInvalid(fmt.Errorf("path %q does not exist", path))
}
path = call.Path
if call.NeedsRequest {
return nil, rc.NewErrParamInvalid(fmt.Errorf("can't run path %q as it needs the request", path))
}
if call.NeedsResponse {
return nil, rc.NewErrParamInvalid(fmt.Errorf("can't run path %q as it needs the response", path))
}
// Run the job
_, out, err = NewJob(ctx, call.Fn, in)
if err != nil {
return nil, err
}
// Reshape (serialize then deserialize) the data so it is in the form expected
err = rc.Reshape(&out, out)
if err != nil {
return nil, err
}
return out, nil
}
*/
// rcBatch runs a batch of registered rc commands concurrently
func rcBatch(ctx context.Context, in rc.Params) (out rc.Params, err error) {
out = make(rc.Params)
// Read inputs
inputsAny, err := in.Get("inputs")
if err != nil {
return nil, err
}
inputs, ok := inputsAny.([]any)
if !ok {
return nil, rc.NewErrParamInvalid(fmt.Errorf("expecting list key %q (was %T)", "inputs", inputsAny))
}
// Read concurrency
concurrency, err := in.GetInt64("concurrency")
if rc.IsErrParamNotFound(err) {
ci := fs.GetConfig(ctx)
concurrency = int64(ci.Transfers)
} else if err != nil {
return nil, err
}
// Prepare outputs
results := make([]rc.Params, len(inputs))
out["results"] = results
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(concurrency))
for i, inputAny := range inputs {
input, ok := inputAny.(map[string]any)
if !ok {
results[i], _ = rc.Error("unknown", nil, fmt.Errorf("\"inputs\" items must be objects not %T", inputAny), http.StatusBadRequest)
continue
}
in := rc.Params(input)
if concurrency <= 1 {
results[i] = NewJobFromParams(ctx, in)
} else {
g.Go(func() error {
results[i] = NewJobFromParams(gCtx, in)
return nil
})
}
}
_ = g.Wait()
return out, nil
}


@@ -2,6 +2,7 @@ package jobs
import (
"context"
"encoding/json"
"errors"
"runtime"
"testing"
@@ -602,3 +603,294 @@ func TestOnFinishDataRace(t *testing.T) {
}
}
}
// Register some test rc calls
func init() {
rc.Add(rc.Call{
Path: "test/needs_request",
NeedsRequest: true,
})
rc.Add(rc.Call{
Path: "test/needs_response",
NeedsResponse: true,
})
}
func TestNewJobFromParams(t *testing.T) {
ctx := context.Background()
for _, test := range []struct {
in rc.Params
want rc.Params
}{{
in: rc.Params{
"_path": "rc/noop",
"a": "potato",
},
want: rc.Params{
"a": "potato",
},
}, {
in: rc.Params{
"_path": "rc/noop",
"b": "sausage",
},
want: rc.Params{
"b": "sausage",
},
}, {
in: rc.Params{
"_path": "rc/error",
"e": "sausage",
},
want: rc.Params{
"error": "arbitrary error on input map[e:sausage]",
"input": rc.Params{
"e": "sausage",
},
"path": "rc/error",
"status": 500,
},
}, {
in: rc.Params{
"_path": "bad/path",
"param": "sausage",
},
want: rc.Params{
"error": "couldn't find path \"bad/path\"",
"input": rc.Params{
"param": "sausage",
},
"path": "bad/path",
"status": 404,
},
}, {
in: rc.Params{
"_path": "test/needs_request",
},
want: rc.Params{
"error": "can't run path \"test/needs_request\" as it needs the request",
"input": rc.Params{},
"path": "test/needs_request",
"status": 400,
},
}, {
in: rc.Params{
"_path": "test/needs_response",
},
want: rc.Params{
"error": "can't run path \"test/needs_response\" as it needs the response",
"input": rc.Params{},
"path": "test/needs_response",
"status": 400,
},
}, {
in: rc.Params{
"nopath": "BOOM",
},
want: rc.Params{
"error": "Didn't find key \"_path\" in input",
"input": rc.Params{
"nopath": "BOOM",
},
"path": "",
"status": 400,
},
}} {
got := NewJobFromParams(ctx, test.in)
assert.Equal(t, test.want, got)
}
}
func TestNewJobFromBytes(t *testing.T) {
ctx := context.Background()
for _, test := range []struct {
in string
want string
}{{
in: `{
"_path": "rc/noop",
"a": "potato"
}`,
want: `{
"a": "potato"
}
`,
}, {
in: `{
"_path": "rc/error",
"e": "sausage"
}`,
want: `{
"error": "arbitrary error on input map[e:sausage]",
"input": {
"e": "sausage"
},
"path": "rc/error",
"status": 500
}
`,
}, {
in: `parse error`,
want: `{
"error": "invalid character 'p' looking for beginning of value",
"input": null,
"path": "unknown",
"status": 400
}
`,
}, {
in: `"just a string"`,
want: `{
"error": "json: cannot unmarshal string into Go value of type rc.Params",
"input": null,
"path": "unknown",
"status": 400
}
`,
}} {
got := NewJobFromBytes(ctx, []byte(test.in))
assert.Equal(t, test.want, string(got))
}
}
func TestJobsBatch(t *testing.T) {
ctx := context.Background()
call := rc.Calls.Get("job/batch")
assert.NotNil(t, call)
inJSON := `{
"inputs": [
{
"_path": "rc/noop",
"a": "potato"
},
"bad string",
{
"_path": "rc/noop",
"b": "sausage"
},
{
"_path": "rc/error",
"e": "sausage"
},
{
"_path": "bad/path",
"param": "sausage"
},
{
"_path": "test/needs_request"
},
{
"_path": "test/needs_response"
},
{
"nopath": "BOOM"
}
]
}
`
var in rc.Params
require.NoError(t, json.Unmarshal([]byte(inJSON), &in))
wantJSON := `{
"results": [
{
"a": "potato"
},
{
"error": "\"inputs\" items must be objects not string",
"input": null,
"path": "unknown",
"status": 400
},
{
"b": "sausage"
},
{
"error": "arbitrary error on input map[e:sausage]",
"input": {
"e": "sausage"
},
"path": "rc/error",
"status": 500
},
{
"error": "couldn't find path \"bad/path\"",
"input": {
"param": "sausage"
},
"path": "bad/path",
"status": 404
},
{
"error": "can't run path \"test/needs_request\" as it needs the request",
"input": {},
"path": "test/needs_request",
"status": 400
},
{
"error": "can't run path \"test/needs_response\" as it needs the response",
"input": {},
"path": "test/needs_response",
"status": 400
},
{
"error": "Didn't find key \"_path\" in input",
"input": {
"nopath": "BOOM"
},
"path": "",
"status": 400
}
]
}
`
var want rc.Params
require.NoError(t, json.Unmarshal([]byte(wantJSON), &want))
out, err := call.Fn(ctx, in)
require.NoError(t, err)
var got rc.Params
require.NoError(t, rc.Reshape(&got, out))
assert.Equal(t, want, got)
}
func TestJobsBatchConcurrent(t *testing.T) {
ctx := context.Background()
for concurrency := range 10 {
in := rc.Params{}
var inputs []any
var results []rc.Params
for i := range 100 {
in := map[string]any{
"_path": "rc/noop",
"i": i,
}
inputs = append(inputs, in)
results = append(results, rc.Params{
"i": i,
})
}
in["inputs"] = inputs
want := rc.Params{
"results": results,
}
if concurrency > 0 {
in["concurrency"] = concurrency
}
call := rc.Calls.Get("job/batch")
assert.NotNil(t, call)
got, err := call.Fn(ctx, in)
require.NoError(t, err)
assert.Equal(t, want, got)
}
}


@@ -154,6 +154,37 @@ func (os Options) NonDefault(m configmap.Getter) configmap.Simple {
return nonDefault
}
// NonDefaultRC discovers which config values aren't at their default
//
// It expects a pointer to the current config struct in opts.
//
// It returns the overridden config in rc config format.
func (os Options) NonDefaultRC(opts any) (map[string]any, error) {
items, err := configstruct.Items(opts)
if err != nil {
return nil, err
}
itemsByName := map[string]*configstruct.Item{}
for i := range items {
item := &items[i]
itemsByName[item.Name] = item
}
var nonDefault = map[string]any{}
for i := range os {
opt := &os[i]
item, found := itemsByName[opt.Name]
if !found {
return nil, fmt.Errorf("key %q in OptionsInfo not found in Options struct", opt.Name)
}
value := fmt.Sprint(item.Value)
defaultValue := fmt.Sprint(opt.Default)
if value != defaultValue {
nonDefault[item.Field] = item.Value
}
}
return nonDefault, nil
}
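// Illustrative usage, discovering the non-default global config so it
// can be sent as the _config parameter of an rc call:
//
//     ci := GetConfig(ctx)
//     overridden, err := ConfigOptionsInfo.NonDefaultRC(ci)
//     // on success, overridden maps struct field names (e.g. "Transfers")
//     // to their non-default values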
// HasAdvanced discovers if any options have an Advanced setting
func (os Options) HasAdvanced() bool {
for i := range os {

fs/registry_test.go Normal file

@@ -0,0 +1,308 @@
package fs
import (
"encoding/json"
"fmt"
"os"
"testing"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Check it satisfies the interface
var _ pflag.Value = (*Option)(nil)
func TestOption(t *testing.T) {
d := &Option{
Name: "potato",
Value: SizeSuffix(17 << 20),
}
assert.Equal(t, "17Mi", d.String())
assert.Equal(t, "SizeSuffix", d.Type())
err := d.Set("18M")
assert.NoError(t, err)
assert.Equal(t, SizeSuffix(18<<20), d.Value)
err = d.Set("sdfsdf")
assert.Error(t, err)
}
// Test options
var (
nouncOption = Option{
Name: "nounc",
}
copyLinksOption = Option{
Name: "copy_links",
Default: false,
NoPrefix: true,
ShortOpt: "L",
Advanced: true,
}
caseInsensitiveOption = Option{
Name: "case_insensitive",
Default: false,
Value: true,
Advanced: true,
}
testOptions = Options{nouncOption, copyLinksOption, caseInsensitiveOption}
)
func TestOptionsSetValues(t *testing.T) {
assert.Nil(t, testOptions[0].Default)
assert.Equal(t, false, testOptions[1].Default)
assert.Equal(t, false, testOptions[2].Default)
testOptions.setValues()
assert.Equal(t, "", testOptions[0].Default)
assert.Equal(t, false, testOptions[1].Default)
assert.Equal(t, false, testOptions[2].Default)
}
func TestOptionsGet(t *testing.T) {
opt := testOptions.Get("copy_links")
assert.Equal(t, &copyLinksOption, opt)
opt = testOptions.Get("not_found")
assert.Nil(t, opt)
}
func TestOptionsOveridden(t *testing.T) {
m := configmap.New()
m1 := configmap.Simple{
"nounc": "m1",
"copy_links": "m1",
}
m.AddGetter(m1, configmap.PriorityNormal)
m2 := configmap.Simple{
"nounc": "m2",
"case_insensitive": "m2",
}
m.AddGetter(m2, configmap.PriorityConfig)
m3 := configmap.Simple{
"nounc": "m3",
}
m.AddGetter(m3, configmap.PriorityDefault)
got := testOptions.Overridden(m)
assert.Equal(t, configmap.Simple{
"copy_links": "m1",
"nounc": "m1",
}, got)
}
func TestOptionsNonDefault(t *testing.T) {
m := configmap.Simple{}
got := testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{}, got)
m["case_insensitive"] = "false"
got = testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{}, got)
m["case_insensitive"] = "true"
got = testOptions.NonDefault(m)
assert.Equal(t, configmap.Simple{"case_insensitive": "true"}, got)
}
func TestOptionMarshalJSON(t *testing.T) {
out, err := json.MarshalIndent(&caseInsensitiveOption, "", "")
assert.NoError(t, err)
require.Equal(t, `{
"Name": "case_insensitive",
"FieldName": "",
"Help": "",
"Default": false,
"Value": true,
"Hide": 0,
"Required": false,
"IsPassword": false,
"NoPrefix": false,
"Advanced": true,
"Exclusive": false,
"Sensitive": false,
"DefaultStr": "false",
"ValueStr": "true",
"Type": "bool"
}`, string(out))
}
func TestOptionGetValue(t *testing.T) {
assert.Equal(t, "", nouncOption.GetValue())
assert.Equal(t, false, copyLinksOption.GetValue())
assert.Equal(t, true, caseInsensitiveOption.GetValue())
}
func TestOptionString(t *testing.T) {
assert.Equal(t, "", nouncOption.String())
assert.Equal(t, "false", copyLinksOption.String())
assert.Equal(t, "true", caseInsensitiveOption.String())
}
func TestOptionStringStringArray(t *testing.T) {
opt := Option{
Name: "string_array",
Default: []string(nil),
}
assert.Equal(t, "", opt.String())
opt.Default = []string{}
assert.Equal(t, "", opt.String())
opt.Default = []string{"a", "b"}
assert.Equal(t, "a,b", opt.String())
opt.Default = []string{"hello, world!", "goodbye, world!"}
assert.Equal(t, `"hello, world!","goodbye, world!"`, opt.String())
}
func TestOptionStringSizeSuffix(t *testing.T) {
opt := Option{
Name: "size_suffix",
Default: SizeSuffix(0),
}
assert.Equal(t, "0", opt.String())
opt.Default = SizeSuffix(-1)
assert.Equal(t, "off", opt.String())
opt.Default = SizeSuffix(100)
assert.Equal(t, "100B", opt.String())
opt.Default = SizeSuffix(1024)
assert.Equal(t, "1Ki", opt.String())
}
func TestOptionSet(t *testing.T) {
o := caseInsensitiveOption
assert.Equal(t, true, o.Value)
err := o.Set("FALSE")
assert.NoError(t, err)
assert.Equal(t, false, o.Value)
o = copyLinksOption
assert.Equal(t, nil, o.Value)
err = o.Set("True")
assert.NoError(t, err)
assert.Equal(t, true, o.Value)
err = o.Set("INVALID")
assert.Error(t, err)
assert.Equal(t, true, o.Value)
}
func TestOptionType(t *testing.T) {
assert.Equal(t, "string", nouncOption.Type())
assert.Equal(t, "bool", copyLinksOption.Type())
assert.Equal(t, "bool", caseInsensitiveOption.Type())
}
func TestOptionFlagName(t *testing.T) {
assert.Equal(t, "local-nounc", nouncOption.FlagName("local"))
assert.Equal(t, "copy-links", copyLinksOption.FlagName("local"))
assert.Equal(t, "local-case-insensitive", caseInsensitiveOption.FlagName("local"))
}
func TestOptionEnvVarName(t *testing.T) {
assert.Equal(t, "RCLONE_LOCAL_NOUNC", nouncOption.EnvVarName("local"))
assert.Equal(t, "RCLONE_LOCAL_COPY_LINKS", copyLinksOption.EnvVarName("local"))
assert.Equal(t, "RCLONE_LOCAL_CASE_INSENSITIVE", caseInsensitiveOption.EnvVarName("local"))
}
func TestOptionGetters(t *testing.T) {
// Set up env vars
envVars := [][2]string{
{"RCLONE_CONFIG_LOCAL_POTATO_PIE", "yes"},
{"RCLONE_COPY_LINKS", "TRUE"},
{"RCLONE_LOCAL_NOUNC", "NOUNC"},
}
for _, ev := range envVars {
assert.NoError(t, os.Setenv(ev[0], ev[1]))
}
defer func() {
for _, ev := range envVars {
assert.NoError(t, os.Unsetenv(ev[0]))
}
}()
oldConfigFileGet := ConfigFileGet
ConfigFileGet = func(section, key string) (string, bool) {
if section == "sausage" && key == "key1" {
return "value1", true
}
return "", false
}
defer func() {
ConfigFileGet = oldConfigFileGet
}()
// set up getters
// A configmap.Getter to read from the environment RCLONE_CONFIG_backend_option_name
configEnvVarsGetter := configEnvVars("local")
// A configmap.Getter to read from the environment RCLONE_option_name
optionEnvVarsGetter := optionEnvVars{"local", testOptions}
// A configmap.Getter to read either the default value or the set
// value from the RegInfo.Options
regInfoValuesGetterFalse := &regInfoValues{
options: testOptions,
useDefault: false,
}
regInfoValuesGetterTrue := &regInfoValues{
options: testOptions,
useDefault: true,
}
// A configmap.Setter to read from the config file
configFileGetter := getConfigFile("sausage")
for i, test := range []struct {
get configmap.Getter
key string
wantValue string
wantOk bool
}{
{configEnvVarsGetter, "not_found", "", false},
{configEnvVarsGetter, "potato_pie", "yes", true},
{optionEnvVarsGetter, "not_found", "", false},
{optionEnvVarsGetter, "copy_links", "TRUE", true},
{optionEnvVarsGetter, "nounc", "NOUNC", true},
{optionEnvVarsGetter, "case_insensitive", "", false},
{regInfoValuesGetterFalse, "not_found", "", false},
{regInfoValuesGetterFalse, "case_insensitive", "true", true},
{regInfoValuesGetterFalse, "copy_links", "", false},
{regInfoValuesGetterTrue, "not_found", "", false},
{regInfoValuesGetterTrue, "case_insensitive", "true", true},
{regInfoValuesGetterTrue, "copy_links", "false", true},
{configFileGetter, "not_found", "", false},
{configFileGetter, "key1", "value1", true},
} {
what := fmt.Sprintf("%d: %+v: %q", i, test.get, test.key)
gotValue, gotOk := test.get.Get(test.key)
assert.Equal(t, test.wantValue, gotValue, what)
assert.Equal(t, test.wantOk, gotOk, what)
}
}
func TestOptionsNonDefaultRC(t *testing.T) {
type cfg struct {
X string `config:"x"`
Y int `config:"y"`
}
c := &cfg{X: "a", Y: 6}
opts := Options{
{Name: "x", Default: "a"}, // at default, should be omitted
{Name: "y", Default: 5}, // non-default, should be included
}
got, err := opts.NonDefaultRC(c)
require.NoError(t, err)
require.Equal(t, map[string]any{"Y": 6}, got)
}
func TestOptionsNonDefaultRCMissingKey(t *testing.T) {
type cfg struct {
X string `config:"x"`
}
c := &cfg{X: "a"}
// Options refers to a key not present in the struct -> expect error
opts := Options{{Name: "missing", Default: ""}}
_, err := opts.NonDefaultRC(c)
assert.ErrorContains(t, err, "not found")
}


@@ -14,6 +14,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/cluster"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
@@ -97,6 +98,7 @@ type syncCopyMove struct {
setDirModTimesMaxLevel int // max level of the directories to set
modifiedDirs map[string]struct{} // dirs with changed contents (if s.setDirModTimeAfter)
allowOverlap bool // whether we allow src and dst to overlap (i.e. for convmv)
cluster *cluster.Cluster // non-nil to run sync via cluster
}
// For keeping track of delayed modtime sets
@@ -164,6 +166,7 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
setDirModTimeAfter: !ci.NoUpdateDirModTime && (!copyEmptySrcDirs || fsrc.Features().CanHaveEmptyDirectories && fdst.Features().DirModTimeUpdatesOnWrite),
modifiedDirs: make(map[string]struct{}),
allowOverlap: allowOverlap,
cluster: cluster.GetCluster(ctx),
}
s.logger, s.usingLogger = operations.GetLogger(ctx)
@@ -496,13 +499,25 @@ func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs,
dst := pair.Dst
if s.DoMove {
if src != dst {
_, err = operations.MoveTransfer(ctx, fdst, dst, src.Remote(), src)
if s.cluster != nil {
err = s.cluster.Move(ctx, fdst, dst, src.Remote(), src)
} else {
_, err = operations.MoveTransfer(ctx, fdst, dst, src.Remote(), src)
}
} else {
// src == dst signals delete the src
err = operations.DeleteFile(ctx, src)
if s.cluster != nil {
err = s.cluster.DeleteFile(ctx, src)
} else {
err = operations.DeleteFile(ctx, src)
}
}
} else {
_, err = operations.Copy(ctx, fdst, dst, src.Remote(), src)
if s.cluster != nil {
err = s.cluster.Copy(ctx, fdst, dst, src.Remote(), src)
} else {
_, err = operations.Copy(ctx, fdst, dst, src.Remote(), src)
}
}
s.processError(err)
if err != nil {
@@ -539,8 +554,13 @@ func (s *syncCopyMove) startTransfers() {
// This stops the background transfers
func (s *syncCopyMove) stopTransfers() {
s.toBeUploaded.Close()
fs.Debugf(s.fdst, "Waiting for transfers to finish")
s.transfersWg.Wait()
fs.Debugf(s.fdst, "Waiting for transfers to finish")
if s.cluster != nil {
fs.Debugf(s.fdst, "Waiting for cluster transfers to finish")
s.processError(s.cluster.Sync(s.ctx))
fs.Debugf(s.fdst, "Cluster transfers finished")
}
}
// This starts the background renamers.