diff --git a/cmd/cmd.go b/cmd/cmd.go
index cc485dd4f..101c7ee7b 100644
--- a/cmd/cmd.go
+++ b/cmd/cmd.go
@@ -23,6 +23,7 @@ import (
 	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/fs/cache"
+	"github.com/rclone/rclone/fs/cluster"
 	"github.com/rclone/rclone/fs/config/configfile"
 	"github.com/rclone/rclone/fs/config/configflags"
 	"github.com/rclone/rclone/fs/config/flags"
@@ -481,6 +482,22 @@ func initConfig() {
 			}
 		})
 	}
+
+	// Run as a cluster worker if configured, ignoring the command
+	// given on the command line
+	if ci.Cluster != "" {
+		if ci.ClusterID == "" || ci.ClusterID == "0" {
+			fs.Infof(nil, "Running in cluster mode %q as controller", ci.Cluster)
+		} else {
+			fs.Infof(nil, "Running in cluster mode %q as worker with id %q", ci.Cluster, ci.ClusterID)
+			worker, err := cluster.NewWorker(ctx)
+			if err != nil || worker == nil {
+				fs.Fatalf(nil, "Failed to start cluster worker: %v", err)
+			}
+			// Do not continue with the main thread
+			select {}
+		}
+	}
 }
 
 func resolveExitCode(err error) {
diff --git a/fs/cluster/cluster.go b/fs/cluster/cluster.go
new file mode 100644
index 000000000..2863b49c3
--- /dev/null
+++ b/fs/cluster/cluster.go
@@ -0,0 +1,516 @@
+// Package cluster implements a mechanism to distribute work over a
+// cluster of rclone instances.
+package cluster
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"path"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/accounting"
+	"github.com/rclone/rclone/fs/filter"
+	"github.com/rclone/rclone/fs/operations"
+	"github.com/rclone/rclone/fs/rc"
+	"github.com/rclone/rclone/lib/atexit"
+)
+
+// ErrClusterNotConfigured is returned from creation functions.
+var ErrClusterNotConfigured = errors.New("cluster is not configured")
+
+// Cluster describes the workings of the current cluster.
+type Cluster struct {
+	jobs        *Jobs
+	id          string
+	batchFiles  int
+	batchSize   fs.SizeSuffix
+	cleanup     fs.ClusterCleanup    // how we clean up cluster files
+	_config     rc.Params            // for rc
+	_filter     rc.Params            // for rc
+	cancel      func()               // stop bg job
+	wg          sync.WaitGroup       // bg job finished
+	quit        chan struct{}        // signal graceful stop
+	sync        chan chan<- struct{} // sync the current jobs
+	quitWorkers bool                 // if set, send workers a stop signal on Shutdown
+
+	mu           sync.Mutex
+	currentBatch Batch
+	inflight     map[string]Batch
+	shutdown     bool
+}
+
+// Batch is a collection of rc tasks to do
+type Batch struct {
+	size   int64       // size in batch
+	Path   string      `json:"_path"`
+	Inputs []rc.Params `json:"inputs"`
+	Config rc.Params   `json:"_config,omitempty"`
+	Filter rc.Params   `json:"_filter,omitempty"`
+
+	trs   []*accounting.Transfer // transfer for each Input
+	sizes []int64                // sizes for each Input
+}
+
+// BatchResult has the results of the batch as received.
+type BatchResult struct {
+	Results []rc.Params `json:"results"`
+
+	// Error returns
+	Error  string `json:"error"`
+	Status int    `json:"status"`
+	Input  string `json:"input"`
+	Path   string `json:"path"`
+}
+
+// NewCluster creates a new cluster from the config in ctx.
+//
+// It may return nil if no cluster is configured.
+func NewCluster(ctx context.Context) (*Cluster, error) {
+	ci := fs.GetConfig(ctx)
+	if ci.Cluster == "" {
+		return nil, nil
+	}
+	jobs, err := NewJobs(ctx)
+	if err != nil {
+		return nil, err
+	}
+	c := &Cluster{
+		jobs:        jobs,
+		id:          ci.ClusterID,
+		quitWorkers: ci.ClusterQuitWorkers,
+		batchFiles:  ci.ClusterBatchFiles,
+		batchSize:   ci.ClusterBatchSize,
+		cleanup:     ci.ClusterCleanup,
+		quit:        make(chan struct{}),
+		sync:        make(chan chan<- struct{}),
+		inflight:    make(map[string]Batch),
+	}
+
+	// Configure _config
+	configParams, err := fs.ConfigOptionsInfo.NonDefaultRC(ci)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read global config: %w", err)
+	}
+	// Remove any global cluster config
+	for k := range configParams {
+		if strings.HasPrefix(k, "Cluster") {
+			delete(configParams, k)
+		}
+	}
+	if len(configParams) != 0 {
+		fs.Debugf(nil, "Overridden global config: %#v", configParams)
+	}
+	c._config = rc.Params(configParams)
+
+	// Configure _filter
+	fi := filter.GetConfig(ctx)
+	if !fi.InActive() {
+		filterParams, err := filter.OptionsInfo.NonDefaultRC(fi)
+		if err != nil {
+			return nil, fmt.Errorf("failed to read filter config: %w", err)
+		}
+		fs.Debugf(nil, "Overridden filter config: %#v", filterParams)
+		c._filter = rc.Params(filterParams)
+	}
+
+	err = c.jobs.createDirectoryStructure(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// Start the background worker
+	bgCtx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	c.wg.Add(1)
+	go c.run(bgCtx)
+
+	fs.Logf(c.jobs.f, "Started cluster controller")
+
+	return c, nil
+}
+
+var (
+	globalClusterMu sync.Mutex
+	globalCluster   *Cluster
+)
+
+// GetCluster starts or gets a cluster.
+//
+// If no cluster is configured or the cluster can't be started then it
+// returns nil.
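+//
+// Typical controller-side use (sketch): Move/Copy/DeleteFile queue
+// work and Sync waits for the workers to drain it:
+//
+//	if c := cluster.GetCluster(ctx); c != nil {
+//		_ = c.Move(ctx, fdst, nil, src.Remote(), src)
+//		err = c.Sync(ctx)
+//	}
+//
+// Shutdown is registered with atexit here, so callers need not call
+// it themselves.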
+func GetCluster(ctx context.Context) *Cluster {
+	globalClusterMu.Lock()
+	defer globalClusterMu.Unlock()
+
+	if globalCluster != nil {
+		return globalCluster
+	}
+
+	cluster, err := NewCluster(ctx)
+	if err != nil {
+		fs.Errorf(nil, "Failed to start cluster: %v", err)
+		return nil
+	}
+	if cluster != nil {
+		atexit.Register(func() {
+			err := cluster.Shutdown(context.Background())
+			if err != nil {
+				fs.Errorf(nil, "Failed to shutdown cluster: %v", err)
+			}
+		})
+	}
+
+	globalCluster = cluster
+	return globalCluster
+}
+
+// Send the current batch for processing.
+//
+// Call with c.mu held.
+func (c *Cluster) sendBatch(ctx context.Context) (err error) {
+	// Do nothing if the batch is empty
+	if len(c.currentBatch.Inputs) == 0 {
+		return nil
+	}
+
+	// Get and reset current batch
+	b := c.currentBatch
+	c.currentBatch = Batch{}
+
+	b.Path = "job/batch"
+	b.Config = c._config
+	b.Filter = c._filter
+
+	// write the pending job
+	name, err := c.jobs.writeJob(ctx, clusterPending, &b)
+	if err != nil {
+		return err
+	}
+
+	fs.Infof(name, "wrote cluster batch file")
+	c.inflight[name] = b
+	return nil
+}
+
+// Add the command to the current batch
+func (c *Cluster) addToBatch(ctx context.Context, obj fs.Object, in rc.Params, size int64, tr *accounting.Transfer) (err error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.shutdown {
+		return errors.New("internal error: can't add file to shut down cluster")
+	}
+
+	c.currentBatch.Inputs = append(c.currentBatch.Inputs, in)
+	c.currentBatch.size += size
+	c.currentBatch.trs = append(c.currentBatch.trs, tr)
+	c.currentBatch.sizes = append(c.currentBatch.sizes, size)
+
+	if c.currentBatch.size >= int64(c.batchSize) || len(c.currentBatch.Inputs) >= c.batchFiles {
+		err = c.sendBatch(ctx)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Move does operations.Move via the cluster.
+//
+// Moves the src object to dst, or to fdst if dst is nil, in which case
+// remote is used as the name of the new object.
+func (c *Cluster) Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
+	tr := accounting.Stats(ctx).NewTransfer(src, fdst)
+	if operations.SkipDestructive(ctx, src, "cluster move") {
+		in := tr.Account(ctx, nil)
+		in.DryRun(src.Size())
+		tr.Done(ctx, nil)
+		return nil
+	}
+	fsrc, ok := src.Fs().(fs.Fs)
+	if !ok {
+		err = errors.New("internal error: cluster move: can't cast src.Fs() to fs.Fs")
+		tr.Done(ctx, err)
+		return err
+	}
+	in := rc.Params{
+		"_path":     "operations/movefile",
+		"dstFs":     fs.ConfigStringFull(fdst),
+		"dstRemote": remote,
+		"srcFs":     fs.ConfigStringFull(fsrc),
+		"srcRemote": src.Remote(),
+	}
+	if dst != nil {
+		in["dstRemote"] = dst.Remote()
+	}
+	return c.addToBatch(ctx, src, in, src.Size(), tr)
+}
+
+// Copy does operations.Copy via the cluster.
+//
+// Copies the src object to dst, or to fdst if dst is nil, in which case
+// remote is used as the name of the new object.
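+//
+// The copy is not performed immediately: it is added to the current
+// batch and sent to the workers when the batch fills (see
+// --cluster-batch-files and --cluster-batch-size) or on Sync. For
+// example (sketch):
+//
+//	err := c.Copy(ctx, fdst, nil, src.Remote(), src)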
+func (c *Cluster) Copy(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
+	tr := accounting.Stats(ctx).NewTransfer(src, fdst)
+	if operations.SkipDestructive(ctx, src, "cluster copy") {
+		in := tr.Account(ctx, nil)
+		in.DryRun(src.Size())
+		tr.Done(ctx, nil)
+		return nil
+	}
+	fsrc, ok := src.Fs().(fs.Fs)
+	if !ok {
+		err = errors.New("internal error: cluster copy: can't cast src.Fs() to fs.Fs")
+		tr.Done(ctx, err)
+		return err
+	}
+	in := rc.Params{
+		"_path":     "operations/copyfile",
+		"dstFs":     fs.ConfigStringFull(fdst),
+		"dstRemote": remote,
+		"srcFs":     fs.ConfigStringFull(fsrc),
+		"srcRemote": src.Remote(),
+	}
+	if dst != nil {
+		in["dstRemote"] = dst.Remote()
+	}
+	return c.addToBatch(ctx, src, in, src.Size(), tr)
+}
+
+// DeleteFile does operations.DeleteFile via the cluster.
+//
+// If --backup-dir is in effect then the worker moves the file there
+// instead of deleting it.
+func (c *Cluster) DeleteFile(ctx context.Context, dst fs.Object) (err error) {
+	tr := accounting.Stats(ctx).NewCheckingTransfer(dst, "deleting")
+	err = accounting.Stats(ctx).DeleteFile(ctx, dst.Size())
+	if err != nil {
+		tr.Done(ctx, err)
+		return err
+	}
+	if operations.SkipDestructive(ctx, dst, "cluster delete") {
+		tr.Done(ctx, nil)
+		return
+	}
+	fdst, ok := dst.Fs().(fs.Fs)
+	if !ok {
+		err = errors.New("internal error: cluster delete: can't cast dst.Fs() to fs.Fs")
+		tr.Done(ctx, err)
+		return err
+	}
+	in := rc.Params{
+		"_path":  "operations/deletefile",
+		"fs":     fs.ConfigStringFull(fdst),
+		"remote": dst.Remote(),
+	}
+	return c.addToBatch(ctx, dst, in, 0, tr)
+}
+
+// processCompletedJob loads the job and checks it off
+func (c *Cluster) processCompletedJob(ctx context.Context, obj fs.Object) error {
+	name := path.Base(obj.Remote())
+	name, _ = strings.CutSuffix(name, ".json")
+	fs.Debugf(nil, "cluster: processing completed job %q", name)
+
+	var output BatchResult
+	err := c.jobs.readJob(ctx, obj, &output)
+	if err != nil {
+		return fmt.Errorf("check jobs read: %w", err)
+	}
+
+	c.mu.Lock()
+	input, ok := c.inflight[name]
+	// FIXME delete or save job
+	if !ok {
+		for k := range c.inflight {
+			fs.Debugf(nil, "key %q", k)
+		}
+		c.mu.Unlock()
+		return fmt.Errorf("check jobs: job %q not found", name)
+	}
+	c.mu.Unlock()
+
+	// Delete the inflight entry when batch is processed
+	defer func() {
+		c.mu.Lock()
+		delete(c.inflight, name)
+		c.mu.Unlock()
+	}()
+
+	// Check job
+	if output.Error != "" {
+		return fmt.Errorf("cluster: failed to run batch job: %s (%d)", output.Error, output.Status)
+	}
+	if len(input.Inputs) != len(output.Results) {
+		return fmt.Errorf("cluster: input had %d jobs but output had %d", len(input.Inputs), len(output.Results))
+	}
+
+	// Run through the batch and mark operations as successful or not
+	for i := range input.Inputs {
+		in := input.Inputs[i]
+		tr := input.trs[i]
+		size := input.sizes[i]
+		out := output.Results[i]
+		errorString, hasError := out["error"]
+		var err error
+		if hasError && errorString != "" {
+			err = fmt.Errorf("cluster: worker error: %s (%v)", errorString, out["status"])
+		}
+		if err == nil && in["_path"] == "operations/movefile" {
+			accounting.Stats(ctx).Renames(1)
+		}
+		acc := tr.Account(ctx, nil)
+		acc.AccountReadN(size)
+		tr.Done(ctx, err)
+		remote, ok := in["dstRemote"]
+		if !ok {
+			remote = in["remote"]
+		}
+		if err == nil {
+			fs.Infof(remote, "cluster %s successful", in["_path"])
+		} else {
+			fs.Errorf(remote, "cluster %s failed: %v", in["_path"], err)
+		}
+	}
+
+	return nil
+}
+
+// checkJobs sees if there are any completed jobs
+func (c *Cluster) checkJobs(ctx context.Context) {
+	objs, err := c.jobs.listDir(ctx, clusterDone)
+	if err != nil {
+		fs.Errorf(nil, "cluster: get completed job list failed: %v", err)
+		return
+	}
+	for _, obj := range objs {
+		err := c.processCompletedJob(ctx, obj)
+		status := "output-ok"
+		ok := true
+		if err != nil {
+			status = "output-failed"
+			ok = false
+			fs.Errorf(nil, "cluster: process completed job failed: %v", err)
+		}
+		c.jobs.finish(ctx, obj, status, ok)
+	}
+}
+
+// Run the background process
+func (c *Cluster) run(ctx context.Context) {
+	defer c.wg.Done()
+	checkJobs := time.NewTicker(clusterCheckJobsInterval)
+	defer checkJobs.Stop()
+	var syncedChans []chan<- struct{}
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-c.quit:
+			fs.Debugf(nil, "cluster: quit request received")
+			return
+		case synced := <-c.sync:
+			syncedChans = append(syncedChans, synced)
+			fs.Debugf(nil, "cluster: sync request received")
+		case <-checkJobs.C:
+		}
+		c.checkJobs(ctx)
+		if len(syncedChans) > 0 {
+			c.mu.Lock()
+			n := len(c.inflight)
+			c.mu.Unlock()
+			if n == 0 {
+				fs.Debugf(nil, "cluster: synced")
+				for _, synced := range syncedChans {
+					synced <- struct{}{}
+				}
+				syncedChans = nil
+			}
+		}
+	}
+}
+
+// Sync the cluster.
+//
+// Call this when all job items have been added to the cluster.
+//
+// This will wait for any outstanding jobs to finish regardless of who
+// put them in.
+func (c *Cluster) Sync(ctx context.Context) error {
+	// Flush any outstanding batch
+	c.mu.Lock()
+	err := c.sendBatch(ctx)
+	c.mu.Unlock()
+
+	// Wait for the cluster to be empty
+	done := make(chan struct{})
+	c.sync <- done
+	<-done
+
+	return err
+}
+
+// Shutdown the cluster.
+//
+// Call this when all job items have been added to the cluster.
+//
+// This will wait for any outstanding jobs to finish.
+func (c *Cluster) Shutdown(ctx context.Context) (err error) {
+	c.mu.Lock()
+	inBatch := len(c.currentBatch.Inputs)
+	inFlight := len(c.inflight)
+	shutdown := c.shutdown
+	c.shutdown = true
+	c.mu.Unlock()
+
+	if inBatch > 0 {
+		err = errors.Join(err, fmt.Errorf("%d items batched on cluster shutdown", inBatch))
+	}
+	if inFlight > 0 {
+		err = errors.Join(err, fmt.Errorf("%d items in flight on cluster shutdown", inFlight))
+	}
+	if shutdown {
+		fs.Debugf(nil, "cluster: already shut down")
+		return nil
+	}
+	c.quit <- struct{}{}
+	fs.Debugf(nil, "Waiting for cluster to finish")
+	c.wg.Wait()
+
+	// Send a quit job
+	if c.quitWorkers {
+		fs.Logf(nil, "Sending quit to workers")
+		quitErr := c.jobs.writeQuitJob(ctx, clusterPending)
+		if quitErr != nil {
+			err = errors.Join(err, fmt.Errorf("shutdown quit: %w", quitErr))
+		}
+	}
+	return err
+}
+
+// Abort the cluster and any outstanding jobs.
+func (c *Cluster) Abort() {
+	c.cancel()
+	c.wg.Wait()
+}
diff --git a/fs/cluster/jobs.go b/fs/cluster/jobs.go
new file mode 100644
index 000000000..7057c88da
--- /dev/null
+++ b/fs/cluster/jobs.go
@@ -0,0 +1,297 @@
+package cluster
+
+import (
+	"bytes"
+	"cmp"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"slices"
+	"strings"
+	"time"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/cache"
+	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/fs/object"
+	"github.com/rclone/rclone/fs/operations"
+	"github.com/rclone/rclone/lib/atexit"
+	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/random"
+)
+
+// Batches flow from queue/pending to queue/processing to queue/done to queue/finished
+const (
+	clusterQueue      = "queue"
+	clusterPending    = clusterQueue + "/pending"
+	clusterProcessing = clusterQueue + "/processing"
+	clusterDone       = clusterQueue + "/done"
+	clusterFinished   = clusterQueue + "/finished"
+
+	minSleep      = 10 * time.Millisecond
+	maxSleep      = 2 * time.Second
+	decayConstant = 2 // bigger for slower decay, exponential
+
+	// Read the queue this often
+	clusterCheckJobsInterval = time.Second
+
+	// Name of job which signals to the workers to quit
+	quitJob = "QUIT"
+)
+
+// Jobs is a container for sending and receiving jobs to the cluster.
+type Jobs struct {
+	remote  string            // remote for job storage
+	f       fs.Fs             // cluster remote storage
+	partial bool              // do we need to write and rename
+	hasMove bool              // set if f has server side move otherwise has server side copy
+	cleanup fs.ClusterCleanup // how we clean up the cluster files
+	pacer   *fs.Pacer         // To pace the API calls
+}
+
+// NewJobs creates a Jobs source from the config in ctx.
+//
+// It may return nil if no cluster is configured.
+func NewJobs(ctx context.Context) (*Jobs, error) {
+	ci := fs.GetConfig(ctx)
+	if ci.Cluster == "" {
+		return nil, nil
+	}
+	f, err := cache.Get(ctx, ci.Cluster)
+	if err != nil {
+		return nil, fmt.Errorf("cluster remote creation: %w", err)
+	}
+	features := f.Features()
+	if features.Move == nil && features.Copy == nil {
+		return nil, fmt.Errorf("cluster remote must have server side move or copy and %q has neither", ci.Cluster)
+	}
+	jobs := &Jobs{
+		remote:  ci.Cluster,
+		f:       f,
+		partial: features.PartialUploads,
+		hasMove: features.Move != nil,
+		pacer:   fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
+		cleanup: ci.ClusterCleanup,
+	}
+	return jobs, nil
+}
+
+// Create the cluster directory structure
+func (jobs *Jobs) createDirectoryStructure(ctx context.Context) (err error) {
+	for _, dir := range []string{clusterPending, clusterProcessing, clusterDone, clusterFinished} {
+		err = jobs.f.Mkdir(ctx, dir)
+		if err != nil {
+			return fmt.Errorf("cluster mkdir %q: %w", dir, err)
+		}
+	}
+	return nil
+}
+
+// rename a file
+//
+// If this returns fs.ErrorObjectNotFound then the file has already been renamed.
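+//
+// getJob relies on this to claim a pending job atomically: if two
+// workers race to rename the same job file, the loser sees
+// fs.ErrorObjectNotFound and tries the next job.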
+func (jobs *Jobs) rename(ctx context.Context, src fs.Object, dstRemote string) (dst fs.Object, err error) {
+	features := jobs.f.Features()
+	if jobs.hasMove {
+		dst, err = features.Move(ctx, src, dstRemote)
+		if err != nil {
+			return nil, fmt.Errorf("cluster: failed to rename job file: %w", err)
+		}
+	} else {
+		dst, err = features.Copy(ctx, src, dstRemote)
+		if err != nil {
+			return nil, fmt.Errorf("cluster: failed to rename (copy phase) job file: %w", err)
+		}
+		err = src.Remove(ctx)
+		if err != nil {
+			return nil, fmt.Errorf("cluster: failed to rename (delete phase) job file: %w", err)
+		}
+	}
+	return dst, nil
+}
+
+// Finish with a jobs file
+func (jobs *Jobs) finish(ctx context.Context, obj fs.Object, status string, ok bool) {
+	var err error
+	if (ok && jobs.cleanup == fs.ClusterCleanupCompleted) || jobs.cleanup == fs.ClusterCleanupFull {
+		err = obj.Remove(ctx)
+	} else {
+		name := path.Join(clusterFinished, status, path.Base(obj.Remote()))
+		_, err = jobs.rename(ctx, obj, name)
+	}
+	if err != nil {
+		fs.Errorf(nil, "cluster: finishing completed job failed: %v", err)
+	}
+}
+
+// write buf into remote
+func (jobs *Jobs) writeFile(ctx context.Context, remote string, modTime time.Time, buf []byte) error {
+	partialRemote := remote
+	if jobs.partial {
+		partialRemote = remote + ".partial"
+	}
+	// Calculate hashes
+	w, err := hash.NewMultiHasherTypes(jobs.f.Hashes())
+	if err != nil {
+		return err
+	}
+	_, err = w.Write(buf)
+	if err != nil {
+		return err
+	}
+	obji := object.NewStaticObjectInfo(partialRemote, modTime, int64(len(buf)), true, w.Sums(), jobs.f)
+	var obj fs.Object
+	err = jobs.pacer.Call(func() (bool, error) {
+		in := bytes.NewBuffer(buf)
+		obj, err = jobs.f.Put(ctx, in, obji)
+		if err != nil {
+			return true, fmt.Errorf("cluster: failed to write %q: %w", remote, err)
+		}
+		return false, nil
+	})
+	if err != nil {
+		return err
+	}
+	if jobs.partial {
+		obj, err = jobs.rename(ctx, obj, remote)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// write a job to a file returning the name
+func (jobs *Jobs) writeJob(ctx context.Context, where string, job any) (name string, err error) {
+	now := time.Now().UTC()
+	name = fmt.Sprintf("%s-%s", now.Format(time.RFC3339Nano), random.String(20))
+	remote := path.Join(where, name+".json")
+	buf, err := json.MarshalIndent(job, "", "\t")
+	if err != nil {
+		return "", fmt.Errorf("cluster: job json: %w", err)
+	}
+	err = jobs.writeFile(ctx, remote, now, buf)
+	if err != nil {
+		return "", fmt.Errorf("cluster: job write: %w", err)
+	}
+	return name, nil
+}
+
+// write a quit job to a file
+func (jobs *Jobs) writeQuitJob(ctx context.Context, where string) (err error) {
+	now := time.Now().UTC()
+	remote := path.Join(where, quitJob+".json")
+	err = jobs.writeFile(ctx, remote, now, []byte("{}"))
+	if err != nil {
+		return fmt.Errorf("cluster: quit job write: %w", err)
+	}
+	return nil
+}
+
+// read buf from object
+func (jobs *Jobs) readFile(ctx context.Context, o fs.Object) (buf []byte, err error) {
+	err = jobs.pacer.Call(func() (bool, error) {
+		in, err := operations.Open(ctx, o)
+		if err != nil {
+			return true, fmt.Errorf("cluster: failed to open %q: %w", o, err)
+		}
+		buf, err = io.ReadAll(in)
+		if err != nil {
+			return true, fmt.Errorf("cluster: failed to read %q: %w", o, err)
+		}
+		err = in.Close()
+		if err != nil {
+			return true, fmt.Errorf("cluster: failed to close %q: %w", o, err)
+		}
+		return false, nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return buf, nil
+}
+
+// read a job from a file
+//
+// job should be a pointer to something to be unmarshalled
+func (jobs *Jobs) readJob(ctx context.Context, obj fs.Object, job any) error {
+	buf, err := jobs.readFile(ctx, obj)
+	if err != nil {
+		return fmt.Errorf("cluster: job read: %w", err)
+	}
+	err = json.Unmarshal(buf, job)
+	if err != nil {
+		return fmt.Errorf("cluster: job read json: %w", err)
+	}
+	return nil
+}
+
+// lists the json files in a cluster directory
+func (jobs *Jobs) listDir(ctx context.Context, dir string) (objects []fs.Object, err error) {
+	entries, err := jobs.f.List(ctx, dir)
+	if err != nil {
+		return nil, fmt.Errorf("cluster: failed to list %q: %w", dir, err)
+	}
+	entries.ForObject(func(o fs.Object) {
+		if strings.HasSuffix(o.Remote(), ".json") {
+			objects = append(objects, o)
+		}
+	})
+	slices.SortStableFunc(objects, func(a, b fs.Object) int {
+		return cmp.Compare(a.Remote(), b.Remote())
+	})
+	return objects, nil
+}
+
+// get a job from pending if there is one available.
+//
+// Returns a nil object if no jobs are available.
+//
+// FIXME should mark jobs as error jobs in here if they can't be read properly?
+func (jobs *Jobs) getJob(ctx context.Context, id string) (name string, obj fs.Object, err error) {
+	objs, err := jobs.listDir(ctx, clusterPending)
+	if err != nil {
+		return "", nil, fmt.Errorf("get job list: %w", err)
+	}
+	quit := false
+	for len(objs) > 0 {
+		obj = objs[0]
+		objs = objs[1:]
+		name = path.Base(obj.Remote())
+		name, _ = strings.CutSuffix(name, ".json")
+
+		// See if we have been asked to quit
+		if name == quitJob {
+			quit = true
+			continue
+		}
+
+		// claim the job
+		newName := fmt.Sprintf("%s-%s.json", name, id)
+		newRemote := path.Join(clusterProcessing, newName)
+		obj, err = jobs.rename(ctx, obj, newRemote)
+		if errors.Is(err, fs.ErrorObjectNotFound) {
+			// claim failed - try again
+			continue
+		}
+		if err != nil {
+			return "", nil, fmt.Errorf("get job claim: %w", err)
+		}
+		return name, obj, nil
+	}
+	// No jobs found
+	if quit {
+		fs.Logf(nil, "Exiting cluster worker on command")
+		atexit.Run()
+		os.Exit(0)
+	}
+	return "", nil, nil
+}
diff --git a/fs/cluster/worker.go b/fs/cluster/worker.go
new file mode 100644
index 000000000..7c5a5f592
--- /dev/null
+++ b/fs/cluster/worker.go
@@ -0,0 +1,105 @@
+package cluster
+
+import (
+	"context"
+	"path"
+	"sync"
+	"time"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/rc/jobs"
+	"github.com/rclone/rclone/lib/random"
+)
+
+// Worker describes a single instance of a cluster worker.
+type Worker struct {
+	jobs   *Jobs
+	cancel func()         // stop bg job
+	wg     sync.WaitGroup // bg job finished
+	id     string         // id of this worker
+}
+
+// NewWorker creates a new cluster worker from the config in ctx.
+//
+// It may return nil if no cluster is configured.
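+//
+// The worker polls queue/pending, claims a job by renaming it into
+// queue/processing, runs it through the rc job machinery and writes
+// the result to queue/done for the controller to collect.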
+func NewWorker(ctx context.Context) (*Worker, error) {
+	ci := fs.GetConfig(ctx)
+	if ci.Cluster == "" {
+		return nil, nil
+	}
+	jobs, err := NewJobs(ctx)
+	if err != nil {
+		return nil, err
+	}
+	w := &Worker{
+		jobs: jobs,
+		id:   ci.ClusterID,
+	}
+	if w.id == "" {
+		w.id = random.String(10)
+	}
+
+	// Start the background worker
+	bgCtx, cancel := context.WithCancel(context.Background())
+	w.cancel = cancel
+	w.wg.Add(1)
+	go w.run(bgCtx)
+
+	fs.Logf(w.jobs.f, "Started cluster worker")
+
+	return w, nil
+}
+
+// Check to see if a job exists and run it if available
+func (w *Worker) checkJobs(ctx context.Context) {
+	name, obj, err := w.jobs.getJob(ctx, w.id)
+	if err != nil {
+		fs.Errorf(nil, "check jobs get: %v", err)
+		return
+	}
+	if obj == nil {
+		return // no jobs available
+	}
+	fs.Debugf(nil, "cluster: processing pending job %q", name)
+	inBuf, err := w.jobs.readFile(ctx, obj)
+	if err != nil {
+		fs.Errorf(nil, "check jobs read: %v", err)
+		w.jobs.finish(ctx, obj, "input-error", false)
+		return
+	}
+	w.jobs.finish(ctx, obj, "input-ok", true)
+	outBuf := jobs.NewJobFromBytes(ctx, inBuf)
+	remote := path.Join(clusterDone, name+".json")
+	err = w.jobs.writeFile(ctx, remote, time.Now(), outBuf)
+	if err != nil {
+		fs.Errorf(nil, "check jobs failed to write output: %v", err)
+		return
+	}
+	fs.Debugf(nil, "cluster: processed pending job %q", name)
+}
+
+// Run the background process
+func (w *Worker) run(ctx context.Context) {
+	defer w.wg.Done()
+	checkJobs := time.NewTicker(clusterCheckJobsInterval)
+	defer checkJobs.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-checkJobs.C:
+			w.checkJobs(ctx)
+		}
+	}
+}
+
+// Shutdown the worker regardless of whether it has work to process or not.
+func (w *Worker) Shutdown(ctx context.Context) error {
+	w.cancel()
+	w.wg.Wait()
+	return nil
+}
diff --git a/fs/config.go b/fs/config.go
index ce0a1df9e..d2e51cec6 100644
--- a/fs/config.go
+++ b/fs/config.go
@@ -50,6 +50,34 @@ var (
 	ConfigEdit = "config_fs_edit"
 )
 
+// ClusterCleanup describes the cluster cleanup choices.
+type ClusterCleanup = Enum[clusterCleanupChoices]
+
+// Cluster cleanup choices.
+//
+// ClusterCleanupNone don't remove any cluster files
+// ClusterCleanupCompleted remove successfully completed jobs
+// ClusterCleanupFull remove everything on exit
+const (
+	ClusterCleanupNone ClusterCleanup = iota
+	ClusterCleanupCompleted
+	ClusterCleanupFull
+)
+
+type clusterCleanupChoices struct{}
+
+func (clusterCleanupChoices) Choices() []string {
+	return []string{
+		ClusterCleanupNone:      "none",
+		ClusterCleanupCompleted: "completed",
+		ClusterCleanupFull:      "full",
+	}
+}
+
+func (clusterCleanupChoices) Type() string {
+	return "ClusterCleanup"
+}
+
 // ConfigOptionsInfo describes the Options in use
 var ConfigOptionsInfo = Options{{
 	Name: "modify_window",
@@ -566,6 +594,36 @@ var ConfigOptionsInfo = Options{{
 	Default: "",
 	Help:    "HTTP proxy URL.",
 	Groups:  "Networking",
+}, {
+	Name:    "cluster",
+	Default: "",
+	Help:    "Enable cluster mode with remote to use as shared storage.",
+	Groups:  "Networking",
+}, {
+	Name:    "cluster_id",
+	Default: "",
+	Help:    "Set to an ID for the cluster. An ID of 0 or empty makes this instance the controller.",
+	Groups:  "Networking",
+}, {
+	Name:    "cluster_quit_workers",
+	Default: false,
+	Help:    "Set to cause the controller to quit the workers when it has finished.",
+	Groups:  "Networking",
+}, {
+	Name:    "cluster_batch_files",
+	Default: 1000,
+	Help:    "Max number of files in a cluster batch.",
+	Groups:  "Networking",
+}, {
+	Name:    "cluster_batch_size",
+	Default: Tebi,
+	Help:    "Max total size of files in a cluster batch.",
+	Groups:  "Networking",
+}, {
+	Name:    "cluster_cleanup",
+	Default: ClusterCleanupFull,
+	Help:    "Control which cluster files get cleaned up.",
+	Groups:  "Networking",
+}}
 
 // ConfigInfo is filesystem config options
@@ -680,6 +738,12 @@ type ConfigInfo struct {
 	MaxConnections int      `config:"max_connections"`
 	NameTransform  []string `config:"name_transform"`
 	HTTPProxy      string   `config:"http_proxy"`
+	Cluster            string         `config:"cluster"`
+	ClusterID          string         `config:"cluster_id"`
+	ClusterQuitWorkers bool           `config:"cluster_quit_workers"`
+	ClusterBatchFiles  int            `config:"cluster_batch_files"`
+	ClusterBatchSize   SizeSuffix     `config:"cluster_batch_size"`
+	ClusterCleanup     ClusterCleanup `config:"cluster_cleanup"`
 }
 
 func init() {
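Reviewer note (not part of the patch): a possible end-to-end flow, assuming the new options map to flags in rclone's usual way and remote:queue is a placeholder for the shared remote:

	# worker - the command given is ignored while it polls the queue
	rclone version --cluster remote:queue --cluster-id 1

	# controller - cluster-id is unset, so this instance batches the work
	rclone copy src: dst: --cluster remote:queue

The workers block in cluster.NewWorker's polling loop (the select {} in cmd.go), while the controller batches operations into queue/pending, collects results from queue/done, and optionally writes a QUIT job on shutdown when --cluster-quit-workers is set.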