mirror of
https://github.com/gilbertchen/duplicacy
synced 2025-12-15 07:43:21 +00:00
In GCD backend each thread should have its own backoff value
This commit is contained in:
@@ -30,7 +30,7 @@ type GCDStorage struct {
|
|||||||
service *drive.Service
|
service *drive.Service
|
||||||
idCache map[string]string
|
idCache map[string]string
|
||||||
idCacheLock *sync.Mutex
|
idCacheLock *sync.Mutex
|
||||||
backoff int
|
backoffs []int
|
||||||
|
|
||||||
isConnected bool
|
isConnected bool
|
||||||
numberOfThreads int
|
numberOfThreads int
|
||||||
@@ -45,12 +45,12 @@ type GCDConfig struct {
|
|||||||
Token oauth2.Token `json:"token"`
|
Token oauth2.Token `json:"token"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *GCDStorage) shouldRetry(err error) (bool, error) {
|
func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) {
|
||||||
|
|
||||||
retry := false
|
retry := false
|
||||||
message := ""
|
message := ""
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storage.backoff = 1
|
storage.backoffs[threadIndex] = 1
|
||||||
return false, nil
|
return false, nil
|
||||||
} else if e, ok := err.(*googleapi.Error); ok {
|
} else if e, ok := err.(*googleapi.Error); ok {
|
||||||
if 500 <= e.Code && e.Code < 600 {
|
if 500 <= e.Code && e.Code < 600 {
|
||||||
@@ -84,15 +84,15 @@ func (storage *GCDStorage) shouldRetry(err error) (bool, error) {
|
|||||||
retry = err.Temporary()
|
retry = err.Temporary()
|
||||||
}
|
}
|
||||||
|
|
||||||
if !retry || storage.backoff >= 256{
|
if !retry || storage.backoffs[threadIndex] >= 256 {
|
||||||
storage.backoff = 1
|
storage.backoffs[threadIndex] = 1
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
delay := float32(storage.backoff) * rand.Float32()
|
delay := float32(storage.backoffs[threadIndex]) * rand.Float32()
|
||||||
LOG_DEBUG("GCD_RETRY", "%s; retrying after %.2f seconds", message, delay)
|
LOG_DEBUG("GCD_RETRY", "%s; retrying after %.2f seconds", message, delay)
|
||||||
time.Sleep(time.Duration(float32(storage.backoff) * float32(time.Second)))
|
time.Sleep(time.Duration(float32(storage.backoffs[threadIndex]) * float32(time.Second)))
|
||||||
storage.backoff *= 2
|
storage.backoffs[threadIndex] *= 2
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,7 +129,7 @@ func (storage *GCDStorage) deletePathID(path string) {
|
|||||||
storage.idCacheLock.Unlock()
|
storage.idCacheLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *GCDStorage) listFiles(parentID string, listFiles bool) ([]*drive.File, error) {
|
func (storage *GCDStorage) listFiles(threadIndex int, parentID string, listFiles bool) ([]*drive.File, error) {
|
||||||
|
|
||||||
if parentID == "" {
|
if parentID == "" {
|
||||||
return nil, fmt.Errorf("No parent ID provided")
|
return nil, fmt.Errorf("No parent ID provided")
|
||||||
@@ -157,7 +157,7 @@ func (storage *GCDStorage) listFiles(parentID string, listFiles bool) ([]*drive.
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
fileList, err = storage.service.Files.List().Q(query).Fields("nextPageToken", "files(name, mimeType, id, size)").PageToken(startToken).PageSize(maxCount).Do()
|
fileList, err = storage.service.Files.List().Q(query).Fields("nextPageToken", "files(name, mimeType, id, size)").PageToken(startToken).PageSize(maxCount).Do()
|
||||||
if retry, e := storage.shouldRetry(err); e == nil && !retry {
|
if retry, e := storage.shouldRetry(threadIndex, err); e == nil && !retry {
|
||||||
break
|
break
|
||||||
} else if retry {
|
} else if retry {
|
||||||
continue
|
continue
|
||||||
@@ -178,7 +178,7 @@ func (storage *GCDStorage) listFiles(parentID string, listFiles bool) ([]*drive.
|
|||||||
return files, nil
|
return files, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *GCDStorage) listByName(parentID string, name string) (string, bool, int64, error) {
|
func (storage *GCDStorage) listByName(threadIndex int, parentID string, name string) (string, bool, int64, error) {
|
||||||
|
|
||||||
var fileList *drive.FileList
|
var fileList *drive.FileList
|
||||||
var err error
|
var err error
|
||||||
@@ -187,7 +187,7 @@ func (storage *GCDStorage) listByName(parentID string, name string) (string, boo
|
|||||||
query := "name = '" + name + "' and '" + parentID + "' in parents"
|
query := "name = '" + name + "' and '" + parentID + "' in parents"
|
||||||
fileList, err = storage.service.Files.List().Q(query).Fields("files(name, mimeType, id, size)").Do()
|
fileList, err = storage.service.Files.List().Q(query).Fields("files(name, mimeType, id, size)").Do()
|
||||||
|
|
||||||
if retry, e := storage.shouldRetry(err); e == nil && !retry {
|
if retry, e := storage.shouldRetry(threadIndex, err); e == nil && !retry {
|
||||||
break
|
break
|
||||||
} else if retry {
|
} else if retry {
|
||||||
continue
|
continue
|
||||||
@@ -205,7 +205,7 @@ func (storage *GCDStorage) listByName(parentID string, name string) (string, boo
|
|||||||
return file.Id, file.MimeType == "application/vnd.google-apps.folder", file.Size, nil
|
return file.Id, file.MimeType == "application/vnd.google-apps.folder", file.Size, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *GCDStorage) getIDFromPath(path string) (string, error) {
|
func (storage *GCDStorage) getIDFromPath(threadIndex int, path string) (string, error) {
|
||||||
|
|
||||||
fileID := "root"
|
fileID := "root"
|
||||||
|
|
||||||
@@ -231,7 +231,7 @@ func (storage *GCDStorage) getIDFromPath(path string) (string, error) {
|
|||||||
|
|
||||||
var err error
|
var err error
|
||||||
var isDir bool
|
var isDir bool
|
||||||
fileID, isDir, _, err = storage.listByName(fileID, name)
|
fileID, isDir, _, err = storage.listByName(threadIndex, fileID, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -276,9 +276,10 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag
|
|||||||
numberOfThreads: threads,
|
numberOfThreads: threads,
|
||||||
idCache: make(map[string]string),
|
idCache: make(map[string]string),
|
||||||
idCacheLock: &sync.Mutex{},
|
idCacheLock: &sync.Mutex{},
|
||||||
|
backoffs: make([]int, threads),
|
||||||
}
|
}
|
||||||
|
|
||||||
storagePathID, err := storage.getIDFromPath(storagePath)
|
storagePathID, err := storage.getIDFromPath(0, storagePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -286,7 +287,7 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag
|
|||||||
storage.idCache[""] = storagePathID
|
storage.idCache[""] = storagePathID
|
||||||
|
|
||||||
for _, dir := range []string { "chunks", "snapshots", "fossils" } {
|
for _, dir := range []string { "chunks", "snapshots", "fossils" } {
|
||||||
dirID, isDir, _, err := storage.listByName(storagePathID, dir)
|
dirID, isDir, _, err := storage.listByName(0, storagePathID, dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -316,7 +317,7 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
|
|||||||
|
|
||||||
if dir == "snapshots" {
|
if dir == "snapshots" {
|
||||||
|
|
||||||
files, err := storage.listFiles(storage.getPathID(dir), false)
|
files, err := storage.listFiles(threadIndex, storage.getPathID(dir), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@@ -329,12 +330,12 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
|
|||||||
}
|
}
|
||||||
return subDirs, nil, nil
|
return subDirs, nil, nil
|
||||||
} else if strings.HasPrefix(dir, "snapshots/") {
|
} else if strings.HasPrefix(dir, "snapshots/") {
|
||||||
pathID, err := storage.getIDFromPath(dir)
|
pathID, err := storage.getIDFromPath(threadIndex, dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
entries, err := storage.listFiles(pathID, true)
|
entries, err := storage.listFiles(threadIndex, pathID, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@@ -351,7 +352,7 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
|
|||||||
sizes := []int64{}
|
sizes := []int64{}
|
||||||
|
|
||||||
for _, parent := range []string { "chunks", "fossils" } {
|
for _, parent := range []string { "chunks", "fossils" } {
|
||||||
entries, err := storage.listFiles(storage.getPathID(parent), true)
|
entries, err := storage.listFiles(threadIndex, storage.getPathID(parent), true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@@ -376,7 +377,7 @@ func (storage *GCDStorage) DeleteFile(threadIndex int, filePath string) (err err
|
|||||||
filePath = storage.convertFilePath(filePath)
|
filePath = storage.convertFilePath(filePath)
|
||||||
fileID, ok := storage.findPathID(filePath)
|
fileID, ok := storage.findPathID(filePath)
|
||||||
if !ok {
|
if !ok {
|
||||||
fileID, err = storage.getIDFromPath(filePath)
|
fileID, err = storage.getIDFromPath(threadIndex, filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
LOG_TRACE("GCD_STORAGE", "Ignored file deletion error: %v", err)
|
LOG_TRACE("GCD_STORAGE", "Ignored file deletion error: %v", err)
|
||||||
return nil
|
return nil
|
||||||
@@ -385,7 +386,7 @@ func (storage *GCDStorage) DeleteFile(threadIndex int, filePath string) (err err
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
err = storage.service.Files.Delete(fileID).Fields("id").Do()
|
err = storage.service.Files.Delete(fileID).Fields("id").Do()
|
||||||
if retry, err := storage.shouldRetry(err); err == nil && !retry {
|
if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry {
|
||||||
storage.deletePathID(filePath)
|
storage.deletePathID(filePath)
|
||||||
return nil
|
return nil
|
||||||
} else if retry {
|
} else if retry {
|
||||||
@@ -420,7 +421,7 @@ func (storage *GCDStorage) MoveFile(threadIndex int, from string, to string) (er
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
_, err = storage.service.Files.Update(fileID, nil).AddParents(toParentID).RemoveParents(fromParentID).Do()
|
_, err = storage.service.Files.Update(fileID, nil).AddParents(toParentID).RemoveParents(fromParentID).Do()
|
||||||
if retry, err := storage.shouldRetry(err); err == nil && !retry {
|
if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry {
|
||||||
break
|
break
|
||||||
} else if retry {
|
} else if retry {
|
||||||
continue
|
continue
|
||||||
@@ -469,7 +470,7 @@ func (storage *GCDStorage) CreateDirectory(threadIndex int, dir string) (err err
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
file, err = storage.service.Files.Create(file).Fields("id").Do()
|
file, err = storage.service.Files.Create(file).Fields("id").Do()
|
||||||
if retry, err := storage.shouldRetry(err); err == nil && !retry {
|
if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry {
|
||||||
break
|
break
|
||||||
} else if retry {
|
} else if retry {
|
||||||
continue
|
continue
|
||||||
@@ -495,12 +496,12 @@ func (storage *GCDStorage) GetFileInfo(threadIndex int, filePath string) (exist
|
|||||||
if dir == "." {
|
if dir == "." {
|
||||||
dir = ""
|
dir = ""
|
||||||
}
|
}
|
||||||
dirID, err := storage.getIDFromPath(dir)
|
dirID, err := storage.getIDFromPath(threadIndex, dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, 0, err
|
return false, false, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fileID, isDir, size, err = storage.listByName(dirID, path.Base(filePath))
|
fileID, isDir, size, err = storage.listByName(threadIndex, dirID, path.Base(filePath))
|
||||||
if fileID != "" {
|
if fileID != "" {
|
||||||
storage.savePathID(filePath, fileID)
|
storage.savePathID(filePath, fileID)
|
||||||
}
|
}
|
||||||
@@ -509,7 +510,7 @@ func (storage *GCDStorage) GetFileInfo(threadIndex int, filePath string) (exist
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
file, err := storage.service.Files.Get(fileID).Fields("id, mimeType").Do()
|
file, err := storage.service.Files.Get(fileID).Fields("id, mimeType").Do()
|
||||||
if retry, err := storage.shouldRetry(err); err == nil && !retry {
|
if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry {
|
||||||
return true, file.MimeType == "application/vnd.google-apps.folder", file.Size, nil
|
return true, file.MimeType == "application/vnd.google-apps.folder", file.Size, nil
|
||||||
} else if retry {
|
} else if retry {
|
||||||
continue
|
continue
|
||||||
@@ -533,7 +534,7 @@ func (storage *GCDStorage) FindChunk(threadIndex int, chunkID string, isFossil b
|
|||||||
}
|
}
|
||||||
|
|
||||||
fileID := ""
|
fileID := ""
|
||||||
fileID, _, size, err = storage.listByName(parentID, chunkID)
|
fileID, _, size, err = storage.listByName(threadIndex, parentID, chunkID)
|
||||||
if fileID != "" {
|
if fileID != "" {
|
||||||
storage.savePathID(realPath, fileID)
|
storage.savePathID(realPath, fileID)
|
||||||
}
|
}
|
||||||
@@ -545,7 +546,7 @@ func (storage *GCDStorage) DownloadFile(threadIndex int, filePath string, chunk
|
|||||||
// We never download the fossil so there is no need to convert the path
|
// We never download the fossil so there is no need to convert the path
|
||||||
fileID, ok := storage.findPathID(filePath)
|
fileID, ok := storage.findPathID(filePath)
|
||||||
if !ok {
|
if !ok {
|
||||||
fileID, err = storage.getIDFromPath(filePath)
|
fileID, err = storage.getIDFromPath(threadIndex, filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -556,7 +557,7 @@ func (storage *GCDStorage) DownloadFile(threadIndex int, filePath string, chunk
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
response, err = storage.service.Files.Get(fileID).Download()
|
response, err = storage.service.Files.Get(fileID).Download()
|
||||||
if retry, err := storage.shouldRetry(err); err == nil && !retry {
|
if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry {
|
||||||
break
|
break
|
||||||
} else if retry {
|
} else if retry {
|
||||||
continue
|
continue
|
||||||
@@ -583,7 +584,7 @@ func (storage *GCDStorage) UploadFile(threadIndex int, filePath string, content
|
|||||||
|
|
||||||
parentID, ok := storage.findPathID(parent)
|
parentID, ok := storage.findPathID(parent)
|
||||||
if !ok {
|
if !ok {
|
||||||
parentID, err = storage.getIDFromPath(parent)
|
parentID, err = storage.getIDFromPath(threadIndex, parent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -599,7 +600,7 @@ func (storage *GCDStorage) UploadFile(threadIndex int, filePath string, content
|
|||||||
for {
|
for {
|
||||||
reader := CreateRateLimitedReader(content, storage.UploadRateLimit / storage.numberOfThreads)
|
reader := CreateRateLimitedReader(content, storage.UploadRateLimit / storage.numberOfThreads)
|
||||||
_, err = storage.service.Files.Create(file).Media(reader).Fields("id").Do()
|
_, err = storage.service.Files.Create(file).Media(reader).Fields("id").Do()
|
||||||
if retry, err := storage.shouldRetry(err); err == nil && !retry {
|
if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry {
|
||||||
break
|
break
|
||||||
} else if retry {
|
} else if retry {
|
||||||
continue
|
continue
|
||||||
|
|||||||
Reference in New Issue
Block a user