mirror of
https://github.com/rclone/rclone.git
synced 2026-01-04 09:33:36 +00:00
Redo error handling for sync/copy/move
* Factor sync/copy/move into its own file * Make fatal errors abort the sync * Make Copy return errors * Make Sync/Copy/Move return the last Copy error if there was one * Prioritise returning Fatal errors * NoRetry errors are returned if no other types of errors
This commit is contained in:
454
fs/operations.go
454
fs/operations.go
@@ -198,11 +198,10 @@ func removeFailedCopy(dst Object) bool {
|
||||
// If dst is nil then the object must not exist already. If you do
|
||||
// call Copy() with dst nil on a pre-existing file then some filing
|
||||
// systems (eg Drive) may duplicate the file.
|
||||
func Copy(f Fs, dst, src Object) {
|
||||
func Copy(f Fs, dst, src Object) (err error) {
|
||||
maxTries := Config.LowLevelRetries
|
||||
tries := 0
|
||||
doUpdate := dst != nil
|
||||
var err error
|
||||
var actionTaken string
|
||||
for {
|
||||
// Try server side copy first - if has optional interface and
|
||||
@@ -265,7 +264,7 @@ func Copy(f Fs, dst, src Object) {
|
||||
Stats.Error()
|
||||
ErrorLog(src, "Failed to copy: %v", err)
|
||||
removeFailedCopy(dst)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// Verify sizes are the same after transfer
|
||||
@@ -274,7 +273,7 @@ func Copy(f Fs, dst, src Object) {
|
||||
err = errors.Errorf("corrupted on transfer: sizes differ %d vs %d", src.Size(), dst.Size())
|
||||
ErrorLog(dst, "%v", err)
|
||||
removeFailedCopy(dst)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// Verify hashes are the same after transfer - ignoring blank hashes
|
||||
@@ -286,12 +285,14 @@ func Copy(f Fs, dst, src Object) {
|
||||
// Get common hash type
|
||||
hashType := common.GetOne()
|
||||
|
||||
srcSum, err := src.Hash(hashType)
|
||||
var srcSum string
|
||||
srcSum, err = src.Hash(hashType)
|
||||
if err != nil {
|
||||
Stats.Error()
|
||||
ErrorLog(src, "Failed to read src hash: %v", err)
|
||||
} else if srcSum != "" {
|
||||
dstSum, err := dst.Hash(hashType)
|
||||
var dstSum string
|
||||
dstSum, err = dst.Hash(hashType)
|
||||
if err != nil {
|
||||
Stats.Error()
|
||||
ErrorLog(dst, "Failed to read hash: %v", err)
|
||||
@@ -300,134 +301,13 @@ func Copy(f Fs, dst, src Object) {
|
||||
err = errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
|
||||
ErrorLog(dst, "%v", err)
|
||||
removeFailedCopy(dst)
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Debug(src, actionTaken)
|
||||
}
|
||||
|
||||
// Check to see if src needs to be copied to dst and if so puts it in out
|
||||
func checkOne(pair ObjectPair, out ObjectPairChan) {
|
||||
src, dst := pair.src, pair.dst
|
||||
if dst == nil {
|
||||
Debug(src, "Couldn't find file - need to transfer")
|
||||
out <- pair
|
||||
return
|
||||
}
|
||||
// Check to see if can store this
|
||||
if !src.Storable() {
|
||||
return
|
||||
}
|
||||
// If we should ignore existing files, don't transfer
|
||||
if Config.IgnoreExisting {
|
||||
Debug(src, "Destination exists, skipping")
|
||||
return
|
||||
}
|
||||
// If we should upload unconditionally
|
||||
if Config.IgnoreTimes {
|
||||
Debug(src, "Uploading unconditionally as --ignore-times is in use")
|
||||
out <- pair
|
||||
return
|
||||
}
|
||||
// If UpdateOlder is in effect, skip if dst is newer than src
|
||||
if Config.UpdateOlder {
|
||||
srcModTime := src.ModTime()
|
||||
dstModTime := dst.ModTime()
|
||||
dt := dstModTime.Sub(srcModTime)
|
||||
// If have a mutually agreed precision then use that
|
||||
modifyWindow := Config.ModifyWindow
|
||||
if modifyWindow == ModTimeNotSupported {
|
||||
// Otherwise use 1 second as a safe default as
|
||||
// the resolution of the time a file was
|
||||
// uploaded.
|
||||
modifyWindow = time.Second
|
||||
}
|
||||
switch {
|
||||
case dt >= modifyWindow:
|
||||
Debug(src, "Destination is newer than source, skipping")
|
||||
return
|
||||
case dt <= -modifyWindow:
|
||||
Debug(src, "Destination is older than source, transferring")
|
||||
default:
|
||||
if src.Size() == dst.Size() {
|
||||
Debug(src, "Destination mod time is within %v of source and sizes identical, skipping", modifyWindow)
|
||||
return
|
||||
}
|
||||
Debug(src, "Destination mod time is within %v of source but sizes differ, transferring", modifyWindow)
|
||||
}
|
||||
} else {
|
||||
// Check to see if changed or not
|
||||
if Equal(src, dst) {
|
||||
Debug(src, "Unchanged skipping")
|
||||
return
|
||||
}
|
||||
}
|
||||
out <- pair
|
||||
}
|
||||
|
||||
// PairChecker reads Objects~s on in send to out if they need transferring.
|
||||
//
|
||||
// FIXME potentially doing lots of hashes at once
|
||||
func PairChecker(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
for pair := range in {
|
||||
src := pair.src
|
||||
Stats.Checking(src.Remote())
|
||||
checkOne(pair, out)
|
||||
Stats.DoneChecking(src.Remote())
|
||||
}
|
||||
}
|
||||
|
||||
// PairCopier reads Objects on in and copies them.
|
||||
func PairCopier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
for pair := range in {
|
||||
src := pair.src
|
||||
Stats.Transferring(src.Remote())
|
||||
if Config.DryRun {
|
||||
Log(src, "Not copying as --dry-run")
|
||||
} else {
|
||||
Copy(fdst, pair.dst, src)
|
||||
}
|
||||
Stats.DoneTransferring(src.Remote())
|
||||
}
|
||||
}
|
||||
|
||||
// PairMover reads Objects on in and moves them if possible, or copies
|
||||
// them if not
|
||||
func PairMover(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
// See if we have Move available
|
||||
fdstMover, haveMover := fdst.(Mover)
|
||||
for pair := range in {
|
||||
src := pair.src
|
||||
dst := pair.dst
|
||||
Stats.Transferring(src.Remote())
|
||||
if Config.DryRun {
|
||||
Log(src, "Not moving as --dry-run")
|
||||
} else if haveMover && src.Fs().Name() == fdst.Name() {
|
||||
// Delete destination if it exists
|
||||
if pair.dst != nil {
|
||||
err := dst.Remove()
|
||||
if err != nil {
|
||||
Stats.Error()
|
||||
ErrorLog(dst, "Couldn't delete: %v", err)
|
||||
}
|
||||
}
|
||||
_, err := fdstMover.Move(src, src.Remote())
|
||||
if err != nil {
|
||||
Stats.Error()
|
||||
ErrorLog(dst, "Couldn't move: %v", err)
|
||||
} else {
|
||||
Debug(src, "Moved")
|
||||
}
|
||||
} else {
|
||||
Copy(fdst, pair.dst, src)
|
||||
}
|
||||
Stats.DoneTransferring(src.Remote())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteFile deletes a single file respecting --dry-run and accumulating stats and errors.
|
||||
@@ -476,7 +356,10 @@ func DeleteFiles(toBeDeleted ObjectsChan) error {
|
||||
// dir is the start directory, "" for root
|
||||
// If includeAll is specified all files will be added,
|
||||
// otherwise only files passing the filter will be added.
|
||||
func readFilesFn(fs Fs, includeAll bool, dir string, add func(Object)) (err error) {
|
||||
//
|
||||
// Each object is passed ito the function provided. If that returns
|
||||
// an error then the listing will be aborted and that error returned.
|
||||
func readFilesFn(fs Fs, includeAll bool, dir string, add func(Object) error) (err error) {
|
||||
list := NewLister()
|
||||
if !includeAll {
|
||||
list.SetFilter(Config.Filter)
|
||||
@@ -494,7 +377,10 @@ func readFilesFn(fs Fs, includeAll bool, dir string, add func(Object)) (err erro
|
||||
}
|
||||
// Make sure we don't delete excluded files if not required
|
||||
if includeAll || Config.Filter.IncludeObject(o) {
|
||||
add(o)
|
||||
err = add(o)
|
||||
if err != nil {
|
||||
list.SetError(err)
|
||||
}
|
||||
} else {
|
||||
Debug(o, "Excluded from sync (and deletion)")
|
||||
}
|
||||
@@ -511,7 +397,7 @@ func readFilesFn(fs Fs, includeAll bool, dir string, add func(Object)) (err erro
|
||||
func readFilesMap(fs Fs, includeAll bool, dir string) (files map[string]Object, err error) {
|
||||
files = make(map[string]Object)
|
||||
normalised := make(map[string]struct{})
|
||||
err = readFilesFn(fs, includeAll, dir, func(o Object) {
|
||||
err = readFilesFn(fs, includeAll, dir, func(o Object) error {
|
||||
remote := o.Remote()
|
||||
normalisedRemote := strings.ToLower(norm.NFC.String(remote))
|
||||
if _, ok := files[remote]; !ok {
|
||||
@@ -523,6 +409,7 @@ func readFilesMap(fs Fs, includeAll bool, dir string) (files map[string]Object,
|
||||
Log(o, "Duplicate file detected")
|
||||
}
|
||||
normalised[normalisedRemote] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
return files, err
|
||||
}
|
||||
@@ -565,309 +452,6 @@ func Same(fdst, fsrc Fs) bool {
|
||||
return fdst.Name() == fsrc.Name() && fdst.Root() == fsrc.Root()
|
||||
}
|
||||
|
||||
type syncCopyMove struct {
|
||||
// parameters
|
||||
fdst Fs
|
||||
fsrc Fs
|
||||
Delete bool
|
||||
DoMove bool
|
||||
dir string
|
||||
// internal state
|
||||
noTraverse bool // if set don't trafevers the dst
|
||||
deleteBefore bool // set if we must delete objects before copying
|
||||
dstFiles map[string]Object // dst files, only used if Delete
|
||||
srcFiles map[string]Object // src files, only used if deleteBefore
|
||||
srcFilesChan chan Object // passes src objects
|
||||
srcFilesResult chan error // error result of src listing
|
||||
dstFilesResult chan error // error result of dst listing
|
||||
checkerWg sync.WaitGroup // wait for checkers
|
||||
toBeChecked ObjectPairChan // checkers channel
|
||||
copierWg sync.WaitGroup // wait for copiers
|
||||
toBeUploaded ObjectPairChan // copiers channel
|
||||
}
|
||||
|
||||
func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
|
||||
s := &syncCopyMove{
|
||||
fdst: fdst,
|
||||
fsrc: fsrc,
|
||||
Delete: Delete,
|
||||
DoMove: DoMove,
|
||||
dir: "",
|
||||
srcFilesChan: make(chan Object, Config.Checkers+Config.Transfers),
|
||||
srcFilesResult: make(chan error, 1),
|
||||
dstFilesResult: make(chan error, 1),
|
||||
noTraverse: Config.NoTraverse,
|
||||
toBeChecked: make(ObjectPairChan, Config.Transfers),
|
||||
toBeUploaded: make(ObjectPairChan, Config.Transfers),
|
||||
deleteBefore: Delete && Config.DeleteBefore,
|
||||
}
|
||||
if s.noTraverse && s.Delete {
|
||||
Debug(s.fdst, "Ignoring --no-traverse with sync")
|
||||
s.noTraverse = false
|
||||
}
|
||||
return s
|
||||
|
||||
}
|
||||
|
||||
// This reads the source files from s.srcFiles into srcFilesChan then
|
||||
// closes it
|
||||
//
|
||||
// It returns the final result of the read into s.srcFilesResult
|
||||
func (s *syncCopyMove) readSrcUsingMap() {
|
||||
for _, o := range s.srcFiles {
|
||||
s.srcFilesChan <- o
|
||||
}
|
||||
close(s.srcFilesChan)
|
||||
s.srcFilesResult <- nil
|
||||
}
|
||||
|
||||
// This reads the source files into srcFilesChan then closes it
|
||||
//
|
||||
// It returns the final result of the read into s.srcFilesResult
|
||||
func (s *syncCopyMove) readSrcUsingChan() {
|
||||
err := readFilesFn(s.fsrc, false, s.dir, func(o Object) {
|
||||
s.srcFilesChan <- o
|
||||
})
|
||||
close(s.srcFilesChan)
|
||||
s.srcFilesResult <- err
|
||||
}
|
||||
|
||||
// This reads the destination files in into dstFiles
|
||||
//
|
||||
// It returns the final result of the read into s.dstFilesResult
|
||||
func (s *syncCopyMove) readDstFiles() {
|
||||
var err error
|
||||
s.dstFiles, err = readFilesMap(s.fdst, Config.Filter.DeleteExcluded, s.dir)
|
||||
s.dstFilesResult <- err
|
||||
}
|
||||
|
||||
// This deletes the files in the dstFiles map. If checkSrcMap is set
|
||||
// then it checks to see if they exist first in srcFiles the source
|
||||
// file map, otherwise it unconditionally deletes them. If
|
||||
// checkSrcMap is clear then it assumes that the any source files that
|
||||
// have been found have been removed from dstFiles already.
|
||||
func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
|
||||
if Stats.Errored() {
|
||||
ErrorLog(s.fdst, "%v", ErrorNotDeleting)
|
||||
return ErrorNotDeleting
|
||||
}
|
||||
|
||||
// Delete the spare files
|
||||
toDelete := make(ObjectsChan, Config.Transfers)
|
||||
go func() {
|
||||
for remote, o := range s.dstFiles {
|
||||
if checkSrcMap {
|
||||
_, exists := s.srcFiles[remote]
|
||||
if !exists {
|
||||
toDelete <- o
|
||||
}
|
||||
} else {
|
||||
toDelete <- o
|
||||
}
|
||||
}
|
||||
close(toDelete)
|
||||
}()
|
||||
return DeleteFiles(toDelete)
|
||||
}
|
||||
|
||||
// This starts the background checkers.
|
||||
func (s *syncCopyMove) startCheckers() {
|
||||
s.checkerWg.Add(Config.Checkers)
|
||||
for i := 0; i < Config.Checkers; i++ {
|
||||
go PairChecker(s.toBeChecked, s.toBeUploaded, &s.checkerWg)
|
||||
}
|
||||
}
|
||||
|
||||
// This stops the background checkers
|
||||
func (s *syncCopyMove) stopCheckers() {
|
||||
close(s.toBeChecked)
|
||||
Log(s.fdst, "Waiting for checks to finish")
|
||||
s.checkerWg.Wait()
|
||||
}
|
||||
|
||||
// This starts the background transfers
|
||||
func (s *syncCopyMove) startTransfers() {
|
||||
s.copierWg.Add(Config.Transfers)
|
||||
for i := 0; i < Config.Transfers; i++ {
|
||||
if s.DoMove {
|
||||
go PairMover(s.toBeUploaded, s.fdst, &s.copierWg)
|
||||
} else {
|
||||
go PairCopier(s.toBeUploaded, s.fdst, &s.copierWg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This stops the background transfers
|
||||
func (s *syncCopyMove) stopTransfers() {
|
||||
close(s.toBeUploaded)
|
||||
Log(s.fdst, "Waiting for transfers to finish")
|
||||
s.copierWg.Wait()
|
||||
}
|
||||
|
||||
// Syncs fsrc into fdst
|
||||
//
|
||||
// If Delete is true then it deletes any files in fdst that aren't in fsrc
|
||||
//
|
||||
// If DoMove is true then files will be moved instead of copied
|
||||
//
|
||||
// dir is the start directory, "" for root
|
||||
func (s *syncCopyMove) run() error {
|
||||
if Same(s.fdst, s.fsrc) {
|
||||
ErrorLog(s.fdst, "Nothing to do as source and destination are the same")
|
||||
return nil
|
||||
}
|
||||
|
||||
err := Mkdir(s.fdst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start reading dstFiles if required
|
||||
if !s.noTraverse {
|
||||
go s.readDstFiles()
|
||||
}
|
||||
|
||||
// If s.deleteBefore then we need to read the whole source map first
|
||||
if s.deleteBefore {
|
||||
// Read source files into the map
|
||||
s.srcFiles, err = readFilesMap(s.fsrc, false, s.dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Pump the map into s.srcFilesChan
|
||||
go s.readSrcUsingMap()
|
||||
} else {
|
||||
go s.readSrcUsingChan()
|
||||
}
|
||||
|
||||
// Wait for dstfiles to finish reading if we were reading them
|
||||
// and report any errors
|
||||
if !s.noTraverse {
|
||||
err = <-s.dstFilesResult
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Delete files first if required
|
||||
// Have dstFiles and srcFiles complete at this point
|
||||
if s.deleteBefore {
|
||||
err = s.deleteFiles(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Start background checking and transferring pipeline
|
||||
s.startCheckers()
|
||||
s.startTransfers()
|
||||
|
||||
// Do the transfers
|
||||
for src := range s.srcFilesChan {
|
||||
remote := src.Remote()
|
||||
var dst Object
|
||||
if s.noTraverse {
|
||||
var err error
|
||||
dst, err = s.fdst.NewObject(remote)
|
||||
if err != nil {
|
||||
dst = nil
|
||||
if err != ErrorObjectNotFound {
|
||||
Debug(src, "Error making NewObject: %v", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
dst = s.dstFiles[remote]
|
||||
// Remove file from s.dstFiles because it exists in srcFiles
|
||||
delete(s.dstFiles, remote)
|
||||
}
|
||||
if dst != nil {
|
||||
s.toBeChecked <- ObjectPair{src, dst}
|
||||
} else {
|
||||
// No need to check since doesn't exist
|
||||
s.toBeUploaded <- ObjectPair{src, nil}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop background checking and transferring pipeline
|
||||
s.stopCheckers()
|
||||
s.stopTransfers()
|
||||
|
||||
// Retrieve the delayed error from the source listing goroutine
|
||||
err = <-s.srcFilesResult
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete files during or after
|
||||
if s.Delete && (Config.DeleteDuring || Config.DeleteAfter) {
|
||||
err = s.deleteFiles(false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sync fsrc into fdst
|
||||
func Sync(fdst, fsrc Fs) error {
|
||||
return newSyncCopyMove(fdst, fsrc, true, false).run()
|
||||
}
|
||||
|
||||
// CopyDir copies fsrc into fdst
|
||||
func CopyDir(fdst, fsrc Fs) error {
|
||||
return newSyncCopyMove(fdst, fsrc, false, false).run()
|
||||
}
|
||||
|
||||
// moveDir moves fsrc into fdst
|
||||
func moveDir(fdst, fsrc Fs) error {
|
||||
return newSyncCopyMove(fdst, fsrc, false, true).run()
|
||||
}
|
||||
|
||||
// MoveDir moves fsrc into fdst
|
||||
func MoveDir(fdst, fsrc Fs) error {
|
||||
if Same(fdst, fsrc) {
|
||||
ErrorLog(fdst, "Nothing to do as source and destination are the same")
|
||||
return nil
|
||||
}
|
||||
|
||||
// First attempt to use DirMover if exists, same Fs and no filters are active
|
||||
if fdstDirMover, ok := fdst.(DirMover); ok && fsrc.Name() == fdst.Name() && Config.Filter.InActive() {
|
||||
err := fdstDirMover.DirMove(fsrc)
|
||||
Debug(fdst, "Using server side directory move")
|
||||
switch err {
|
||||
case ErrorCantDirMove, ErrorDirExists:
|
||||
Debug(fdst, "Server side directory move failed - fallback to copy/delete: %v", err)
|
||||
case nil:
|
||||
Debug(fdst, "Server side directory move succeeded")
|
||||
return nil
|
||||
default:
|
||||
Stats.Error()
|
||||
ErrorLog(fdst, "Server side directory move failed: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Now move the files
|
||||
err := moveDir(fdst, fsrc)
|
||||
if err != nil || Stats.Errored() {
|
||||
ErrorLog(fdst, "Not deleting files as there were IO errors")
|
||||
return err
|
||||
}
|
||||
// If no filters then purge
|
||||
if Config.Filter.InActive() {
|
||||
return Purge(fsrc)
|
||||
}
|
||||
// Otherwise remove any remaining files obeying filters
|
||||
err = Delete(fsrc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// and try to remove the directory if empty - ignoring error
|
||||
_ = TryRmdir(fsrc)
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkIdentical checks to see if dst and src are identical
|
||||
//
|
||||
// it returns true if differences were found
|
||||
|
||||
Reference in New Issue
Block a user