diff --git a/docs/content/cluster.md b/docs/content/cluster.md
index 3b587b3f4..e6cf93275 100644
--- a/docs/content/cluster.md
+++ b/docs/content/cluster.md
@@ -119,6 +119,10 @@
 The controller only sends transfer jobs to the workers. All the other
 tasks (eg listing, comparing) are done by the controller. The
 controller does not execute any transfer tasks itself.
+The controller reads the worker status files written to `queue/status`
+and uses them to detect workers which have stopped. If it detects a
+failed worker then it will re-assign any outstanding work.
+
 ## Workers
 
 The workers job is entirely to act as API endpoints that receive their
@@ -137,6 +141,27 @@ work via files in `/work`.
 Then allows it.
 - Repeat
 
+Every second the worker writes a status file to `queue/status` for the
+controller to read.
+
+## Layout of the work directory
+
+The format of the files in this directory may change without notice,
+but the layout is documented here as it can help with debugging.
+
+```text
+/work - root of the work directory
+└── queue - files to control the queue
+    ├── done - job files that are finished and read
+    ├── finished - job files that are finished but not yet read
+    ├── pending - job files that are not started yet
+    ├── processing - job files that are running
+    └── status - worker status files
+```
+
+If debugging, use `--cluster-cleanup none` to leave the completed files
+in place in the work directory.
+
 ## Flags
 
 ### --cluster string
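The status mechanism the docs describe is just a heartbeat: each worker keeps rewriting `queue/status/<id>.json`, and the controller treats a stale `updated` timestamp as a dead worker and re-queues its jobs. A minimal, self-contained sketch of that check - not the rclone implementation - with field names following the `WorkerStatus` struct added in `fs/cluster/worker.go` and the threshold mirroring `workerTimeout` in `fs/cluster/cluster.go`:

```go
// Sketch only, not rclone code: decode a status file a worker wrote and
// decide whether that worker should be treated as dead.
package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// workerStatus mirrors the fields of WorkerStatus that the check needs.
type workerStatus struct {
    ID      string    `json:"id"`
    Updated time.Time `json:"updated"`
}

// workerTimeout mirrors the constant added in fs/cluster/cluster.go.
const workerTimeout = 2 * time.Second

// isDead reports whether the worker that wrote statusJSON looks dead at
// time now, ie its last heartbeat is older than workerTimeout.
func isDead(statusJSON []byte, now time.Time) (string, bool, error) {
    var ws workerStatus
    if err := json.Unmarshal(statusJSON, &ws); err != nil {
        return "", false, err
    }
    return ws.ID, now.Sub(ws.Updated) > workerTimeout, nil
}

func main() {
    buf := []byte(`{"id":"abcdefghij","updated":"2020-01-01T00:00:00Z"}`)
    id, dead, err := isDead(buf, time.Now())
    fmt.Println(id, dead, err) // abcdefghij true <nil>
}
```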
diff --git a/fs/cluster/cluster.go b/fs/cluster/cluster.go
index 2863b49c3..2ea548246 100644
--- a/fs/cluster/cluster.go
+++ b/fs/cluster/cluster.go
@@ -4,6 +4,7 @@ package cluster
 
 import (
     "context"
+    "encoding/json"
     "errors"
     "fmt"
     "path"
@@ -17,11 +18,17 @@ import (
     "github.com/rclone/rclone/fs/operations"
     "github.com/rclone/rclone/fs/rc"
     "github.com/rclone/rclone/lib/atexit"
+    "github.com/rclone/rclone/lib/errcount"
+    "golang.org/x/sync/errgroup"
 )
 
 // ErrClusterNotConfigured is returned from creation functions.
 var ErrClusterNotConfigured = errors.New("cluster is not configured")
 
+// If we don't hear from workers in this time we assume they have timed out
+// and re-assign their jobs.
+const workerTimeout = 2 * time.Second
+
 // Cluster describes the workings of the current cluster.
 type Cluster struct {
     jobs *Jobs
@@ -37,6 +44,9 @@
     sync        chan chan<- struct{} // sync the current jobs
     quitWorkers bool                 // if set, send workers a stop signal on Shutdown
+    workers     map[string]*WorkerStatus // worker ID => status
+    deadWorkers map[string]struct{}      // workers which have timed out
+
     mu           sync.Mutex
     currentBatch Batch
     inflight     map[string]Batch
@@ -88,6 +98,8 @@ func NewCluster(ctx context.Context) (*Cluster, error) {
         quit:     make(chan struct{}),
         sync:     make(chan chan<- struct{}),
         inflight: make(map[string]Batch),
+        workers:     make(map[string]*WorkerStatus),
+        deadWorkers: make(map[string]struct{}),
     }
 
     // Configure _config
@@ -379,6 +391,92 @@ func (c *Cluster) processCompletedJob(ctx context.Context, obj fs.Object) error
     return nil
 }
 
+// loadWorkerStatus updates the worker status
+func (c *Cluster) loadWorkerStatus(ctx context.Context) error {
+    objs, err := c.jobs.listDir(ctx, clusterStatus)
+    if err != nil {
+        return fmt.Errorf("cluster: get job status list failed: %w", err)
+    }
+    ec := errcount.New()
+    g, gCtx := errgroup.WithContext(ctx)
+    var mu sync.Mutex
+    for _, obj := range objs {
+        g.Go(func() error {
+            buf, err := c.jobs.readFile(gCtx, obj)
+            if err != nil {
+                ec.Add(fmt.Errorf("read object: %w", err))
+                return nil
+            }
+            workerStatus := new(WorkerStatus)
+            err = json.Unmarshal(buf, workerStatus)
+            if err != nil {
+                ec.Add(fmt.Errorf("status json: %w", err))
+                return nil
+            }
+            mu.Lock()
+            c.workers[workerStatus.ID] = workerStatus
+            mu.Unlock()
+            return nil
+        })
+    }
+    // Wait for all the status reads to finish before reporting errors
+    _ = g.Wait()
+    return ec.Err("cluster: load status")
+}
+
+// checkWorkers loads the worker status and re-assigns jobs from dead workers
+func (c *Cluster) checkWorkers(ctx context.Context) {
+    err := c.loadWorkerStatus(ctx)
+    if err != nil {
+        fs.Errorf(nil, "failed to read some worker status: %v", err)
+    }
+    for workerID, status := range c.workers {
+        timeSinceUpdated := time.Since(status.Updated)
+        if timeSinceUpdated > workerTimeout {
+            if _, isDead := c.deadWorkers[workerID]; isDead {
+                continue
+            }
+            fs.Errorf(nil, "cluster: haven't heard from worker %q for %v - assuming dead", workerID, timeSinceUpdated)
+            // Find any jobs claimed by the worker and restart them
+            objs, err := c.jobs.listDir(ctx, clusterProcessing)
+            if err != nil {
+                fs.Errorf(nil, "cluster: failed to list processing jobs: %v", err)
+                continue
+            }
+            for _, obj := range objs {
+                fs.Debugf(obj, "cluster: checking job")
+                // Jobs are named {jobID}-{workerID}.json
+                name := strings.TrimSuffix(path.Base(obj.Remote()), ".json")
+                dash := strings.LastIndex(name, "-")
+                if dash < 0 {
+                    fs.Errorf(nil, "cluster: failed to find dash in job %q", name)
+                    continue
+                }
+                jobID, jobWorkerID := name[:dash], name[dash+1:]
+                fs.Debugf(obj, "cluster: checking jobID %q, workerID %q", jobID, jobWorkerID)
+                if workerID != jobWorkerID {
+                    fs.Debugf(nil, "cluster: job %q doesn't match %q", jobWorkerID, workerID)
+                    continue
+                }
+                // Found a job running on the worker - rename it back to pending
+                newRemote := path.Join(clusterPending, jobID+".json")
+                _, err = c.jobs.rename(ctx, obj, newRemote)
+                if err != nil {
+                    fs.Errorf(nil, "cluster: failed to restart job %q: %v", jobID, err)
+                    continue
+                }
+                fs.Logf(nil, "cluster: restarted job %q", jobID)
+            }
+            c.deadWorkers[workerID] = struct{}{}
+        } else {
+            if _, isDead := c.deadWorkers[workerID]; isDead {
+                fs.Logf(nil, "cluster: dead worker %q came back to life!", workerID)
+                delete(c.deadWorkers, workerID)
+            }
+        }
+    }
+}
+
 // checkJobs sees if there are any completed jobs
 func (c *Cluster) checkJobs(ctx context.Context) {
     objs, err := c.jobs.listDir(ctx, clusterDone)
     if err != nil {
@@ -404,6 +502,8 @@ func (c *Cluster) run(ctx context.Context) {
     defer c.wg.Done()
     checkJobs := time.NewTicker(clusterCheckJobsInterval)
     defer checkJobs.Stop()
+    checkWorkers := time.NewTicker(clusterCheckWorkersInterval)
+    defer checkWorkers.Stop()
     var syncedChans []chan<- struct{}
     for {
         select {
@@ -415,6 +515,8 @@ func (c *Cluster) run(ctx context.Context) {
         case synced := <-c.sync:
             syncedChans = append(syncedChans, synced)
             fs.Debugf(nil, "cluster: sync request received")
+        case <-checkWorkers.C:
+            c.checkWorkers(ctx)
         case <-checkJobs.C:
         }
         c.checkJobs(ctx)
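Re-assignment in `checkWorkers` relies purely on file names: every claimed job in `queue/processing` is called `{jobID}-{workerID}.json`, so splitting at the last dash recovers both IDs (job IDs may contain dashes, so the split has to come from the right; the generated worker IDs from `random.String(10)` should not contain a dash). A standalone sketch of that parsing with a made-up file name:

```go
// Sketch only: recover the job ID and worker ID from a processing-queue
// file name of the form {jobID}-{workerID}.json, as checkWorkers does.
// The file name in main is invented for illustration.
package main

import (
    "fmt"
    "path"
    "strings"
)

// splitJobName splits a processing-queue remote into job ID and worker ID.
func splitJobName(remote string) (jobID, workerID string, ok bool) {
    name := strings.TrimSuffix(path.Base(remote), ".json")
    dash := strings.LastIndex(name, "-")
    if dash < 0 {
        return "", "", false
    }
    // Everything after the last dash is the worker ID; the job ID may
    // itself contain dashes, the worker ID must not.
    return name[:dash], name[dash+1:], true
}

func main() {
    jobID, workerID, ok := splitJobName("queue/processing/job-0001-abcdefghij.json")
    fmt.Println(jobID, workerID, ok) // job-0001 abcdefghij true
}
```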
diff --git a/fs/cluster/jobs.go b/fs/cluster/jobs.go
index 7057c88da..7b0289e04 100644
--- a/fs/cluster/jobs.go
+++ b/fs/cluster/jobs.go
@@ -31,6 +31,7 @@
     clusterProcessing = clusterQueue + "/processing"
     clusterDone       = clusterQueue + "/done"
     clusterFinished   = clusterQueue + "/finished"
+    clusterStatus     = clusterQueue + "/status"
 
     minSleep = 10 * time.Millisecond
     maxSleep = 2 * time.Second
@@ -39,6 +40,12 @@
     // Read the queue this often
     clusterCheckJobsInterval = time.Second
 
+    // Write the worker status this often
+    clusterWriteStatusInterval = time.Second
+
+    // Read the worker status this often
+    clusterCheckWorkersInterval = time.Second
+
     // Name of job which signals to the workers to quit
     quitJob = "QUIT"
 )
@@ -82,7 +89,7 @@ func NewJobs(ctx context.Context) (*Jobs, error) {
 
 // Create the cluster directory structure
 func (jobs *Jobs) createDirectoryStructure(ctx context.Context) (err error) {
-    for _, dir := range []string{clusterPending, clusterProcessing, clusterDone, clusterFinished} {
+    for _, dir := range []string{clusterPending, clusterProcessing, clusterDone, clusterFinished, clusterStatus} {
         err = jobs.f.Mkdir(ctx, dir)
         if err != nil {
             return fmt.Errorf("cluster mkdir %q: %w", dir, err)
@@ -165,6 +172,17 @@ func (jobs *Jobs) writeFile(ctx context.Context, remote string, modTime time.Tim
     return nil
 }
 
+// Remove the file if it exists
+func (jobs *Jobs) removeFile(ctx context.Context, remote string) error {
+    obj, err := jobs.f.NewObject(ctx, remote)
+    if errors.Is(err, fs.ErrorObjectNotFound) || errors.Is(err, fs.ErrorDirNotFound) {
+        return nil
+    } else if err != nil {
+        return err
+    }
+    return obj.Remove(ctx)
+}
+
 // write a job to a file returning the name
 func (jobs *Jobs) writeJob(ctx context.Context, where string, job any) (name string, err error) {
     now := time.Now().UTC()
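The three intervals added here work together: workers heartbeat every `clusterWriteStatusInterval`, the controller polls `queue/status` every `clusterCheckWorkersInterval`, and only declares a worker dead once its status is older than `workerTimeout` from `cluster.go`. Rough, illustrative arithmetic for what that means in practice (not rclone code):

```go
// Illustrative arithmetic only: how the timing constants in this patch
// relate to each other.
package main

import (
    "fmt"
    "time"
)

func main() {
    const (
        clusterWriteStatusInterval  = time.Second     // worker heartbeat period
        clusterCheckWorkersInterval = time.Second     // controller poll period
        workerTimeout               = 2 * time.Second // staleness threshold
    )
    // workerTimeout must comfortably exceed the heartbeat period or a
    // healthy but slow worker would be declared dead.
    fmt.Println("false-positive margin:", workerTimeout-clusterWriteStatusInterval)
    // After a crash the status goes stale workerTimeout after the last
    // heartbeat, and the controller notices on its next poll, so jobs
    // are re-queued within roughly this long (ignoring listing latency):
    fmt.Println("worst-case detection: ~", workerTimeout+clusterCheckWorkersInterval)
}
```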
diff --git a/fs/cluster/worker.go b/fs/cluster/worker.go
index 7c5a5f592..494403633 100644
--- a/fs/cluster/worker.go
+++ b/fs/cluster/worker.go
@@ -2,21 +2,40 @@ package cluster
 
 import (
     "context"
+    "encoding/json"
     "path"
     "sync"
     "time"
 
     "github.com/rclone/rclone/fs"
+    "github.com/rclone/rclone/fs/accounting"
+    "github.com/rclone/rclone/fs/rc"
     "github.com/rclone/rclone/fs/rc/jobs"
     "github.com/rclone/rclone/lib/random"
 )
 
+const maxWorkersDone = 16 // maximum jobs in the done list
+
 // Worker describes a single instance of a cluster worker.
 type Worker struct {
     jobs   *Jobs
     cancel func()         // stop bg job
     wg     sync.WaitGroup // bg job finished
     id     string         // id of this worker
+    status string         // where the worker stores its status file
+
+    jobsMu  sync.Mutex          // protects running and done
+    running map[string]struct{} // IDs of the jobs being processed
+    done    []string            // IDs of finished jobs
+}
+
+// WorkerStatus shows the status of this worker including jobs
+// running.
+type WorkerStatus struct {
+    ID      string               `json:"id"`
+    Running map[string]rc.Params `json:"running"` // Job ID => accounting.RemoteStats
+    Done    map[string]bool      `json:"done"`    // Job ID => finished status
+    Updated time.Time            `json:"updated"`
 }
 
 // NewWorker creates a new cluster from the config in ctx.
@@ -32,18 +51,22 @@ func NewWorker(ctx context.Context) (*Worker, error) {
         return nil, err
     }
     w := &Worker{
-        jobs: jobs,
-        id:   ci.ClusterID,
+        jobs:    jobs,
+        id:      ci.ClusterID,
+        running: make(map[string]struct{}),
     }
     if w.id == "" {
         w.id = random.String(10)
     }
+    w.status = path.Join(clusterStatus, w.id+".json")
 
-    // Start the background worker
+    // Start the background workers
     bgCtx, cancel := context.WithCancel(context.Background())
     w.cancel = cancel
     w.wg.Add(1)
-    go w.run(bgCtx)
+    go w.runJobs(bgCtx)
+    w.wg.Add(1)
+    go w.runStatus(bgCtx)
 
     fs.Logf(w.jobs.f, "Started cluster worker")
 
@@ -60,6 +83,27 @@ func (w *Worker) checkJobs(ctx context.Context) {
     if obj == nil {
         return // no jobs available
     }
+
+    // make a stats group for this job
+    ctx = accounting.WithStatsGroup(ctx, name)
+
+    // Add the job ID to the running jobs
+    w.jobsMu.Lock()
+    w.running[name] = struct{}{}
+    w.jobsMu.Unlock()
+    fs.Debugf(nil, "cluster: started job %q", name)
+
+    // Remove the job ID on exit, keeping the last maxWorkersDone done jobs
+    defer func() {
+        w.jobsMu.Lock()
+        delete(w.running, name)
+        w.done = append(w.done, name)
+        if len(w.done) > maxWorkersDone {
+            w.done = w.done[len(w.done)-maxWorkersDone:]
+        }
+        w.jobsMu.Unlock()
+    }()
+
     fs.Debugf(nil, "cluster: processing pending job %q", name)
     inBuf, err := w.jobs.readFile(ctx, obj)
     if err != nil {
@@ -67,7 +111,6 @@
         w.jobs.finish(ctx, obj, "input-error", false)
         return
     }
-    w.jobs.finish(ctx, obj, "input-ok", true)
     outBuf := jobs.NewJobFromBytes(ctx, inBuf)
     remote := path.Join(clusterDone, name+".json")
     err = w.jobs.writeFile(ctx, remote, time.Now(), outBuf)
@@ -75,11 +118,12 @@
         fs.Errorf(nil, "check jobs failed to write output: %v", err)
         return
     }
+    w.jobs.finish(ctx, obj, "input-ok", true)
     fs.Debugf(nil, "cluster: processed pending job %q", name)
 }
 
-// Run the background process
-func (w *Worker) run(ctx context.Context) {
+// Run the background process to pick up jobs
+func (w *Worker) runJobs(ctx context.Context) {
     defer w.wg.Done()
     checkJobs := time.NewTicker(clusterCheckJobsInterval)
     defer checkJobs.Stop()
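The final hunk below adds `writeStatus`, `clearStatus` and `runStatus`, which serialise the `WorkerStatus` defined earlier once a second. For orientation, a marshalled status file in `queue/status/<worker id>.json` comes out roughly like the output of this sketch (all values are invented; the real `running` entries are the short form of accounting's `RemoteStats` for each in-flight job):

```go
// Illustrative only - build a status document shaped like WorkerStatus
// and print the JSON that would land in queue/status/<worker id>.json.
package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// workerStatus mirrors the shape of WorkerStatus for illustration.
type workerStatus struct {
    ID      string                    `json:"id"`
    Running map[string]map[string]any `json:"running"`
    Done    map[string]bool           `json:"done"`
    Updated time.Time                 `json:"updated"`
}

func main() {
    status := workerStatus{
        ID: "abcdefghij", // hypothetical worker ID
        Running: map[string]map[string]any{
            "job-0002": {"bytes": 1048576, "transfers": 3}, // made-up stats
        },
        Done: map[string]bool{
            "job-0001": true,  // finished earlier
            "job-0002": false, // still running
        },
        Updated: time.Now().UTC(),
    }
    buf, err := json.MarshalIndent(status, "", "\t")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(buf))
}
```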
@@ -93,6 +137,72 @@
     }
 }
 
+// Write the worker status
+func (w *Worker) writeStatus(ctx context.Context) {
+    // Create the worker status from the job IDs and the short stats
+    status := WorkerStatus{
+        ID:      w.id,
+        Running: make(map[string]rc.Params),
+        Updated: time.Now(),
+        Done:    make(map[string]bool),
+    }
+    w.jobsMu.Lock()
+    for _, jobID := range w.done {
+        status.Done[jobID] = true
+    }
+    for jobID := range w.running {
+        fs.Debugf(nil, "cluster: reporting status for job %q", jobID)
+        si := accounting.StatsGroup(ctx, jobID)
+        out, err := si.RemoteStats(true)
+        if err != nil {
+            fs.Errorf(nil, "cluster: write status: stats: %v", err)
+            status.Running[jobID] = rc.Params{}
+        } else {
+            status.Running[jobID] = out
+        }
+        status.Done[jobID] = false
+    }
+    w.jobsMu.Unlock()
+
+    // Write the stats to a file
+    buf, err := json.MarshalIndent(status, "", "\t")
+    if err != nil {
+        fs.Errorf(nil, "cluster: write status: json: %v", err)
+        return
+    }
+    err = w.jobs.writeFile(ctx, w.status, status.Updated, buf)
+    if err != nil {
+        fs.Errorf(nil, "cluster: write status: %v", err)
+    }
+}
+
+// Remove the worker status
+func (w *Worker) clearStatus(ctx context.Context) {
+    err := w.jobs.removeFile(ctx, w.status)
+    if err != nil {
+        fs.Errorf(nil, "cluster: clear status: %v", err)
+    }
+}
+
+// Run the background process to write status
+func (w *Worker) runStatus(ctx context.Context) {
+    defer w.wg.Done()
+    w.writeStatus(ctx)
+    defer w.clearStatus(context.WithoutCancel(ctx)) // remove the status file even after ctx is cancelled
+    writeStatus := time.NewTicker(clusterWriteStatusInterval)
+    defer writeStatus.Stop()
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-writeStatus.C:
+            t0 := time.Now()
+            w.writeStatus(ctx)
+            fs.Debugf(nil, "cluster: write status took %v", time.Since(t0))
+        }
+    }
+}
+
 // Shutdown the worker regardless of whether it has work to process or not.
 func (w *Worker) Shutdown(ctx context.Context) error {
     w.cancel()
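From the worker's point of view the whole feature is self-contained: `NewWorker` starts `runJobs` and `runStatus`, and `Shutdown` cancels them, after which `clearStatus` removes the status file so a clean exit is not mistaken for a crash. A usage sketch only (it assumes a context with cluster mode configured and is not how rclone itself wires this up):

```go
// Usage sketch only, not taken from rclone's own wiring. Assumes the
// context carries a config with cluster mode enabled; otherwise the
// constructor is expected to return ErrClusterNotConfigured.
package main

import (
    "context"
    "log"
    "time"

    "github.com/rclone/rclone/fs/cluster"
)

func main() {
    ctx := context.Background() // assume cluster flags already applied to this ctx
    w, err := cluster.NewWorker(ctx)
    if err != nil {
        log.Fatalf("start worker: %v", err)
    }
    // The worker now polls queue/pending for jobs and rewrites its file
    // in queue/status every second.
    time.Sleep(time.Minute) // stand-in for real work / signal handling

    // Shutdown stops the background goroutines; on the way out the worker
    // removes its status file so the controller does not treat a clean
    // exit as a dead worker.
    if err := w.Shutdown(context.Background()); err != nil {
        log.Printf("worker shutdown: %v", err)
    }
}
```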