1
0
mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-15 15:53:26 +00:00

Rework the Backblaze B2 backend

* All APIs include UploadFile are done via the call() function
* New retry mechanism limiting the maximum backoff each time to 1 minute
* Add an env var DUPLICACY_B2_RETRIES to specify the number of retries
* Handle special/unicode characters in repositor ids
* Allow a directory in a bucket to be used as the storage destination
This commit is contained in:
Gilbert Chen
2019-04-30 23:31:57 -04:00
parent a73ed462b6
commit 57a408a577
5 changed files with 253 additions and 247 deletions

View File

@@ -5,19 +5,22 @@
package duplicacy package duplicacy
import ( import (
"bytes"
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io" "io"
"io/ioutil" "os"
"math/rand" "fmt"
"net/http" "bytes"
"time"
"sync"
"strconv" "strconv"
"strings" "strings"
"time" "net/url"
"net/http"
"math/rand"
"io/ioutil"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"encoding/base64"
) )
type B2Error struct { type B2Error struct {
@@ -39,67 +42,112 @@ var B2AuthorizationURL = "https://api.backblazeb2.com/b2api/v1/b2_authorize_acco
type B2Client struct { type B2Client struct {
HTTPClient *http.Client HTTPClient *http.Client
AccountID string AccountID string
ApplicationKeyID string ApplicationKeyID string
ApplicationKey string ApplicationKey string
BucketName string
BucketID string
StorageDir string
Lock sync.Mutex
AuthorizationToken string AuthorizationToken string
APIURL string APIURL string
DownloadURL string DownloadURL string
BucketName string IsAuthorized bool
BucketID string
UploadURL string UploadURLs []string
UploadToken string UploadTokens []string
TestMode bool Threads int
MaximumRetries int
TestMode bool
} }
func NewB2Client(applicationKeyID string, applicationKey string) *B2Client { // URL encode the given path but keep the slashes intact
func B2Escape(path string) string {
var components []string
for _, c := range strings.Split(path, "/") {
components = append(components, url.QueryEscape(c))
}
return strings.Join(components, "/")
}
func NewB2Client(applicationKeyID string, applicationKey string, storageDir string, threads int) *B2Client {
for storageDir != "" && storageDir[0] == '/' {
storageDir = storageDir[1:]
}
if storageDir != "" && storageDir[len(storageDir) - 1] != '/' {
storageDir += "/"
}
maximumRetries := 10
if value, found := os.LookupEnv("DUPLICACY_B2_RETRIES"); found && value != "" {
maximumRetries, _ = strconv.Atoi(value)
LOG_INFO("B2_RETRIES", "Setting maximum retries for B2 to %d", maximumRetries)
}
client := &B2Client{ client := &B2Client{
HTTPClient: http.DefaultClient, HTTPClient: http.DefaultClient,
ApplicationKeyID: applicationKeyID, ApplicationKeyID: applicationKeyID,
ApplicationKey: applicationKey, ApplicationKey: applicationKey,
StorageDir: storageDir,
UploadURLs: make([]string, threads),
UploadTokens: make([]string, threads),
Threads: threads,
MaximumRetries: maximumRetries,
} }
return client return client
} }
func (client *B2Client) retry(backoff int, response *http.Response) int { func (client *B2Client) getAPIURL() string {
client.Lock.Lock()
defer client.Lock.Unlock()
return client.APIURL
}
func (client *B2Client) getDownloadURL() string {
client.Lock.Lock()
defer client.Lock.Unlock()
return client.DownloadURL
}
func (client *B2Client) retry(retries int, response *http.Response) int {
if response != nil { if response != nil {
if backoffList, found := response.Header["Retry-After"]; found && len(backoffList) > 0 { if backoffList, found := response.Header["Retry-After"]; found && len(backoffList) > 0 {
retryAfter, _ := strconv.Atoi(backoffList[0]) retryAfter, _ := strconv.Atoi(backoffList[0])
if retryAfter >= 1 { if retryAfter >= 1 {
time.Sleep(time.Duration(retryAfter) * time.Second) time.Sleep(time.Duration(retryAfter) * time.Second)
return 0 return 1
} }
} }
} }
if backoff == 0 {
backoff = 1 if retries >= client.MaximumRetries + 1 {
} else { return 0
backoff *= 2
} }
time.Sleep(time.Duration(backoff) * time.Second) retries++
return backoff delay := 1 << uint(retries)
if delay > 64 {
delay = 64
}
delayInSeconds := (rand.Float32() + 1.0) * float32(delay) / 2.0
time.Sleep(time.Duration(delayInSeconds) * time.Second)
return retries
} }
func (client *B2Client) call(url string, method string, requestHeaders map[string]string, input interface{}) (io.ReadCloser, http.Header, int64, error) { func (client *B2Client) call(threadIndex int, requestURL string, method string, requestHeaders map[string]string, input interface{}) (
io.ReadCloser, http.Header, int64, error) {
switch method {
case http.MethodGet:
break
case http.MethodHead:
break
case http.MethodPost:
break
default:
return nil, nil, 0, fmt.Errorf("unhandled http request method: " + method)
}
var response *http.Response var response *http.Response
backoff := 0 retries := 0
for i := 0; i < 8; i++ { for {
var inputReader *bytes.Reader var inputReader io.Reader
isUpload := false
switch input.(type) { switch input.(type) {
default: default:
@@ -108,21 +156,43 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin
return nil, nil, 0, err return nil, nil, 0, err
} }
inputReader = bytes.NewReader(jsonInput) inputReader = bytes.NewReader(jsonInput)
case []byte:
inputReader = bytes.NewReader(input.([]byte))
case int: case int:
inputReader = bytes.NewReader([]byte("")) inputReader = bytes.NewReader([]byte(""))
case []byte:
isUpload = true
inputReader = bytes.NewReader(input.([]byte))
case *RateLimitedReader:
isUpload = true
rateLimitedReader := input.(*RateLimitedReader)
rateLimitedReader.Reset()
inputReader = rateLimitedReader
} }
request, err := http.NewRequest(method, url, inputReader)
if isUpload {
if client.UploadURLs[threadIndex] == "" || client.UploadTokens[threadIndex] == "" {
err := client.getUploadURL(threadIndex)
if err != nil {
return nil, nil, 0, err
}
}
requestURL = client.UploadURLs[threadIndex]
}
request, err := http.NewRequest(method, requestURL, inputReader)
if err != nil { if err != nil {
return nil, nil, 0, err return nil, nil, 0, err
} }
if url == B2AuthorizationURL { if requestURL == B2AuthorizationURL {
request.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(client.ApplicationKeyID+":"+client.ApplicationKey))) request.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(client.ApplicationKeyID+":"+client.ApplicationKey)))
} else if isUpload {
request.ContentLength, _ = strconv.ParseInt(requestHeaders["Content-Length"], 10, 64)
request.Header.Set("Authorization", client.UploadTokens[threadIndex])
} else { } else {
client.Lock.Lock()
request.Header.Set("Authorization", client.AuthorizationToken) request.Header.Set("Authorization", client.AuthorizationToken)
client.Lock.Unlock()
} }
if requestHeaders != nil { if requestHeaders != nil {
@@ -133,7 +203,9 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin
if client.TestMode { if client.TestMode {
r := rand.Float32() r := rand.Float32()
if r < 0.5 { if r < 0.5 && isUpload {
request.Header.Set("X-Bz-Test-Mode", "fail_some_uploads")
} else if r < 0.75 {
request.Header.Set("X-Bz-Test-Mode", "expire_some_account_authorization_tokens") request.Header.Set("X-Bz-Test-Mode", "expire_some_account_authorization_tokens")
} else { } else {
request.Header.Set("X-Bz-Test-Mode", "force_cap_exceeded") request.Header.Set("X-Bz-Test-Mode", "force_cap_exceeded")
@@ -142,27 +214,46 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin
response, err = client.HTTPClient.Do(request) response, err = client.HTTPClient.Do(request)
if err != nil { if err != nil {
if url != B2AuthorizationURL {
LOG_DEBUG("BACKBLAZE_CALL", "URL request '%s' returned an error: %v", url, err) // Don't retry when the first authorization request fails
backoff = client.retry(backoff, response) if requestURL == B2AuthorizationURL && !client.IsAuthorized {
continue return nil, nil, 0, err
} }
return nil, nil, 0, err
LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s' returned an error: %v", threadIndex, requestURL, err)
retries = client.retry(retries, response)
if retries <= 0 {
return nil, nil, 0, err
}
// Clear the upload url to requrest a new one on retry
if isUpload {
client.UploadURLs[threadIndex] = ""
client.UploadTokens[threadIndex] = ""
}
continue
} }
if response.StatusCode < 300 { if response.StatusCode < 300 {
return response.Body, response.Header, response.ContentLength, nil return response.Body, response.Header, response.ContentLength, nil
} }
LOG_DEBUG("BACKBLAZE_CALL", "URL request '%s %s' returned status code %d", method, url, response.StatusCode) e := &B2Error{}
if err := json.NewDecoder(response.Body).Decode(e); err != nil {
LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s %s' returned status code %d", threadIndex, method, requestURL, response.StatusCode)
} else {
LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s %s' returned %d %s", threadIndex, method, requestURL, response.StatusCode, e.Message)
}
io.Copy(ioutil.Discard, response.Body)
response.Body.Close() response.Body.Close()
if response.StatusCode == 401 { if response.StatusCode == 401 {
if url == B2AuthorizationURL { if requestURL == B2AuthorizationURL {
return nil, nil, 0, fmt.Errorf("Authorization failure") return nil, nil, 0, fmt.Errorf("Authorization failure")
} }
client.AuthorizeAccount() client.AuthorizeAccount(threadIndex)
continue continue
} else if response.StatusCode == 403 { } else if response.StatusCode == 403 {
if !client.TestMode { if !client.TestMode {
@@ -176,32 +267,21 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin
} else if response.StatusCode == 416 { } else if response.StatusCode == 416 {
if http.MethodHead == method { if http.MethodHead == method {
// 416 Requested Range Not Satisfiable // 416 Requested Range Not Satisfiable
return nil, nil, 0, fmt.Errorf("URL request '%s' returned status code %d", url, response.StatusCode) return nil, nil, 0, fmt.Errorf("URL request '%s' returned %d %s", requestURL, response.StatusCode, e.Message)
} }
} else if response.StatusCode == 429 || response.StatusCode == 408 {
backoff = client.retry(backoff, response)
continue
} else if response.StatusCode >= 500 && response.StatusCode <= 599 {
backoff = client.retry(backoff, response)
continue
} else {
LOG_INFO("BACKBLAZE_CALL", "URL request '%s' returned status code %d", url, response.StatusCode)
backoff = client.retry(backoff, response)
continue
} }
defer response.Body.Close() retries = client.retry(retries, response)
if retries <= 0 {
e := &B2Error{} return nil, nil, 0, fmt.Errorf("URL request '%s' returned %d %s", requestURL, response.StatusCode, e.Message)
if err := json.NewDecoder(response.Body).Decode(e); err != nil {
return nil, nil, 0, err
} }
return nil, nil, 0, e if isUpload {
client.UploadURLs[threadIndex] = ""
client.UploadTokens[threadIndex] = ""
}
} }
return nil, nil, 0, fmt.Errorf("Maximum backoff reached")
} }
type B2AuthorizeAccountOutput struct { type B2AuthorizeAccountOutput struct {
@@ -211,9 +291,11 @@ type B2AuthorizeAccountOutput struct {
DownloadURL string DownloadURL string
} }
func (client *B2Client) AuthorizeAccount() (err error) { func (client *B2Client) AuthorizeAccount(threadIndex int) (err error) {
client.Lock.Lock()
defer client.Lock.Unlock()
readCloser, _, _, err := client.call(B2AuthorizationURL, http.MethodPost, nil, make(map[string]string)) readCloser, _, _, err := client.call(threadIndex, B2AuthorizationURL, http.MethodPost, nil, make(map[string]string))
if err != nil { if err != nil {
return err return err
} }
@@ -233,6 +315,7 @@ func (client *B2Client) AuthorizeAccount() (err error) {
client.AuthorizationToken = output.AuthorizationToken client.AuthorizationToken = output.AuthorizationToken
client.APIURL = output.APIURL client.APIURL = output.APIURL
client.DownloadURL = output.DownloadURL client.DownloadURL = output.DownloadURL
client.IsAuthorized = true
return nil return nil
} }
@@ -249,9 +332,9 @@ func (client *B2Client) FindBucket(bucketName string) (err error) {
input := make(map[string]string) input := make(map[string]string)
input["accountId"] = client.AccountID input["accountId"] = client.AccountID
url := client.APIURL + "/b2api/v1/b2_list_buckets" url := client.getAPIURL() + "/b2api/v1/b2_list_buckets"
readCloser, _, _, err := client.call(url, http.MethodPost, nil, input) readCloser, _, _, err := client.call(0, url, http.MethodPost, nil, input)
if err != nil { if err != nil {
return err return err
} }
@@ -293,7 +376,7 @@ type B2ListFileNamesOutput struct {
NextFileId string NextFileId string
} }
func (client *B2Client) ListFileNames(startFileName string, singleFile bool, includeVersions bool) (files []*B2Entry, err error) { func (client *B2Client) ListFileNames(threadIndex int, startFileName string, singleFile bool, includeVersions bool) (files []*B2Entry, err error) {
maxFileCount := 1000 maxFileCount := 1000
if singleFile { if singleFile {
@@ -311,20 +394,21 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
input := make(map[string]interface{}) input := make(map[string]interface{})
input["bucketId"] = client.BucketID input["bucketId"] = client.BucketID
input["startFileName"] = startFileName input["startFileName"] = client.StorageDir + startFileName
input["maxFileCount"] = maxFileCount input["maxFileCount"] = maxFileCount
input["prefix"] = client.StorageDir
for { for {
url := client.APIURL + "/b2api/v1/b2_list_file_names" url := client.getAPIURL() + "/b2api/v1/b2_list_file_names"
requestHeaders := map[string]string{} requestHeaders := map[string]string{}
requestMethod := http.MethodPost requestMethod := http.MethodPost
var requestInput interface{} var requestInput interface{}
requestInput = input requestInput = input
if includeVersions { if includeVersions {
url = client.APIURL + "/b2api/v1/b2_list_file_versions" url = client.getAPIURL() + "/b2api/v1/b2_list_file_versions"
} else if singleFile { } else if singleFile {
// handle a single file with no versions as a special case to download the last byte of the file // handle a single file with no versions as a special case to download the last byte of the file
url = client.DownloadURL + "/file/" + client.BucketName + "/" + startFileName url = client.getDownloadURL() + "/file/" + client.BucketName + "/" + B2Escape(client.StorageDir + startFileName)
// requesting byte -1 works for empty files where 0-0 fails with a 416 error // requesting byte -1 works for empty files where 0-0 fails with a 416 error
requestHeaders["Range"] = "bytes=-1" requestHeaders["Range"] = "bytes=-1"
// HEAD request // HEAD request
@@ -334,7 +418,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
var readCloser io.ReadCloser var readCloser io.ReadCloser
var responseHeader http.Header var responseHeader http.Header
var err error var err error
readCloser, responseHeader, _, err = client.call(url, requestMethod, requestHeaders, requestInput) readCloser, responseHeader, _, err = client.call(threadIndex, url, requestMethod, requestHeaders, requestInput)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -347,7 +431,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
if singleFile && !includeVersions { if singleFile && !includeVersions {
if responseHeader == nil { if responseHeader == nil {
LOG_DEBUG("BACKBLAZE_LIST", "b2_download_file_by_name did not return headers") LOG_DEBUG("BACKBLAZE_LIST", "%s did not return headers", url)
return []*B2Entry{}, nil return []*B2Entry{}, nil
} }
requiredHeaders := []string{ requiredHeaders := []string{
@@ -361,7 +445,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
} }
} }
if len(missingKeys) > 0 { if len(missingKeys) > 0 {
return nil, fmt.Errorf("b2_download_file_by_name missing headers: %s", missingKeys) return nil, fmt.Errorf("%s missing headers: %s", url, missingKeys)
} }
// construct the B2Entry from the response headers of the download request // construct the B2Entry from the response headers of the download request
fileID := responseHeader.Get("x-bz-file-id") fileID := responseHeader.Get("x-bz-file-id")
@@ -378,14 +462,14 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
// this should only execute if the requested file is empty and the range request didn't result in a Content-Range header // this should only execute if the requested file is empty and the range request didn't result in a Content-Range header
fileSize, _ = strconv.ParseInt(lengthString, 0, 64) fileSize, _ = strconv.ParseInt(lengthString, 0, 64)
if fileSize != 0 { if fileSize != 0 {
return nil, fmt.Errorf("b2_download_file_by_name returned non-zero file length") return nil, fmt.Errorf("%s returned non-zero file length", url)
} }
} else { } else {
return nil, fmt.Errorf("could not parse b2_download_file_by_name headers") return nil, fmt.Errorf("could not parse headers returned by %s", url)
} }
fileUploadTimestamp, _ := strconv.ParseInt(responseHeader.Get("X-Bz-Upload-Timestamp"), 0, 64) fileUploadTimestamp, _ := strconv.ParseInt(responseHeader.Get("X-Bz-Upload-Timestamp"), 0, 64)
return []*B2Entry{{fileID, fileName, fileAction, fileSize, fileUploadTimestamp}}, nil return []*B2Entry{{fileID, fileName[len(client.StorageDir):], fileAction, fileSize, fileUploadTimestamp}}, nil
} }
if err = json.NewDecoder(readCloser).Decode(&output); err != nil { if err = json.NewDecoder(readCloser).Decode(&output); err != nil {
@@ -394,31 +478,27 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
ioutil.ReadAll(readCloser) ioutil.ReadAll(readCloser)
if startFileName == "" { for _, file := range output.Files {
files = append(files, output.Files...) file.FileName = file.FileName[len(client.StorageDir):]
} else { if singleFile {
for _, file := range output.Files { if file.FileName == startFileName {
if singleFile { files = append(files, file)
if file.FileName == startFileName { if !includeVersions {
files = append(files, file)
if !includeVersions {
output.NextFileName = ""
break
}
} else {
output.NextFileName = "" output.NextFileName = ""
break break
} }
} else { } else {
if strings.HasPrefix(file.FileName, startFileName) { output.NextFileName = ""
files = append(files, file) break
} else { }
output.NextFileName = "" } else {
break if strings.HasPrefix(file.FileName, startFileName) {
} files = append(files, file)
} else {
output.NextFileName = ""
break
} }
} }
} }
if len(output.NextFileName) == 0 { if len(output.NextFileName) == 0 {
@@ -434,14 +514,14 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
return files, nil return files, nil
} }
func (client *B2Client) DeleteFile(fileName string, fileID string) (err error) { func (client *B2Client) DeleteFile(threadIndex int, fileName string, fileID string) (err error) {
input := make(map[string]string) input := make(map[string]string)
input["fileName"] = fileName input["fileName"] = client.StorageDir + fileName
input["fileId"] = fileID input["fileId"] = fileID
url := client.APIURL + "/b2api/v1/b2_delete_file_version" url := client.getAPIURL() + "/b2api/v1/b2_delete_file_version"
readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input)
if err != nil { if err != nil {
return err return err
} }
@@ -454,14 +534,14 @@ type B2HideFileOutput struct {
FileID string FileID string
} }
func (client *B2Client) HideFile(fileName string) (fileID string, err error) { func (client *B2Client) HideFile(threadIndex int, fileName string) (fileID string, err error) {
input := make(map[string]string) input := make(map[string]string)
input["bucketId"] = client.BucketID input["bucketId"] = client.BucketID
input["fileName"] = fileName input["fileName"] = client.StorageDir + fileName
url := client.APIURL + "/b2api/v1/b2_hide_file" url := client.getAPIURL() + "/b2api/v1/b2_hide_file"
readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -478,11 +558,11 @@ func (client *B2Client) HideFile(fileName string) (fileID string, err error) {
return output.FileID, nil return output.FileID, nil
} }
func (client *B2Client) DownloadFile(filePath string) (io.ReadCloser, int64, error) { func (client *B2Client) DownloadFile(threadIndex int, filePath string) (io.ReadCloser, int64, error) {
url := client.DownloadURL + "/file/" + client.BucketName + "/" + filePath url := client.getDownloadURL() + "/file/" + client.BucketName + "/" + B2Escape(client.StorageDir + filePath)
readCloser, _, len, err := client.call(url, http.MethodGet, make(map[string]string), 0) readCloser, _, len, err := client.call(threadIndex, url, http.MethodGet, make(map[string]string), 0)
return readCloser, len, err return readCloser, len, err
} }
@@ -492,12 +572,12 @@ type B2GetUploadArgumentOutput struct {
AuthorizationToken string AuthorizationToken string
} }
func (client *B2Client) getUploadURL() error { func (client *B2Client) getUploadURL(threadIndex int) error {
input := make(map[string]string) input := make(map[string]string)
input["bucketId"] = client.BucketID input["bucketId"] = client.BucketID
url := client.APIURL + "/b2api/v1/b2_get_upload_url" url := client.getAPIURL() + "/b2api/v1/b2_get_upload_url"
readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input)
if err != nil { if err != nil {
return err return err
} }
@@ -510,96 +590,29 @@ func (client *B2Client) getUploadURL() error {
return err return err
} }
client.UploadURL = output.UploadURL client.UploadURLs[threadIndex] = output.UploadURL
client.UploadToken = output.AuthorizationToken client.UploadTokens[threadIndex] = output.AuthorizationToken
return nil return nil
} }
func (client *B2Client) UploadFile(filePath string, content []byte, rateLimit int) (err error) { func (client *B2Client) UploadFile(threadIndex int, filePath string, content []byte, rateLimit int) (err error) {
hasher := sha1.New() hasher := sha1.New()
hasher.Write(content) hasher.Write(content)
hash := hex.EncodeToString(hasher.Sum(nil)) hash := hex.EncodeToString(hasher.Sum(nil))
headers := make(map[string]string) headers := make(map[string]string)
headers["X-Bz-File-Name"] = filePath headers["X-Bz-File-Name"] = B2Escape(client.StorageDir + filePath)
headers["Content-Length"] = fmt.Sprintf("%d", len(content))
headers["Content-Type"] = "application/octet-stream" headers["Content-Type"] = "application/octet-stream"
headers["X-Bz-Content-Sha1"] = hash headers["X-Bz-Content-Sha1"] = hash
var response *http.Response readCloser, _, _, err := client.call(threadIndex, "", http.MethodPost, headers, CreateRateLimitedReader(content, rateLimit))
if err != nil {
backoff := 0 return err
for i := 0; i < 8; i++ {
if client.UploadURL == "" || client.UploadToken == "" {
err = client.getUploadURL()
if err != nil {
return err
}
}
request, err := http.NewRequest("POST", client.UploadURL, CreateRateLimitedReader(content, rateLimit))
if err != nil {
return err
}
request.ContentLength = int64(len(content))
request.Header.Set("Authorization", client.UploadToken)
request.Header.Set("X-Bz-File-Name", filePath)
request.Header.Set("Content-Type", "application/octet-stream")
request.Header.Set("X-Bz-Content-Sha1", hash)
for key, value := range headers {
request.Header.Set(key, value)
}
if client.TestMode {
r := rand.Float32()
if r < 0.8 {
request.Header.Set("X-Bz-Test-Mode", "fail_some_uploads")
} else if r < 0.9 {
request.Header.Set("X-Bz-Test-Mode", "expire_some_account_authorization_tokens")
} else {
request.Header.Set("X-Bz-Test-Mode", "force_cap_exceeded")
}
}
response, err = client.HTTPClient.Do(request)
if err != nil {
LOG_DEBUG("BACKBLAZE_UPLOAD", "URL request '%s' returned an error: %v", client.UploadURL, err)
backoff = client.retry(backoff, response)
client.UploadURL = ""
client.UploadToken = ""
continue
}
io.Copy(ioutil.Discard, response.Body)
response.Body.Close()
if response.StatusCode < 300 {
return nil
}
LOG_DEBUG("BACKBLAZE_UPLOAD", "URL request '%s' returned status code %d", client.UploadURL, response.StatusCode)
if response.StatusCode == 401 {
LOG_INFO("BACKBLAZE_UPLOAD", "Re-authorization required")
client.UploadURL = ""
client.UploadToken = ""
continue
} else if response.StatusCode == 403 {
if !client.TestMode {
return fmt.Errorf("B2 cap exceeded")
}
continue
} else {
LOG_INFO("BACKBLAZE_UPLOAD", "URL request '%s' returned status code %d", client.UploadURL, response.StatusCode)
backoff = client.retry(backoff, response)
client.UploadURL = ""
client.UploadToken = ""
}
} }
return fmt.Errorf("Maximum backoff reached") readCloser.Close()
return nil
} }

View File

@@ -37,7 +37,7 @@ func createB2ClientForTest(t *testing.T) (*B2Client, string) {
return nil, "" return nil, ""
} }
return NewB2Client(b2["account"], b2["key"]), b2["bucket"] return NewB2Client(b2["account"], b2["key"], b2["directory"], 1), b2["bucket"]
} }
@@ -50,7 +50,7 @@ func TestB2Client(t *testing.T) {
b2Client.TestMode = true b2Client.TestMode = true
err := b2Client.AuthorizeAccount() err := b2Client.AuthorizeAccount(0)
if err != nil { if err != nil {
t.Errorf("Failed to authorize the b2 account: %v", err) t.Errorf("Failed to authorize the b2 account: %v", err)
return return
@@ -64,14 +64,14 @@ func TestB2Client(t *testing.T) {
testDirectory := "b2client_test/" testDirectory := "b2client_test/"
files, err := b2Client.ListFileNames(testDirectory, false, false) files, err := b2Client.ListFileNames(0, testDirectory, false, false)
if err != nil { if err != nil {
t.Errorf("Failed to list files: %v", err) t.Errorf("Failed to list files: %v", err)
return return
} }
for _, file := range files { for _, file := range files {
err = b2Client.DeleteFile(file.FileName, file.FileID) err = b2Client.DeleteFile(0, file.FileName, file.FileID)
if err != nil { if err != nil {
t.Errorf("Failed to delete file '%s': %v", file.FileName, err) t.Errorf("Failed to delete file '%s': %v", file.FileName, err)
} }
@@ -90,14 +90,14 @@ func TestB2Client(t *testing.T) {
hash := sha256.Sum256(content) hash := sha256.Sum256(content)
name := hex.EncodeToString(hash[:]) name := hex.EncodeToString(hash[:])
err = b2Client.UploadFile(testDirectory+name, content, 100) err = b2Client.UploadFile(0, testDirectory+name, content, 100)
if err != nil { if err != nil {
t.Errorf("Error uploading file '%s': %v", name, err) t.Errorf("Error uploading file '%s': %v", name, err)
return return
} }
} }
files, err = b2Client.ListFileNames(testDirectory, false, false) files, err = b2Client.ListFileNames(0, testDirectory, false, false)
if err != nil { if err != nil {
t.Errorf("Failed to list files: %v", err) t.Errorf("Failed to list files: %v", err)
return return
@@ -105,7 +105,7 @@ func TestB2Client(t *testing.T) {
for _, file := range files { for _, file := range files {
readCloser, _, err := b2Client.DownloadFile(file.FileName) readCloser, _, err := b2Client.DownloadFile(0, file.FileName)
if err != nil { if err != nil {
t.Errorf("Error downloading file '%s': %v", file.FileName, err) t.Errorf("Error downloading file '%s': %v", file.FileName, err)
return return
@@ -125,7 +125,7 @@ func TestB2Client(t *testing.T) {
} }
for _, file := range files { for _, file := range files {
err = b2Client.DeleteFile(file.FileName, file.FileID) err = b2Client.DeleteFile(0, file.FileName, file.FileID)
if err != nil { if err != nil {
t.Errorf("Failed to delete file '%s': %v", file.FileName, err) t.Errorf("Failed to delete file '%s': %v", file.FileName, err)
} }

View File

@@ -11,32 +11,26 @@ import (
type B2Storage struct { type B2Storage struct {
StorageBase StorageBase
clients []*B2Client client *B2Client
} }
// CreateB2Storage creates a B2 storage object. // CreateB2Storage creates a B2 storage object.
func CreateB2Storage(accountID string, applicationKey string, bucket string, threads int) (storage *B2Storage, err error) { func CreateB2Storage(accountID string, applicationKey string, bucket string, storageDir string, threads int) (storage *B2Storage, err error) {
var clients []*B2Client client := NewB2Client(accountID, applicationKey, storageDir, threads)
for i := 0; i < threads; i++ { err = client.AuthorizeAccount(0)
client := NewB2Client(accountID, applicationKey) if err != nil {
return nil, err
}
err = client.AuthorizeAccount() err = client.FindBucket(bucket)
if err != nil { if err != nil {
return nil, err return nil, err
}
err = client.FindBucket(bucket)
if err != nil {
return nil, err
}
clients = append(clients, client)
} }
storage = &B2Storage{ storage = &B2Storage{
clients: clients, client: client,
} }
storage.DerivedStorage = storage storage.DerivedStorage = storage
@@ -56,7 +50,7 @@ func (storage *B2Storage) ListFiles(threadIndex int, dir string) (files []string
includeVersions = true includeVersions = true
} }
entries, err := storage.clients[threadIndex].ListFileNames(dir, false, includeVersions) entries, err := storage.client.ListFileNames(threadIndex, dir, false, includeVersions)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@@ -102,7 +96,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro
if strings.HasSuffix(filePath, ".fsl") { if strings.HasSuffix(filePath, ".fsl") {
filePath = filePath[:len(filePath)-len(".fsl")] filePath = filePath[:len(filePath)-len(".fsl")]
entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, true) entries, err := storage.client.ListFileNames(threadIndex, filePath, true, true)
if err != nil { if err != nil {
return err return err
} }
@@ -116,7 +110,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro
toBeDeleted = true toBeDeleted = true
err = storage.clients[threadIndex].DeleteFile(filePath, entry.FileID) err = storage.client.DeleteFile(threadIndex, filePath, entry.FileID)
if err != nil { if err != nil {
return err return err
} }
@@ -125,7 +119,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro
return nil return nil
} else { } else {
entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, false) entries, err := storage.client.ListFileNames(threadIndex, filePath, true, false)
if err != nil { if err != nil {
return err return err
} }
@@ -133,7 +127,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro
if len(entries) == 0 { if len(entries) == 0 {
return nil return nil
} }
return storage.clients[threadIndex].DeleteFile(filePath, entries[0].FileID) return storage.client.DeleteFile(threadIndex, filePath, entries[0].FileID)
} }
} }
@@ -160,10 +154,10 @@ func (storage *B2Storage) MoveFile(threadIndex int, from string, to string) (err
} }
if filePath == from { if filePath == from {
_, err = storage.clients[threadIndex].HideFile(from) _, err = storage.client.HideFile(threadIndex, from)
return err return err
} else { } else {
entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, true) entries, err := storage.client.ListFileNames(threadIndex, filePath, true, true)
if err != nil { if err != nil {
return err return err
} }
@@ -171,7 +165,7 @@ func (storage *B2Storage) MoveFile(threadIndex int, from string, to string) (err
return nil return nil
} }
return storage.clients[threadIndex].DeleteFile(filePath, entries[0].FileID) return storage.client.DeleteFile(threadIndex, filePath, entries[0].FileID)
} }
} }
@@ -188,7 +182,7 @@ func (storage *B2Storage) GetFileInfo(threadIndex int, filePath string) (exist b
filePath = filePath[:len(filePath)-len(".fsl")] filePath = filePath[:len(filePath)-len(".fsl")]
} }
entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, isFossil) entries, err := storage.client.ListFileNames(threadIndex, filePath, true, isFossil)
if err != nil { if err != nil {
return false, false, 0, err return false, false, 0, err
} }
@@ -211,21 +205,21 @@ func (storage *B2Storage) GetFileInfo(threadIndex int, filePath string) (exist b
func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) { func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
filePath = strings.Replace(filePath, " ", "%20", -1) filePath = strings.Replace(filePath, " ", "%20", -1)
readCloser, _, err := storage.clients[threadIndex].DownloadFile(filePath) readCloser, _, err := storage.client.DownloadFile(threadIndex, filePath)
if err != nil { if err != nil {
return err return err
} }
defer readCloser.Close() defer readCloser.Close()
_, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/len(storage.clients)) _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/storage.client.Threads)
return err return err
} }
// UploadFile writes 'content' to the file at 'filePath'. // UploadFile writes 'content' to the file at 'filePath'.
func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) { func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
filePath = strings.Replace(filePath, " ", "%20", -1) filePath = strings.Replace(filePath, " ", "%20", -1)
return storage.clients[threadIndex].UploadFile(filePath, content, storage.UploadRateLimit/len(storage.clients)) return storage.client.UploadFile(threadIndex, filePath, content, storage.UploadRateLimit/storage.client.Threads)
} }
// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when // If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
@@ -243,7 +237,5 @@ func (storage *B2Storage) IsFastListing() bool { return true }
// Enable the test mode. // Enable the test mode.
func (storage *B2Storage) EnableTestMode() { func (storage *B2Storage) EnableTestMode() {
for _, client := range storage.clients { storage.client.TestMode = true
client.TestMode = true
}
} }

View File

@@ -526,11 +526,12 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
return dropboxStorage return dropboxStorage
} else if matched[1] == "b2" { } else if matched[1] == "b2" {
bucket := matched[3] bucket := matched[3]
storageDir := matched[5]
accountID := GetPassword(preference, "b2_id", "Enter Backblaze Account or Application ID:", true, resetPassword) accountID := GetPassword(preference, "b2_id", "Enter Backblaze account or application id:", true, resetPassword)
applicationKey := GetPassword(preference, "b2_key", "Enter Corresponding Backblaze Application Key:", true, resetPassword) applicationKey := GetPassword(preference, "b2_key", "Enter corresponding Backblaze application key:", true, resetPassword)
b2Storage, err := CreateB2Storage(accountID, applicationKey, bucket, threads) b2Storage, err := CreateB2Storage(accountID, applicationKey, bucket, storageDir, threads)
if err != nil { if err != nil {
LOG_ERROR("STORAGE_CREATE", "Failed to load the Backblaze B2 storage at %s: %v", storageURL, err) LOG_ERROR("STORAGE_CREATE", "Failed to load the Backblaze B2 storage at %s: %v", storageURL, err)
return nil return nil

View File

@@ -107,7 +107,7 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) {
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err return storage, err
} else if testStorageName == "b2" { } else if testStorageName == "b2" {
storage, err := CreateB2Storage(config["account"], config["key"], config["bucket"], threads) storage, err := CreateB2Storage(config["account"], config["key"], config["bucket"], config["directory"], threads)
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err return storage, err
} else if testStorageName == "gcs-s3" { } else if testStorageName == "gcs-s3" {
@@ -296,7 +296,8 @@ func TestStorage(t *testing.T) {
LOG_INFO("STORAGE_TEST", "storage: %s", testStorageName) LOG_INFO("STORAGE_TEST", "storage: %s", testStorageName)
storage, err := loadStorage(testDir, 1) threads := 8
storage, err := loadStorage(testDir, threads)
if err != nil { if err != nil {
t.Errorf("Failed to create storage: %v", err) t.Errorf("Failed to create storage: %v", err)
return return
@@ -326,16 +327,16 @@ func TestStorage(t *testing.T) {
storage.CreateDirectory(0, "shared") storage.CreateDirectory(0, "shared")
// Upload to the same directory by multiple goroutines // Upload to the same directory by multiple goroutines
count := 8 count := threads
finished := make(chan int, count) finished := make(chan int, count)
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
go func(name string) { go func(threadIndex int, name string) {
err := storage.UploadFile(0, name, []byte("this is a test file")) err := storage.UploadFile(threadIndex, name, []byte("this is a test file"))
if err != nil { if err != nil {
t.Errorf("Error to upload '%s': %v", name, err) t.Errorf("Error to upload '%s': %v", name, err)
} }
finished <- 0 finished <- 0
}(fmt.Sprintf("shared/a/b/c/%d", i)) }(i, fmt.Sprintf("shared/a/b/c/%d", i))
} }
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
@@ -384,7 +385,6 @@ func TestStorage(t *testing.T) {
snapshotIDs := []string{} snapshotIDs := []string{}
for _, snapshotDir := range snapshotDirs { for _, snapshotDir := range snapshotDirs {
LOG_INFO("debug", "snapshot dir: %s", snapshotDir)
if len(snapshotDir) > 0 && snapshotDir[len(snapshotDir)-1] == '/' { if len(snapshotDir) > 0 && snapshotDir[len(snapshotDir)-1] == '/' {
snapshotIDs = append(snapshotIDs, snapshotDir[:len(snapshotDir)-1]) snapshotIDs = append(snapshotIDs, snapshotDir[:len(snapshotDir)-1])
} }