1
0
mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-06 00:03:38 +00:00

Compare commits

...

55 Commits

Author SHA1 Message Date
Gilbert Chen
8e9caea201 Retry on broken pipe in Azure backend
Azure sometimes disconnect the connection randomly when uploading files.  The
returned error was 'broken pipe' but this error is wrapped deep in multiple
levels of errors so we have to check the error string instead.
2019-05-07 22:35:51 -04:00
Gilbert Chen
18ba415f56 Bump version to 2.2.0 2019-05-06 12:26:40 -04:00
Gilbert Chen
458687d543 The cat command doesn't need to load the entire file into memory
It can print out the chunk as soon as a chunk is retrieved.  This avoids
reconstructing the file in the memory which can be an issue with large files.
2019-05-03 11:33:16 -04:00
Gilbert Chen
57a408a577 Rework the Backblaze B2 backend
* All APIs include UploadFile are done via the call() function
* New retry mechanism limiting the maximum backoff each time to 1 minute
* Add an env var DUPLICACY_B2_RETRIES to specify the number of retries
* Handle special/unicode characters in repositor ids
* Allow a directory in a bucket to be used as the storage destination
2019-04-30 23:31:57 -04:00
Gilbert Chen
a73ed462b6 Roll back the import path change 'import duplicacy/src'
Import paths are relative to $GOPATH and $GOROOT, so 'import duplicacy/src'
unfortunately doesn't work.
2019-04-27 22:01:17 -04:00
gilbertchen
e56efc1d3a Merge pull request #554 from arikorn/ask_b2_application_key
Request B2 "Backblaze Account or Application ID"
2019-04-27 10:55:12 -04:00
gilbertchen
bb58f42a37 Merge pull request #529 from turtleleo/patch-1
Retry on 408 error from Google Drive (Update to duplicacy_gcdstorage.go)
2019-04-27 10:53:26 -04:00
Thomas Tempelmann
22e8d9e60a Change the import from "github.com/gilbertchen/duplicacy/src" to "duplicacy/src" so that a forked project uses the forked "src" dir and not the original one. 2019-04-27 00:10:50 -04:00
Gilbert Chen
4eb174cec5 Remove a few util functions that aren't necessary 2019-04-26 23:47:25 -04:00
gilbertchen
6fd3fbd568 Merge pull request #514 from a-s-z-home/filter_extension
Filter extension: @ to include another file
2019-04-26 21:56:42 -04:00
Gilbert Chen
a6fe3d785e Fixed a MoveFile bug in Wasabi when the storage is at the root of a bucket
When the storage dir is empty, the destination path passed to the MOVE api starts
with a / which causes Wasabi to fail silently.
2019-04-24 16:48:25 -04:00
Gilbert Chen
1da151f9d9 Add an additional lookup for a chunk that isn't in the chunk list
A chunk not in the chunk list may actually exists in two scenarios:
* the chunk may be a special snapshot chunk that contains the chunk sequence,
  so it may be resurrected by the chunk downloader if it had been turned into
  a fossil before
* if the API to list all chunks doesn't return the complete list due to some
  bug

This additional lookup avoid reporting the missing chunk prematurely.
2019-04-21 20:32:21 -04:00
Gilbert Chen
4b69c1162e Fix a memory issue that check -tabular uses too much memory with many revisions
The call to GetSnapshotChunks in ShowStatisticsTabular sets keepChunkHashes to
true -- this can cause too much memory consumption with hundreds of revisions.
2019-04-20 22:47:03 -04:00
Gilbert Chen
abcb4d75c1 Fixed a bug where filenames starting with i or e are mistakenly interpreted as regex 2019-04-07 22:43:36 -04:00
Ari Kornfeld
10d2058738 Request B2 "Backblaze Account or Application ID" (rather than "Account ID")
fixes #539 (Duplicacy init for B2 storage still ask for account ID)
2019-04-02 22:20:29 -07:00
Gilbert Chen
43a5ffe011 Fixed a bug where a wrong variable is used as the number of threads 2019-03-13 15:38:26 -04:00
Gilbert Chen
d16273fe2b Set the content length for upload 2019-03-04 15:34:32 -05:00
Gilbert Chen
2b56d576c7 Fixed a webdav compatibility issue with rclone and other bugs 2019-02-26 14:00:02 -05:00
turtleleo
82c6c15f1c Update duplicacy_gcdstorage.go
Add automatic retry on receiving error 408 (request timeout) from Google Drive.
2019-01-16 13:12:46 -05:00
gilbertchen
bebd7c4b77 Merge pull request #495 from plasticrake/environment-variables
Replace special characters in environment variable name with underscores
2019-01-04 17:04:29 -05:00
gilbertchen
46376d82ed Merge pull request #489 from gilbertchen/sftp_retry
Retry on EOF errors in the SFTP backend
2019-01-04 13:53:44 -05:00
gilbertchen
c4a3dd1eeb Merge pull request #454 from mikecook/master
spelling fix, go fmt, go vet
2019-01-04 13:50:17 -05:00
gilbertchen
31c25e98f7 Merge branch 'master' into master 2019-01-04 13:48:44 -05:00
gilbertchen
242db8377e Merge pull request #447 from s4y/patch-1
Acknowledge malware/spam warnings from GCD
2019-01-04 13:33:11 -05:00
Gilbert Chen
e6d8b7d070 Use 1024*1024 as 1M as opposed to 10^6 2019-01-04 13:29:30 -05:00
Gilbert Chen
bb652d0a8c Add a Sync call before close when uploading a file to local storage 2019-01-03 12:44:50 -05:00
Gilbert Chen
a354d03bc9 Remove a binary file accidentally checked in 2019-01-02 21:36:04 -05:00
Michael Cook
4b9524bd43 go vet: unreachable code 2018-12-29 13:20:11 +01:00
Michael Cook
a782d42ad6 go vet: result of fmt.Errorf call not used 2018-12-29 13:20:10 +01:00
Michael Cook
0762c448c4 gofmt -s 2018-12-29 13:20:10 +01:00
Michael Cook
741644b575 spelling 2018-12-29 13:04:40 +01:00
a-s-z-home
df7487cc0b Merge remote-tracking branch 'origin/master' into filter_extension 2018-11-15 01:40:39 +01:00
Gilbert Chen
8aa67c8162 Support ssh private key files encrypted by passphrases 2018-11-09 14:17:56 -05:00
Gilbert Chen
53548a895f Add the \?\ prefix to all paths on Windows 2018-11-08 21:29:02 -05:00
a-s-z-home
5e8baab4ec - Reverted changes to exclude mechanism of .duplicacy directory. 2018-11-05 22:39:11 +01:00
a-s-z-home
e1fa39008d Use new filter processing function for restore command.
- You can now include a filter file by using "@<filename>".
2018-11-05 00:59:39 +01:00
a-s-z-home
aaebf4510c - Replaced static check for .duplicacy directory with usage of predefined filters.
Do not "misuse" property nobackupFile to trigger this feature.
- Restructured ProcessFilterFile function and splitted it in smaller parts.
- Prepare usage of new filter syntax for arguments of restore command.
2018-11-05 00:32:12 +01:00
a-s-z-home
96dd28995b Added an include mechanism for filter file.
- Using @<filename>, you can now include other files. Relative paths are supported.
  This is useful, if you have several repositories with some different filters and a common filter base set.
2018-11-03 20:39:03 +01:00
a-s-z-home
166f6e6266 Added string array helper functions Contains and Find. 2018-11-03 20:20:00 +01:00
a-s-z-home
86c89f43a0 Automatically exclude .duplicacy directory only, if nobackup_file is not
set.
2018-11-03 20:13:43 +01:00
Gilbert Chen
2e5cbc73b9 Bump version to 2.1.2 2018-11-03 11:45:50 -04:00
Gilbert Chen
21b3d9e57f Padding size was incorrect -- didn't pad to multiples of 256 2018-11-03 11:42:03 -04:00
Gilbert Chen
244b797a1c Print the number of files if available in the snapshot file 2018-11-03 10:38:35 -04:00
Gilbert Chen
073292018c Don't show snapshots whose tags don't match the given one 2018-10-28 23:30:22 -04:00
Gilbert Chen
15f15aa2ca Show more statistics in the check command 2018-10-28 23:27:36 -04:00
Gilbert Chen
d8e13d8d85 Benchmark may incorrectly list the chunks directory when looking for previous temporary files 2018-10-22 09:11:15 -04:00
Gilbert Chen
bfb4b44c0a Optimizating restore to avoid reading newly created sparse file 2018-10-21 22:43:24 -04:00
Patrick Seal
cce798ceac Replace special characters in environment variable name with underscores 2018-09-18 11:16:31 -07:00
Gilbert Chen
22a0b222db Align snapshot times to the beginning of days when calculating the differences 2018-09-08 20:31:49 -04:00
Gilbert Chen
674d35e5ca Get accountID from b2_authorize_account and supply it to b2_list_buckets 2018-09-08 20:21:49 -04:00
Gilbert Chen
ab28115f95 Retry on EOF errors in the SFTP backend 2018-08-29 23:15:00 -04:00
Gilbert Chen
a7d2a941be Restore UID and GID of symlinks 2018-08-29 17:10:35 -04:00
Gilbert Chen
39d71a3256 Fixed a divide by zero bug when the repository has only zero-byte files 2018-08-10 12:17:40 -04:00
Gilbert Chen
9d10cc77fc Do not update the Windows keyring file if the password remains unchanged 2018-08-08 14:03:49 -04:00
Sidney San Martín
20172e07e6 Acknowledge malware/spam warnings from GCD
If Google thinks that a file is malware or spam (which can happen
spuriously to blobs of encrypted data), it will prevent the initial
download and return an error with reason "cannotDownloadAbusiveFile".
The API expects a program to prompt the user in this case and then,
optionally, let them bypass it.

Ideally duplicacy should prompt, but this patch just logs a warning.

When I printed `err.(*googleapi.Error)`, its `Errors` field was empty,
hence the sketchy string matching. It's possible that I did something
wrong, though.
2018-06-13 10:49:04 -07:00
33 changed files with 867 additions and 559 deletions

View File

