diff --git a/src/duplicacy_b2client.go b/src/duplicacy_b2client.go index 3dfd0b8..7f8f92e 100644 --- a/src/duplicacy_b2client.go +++ b/src/duplicacy_b2client.go @@ -5,19 +5,22 @@ package duplicacy import ( - "bytes" - "crypto/sha1" - "encoding/base64" - "encoding/hex" - "encoding/json" - "fmt" "io" - "io/ioutil" - "math/rand" - "net/http" + "os" + "fmt" + "bytes" + "time" + "sync" "strconv" "strings" - "time" + "net/url" + "net/http" + "math/rand" + "io/ioutil" + "crypto/sha1" + "encoding/hex" + "encoding/json" + "encoding/base64" ) type B2Error struct { @@ -39,67 +42,112 @@ var B2AuthorizationURL = "https://api.backblazeb2.com/b2api/v1/b2_authorize_acco type B2Client struct { HTTPClient *http.Client + AccountID string ApplicationKeyID string ApplicationKey string + BucketName string + BucketID string + StorageDir string + + Lock sync.Mutex AuthorizationToken string APIURL string DownloadURL string - BucketName string - BucketID string + IsAuthorized bool - UploadURL string - UploadToken string + UploadURLs []string + UploadTokens []string - TestMode bool + Threads int + MaximumRetries int + TestMode bool } -func NewB2Client(applicationKeyID string, applicationKey string) *B2Client { +// URL encode the given path but keep the slashes intact +func B2Escape(path string) string { + var components []string + for _, c := range strings.Split(path, "/") { + components = append(components, url.QueryEscape(c)) + } + return strings.Join(components, "/") +} + +func NewB2Client(applicationKeyID string, applicationKey string, storageDir string, threads int) *B2Client { + + for storageDir != "" && storageDir[0] == '/' { + storageDir = storageDir[1:] + } + + if storageDir != "" && storageDir[len(storageDir) - 1] != '/' { + storageDir += "/" + } + + maximumRetries := 10 + if value, found := os.LookupEnv("DUPLICACY_B2_RETRIES"); found && value != "" { + maximumRetries, _ = strconv.Atoi(value) + LOG_INFO("B2_RETRIES", "Setting maximum retries for B2 to %d", maximumRetries) + } + client := &B2Client{ HTTPClient: http.DefaultClient, ApplicationKeyID: applicationKeyID, ApplicationKey: applicationKey, + StorageDir: storageDir, + UploadURLs: make([]string, threads), + UploadTokens: make([]string, threads), + Threads: threads, + MaximumRetries: maximumRetries, } return client } -func (client *B2Client) retry(backoff int, response *http.Response) int { +func (client *B2Client) getAPIURL() string { + client.Lock.Lock() + defer client.Lock.Unlock() + return client.APIURL +} + +func (client *B2Client) getDownloadURL() string { + client.Lock.Lock() + defer client.Lock.Unlock() + return client.DownloadURL +} + +func (client *B2Client) retry(retries int, response *http.Response) int { if response != nil { if backoffList, found := response.Header["Retry-After"]; found && len(backoffList) > 0 { retryAfter, _ := strconv.Atoi(backoffList[0]) if retryAfter >= 1 { time.Sleep(time.Duration(retryAfter) * time.Second) - return 0 + return 1 } } } - if backoff == 0 { - backoff = 1 - } else { - backoff *= 2 + + if retries >= client.MaximumRetries + 1 { + return 0 } - time.Sleep(time.Duration(backoff) * time.Second) - return backoff + retries++ + delay := 1 << uint(retries) + if delay > 64 { + delay = 64 + } + delayInSeconds := (rand.Float32() + 1.0) * float32(delay) / 2.0 + + time.Sleep(time.Duration(delayInSeconds) * time.Second) + return retries } -func (client *B2Client) call(url string, method string, requestHeaders map[string]string, input interface{}) (io.ReadCloser, http.Header, int64, error) { - - switch method { - case http.MethodGet: - break - case http.MethodHead: - break - case http.MethodPost: - break - default: - return nil, nil, 0, fmt.Errorf("unhandled http request method: " + method) - } +func (client *B2Client) call(threadIndex int, requestURL string, method string, requestHeaders map[string]string, input interface{}) ( + io.ReadCloser, http.Header, int64, error) { var response *http.Response - backoff := 0 - for i := 0; i < 8; i++ { - var inputReader *bytes.Reader + retries := 0 + for { + var inputReader io.Reader + isUpload := false switch input.(type) { default: @@ -108,21 +156,43 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin return nil, nil, 0, err } inputReader = bytes.NewReader(jsonInput) - case []byte: - inputReader = bytes.NewReader(input.([]byte)) case int: inputReader = bytes.NewReader([]byte("")) + case []byte: + isUpload = true + inputReader = bytes.NewReader(input.([]byte)) + case *RateLimitedReader: + isUpload = true + rateLimitedReader := input.(*RateLimitedReader) + rateLimitedReader.Reset() + inputReader = rateLimitedReader } - request, err := http.NewRequest(method, url, inputReader) + + if isUpload { + if client.UploadURLs[threadIndex] == "" || client.UploadTokens[threadIndex] == "" { + err := client.getUploadURL(threadIndex) + if err != nil { + return nil, nil, 0, err + } + } + requestURL = client.UploadURLs[threadIndex] + } + + request, err := http.NewRequest(method, requestURL, inputReader) if err != nil { return nil, nil, 0, err } - if url == B2AuthorizationURL { + if requestURL == B2AuthorizationURL { request.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(client.ApplicationKeyID+":"+client.ApplicationKey))) + } else if isUpload { + request.ContentLength, _ = strconv.ParseInt(requestHeaders["Content-Length"], 10, 64) + request.Header.Set("Authorization", client.UploadTokens[threadIndex]) } else { + client.Lock.Lock() request.Header.Set("Authorization", client.AuthorizationToken) + client.Lock.Unlock() } if requestHeaders != nil { @@ -133,7 +203,9 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin if client.TestMode { r := rand.Float32() - if r < 0.5 { + if r < 0.5 && isUpload { + request.Header.Set("X-Bz-Test-Mode", "fail_some_uploads") + } else if r < 0.75 { request.Header.Set("X-Bz-Test-Mode", "expire_some_account_authorization_tokens") } else { request.Header.Set("X-Bz-Test-Mode", "force_cap_exceeded") @@ -142,27 +214,46 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin response, err = client.HTTPClient.Do(request) if err != nil { - if url != B2AuthorizationURL { - LOG_DEBUG("BACKBLAZE_CALL", "URL request '%s' returned an error: %v", url, err) - backoff = client.retry(backoff, response) - continue + + // Don't retry when the first authorization request fails + if requestURL == B2AuthorizationURL && !client.IsAuthorized { + return nil, nil, 0, err } - return nil, nil, 0, err + + LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s' returned an error: %v", threadIndex, requestURL, err) + + retries = client.retry(retries, response) + if retries <= 0 { + return nil, nil, 0, err + } + + // Clear the upload url to requrest a new one on retry + if isUpload { + client.UploadURLs[threadIndex] = "" + client.UploadTokens[threadIndex] = "" + } + continue + } if response.StatusCode < 300 { return response.Body, response.Header, response.ContentLength, nil } - LOG_DEBUG("BACKBLAZE_CALL", "URL request '%s %s' returned status code %d", method, url, response.StatusCode) + e := &B2Error{} + if err := json.NewDecoder(response.Body).Decode(e); err != nil { + LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s %s' returned status code %d", threadIndex, method, requestURL, response.StatusCode) + } else { + LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s %s' returned %d %s", threadIndex, method, requestURL, response.StatusCode, e.Message) + } - io.Copy(ioutil.Discard, response.Body) response.Body.Close() + if response.StatusCode == 401 { - if url == B2AuthorizationURL { + if requestURL == B2AuthorizationURL { return nil, nil, 0, fmt.Errorf("Authorization failure") } - client.AuthorizeAccount() + client.AuthorizeAccount(threadIndex) continue } else if response.StatusCode == 403 { if !client.TestMode { @@ -176,32 +267,21 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin } else if response.StatusCode == 416 { if http.MethodHead == method { // 416 Requested Range Not Satisfiable - return nil, nil, 0, fmt.Errorf("URL request '%s' returned status code %d", url, response.StatusCode) + return nil, nil, 0, fmt.Errorf("URL request '%s' returned %d %s", requestURL, response.StatusCode, e.Message) } - } else if response.StatusCode == 429 || response.StatusCode == 408 { - backoff = client.retry(backoff, response) - continue - } else if response.StatusCode >= 500 && response.StatusCode <= 599 { - backoff = client.retry(backoff, response) - continue - } else { - LOG_INFO("BACKBLAZE_CALL", "URL request '%s' returned status code %d", url, response.StatusCode) - backoff = client.retry(backoff, response) - continue } - defer response.Body.Close() - - e := &B2Error{} - - if err := json.NewDecoder(response.Body).Decode(e); err != nil { - return nil, nil, 0, err + retries = client.retry(retries, response) + if retries <= 0 { + return nil, nil, 0, fmt.Errorf("URL request '%s' returned %d %s", requestURL, response.StatusCode, e.Message) } - return nil, nil, 0, e + if isUpload { + client.UploadURLs[threadIndex] = "" + client.UploadTokens[threadIndex] = "" + } } - return nil, nil, 0, fmt.Errorf("Maximum backoff reached") } type B2AuthorizeAccountOutput struct { @@ -211,9 +291,11 @@ type B2AuthorizeAccountOutput struct { DownloadURL string } -func (client *B2Client) AuthorizeAccount() (err error) { +func (client *B2Client) AuthorizeAccount(threadIndex int) (err error) { + client.Lock.Lock() + defer client.Lock.Unlock() - readCloser, _, _, err := client.call(B2AuthorizationURL, http.MethodPost, nil, make(map[string]string)) + readCloser, _, _, err := client.call(threadIndex, B2AuthorizationURL, http.MethodPost, nil, make(map[string]string)) if err != nil { return err } @@ -233,6 +315,7 @@ func (client *B2Client) AuthorizeAccount() (err error) { client.AuthorizationToken = output.AuthorizationToken client.APIURL = output.APIURL client.DownloadURL = output.DownloadURL + client.IsAuthorized = true return nil } @@ -249,9 +332,9 @@ func (client *B2Client) FindBucket(bucketName string) (err error) { input := make(map[string]string) input["accountId"] = client.AccountID - url := client.APIURL + "/b2api/v1/b2_list_buckets" + url := client.getAPIURL() + "/b2api/v1/b2_list_buckets" - readCloser, _, _, err := client.call(url, http.MethodPost, nil, input) + readCloser, _, _, err := client.call(0, url, http.MethodPost, nil, input) if err != nil { return err } @@ -293,7 +376,7 @@ type B2ListFileNamesOutput struct { NextFileId string } -func (client *B2Client) ListFileNames(startFileName string, singleFile bool, includeVersions bool) (files []*B2Entry, err error) { +func (client *B2Client) ListFileNames(threadIndex int, startFileName string, singleFile bool, includeVersions bool) (files []*B2Entry, err error) { maxFileCount := 1000 if singleFile { @@ -311,20 +394,21 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc input := make(map[string]interface{}) input["bucketId"] = client.BucketID - input["startFileName"] = startFileName + input["startFileName"] = client.StorageDir + startFileName input["maxFileCount"] = maxFileCount + input["prefix"] = client.StorageDir for { - url := client.APIURL + "/b2api/v1/b2_list_file_names" + url := client.getAPIURL() + "/b2api/v1/b2_list_file_names" requestHeaders := map[string]string{} requestMethod := http.MethodPost var requestInput interface{} requestInput = input if includeVersions { - url = client.APIURL + "/b2api/v1/b2_list_file_versions" + url = client.getAPIURL() + "/b2api/v1/b2_list_file_versions" } else if singleFile { // handle a single file with no versions as a special case to download the last byte of the file - url = client.DownloadURL + "/file/" + client.BucketName + "/" + startFileName + url = client.getDownloadURL() + "/file/" + client.BucketName + "/" + B2Escape(client.StorageDir + startFileName) // requesting byte -1 works for empty files where 0-0 fails with a 416 error requestHeaders["Range"] = "bytes=-1" // HEAD request @@ -334,7 +418,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc var readCloser io.ReadCloser var responseHeader http.Header var err error - readCloser, responseHeader, _, err = client.call(url, requestMethod, requestHeaders, requestInput) + readCloser, responseHeader, _, err = client.call(threadIndex, url, requestMethod, requestHeaders, requestInput) if err != nil { return nil, err } @@ -347,7 +431,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc if singleFile && !includeVersions { if responseHeader == nil { - LOG_DEBUG("BACKBLAZE_LIST", "b2_download_file_by_name did not return headers") + LOG_DEBUG("BACKBLAZE_LIST", "%s did not return headers", url) return []*B2Entry{}, nil } requiredHeaders := []string{ @@ -361,7 +445,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc } } if len(missingKeys) > 0 { - return nil, fmt.Errorf("b2_download_file_by_name missing headers: %s", missingKeys) + return nil, fmt.Errorf("%s missing headers: %s", url, missingKeys) } // construct the B2Entry from the response headers of the download request fileID := responseHeader.Get("x-bz-file-id") @@ -378,14 +462,14 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc // this should only execute if the requested file is empty and the range request didn't result in a Content-Range header fileSize, _ = strconv.ParseInt(lengthString, 0, 64) if fileSize != 0 { - return nil, fmt.Errorf("b2_download_file_by_name returned non-zero file length") + return nil, fmt.Errorf("%s returned non-zero file length", url) } } else { - return nil, fmt.Errorf("could not parse b2_download_file_by_name headers") + return nil, fmt.Errorf("could not parse headers returned by %s", url) } fileUploadTimestamp, _ := strconv.ParseInt(responseHeader.Get("X-Bz-Upload-Timestamp"), 0, 64) - return []*B2Entry{{fileID, fileName, fileAction, fileSize, fileUploadTimestamp}}, nil + return []*B2Entry{{fileID, fileName[len(client.StorageDir):], fileAction, fileSize, fileUploadTimestamp}}, nil } if err = json.NewDecoder(readCloser).Decode(&output); err != nil { @@ -394,31 +478,27 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc ioutil.ReadAll(readCloser) - if startFileName == "" { - files = append(files, output.Files...) - } else { - for _, file := range output.Files { - if singleFile { - if file.FileName == startFileName { - files = append(files, file) - if !includeVersions { - output.NextFileName = "" - break - } - } else { + for _, file := range output.Files { + file.FileName = file.FileName[len(client.StorageDir):] + if singleFile { + if file.FileName == startFileName { + files = append(files, file) + if !includeVersions { output.NextFileName = "" break } } else { - if strings.HasPrefix(file.FileName, startFileName) { - files = append(files, file) - } else { - output.NextFileName = "" - break - } + output.NextFileName = "" + break + } + } else { + if strings.HasPrefix(file.FileName, startFileName) { + files = append(files, file) + } else { + output.NextFileName = "" + break } } - } if len(output.NextFileName) == 0 { @@ -434,14 +514,14 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc return files, nil } -func (client *B2Client) DeleteFile(fileName string, fileID string) (err error) { +func (client *B2Client) DeleteFile(threadIndex int, fileName string, fileID string) (err error) { input := make(map[string]string) - input["fileName"] = fileName + input["fileName"] = client.StorageDir + fileName input["fileId"] = fileID - url := client.APIURL + "/b2api/v1/b2_delete_file_version" - readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) + url := client.getAPIURL() + "/b2api/v1/b2_delete_file_version" + readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input) if err != nil { return err } @@ -454,14 +534,14 @@ type B2HideFileOutput struct { FileID string } -func (client *B2Client) HideFile(fileName string) (fileID string, err error) { +func (client *B2Client) HideFile(threadIndex int, fileName string) (fileID string, err error) { input := make(map[string]string) input["bucketId"] = client.BucketID - input["fileName"] = fileName + input["fileName"] = client.StorageDir + fileName - url := client.APIURL + "/b2api/v1/b2_hide_file" - readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) + url := client.getAPIURL() + "/b2api/v1/b2_hide_file" + readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input) if err != nil { return "", err } @@ -478,11 +558,11 @@ func (client *B2Client) HideFile(fileName string) (fileID string, err error) { return output.FileID, nil } -func (client *B2Client) DownloadFile(filePath string) (io.ReadCloser, int64, error) { +func (client *B2Client) DownloadFile(threadIndex int, filePath string) (io.ReadCloser, int64, error) { - url := client.DownloadURL + "/file/" + client.BucketName + "/" + filePath + url := client.getDownloadURL() + "/file/" + client.BucketName + "/" + B2Escape(client.StorageDir + filePath) - readCloser, _, len, err := client.call(url, http.MethodGet, make(map[string]string), 0) + readCloser, _, len, err := client.call(threadIndex, url, http.MethodGet, make(map[string]string), 0) return readCloser, len, err } @@ -492,12 +572,12 @@ type B2GetUploadArgumentOutput struct { AuthorizationToken string } -func (client *B2Client) getUploadURL() error { +func (client *B2Client) getUploadURL(threadIndex int) error { input := make(map[string]string) input["bucketId"] = client.BucketID - url := client.APIURL + "/b2api/v1/b2_get_upload_url" - readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) + url := client.getAPIURL() + "/b2api/v1/b2_get_upload_url" + readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input) if err != nil { return err } @@ -510,96 +590,29 @@ func (client *B2Client) getUploadURL() error { return err } - client.UploadURL = output.UploadURL - client.UploadToken = output.AuthorizationToken + client.UploadURLs[threadIndex] = output.UploadURL + client.UploadTokens[threadIndex] = output.AuthorizationToken return nil } -func (client *B2Client) UploadFile(filePath string, content []byte, rateLimit int) (err error) { +func (client *B2Client) UploadFile(threadIndex int, filePath string, content []byte, rateLimit int) (err error) { hasher := sha1.New() hasher.Write(content) hash := hex.EncodeToString(hasher.Sum(nil)) headers := make(map[string]string) - headers["X-Bz-File-Name"] = filePath + headers["X-Bz-File-Name"] = B2Escape(client.StorageDir + filePath) + headers["Content-Length"] = fmt.Sprintf("%d", len(content)) headers["Content-Type"] = "application/octet-stream" headers["X-Bz-Content-Sha1"] = hash - var response *http.Response - - backoff := 0 - for i := 0; i < 8; i++ { - - if client.UploadURL == "" || client.UploadToken == "" { - err = client.getUploadURL() - if err != nil { - return err - } - } - - request, err := http.NewRequest("POST", client.UploadURL, CreateRateLimitedReader(content, rateLimit)) - if err != nil { - return err - } - request.ContentLength = int64(len(content)) - - request.Header.Set("Authorization", client.UploadToken) - request.Header.Set("X-Bz-File-Name", filePath) - request.Header.Set("Content-Type", "application/octet-stream") - request.Header.Set("X-Bz-Content-Sha1", hash) - - for key, value := range headers { - request.Header.Set(key, value) - } - - if client.TestMode { - r := rand.Float32() - if r < 0.8 { - request.Header.Set("X-Bz-Test-Mode", "fail_some_uploads") - } else if r < 0.9 { - request.Header.Set("X-Bz-Test-Mode", "expire_some_account_authorization_tokens") - } else { - request.Header.Set("X-Bz-Test-Mode", "force_cap_exceeded") - } - } - - response, err = client.HTTPClient.Do(request) - if err != nil { - LOG_DEBUG("BACKBLAZE_UPLOAD", "URL request '%s' returned an error: %v", client.UploadURL, err) - backoff = client.retry(backoff, response) - client.UploadURL = "" - client.UploadToken = "" - continue - } - - io.Copy(ioutil.Discard, response.Body) - response.Body.Close() - - if response.StatusCode < 300 { - return nil - } - - LOG_DEBUG("BACKBLAZE_UPLOAD", "URL request '%s' returned status code %d", client.UploadURL, response.StatusCode) - - if response.StatusCode == 401 { - LOG_INFO("BACKBLAZE_UPLOAD", "Re-authorization required") - client.UploadURL = "" - client.UploadToken = "" - continue - } else if response.StatusCode == 403 { - if !client.TestMode { - return fmt.Errorf("B2 cap exceeded") - } - continue - } else { - LOG_INFO("BACKBLAZE_UPLOAD", "URL request '%s' returned status code %d", client.UploadURL, response.StatusCode) - backoff = client.retry(backoff, response) - client.UploadURL = "" - client.UploadToken = "" - } + readCloser, _, _, err := client.call(threadIndex, "", http.MethodPost, headers, CreateRateLimitedReader(content, rateLimit)) + if err != nil { + return err } - return fmt.Errorf("Maximum backoff reached") + readCloser.Close() + return nil } diff --git a/src/duplicacy_b2client_test.go b/src/duplicacy_b2client_test.go index 836952a..9c5ad9c 100644 --- a/src/duplicacy_b2client_test.go +++ b/src/duplicacy_b2client_test.go @@ -37,7 +37,7 @@ func createB2ClientForTest(t *testing.T) (*B2Client, string) { return nil, "" } - return NewB2Client(b2["account"], b2["key"]), b2["bucket"] + return NewB2Client(b2["account"], b2["key"], b2["directory"], 1), b2["bucket"] } @@ -50,7 +50,7 @@ func TestB2Client(t *testing.T) { b2Client.TestMode = true - err := b2Client.AuthorizeAccount() + err := b2Client.AuthorizeAccount(0) if err != nil { t.Errorf("Failed to authorize the b2 account: %v", err) return @@ -64,14 +64,14 @@ func TestB2Client(t *testing.T) { testDirectory := "b2client_test/" - files, err := b2Client.ListFileNames(testDirectory, false, false) + files, err := b2Client.ListFileNames(0, testDirectory, false, false) if err != nil { t.Errorf("Failed to list files: %v", err) return } for _, file := range files { - err = b2Client.DeleteFile(file.FileName, file.FileID) + err = b2Client.DeleteFile(0, file.FileName, file.FileID) if err != nil { t.Errorf("Failed to delete file '%s': %v", file.FileName, err) } @@ -90,14 +90,14 @@ func TestB2Client(t *testing.T) { hash := sha256.Sum256(content) name := hex.EncodeToString(hash[:]) - err = b2Client.UploadFile(testDirectory+name, content, 100) + err = b2Client.UploadFile(0, testDirectory+name, content, 100) if err != nil { t.Errorf("Error uploading file '%s': %v", name, err) return } } - files, err = b2Client.ListFileNames(testDirectory, false, false) + files, err = b2Client.ListFileNames(0, testDirectory, false, false) if err != nil { t.Errorf("Failed to list files: %v", err) return @@ -105,7 +105,7 @@ func TestB2Client(t *testing.T) { for _, file := range files { - readCloser, _, err := b2Client.DownloadFile(file.FileName) + readCloser, _, err := b2Client.DownloadFile(0, file.FileName) if err != nil { t.Errorf("Error downloading file '%s': %v", file.FileName, err) return @@ -125,7 +125,7 @@ func TestB2Client(t *testing.T) { } for _, file := range files { - err = b2Client.DeleteFile(file.FileName, file.FileID) + err = b2Client.DeleteFile(0, file.FileName, file.FileID) if err != nil { t.Errorf("Failed to delete file '%s': %v", file.FileName, err) } diff --git a/src/duplicacy_b2storage.go b/src/duplicacy_b2storage.go index c28a3a5..f54f992 100644 --- a/src/duplicacy_b2storage.go +++ b/src/duplicacy_b2storage.go @@ -11,32 +11,26 @@ import ( type B2Storage struct { StorageBase - clients []*B2Client + client *B2Client } // CreateB2Storage creates a B2 storage object. -func CreateB2Storage(accountID string, applicationKey string, bucket string, threads int) (storage *B2Storage, err error) { +func CreateB2Storage(accountID string, applicationKey string, bucket string, storageDir string, threads int) (storage *B2Storage, err error) { - var clients []*B2Client + client := NewB2Client(accountID, applicationKey, storageDir, threads) - for i := 0; i < threads; i++ { - client := NewB2Client(accountID, applicationKey) + err = client.AuthorizeAccount(0) + if err != nil { + return nil, err + } - err = client.AuthorizeAccount() - if err != nil { - return nil, err - } - - err = client.FindBucket(bucket) - if err != nil { - return nil, err - } - - clients = append(clients, client) + err = client.FindBucket(bucket) + if err != nil { + return nil, err } storage = &B2Storage{ - clients: clients, + client: client, } storage.DerivedStorage = storage @@ -56,7 +50,7 @@ func (storage *B2Storage) ListFiles(threadIndex int, dir string) (files []string includeVersions = true } - entries, err := storage.clients[threadIndex].ListFileNames(dir, false, includeVersions) + entries, err := storage.client.ListFileNames(threadIndex, dir, false, includeVersions) if err != nil { return nil, nil, err } @@ -102,7 +96,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro if strings.HasSuffix(filePath, ".fsl") { filePath = filePath[:len(filePath)-len(".fsl")] - entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, true) + entries, err := storage.client.ListFileNames(threadIndex, filePath, true, true) if err != nil { return err } @@ -116,7 +110,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro toBeDeleted = true - err = storage.clients[threadIndex].DeleteFile(filePath, entry.FileID) + err = storage.client.DeleteFile(threadIndex, filePath, entry.FileID) if err != nil { return err } @@ -125,7 +119,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro return nil } else { - entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, false) + entries, err := storage.client.ListFileNames(threadIndex, filePath, true, false) if err != nil { return err } @@ -133,7 +127,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro if len(entries) == 0 { return nil } - return storage.clients[threadIndex].DeleteFile(filePath, entries[0].FileID) + return storage.client.DeleteFile(threadIndex, filePath, entries[0].FileID) } } @@ -160,10 +154,10 @@ func (storage *B2Storage) MoveFile(threadIndex int, from string, to string) (err } if filePath == from { - _, err = storage.clients[threadIndex].HideFile(from) + _, err = storage.client.HideFile(threadIndex, from) return err } else { - entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, true) + entries, err := storage.client.ListFileNames(threadIndex, filePath, true, true) if err != nil { return err } @@ -171,7 +165,7 @@ func (storage *B2Storage) MoveFile(threadIndex int, from string, to string) (err return nil } - return storage.clients[threadIndex].DeleteFile(filePath, entries[0].FileID) + return storage.client.DeleteFile(threadIndex, filePath, entries[0].FileID) } } @@ -188,7 +182,7 @@ func (storage *B2Storage) GetFileInfo(threadIndex int, filePath string) (exist b filePath = filePath[:len(filePath)-len(".fsl")] } - entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, isFossil) + entries, err := storage.client.ListFileNames(threadIndex, filePath, true, isFossil) if err != nil { return false, false, 0, err } @@ -211,21 +205,21 @@ func (storage *B2Storage) GetFileInfo(threadIndex int, filePath string) (exist b func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) { filePath = strings.Replace(filePath, " ", "%20", -1) - readCloser, _, err := storage.clients[threadIndex].DownloadFile(filePath) + readCloser, _, err := storage.client.DownloadFile(threadIndex, filePath) if err != nil { return err } defer readCloser.Close() - _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/len(storage.clients)) + _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/storage.client.Threads) return err } // UploadFile writes 'content' to the file at 'filePath'. func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) { filePath = strings.Replace(filePath, " ", "%20", -1) - return storage.clients[threadIndex].UploadFile(filePath, content, storage.UploadRateLimit/len(storage.clients)) + return storage.client.UploadFile(threadIndex, filePath, content, storage.UploadRateLimit/storage.client.Threads) } // If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when @@ -243,7 +237,5 @@ func (storage *B2Storage) IsFastListing() bool { return true } // Enable the test mode. func (storage *B2Storage) EnableTestMode() { - for _, client := range storage.clients { - client.TestMode = true - } + storage.client.TestMode = true } diff --git a/src/duplicacy_storage.go b/src/duplicacy_storage.go index c229bdc..7cf85cf 100644 --- a/src/duplicacy_storage.go +++ b/src/duplicacy_storage.go @@ -526,11 +526,12 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor return dropboxStorage } else if matched[1] == "b2" { bucket := matched[3] + storageDir := matched[5] - accountID := GetPassword(preference, "b2_id", "Enter Backblaze Account or Application ID:", true, resetPassword) - applicationKey := GetPassword(preference, "b2_key", "Enter Corresponding Backblaze Application Key:", true, resetPassword) + accountID := GetPassword(preference, "b2_id", "Enter Backblaze account or application id:", true, resetPassword) + applicationKey := GetPassword(preference, "b2_key", "Enter corresponding Backblaze application key:", true, resetPassword) - b2Storage, err := CreateB2Storage(accountID, applicationKey, bucket, threads) + b2Storage, err := CreateB2Storage(accountID, applicationKey, bucket, storageDir, threads) if err != nil { LOG_ERROR("STORAGE_CREATE", "Failed to load the Backblaze B2 storage at %s: %v", storageURL, err) return nil diff --git a/src/duplicacy_storage_test.go b/src/duplicacy_storage_test.go index 88a037f..0d1f1b3 100644 --- a/src/duplicacy_storage_test.go +++ b/src/duplicacy_storage_test.go @@ -107,7 +107,7 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) { storage.SetDefaultNestingLevels([]int{2, 3}, 2) return storage, err } else if testStorageName == "b2" { - storage, err := CreateB2Storage(config["account"], config["key"], config["bucket"], threads) + storage, err := CreateB2Storage(config["account"], config["key"], config["bucket"], config["directory"], threads) storage.SetDefaultNestingLevels([]int{2, 3}, 2) return storage, err } else if testStorageName == "gcs-s3" { @@ -296,7 +296,8 @@ func TestStorage(t *testing.T) { LOG_INFO("STORAGE_TEST", "storage: %s", testStorageName) - storage, err := loadStorage(testDir, 1) + threads := 8 + storage, err := loadStorage(testDir, threads) if err != nil { t.Errorf("Failed to create storage: %v", err) return @@ -326,16 +327,16 @@ func TestStorage(t *testing.T) { storage.CreateDirectory(0, "shared") // Upload to the same directory by multiple goroutines - count := 8 + count := threads finished := make(chan int, count) for i := 0; i < count; i++ { - go func(name string) { - err := storage.UploadFile(0, name, []byte("this is a test file")) + go func(threadIndex int, name string) { + err := storage.UploadFile(threadIndex, name, []byte("this is a test file")) if err != nil { t.Errorf("Error to upload '%s': %v", name, err) } finished <- 0 - }(fmt.Sprintf("shared/a/b/c/%d", i)) + }(i, fmt.Sprintf("shared/a/b/c/%d", i)) } for i := 0; i < count; i++ { @@ -384,7 +385,6 @@ func TestStorage(t *testing.T) { snapshotIDs := []string{} for _, snapshotDir := range snapshotDirs { - LOG_INFO("debug", "snapshot dir: %s", snapshotDir) if len(snapshotDir) > 0 && snapshotDir[len(snapshotDir)-1] == '/' { snapshotIDs = append(snapshotIDs, snapshotDir[:len(snapshotDir)-1]) }