mirror of
https://github.com/gilbertchen/duplicacy
synced 2025-12-10 05:13:17 +00:00
Retry on EOF errors in the SFTP backend
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
|||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/pkg/sftp"
|
"github.com/pkg/sftp"
|
||||||
"golang.org/x/crypto/ssh"
|
"golang.org/x/crypto/ssh"
|
||||||
@@ -23,9 +24,13 @@ type SFTPStorage struct {
|
|||||||
StorageBase
|
StorageBase
|
||||||
|
|
||||||
client *sftp.Client
|
client *sftp.Client
|
||||||
|
clientLock sync.Mutex
|
||||||
minimumNesting int // The minimum level of directories to dive into before searching for the chunk file.
|
minimumNesting int // The minimum level of directories to dive into before searching for the chunk file.
|
||||||
storageDir string
|
storageDir string
|
||||||
numberOfThreads int
|
numberOfThreads int
|
||||||
|
numberOfTries int
|
||||||
|
serverAddress string
|
||||||
|
sftpConfig *ssh.ClientConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateSFTPStorageWithPassword(server string, port int, username string, storageDir string,
|
func CreateSFTPStorageWithPassword(server string, port int, username string, storageDir string,
|
||||||
@@ -86,6 +91,9 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
|
|||||||
storageDir: storageDir,
|
storageDir: storageDir,
|
||||||
minimumNesting: minimumNesting,
|
minimumNesting: minimumNesting,
|
||||||
numberOfThreads: threads,
|
numberOfThreads: threads,
|
||||||
|
numberOfTries: 6,
|
||||||
|
serverAddress: serverAddress,
|
||||||
|
sftpConfig: sftpConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Random number fo generating the temporary chunk file suffix.
|
// Random number fo generating the temporary chunk file suffix.
|
||||||
@@ -99,13 +107,60 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
func CloseSFTPStorage(storage *SFTPStorage) {
|
func CloseSFTPStorage(storage *SFTPStorage) {
|
||||||
storage.client.Close()
|
if storage.client != nil {
|
||||||
|
storage.client.Close()
|
||||||
|
storage.client = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (storage *SFTPStorage) getSFTPClient() *sftp.Client {
|
||||||
|
storage.clientLock.Lock()
|
||||||
|
defer storage.clientLock.Unlock()
|
||||||
|
return storage.client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (storage *SFTPStorage) retry(f func () error) error {
|
||||||
|
delay := time.Second
|
||||||
|
for i := 0;; i++ {
|
||||||
|
err := f()
|
||||||
|
if err != nil && strings.Contains(err.Error(), "EOF") && i < storage.numberOfTries {
|
||||||
|
LOG_WARN("SFTP_RETRY", "Encountered an error (%v); retry after %d second(s)", err, delay/time.Second)
|
||||||
|
time.Sleep(delay)
|
||||||
|
delay *= 2
|
||||||
|
|
||||||
|
storage.clientLock.Lock()
|
||||||
|
if storage.client != nil {
|
||||||
|
storage.client.Close()
|
||||||
|
storage.client = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
connection, err := ssh.Dial("tcp", storage.serverAddress, storage.sftpConfig)
|
||||||
|
if err != nil {
|
||||||
|
storage.clientLock.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := sftp.NewClient(connection)
|
||||||
|
if err != nil {
|
||||||
|
connection.Close()
|
||||||
|
storage.clientLock.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
storage.client = client
|
||||||
|
storage.clientLock.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
// ListFiles return the list of files and subdirectories under 'file' (non-recursively)
|
// ListFiles return the list of files and subdirectories under 'file' (non-recursively)
|
||||||
func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []string, sizes []int64, err error) {
|
func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []string, sizes []int64, err error) {
|
||||||
|
|
||||||
entries, err := storage.client.ReadDir(path.Join(storage.storageDir, dirPath))
|
var entries []os.FileInfo
|
||||||
|
err = storage.retry(func() error {
|
||||||
|
entries, err = storage.getSFTPClient().ReadDir(path.Join(storage.storageDir, dirPath))
|
||||||
|
return err
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@@ -126,7 +181,11 @@ func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []
|
|||||||
// DeleteFile deletes the file or directory at 'filePath'.
|
// DeleteFile deletes the file or directory at 'filePath'.
|
||||||
func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err error) {
|
func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err error) {
|
||||||
fullPath := path.Join(storage.storageDir, filePath)
|
fullPath := path.Join(storage.storageDir, filePath)
|
||||||
fileInfo, err := storage.client.Stat(fullPath)
|
var fileInfo os.FileInfo
|
||||||
|
err = storage.retry(func() error {
|
||||||
|
fileInfo, err = storage.getSFTPClient().Stat(fullPath)
|
||||||
|
return err
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
LOG_TRACE("SFTP_STORAGE", "File %s has disappeared before deletion", filePath)
|
LOG_TRACE("SFTP_STORAGE", "File %s has disappeared before deletion", filePath)
|
||||||
@@ -137,33 +196,47 @@ func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err er
|
|||||||
if fileInfo == nil {
|
if fileInfo == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return storage.client.Remove(path.Join(storage.storageDir, filePath))
|
return storage.retry(func() error { return storage.getSFTPClient().Remove(path.Join(storage.storageDir, filePath)) })
|
||||||
}
|
}
|
||||||
|
|
||||||
// MoveFile renames the file.
|
// MoveFile renames the file.
|
||||||
func (storage *SFTPStorage) MoveFile(threadIndex int, from string, to string) (err error) {
|
func (storage *SFTPStorage) MoveFile(threadIndex int, from string, to string) (err error) {
|
||||||
toPath := path.Join(storage.storageDir, to)
|
toPath := path.Join(storage.storageDir, to)
|
||||||
fileInfo, err := storage.client.Stat(toPath)
|
var fileInfo os.FileInfo
|
||||||
|
err = storage.retry(func() error {
|
||||||
|
fileInfo, err = storage.getSFTPClient().Stat(toPath)
|
||||||
|
return err
|
||||||
|
})
|
||||||
if fileInfo != nil {
|
if fileInfo != nil {
|
||||||
return fmt.Errorf("The destination file %s already exists", toPath)
|
return fmt.Errorf("The destination file %s already exists", toPath)
|
||||||
}
|
}
|
||||||
return storage.client.Rename(path.Join(storage.storageDir, from),
|
err = storage.retry(func() error { return storage.getSFTPClient().Rename(path.Join(storage.storageDir, from),
|
||||||
path.Join(storage.storageDir, to))
|
path.Join(storage.storageDir, to)) })
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateDirectory creates a new directory.
|
// CreateDirectory creates a new directory.
|
||||||
func (storage *SFTPStorage) CreateDirectory(threadIndex int, dirPath string) (err error) {
|
func (storage *SFTPStorage) CreateDirectory(threadIndex int, dirPath string) (err error) {
|
||||||
fullPath := path.Join(storage.storageDir, dirPath)
|
fullPath := path.Join(storage.storageDir, dirPath)
|
||||||
fileInfo, err := storage.client.Stat(fullPath)
|
var fileInfo os.FileInfo
|
||||||
|
err = storage.retry(func() error {
|
||||||
|
fileInfo, err = storage.getSFTPClient().Stat(fullPath)
|
||||||
|
return err
|
||||||
|
})
|
||||||
if fileInfo != nil && fileInfo.IsDir() {
|
if fileInfo != nil && fileInfo.IsDir() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return storage.client.Mkdir(path.Join(storage.storageDir, dirPath))
|
return storage.retry(func() error { return storage.getSFTPClient().Mkdir(path.Join(storage.storageDir, dirPath)) })
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFileInfo returns the information about the file or directory at 'filePath'.
|
// GetFileInfo returns the information about the file or directory at 'filePath'.
|
||||||
func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) {
|
func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) {
|
||||||
fileInfo, err := storage.client.Stat(path.Join(storage.storageDir, filePath))
|
var fileInfo os.FileInfo
|
||||||
|
err = storage.retry(func() error {
|
||||||
|
fileInfo, err = storage.getSFTPClient().Stat(path.Join(storage.storageDir, filePath))
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return false, false, 0, nil
|
return false, false, 0, nil
|
||||||
@@ -181,18 +254,19 @@ func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist
|
|||||||
|
|
||||||
// DownloadFile reads the file at 'filePath' into the chunk.
|
// DownloadFile reads the file at 'filePath' into the chunk.
|
||||||
func (storage *SFTPStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
|
func (storage *SFTPStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
|
||||||
file, err := storage.client.Open(path.Join(storage.storageDir, filePath))
|
return storage.retry(func() error {
|
||||||
|
file, err := storage.getSFTPClient().Open(path.Join(storage.storageDir, filePath))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
if _, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads); err != nil {
|
if _, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
return nil
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// UploadFile writes 'content' to the file at 'filePath'.
|
// UploadFile writes 'content' to the file at 'filePath'.
|
||||||
@@ -203,20 +277,26 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content
|
|||||||
dirs := strings.Split(filePath, "/")
|
dirs := strings.Split(filePath, "/")
|
||||||
if len(dirs) > 1 {
|
if len(dirs) > 1 {
|
||||||
fullDir := path.Dir(fullPath)
|
fullDir := path.Dir(fullPath)
|
||||||
_, err := storage.client.Stat(fullDir)
|
err = storage.retry(func() error {
|
||||||
|
_, err := storage.getSFTPClient().Stat(fullDir)
|
||||||
|
return err
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// The error may be caused by a non-existent fullDir, or a broken connection. In either case,
|
// The error may be caused by a non-existent fullDir, or a broken connection. In either case,
|
||||||
// we just assume it is the former because there isn't a way to tell which is the case.
|
// we just assume it is the former because there isn't a way to tell which is the case.
|
||||||
for i, _ := range dirs[1 : len(dirs)-1] {
|
for i, _ := range dirs[1 : len(dirs)-1] {
|
||||||
subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...))
|
subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...))
|
||||||
// We don't check the error; just keep going blindly but always store the last err
|
// We don't check the error; just keep going blindly but always store the last err
|
||||||
err = storage.client.Mkdir(subDir)
|
err = storage.getSFTPClient().Mkdir(subDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there is an error creating the dirs, we check fullDir one more time, because another thread
|
// If there is an error creating the dirs, we check fullDir one more time, because another thread
|
||||||
// may happen to create the same fullDir ahead of this thread
|
// may happen to create the same fullDir ahead of this thread
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_, err := storage.client.Stat(fullDir)
|
err = storage.retry(func() error {
|
||||||
|
_, err := storage.getSFTPClient().Stat(fullDir)
|
||||||
|
return err
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -224,39 +304,41 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
letters := "abcdefghijklmnopqrstuvwxyz"
|
return storage.retry(func() error {
|
||||||
suffix := make([]byte, 8)
|
|
||||||
for i := range suffix {
|
|
||||||
suffix[i] = letters[rand.Intn(len(letters))]
|
|
||||||
}
|
|
||||||
|
|
||||||
temporaryFile := fullPath + "." + string(suffix) + ".tmp"
|
letters := "abcdefghijklmnopqrstuvwxyz"
|
||||||
|
suffix := make([]byte, 8)
|
||||||
file, err := storage.client.OpenFile(temporaryFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
|
for i := range suffix {
|
||||||
if err != nil {
|
suffix[i] = letters[rand.Intn(len(letters))]
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
|
|
||||||
_, err = io.Copy(file, reader)
|
|
||||||
if err != nil {
|
|
||||||
file.Close()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
file.Close()
|
|
||||||
|
|
||||||
err = storage.client.Rename(temporaryFile, fullPath)
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
if _, err = storage.client.Stat(fullPath); err == nil {
|
|
||||||
storage.client.Remove(temporaryFile)
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("Uploaded file but failed to store it at %s: %v", fullPath, err)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
temporaryFile := fullPath + "." + string(suffix) + ".tmp"
|
||||||
|
|
||||||
|
file, err := storage.getSFTPClient().OpenFile(temporaryFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
|
||||||
|
_, err = io.Copy(file, reader)
|
||||||
|
if err != nil {
|
||||||
|
file.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
file.Close()
|
||||||
|
|
||||||
|
err = storage.getSFTPClient().Rename(temporaryFile, fullPath)
|
||||||
|
if err != nil {
|
||||||
|
if _, err = storage.getSFTPClient().Stat(fullPath); err == nil {
|
||||||
|
storage.getSFTPClient().Remove(temporaryFile)
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("Uploaded file but failed to store it at %s: %v", fullPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
|
// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
|
||||||
|
|||||||
Reference in New Issue
Block a user