mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-10 05:13:17 +00:00

Exponential Backoff should work now.

Maximum sleep is 32*2 = 64 seconds: the per-thread backoff is capped at 32 seconds, and each jittered delay can be up to twice the current backoff.
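
The schedule behind that number, as a standalone sketch: each retry sleeps for backoff*rand() + backoff*rand() seconds, and the per-thread backoff doubles after every sleep until it is capped at LIMIT_BACKOFF_TIME = 32, so a single sleep can approach but never reach 32*2 seconds. The helper below is only illustrative (simulateBackoff and its starting value of 1 second are not part of the patch; the patch seeds the backoff from the thread count):

package main

import (
    "fmt"
    "math/rand"
)

// simulateBackoff prints the delay one thread would sleep on each consecutive
// failed attempt under the schedule introduced in this commit: a jittered
// delay in [0, 2*backoff), with the backoff doubling up to a 32-second cap.
func simulateBackoff(attempts int) {
    const limitBackoffTime = 32.0 // mirrors LIMIT_BACKOFF_TIME

    backoff := 1.0 // illustrative seed; the patch derives it from numberOfThreads
    for i := 0; i < attempts; i++ {
        delay := backoff*rand.Float64() + backoff*rand.Float64()
        fmt.Printf("attempt %2d: backoff %5.2fs, sleep %5.2fs (< %5.2fs)\n",
            i+1, backoff, delay, 2*backoff)
        if backoff < limitBackoffTime {
            backoff *= 2
        } else {
            backoff = limitBackoffTime
        }
    }
}

func main() {
    simulateBackoff(10)
}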
Author: TheBestPessimist
Date:   2017-09-20 10:43:47 +03:00
Parent: ae44bf7226
Commit: 6a03a98f55


@@ -30,12 +30,12 @@ type GCDStorage struct {
     service *drive.Service
     idCache map[string]string
     idCacheLock *sync.Mutex
-    backoffs []int
+    backoffs []float64
+    backoffsRetries []int
     isConnected bool
     numberOfThreads int
     TestMode bool
 }

 type GCDConfig struct {
@@ -46,11 +46,27 @@ type GCDConfig struct {
 }

 func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) {
+    const LIMIT_BACKOFF_TIME = 32
+    const MAX_NUMBER_OF_RETRIES = 15
+
+    minimumSleepRatio := 0.1
+    maximumSleepRatio := 0.2
+    minimumSleep := float64(storage.numberOfThreads) * minimumSleepRatio
+    maximumSleep := float64(storage.numberOfThreads) * maximumSleepRatio
+
+    rand.Seed(time.Now().UnixNano()) // unsure if this is needed
+
     retry := false
     message := ""

     if err == nil {
-        storage.backoffs[threadIndex] = 1
+        /**
+          The logic for this calculation comes from: https://stackoverflow.com/questions/1527803/generating-random-whole-numbers-in-javascript-in-a-specific-range
+          0.1*numberOfThreads is chosen as the minimum sleep time
+          and 0.2*numberOfThreads as the maximum sleep time
+          for the first sleep of the first backoff of each thread.
+          This means that Google is not flooded with requests, neither at program start nor when multiple threads retry at once. :^)
+        */
+        storage.backoffs[threadIndex] = rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep
+        storage.backoffsRetries[threadIndex] = 0
         return false, nil
     } else if e, ok := err.(*googleapi.Error); ok {
         if 500 <= e.Code && e.Code < 600 {
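
A note on the initial seed above: rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep reuses the integer-range formula from the linked Stack Overflow answer with floats, so the result is uniform in [minimumSleep, maximumSleep+1) rather than [minimumSleep, maximumSleep]. A quick check with an assumed thread count of 4 (the value 4 is not taken from the patch):

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    // Assumed example: 4 threads.
    numberOfThreads := 4
    minimumSleep := float64(numberOfThreads) * 0.1 // 0.4
    maximumSleep := float64(numberOfThreads) * 0.2 // 0.8

    // Same expression as in shouldRetry; the "+1" widens the window,
    // so the seed falls in [0.4, 1.8), not [0.4, 0.8].
    seed := rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep
    fmt.Printf("initial per-thread backoff: %.2f seconds\n", seed)
}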
@@ -63,8 +79,9 @@ func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error)
             retry = true
         } else if e.Code == 403 {
             // User Rate Limit Exceeded
-            message = "User Rate Limit Exceeded"
+            message = e.Message // "User Rate Limit Exceeded"
             retry = true
         } else if e.Code == 401 {
             // Only retry on authorization error when storage has been connected before
             if storage.isConnected {
@@ -84,21 +101,29 @@ func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error)
         retry = err.Temporary()
     }

-    if !retry || storage.backoffs[threadIndex] >= 256 {
-        storage.backoffs[threadIndex] = 1
+    if !retry || storage.backoffsRetries[threadIndex] >= MAX_NUMBER_OF_RETRIES {
+        LOG_INFO("GCD_RETRY", "Thread: %03d. Maximum number of retries reached. Backoff time: %.2f. Number of retries: %d", threadIndex, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex])
+        storage.backoffs[threadIndex] = rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep
+        storage.backoffsRetries[threadIndex] = 0
         return false, err
     }

-    delay := float32(storage.backoffs[threadIndex]) * rand.Float32()
-    LOG_DEBUG("GCD_RETRY", "%s; retrying after %.2f seconds", message, delay)
-    time.Sleep(time.Duration(float32(storage.backoffs[threadIndex]) * float32(time.Second)))
-    storage.backoffs[threadIndex] *= 2
+    delay := storage.backoffs[threadIndex]*rand.Float64() + storage.backoffs[threadIndex]*rand.Float64()
+    LOG_INFO("GCD_RETRY", "Thread: %03d. Message: %s. Retrying after %.2f seconds. Current backoff: %.2f. Number of retries: %d", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex])
+    time.Sleep(time.Duration(delay * float64(time.Second)))
+
+    if storage.backoffs[threadIndex] < LIMIT_BACKOFF_TIME {
+        storage.backoffs[threadIndex] *= 2.0
+    } else {
+        storage.backoffs[threadIndex] = LIMIT_BACKOFF_TIME
+        storage.backoffsRetries[threadIndex] += 1
+    }
+
     return true, nil
 }

-func (storage *GCDStorage) convertFilePath(filePath string) (string) {
+func (storage *GCDStorage) convertFilePath(filePath string) string {
     if strings.HasPrefix(filePath, "chunks/") && strings.HasSuffix(filePath, ".fsl") {
-        return "fossils/" + filePath[len("chunks/"):len(filePath) - len(".fsl")]
+        return "fossils/" + filePath[len("chunks/"):len(filePath)-len(".fsl")]
     }
     return filePath
 }
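
Because backoffsRetries is only incremented once the backoff has reached the 32-second cap, a thread first walks through a few doubling sleeps and then gets up to MAX_NUMBER_OF_RETRIES capped sleeps before shouldRetry gives up. A rough worst-case estimate under assumed values (a seed of 1 second and every sleep hitting its 2*backoff maximum; neither value comes from the patch itself):

package main

import "fmt"

func main() {
    const limit = 32.0    // LIMIT_BACKOFF_TIME
    const maxRetries = 15 // MAX_NUMBER_OF_RETRIES

    backoff := 1.0 // assumed initial seed for a single thread
    sleeps := 0
    retriesAtCap := 0
    worstTotal := 0.0

    for retriesAtCap < maxRetries {
        worstTotal += 2 * backoff // each sleep is strictly less than 2*backoff
        sleeps++
        if backoff < limit {
            backoff *= 2
        } else {
            retriesAtCap++
        }
    }
    // With these assumptions: 20 sleeps, bounded by roughly 1022 seconds in total.
    fmt.Printf("%d sleeps, worst-case total wait < %.0f seconds\n", sleeps, worstTotal)
}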
@@ -135,7 +160,7 @@ func (storage *GCDStorage) listFiles(threadIndex int, parentID string, listFiles
         return nil, fmt.Errorf("No parent ID provided")
     }

-    files := []*drive.File {}
+    files := []*drive.File{}
     startToken := ""
@@ -174,7 +199,6 @@ func (storage *GCDStorage) listFiles(threadIndex int, parentID string, listFiles
         }
     }
-
     return files, nil
 }
@@ -238,7 +262,7 @@ func (storage *GCDStorage) getIDFromPath(threadIndex int, path string) (string,
         if fileID == "" {
             return "", fmt.Errorf("Path %s doesn't exist", path)
         }
-        if i != len(names) - 1 && !isDir {
+        if i != len(names)-1 && !isDir {
             return "", fmt.Errorf("Invalid path %s", path)
         }
     }
@@ -253,7 +277,7 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag
         return nil, err
     }

-    gcdConfig := &GCDConfig {}
+    gcdConfig := &GCDConfig{}
     if err := json.Unmarshal(description, gcdConfig); err != nil {
         return nil, err
     }
@@ -271,12 +295,13 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag
         return nil, err
     }

-    storage = &GCDStorage {
+    storage = &GCDStorage{
         service: service,
         numberOfThreads: threads,
         idCache: make(map[string]string),
         idCacheLock: &sync.Mutex{},
-        backoffs: make([]int, threads),
+        backoffs: make([]float64, threads),
+        backoffsRetries: make([]int, threads),
     }

     storagePathID, err := storage.getIDFromPath(0, storagePath)
@@ -286,7 +311,7 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag
     storage.idCache[""] = storagePathID

-    for _, dir := range []string { "chunks", "snapshots", "fossils" } {
+    for _, dir := range []string{"chunks", "snapshots", "fossils"} {
         dirID, isDir, _, err := storage.listByName(0, storagePathID, dir)
         if err != nil {
             return nil, err
@@ -297,7 +322,7 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag
                 return nil, err
             }
         } else if !isDir {
-            return nil, fmt.Errorf("%s/%s is not a directory", storagePath + "/" + dir)
+            return nil, fmt.Errorf("%s/%s is not a directory", storagePath+"/"+dir)
         } else {
             storage.idCache[dir] = dirID
         }
@@ -311,8 +336,8 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag

 // ListFiles return the list of files and subdirectories under 'dir' (non-recursively)
 func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []int64, error) {
-    for len(dir) > 0 && dir[len(dir) - 1] == '/' {
-        dir = dir[:len(dir) - 1]
+    for len(dir) > 0 && dir[len(dir)-1] == '/' {
+        dir = dir[:len(dir)-1]
     }

     if dir == "snapshots" {
@@ -325,8 +350,8 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
         subDirs := []string{}
         for _, file := range files {
-            storage.savePathID("snapshots/" + file.Name, file.Id)
-            subDirs = append(subDirs, file.Name + "/")
+            storage.savePathID("snapshots/"+file.Name, file.Id)
+            subDirs = append(subDirs, file.Name+"/")
         }
         return subDirs, nil, nil
     } else if strings.HasPrefix(dir, "snapshots/") {
@@ -343,7 +368,7 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
         files := []string{}
         for _, entry := range entries {
-            storage.savePathID(dir + "/" + entry.Name, entry.Id)
+            storage.savePathID(dir+"/"+entry.Name, entry.Id)
             files = append(files, entry.Name)
         }
         return files, nil, nil
@@ -351,7 +376,7 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
         files := []string{}
         sizes := []int64{}
-        for _, parent := range []string { "chunks", "fossils" } {
+        for _, parent := range []string{"chunks", "fossils"} {
             entries, err := storage.listFiles(threadIndex, storage.getPathID(parent), true)
             if err != nil {
                 return nil, nil, err
@@ -362,7 +387,7 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
                 if parent == "fossils" {
                     name += ".fsl"
                 }
-                storage.savePathID(parent + "/" + entry.Name, entry.Id)
+                storage.savePathID(parent+"/"+entry.Name, entry.Id)
                 files = append(files, name)
                 sizes = append(sizes, entry.Size)
             }
@@ -438,8 +463,8 @@ func (storage *GCDStorage) MoveFile(threadIndex int, from string, to string) (er

 // CreateDirectory creates a new directory.
 func (storage *GCDStorage) CreateDirectory(threadIndex int, dir string) (err error) {
-    for len(dir) > 0 && dir[len(dir) - 1] == '/' {
-        dir = dir[:len(dir) - 1]
+    for len(dir) > 0 && dir[len(dir)-1] == '/' {
+        dir = dir[:len(dir)-1]
     }

     exist, isDir, _, err := storage.GetFileInfo(threadIndex, dir)
@@ -462,10 +487,10 @@ func (storage *GCDStorage) CreateDirectory(threadIndex int, dir string) (err err
         name = dir[len("snapshots/"):]
     }

-    file := &drive.File {
+    file := &drive.File{
         Name: name,
         MimeType: "application/vnd.google-apps.folder",
-        Parents: []string { parentID },
+        Parents: []string{parentID},
     }

     for {
@@ -485,8 +510,8 @@ func (storage *GCDStorage) CreateDirectory(threadIndex int, dir string) (err err

 // GetFileInfo returns the information about the file or directory at 'filePath'.
 func (storage *GCDStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) {
-    for len(filePath) > 0 && filePath[len(filePath) - 1] == '/' {
-        filePath = filePath[:len(filePath) - 1]
+    for len(filePath) > 0 && filePath[len(filePath)-1] == '/' {
+        filePath = filePath[:len(filePath)-1]
     }

     // GetFileInfo is never called on a fossil
@@ -568,7 +593,7 @@ func (storage *GCDStorage) DownloadFile(threadIndex int, filePath string, chunk
     defer response.Body.Close()

-    _, err = RateLimitedCopy(chunk, response.Body, storage.DownloadRateLimit / storage.numberOfThreads)
+    _, err = RateLimitedCopy(chunk, response.Body, storage.DownloadRateLimit/storage.numberOfThreads)
     return err
 }
@@ -591,14 +616,14 @@ func (storage *GCDStorage) UploadFile(threadIndex int, filePath string, content
         storage.savePathID(parent, parentID)
     }

-    file := &drive.File {
+    file := &drive.File{
         Name: path.Base(filePath),
         MimeType: "application/octet-stream",
-        Parents: []string { parentID },
+        Parents: []string{parentID},
     }

     for {
-        reader := CreateRateLimitedReader(content, storage.UploadRateLimit / storage.numberOfThreads)
+        reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
         _, err = storage.service.Files.Create(file).Media(reader).Fields("id").Do()
         if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry {
             break