@@ -7,6 +7,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
@@ -16,7 +17,6 @@ import (
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"net/http"
_ "net/http/pprof" _ "net/http/pprof"
@@ -159,8 +159,6 @@ func setGlobalOptions(context *cli.Context) {
}() }()
} }
duplicacy.RunInBackground = context.GlobalBool("background") duplicacy.RunInBackground = context.GlobalBool("background")
} }
@@ -309,11 +307,11 @@ func configRepository(context *cli.Context, init bool) {
repositoryPath = context.String("repository") repositoryPath = context.String("repository")
} }
preference := duplicacy.Preference{ preference := duplicacy.Preference{
Name: storageName, Name: storageName,
SnapshotID: snapshotID, SnapshotID: snapshotID,
RepositoryPath: repositoryPath, RepositoryPath: repositoryPath,
StorageURL: storageURL, StorageURL: storageURL,
Encrypted: context.Bool("encrypt"), Encrypted: context.Bool("encrypt"),
} }
storage := duplicacy.CreateStorage(preference, true, 1) storage := duplicacy.CreateStorage(preference, true, 1)
@@ -533,7 +531,7 @@ func setPreference(context *cli.Context) {
if triBool.IsSet() { if triBool.IsSet() {
newPreference.DoNotSavePassword = triBool.IsTrue() newPreference.DoNotSavePassword = triBool.IsTrue()
} }
newPreference.NobackupFile = context.String("nobackup-file") newPreference.NobackupFile = context.String("nobackup-file")
key := context.String("key") key := context.String("key")
@@ -650,7 +648,7 @@ func changePassword(context *cli.Context) {
duplicacy.LOG_INFO("CONFIG_CLEAN", "The local copy of the old config has been removed") duplicacy.LOG_INFO("CONFIG_CLEAN", "The local copy of the old config has been removed")
} }
} }
} () }()
err = storage.DeleteFile(0, "config") err = storage.DeleteFile(0, "config")
if err != nil { if err != nil {
@@ -784,27 +782,17 @@ func restoreRepository(context *cli.Context) {
pattern = pattern[1:] pattern = pattern[1:]
} }
if duplicacy.IsUnspecifiedFilter(pattern) {
pattern = "+" + pattern
}
if duplicacy.IsEmptyFilter(pattern) {
continue
}
if strings.HasPrefix(pattern, "i:") || strings.HasPrefix(pattern, "e:") {
valid, err := duplicacy.IsValidRegex(pattern[2:])
if !valid || err != nil {
duplicacy.LOG_ERROR("SNAPSHOT_FILTER", "Invalid regular expression encountered for filter: \"%s\", error: %v", pattern, err)
}
}
patterns = append(patterns, pattern) patterns = append(patterns, pattern)
} }
patterns = duplicacy.ProcessFilterLines(patterns, make([]string, 0))
duplicacy.LOG_DEBUG("REGEX_DEBUG", "There are %d compiled regular expressions stored", len(duplicacy.RegexMap)) duplicacy.LOG_DEBUG("REGEX_DEBUG", "There are %d compiled regular expressions stored", len(duplicacy.RegexMap))
duplicacy.LOG_INFO("SNAPSHOT_FILTER", "Loaded %d include/exclude pattern(s)", len(patterns))
storage.SetRateLimits(context.Int("limit-rate"), 0) storage.SetRateLimits(context.Int("limit-rate"), 0)
backupManager := duplicacy.CreateBackupManager(preference.SnapshotID, storage, repository, password, preference.NobackupFile) backupManager := duplicacy.CreateBackupManager(preference.SnapshotID, storage, repository, password, preference.NobackupFile)
duplicacy.SavePassword(*preference, "password", password) duplicacy.SavePassword(*preference, "password", password)
@@ -1262,7 +1250,7 @@ func infoStorage(context *cli.Context) {
for _, dir := range dirs { for _, dir := range dirs {
if len(dir) > 0 && dir[len(dir)-1] == '/' { if len(dir) > 0 && dir[len(dir)-1] == '/' {
duplicacy.LOG_INFO("STORAGE_SNAPSHOT", "%s", dir[0:len(dir) - 1]) duplicacy.LOG_INFO("STORAGE_SNAPSHOT", "%s", dir[0:len(dir)-1])
} }
} }
@@ -1298,7 +1286,7 @@ func benchmark(context *cli.Context) {
} }
threads := downloadThreads threads := downloadThreads
if (threads < uploadThreads) { if threads < uploadThreads {
threads = uploadThreads threads = uploadThreads
} }
@@ -1309,7 +1297,7 @@ func benchmark(context *cli.Context) {
if storage == nil { if storage == nil {
return return
} }
duplicacy.Benchmark(repository, storage, int64(fileSize) * 1000000, chunkSize * 1024 * 1024, chunkCount, uploadThreads, downloadThreads) duplicacy.Benchmark(repository, storage, int64(fileSize) * 1024 * 1024, chunkSize * 1024 * 1024, chunkCount, uploadThreads, downloadThreads)
} }
func main() { func main() {
@@ -1773,8 +1761,8 @@ func main() {
Argument: "<storage name>", Argument: "<storage name>",
}, },
cli.BoolFlag{ cli.BoolFlag{
Name: "bit-identical", Name: "bit-identical",
Usage: "(when using -copy) make the new storage bit-identical to also allow rsync etc.", Usage: "(when using -copy) make the new storage bit-identical to also allow rsync etc.",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "repository", Name: "repository",
@@ -1815,10 +1803,10 @@ func main() {
Arg: "true", Arg: "true",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "nobackup-file", Name: "nobackup-file",
Usage: "Directories containing a file with this name will not be backed up", Usage: "Directories containing a file with this name will not be backed up",
Argument: "<file name>", Argument: "<file name>",
Value: "", Value: "",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "key", Name: "key",
@@ -1984,8 +1972,8 @@ func main() {
Argument: "<address:port>", Argument: "<address:port>",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "comment", Name: "comment",
Usage: "add a comment to identify the process", Usage: "add a comment to identify the process",
}, },
} }
@@ -1993,13 +1981,13 @@ func main() {
app.Name = "duplicacy" app.Name = "duplicacy"
app.HelpName = "duplicacy" app.HelpName = "duplicacy"
app.Usage = "A new generation cloud backup tool based on lock-free deduplication" app.Usage = "A new generation cloud backup tool based on lock-free deduplication"
app.Version = "2.1.1" + " (" + GitCommit + ")" app.Version = "2.2.0" + " (" + GitCommit + ")"
// If the program is interrupted, call the RunAtError function. // If the program is interrupted, call the RunAtError function.
c := make(chan os.Signal, 1) c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt) signal.Notify(c, os.Interrupt)
go func() { go func() {
for _ = range c { for range c {
duplicacy.RunAtError() duplicacy.RunAtError()
os.Exit(1) os.Exit(1)
} }

View File

@@ -104,7 +104,7 @@ func (azureStorage *AzureStorage) ListFiles(threadIndex int, dir string) (files
if dir == "snapshots/" { if dir == "snapshots/" {
for subDir, _ := range subDirs { for subDir := range subDirs {
files = append(files, subDir) files = append(files, subDir)
} }
@@ -166,9 +166,21 @@ func (storage *AzureStorage) DownloadFile(threadIndex int, filePath string, chun
// UploadFile writes 'content' to the file at 'filePath'. // UploadFile writes 'content' to the file at 'filePath'.
func (storage *AzureStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) { func (storage *AzureStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/len(storage.containers))
blob := storage.containers[threadIndex].GetBlobReference(filePath) tries := 0
return blob.CreateBlockBlobFromReader(reader, nil)
for {
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/len(storage.containers))
blob := storage.containers[threadIndex].GetBlobReference(filePath)
err = blob.CreateBlockBlobFromReader(reader, nil)
if err == nil || !strings.Contains(err.Error(), "write: broken pipe") || tries >= 3 {
return err
}
LOG_INFO("AZURE_RETRY", "Connection unexpectedly terminated: %v; retrying", err)
tries++
}
} }

View File

@@ -5,19 +5,22 @@
package duplicacy package duplicacy
import ( import (
"bytes"
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io" "io"
"io/ioutil" "os"
"math/rand" "fmt"
"net/http" "bytes"
"time"
"sync"
"strconv" "strconv"
"strings" "strings"
"time" "net/url"
"net/http"
"math/rand"
"io/ioutil"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"encoding/base64"
) )
type B2Error struct { type B2Error struct {
@@ -39,66 +42,112 @@ var B2AuthorizationURL = "https://api.backblazeb2.com/b2api/v1/b2_authorize_acco
type B2Client struct { type B2Client struct {
HTTPClient *http.Client HTTPClient *http.Client
AccountID string AccountID string
ApplicationKeyID string
ApplicationKey string ApplicationKey string
BucketName string
BucketID string
StorageDir string
Lock sync.Mutex
AuthorizationToken string AuthorizationToken string
APIURL string APIURL string
DownloadURL string DownloadURL string
BucketName string IsAuthorized bool
BucketID string
UploadURL string UploadURLs []string
UploadToken string UploadTokens []string
TestMode bool Threads int
MaximumRetries int
TestMode bool
} }
func NewB2Client(accountID string, applicationKey string) *B2Client { // URL encode the given path but keep the slashes intact
func B2Escape(path string) string {
var components []string
for _, c := range strings.Split(path, "/") {
components = append(components, url.QueryEscape(c))
}
return strings.Join(components, "/")
}
func NewB2Client(applicationKeyID string, applicationKey string, storageDir string, threads int) *B2Client {
for storageDir != "" && storageDir[0] == '/' {
storageDir = storageDir[1:]
}
if storageDir != "" && storageDir[len(storageDir) - 1] != '/' {
storageDir += "/"
}
maximumRetries := 10
if value, found := os.LookupEnv("DUPLICACY_B2_RETRIES"); found && value != "" {
maximumRetries, _ = strconv.Atoi(value)
LOG_INFO("B2_RETRIES", "Setting maximum retries for B2 to %d", maximumRetries)
}
client := &B2Client{ client := &B2Client{
HTTPClient: http.DefaultClient, HTTPClient: http.DefaultClient,
AccountID: accountID, ApplicationKeyID: applicationKeyID,
ApplicationKey: applicationKey, ApplicationKey: applicationKey,
StorageDir: storageDir,
UploadURLs: make([]string, threads),
UploadTokens: make([]string, threads),
Threads: threads,
MaximumRetries: maximumRetries,
} }
return client return client
} }
func (client *B2Client) retry(backoff int, response *http.Response) int { func (client *B2Client) getAPIURL() string {
client.Lock.Lock()
defer client.Lock.Unlock()
return client.APIURL
}
func (client *B2Client) getDownloadURL() string {
client.Lock.Lock()
defer client.Lock.Unlock()
return client.DownloadURL
}
func (client *B2Client) retry(retries int, response *http.Response) int {
if response != nil { if response != nil {
if backoffList, found := response.Header["Retry-After"]; found && len(backoffList) > 0 { if backoffList, found := response.Header["Retry-After"]; found && len(backoffList) > 0 {
retryAfter, _ := strconv.Atoi(backoffList[0]) retryAfter, _ := strconv.Atoi(backoffList[0])
if retryAfter >= 1 { if retryAfter >= 1 {
time.Sleep(time.Duration(retryAfter) * time.Second) time.Sleep(time.Duration(retryAfter) * time.Second)
return 0 return 1
} }
} }
} }
if backoff == 0 {
backoff = 1 if retries >= client.MaximumRetries + 1 {
} else { return 0
backoff *= 2
} }
time.Sleep(time.Duration(backoff) * time.Second) retries++
return backoff delay := 1 << uint(retries)
if delay > 64 {
delay = 64
}
delayInSeconds := (rand.Float32() + 1.0) * float32(delay) / 2.0
time.Sleep(time.Duration(delayInSeconds) * time.Second)
return retries
} }
func (client *B2Client) call(url string, method string, requestHeaders map[string]string, input interface{}) (io.ReadCloser, http.Header, int64, error) { func (client *B2Client) call(threadIndex int, requestURL string, method string, requestHeaders map[string]string, input interface{}) (
io.ReadCloser, http.Header, int64, error) {
switch method {
case http.MethodGet:
break
case http.MethodHead:
break
case http.MethodPost:
break
default:
return nil, nil, 0, fmt.Errorf("unhandled http request method: " + method)
}
var response *http.Response var response *http.Response
backoff := 0 retries := 0
for i := 0; i < 8; i++ { for {
var inputReader *bytes.Reader var inputReader io.Reader
isUpload := false
switch input.(type) { switch input.(type) {
default: default:
@@ -107,21 +156,43 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin
return nil, nil, 0, err return nil, nil, 0, err
} }
inputReader = bytes.NewReader(jsonInput) inputReader = bytes.NewReader(jsonInput)
case []byte:
inputReader = bytes.NewReader(input.([]byte))
case int: case int:
inputReader = bytes.NewReader([]byte("")) inputReader = bytes.NewReader([]byte(""))
case []byte:
isUpload = true
inputReader = bytes.NewReader(input.([]byte))
case *RateLimitedReader:
isUpload = true
rateLimitedReader := input.(*RateLimitedReader)
rateLimitedReader.Reset()
inputReader = rateLimitedReader
} }
request, err := http.NewRequest(method, url, inputReader)
if isUpload {
if client.UploadURLs[threadIndex] == "" || client.UploadTokens[threadIndex] == "" {
err := client.getUploadURL(threadIndex)
if err != nil {
return nil, nil, 0, err
}
}
requestURL = client.UploadURLs[threadIndex]
}
request, err := http.NewRequest(method, requestURL, inputReader)
if err != nil { if err != nil {
return nil, nil, 0, err return nil, nil, 0, err
} }
if url == B2AuthorizationURL { if requestURL == B2AuthorizationURL {
request.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(client.AccountID+":"+client.ApplicationKey))) request.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(client.ApplicationKeyID+":"+client.ApplicationKey)))
} else if isUpload {
request.ContentLength, _ = strconv.ParseInt(requestHeaders["Content-Length"], 10, 64)
request.Header.Set("Authorization", client.UploadTokens[threadIndex])
} else { } else {
client.Lock.Lock()
request.Header.Set("Authorization", client.AuthorizationToken) request.Header.Set("Authorization", client.AuthorizationToken)
client.Lock.Unlock()
} }
if requestHeaders != nil { if requestHeaders != nil {
@@ -132,7 +203,9 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin
if client.TestMode { if client.TestMode {
r := rand.Float32() r := rand.Float32()
if r < 0.5 { if r < 0.5 && isUpload {
request.Header.Set("X-Bz-Test-Mode", "fail_some_uploads")
} else if r < 0.75 {
request.Header.Set("X-Bz-Test-Mode", "expire_some_account_authorization_tokens") request.Header.Set("X-Bz-Test-Mode", "expire_some_account_authorization_tokens")
} else { } else {
request.Header.Set("X-Bz-Test-Mode", "force_cap_exceeded") request.Header.Set("X-Bz-Test-Mode", "force_cap_exceeded")
@@ -141,27 +214,46 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin
response, err = client.HTTPClient.Do(request) response, err = client.HTTPClient.Do(request)
if err != nil { if err != nil {
if url != B2AuthorizationURL {
LOG_DEBUG("BACKBLAZE_CALL", "URL request '%s' returned an error: %v", url, err) // Don't retry when the first authorization request fails
backoff = client.retry(backoff, response) if requestURL == B2AuthorizationURL && !client.IsAuthorized {
continue return nil, nil, 0, err
} }
return nil, nil, 0, err
LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s' returned an error: %v", threadIndex, requestURL, err)
retries = client.retry(retries, response)
if retries <= 0 {
return nil, nil, 0, err
}
// Clear the upload url to requrest a new one on retry
if isUpload {
client.UploadURLs[threadIndex] = ""
client.UploadTokens[threadIndex] = ""
}
continue
} }
if response.StatusCode < 300 { if response.StatusCode < 300 {
return response.Body, response.Header, response.ContentLength, nil return response.Body, response.Header, response.ContentLength, nil
} }
LOG_DEBUG("BACKBLAZE_CALL", "URL request '%s %s' returned status code %d", method, url, response.StatusCode) e := &B2Error{}
if err := json.NewDecoder(response.Body).Decode(e); err != nil {
LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s %s' returned status code %d", threadIndex, method, requestURL, response.StatusCode)
} else {
LOG_TRACE("BACKBLAZE_CALL", "[%d] URL request '%s %s' returned %d %s", threadIndex, method, requestURL, response.StatusCode, e.Message)
}
io.Copy(ioutil.Discard, response.Body)
response.Body.Close() response.Body.Close()
if response.StatusCode == 401 { if response.StatusCode == 401 {
if url == B2AuthorizationURL { if requestURL == B2AuthorizationURL {
return nil, nil, 0, fmt.Errorf("Authorization failure") return nil, nil, 0, fmt.Errorf("Authorization failure")
} }
client.AuthorizeAccount() client.AuthorizeAccount(threadIndex)
continue continue
} else if response.StatusCode == 403 { } else if response.StatusCode == 403 {
if !client.TestMode { if !client.TestMode {
@@ -175,32 +267,21 @@ func (client *B2Client) call(url string, method string, requestHeaders map[strin
} else if response.StatusCode == 416 { } else if response.StatusCode == 416 {
if http.MethodHead == method { if http.MethodHead == method {
// 416 Requested Range Not Satisfiable // 416 Requested Range Not Satisfiable
return nil, nil, 0, fmt.Errorf("URL request '%s' returned status code %d", url, response.StatusCode) return nil, nil, 0, fmt.Errorf("URL request '%s' returned %d %s", requestURL, response.StatusCode, e.Message)
} }
} else if response.StatusCode == 429 || response.StatusCode == 408 {
backoff = client.retry(backoff, response)
continue
} else if response.StatusCode >= 500 && response.StatusCode <= 599 {
backoff = client.retry(backoff, response)
continue
} else {
LOG_INFO("BACKBLAZE_CALL", "URL request '%s' returned status code %d", url, response.StatusCode)
backoff = client.retry(backoff, response)
continue
} }
defer response.Body.Close() retries = client.retry(retries, response)
if retries <= 0 {
e := &B2Error{} return nil, nil, 0, fmt.Errorf("URL request '%s' returned %d %s", requestURL, response.StatusCode, e.Message)
if err := json.NewDecoder(response.Body).Decode(e); err != nil {
return nil, nil, 0, err
} }
return nil, nil, 0, e if isUpload {
client.UploadURLs[threadIndex] = ""
client.UploadTokens[threadIndex] = ""
}
} }
return nil, nil, 0, fmt.Errorf("Maximum backoff reached")
} }
type B2AuthorizeAccountOutput struct { type B2AuthorizeAccountOutput struct {
@@ -210,9 +291,11 @@ type B2AuthorizeAccountOutput struct {
DownloadURL string DownloadURL string
} }
func (client *B2Client) AuthorizeAccount() (err error) { func (client *B2Client) AuthorizeAccount(threadIndex int) (err error) {
client.Lock.Lock()
defer client.Lock.Unlock()
readCloser, _, _, err := client.call(B2AuthorizationURL, http.MethodPost, nil, make(map[string]string)) readCloser, _, _, err := client.call(threadIndex, B2AuthorizationURL, http.MethodPost, nil, make(map[string]string))
if err != nil { if err != nil {
return err return err
} }
@@ -225,15 +308,20 @@ func (client *B2Client) AuthorizeAccount() (err error) {
return err return err
} }
// The account id may be different from the application key id so we're getting the account id from the returned
// json object here, which is needed by the b2_list_buckets call.
client.AccountID = output.AccountID
client.AuthorizationToken = output.AuthorizationToken client.AuthorizationToken = output.AuthorizationToken
client.APIURL = output.APIURL client.APIURL = output.APIURL
client.DownloadURL = output.DownloadURL client.DownloadURL = output.DownloadURL
client.IsAuthorized = true
return nil return nil
} }
type ListBucketOutput struct { type ListBucketOutput struct {
AccoundID string AccountID string
BucketID string BucketID string
BucketName string BucketName string
BucketType string BucketType string
@@ -244,9 +332,9 @@ func (client *B2Client) FindBucket(bucketName string) (err error) {
input := make(map[string]string) input := make(map[string]string)
input["accountId"] = client.AccountID input["accountId"] = client.AccountID
url := client.APIURL + "/b2api/v1/b2_list_buckets" url := client.getAPIURL() + "/b2api/v1/b2_list_buckets"
readCloser, _, _, err := client.call(url, http.MethodPost, nil, input) readCloser, _, _, err := client.call(0, url, http.MethodPost, nil, input)
if err != nil { if err != nil {
return err return err
} }
@@ -288,7 +376,7 @@ type B2ListFileNamesOutput struct {
NextFileId string NextFileId string
} }
func (client *B2Client) ListFileNames(startFileName string, singleFile bool, includeVersions bool) (files []*B2Entry, err error) { func (client *B2Client) ListFileNames(threadIndex int, startFileName string, singleFile bool, includeVersions bool) (files []*B2Entry, err error) {
maxFileCount := 1000 maxFileCount := 1000
if singleFile { if singleFile {
@@ -306,20 +394,21 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
input := make(map[string]interface{}) input := make(map[string]interface{})
input["bucketId"] = client.BucketID input["bucketId"] = client.BucketID
input["startFileName"] = startFileName input["startFileName"] = client.StorageDir + startFileName
input["maxFileCount"] = maxFileCount input["maxFileCount"] = maxFileCount
input["prefix"] = client.StorageDir
for { for {
url := client.APIURL + "/b2api/v1/b2_list_file_names" url := client.getAPIURL() + "/b2api/v1/b2_list_file_names"
requestHeaders := map[string]string{} requestHeaders := map[string]string{}
requestMethod := http.MethodPost requestMethod := http.MethodPost
var requestInput interface{} var requestInput interface{}
requestInput = input requestInput = input
if includeVersions { if includeVersions {
url = client.APIURL + "/b2api/v1/b2_list_file_versions" url = client.getAPIURL() + "/b2api/v1/b2_list_file_versions"
} else if singleFile { } else if singleFile {
// handle a single file with no versions as a special case to download the last byte of the file // handle a single file with no versions as a special case to download the last byte of the file
url = client.DownloadURL + "/file/" + client.BucketName + "/" + startFileName url = client.getDownloadURL() + "/file/" + client.BucketName + "/" + B2Escape(client.StorageDir + startFileName)
// requesting byte -1 works for empty files where 0-0 fails with a 416 error // requesting byte -1 works for empty files where 0-0 fails with a 416 error
requestHeaders["Range"] = "bytes=-1" requestHeaders["Range"] = "bytes=-1"
// HEAD request // HEAD request
@@ -329,7 +418,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
var readCloser io.ReadCloser var readCloser io.ReadCloser
var responseHeader http.Header var responseHeader http.Header
var err error var err error
readCloser, responseHeader, _, err = client.call(url, requestMethod, requestHeaders, requestInput) readCloser, responseHeader, _, err = client.call(threadIndex, url, requestMethod, requestHeaders, requestInput)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -342,7 +431,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
if singleFile && !includeVersions { if singleFile && !includeVersions {
if responseHeader == nil { if responseHeader == nil {
LOG_DEBUG("BACKBLAZE_LIST", "b2_download_file_by_name did not return headers") LOG_DEBUG("BACKBLAZE_LIST", "%s did not return headers", url)
return []*B2Entry{}, nil return []*B2Entry{}, nil
} }
requiredHeaders := []string{ requiredHeaders := []string{
@@ -356,7 +445,7 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
} }
} }
if len(missingKeys) > 0 { if len(missingKeys) > 0 {
return nil, fmt.Errorf("b2_download_file_by_name missing headers: %s", missingKeys) return nil, fmt.Errorf("%s missing headers: %s", url, missingKeys)
} }
// construct the B2Entry from the response headers of the download request // construct the B2Entry from the response headers of the download request
fileID := responseHeader.Get("x-bz-file-id") fileID := responseHeader.Get("x-bz-file-id")
@@ -373,14 +462,14 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
// this should only execute if the requested file is empty and the range request didn't result in a Content-Range header // this should only execute if the requested file is empty and the range request didn't result in a Content-Range header
fileSize, _ = strconv.ParseInt(lengthString, 0, 64) fileSize, _ = strconv.ParseInt(lengthString, 0, 64)
if fileSize != 0 { if fileSize != 0 {
return nil, fmt.Errorf("b2_download_file_by_name returned non-zero file length") return nil, fmt.Errorf("%s returned non-zero file length", url)
} }
} else { } else {
return nil, fmt.Errorf("could not parse b2_download_file_by_name headers") return nil, fmt.Errorf("could not parse headers returned by %s", url)
} }
fileUploadTimestamp, _ := strconv.ParseInt(responseHeader.Get("X-Bz-Upload-Timestamp"), 0, 64) fileUploadTimestamp, _ := strconv.ParseInt(responseHeader.Get("X-Bz-Upload-Timestamp"), 0, 64)
return []*B2Entry{&B2Entry{fileID, fileName, fileAction, fileSize, fileUploadTimestamp}}, nil return []*B2Entry{{fileID, fileName[len(client.StorageDir):], fileAction, fileSize, fileUploadTimestamp}}, nil
} }
if err = json.NewDecoder(readCloser).Decode(&output); err != nil { if err = json.NewDecoder(readCloser).Decode(&output); err != nil {
@@ -389,31 +478,27 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
ioutil.ReadAll(readCloser) ioutil.ReadAll(readCloser)
if startFileName == "" { for _, file := range output.Files {
files = append(files, output.Files...) file.FileName = file.FileName[len(client.StorageDir):]
} else { if singleFile {
for _, file := range output.Files { if file.FileName == startFileName {
if singleFile { files = append(files, file)
if file.FileName == startFileName { if !includeVersions {
files = append(files, file)
if !includeVersions {
output.NextFileName = ""
break
}
} else {
output.NextFileName = "" output.NextFileName = ""
break break
} }
} else { } else {
if strings.HasPrefix(file.FileName, startFileName) { output.NextFileName = ""
files = append(files, file) break
} else { }
output.NextFileName = "" } else {
break if strings.HasPrefix(file.FileName, startFileName) {
} files = append(files, file)
} else {
output.NextFileName = ""
break
} }
} }
} }
if len(output.NextFileName) == 0 { if len(output.NextFileName) == 0 {
@@ -429,14 +514,14 @@ func (client *B2Client) ListFileNames(startFileName string, singleFile bool, inc
return files, nil return files, nil
} }
func (client *B2Client) DeleteFile(fileName string, fileID string) (err error) { func (client *B2Client) DeleteFile(threadIndex int, fileName string, fileID string) (err error) {
input := make(map[string]string) input := make(map[string]string)
input["fileName"] = fileName input["fileName"] = client.StorageDir + fileName
input["fileId"] = fileID input["fileId"] = fileID
url := client.APIURL + "/b2api/v1/b2_delete_file_version" url := client.getAPIURL() + "/b2api/v1/b2_delete_file_version"
readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input)
if err != nil { if err != nil {
return err return err
} }
@@ -449,14 +534,14 @@ type B2HideFileOutput struct {
FileID string FileID string
} }
func (client *B2Client) HideFile(fileName string) (fileID string, err error) { func (client *B2Client) HideFile(threadIndex int, fileName string) (fileID string, err error) {
input := make(map[string]string) input := make(map[string]string)
input["bucketId"] = client.BucketID input["bucketId"] = client.BucketID
input["fileName"] = fileName input["fileName"] = client.StorageDir + fileName
url := client.APIURL + "/b2api/v1/b2_hide_file" url := client.getAPIURL() + "/b2api/v1/b2_hide_file"
readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -473,11 +558,11 @@ func (client *B2Client) HideFile(fileName string) (fileID string, err error) {
return output.FileID, nil return output.FileID, nil
} }
func (client *B2Client) DownloadFile(filePath string) (io.ReadCloser, int64, error) { func (client *B2Client) DownloadFile(threadIndex int, filePath string) (io.ReadCloser, int64, error) {
url := client.DownloadURL + "/file/" + client.BucketName + "/" + filePath url := client.getDownloadURL() + "/file/" + client.BucketName + "/" + B2Escape(client.StorageDir + filePath)
readCloser, _, len, err := client.call(url, http.MethodGet, make(map[string]string), 0) readCloser, _, len, err := client.call(threadIndex, url, http.MethodGet, make(map[string]string), 0)
return readCloser, len, err return readCloser, len, err
} }
@@ -487,12 +572,12 @@ type B2GetUploadArgumentOutput struct {
AuthorizationToken string AuthorizationToken string
} }
func (client *B2Client) getUploadURL() error { func (client *B2Client) getUploadURL(threadIndex int) error {
input := make(map[string]string) input := make(map[string]string)
input["bucketId"] = client.BucketID input["bucketId"] = client.BucketID
url := client.APIURL + "/b2api/v1/b2_get_upload_url" url := client.getAPIURL() + "/b2api/v1/b2_get_upload_url"
readCloser, _, _, err := client.call(url, http.MethodPost, make(map[string]string), input) readCloser, _, _, err := client.call(threadIndex, url, http.MethodPost, make(map[string]string), input)
if err != nil { if err != nil {
return err return err
} }
@@ -505,96 +590,29 @@ func (client *B2Client) getUploadURL() error {
return err return err
} }
client.UploadURL = output.UploadURL client.UploadURLs[threadIndex] = output.UploadURL
client.UploadToken = output.AuthorizationToken client.UploadTokens[threadIndex] = output.AuthorizationToken
return nil return nil
} }
func (client *B2Client) UploadFile(filePath string, content []byte, rateLimit int) (err error) { func (client *B2Client) UploadFile(threadIndex int, filePath string, content []byte, rateLimit int) (err error) {
hasher := sha1.New() hasher := sha1.New()
hasher.Write(content) hasher.Write(content)
hash := hex.EncodeToString(hasher.Sum(nil)) hash := hex.EncodeToString(hasher.Sum(nil))
headers := make(map[string]string) headers := make(map[string]string)
headers["X-Bz-File-Name"] = filePath headers["X-Bz-File-Name"] = B2Escape(client.StorageDir + filePath)
headers["Content-Length"] = fmt.Sprintf("%d", len(content))
headers["Content-Type"] = "application/octet-stream" headers["Content-Type"] = "application/octet-stream"
headers["X-Bz-Content-Sha1"] = hash headers["X-Bz-Content-Sha1"] = hash
var response *http.Response readCloser, _, _, err := client.call(threadIndex, "", http.MethodPost, headers, CreateRateLimitedReader(content, rateLimit))
if err != nil {
backoff := 0 return err
for i := 0; i < 8; i++ {
if client.UploadURL == "" || client.UploadToken == "" {
err = client.getUploadURL()
if err != nil {
return err
}
}
request, err := http.NewRequest("POST", client.UploadURL, CreateRateLimitedReader(content, rateLimit))
if err != nil {
return err
}
request.ContentLength = int64(len(content))
request.Header.Set("Authorization", client.UploadToken)
request.Header.Set("X-Bz-File-Name", filePath)
request.Header.Set("Content-Type", "application/octet-stream")
request.Header.Set("X-Bz-Content-Sha1", hash)
for key, value := range headers {
request.Header.Set(key, value)
}
if client.TestMode {
r := rand.Float32()
if r < 0.8 {
request.Header.Set("X-Bz-Test-Mode", "fail_some_uploads")
} else if r < 0.9 {
request.Header.Set("X-Bz-Test-Mode", "expire_some_account_authorization_tokens")
} else {
request.Header.Set("X-Bz-Test-Mode", "force_cap_exceeded")
}
}
response, err = client.HTTPClient.Do(request)
if err != nil {
LOG_DEBUG("BACKBLAZE_UPLOAD", "URL request '%s' returned an error: %v", client.UploadURL, err)
backoff = client.retry(backoff, response)
client.UploadURL = ""
client.UploadToken = ""
continue
}
io.Copy(ioutil.Discard, response.Body)
response.Body.Close()
if response.StatusCode < 300 {
return nil
}
LOG_DEBUG("BACKBLAZE_UPLOAD", "URL request '%s' returned status code %d", client.UploadURL, response.StatusCode)
if response.StatusCode == 401 {
LOG_INFO("BACKBLAZE_UPLOAD", "Re-authorization required")
client.UploadURL = ""
client.UploadToken = ""
continue
} else if response.StatusCode == 403 {
if !client.TestMode {
return fmt.Errorf("B2 cap exceeded")
}
continue
} else {
LOG_INFO("BACKBLAZE_UPLOAD", "URL request '%s' returned status code %d", client.UploadURL, response.StatusCode)
backoff = client.retry(backoff, response)
client.UploadURL = ""
client.UploadToken = ""
}
} }
return fmt.Errorf("Maximum backoff reached") readCloser.Close()
return nil
} }

View File

@@ -37,7 +37,7 @@ func createB2ClientForTest(t *testing.T) (*B2Client, string) {
return nil, "" return nil, ""
} }
return NewB2Client(b2["account"], b2["key"]), b2["bucket"] return NewB2Client(b2["account"], b2["key"], b2["directory"], 1), b2["bucket"]
} }
@@ -50,7 +50,7 @@ func TestB2Client(t *testing.T) {
b2Client.TestMode = true b2Client.TestMode = true
err := b2Client.AuthorizeAccount() err := b2Client.AuthorizeAccount(0)
if err != nil { if err != nil {
t.Errorf("Failed to authorize the b2 account: %v", err) t.Errorf("Failed to authorize the b2 account: %v", err)
return return
@@ -64,14 +64,14 @@ func TestB2Client(t *testing.T) {
testDirectory := "b2client_test/" testDirectory := "b2client_test/"
files, err := b2Client.ListFileNames(testDirectory, false, false) files, err := b2Client.ListFileNames(0, testDirectory, false, false)
if err != nil { if err != nil {
t.Errorf("Failed to list files: %v", err) t.Errorf("Failed to list files: %v", err)
return return
} }
for _, file := range files { for _, file := range files {
err = b2Client.DeleteFile(file.FileName, file.FileID) err = b2Client.DeleteFile(0, file.FileName, file.FileID)
if err != nil { if err != nil {
t.Errorf("Failed to delete file '%s': %v", file.FileName, err) t.Errorf("Failed to delete file '%s': %v", file.FileName, err)
} }
@@ -90,14 +90,14 @@ func TestB2Client(t *testing.T) {
hash := sha256.Sum256(content) hash := sha256.Sum256(content)
name := hex.EncodeToString(hash[:]) name := hex.EncodeToString(hash[:])
err = b2Client.UploadFile(testDirectory+name, content, 100) err = b2Client.UploadFile(0, testDirectory+name, content, 100)
if err != nil { if err != nil {
t.Errorf("Error uploading file '%s': %v", name, err) t.Errorf("Error uploading file '%s': %v", name, err)
return return
} }
} }
files, err = b2Client.ListFileNames(testDirectory, false, false) files, err = b2Client.ListFileNames(0, testDirectory, false, false)
if err != nil { if err != nil {
t.Errorf("Failed to list files: %v", err) t.Errorf("Failed to list files: %v", err)
return return
@@ -105,7 +105,7 @@ func TestB2Client(t *testing.T) {
for _, file := range files { for _, file := range files {
readCloser, _, err := b2Client.DownloadFile(file.FileName) readCloser, _, err := b2Client.DownloadFile(0, file.FileName)
if err != nil { if err != nil {
t.Errorf("Error downloading file '%s': %v", file.FileName, err) t.Errorf("Error downloading file '%s': %v", file.FileName, err)
return return
@@ -125,7 +125,7 @@ func TestB2Client(t *testing.T) {
} }
for _, file := range files { for _, file := range files {
err = b2Client.DeleteFile(file.FileName, file.FileID) err = b2Client.DeleteFile(0, file.FileName, file.FileID)
if err != nil { if err != nil {
t.Errorf("Failed to delete file '%s': %v", file.FileName, err) t.Errorf("Failed to delete file '%s': %v", file.FileName, err)
} }

View File

@@ -11,32 +11,26 @@ import (
type B2Storage struct { type B2Storage struct {
StorageBase StorageBase
clients []*B2Client client *B2Client
} }
// CreateB2Storage creates a B2 storage object. // CreateB2Storage creates a B2 storage object.
func CreateB2Storage(accountID string, applicationKey string, bucket string, threads int) (storage *B2Storage, err error) { func CreateB2Storage(accountID string, applicationKey string, bucket string, storageDir string, threads int) (storage *B2Storage, err error) {
var clients []*B2Client client := NewB2Client(accountID, applicationKey, storageDir, threads)
for i := 0; i < threads; i++ { err = client.AuthorizeAccount(0)
client := NewB2Client(accountID, applicationKey) if err != nil {
return nil, err
}
err = client.AuthorizeAccount() err = client.FindBucket(bucket)
if err != nil { if err != nil {
return nil, err return nil, err
}
err = client.FindBucket(bucket)
if err != nil {
return nil, err
}
clients = append(clients, client)
} }
storage = &B2Storage{ storage = &B2Storage{
clients: clients, client: client,
} }
storage.DerivedStorage = storage storage.DerivedStorage = storage
@@ -56,7 +50,7 @@ func (storage *B2Storage) ListFiles(threadIndex int, dir string) (files []string
includeVersions = true includeVersions = true
} }
entries, err := storage.clients[threadIndex].ListFileNames(dir, false, includeVersions) entries, err := storage.client.ListFileNames(threadIndex, dir, false, includeVersions)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@@ -71,7 +65,7 @@ func (storage *B2Storage) ListFiles(threadIndex int, dir string) (files []string
subDirs[subDir+"/"] = true subDirs[subDir+"/"] = true
} }
for subDir, _ := range subDirs { for subDir := range subDirs {
files = append(files, subDir) files = append(files, subDir)
} }
} else if dir == "chunks" { } else if dir == "chunks" {
@@ -102,7 +96,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro
if strings.HasSuffix(filePath, ".fsl") { if strings.HasSuffix(filePath, ".fsl") {
filePath = filePath[:len(filePath)-len(".fsl")] filePath = filePath[:len(filePath)-len(".fsl")]
entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, true) entries, err := storage.client.ListFileNames(threadIndex, filePath, true, true)
if err != nil { if err != nil {
return err return err
} }
@@ -116,7 +110,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro
toBeDeleted = true toBeDeleted = true
err = storage.clients[threadIndex].DeleteFile(filePath, entry.FileID) err = storage.client.DeleteFile(threadIndex, filePath, entry.FileID)
if err != nil { if err != nil {
return err return err
} }
@@ -125,7 +119,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro
return nil return nil
} else { } else {
entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, false) entries, err := storage.client.ListFileNames(threadIndex, filePath, true, false)
if err != nil { if err != nil {
return err return err
} }
@@ -133,7 +127,7 @@ func (storage *B2Storage) DeleteFile(threadIndex int, filePath string) (err erro
if len(entries) == 0 { if len(entries) == 0 {
return nil return nil
} }
return storage.clients[threadIndex].DeleteFile(filePath, entries[0].FileID) return storage.client.DeleteFile(threadIndex, filePath, entries[0].FileID)
} }
} }
@@ -160,10 +154,10 @@ func (storage *B2Storage) MoveFile(threadIndex int, from string, to string) (err
} }
if filePath == from { if filePath == from {
_, err = storage.clients[threadIndex].HideFile(from) _, err = storage.client.HideFile(threadIndex, from)
return err return err
} else { } else {
entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, true) entries, err := storage.client.ListFileNames(threadIndex, filePath, true, true)
if err != nil { if err != nil {
return err return err
} }
@@ -171,7 +165,7 @@ func (storage *B2Storage) MoveFile(threadIndex int, from string, to string) (err
return nil return nil
} }
return storage.clients[threadIndex].DeleteFile(filePath, entries[0].FileID) return storage.client.DeleteFile(threadIndex, filePath, entries[0].FileID)
} }
} }
@@ -188,7 +182,7 @@ func (storage *B2Storage) GetFileInfo(threadIndex int, filePath string) (exist b
filePath = filePath[:len(filePath)-len(".fsl")] filePath = filePath[:len(filePath)-len(".fsl")]
} }
entries, err := storage.clients[threadIndex].ListFileNames(filePath, true, isFossil) entries, err := storage.client.ListFileNames(threadIndex, filePath, true, isFossil)
if err != nil { if err != nil {
return false, false, 0, err return false, false, 0, err
} }
@@ -211,21 +205,21 @@ func (storage *B2Storage) GetFileInfo(threadIndex int, filePath string) (exist b
func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) { func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
filePath = strings.Replace(filePath, " ", "%20", -1) filePath = strings.Replace(filePath, " ", "%20", -1)
readCloser, _, err := storage.clients[threadIndex].DownloadFile(filePath) readCloser, _, err := storage.client.DownloadFile(threadIndex, filePath)
if err != nil { if err != nil {
return err return err
} }
defer readCloser.Close() defer readCloser.Close()
_, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/len(storage.clients)) _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/storage.client.Threads)
return err return err
} }
// UploadFile writes 'content' to the file at 'filePath'. // UploadFile writes 'content' to the file at 'filePath'.
func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) { func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
filePath = strings.Replace(filePath, " ", "%20", -1) filePath = strings.Replace(filePath, " ", "%20", -1)
return storage.clients[threadIndex].UploadFile(filePath, content, storage.UploadRateLimit/len(storage.clients)) return storage.client.UploadFile(threadIndex, filePath, content, storage.UploadRateLimit/storage.client.Threads)
} }
// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when // If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
@@ -243,7 +237,5 @@ func (storage *B2Storage) IsFastListing() bool { return true }
// Enable the test mode. // Enable the test mode.
func (storage *B2Storage) EnableTestMode() { func (storage *B2Storage) EnableTestMode() {
for _, client := range storage.clients { storage.client.TestMode = true
client.TestMode = true
}
} }

View File

@@ -33,7 +33,7 @@ type BackupManager struct {
snapshotCache *FileStorage // for copies of chunks needed by snapshots snapshotCache *FileStorage // for copies of chunks needed by snapshots
config *Config // contains a number of options config *Config // contains a number of options
nobackupFile string // don't backup directory when this file name is found nobackupFile string // don't backup directory when this file name is found
} }
@@ -65,7 +65,7 @@ func CreateBackupManager(snapshotID string, storage Storage, top string, passwor
SnapshotManager: snapshotManager, SnapshotManager: snapshotManager,
config: config, config: config,
nobackupFile: nobackupFile, nobackupFile: nobackupFile,
} }
@@ -472,7 +472,7 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
uploadedModifiedFileSize := atomic.AddInt64(&uploadedModifiedFileSize, int64(chunkSize)) uploadedModifiedFileSize := atomic.AddInt64(&uploadedModifiedFileSize, int64(chunkSize))
if IsTracing() || showStatistics { if (IsTracing() || showStatistics) && totalModifiedFileSize > 0 {
now := time.Now().Unix() now := time.Now().Unix()
if now <= startUploadingTime { if now <= startUploadingTime {
now = startUploadingTime + 1 now = startUploadingTime + 1
@@ -825,6 +825,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
if stat.Mode()&os.ModeSymlink != 0 { if stat.Mode()&os.ModeSymlink != 0 {
isRegular, link, err := Readlink(fullPath) isRegular, link, err := Readlink(fullPath)
if err == nil && link == entry.Link && !isRegular { if err == nil && link == entry.Link && !isRegular {
entry.RestoreMetadata(fullPath, nil, setOwner)
continue continue
} }
} }
@@ -837,6 +838,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
LOG_ERROR("RESTORE_SYMLINK", "Can't create symlink %s: %v", entry.Path, err) LOG_ERROR("RESTORE_SYMLINK", "Can't create symlink %s: %v", entry.Path, err)
return false return false
} }
entry.RestoreMetadata(fullPath, nil, setOwner)
LOG_TRACE("DOWNLOAD_DONE", "Symlink %s updated", entry.Path) LOG_TRACE("DOWNLOAD_DONE", "Symlink %s updated", entry.Path)
} else if entry.IsDir() { } else if entry.IsDir() {
stat, err := os.Stat(fullPath) stat, err := os.Stat(fullPath)
@@ -979,12 +981,12 @@ type fileEncoder struct {
buffer *bytes.Buffer buffer *bytes.Buffer
} }
// Read reads data from the embeded buffer // Read reads data from the embedded buffer
func (encoder fileEncoder) Read(data []byte) (n int, err error) { func (encoder fileEncoder) Read(data []byte) (n int, err error) {
return encoder.buffer.Read(data) return encoder.buffer.Read(data)
} }
// NextFile switchs to the next file and generates its json description in the buffer. It also takes care of // NextFile switches to the next file and generates its json description in the buffer. It also takes care of
// the ending ']' and the commas between files. // the ending ']' and the commas between files.
func (encoder *fileEncoder) NextFile() (io.Reader, bool) { func (encoder *fileEncoder) NextFile() (io.Reader, bool) {
if encoder.currentIndex == len(encoder.files) { if encoder.currentIndex == len(encoder.files) {
@@ -1124,7 +1126,7 @@ func (manager *BackupManager) UploadSnapshot(chunkMaker *ChunkMaker, uploader *C
} }
// Restore downloads a file from the storage. If 'inPlace' is false, the download file is saved first to a temporary // Restore downloads a file from the storage. If 'inPlace' is false, the download file is saved first to a temporary
// file under the .duplicacy directory and then replaces the existing one. Otherwise, the exising file will be // file under the .duplicacy directory and then replaces the existing one. Otherwise, the existing file will be
// overwritten directly. // overwritten directly.
func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chunkMaker *ChunkMaker, entry *Entry, top string, inPlace bool, overwrite bool, func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chunkMaker *ChunkMaker, entry *Entry, top string, inPlace bool, overwrite bool,
showStatistics bool, totalFileSize int64, downloadedFileSize int64, startTime int64) bool { showStatistics bool, totalFileSize int64, downloadedFileSize int64, startTime int64) bool {
@@ -1160,6 +1162,9 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
lengthMap := make(map[string]int) lengthMap := make(map[string]int)
var offset int64 var offset int64
// If the file is newly created (needed by sparse file optimization)
isNewFile := false
existingFile, err = os.Open(fullPath) existingFile, err = os.Open(fullPath)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@@ -1194,6 +1199,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
LOG_ERROR("DOWNLOAD_OPEN", "Can't reopen the initial file just created: %v", err) LOG_ERROR("DOWNLOAD_OPEN", "Can't reopen the initial file just created: %v", err)
return false return false
} }
isNewFile = true
} }
} else { } else {
LOG_TRACE("DOWNLOAD_OPEN", "Can't open the existing file: %v", err) LOG_TRACE("DOWNLOAD_OPEN", "Can't open the existing file: %v", err)
@@ -1206,6 +1212,9 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
} }
} }
// The key in this map is the number of zeroes. The value is the corresponding hash.
knownHashes := make(map[int]string)
fileHash := "" fileHash := ""
if existingFile != nil { if existingFile != nil {
@@ -1215,6 +1224,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
fileHasher := manager.config.NewFileHasher() fileHasher := manager.config.NewFileHasher()
buffer := make([]byte, 64*1024) buffer := make([]byte, 64*1024)
err = nil err = nil
isSkipped := false
// We set to read one more byte so the file hash will be different if the file to be restored is a // We set to read one more byte so the file hash will be different if the file to be restored is a
// truncated portion of the existing file // truncated portion of the existing file
for i := entry.StartChunk; i <= entry.EndChunk+1; i++ { for i := entry.StartChunk; i <= entry.EndChunk+1; i++ {
@@ -1230,6 +1240,28 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
chunkSize = 1 // the size of extra chunk beyond EndChunk chunkSize = 1 // the size of extra chunk beyond EndChunk
} }
count := 0 count := 0
if isNewFile {
if hash, found := knownHashes[chunkSize]; found {
// We have read the same number of zeros before, so we just retrieve the hash from the map
existingChunks = append(existingChunks, hash)
existingLengths = append(existingLengths, chunkSize)
offsetMap[hash] = offset
lengthMap[hash] = chunkSize
offset += int64(chunkSize)
isSkipped = true
continue
}
}
if isSkipped {
_, err := existingFile.Seek(offset, 0)
if err != nil {
LOG_ERROR("DOWNLOAD_SEEK", "Failed to seek to offset %d: %v", offset, err)
}
isSkipped = false
}
for count < chunkSize { for count < chunkSize {
n := chunkSize - count n := chunkSize - count
if n > cap(buffer) { if n > cap(buffer) {
@@ -1256,12 +1288,16 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
offsetMap[hash] = offset offsetMap[hash] = offset
lengthMap[hash] = chunkSize lengthMap[hash] = chunkSize
offset += int64(chunkSize) offset += int64(chunkSize)
if isNewFile {
knownHashes[chunkSize] = hash
}
} }
if err == io.EOF { if err == io.EOF {
break break
} }
} }
fileHash = hex.EncodeToString(fileHasher.Sum(nil)) fileHash = hex.EncodeToString(fileHasher.Sum(nil))
} else { } else {
// If it is not inplace, we want to reuse any chunks in the existing file regardless their offets, so // If it is not inplace, we want to reuse any chunks in the existing file regardless their offets, so
@@ -1491,7 +1527,7 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
revisionsToBeCopied []int, threads int) bool { revisionsToBeCopied []int, threads int) bool {
if !manager.config.IsCompatiableWith(otherManager.config) { if !manager.config.IsCompatiableWith(otherManager.config) {
LOG_ERROR("CONFIG_INCOMPATIABLE", "Two storages are not compatiable for the copy operation") LOG_ERROR("CONFIG_INCOMPATIBLE", "Two storages are not compatible for the copy operation")
return false return false
} }
@@ -1637,7 +1673,7 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
chunksToCopy := 0 chunksToCopy := 0
chunksToSkip := 0 chunksToSkip := 0
for chunkHash, _ := range chunks { for chunkHash := range chunks {
otherChunkID := otherManager.config.GetChunkIDFromHash(chunkHash) otherChunkID := otherManager.config.GetChunkIDFromHash(chunkHash)
if _, found := otherChunks[otherChunkID]; found { if _, found := otherChunks[otherChunkID]; found {
chunksToSkip++ chunksToSkip++
@@ -1667,7 +1703,7 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
totalSkipped := 0 totalSkipped := 0
chunkIndex := 0 chunkIndex := 0
for chunkHash, _ := range chunks { for chunkHash := range chunks {
chunkIndex++ chunkIndex++
chunkID := manager.config.GetChunkIDFromHash(chunkHash) chunkID := manager.config.GetChunkIDFromHash(chunkHash)
newChunkID := otherManager.config.GetChunkIDFromHash(chunkHash) newChunkID := otherManager.config.GetChunkIDFromHash(chunkHash)

View File

@@ -246,8 +246,8 @@ func TestBackupManager(t *testing.T) {
backupManager.Backup(testDir+"/repository1" /*quickMode=*/, true, threads, "first", false, false, 0, false) backupManager.Backup(testDir+"/repository1" /*quickMode=*/, true, threads, "first", false, false, 0, false)
time.Sleep(time.Duration(delay) * time.Second) time.Sleep(time.Duration(delay) * time.Second)
SetDuplicacyPreferencePath(testDir + "/repository2/.duplicacy") SetDuplicacyPreferencePath(testDir + "/repository2/.duplicacy")
backupManager.Restore(testDir+"/repository2", threads, /*inPlace=*/false, /*quickMode=*/false, threads, /*overwrite=*/true, backupManager.Restore(testDir+"/repository2", threads /*inPlace=*/, false /*quickMode=*/, false, threads /*overwrite=*/, true,
/*deleteMode=*/false, /*setowner=*/false, /*showStatistics=*/false, /*patterns=*/ nil) /*deleteMode=*/ false /*setowner=*/, false /*showStatistics=*/, false /*patterns=*/, nil)
for _, f := range []string{"file1", "file2", "dir1/file3"} { for _, f := range []string{"file1", "file2", "dir1/file3"} {
if _, err := os.Stat(testDir + "/repository2/" + f); os.IsNotExist(err) { if _, err := os.Stat(testDir + "/repository2/" + f); os.IsNotExist(err) {
@@ -270,8 +270,8 @@ func TestBackupManager(t *testing.T) {
backupManager.Backup(testDir+"/repository1" /*quickMode=*/, true, threads, "second", false, false, 0, false) backupManager.Backup(testDir+"/repository1" /*quickMode=*/, true, threads, "second", false, false, 0, false)
time.Sleep(time.Duration(delay) * time.Second) time.Sleep(time.Duration(delay) * time.Second)
SetDuplicacyPreferencePath(testDir + "/repository2/.duplicacy") SetDuplicacyPreferencePath(testDir + "/repository2/.duplicacy")
backupManager.Restore(testDir+"/repository2", 2, /*inPlace=*/true, /*quickMode=*/true, threads, /*overwrite=*/true, backupManager.Restore(testDir+"/repository2", 2 /*inPlace=*/, true /*quickMode=*/, true, threads /*overwrite=*/, true,
/*deleteMode=*/false, /*setowner=*/false, /*showStatistics=*/false, /*patterns=*/nil) /*deleteMode=*/ false /*setowner=*/, false /*showStatistics=*/, false /*patterns=*/, nil)
for _, f := range []string{"file1", "file2", "dir1/file3"} { for _, f := range []string{"file1", "file2", "dir1/file3"} {
hash1 := getFileHash(testDir + "/repository1/" + f) hash1 := getFileHash(testDir + "/repository1/" + f)
@@ -298,8 +298,8 @@ func TestBackupManager(t *testing.T) {
createRandomFile(testDir+"/repository2/dir5/file5", 100) createRandomFile(testDir+"/repository2/dir5/file5", 100)
SetDuplicacyPreferencePath(testDir + "/repository2/.duplicacy") SetDuplicacyPreferencePath(testDir + "/repository2/.duplicacy")
backupManager.Restore(testDir+"/repository2", 3, /*inPlace=*/true, /*quickMode=*/false, threads, /*overwrite=*/true, backupManager.Restore(testDir+"/repository2", 3 /*inPlace=*/, true /*quickMode=*/, false, threads /*overwrite=*/, true,
/*deleteMode=*/true, /*setowner=*/false, /*showStatistics=*/false, /*patterns=*/nil) /*deleteMode=*/ true /*setowner=*/, false /*showStatistics=*/, false /*patterns=*/, nil)
for _, f := range []string{"file1", "file2", "dir1/file3"} { for _, f := range []string{"file1", "file2", "dir1/file3"} {
hash1 := getFileHash(testDir + "/repository1/" + f) hash1 := getFileHash(testDir + "/repository1/" + f)
@@ -325,8 +325,8 @@ func TestBackupManager(t *testing.T) {
os.Remove(testDir + "/repository1/file2") os.Remove(testDir + "/repository1/file2")
os.Remove(testDir + "/repository1/dir1/file3") os.Remove(testDir + "/repository1/dir1/file3")
SetDuplicacyPreferencePath(testDir + "/repository1/.duplicacy") SetDuplicacyPreferencePath(testDir + "/repository1/.duplicacy")
backupManager.Restore(testDir+"/repository1", 3, /*inPlace=*/true, /*quickMode=*/false, threads, /*overwrite=*/true, backupManager.Restore(testDir+"/repository1", 3 /*inPlace=*/, true /*quickMode=*/, false, threads /*overwrite=*/, true,
/*deleteMode=*/false, /*setowner=*/false, /*showStatistics=*/false, /*patterns=*/[]string{"+file2", "+dir1/file3", "-*"}) /*deleteMode=*/ false /*setowner=*/, false /*showStatistics=*/, false /*patterns=*/, []string{"+file2", "+dir1/file3", "-*"})
for _, f := range []string{"file1", "file2", "dir1/file3"} { for _, f := range []string{"file1", "file2", "dir1/file3"} {
hash1 := getFileHash(testDir + "/repository1/" + f) hash1 := getFileHash(testDir + "/repository1/" + f)

View File

@@ -5,18 +5,18 @@
package duplicacy package duplicacy
import ( import (
"os"
"bytes" "bytes"
"compress/zlib" "compress/zlib"
"crypto/aes" "crypto/aes"
"crypto/cipher" "crypto/cipher"
"crypto/rand"
"crypto/hmac" "crypto/hmac"
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"hash" "hash"
"io" "io"
"os"
"runtime" "runtime"
"github.com/bkaradzic/go-lz4" "github.com/bkaradzic/go-lz4"
@@ -250,10 +250,7 @@ func (chunk *Chunk) Encrypt(encryptionKey []byte, derivationKey string) (err err
// PKCS7 is used. Compressed chunk sizes leaks information about the original chunks so we want the padding sizes // PKCS7 is used. Compressed chunk sizes leaks information about the original chunks so we want the padding sizes
// to be the maximum allowed by PKCS7 // to be the maximum allowed by PKCS7
dataLength := encryptedBuffer.Len() - offset dataLength := encryptedBuffer.Len() - offset
paddingLength := dataLength % 256 paddingLength := 256 - dataLength%256
if paddingLength == 0 {
paddingLength = 256
}
encryptedBuffer.Write(bytes.Repeat([]byte{byte(paddingLength)}, paddingLength)) encryptedBuffer.Write(bytes.Repeat([]byte{byte(paddingLength)}, paddingLength))
encryptedBuffer.Write(bytes.Repeat([]byte{0}, gcm.Overhead())) encryptedBuffer.Write(bytes.Repeat([]byte{0}, gcm.Overhead()))
@@ -270,7 +267,7 @@ func (chunk *Chunk) Encrypt(encryptionKey []byte, derivationKey string) (err err
} }
// This is to ensure compability with Vertical Backup, which still uses HMAC-SHA256 (instead of HMAC-BLAKE2) to // This is to ensure compatibility with Vertical Backup, which still uses HMAC-SHA256 (instead of HMAC-BLAKE2) to
// derive the key used to encrypt/decrypt files and chunks. // derive the key used to encrypt/decrypt files and chunks.
var DecryptWithHMACSHA256 = false var DecryptWithHMACSHA256 = false
@@ -345,7 +342,6 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
return err return err
} }
paddingLength := int(decryptedBytes[len(decryptedBytes)-1]) paddingLength := int(decryptedBytes[len(decryptedBytes)-1])
if paddingLength == 0 { if paddingLength == 0 {
paddingLength = 256 paddingLength = 256

View File

@@ -22,6 +22,8 @@ func TestChunk(t *testing.T) {
config.CompressionLevel = DEFAULT_COMPRESSION_LEVEL config.CompressionLevel = DEFAULT_COMPRESSION_LEVEL
maxSize := 1000000 maxSize := 1000000
remainderLength := -1
for i := 0; i < 500; i++ { for i := 0; i < 500; i++ {
size := rand.Int() % maxSize size := rand.Int() % maxSize
@@ -44,6 +46,12 @@ func TestChunk(t *testing.T) {
encryptedData := make([]byte, chunk.GetLength()) encryptedData := make([]byte, chunk.GetLength())
copy(encryptedData, chunk.GetBytes()) copy(encryptedData, chunk.GetBytes())
if remainderLength == -1 {
remainderLength = len(encryptedData) % 256
} else if len(encryptedData)%256 != remainderLength {
t.Errorf("Incorrect padding size")
}
chunk.Reset(false) chunk.Reset(false)
chunk.Write(encryptedData) chunk.Write(encryptedData)
err = chunk.Decrypt(key, "") err = chunk.Decrypt(key, "")
@@ -63,7 +71,7 @@ func TestChunk(t *testing.T) {
} }
if bytes.Compare(plainData, decryptedData) != 0 { if bytes.Compare(plainData, decryptedData) != 0 {
t.Logf("orginal length: %d, decrypted length: %d", len(plainData), len(decryptedData)) t.Logf("Original length: %d, decrypted length: %d", len(plainData), len(decryptedData))
t.Errorf("Original data:\n%x\nDecrypted data:\n%x\n", plainData, decryptedData) t.Errorf("Original data:\n%x\nDecrypted data:\n%x\n", plainData, decryptedData)
} }

View File

@@ -178,7 +178,7 @@ func (downloader *ChunkDownloader) Reclaim(chunkIndex int) {
return return
} }
for i, _ := range downloader.completedTasks { for i := range downloader.completedTasks {
if i < chunkIndex && downloader.taskList[i].chunk != nil { if i < chunkIndex && downloader.taskList[i].chunk != nil {
downloader.config.PutChunk(downloader.taskList[i].chunk) downloader.config.PutChunk(downloader.taskList[i].chunk)
downloader.taskList[i].chunk = nil downloader.taskList[i].chunk = nil
@@ -253,7 +253,7 @@ func (downloader *ChunkDownloader) Stop() {
downloader.numberOfDownloadingChunks-- downloader.numberOfDownloadingChunks--
} }
for i, _ := range downloader.completedTasks { for i := range downloader.completedTasks {
downloader.config.PutChunk(downloader.taskList[i].chunk) downloader.config.PutChunk(downloader.taskList[i].chunk)
downloader.taskList[i].chunk = nil downloader.taskList[i].chunk = nil
downloader.numberOfActiveChunks-- downloader.numberOfActiveChunks--

View File

@@ -18,7 +18,7 @@ const (
ChunkOperationResurrect = 3 ChunkOperationResurrect = 3
) )
// ChunkOperatorTask is used to pass paramaters for different kinds of chunk operations. // ChunkOperatorTask is used to pass parameters for different kinds of chunk operations.
type ChunkOperatorTask struct { type ChunkOperatorTask struct {
operation int // The type of operation operation int // The type of operation
chunkID string // The chunk id chunkID string // The chunk id

View File

@@ -272,7 +272,7 @@ func (entry *Entry) IsLink() bool {
} }
func (entry *Entry) GetPermissions() os.FileMode { func (entry *Entry) GetPermissions() os.FileMode {
return os.FileMode(entry.Mode)&fileModeMask return os.FileMode(entry.Mode) & fileModeMask
} }
func (entry *Entry) IsSameAs(other *Entry) bool { func (entry *Entry) IsSameAs(other *Entry) bool {
@@ -292,7 +292,7 @@ func (entry *Entry) String(maxSizeDigits int) string {
func (entry *Entry) RestoreMetadata(fullPath string, fileInfo *os.FileInfo, setOwner bool) bool { func (entry *Entry) RestoreMetadata(fullPath string, fileInfo *os.FileInfo, setOwner bool) bool {
if fileInfo == nil { if fileInfo == nil {
stat, err := os.Stat(fullPath) stat, err := os.Lstat(fullPath)
fileInfo = &stat fileInfo = &stat
if err != nil { if err != nil {
LOG_ERROR("RESTORE_STAT", "Failed to retrieve the file info: %v", err) LOG_ERROR("RESTORE_STAT", "Failed to retrieve the file info: %v", err)
@@ -307,7 +307,8 @@ func (entry *Entry) RestoreMetadata(fullPath string, fileInfo *os.FileInfo, setO
} }
} }
if (*fileInfo).Mode()&fileModeMask != entry.GetPermissions() { // Only set the permission if the file is not a symlink
if !entry.IsLink() && (*fileInfo).Mode()&fileModeMask != entry.GetPermissions() {
err := os.Chmod(fullPath, entry.GetPermissions()) err := os.Chmod(fullPath, entry.GetPermissions())
if err != nil { if err != nil {
LOG_ERROR("RESTORE_CHMOD", "Failed to set the file permissions: %v", err) LOG_ERROR("RESTORE_CHMOD", "Failed to set the file permissions: %v", err)
@@ -315,7 +316,8 @@ func (entry *Entry) RestoreMetadata(fullPath string, fileInfo *os.FileInfo, setO
} }
} }
if (*fileInfo).ModTime().Unix() != entry.Time { // Only set the time if the file is not a symlink
if !entry.IsLink() && (*fileInfo).ModTime().Unix() != entry.Time {
modifiedTime := time.Unix(entry.Time, 0) modifiedTime := time.Unix(entry.Time, 0)
err := os.Chtimes(fullPath, modifiedTime, modifiedTime) err := os.Chtimes(fullPath, modifiedTime, modifiedTime)
if err != nil { if err != nil {
@@ -454,10 +456,10 @@ func ListEntries(top string, path string, fileList *[]*Entry, patterns []string,
if err != nil { if err != nil {
return directoryList, nil, err return directoryList, nil, err
} }
// This binary search works because ioutil.ReadDir returns files sorted by Name() by default // This binary search works because ioutil.ReadDir returns files sorted by Name() by default
if nobackupFile != "" { if nobackupFile != "" {
ii := sort.Search(len(files), func(ii int) bool { return strings.Compare(files[ii].Name(), nobackupFile) >= 0}) ii := sort.Search(len(files), func(ii int) bool { return strings.Compare(files[ii].Name(), nobackupFile) >= 0 })
if ii < len(files) && files[ii].Name() == nobackupFile { if ii < len(files) && files[ii].Name() == nobackupFile {
LOG_DEBUG("LIST_NOBACKUP", "%s is excluded due to nobackup file", path) LOG_DEBUG("LIST_NOBACKUP", "%s is excluded due to nobackup file", path)
return directoryList, skippedFiles, nil return directoryList, skippedFiles, nil

View File

@@ -34,7 +34,7 @@ func CreateFileReader(top string, files []*Entry) *FileReader {
return reader return reader
} }
// NextFile switchs to the next file in the file reader. // NextFile switches to the next file in the file reader.
func (reader *FileReader) NextFile() bool { func (reader *FileReader) NextFile() bool {
if reader.CurrentFile != nil { if reader.CurrentFile != nil {

View File

@@ -165,7 +165,7 @@ func (storage *FileStorage) UploadFile(threadIndex int, filePath string, content
} }
} else { } else {
if !stat.IsDir() { if !stat.IsDir() {
fmt.Errorf("The path %s is not a directory", dir) return fmt.Errorf("The path %s is not a directory", dir)
} }
} }
} }
@@ -190,7 +190,16 @@ func (storage *FileStorage) UploadFile(threadIndex int, filePath string, content
return err return err
} }
file.Close() err = file.Sync()
if err != nil {
file.Close()
return err
}
err = file.Close()
if err != nil {
return err
}
err = os.Rename(temporaryFile, fullPath) err = os.Rename(temporaryFile, fullPath)
if err != nil { if err != nil {

View File

@@ -25,7 +25,7 @@ import (
) )
var ( var (
GCDFileMimeType = "application/octet-stream" GCDFileMimeType = "application/octet-stream"
GCDDirectoryMimeType = "application/vnd.google-apps.folder" GCDDirectoryMimeType = "application/vnd.google-apps.folder"
) )
@@ -33,7 +33,7 @@ type GCDStorage struct {
StorageBase StorageBase
service *drive.Service service *drive.Service
idCache map[string]string // only directories are saved in this cache idCache map[string]string // only directories are saved in this cache
idCacheLock sync.Mutex idCacheLock sync.Mutex
backoffs []int // desired backoff time in seconds for each thread backoffs []int // desired backoff time in seconds for each thread
attempts []int // number of failed attempts since last success for each thread attempts []int // number of failed attempts since last success for each thread
@@ -78,6 +78,10 @@ func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error)
// User Rate Limit Exceeded // User Rate Limit Exceeded
message = e.Message message = e.Message
retry = true retry = true
} else if e.Code == 408 {
// Request timeout
message = e.Message
retry = true
} else if e.Code == 401 { } else if e.Code == 401 {
// Only retry on authorization error when storage has been connected before // Only retry on authorization error when storage has been connected before
if storage.isConnected { if storage.isConnected {
@@ -291,7 +295,7 @@ func (storage *GCDStorage) getIDFromPath(threadIndex int, filePath string, creat
} else if isDir { } else if isDir {
storage.savePathID(current, fileID) storage.savePathID(current, fileID)
} }
if i != len(names) - 1 && !isDir { if i != len(names)-1 && !isDir {
return "", fmt.Errorf("Path '%s' is not a directory", current) return "", fmt.Errorf("Path '%s' is not a directory", current)
} }
} }
@@ -386,11 +390,11 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
subDirs := []string{} subDirs := []string{}
for _, file := range files { for _, file := range files {
storage.savePathID("snapshots/" + file.Name, file.Id) storage.savePathID("snapshots/"+file.Name, file.Id)
subDirs = append(subDirs, file.Name + "/") subDirs = append(subDirs, file.Name+"/")
} }
return subDirs, nil, nil return subDirs, nil, nil
} else if strings.HasPrefix(dir, "snapshots/") { } else if strings.HasPrefix(dir, "snapshots/") || strings.HasPrefix(dir, "benchmark") {
pathID, err := storage.getIDFromPath(threadIndex, dir, false) pathID, err := storage.getIDFromPath(threadIndex, dir, false)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@@ -438,8 +442,8 @@ func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []i
files = append(files, name) files = append(files, name)
sizes = append(sizes, entry.Size) sizes = append(sizes, entry.Size)
} else { } else {
parents = append(parents, parent+ "/" + entry.Name) parents = append(parents, parent+"/"+entry.Name)
storage.savePathID(parent + "/" + entry.Name, entry.Id) storage.savePathID(parent+"/"+entry.Name, entry.Id)
} }
} }
} }
@@ -624,13 +628,22 @@ func (storage *GCDStorage) DownloadFile(threadIndex int, filePath string, chunk
var response *http.Response var response *http.Response
for { for {
response, err = storage.service.Files.Get(fileID).Download() // AcknowledgeAbuse(true) lets the download proceed even if GCD thinks that it contains malware.
if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { // TODO: Should this prompt the user or log a warning?
req := storage.service.Files.Get(fileID)
if e, ok := err.(*googleapi.Error); ok {
if strings.Contains(err.Error(), "cannotDownloadAbusiveFile") || len(e.Errors) > 0 && e.Errors[0].Reason == "cannotDownloadAbusiveFile" {
LOG_WARN("GCD_STORAGE", "%s is marked as abusive, will download anyway.", filePath)
req = req.AcknowledgeAbuse(true)
}
}
response, err = req.Download()
if retry, retry_err := storage.shouldRetry(threadIndex, err); retry_err == nil && !retry {
break break
} else if retry { } else if retry {
continue continue
} else { } else {
return err return retry_err
} }
} }

View File

@@ -113,7 +113,7 @@ func (storage *HubicStorage) ListFiles(threadIndex int, dir string) ([]string, [
for _, entry := range entries { for _, entry := range entries {
if entry.Type == "application/directory" { if entry.Type == "application/directory" {
files = append(files, entry.Name + "/") files = append(files, entry.Name+"/")
sizes = append(sizes, 0) sizes = append(sizes, 0)
} else { } else {
files = append(files, entry.Name) files = append(files, entry.Name)

View File

@@ -136,6 +136,16 @@ func keyringSet(key string, value string) bool {
if value == "" { if value == "" {
keyring[key] = nil keyring[key] = nil
} else { } else {
// Check if the value to be set is the same as the existing one
existingEncryptedValue := keyring[key]
if len(existingEncryptedValue) > 0 {
existingValue, err := keyringDecrypt(existingEncryptedValue)
if err == nil && string(existingValue) == value {
return true
}
}
encryptedValue, err := keyringEncrypt([]byte(value)) encryptedValue, err := keyringEncrypt([]byte(value))
if err != nil { if err != nil {
LOG_DEBUG("KEYRING_ENCRYPT", "Failed to encrypt the value: %v", err) LOG_DEBUG("KEYRING_ENCRYPT", "Failed to encrypt the value: %v", err)

View File

@@ -97,7 +97,7 @@ func (storage *OneDriveStorage) ListFiles(threadIndex int, dir string) ([]string
} }
} }
return subDirs, nil, nil return subDirs, nil, nil
} else if strings.HasPrefix(dir, "snapshots/") { } else if strings.HasPrefix(dir, "snapshots/") || strings.HasPrefix(dir, "benchmark") {
entries, err := storage.client.ListEntries(storage.storageDir + "/" + dir) entries, err := storage.client.ListEntries(storage.storageDir + "/" + dir)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err

View File

@@ -210,7 +210,7 @@ func (storage *S3Storage) DownloadFile(threadIndex int, filePath string, chunk *
defer output.Body.Close() defer output.Body.Close()
_, err = RateLimitedCopy(chunk, output.Body, storage.DownloadRateLimit/len(storage.bucket)) _, err = RateLimitedCopy(chunk, output.Body, storage.DownloadRateLimit/storage.numberOfThreads)
return err return err
} }
@@ -225,7 +225,7 @@ func (storage *S3Storage) UploadFile(threadIndex int, filePath string, content [
Bucket: aws.String(storage.bucket), Bucket: aws.String(storage.bucket),
Key: aws.String(storage.storageDir + filePath), Key: aws.String(storage.storageDir + filePath),
ACL: aws.String(s3.ObjectCannedACLPrivate), ACL: aws.String(s3.ObjectCannedACLPrivate),
Body: CreateRateLimitedReader(content, storage.UploadRateLimit/len(storage.bucket)), Body: CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads),
ContentType: aws.String("application/duplicacy"), ContentType: aws.String("application/duplicacy"),
} }
@@ -237,8 +237,6 @@ func (storage *S3Storage) UploadFile(threadIndex int, filePath string, content [
LOG_INFO("S3_RETRY", "Retrying on %s: %v", reflect.TypeOf(err), err) LOG_INFO("S3_RETRY", "Retrying on %s: %v", reflect.TypeOf(err), err)
attempts += 1 attempts += 1
} }
return err
} }
// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when // If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when

View File

@@ -14,6 +14,7 @@ import (
"runtime" "runtime"
"strings" "strings"
"time" "time"
"sync"
"github.com/pkg/sftp" "github.com/pkg/sftp"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
@@ -23,9 +24,13 @@ type SFTPStorage struct {
StorageBase StorageBase
client *sftp.Client client *sftp.Client
clientLock sync.Mutex
minimumNesting int // The minimum level of directories to dive into before searching for the chunk file. minimumNesting int // The minimum level of directories to dive into before searching for the chunk file.
storageDir string storageDir string
numberOfThreads int numberOfThreads int
numberOfTries int
serverAddress string
sftpConfig *ssh.ClientConfig
} }
func CreateSFTPStorageWithPassword(server string, port int, username string, storageDir string, func CreateSFTPStorageWithPassword(server string, port int, username string, storageDir string,
@@ -86,6 +91,9 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
storageDir: storageDir, storageDir: storageDir,
minimumNesting: minimumNesting, minimumNesting: minimumNesting,
numberOfThreads: threads, numberOfThreads: threads,
numberOfTries: 6,
serverAddress: serverAddress,
sftpConfig: sftpConfig,
} }
// Random number fo generating the temporary chunk file suffix. // Random number fo generating the temporary chunk file suffix.
@@ -99,13 +107,60 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
} }
func CloseSFTPStorage(storage *SFTPStorage) { func CloseSFTPStorage(storage *SFTPStorage) {
storage.client.Close() if storage.client != nil {
storage.client.Close()
storage.client = nil
}
} }
func (storage *SFTPStorage) getSFTPClient() *sftp.Client {
storage.clientLock.Lock()
defer storage.clientLock.Unlock()
return storage.client
}
func (storage *SFTPStorage) retry(f func () error) error {
delay := time.Second
for i := 0;; i++ {
err := f()
if err != nil && strings.Contains(err.Error(), "EOF") && i < storage.numberOfTries {
LOG_WARN("SFTP_RETRY", "Encountered an error (%v); retry after %d second(s)", err, delay/time.Second)
time.Sleep(delay)
delay *= 2
storage.clientLock.Lock()
if storage.client != nil {
storage.client.Close()
storage.client = nil
}
connection, err := ssh.Dial("tcp", storage.serverAddress, storage.sftpConfig)
if err != nil {
storage.clientLock.Unlock()
return err
}
client, err := sftp.NewClient(connection)
if err != nil {
connection.Close()
storage.clientLock.Unlock()
return err
}
storage.client = client
storage.clientLock.Unlock()
continue
}
return err
}
}
// ListFiles return the list of files and subdirectories under 'file' (non-recursively) // ListFiles return the list of files and subdirectories under 'file' (non-recursively)
func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []string, sizes []int64, err error) { func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []string, sizes []int64, err error) {
entries, err := storage.client.ReadDir(path.Join(storage.storageDir, dirPath)) var entries []os.FileInfo
err = storage.retry(func() error {
entries, err = storage.getSFTPClient().ReadDir(path.Join(storage.storageDir, dirPath))
return err
})
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@@ -126,7 +181,11 @@ func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []
// DeleteFile deletes the file or directory at 'filePath'. // DeleteFile deletes the file or directory at 'filePath'.
func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err error) { func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err error) {
fullPath := path.Join(storage.storageDir, filePath) fullPath := path.Join(storage.storageDir, filePath)
fileInfo, err := storage.client.Stat(fullPath) var fileInfo os.FileInfo
err = storage.retry(func() error {
fileInfo, err = storage.getSFTPClient().Stat(fullPath)
return err
})
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
LOG_TRACE("SFTP_STORAGE", "File %s has disappeared before deletion", filePath) LOG_TRACE("SFTP_STORAGE", "File %s has disappeared before deletion", filePath)
@@ -137,33 +196,47 @@ func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err er
if fileInfo == nil { if fileInfo == nil {
return nil return nil
} }
return storage.client.Remove(path.Join(storage.storageDir, filePath)) return storage.retry(func() error { return storage.getSFTPClient().Remove(path.Join(storage.storageDir, filePath)) })
} }
// MoveFile renames the file. // MoveFile renames the file.
func (storage *SFTPStorage) MoveFile(threadIndex int, from string, to string) (err error) { func (storage *SFTPStorage) MoveFile(threadIndex int, from string, to string) (err error) {
toPath := path.Join(storage.storageDir, to) toPath := path.Join(storage.storageDir, to)
fileInfo, err := storage.client.Stat(toPath) var fileInfo os.FileInfo
err = storage.retry(func() error {
fileInfo, err = storage.getSFTPClient().Stat(toPath)
return err
})
if fileInfo != nil { if fileInfo != nil {
return fmt.Errorf("The destination file %s already exists", toPath) return fmt.Errorf("The destination file %s already exists", toPath)
} }
return storage.client.Rename(path.Join(storage.storageDir, from), err = storage.retry(func() error { return storage.getSFTPClient().Rename(path.Join(storage.storageDir, from),
path.Join(storage.storageDir, to)) path.Join(storage.storageDir, to)) })
return err
} }
// CreateDirectory creates a new directory. // CreateDirectory creates a new directory.
func (storage *SFTPStorage) CreateDirectory(threadIndex int, dirPath string) (err error) { func (storage *SFTPStorage) CreateDirectory(threadIndex int, dirPath string) (err error) {
fullPath := path.Join(storage.storageDir, dirPath) fullPath := path.Join(storage.storageDir, dirPath)
fileInfo, err := storage.client.Stat(fullPath) var fileInfo os.FileInfo
err = storage.retry(func() error {
fileInfo, err = storage.getSFTPClient().Stat(fullPath)
return err
})
if fileInfo != nil && fileInfo.IsDir() { if fileInfo != nil && fileInfo.IsDir() {
return nil return nil
} }
return storage.client.Mkdir(path.Join(storage.storageDir, dirPath)) return storage.retry(func() error { return storage.getSFTPClient().Mkdir(path.Join(storage.storageDir, dirPath)) })
} }
// GetFileInfo returns the information about the file or directory at 'filePath'. // GetFileInfo returns the information about the file or directory at 'filePath'.
func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) { func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) {
fileInfo, err := storage.client.Stat(path.Join(storage.storageDir, filePath)) var fileInfo os.FileInfo
err = storage.retry(func() error {
fileInfo, err = storage.getSFTPClient().Stat(path.Join(storage.storageDir, filePath))
return err
})
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return false, false, 0, nil return false, false, 0, nil
@@ -181,18 +254,19 @@ func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist
// DownloadFile reads the file at 'filePath' into the chunk. // DownloadFile reads the file at 'filePath' into the chunk.
func (storage *SFTPStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) { func (storage *SFTPStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
file, err := storage.client.Open(path.Join(storage.storageDir, filePath)) return storage.retry(func() error {
file, err := storage.getSFTPClient().Open(path.Join(storage.storageDir, filePath))
if err != nil { if err != nil {
return err return err
} }
defer file.Close() defer file.Close()
if _, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads); err != nil { if _, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads); err != nil {
return err return err
} }
return nil
return nil })
} }
// UploadFile writes 'content' to the file at 'filePath'. // UploadFile writes 'content' to the file at 'filePath'.
@@ -203,20 +277,26 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content
dirs := strings.Split(filePath, "/") dirs := strings.Split(filePath, "/")
if len(dirs) > 1 { if len(dirs) > 1 {
fullDir := path.Dir(fullPath) fullDir := path.Dir(fullPath)
_, err := storage.client.Stat(fullDir) err = storage.retry(func() error {
_, err := storage.getSFTPClient().Stat(fullDir)
return err
})
if err != nil { if err != nil {
// The error may be caused by a non-existent fullDir, or a broken connection. In either case, // The error may be caused by a non-existent fullDir, or a broken connection. In either case,
// we just assume it is the former because there isn't a way to tell which is the case. // we just assume it is the former because there isn't a way to tell which is the case.
for i, _ := range dirs[1 : len(dirs)-1] { for i := range dirs[1 : len(dirs)-1] {
subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...)) subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...))
// We don't check the error; just keep going blindly but always store the last err // We don't check the error; just keep going blindly but always store the last err
err = storage.client.Mkdir(subDir) err = storage.getSFTPClient().Mkdir(subDir)
} }
// If there is an error creating the dirs, we check fullDir one more time, because another thread // If there is an error creating the dirs, we check fullDir one more time, because another thread
// may happen to create the same fullDir ahead of this thread // may happen to create the same fullDir ahead of this thread
if err != nil { if err != nil {
_, err := storage.client.Stat(fullDir) err = storage.retry(func() error {
_, err := storage.getSFTPClient().Stat(fullDir)
return err
})
if err != nil { if err != nil {
return err return err
} }
@@ -224,39 +304,41 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content
} }
} }
letters := "abcdefghijklmnopqrstuvwxyz" return storage.retry(func() error {
suffix := make([]byte, 8)
for i := range suffix {
suffix[i] = letters[rand.Intn(len(letters))]
}
temporaryFile := fullPath + "." + string(suffix) + ".tmp" letters := "abcdefghijklmnopqrstuvwxyz"
suffix := make([]byte, 8)
file, err := storage.client.OpenFile(temporaryFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) for i := range suffix {
if err != nil { suffix[i] = letters[rand.Intn(len(letters))]
return err
}
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
_, err = io.Copy(file, reader)
if err != nil {
file.Close()
return err
}
file.Close()
err = storage.client.Rename(temporaryFile, fullPath)
if err != nil {
if _, err = storage.client.Stat(fullPath); err == nil {
storage.client.Remove(temporaryFile)
return nil
} else {
return fmt.Errorf("Uploaded file but failed to store it at %s: %v", fullPath, err)
} }
}
return nil temporaryFile := fullPath + "." + string(suffix) + ".tmp"
file, err := storage.getSFTPClient().OpenFile(temporaryFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
if err != nil {
return err
}
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
_, err = io.Copy(file, reader)
if err != nil {
file.Close()
return err
}
file.Close()
err = storage.getSFTPClient().Rename(temporaryFile, fullPath)
if err != nil {
if _, err = storage.getSFTPClient().Stat(fullPath); err == nil {
storage.getSFTPClient().Remove(temporaryFile)
return nil
} else {
return fmt.Errorf("Uploaded file but failed to store it at %s: %v", fullPath, err)
}
}
return nil
})
} }
// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when // If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when

View File

@@ -25,7 +25,7 @@ var snapshotDate string
func CharsToString(ca []int8) string { func CharsToString(ca []int8) string {
len := len(ca) len := len(ca)
ba := make([]byte, len) ba := make([]byte, len)
for i, v := range ca { for i, v := range ca {
ba[i] = byte(v) ba[i] = byte(v)
@@ -54,8 +54,8 @@ func GetPathDeviceId(path string) (deviceId int32, err error) {
// Executes shell command with timeout and returns stdout // Executes shell command with timeout and returns stdout
func CommandWithTimeout(timeoutInSeconds int, name string, arg ...string) (output string, err error) { func CommandWithTimeout(timeoutInSeconds int, name string, arg ...string) (output string, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutInSeconds) * time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutInSeconds)*time.Second)
defer cancel() defer cancel()
cmd := exec.CommandContext(ctx, name, arg...) cmd := exec.CommandContext(ctx, name, arg...)
out, err := cmd.Output() out, err := cmd.Output()
@@ -91,10 +91,10 @@ func DeleteShadowCopy() {
LOG_ERROR("VSS_DELETE", "Error while deleting temporary mount directory") LOG_ERROR("VSS_DELETE", "Error while deleting temporary mount directory")
return return
} }
LOG_INFO("VSS_DELETE", "Shadow copy unmounted and deleted at %s", snapshotPath) LOG_INFO("VSS_DELETE", "Shadow copy unmounted and deleted at %s", snapshotPath)
snapshotPath = "" snapshotPath = ""
} }
func CreateShadowCopy(top string, shadowCopy bool, timeoutInSeconds int) (shadowTop string) { func CreateShadowCopy(top string, shadowCopy bool, timeoutInSeconds int) (shadowTop string) {
@@ -128,7 +128,7 @@ func CreateShadowCopy(top string, shadowCopy bool, timeoutInSeconds int) (shadow
} }
if deviceIdLocal != deviceIdRepository { if deviceIdLocal != deviceIdRepository {
LOG_WARN("VSS_PATH", "VSS not supported for non-local repository path: ", top) LOG_WARN("VSS_PATH", "VSS not supported for non-local repository path: ", top)
return top return top
} }
if timeoutInSeconds <= 60 { if timeoutInSeconds <= 60 {
@@ -157,8 +157,8 @@ func CreateShadowCopy(top string, shadowCopy bool, timeoutInSeconds int) (shadow
snapshotDate = strings.TrimSpace(tmutilOutput[colonPos+1:]) snapshotDate = strings.TrimSpace(tmutilOutput[colonPos+1:])
// Mount snapshot as readonly and hide from GUI i.e. Finder // Mount snapshot as readonly and hide from GUI i.e. Finder
_, err = CommandWithTimeout(timeoutInSeconds, _, err = CommandWithTimeout(timeoutInSeconds,
"/sbin/mount", "-t", "apfs", "-o", "nobrowse,-r,-s=com.apple.TimeMachine." + snapshotDate, "/", snapshotPath) "/sbin/mount", "-t", "apfs", "-o", "nobrowse,-r,-s=com.apple.TimeMachine."+snapshotDate, "/", snapshotPath)
if err != nil { if err != nil {
LOG_ERROR("VSS_CREATE", "Error while mounting snapshot: ", err) LOG_ERROR("VSS_CREATE", "Error while mounting snapshot: ", err)
return top return top

View File

@@ -11,6 +11,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -68,47 +69,7 @@ func CreateSnapshotFromDirectory(id string, top string, nobackupFile string) (sn
var patterns []string var patterns []string
patternFile, err := ioutil.ReadFile(path.Join(GetDuplicacyPreferencePath(), "filters")) patterns = ProcessFilters()
if err == nil {
for _, pattern := range strings.Split(string(patternFile), "\n") {
pattern = strings.TrimSpace(pattern)
if len(pattern) == 0 {
continue
}
if pattern[0] == '#' {
continue
}
if IsUnspecifiedFilter(pattern) {
pattern = "+" + pattern
}
if IsEmptyFilter(pattern) {
continue
}
if strings.HasPrefix(pattern, "i:") || strings.HasPrefix(pattern, "e:") {
valid, err := IsValidRegex(pattern[2:])
if !valid || err != nil {
LOG_ERROR("SNAPSHOT_FILTER", "Invalid regular expression encountered for filter: \"%s\", error: %v", pattern, err)
}
}
patterns = append(patterns, pattern)
}
LOG_DEBUG("REGEX_DEBUG", "There are %d compiled regular expressions stored", len(RegexMap))
LOG_INFO("SNAPSHOT_FILTER", "Loaded %d include/exclude pattern(s)", len(patterns))
if IsTracing() {
for _, pattern := range patterns {
LOG_TRACE("SNAPSHOT_PATTERN", "Pattern: %s", pattern)
}
}
}
directories := make([]*Entry, 0, 256) directories := make([]*Entry, 0, 256)
directories = append(directories, CreateEntry("", 0, 0, 0)) directories = append(directories, CreateEntry("", 0, 0, 0))
@@ -150,6 +111,103 @@ func CreateSnapshotFromDirectory(id string, top string, nobackupFile string) (sn
return snapshot, skippedDirectories, skippedFiles, nil return snapshot, skippedDirectories, skippedFiles, nil
} }
func AppendPattern(patterns []string, new_pattern string) (new_patterns []string) {
for _, pattern := range patterns {
if pattern == new_pattern {
LOG_INFO("SNAPSHOT_FILTER", "Ignoring duplicate pattern: %s ...", new_pattern)
return patterns
}
}
new_patterns = append(patterns, new_pattern)
return new_patterns
}
func ProcessFilters() (patterns []string) {
patterns = ProcessFilterFile(joinPath(GetDuplicacyPreferencePath(), "filters"), make([]string, 0))
LOG_DEBUG("REGEX_DEBUG", "There are %d compiled regular expressions stored", len(RegexMap))
LOG_INFO("SNAPSHOT_FILTER", "Loaded %d include/exclude pattern(s)", len(patterns))
if IsTracing() {
for _, pattern := range patterns {
LOG_TRACE("SNAPSHOT_PATTERN", "Pattern: %s", pattern)
}
}
return patterns
}
func ProcessFilterFile(patternFile string, includedFiles []string) (patterns []string) {
for _, file := range includedFiles {
if file == patternFile {
// cycle in include mechanism discovered.
LOG_ERROR("SNAPSHOT_FILTER", "The filter file %s has already been included", patternFile)
return patterns
}
}
includedFiles = append(includedFiles, patternFile)
LOG_INFO("SNAPSHOT_FILTER", "Parsing filter file %s", patternFile)
patternFileContent, err := ioutil.ReadFile(patternFile)
if err == nil {
patternFileLines := strings.Split(string(patternFileContent), "\n")
patterns = ProcessFilterLines(patternFileLines, includedFiles)
}
return patterns
}
func ProcessFilterLines(patternFileLines []string, includedFiles []string) (patterns []string) {
for _, pattern := range patternFileLines {
pattern = strings.TrimSpace(pattern)
if len(pattern) == 0 {
continue
}
if strings.HasPrefix(pattern, "@") {
patternIncludeFile := strings.TrimSpace(pattern[1:])
if patternIncludeFile == "" {
continue
}
if ! filepath.IsAbs(patternIncludeFile) {
basePath := ""
if len(includedFiles) == 0 {
basePath, _ = os.Getwd()
} else {
basePath = filepath.Dir(includedFiles[len(includedFiles)-1])
}
patternIncludeFile = joinPath(basePath, patternIncludeFile)
}
for _, pattern := range ProcessFilterFile(patternIncludeFile, includedFiles) {
patterns = AppendPattern(patterns, pattern)
}
continue
}
if pattern[0] == '#' {
continue
}
if IsUnspecifiedFilter(pattern) {
pattern = "+" + pattern
}
if IsEmptyFilter(pattern) {
continue
}
if strings.HasPrefix(pattern, "i:") || strings.HasPrefix(pattern, "e:") {
valid, err := IsValidRegex(pattern[2:])
if !valid || err != nil {
LOG_ERROR("SNAPSHOT_FILTER", "Invalid regular expression encountered for filter: \"%s\", error: %v", pattern, err)
}
}
patterns = AppendPattern(patterns, pattern)
}
return patterns
}
// This is the struct used to save/load incomplete snapshots // This is the struct used to save/load incomplete snapshots
type IncompleteSnapshot struct { type IncompleteSnapshot struct {
Files []*Entry Files []*Entry

View File

@@ -57,7 +57,7 @@ func CreateFossilCollection(allSnapshots map[string][]*Snapshot) *FossilCollecti
} }
return &FossilCollection{ return &FossilCollection{
LastRevisions: lastRevisions, LastRevisions: lastRevisions,
DeletedRevisions: make(map[string][]int), DeletedRevisions: make(map[string][]int),
} }
} }
@@ -170,6 +170,16 @@ func (collection *FossilCollection) IsEmpty() bool {
return len(collection.Fossils) == 0 && len(collection.Temporaries) == 0 return len(collection.Fossils) == 0 && len(collection.Temporaries) == 0
} }
// Calculates the number of days between two times ignoring the hours, minutes and seconds.
func getDaysBetween(start int64, end int64) int {
startTime := time.Unix(start, 0).In(time.Now().Location())
endTime := time.Unix(end, 0).In(time.Now().Location())
startDate := time.Date(startTime.Year(), startTime.Month(), startTime.Day(), 0, 0, 0, 0, startTime.Location())
endDate := time.Date(endTime.Year(), endTime.Month(), endTime.Day(), 0, 0, 0, 0, endTime.Location())
hours := int(endDate.Sub(startDate).Hours())
return (hours + 1) / 24
}
// SnapshotManager is mainly responsible for downloading, and deleting snapshots. // SnapshotManager is mainly responsible for downloading, and deleting snapshots.
type SnapshotManager struct { type SnapshotManager struct {
@@ -376,7 +386,7 @@ func (manager *SnapshotManager) CleanSnapshotCache(latestSnapshot *Snapshot, all
if allSnapshots == nil { if allSnapshots == nil {
// If the 'fossils' directory exists then don't clean the cache as all snapshots will be needed later // If the 'fossils' directory exists then don't clean the cache as all snapshots will be needed later
// during the fossil collection phase. The deletion procedure creates this direcotry. // during the fossil collection phase. The deletion procedure creates this directory.
// We only check this condition when allSnapshots is nil because // We only check this condition when allSnapshots is nil because
// in thise case it is the deletion procedure that is trying to clean the snapshot cache. // in thise case it is the deletion procedure that is trying to clean the snapshot cache.
exist, _, _, err := manager.snapshotCache.GetFileInfo(0, "fossils") exist, _, _, err := manager.snapshotCache.GetFileInfo(0, "fossils")
@@ -679,6 +689,9 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
for _, revision := range revisions { for _, revision := range revisions {
snapshot := manager.DownloadSnapshot(snapshotID, revision) snapshot := manager.DownloadSnapshot(snapshotID, revision)
if tag != "" && snapshot.Tag != tag {
continue
}
creationTime := time.Unix(snapshot.StartTime, 0).Format("2006-01-02 15:04") creationTime := time.Unix(snapshot.StartTime, 0).Format("2006-01-02 15:04")
tagWithSpace := "" tagWithSpace := ""
if len(snapshot.Tag) > 0 { if len(snapshot.Tag) > 0 {
@@ -687,15 +700,16 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
LOG_INFO("SNAPSHOT_INFO", "Snapshot %s revision %d created at %s %s%s", LOG_INFO("SNAPSHOT_INFO", "Snapshot %s revision %d created at %s %s%s",
snapshotID, revision, creationTime, tagWithSpace, snapshot.Options) snapshotID, revision, creationTime, tagWithSpace, snapshot.Options)
if tag != "" && snapshot.Tag != tag {
continue
}
if showFiles { if showFiles {
manager.DownloadSnapshotFileSequence(snapshot, nil, false) manager.DownloadSnapshotFileSequence(snapshot, nil, false)
} }
if showFiles { if showFiles {
if snapshot.NumberOfFiles > 0 {
LOG_INFO("SNAPSHOT_STATS", "Files: %d", snapshot.NumberOfFiles)
}
maxSize := int64(9) maxSize := int64(9)
maxSizeDigits := 1 maxSizeDigits := 1
totalFiles := 0 totalFiles := 0
@@ -793,7 +807,7 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
snapshotIDIndex := 0 snapshotIDIndex := 0
totalMissingChunks := 0 totalMissingChunks := 0
for snapshotID, _ = range snapshotMap { for snapshotID = range snapshotMap {
revisions := revisionsToCheck revisions := revisionsToCheck
if len(revisions) == 0 || showStatistics { if len(revisions) == 0 || showStatistics {
@@ -806,11 +820,28 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
for _, revision := range revisions { for _, revision := range revisions {
snapshot := manager.DownloadSnapshot(snapshotID, revision) snapshot := manager.DownloadSnapshot(snapshotID, revision)
snapshotMap[snapshotID] = append(snapshotMap[snapshotID], snapshot)
if tag != "" && snapshot.Tag != tag { if tag != "" && snapshot.Tag != tag {
continue continue
} }
snapshotMap[snapshotID] = append(snapshotMap[snapshotID], snapshot)
}
}
totalRevisions := 0
for _, snapshotList := range snapshotMap {
totalRevisions += len(snapshotList)
}
LOG_INFO("SNAPSHOT_CHECK", "%d snapshots and %d revisions", len(snapshotMap), totalRevisions)
var totalChunkSize int64
for _, size := range chunkSizeMap {
totalChunkSize += size
}
LOG_INFO("SNAPSHOT_CHECK", "Total chunk size is %s in %d chunks", PrettyNumber(totalChunkSize), len(chunkSizeMap))
for snapshotID = range snapshotMap {
for _, snapshot := range snapshotMap[snapshotID] {
if checkFiles { if checkFiles {
manager.DownloadSnapshotContents(snapshot, nil, false) manager.DownloadSnapshotContents(snapshot, nil, false)
@@ -824,22 +855,36 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
} }
missingChunks := 0 missingChunks := 0
for chunkID, _ := range chunks { for chunkID := range chunks {
_, found := chunkSizeMap[chunkID] _, found := chunkSizeMap[chunkID]
if !found { if !found {
// Look up the chunk again in case it actually exists, but only if there aren't
// too many missing chunks.
if missingChunks < 100 {
_, exist, _, err := manager.storage.FindChunk(0, chunkID, false)
if err != nil {
LOG_WARN("SNAPSHOT_VALIDATE", "Failed to check the existence of chunk %s: %v",
chunkID, err)
} else if exist {
LOG_INFO("SNAPSHOT_VALIDATE", "Chunk %s is confirmed to exist", chunkID)
continue
}
}
if !searchFossils { if !searchFossils {
missingChunks += 1 missingChunks += 1
LOG_WARN("SNAPSHOT_VALIDATE", LOG_WARN("SNAPSHOT_VALIDATE",
"Chunk %s referenced by snapshot %s at revision %d does not exist", "Chunk %s referenced by snapshot %s at revision %d does not exist",
chunkID, snapshotID, revision) chunkID, snapshotID, snapshot.Revision)
continue continue
} }
chunkPath, exist, size, err := manager.storage.FindChunk(0, chunkID, true) chunkPath, exist, size, err := manager.storage.FindChunk(0, chunkID, true)
if err != nil { if err != nil {
LOG_ERROR("SNAPSHOT_VALIDATE", "Failed to check the existence of chunk %s: %v", LOG_ERROR("SNAPSHOT_VALIDATE", "Failed to check the existence of fossil %s: %v",
chunkID, err) chunkID, err)
return false return false
} }
@@ -848,7 +893,7 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
missingChunks += 1 missingChunks += 1
LOG_WARN("SNAPSHOT_VALIDATE", LOG_WARN("SNAPSHOT_VALIDATE",
"Chunk %s referenced by snapshot %s at revision %d does not exist", "Chunk %s referenced by snapshot %s at revision %d does not exist",
chunkID, snapshotID, revision) chunkID, snapshotID, snapshot.Revision)
continue continue
} }
@@ -856,7 +901,7 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
manager.resurrectChunk(chunkPath, chunkID) manager.resurrectChunk(chunkPath, chunkID)
} else { } else {
LOG_WARN("SNAPSHOT_FOSSIL", "Chunk %s referenced by snapshot %s at revision %d "+ LOG_WARN("SNAPSHOT_FOSSIL", "Chunk %s referenced by snapshot %s at revision %d "+
"has been marked as a fossil", chunkID, snapshotID, revision) "has been marked as a fossil", chunkID, snapshotID, snapshot.Revision)
} }
chunkSizeMap[chunkID] = size chunkSizeMap[chunkID] = size
@@ -879,11 +924,11 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
if missingChunks > 0 { if missingChunks > 0 {
LOG_WARN("SNAPSHOT_CHECK", "Some chunks referenced by snapshot %s at revision %d are missing", LOG_WARN("SNAPSHOT_CHECK", "Some chunks referenced by snapshot %s at revision %d are missing",
snapshotID, revision) snapshotID, snapshot.Revision)
totalMissingChunks += missingChunks totalMissingChunks += missingChunks
} else { } else {
LOG_INFO("SNAPSHOT_CHECK", "All chunks referenced by snapshot %s at revision %d exist", LOG_INFO("SNAPSHOT_CHECK", "All chunks referenced by snapshot %s at revision %d exist",
snapshotID, revision) snapshotID, snapshot.Revision)
} }
} }
@@ -922,7 +967,7 @@ func (manager *SnapshotManager) ShowStatistics(snapshotMap map[string][]*Snapsho
var totalChunkSize int64 var totalChunkSize int64
var uniqueChunkSize int64 var uniqueChunkSize int64
for chunkID, _ := range chunks { for chunkID := range chunks {
chunkSize := chunkSizeMap[chunkID] chunkSize := chunkSizeMap[chunkID]
totalChunkSize += chunkSize totalChunkSize += chunkSize
if chunkUniqueMap[chunkID] { if chunkUniqueMap[chunkID] {
@@ -940,7 +985,7 @@ func (manager *SnapshotManager) ShowStatistics(snapshotMap map[string][]*Snapsho
var totalChunkSize int64 var totalChunkSize int64
var uniqueChunkSize int64 var uniqueChunkSize int64
for chunkID, _ := range snapshotChunks { for chunkID := range snapshotChunks {
chunkSize := chunkSizeMap[chunkID] chunkSize := chunkSizeMap[chunkID]
totalChunkSize += chunkSize totalChunkSize += chunkSize
@@ -967,18 +1012,20 @@ func (manager *SnapshotManager) ShowStatisticsTabular(snapshotMap map[string][]*
earliestSeenChunks := make(map[string]int) earliestSeenChunks := make(map[string]int)
for _, snapshot := range snapshotList { for _, snapshot := range snapshotList {
for _, chunkID := range manager.GetSnapshotChunks(snapshot, true) { for _, chunkID := range manager.GetSnapshotChunks(snapshot, false) {
if earliestSeenChunks[chunkID] == 0 { if earliestSeenChunks[chunkID] == 0 {
earliestSeenChunks[chunkID] = math.MaxInt32 earliestSeenChunks[chunkID] = math.MaxInt32
} }
earliestSeenChunks[chunkID] = MinInt(earliestSeenChunks[chunkID], snapshot.Revision) if earliestSeenChunks[chunkID] > snapshot.Revision {
earliestSeenChunks[chunkID] = snapshot.Revision
}
} }
} }
for _, snapshot := range snapshotList { for _, snapshot := range snapshotList {
chunks := make(map[string]bool) chunks := make(map[string]bool)
for _, chunkID := range manager.GetSnapshotChunks(snapshot, true) { for _, chunkID := range manager.GetSnapshotChunks(snapshot, false) {
chunks[chunkID] = true chunks[chunkID] = true
snapshotChunks[chunkID] = true snapshotChunks[chunkID] = true
} }
@@ -990,7 +1037,7 @@ func (manager *SnapshotManager) ShowStatisticsTabular(snapshotMap map[string][]*
var newChunkCount int64 var newChunkCount int64
var newChunkSize int64 var newChunkSize int64
for chunkID, _ := range chunks { for chunkID := range chunks {
chunkSize := chunkSizeMap[chunkID] chunkSize := chunkSizeMap[chunkID]
totalChunkSize += chunkSize totalChunkSize += chunkSize
totalChunkCount += 1 totalChunkCount += 1
@@ -1018,7 +1065,7 @@ func (manager *SnapshotManager) ShowStatisticsTabular(snapshotMap map[string][]*
var uniqueChunkSize int64 var uniqueChunkSize int64
var totalChunkCount int64 var totalChunkCount int64
var uniqueChunkCount int64 var uniqueChunkCount int64
for chunkID, _ := range snapshotChunks { for chunkID := range snapshotChunks {
chunkSize := chunkSizeMap[chunkID] chunkSize := chunkSizeMap[chunkID]
totalChunkSize += chunkSize totalChunkSize += chunkSize
totalChunkCount += 1 totalChunkCount += 1
@@ -1123,7 +1170,7 @@ func (manager *SnapshotManager) VerifySnapshot(snapshot *Snapshot) bool {
} }
} }
// RetrieveFile retrieve the file in the specifed snapshot. // RetrieveFile retrieves the file in the specified snapshot.
func (manager *SnapshotManager) RetrieveFile(snapshot *Snapshot, file *Entry, output func([]byte)) bool { func (manager *SnapshotManager) RetrieveFile(snapshot *Snapshot, file *Entry, output func([]byte)) bool {
if file.Size == 0 { if file.Size == 0 {
@@ -1238,15 +1285,14 @@ func (manager *SnapshotManager) PrintFile(snapshotID string, revision int, path
} }
file := manager.FindFile(snapshot, path, false) file := manager.FindFile(snapshot, path, false)
var content []byte if !manager.RetrieveFile(snapshot, file, func(chunk []byte) {
if !manager.RetrieveFile(snapshot, file, func(chunk []byte) { content = append(content, chunk...) }) { fmt.Printf("%s", chunk)
}) {
LOG_ERROR("SNAPSHOT_RETRIEVE", "File %s is corrupted in snapshot %s at revision %d", LOG_ERROR("SNAPSHOT_RETRIEVE", "File %s is corrupted in snapshot %s at revision %d",
path, snapshot.ID, snapshot.Revision) path, snapshot.ID, snapshot.Revision)
return false return false
} }
fmt.Printf("%s", string(content))
return true return true
} }
@@ -1907,7 +1953,7 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
// Find out which retent policy applies based on the age. // Find out which retent policy applies based on the age.
for i < len(retentionPolicies) && for i < len(retentionPolicies) &&
int(now-snapshot.StartTime) < retentionPolicies[i].Age*secondsInDay { getDaysBetween(snapshot.StartTime, now) < retentionPolicies[i].Age {
i++ i++
lastSnapshotTime = 0 lastSnapshotTime = 0
} }
@@ -1920,9 +1966,8 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
snapshot.Flag = true snapshot.Flag = true
toBeDeleted++ toBeDeleted++
} else if lastSnapshotTime != 0 && } else if lastSnapshotTime != 0 &&
int(snapshot.StartTime-lastSnapshotTime) < retentionPolicies[i].Interval*secondsInDay-600 { getDaysBetween(lastSnapshotTime, snapshot.StartTime) < retentionPolicies[i].Interval {
// Delete the snapshot if it is too close to the last kept one. Note that a tolerance of 10 // Delete the snapshot if it is too close to the last kept one.
// minutes was subtracted from the interval.
LOG_DEBUG("SNAPSHOT_DELETE", "Snapshot %s at revision %d to be deleted - older than %d days, less than %d days from previous", LOG_DEBUG("SNAPSHOT_DELETE", "Snapshot %s at revision %d to be deleted - older than %d days, less than %d days from previous",
snapshot.ID, snapshot.Revision, retentionPolicies[i].Age, retentionPolicies[i].Interval) snapshot.ID, snapshot.Revision, retentionPolicies[i].Age, retentionPolicies[i].Interval)
snapshot.Flag = true snapshot.Flag = true
@@ -2191,7 +2236,7 @@ func (manager *SnapshotManager) pruneSnapshotsExhaustive(referencedFossils map[s
continue continue
} }
manager.chunkOperator.Resurrect(chunk, chunkDir + file) manager.chunkOperator.Resurrect(chunk, chunkDir+file)
fmt.Fprintf(logFile, "Found referenced fossil %s\n", file) fmt.Fprintf(logFile, "Found referenced fossil %s\n", file)
} else { } else {
@@ -2202,7 +2247,7 @@ func (manager *SnapshotManager) pruneSnapshotsExhaustive(referencedFossils map[s
} }
if exclusive { if exclusive {
manager.chunkOperator.Delete(chunk, chunkDir + file) manager.chunkOperator.Delete(chunk, chunkDir+file)
} else { } else {
collection.AddFossil(chunkDir + file) collection.AddFossil(chunkDir + file)
LOG_DEBUG("FOSSIL_FIND", "Found unreferenced fossil %s", file) LOG_DEBUG("FOSSIL_FIND", "Found unreferenced fossil %s", file)
@@ -2217,7 +2262,7 @@ func (manager *SnapshotManager) pruneSnapshotsExhaustive(referencedFossils map[s
chunk := strings.Replace(file, "/", "", -1) chunk := strings.Replace(file, "/", "", -1)
if !chunkRegex.MatchString(chunk) { if !chunkRegex.MatchString(chunk) {
LOG_WARN("CHUNK_UNKONWN_FILE", "File %s is not a chunk", file) LOG_WARN("CHUNK_UNKNOWN_FILE", "File %s is not a chunk", file)
continue continue
} }
@@ -2379,7 +2424,7 @@ func (manager *SnapshotManager) DownloadFile(path string, derivationKey string)
} }
if len(derivationKey) > 64 { if len(derivationKey) > 64 {
derivationKey = derivationKey[len(derivationKey) - 64:] derivationKey = derivationKey[len(derivationKey)-64:]
} }
err = manager.fileChunk.Decrypt(manager.config.FileKey, derivationKey) err = manager.fileChunk.Decrypt(manager.config.FileKey, derivationKey)
@@ -2413,7 +2458,7 @@ func (manager *SnapshotManager) UploadFile(path string, derivationKey string, co
} }
if len(derivationKey) > 64 { if len(derivationKey) > 64 {
derivationKey = derivationKey[len(derivationKey) - 64:] derivationKey = derivationKey[len(derivationKey)-64:]
} }
err := manager.fileChunk.Encrypt(manager.config.FileKey, derivationKey) err := manager.fileChunk.Encrypt(manager.config.FileKey, derivationKey)

View File

@@ -9,12 +9,12 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"os" "os"
"path" "path"
"strings" "strings"
"testing" "testing"
"time" "time"
"io/ioutil"
) )
func createDummySnapshot(snapshotID string, revision int, endTime int64) *Snapshot { func createDummySnapshot(snapshotID string, revision int, endTime int64) *Snapshot {
@@ -500,7 +500,7 @@ func TestPruneWithRetentionPolicyAndTag(t *testing.T) {
t.Logf("Creating 30 snapshots") t.Logf("Creating 30 snapshots")
for i := 0; i < 30; i++ { for i := 0; i < 30; i++ {
tag := "auto" tag := "auto"
if i % 3 == 0 { if i%3 == 0 {
tag = "manual" tag = "manual"
} }
createTestSnapshot(snapshotManager, "vm1@host1", i+1, now-int64(30-i)*day-3600, now-int64(30-i)*day-60, []string{chunkHashes[i]}, tag) createTestSnapshot(snapshotManager, "vm1@host1", i+1, now-int64(30-i)*day-3600, now-int64(30-i)*day-60, []string{chunkHashes[i]}, tag)
@@ -615,12 +615,12 @@ func TestPruneNewSnapshots(t *testing.T) {
// Create another snapshot of vm1 that brings back chunkHash1 // Create another snapshot of vm1 that brings back chunkHash1
createTestSnapshot(snapshotManager, "vm1@host1", 3, now-0*day-3600, now-0*day-60, []string{chunkHash1, chunkHash3}, "tag") createTestSnapshot(snapshotManager, "vm1@host1", 3, now-0*day-3600, now-0*day-60, []string{chunkHash1, chunkHash3}, "tag")
// Create another snapshot of vm2 so the fossil collection will be processed by next prune // Create another snapshot of vm2 so the fossil collection will be processed by next prune
createTestSnapshot(snapshotManager, "vm2@host1", 2, now + 3600, now + 3600 * 2, []string{chunkHash4, chunkHash5}, "tag") createTestSnapshot(snapshotManager, "vm2@host1", 2, now+3600, now+3600*2, []string{chunkHash4, chunkHash5}, "tag")
// Now chunkHash1 wil be resurrected // Now chunkHash1 wil be resurrected
snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1) snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1)
checkTestSnapshots(snapshotManager, 4, 0) checkTestSnapshots(snapshotManager, 4, 0)
snapshotManager.CheckSnapshots("vm1@host1", []int{2, 3}, "", false, false, false, false, false); snapshotManager.CheckSnapshots("vm1@host1", []int{2, 3}, "", false, false, false, false, false)
} }
// A fossil collection left by an aborted prune should be ignored if any supposedly deleted snapshot exists // A fossil collection left by an aborted prune should be ignored if any supposedly deleted snapshot exists
@@ -664,12 +664,12 @@ func TestPruneGhostSnapshots(t *testing.T) {
// Create another snapshot of vm1 so the fossil collection becomes eligible for processing. // Create another snapshot of vm1 so the fossil collection becomes eligible for processing.
chunkHash4 := uploadRandomChunk(snapshotManager, chunkSize) chunkHash4 := uploadRandomChunk(snapshotManager, chunkSize)
createTestSnapshot(snapshotManager, "vm1@host1", 3, now - day - 3600, now - day - 60, []string{chunkHash3, chunkHash4}, "tag") createTestSnapshot(snapshotManager, "vm1@host1", 3, now-day-3600, now-day-60, []string{chunkHash3, chunkHash4}, "tag")
// Run the prune again but the fossil collection should be igored, since revision 1 still exists // Run the prune again but the fossil collection should be igored, since revision 1 still exists
snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1) snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1)
checkTestSnapshots(snapshotManager, 3, 2) checkTestSnapshots(snapshotManager, 3, 2)
snapshotManager.CheckSnapshots("vm1@host1", []int{1, 2, 3}, "", false, false, false, true /*searchFossils*/, false); snapshotManager.CheckSnapshots("vm1@host1", []int{1, 2, 3}, "", false, false, false, true /*searchFossils*/, false)
// Prune snapshot 1 again // Prune snapshot 1 again
snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{1}, []string{}, []string{}, false, false, []string{}, false, false, false, 1) snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{1}, []string{}, []string{}, false, false, []string{}, false, false, false, 1)
@@ -677,11 +677,11 @@ func TestPruneGhostSnapshots(t *testing.T) {
// Create another snapshot // Create another snapshot
chunkHash5 := uploadRandomChunk(snapshotManager, chunkSize) chunkHash5 := uploadRandomChunk(snapshotManager, chunkSize)
createTestSnapshot(snapshotManager, "vm1@host1", 4, now + 3600, now + 3600 * 2, []string{chunkHash5, chunkHash5}, "tag") createTestSnapshot(snapshotManager, "vm1@host1", 4, now+3600, now+3600*2, []string{chunkHash5, chunkHash5}, "tag")
checkTestSnapshots(snapshotManager, 3, 2) checkTestSnapshots(snapshotManager, 3, 2)
// Run the prune again and this time the fossil collection will be processed and the fossils removed // Run the prune again and this time the fossil collection will be processed and the fossils removed
snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1) snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1)
checkTestSnapshots(snapshotManager, 3, 0) checkTestSnapshots(snapshotManager, 3, 0)
snapshotManager.CheckSnapshots("vm1@host1", []int{2, 3, 4}, "", false, false, false, false, false); snapshotManager.CheckSnapshots("vm1@host1", []int{2, 3, 4}, "", false, false, false, false, false)
} }

View File

@@ -89,7 +89,7 @@ func (storage *StorageBase) SetRateLimits(downloadRateLimit int, uploadRateLimit
} }
// SetDefaultNestingLevels sets the default read and write levels. This is usually called by // SetDefaultNestingLevels sets the default read and write levels. This is usually called by
// derived storages to set the levels with old values so that storages initialied by ealier versions // derived storages to set the levels with old values so that storages initialized by earlier versions
// will continue to work. // will continue to work.
func (storage *StorageBase) SetDefaultNestingLevels(readLevels []int, writeLevel int) { func (storage *StorageBase) SetDefaultNestingLevels(readLevels []int, writeLevel int) {
storage.readLevels = readLevels storage.readLevels = readLevels
@@ -291,6 +291,7 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
// If ssh_key_file is set, skip password-based login // If ssh_key_file is set, skip password-based login
keyFile := GetPasswordFromPreference(preference, "ssh_key_file") keyFile := GetPasswordFromPreference(preference, "ssh_key_file")
passphrase := ""
password := "" password := ""
passwordCallback := func() (string, error) { passwordCallback := func() (string, error) {
@@ -348,7 +349,20 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
} else { } else {
key, err = ssh.ParsePrivateKey(content) key, err = ssh.ParsePrivateKey(content)
if err != nil { if err != nil {
LOG_INFO("SSH_PUBLICKEY", "Failed to parse the private key file %s: %v", keyFile, err) if strings.Contains(err.Error(), "cannot decode encrypted private keys") {
LOG_TRACE("SSH_PUBLICKEY", "The private key file is encrypted")
passphrase = GetPassword(preference, "ssh_passphrase", "Enter the passphrase to decrypt the private key file:", false, resetPassword)
if len(passphrase) == 0 {
LOG_INFO("SSH_PUBLICKEY", "No passphrase to descrypt the private key file %s", keyFile)
} else {
key, err = ssh.ParsePrivateKeyWithPassphrase(content, []byte(passphrase))
if err != nil {
LOG_INFO("SSH_PUBLICKEY", "Failed to parse the encrypted private key file %s: %v", keyFile, err)
}
}
} else {
LOG_INFO("SSH_PUBLICKEY", "Failed to parse the private key file %s: %v", keyFile, err)
}
} }
} }
} }
@@ -410,6 +424,9 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
if keyFile != "" { if keyFile != "" {
SavePassword(preference, "ssh_key_file", keyFile) SavePassword(preference, "ssh_key_file", keyFile)
if passphrase != "" {
SavePassword(preference, "ssh_passphrase", passphrase)
}
} else if password != "" { } else if password != "" {
SavePassword(preference, "ssh_password", password) SavePassword(preference, "ssh_password", password)
} }
@@ -509,11 +526,12 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
return dropboxStorage return dropboxStorage
} else if matched[1] == "b2" { } else if matched[1] == "b2" {
bucket := matched[3] bucket := matched[3]
storageDir := matched[5]
accountID := GetPassword(preference, "b2_id", "Enter Backblaze Account ID:", true, resetPassword) accountID := GetPassword(preference, "b2_id", "Enter Backblaze account or application id:", true, resetPassword)
applicationKey := GetPassword(preference, "b2_key", "Enter Backblaze Application Key:", true, resetPassword) applicationKey := GetPassword(preference, "b2_key", "Enter corresponding Backblaze application key:", true, resetPassword)
b2Storage, err := CreateB2Storage(accountID, applicationKey, bucket, threads) b2Storage, err := CreateB2Storage(accountID, applicationKey, bucket, storageDir, threads)
if err != nil { if err != nil {
LOG_ERROR("STORAGE_CREATE", "Failed to load the Backblaze B2 storage at %s: %v", storageURL, err) LOG_ERROR("STORAGE_CREATE", "Failed to load the Backblaze B2 storage at %s: %v", storageURL, err)
return nil return nil
@@ -609,7 +627,7 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
} else if matched[1] == "webdav" || matched[1] == "webdav-http" { } else if matched[1] == "webdav" || matched[1] == "webdav-http" {
server := matched[3] server := matched[3]
username := matched[2] username := matched[2]
username = username[:len(username) - 1] username = username[:len(username)-1]
storageDir := matched[5] storageDir := matched[5]
port := 0 port := 0
useHTTP := matched[1] == "webdav-http" useHTTP := matched[1] == "webdav-http"

View File

@@ -80,12 +80,12 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) {
return storage, err return storage, err
} else if testStorageName == "s3" { } else if testStorageName == "s3" {
storage, err := CreateS3Storage(config["region"], config["endpoint"], config["bucket"], config["directory"], config["access_key"], config["secret_key"], threads, true, false) storage, err := CreateS3Storage(config["region"], config["endpoint"], config["bucket"], config["directory"], config["access_key"], config["secret_key"], threads, true, false)
return storage, err
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err
} else if testStorageName == "wasabi" { } else if testStorageName == "wasabi" {
storage, err := CreateWasabiStorage(config["region"], config["endpoint"], config["bucket"], config["directory"], config["access_key"], config["secret_key"], threads) storage, err := CreateWasabiStorage(config["region"], config["endpoint"], config["bucket"], config["directory"], config["access_key"], config["secret_key"], threads)
return storage, err
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err
} else if testStorageName == "s3c" { } else if testStorageName == "s3c" {
storage, err := CreateS3CStorage(config["region"], config["endpoint"], config["bucket"], config["directory"], config["access_key"], config["secret_key"], threads) storage, err := CreateS3CStorage(config["region"], config["endpoint"], config["bucket"], config["directory"], config["access_key"], config["secret_key"], threads)
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
@@ -107,7 +107,7 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) {
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err return storage, err
} else if testStorageName == "b2" { } else if testStorageName == "b2" {
storage, err := CreateB2Storage(config["account"], config["key"], config["bucket"], threads) storage, err := CreateB2Storage(config["account"], config["key"], config["bucket"], config["directory"], threads)
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err return storage, err
} else if testStorageName == "gcs-s3" { } else if testStorageName == "gcs-s3" {
@@ -153,10 +153,7 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) {
} }
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err return storage, err
} else {
return nil, fmt.Errorf("Invalid storage named: %s", testStorageName)
} }
return nil, fmt.Errorf("Invalid storage named: %s", testStorageName) return nil, fmt.Errorf("Invalid storage named: %s", testStorageName)
} }
@@ -299,7 +296,8 @@ func TestStorage(t *testing.T) {
LOG_INFO("STORAGE_TEST", "storage: %s", testStorageName) LOG_INFO("STORAGE_TEST", "storage: %s", testStorageName)
storage, err := loadStorage(testDir, 1) threads := 8
storage, err := loadStorage(testDir, threads)
if err != nil { if err != nil {
t.Errorf("Failed to create storage: %v", err) t.Errorf("Failed to create storage: %v", err)
return return
@@ -329,16 +327,16 @@ func TestStorage(t *testing.T) {
storage.CreateDirectory(0, "shared") storage.CreateDirectory(0, "shared")
// Upload to the same directory by multiple goroutines // Upload to the same directory by multiple goroutines
count := 8 count := threads
finished := make(chan int, count) finished := make(chan int, count)
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
go func(name string) { go func(threadIndex int, name string) {
err := storage.UploadFile(0, name, []byte("this is a test file")) err := storage.UploadFile(threadIndex, name, []byte("this is a test file"))
if err != nil { if err != nil {
t.Errorf("Error to upload '%s': %v", name, err) t.Errorf("Error to upload '%s': %v", name, err)
} }
finished <- 0 finished <- 0
}(fmt.Sprintf("shared/a/b/c/%d", i)) }(i, fmt.Sprintf("shared/a/b/c/%d", i))
} }
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
@@ -387,7 +385,6 @@ func TestStorage(t *testing.T) {
snapshotIDs := []string{} snapshotIDs := []string{}
for _, snapshotDir := range snapshotDirs { for _, snapshotDir := range snapshotDirs {
LOG_INFO("debug", "snapshot dir: %s", snapshotDir)
if len(snapshotDir) > 0 && snapshotDir[len(snapshotDir)-1] == '/' { if len(snapshotDir) > 0 && snapshotDir[len(snapshotDir)-1] == '/' {
snapshotIDs = append(snapshotIDs, snapshotDir[:len(snapshotDir)-1]) snapshotIDs = append(snapshotIDs, snapshotDir[:len(snapshotDir)-1])
} }

View File

@@ -21,7 +21,7 @@ type SwiftStorage struct {
threads int threads int
} }
// CreateSwiftStorage creates an OpenStack Swift storage object. storageURL is in the form of // CreateSwiftStorage creates an OpenStack Swift storage object. storageURL is in the form of
// `user@authURL/container/path?arg1=value1&arg2=value2`` // `user@authURL/container/path?arg1=value1&arg2=value2``
func CreateSwiftStorage(storageURL string, key string, threads int) (storage *SwiftStorage, err error) { func CreateSwiftStorage(storageURL string, key string, threads int) (storage *SwiftStorage, err error) {

View File

@@ -10,10 +10,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path"
"path/filepath"
"regexp" "regexp"
"runtime"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -58,7 +55,7 @@ func IsEmptyFilter(pattern string) bool {
} }
func IsUnspecifiedFilter(pattern string) bool { func IsUnspecifiedFilter(pattern string) bool {
if pattern[0] != '+' && pattern[0] != '-' && pattern[0] != 'i' && pattern[0] != 'e' { if pattern[0] != '+' && pattern[0] != '-' && !strings.HasPrefix(pattern, "i:") && !strings.HasPrefix(pattern, "e:") {
return true return true
} else { } else {
return false return false
@@ -176,6 +173,15 @@ func GetPasswordFromPreference(preference Preference, passwordType string) strin
if password, found := os.LookupEnv(name); found && password != "" { if password, found := os.LookupEnv(name); found && password != "" {
return password return password
} }
re := regexp.MustCompile(`[^a-zA-Z0-9_]`)
namePlain := re.ReplaceAllString(name, "_")
if namePlain != name {
LOG_DEBUG("PASSWORD_ENV_VAR", "Reading the environment variable %s", namePlain)
if password, found := os.LookupEnv(namePlain); found && password != "" {
return password
}
}
} }
// If the password is stored in the preference, there is no need to include the storage name // If the password is stored in the preference, there is no need to include the storage name
@@ -390,19 +396,6 @@ func MatchPath(filePath string, patterns []string) (included bool) {
} }
} }
func joinPath(components ...string) string {
combinedPath := path.Join(components...)
if len(combinedPath) > 257 && runtime.GOOS == "windows" {
combinedPath = `\\?\` + filepath.Join(components...)
// If the path is on a samba drive we must use the UNC format
if strings.HasPrefix(combinedPath, `\\?\\\`) {
combinedPath = `\\?\UNC\` + combinedPath[6:]
}
}
return combinedPath
}
func PrettyNumber(number int64) string { func PrettyNumber(number int64) string {
G := int64(1024 * 1024 * 1024) G := int64(1024 * 1024 * 1024)
@@ -467,10 +460,3 @@ func AtoSize(sizeString string) int {
return size return size
} }
func MinInt(x, y int) int {
if x < y {
return x
}
return y
}

View File

@@ -9,6 +9,7 @@ package duplicacy
import ( import (
"bytes" "bytes"
"os" "os"
"path"
"path/filepath" "path/filepath"
"syscall" "syscall"
@@ -35,7 +36,7 @@ func SetOwner(fullPath string, entry *Entry, fileInfo *os.FileInfo) bool {
stat, ok := (*fileInfo).Sys().(*syscall.Stat_t) stat, ok := (*fileInfo).Sys().(*syscall.Stat_t)
if ok && stat != nil && (int(stat.Uid) != entry.UID || int(stat.Gid) != entry.GID) { if ok && stat != nil && (int(stat.Uid) != entry.UID || int(stat.Gid) != entry.GID) {
if entry.UID != -1 && entry.GID != -1 { if entry.UID != -1 && entry.GID != -1 {
err := os.Chown(fullPath, entry.UID, entry.GID) err := os.Lchown(fullPath, entry.UID, entry.GID)
if err != nil { if err != nil {
LOG_ERROR("RESTORE_CHOWN", "Failed to change uid or gid: %v", err) LOG_ERROR("RESTORE_CHOWN", "Failed to change uid or gid: %v", err)
return false return false
@@ -83,3 +84,7 @@ func (entry *Entry) SetAttributesToFile(fullPath string) {
} }
} }
func joinPath(components ...string) string {
return path.Join(components...)
}

View File

@@ -92,6 +92,17 @@ func TestMatchPattern(t *testing.T) {
} }
} }
for _, pattern := range []string{ "+", "-", "i:", "e:", "+a", "-a", "i:a", "e:a"} {
if IsUnspecifiedFilter(pattern) {
t.Errorf("pattern %s has a specified filter", pattern)
}
}
for _, pattern := range []string{ "i", "e", "ia", "ib", "a", "b"} {
if !IsUnspecifiedFilter(pattern) {
t.Errorf("pattern %s does not have a specified filter", pattern)
}
}
} }
func TestRateLimit(t *testing.T) { func TestRateLimit(t *testing.T) {

View File

@@ -7,6 +7,8 @@ package duplicacy
import ( import (
"fmt" "fmt"
"os" "os"
"path/filepath"
"strings"
"syscall" "syscall"
"unsafe" "unsafe"
) )
@@ -114,3 +116,13 @@ func (entry *Entry) ReadAttributes(top string) {
func (entry *Entry) SetAttributesToFile(fullPath string) { func (entry *Entry) SetAttributesToFile(fullPath string) {
} }
func joinPath(components ...string) string {
combinedPath := `\\?\` + filepath.Join(components...)
// If the path is on a samba drive we must use the UNC format
if strings.HasPrefix(combinedPath, `\\?\\\`) {
combinedPath = `\\?\UNC\` + combinedPath[6:]
}
return combinedPath
}

View File

@@ -93,49 +93,49 @@ func (storage *WasabiStorage) DeleteFile(
// rename. It's designed to get the job done with as few dependencies // rename. It's designed to get the job done with as few dependencies
// on other packages as possible rather than being somethng // on other packages as possible rather than being somethng
// general-purpose and reusable. // general-purpose and reusable.
func (storage *WasabiStorage) MoveFile( func (storage *WasabiStorage) MoveFile(threadIndex int, from string, to string) (err error) {
threadIndex int, from string, to string,
) (err error) {
var from_path string var fromPath string
// The from path includes the bucket. Take care not to include an empty storageDir // The from path includes the bucket. Take care not to include an empty storageDir
// string as Wasabi's backend will return 404 on URLs with double slashes. // string as Wasabi's backend will return 404 on URLs with double slashes.
if (storage.storageDir == "") { if storage.storageDir == "" {
from_path = fmt.Sprintf("/%s/%s", storage.bucket, from) fromPath = fmt.Sprintf("/%s/%s", storage.bucket, from)
} else { } else {
from_path = fmt.Sprintf("/%s/%s/%s", storage.bucket, storage.storageDir, from) fromPath = fmt.Sprintf("/%s/%s/%s", storage.bucket, storage.storageDir, from)
} }
object := fmt.Sprintf("https://%s@%s%s", object := fmt.Sprintf("https://%s@%s%s", storage.region, storage.endpoint, fromPath)
storage.region, storage.endpoint, from_path)
toPath := to
// The object's new name is relative to the top of the bucket. // The object's new name is relative to the top of the bucket.
new_name := fmt.Sprintf("%s/%s", storage.storageDir, to) if storage.storageDir != "" {
toPath = fmt.Sprintf("%s/%s", storage.storageDir, to)
}
timestamp := time.Now().Format(time.RFC1123Z) timestamp := time.Now().Format(time.RFC1123Z)
signing_string := fmt.Sprintf("MOVE\n\n\n%s\n%s", timestamp, from_path) signingString := fmt.Sprintf("MOVE\n\n\n%s\n%s", timestamp, fromPath)
signer := hmac.New(sha1.New, []byte(storage.secret)) signer := hmac.New(sha1.New, []byte(storage.secret))
signer.Write([]byte(signing_string)) signer.Write([]byte(signingString))
signature := base64.StdEncoding.EncodeToString(signer.Sum(nil)) signature := base64.StdEncoding.EncodeToString(signer.Sum(nil))
authorization := fmt.Sprintf("AWS %s:%s", storage.key, signature) authorization := fmt.Sprintf("AWS %s:%s", storage.key, signature)
request, error := http.NewRequest("MOVE", object, nil) request, err := http.NewRequest("MOVE", object, nil)
if error != nil { if err != nil {
return error return err
} }
request.Header.Add("Authorization", authorization) request.Header.Add("Authorization", authorization)
request.Header.Add("Date", timestamp) request.Header.Add("Date", timestamp)
request.Header.Add("Destination", new_name) request.Header.Add("Destination", toPath)
request.Header.Add("Host", storage.endpoint) request.Header.Add("Host", storage.endpoint)
request.Header.Add("Overwrite", "true") request.Header.Add("Overwrite", "true")
response, error := storage.client.Do(request) response, err := storage.client.Do(request)
if error != nil { if err != nil {
return error return err
} }
defer response.Body.Close() defer response.Body.Close()

View File

@@ -19,9 +19,9 @@ import (
"net/http" "net/http"
//"net/http/httputil" //"net/http/httputil"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
"strings"
) )
type WebDAVStorage struct { type WebDAVStorage struct {
@@ -42,14 +42,14 @@ type WebDAVStorage struct {
var ( var (
errWebDAVAuthorizationFailure = errors.New("Authentication failed") errWebDAVAuthorizationFailure = errors.New("Authentication failed")
errWebDAVMovedPermanently = errors.New("Moved permanently") errWebDAVMovedPermanently = errors.New("Moved permanently")
errWebDAVNotExist = errors.New("Path does not exist") errWebDAVNotExist = errors.New("Path does not exist")
errWebDAVMaximumBackoff = errors.New("Maximum backoff reached") errWebDAVMaximumBackoff = errors.New("Maximum backoff reached")
errWebDAVMethodNotAllowed = errors.New("Method not allowed") errWebDAVMethodNotAllowed = errors.New("Method not allowed")
) )
func CreateWebDAVStorage(host string, port int, username string, password string, storageDir string, useHTTP bool, threads int) (storage *WebDAVStorage, err error) { func CreateWebDAVStorage(host string, port int, username string, password string, storageDir string, useHTTP bool, threads int) (storage *WebDAVStorage, err error) {
if storageDir[len(storageDir)-1] != '/' { if len(storageDir) > 0 && storageDir[len(storageDir)-1] != '/' {
storageDir += "/" storageDir += "/"
} }
@@ -59,7 +59,7 @@ func CreateWebDAVStorage(host string, port int, username string, password string
username: username, username: username,
password: password, password: password,
storageDir: "", storageDir: "",
useHTTP: false, useHTTP: useHTTP,
client: http.DefaultClient, client: http.DefaultClient,
threads: threads, threads: threads,
@@ -68,7 +68,7 @@ func CreateWebDAVStorage(host string, port int, username string, password string
// Make sure it doesn't follow redirect // Make sure it doesn't follow redirect
storage.client.CheckRedirect = func(req *http.Request, via []*http.Request) error { storage.client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse return http.ErrUseLastResponse
} }
exist, isDir, _, err := storage.GetFileInfo(0, storageDir) exist, isDir, _, err := storage.GetFileInfo(0, storageDir)
@@ -151,6 +151,10 @@ func (storage *WebDAVStorage) sendRequest(method string, uri string, depth int,
request.Header.Set(key, value) request.Header.Set(key, value)
} }
if method == "PUT" {
request.ContentLength = int64(len(data))
}
//requestDump, err := httputil.DumpRequest(request, true) //requestDump, err := httputil.DumpRequest(request, true)
//LOG_INFO("debug", "Request: %s", requestDump) //LOG_INFO("debug", "Request: %s", requestDump)
@@ -313,6 +317,7 @@ func (storage *WebDAVStorage) ListFiles(threadIndex int, dir string) (files []st
// GetFileInfo returns the information about the file or directory at 'filePath'. // GetFileInfo returns the information about the file or directory at 'filePath'.
func (storage *WebDAVStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) { func (storage *WebDAVStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) {
properties, err := storage.getProperties(filePath, 0, "getcontentlength", "resourcetype") properties, err := storage.getProperties(filePath, 0, "getcontentlength", "resourcetype")
if err != nil { if err != nil {
if err == errWebDAVNotExist { if err == errWebDAVNotExist {
@@ -325,11 +330,18 @@ func (storage *WebDAVStorage) GetFileInfo(threadIndex int, filePath string) (exi
return false, false, 0, err return false, false, 0, err
} }
if m, exist := properties["/" + storage.storageDir + filePath]; !exist { m, exist := properties["/"+storage.storageDir+filePath]
// If no properties exist for the given filePath, remove the trailing / from filePath and search again
if !exist && filePath != "" && filePath[len(filePath) - 1] == '/' {
m, exist = properties["/"+storage.storageDir+filePath[:len(filePath) - 1]]
}
if !exist {
return false, false, 0, nil return false, false, 0, nil
} else if resourceType, exist := m["resourcetype"]; exist && strings.Contains(resourceType, "collection") { } else if resourceType, exist := m["resourcetype"]; exist && strings.Contains(resourceType, "collection") {
return true, true, 0, nil return true, true, 0, nil
} else if length, exist := m["getcontentlength"]; exist && length != ""{ } else if length, exist := m["getcontentlength"]; exist && length != "" {
value, _ := strconv.Atoi(length) value, _ := strconv.Atoi(length)
return true, false, int64(value), nil return true, false, int64(value), nil
} else { } else {