1
0
mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-06 00:03:38 +00:00

Compare commits

...

8 Commits

Author SHA1 Message Date
Gilbert Chen
27ff3e216b Bump version to 3.1.0 2022-12-06 23:28:44 -05:00
Gilbert Chen
1ba204a21b Upgrade go-dropbox to the latest
This is to incorporate the fix:
https://github.com/gilbertchen/go-dropbox/commit/60ebcd

Otherwise the access token won't get updated after token refresh
2022-12-06 23:24:11 -05:00
Gilbert Chen
b8c7594dbf Release the chunk used to download files when finished
Without this fix, a chunk is leaked for each snapshot checked
with `-files`.
2022-12-06 22:46:25 -05:00
Gilbert Chen
58f0d2be5a Fixed a bug that didn't preserve the version bit when copying old snapshots
The version bit should not be set to 1 when encoding a snapshot.  Instead,
it must be set to 1 on snapshot creation.

To correctly process old snapshots encoded incorrectly with version bit set
to 1, the first byte of the encoded file list is also checked.  If the first
byte is `[`, then it must be an old snapshot, since the file list in the new
snapshot format always starts with a string encoded in msgpack, the first
byte of which can't be `[`.
2022-11-22 21:31:24 -05:00
Gilbert Chen
0a794e6fea Fixed test errors and remove obsolete tests 2022-11-15 11:53:46 -05:00
Gilbert Chen
bc2d762e41 Add -rewrite to the check command to fix corrupted chunks
This option is useful only when erasure coding is enabled.  It will
download and re-upload chunks that contain corruption but are
generally recoverable.  It can also be used to fix chunks that
are created by 3.0.1 on arm64 machines with wrong hashes.
2022-11-15 11:47:02 -05:00
Gilbert Chen
6a7a2c8048 Upgrade github.com/minio/highwayhash to 1.0.2
highwayhash 1.0.1 contains a bug leading to incorrect hashes on arm64 machines.
The 1.0.1 version is retained in github.com/gilbertchen/highwayhash so the hash
can be checked again if a mismatch is detected by 1.0.2.
2022-11-09 14:44:24 -05:00
Gilbert Chen
3472206bcf Handle zero-byte files correctly
This commit fixed 2 bugs.  The first bug occurs when an incomplete backup
contains a zero-byte file and no chunks.  The second bug occurs when the
repository contains only zero-byte files.
2022-11-08 22:54:35 -05:00
17 changed files with 280 additions and 723 deletions

View File

@@ -981,10 +981,11 @@ func checkSnapshots(context *cli.Context) {
checkChunks := context.Bool("chunks")
searchFossils := context.Bool("fossils")
resurrect := context.Bool("resurrect")
rewrite := context.Bool("rewrite")
persist := context.Bool("persist")
backupManager.SetupSnapshotCache(preference.Name)
backupManager.SnapshotManager.CheckSnapshots(id, revisions, tag, showStatistics, showTabular, checkFiles, checkChunks, searchFossils, resurrect, threads, persist)
backupManager.SnapshotManager.CheckSnapshots(id, revisions, tag, showStatistics, showTabular, checkFiles, checkChunks, searchFossils, resurrect, rewrite, threads, persist)
runScript(context, preference.Name, "post")
}
@@ -1676,6 +1677,10 @@ func main() {
Name: "resurrect",
Usage: "turn referenced fossils back into chunks",
},
cli.BoolFlag{
Name: "rewrite",
Usage: "rewrite chunks with recoverable corruption",
},
cli.BoolFlag{
Name: "files",
Usage: "verify the integrity of every file",
@@ -2210,7 +2215,7 @@ func main() {
app.Name = "duplicacy"
app.HelpName = "duplicacy"
app.Usage = "A new generation cloud backup tool based on lock-free deduplication"
app.Version = "3.0.1" + " (" + GitCommit + ")"
app.Version = "3.1.0" + " (" + GitCommit + ")"
// Exit with code 2 if an invalid command is provided
app.CommandNotFound = func(context *cli.Context, command string) {

5
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/bkaradzic/go-lz4 v1.0.0
github.com/gilbertchen/azure-sdk-for-go v14.1.2-0.20180323033227-8fd4663cab7c+incompatible
github.com/gilbertchen/cli v1.2.1-0.20160223210219-1de0a1836ce9
github.com/gilbertchen/go-dropbox v0.0.0-20221004154447-61204091e804
github.com/gilbertchen/go-dropbox v0.0.0-20221207034530-08c0c180a4f9
github.com/gilbertchen/go-ole v1.2.0
github.com/gilbertchen/goamz v0.0.0-20170712012135-eada9f4e8cc2
github.com/gilbertchen/gopass v0.0.0-20170109162249-bf9dde6d0d2c
@@ -17,7 +17,7 @@ require (
github.com/gilbertchen/xattr v0.0.0-20160926155429-68e7a6806b01
github.com/klauspost/reedsolomon v1.9.9
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/minio/highwayhash v1.0.1
github.com/minio/highwayhash v1.0.2
github.com/ncw/swift/v2 v2.0.1
github.com/pkg/sftp v1.11.0
github.com/pkg/xattr v0.4.1
@@ -34,6 +34,7 @@ require (
github.com/calebcase/tmpfile v1.0.3 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dnaeon/go-vcr v1.2.0 // indirect
github.com/gilbertchen/highwayhash v0.0.0-20221109044721-eeab1f4799d8 // indirect
github.com/goamz/goamz v0.0.0-20180131231218-8b901b531db8 // indirect
github.com/godbus/dbus v4.1.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect

8
go.sum
View File

@@ -56,12 +56,18 @@ github.com/gilbertchen/cli v1.2.1-0.20160223210219-1de0a1836ce9 h1:uMgtTp4sRJ7kM
github.com/gilbertchen/cli v1.2.1-0.20160223210219-1de0a1836ce9/go.mod h1:WOnN3JdZiZwUaYtLH2DRxe5PpD43wuOIvc/Wem/39M0=
github.com/gilbertchen/go-dropbox v0.0.0-20221004154447-61204091e804 h1:JZ0P02xoeaITbKLFAdBfiH8SNNvKGE2Y/RLdYtWoEVE=
github.com/gilbertchen/go-dropbox v0.0.0-20221004154447-61204091e804/go.mod h1:85+2CRHC/klHy4vEM+TYtbhDo2wMjPa4JNdVzUHsDIk=
github.com/gilbertchen/go-dropbox v0.0.0-20221128142034-9910c19f1d13 h1:54e1HiEXNXGif2PaQHizdGvszIDFE+2yIGzHMUYOQnQ=
github.com/gilbertchen/go-dropbox v0.0.0-20221128142034-9910c19f1d13/go.mod h1:85+2CRHC/klHy4vEM+TYtbhDo2wMjPa4JNdVzUHsDIk=
github.com/gilbertchen/go-dropbox v0.0.0-20221207034530-08c0c180a4f9 h1:3hJHxOyf/rAWWz9GNyai0hSt56vGMATS9B6yjw/bzzk=
github.com/gilbertchen/go-dropbox v0.0.0-20221207034530-08c0c180a4f9/go.mod h1:85+2CRHC/klHy4vEM+TYtbhDo2wMjPa4JNdVzUHsDIk=
github.com/gilbertchen/go-ole v1.2.0 h1:ay65uwxo6w8UVOxN0+fuCqUXGaXxbmkGs5m4uY6e1Zw=
github.com/gilbertchen/go-ole v1.2.0/go.mod h1:NNiozp7QxhyGmHxxNdFKIcVaINvJFTAjBJ2gYzh8fsg=
github.com/gilbertchen/goamz v0.0.0-20170712012135-eada9f4e8cc2 h1:VDPwi3huqeJBtymgLOvPAP4S2gbSSK/UrWVwRbRAmnw=
github.com/gilbertchen/goamz v0.0.0-20170712012135-eada9f4e8cc2/go.mod h1:AoxJeh8meXUrSWBLiq9BJvYMd9RAAGgEUU0gSkNedRY=
github.com/gilbertchen/gopass v0.0.0-20170109162249-bf9dde6d0d2c h1:0SR0aXvil/eQReU0olxp/j04B+Y/47fjDMotIxaAgKo=
github.com/gilbertchen/gopass v0.0.0-20170109162249-bf9dde6d0d2c/go.mod h1:HDsXH7AAfDsfYYX0te4zsNbnwVvZ2RtLEOCjN4y84jw=
github.com/gilbertchen/highwayhash v0.0.0-20221109044721-eeab1f4799d8 h1:ijgl4Y+OKCIFiCPk/Rf9tb6PrarVqitu5TynpyCmRK0=
github.com/gilbertchen/highwayhash v0.0.0-20221109044721-eeab1f4799d8/go.mod h1:0lQcVva56+L1PuUFXLOsJ6arJQaU0baIH8q+IegeBhg=
github.com/gilbertchen/keyring v0.0.0-20221004152639-1661cbebc508 h1:SqTyk5KkNXp7zTdTttIZSDcTrL5uau4K/2OpKvgBZVI=
github.com/gilbertchen/keyring v0.0.0-20221004152639-1661cbebc508/go.mod h1:w/pisxUZezf2XzU9Ewjphcf6q1mZtOzKPHhJiuc8cag=
github.com/gilbertchen/xattr v0.0.0-20160926155429-68e7a6806b01 h1:LqwS9qL6SrDkp0g0iwUkETrDdtB9gTKaIbSn9imUq5o=
@@ -165,6 +171,8 @@ github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104 h1:ULR/QWMgcgRiZLUjSSJMU+fW+RDMstRdmnDWj9Q+AsA=
github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104/go.mod h1:wqKykBG2QzQDJEzvRkcS8x6MiSJkF52hXZsXcjaB3ls=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=

View File

@@ -1,153 +0,0 @@
// Copyright (c) Acrosync LLC. All rights reserved.
// Free for personal use and commercial trial
// Commercial use requires per-user licenses available from https://duplicacy.com
package duplicacy
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"testing"
crypto_rand "crypto/rand"
"math/rand"
)
func TestACDClient(t *testing.T) {
acdClient, err := NewACDClient("acd-token.json")
if err != nil {
t.Errorf("Failed to create the ACD client: %v", err)
return
}
acdClient.TestMode = true
rootID, _, _, err := acdClient.ListByName("", "")
if err != nil {
t.Errorf("Failed to get the root node: %v", err)
return
}
if rootID == "" {
t.Errorf("No root node")
return
}
testID, _, _, err := acdClient.ListByName(rootID, "test")
if err != nil {
t.Errorf("Failed to list the test directory: %v", err)
return
}
if testID == "" {
testID, err = acdClient.CreateDirectory(rootID, "test")
if err != nil {
t.Errorf("Failed to create the test directory: %v", err)
return
}
}
test1ID, _, _, err := acdClient.ListByName(testID, "test1")
if err != nil {
t.Errorf("Failed to list the test1 directory: %v", err)
return
}
if test1ID == "" {
test1ID, err = acdClient.CreateDirectory(testID, "test1")
if err != nil {
t.Errorf("Failed to create the test1 directory: %v", err)
return
}
}
test2ID, _, _, err := acdClient.ListByName(testID, "test2")
if err != nil {
t.Errorf("Failed to list the test2 directory: %v", err)
return
}
if test2ID == "" {
test2ID, err = acdClient.CreateDirectory(testID, "test2")
if err != nil {
t.Errorf("Failed to create the test2 directory: %v", err)
return
}
}
fmt.Printf("test1: %s, test2: %s\n", test1ID, test2ID)
numberOfFiles := 20
maxFileSize := 64 * 1024
for i := 0; i < numberOfFiles; i++ {
content := make([]byte, rand.Int()%maxFileSize+1)
_, err = crypto_rand.Read(content)
if err != nil {
t.Errorf("Error generating random content: %v", err)
return
}
hasher := sha256.New()
hasher.Write(content)
filename := hex.EncodeToString(hasher.Sum(nil))
fmt.Printf("file: %s\n", filename)
_, err = acdClient.UploadFile(test1ID, filename, content, 100)
if err != nil {
/*if e, ok := err.(ACDError); !ok || e.Status != 409 */ {
t.Errorf("Failed to upload the file %s: %v", filename, err)
return
}
}
}
entries, err := acdClient.ListEntries(test1ID, true, false)
if err != nil {
t.Errorf("Error list randomly generated files: %v", err)
return
}
for _, entry := range entries {
err = acdClient.MoveFile(entry.ID, test1ID, test2ID)
if err != nil {
t.Errorf("Failed to move %s: %v", entry.Name, err)
return
}
}
entries, err = acdClient.ListEntries(test2ID, true, false)
if err != nil {
t.Errorf("Error list randomly generated files: %v", err)
return
}
for _, entry := range entries {
readCloser, _, err := acdClient.DownloadFile(entry.ID)
if err != nil {
t.Errorf("Error downloading file %s: %v", entry.Name, err)
return
}
hasher := sha256.New()
io.Copy(hasher, readCloser)
hash := hex.EncodeToString(hasher.Sum(nil))
if hash != entry.Name {
t.Errorf("File %s, hash %s", entry.Name, hash)
}
readCloser.Close()
}
for _, entry := range entries {
err = acdClient.DeleteFile(entry.ID)
if err != nil {
t.Errorf("Failed to delete the file %s: %v", entry.Name, err)
return
}
}
}

View File

@@ -1,133 +0,0 @@
// Copyright (c) Acrosync LLC. All rights reserved.
// Free for personal use and commercial trial
// Commercial use requires per-user licenses available from https://duplicacy.com
package duplicacy
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"testing"
crypto_rand "crypto/rand"
"io"
"io/ioutil"
"math/rand"
)
func createB2ClientForTest(t *testing.T) (*B2Client, string) {
config, err := ioutil.ReadFile("test_storage.conf")
if err != nil {
t.Errorf("Failed to read config file: %v", err)
return nil, ""
}
storages := make(map[string]map[string]string)
err = json.Unmarshal(config, &storages)
if err != nil {
t.Errorf("Failed to parse config file: %v", err)
return nil, ""
}
b2, found := storages["b2"]
if !found {
t.Errorf("Failed to find b2 config")
return nil, ""
}
return NewB2Client(b2["account"], b2["key"], "", b2["directory"], 1), b2["bucket"]
}
func TestB2Client(t *testing.T) {
b2Client, bucket := createB2ClientForTest(t)
if b2Client == nil {
return
}
b2Client.TestMode = true
err, _ := b2Client.AuthorizeAccount(0)
if err != nil {
t.Errorf("Failed to authorize the b2 account: %v", err)
return
}
err = b2Client.FindBucket(bucket)
if err != nil {
t.Errorf("Failed to find bucket '%s': %v", bucket, err)
return
}
testDirectory := "b2client_test/"
files, err := b2Client.ListFileNames(0, testDirectory, false, false)
if err != nil {
t.Errorf("Failed to list files: %v", err)
return
}
for _, file := range files {
err = b2Client.DeleteFile(0, file.FileName, file.FileID)
if err != nil {
t.Errorf("Failed to delete file '%s': %v", file.FileName, err)
}
}
maxSize := 10000
for i := 0; i < 20; i++ {
size := rand.Int()%maxSize + 1
content := make([]byte, size)
_, err := crypto_rand.Read(content)
if err != nil {
t.Errorf("Error generating random content: %v", err)
return
}
hash := sha256.Sum256(content)
name := hex.EncodeToString(hash[:])
err = b2Client.UploadFile(0, testDirectory+name, content, 100)
if err != nil {
t.Errorf("Error uploading file '%s': %v", name, err)
return
}
}
files, err = b2Client.ListFileNames(0, testDirectory, false, false)
if err != nil {
t.Errorf("Failed to list files: %v", err)
return
}
for _, file := range files {
readCloser, _, err := b2Client.DownloadFile(0, file.FileName)
if err != nil {
t.Errorf("Error downloading file '%s': %v", file.FileName, err)
return
}
defer readCloser.Close()
hasher := sha256.New()
_, err = io.Copy(hasher, readCloser)
hash := hex.EncodeToString(hasher.Sum(nil))
if testDirectory+hash != file.FileName {
t.Errorf("File %s has hash %s", file.FileName, hash)
}
}
for _, file := range files {
err = b2Client.DeleteFile(0, file.FileName, file.FileID)
if err != nil {
t.Errorf("Failed to delete file '%s': %v", file.FileName, err)
}
}
}

View File

@@ -223,7 +223,7 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
localListingChannel := make(chan *Entry)
remoteListingChannel := make(chan *Entry)
chunkOperator := CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, showStatistics, threads, false)
chunkOperator := CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, showStatistics, false, threads, false)
var skippedDirectories []string
var skippedFiles []string
@@ -301,26 +301,27 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
if compareResult == 0 {
// No need to check if it is in hash mode -- in that case remote listing is nil
if localEntry.IsSameAs(remoteEntry) && localEntry.IsFile() {
if localEntry.Size > 0 {
localEntry.Hash = remoteEntry.Hash
localEntry.StartOffset = remoteEntry.StartOffset
localEntry.EndOffset = remoteEntry.EndOffset
delta := remoteEntry.StartChunk - len(localEntryList.PreservedChunkHashes)
if lastPreservedChunk != remoteEntry.StartChunk {
lastPreservedChunk = remoteEntry.StartChunk
localEntryList.AddPreservedChunk(remoteSnapshot.ChunkHashes[lastPreservedChunk], remoteSnapshot.ChunkLengths[lastPreservedChunk])
} else {
delta++
}
localEntry.Hash = remoteEntry.Hash
localEntry.StartOffset = remoteEntry.StartOffset
localEntry.EndOffset = remoteEntry.EndOffset
delta := remoteEntry.StartChunk - len(localEntryList.PreservedChunkHashes)
if lastPreservedChunk != remoteEntry.StartChunk {
lastPreservedChunk = remoteEntry.StartChunk
localEntryList.AddPreservedChunk(remoteSnapshot.ChunkHashes[lastPreservedChunk], remoteSnapshot.ChunkLengths[lastPreservedChunk])
} else {
delta++
for i := remoteEntry.StartChunk + 1; i <= remoteEntry.EndChunk; i++ {
localEntryList.AddPreservedChunk(remoteSnapshot.ChunkHashes[i], remoteSnapshot.ChunkLengths[i])
lastPreservedChunk = i
}
localEntry.StartChunk = remoteEntry.StartChunk - delta
localEntry.EndChunk = remoteEntry.EndChunk - delta
preservedFileSize += localEntry.Size
}
for i := remoteEntry.StartChunk + 1; i <= remoteEntry.EndChunk; i++ {
localEntryList.AddPreservedChunk(remoteSnapshot.ChunkHashes[i], remoteSnapshot.ChunkLengths[i])
lastPreservedChunk = i
}
localEntry.StartChunk = remoteEntry.StartChunk - delta
localEntry.EndChunk = remoteEntry.EndChunk - delta
preservedFileSize += localEntry.Size
} else {
totalModifiedFileSize += localEntry.Size
if localEntry.Size > 0 {
@@ -672,7 +673,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
localListingChannel := make(chan *Entry)
remoteListingChannel := make(chan *Entry)
chunkOperator := CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, showStatistics, threads, false)
chunkOperator := CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, showStatistics, false, threads, allowFailures)
LOG_INFO("RESTORE_INDEXING", "Indexing %s", top)
go func() {
@@ -1047,24 +1048,24 @@ func (manager *BackupManager) UploadSnapshot(chunkOperator *ChunkOperator, top s
uploadEntryInfoFunc := func(entry *Entry) error {
delta := entry.StartChunk - len(chunkHashes) + 1
if entry.StartChunk != lastChunk {
chunkHashes = append(chunkHashes, snapshot.ChunkHashes[entry.StartChunk])
chunkLengths = append(chunkLengths, snapshot.ChunkLengths[entry.StartChunk])
delta--
}
if entry.IsFile() && entry.Size > 0 {
delta := entry.StartChunk - len(chunkHashes) + 1
if entry.StartChunk != lastChunk {
chunkHashes = append(chunkHashes, snapshot.ChunkHashes[entry.StartChunk])
chunkLengths = append(chunkLengths, snapshot.ChunkLengths[entry.StartChunk])
delta--
}
for i := entry.StartChunk + 1; i <= entry.EndChunk; i++ {
chunkHashes = append(chunkHashes, snapshot.ChunkHashes[i])
chunkLengths = append(chunkLengths, snapshot.ChunkLengths[i])
}
for i := entry.StartChunk + 1; i <= entry.EndChunk; i++ {
chunkHashes = append(chunkHashes, snapshot.ChunkHashes[i])
chunkLengths = append(chunkLengths, snapshot.ChunkLengths[i])
}
lastChunk = entry.EndChunk
entry.StartChunk -= delta
entry.EndChunk -= delta
lastChunk = entry.EndChunk
entry.StartChunk -= delta
entry.EndChunk -= delta
if entry.IsFile() {
delta := entry.EndChunk - entry.StartChunk
delta = entry.EndChunk - entry.StartChunk
entry.StartChunk -= lastEndChunk
lastEndChunk = entry.EndChunk
entry.EndChunk = delta
@@ -1714,13 +1715,13 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
LOG_INFO("SNAPSHOT_COPY", "Chunks to copy: %d, to skip: %d, total: %d", len(chunksToCopy), len(chunks) - len(chunksToCopy), len(chunks))
chunkDownloader := CreateChunkOperator(manager.config, manager.storage, nil, false, downloadingThreads, false)
chunkDownloader := CreateChunkOperator(manager.config, manager.storage, nil, false, false, downloadingThreads, false)
var uploadedBytes int64
startTime := time.Now()
copiedChunks := 0
chunkUploader := CreateChunkOperator(otherManager.config, otherManager.storage, nil, false, uploadingThreads, false)
chunkUploader := CreateChunkOperator(otherManager.config, otherManager.storage, nil, false, false, uploadingThreads, false)
chunkUploader.UploadCompletionFunc = func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) {
action := "Skipped"
if !skipped {

View File

@@ -358,16 +358,17 @@ func TestBackupManager(t *testing.T) {
if numberOfSnapshots != 3 {
t.Errorf("Expected 3 snapshots but got %d", numberOfSnapshots)
}
backupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1" /*revisions*/, []int{1, 2, 3} /*tag*/, "",
/*showStatistics*/ false /*showTabular*/, false /*checkFiles*/, false /*checkChunks*/, false /*searchFossils*/, false /*resurrect*/, false, 1 /*allowFailures*/, false)
backupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1", /*revisions*/ []int{1, 2, 3}, /*tag*/ "", /*showStatistics*/ false,
/*showTabular*/ false, /*checkFiles*/ false, /*checkChunks*/ false, /*searchFossils*/ false, /*resurrect*/ false, /*rewiret*/ false, 1, /*allowFailures*/false)
backupManager.SnapshotManager.PruneSnapshots("host1", "host1" /*revisions*/, []int{1} /*tags*/, nil /*retentions*/, nil,
/*exhaustive*/ false /*exclusive=*/, false /*ignoredIDs*/, nil /*dryRun*/, false /*deleteOnly*/, false /*collectOnly*/, false, 1)
numberOfSnapshots = backupManager.SnapshotManager.ListSnapshots( /*snapshotID*/ "host1" /*revisionsToList*/, nil /*tag*/, "" /*showFiles*/, false /*showChunks*/, false)
if numberOfSnapshots != 2 {
t.Errorf("Expected 2 snapshots but got %d", numberOfSnapshots)
}
backupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1" /*revisions*/, []int{2, 3} /*tag*/, "",
/*showStatistics*/ false /*showTabular*/, false /*checkFiles*/, false /*checkChunks*/, false /*searchFossils*/, false /*resurrect*/, false, 1 /*allowFailures*/, false)
backupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1", /*revisions*/ []int{2, 3}, /*tag*/ "", /*showStatistics*/ false,
/*showTabular*/ false, /*checkFiles*/ false, /*checkChunks*/ false, /*searchFossils*/ false, /*resurrect*/ false, /*rewiret*/ false, 1, /*allowFailures*/ false)
backupManager.Backup(testDir+"/repository1" /*quickMode=*/, false, threads, "fourth", false, false, 0, false, 1024, 1024)
backupManager.SnapshotManager.PruneSnapshots("host1", "host1" /*revisions*/, nil /*tags*/, nil /*retentions*/, nil,
/*exhaustive*/ false /*exclusive=*/, true /*ignoredIDs*/, nil /*dryRun*/, false /*deleteOnly*/, false /*collectOnly*/, false, 1)
@@ -375,8 +376,8 @@ func TestBackupManager(t *testing.T) {
if numberOfSnapshots != 3 {
t.Errorf("Expected 3 snapshots but got %d", numberOfSnapshots)
}
backupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1" /*revisions*/, []int{2, 3, 4} /*tag*/, "",
/*showStatistics*/ false /*showTabular*/, false /*checkFiles*/, false /*checkChunks*/, false /*searchFossils*/, false /*resurrect*/, false, 1 /*allowFailures*/, false)
backupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1", /*revisions*/ []int{2, 3, 4}, /*tag*/ "", /*showStatistics*/ false,
/*showTabular*/ false, /*checkFiles*/ false, /*checkChunks*/ false, /*searchFossils*/ false, /*resurrect*/ false, /*rewiret*/ false, 1, /*allowFailures*/ false)
/*buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
@@ -548,13 +549,13 @@ func TestPersistRestore(t *testing.T) {
// check snapshots
unencBackupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1" /*revisions*/, []int{1} /*tag*/, "",
/*showStatistics*/ true /*showTabular*/, false /*checkFiles*/, true /*checkChunks*/, false,
/*searchFossils*/ false /*resurrect*/, false, 1 /*allowFailures*/, false)
unencBackupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1", /*revisions*/ []int{1}, /*tag*/ "",
/*showStatistics*/ true, /*showTabular*/ false, /*checkFiles*/ true, /*checkChunks*/ false,
/*searchFossils*/ false, /*resurrect*/ false, /*rewiret*/ false, 1, /*allowFailures*/ false)
encBackupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1" /*revisions*/, []int{1} /*tag*/, "",
/*showStatistics*/ true /*showTabular*/, false /*checkFiles*/, true /*checkChunks*/, false,
/*searchFossils*/ false /*resurrect*/, false, 1 /*allowFailures*/, false)
encBackupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1", /*revisions*/ []int{1}, /*tag*/ "",
/*showStatistics*/ true, /*showTabular*/ false, /*checkFiles*/ true, /*checkChunks*/ false,
/*searchFossils*/ false, /*resurrect*/ false, /*rewiret*/ false, 1, /*allowFailures*/ false)
// check functions
checkAllUncorrupted := func(cmpRepository string) {
@@ -640,18 +641,28 @@ func TestPersistRestore(t *testing.T) {
os.Remove(testDir+"/unenc_storage"+"/chunks"+chunkToCorrupt1)
os.Remove(testDir+"/enc_storage"+"/chunks"+chunkToCorrupt2)
}
// This is to make sure that allowFailures is set to true. Note that this is not needed
// in the production code because chunkOperator can be only recreated multiple time in tests.
if unencBackupManager.SnapshotManager.chunkOperator != nil {
unencBackupManager.SnapshotManager.chunkOperator.allowFailures = true
}
if encBackupManager.SnapshotManager.chunkOperator != nil {
encBackupManager.SnapshotManager.chunkOperator.allowFailures = true
}
// check snapshots with --persist (allowFailures == true)
// this would cause a panic and os.Exit from duplicacy_log if allowFailures == false
unencBackupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1" /*revisions*/, []int{1} /*tag*/, "",
/*showStatistics*/ true /*showTabular*/, false /*checkFiles*/, true /*checkChunks*/, false,
/*searchFossils*/ false /*resurrect*/, false, 1 /*allowFailures*/, true)
unencBackupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1", /*revisions*/ []int{1}, /*tag*/ "",
/*showStatistics*/ true, /*showTabular*/ false, /*checkFiles*/ true, /*checkChunks*/ false,
/*searchFossils*/ false, /*resurrect*/ false, /*rewrite*/ false, 1, /*allowFailures*/ true)
encBackupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1" /*revisions*/, []int{1} /*tag*/, "",
/*showStatistics*/ true /*showTabular*/, false /*checkFiles*/, true /*checkChunks*/, false,
/*searchFossils*/ false /*resurrect*/, false, 1 /*allowFailures*/, true)
encBackupManager.SnapshotManager.CheckSnapshots( /*snapshotID*/ "host1", /*revisions*/ []int{1}, /*tag*/ "",
/*showStatistics*/ true, /*showTabular*/ false, /*checkFiles*/ true, /*checkChunks*/ false,
/*searchFossils*/ false, /*resurrect*/ false, /*rewrite*/ false, 1, /*allowFailures*/ true)
// test restore corrupted, inPlace = true, corrupted files will have hash failures
os.RemoveAll(testDir+"/repository2")
SetDuplicacyPreferencePath(testDir + "/repository2/.duplicacy")

View File

@@ -24,6 +24,12 @@ import (
"github.com/bkaradzic/go-lz4"
"github.com/minio/highwayhash"
"github.com/klauspost/reedsolomon"
// This is a fork of github.com/minio/highwayhash at 1.0.1 that computes incorrect hash on
// arm64 machines. We need this fork to be able to read the chunks created by Duplicacy
// CLI 3.0.1 which unfortunately relies on incorrect hashes to determine if each shard is valid.
wronghighwayhash "github.com/gilbertchen/highwayhash"
)
// A chunk needs to acquire a new buffer and return the old one for every encrypt/decrypt operation, therefore
@@ -371,8 +377,9 @@ func init() {
// Decrypt decrypts the encrypted data stored in the chunk buffer. If derivationKey is not nil, the actual
// encryption key will be HMAC-SHA256(encryptionKey, derivationKey).
func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err error) {
func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err error, rewriteNeeded bool) {
rewriteNeeded = false
var offset int
encryptedBuffer := AllocateChunkBuffer()
@@ -388,13 +395,13 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
// The chunk was encoded with erasure coding
if len(encryptedBuffer.Bytes()) < bannerLength + 14 {
return fmt.Errorf("Erasure coding header truncated (%d bytes)", len(encryptedBuffer.Bytes()))
return fmt.Errorf("Erasure coding header truncated (%d bytes)", len(encryptedBuffer.Bytes())), false
}
// Check the header checksum
header := encryptedBuffer.Bytes()[bannerLength: bannerLength + 14]
if header[12] != header[0] ^ header[2] ^ header[4] ^ header[6] ^ header[8] ^ header[10] ||
header[13] != header[1] ^ header[3] ^ header[5] ^ header[7] ^ header[9] ^ header[11] {
return fmt.Errorf("Erasure coding header corrupted (%x)", header)
return fmt.Errorf("Erasure coding header corrupted (%x)", header), false
}
// Read the parameters
@@ -414,7 +421,7 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
} else if len(encryptedBuffer.Bytes()) > minimumLength {
LOG_WARN("CHUNK_ERASURECODE", "Chunk is truncated (%d out of %d bytes)", len(encryptedBuffer.Bytes()), expectedLength)
} else {
return fmt.Errorf("Not enough chunk data for recovery; chunk size: %d bytes, data size: %d, parity: %d/%d", chunkSize, len(encryptedBuffer.Bytes()), dataShards, parityShards)
return fmt.Errorf("Not enough chunk data for recovery; chunk size: %d bytes, data size: %d, parity: %d/%d", chunkSize, len(encryptedBuffer.Bytes()), dataShards, parityShards), false
}
// Where the hashes start
@@ -426,6 +433,8 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
recoveryNeeded := false
hashKey := make([]byte, 32)
availableShards := 0
wrongHashDetected := false
for i := 0; i < dataShards + parityShards; i++ {
start := dataOffset + i * shardSize
if start + shardSize > len(encryptedBuffer.Bytes()) {
@@ -435,15 +444,34 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
// Now verify the hash
hasher, err := highwayhash.New(hashKey)
if err != nil {
return err
return err, false
}
_, err = hasher.Write(encryptedBuffer.Bytes()[start: start + shardSize])
if err != nil {
return err
return err, false
}
if bytes.Compare(hasher.Sum(nil), encryptedBuffer.Bytes()[hashOffset + i * 32: hashOffset + (i + 1) * 32]) != 0 {
matched := bytes.Compare(hasher.Sum(nil), encryptedBuffer.Bytes()[hashOffset + i * 32: hashOffset + (i + 1) * 32]) == 0
if !matched && runtime.GOARCH == "arm64" {
hasher, err := wronghighwayhash.New(hashKey)
if err == nil {
_, err = hasher.Write(encryptedBuffer.Bytes()[start: start + shardSize])
if err == nil {
matched = bytes.Compare(hasher.Sum(nil), encryptedBuffer.Bytes()[hashOffset + i * 32: hashOffset + (i + 1) * 32]) == 0
if matched && !wrongHashDetected {
LOG_WARN("CHUNK_ERASURECODE", "Hash for shard %d was calculated with a wrong version of highwayhash", i)
wrongHashDetected = true
rewriteNeeded = true
}
}
}
}
if !matched {
if i < dataShards {
recoveryNeeded = true
rewriteNeeded = true
}
} else {
// The shard is good
@@ -463,7 +491,7 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
encryptedBuffer.Read(encryptedBuffer.Bytes()[:dataOffset])
} else {
if availableShards < dataShards {
return fmt.Errorf("Not enough chunk data for recover; only %d out of %d shards are complete", availableShards, dataShards + parityShards)
return fmt.Errorf("Not enough chunk data for recover; only %d out of %d shards are complete", availableShards, dataShards + parityShards), false
}
// Show the validity of shards using a string of * and -
@@ -479,11 +507,11 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
LOG_WARN("CHUNK_ERASURECODE", "Recovering a %d byte chunk from %d byte shards: %s", chunkSize, shardSize, slots)
encoder, err := reedsolomon.New(dataShards, parityShards)
if err != nil {
return err
return err, false
}
err = encoder.Reconstruct(data)
if err != nil {
return err
return err, false
}
LOG_DEBUG("CHUNK_ERASURECODE", "Chunk data successfully recovered")
buffer := AllocateChunkBuffer()
@@ -516,28 +544,28 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
}
if len(encryptedBuffer.Bytes()) < bannerLength + 12 {
return fmt.Errorf("No enough encrypted data (%d bytes) provided", len(encryptedBuffer.Bytes()))
return fmt.Errorf("No enough encrypted data (%d bytes) provided", len(encryptedBuffer.Bytes())), false
}
if string(encryptedBuffer.Bytes()[:bannerLength-1]) != ENCRYPTION_BANNER[:bannerLength-1] {
return fmt.Errorf("The storage doesn't seem to be encrypted")
return fmt.Errorf("The storage doesn't seem to be encrypted"), false
}
encryptionVersion := encryptedBuffer.Bytes()[bannerLength-1]
if encryptionVersion != 0 && encryptionVersion != ENCRYPTION_VERSION_RSA {
return fmt.Errorf("Unsupported encryption version %d", encryptionVersion)
return fmt.Errorf("Unsupported encryption version %d", encryptionVersion), false
}
if encryptionVersion == ENCRYPTION_VERSION_RSA {
if chunk.config.rsaPrivateKey == nil {
LOG_ERROR("CHUNK_DECRYPT", "An RSA private key is required to decrypt the chunk")
return fmt.Errorf("An RSA private key is required to decrypt the chunk")
return fmt.Errorf("An RSA private key is required to decrypt the chunk"), false
}
encryptedKeyLength := binary.LittleEndian.Uint16(encryptedBuffer.Bytes()[bannerLength:bannerLength+2])
if len(encryptedBuffer.Bytes()) < bannerLength + 14 + int(encryptedKeyLength) {
return fmt.Errorf("No enough encrypted data (%d bytes) provided", len(encryptedBuffer.Bytes()))
return fmt.Errorf("No enough encrypted data (%d bytes) provided", len(encryptedBuffer.Bytes())), false
}
encryptedKey := encryptedBuffer.Bytes()[bannerLength + 2:bannerLength + 2 + int(encryptedKeyLength)]
@@ -545,19 +573,19 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
decryptedKey, err := rsa.DecryptOAEP(sha256.New(), rand.Reader, chunk.config.rsaPrivateKey, encryptedKey, nil)
if err != nil {
return err
return err, false
}
key = decryptedKey
}
aesBlock, err := aes.NewCipher(key)
if err != nil {
return err
return err, false
}
gcm, err := cipher.NewGCM(aesBlock)
if err != nil {
return err
return err, false
}
offset = bannerLength + gcm.NonceSize()
@@ -567,7 +595,7 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
encryptedBuffer.Bytes()[offset:], nil)
if err != nil {
return err
return err, false
}
paddingLength := int(decryptedBytes[len(decryptedBytes)-1])
@@ -575,14 +603,14 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
paddingLength = 256
}
if len(decryptedBytes) <= paddingLength {
return fmt.Errorf("Incorrect padding length %d out of %d bytes", paddingLength, len(decryptedBytes))
return fmt.Errorf("Incorrect padding length %d out of %d bytes", paddingLength, len(decryptedBytes)), false
}
for i := 0; i < paddingLength; i++ {
padding := decryptedBytes[len(decryptedBytes)-1-i]
if padding != byte(paddingLength) {
return fmt.Errorf("Incorrect padding of length %d: %x", paddingLength,
decryptedBytes[len(decryptedBytes)-paddingLength:])
decryptedBytes[len(decryptedBytes)-paddingLength:]), false
}
}
@@ -596,18 +624,18 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
chunk.buffer.Reset()
decompressed, err := lz4.Decode(chunk.buffer.Bytes(), encryptedBuffer.Bytes()[4:])
if err != nil {
return err
return err, false
}
chunk.buffer.Write(decompressed)
chunk.hasher = chunk.config.NewKeyedHasher(chunk.config.HashKey)
chunk.hasher.Write(decompressed)
chunk.hash = nil
return nil
return nil, rewriteNeeded
}
inflater, err := zlib.NewReader(encryptedBuffer)
if err != nil {
return err
return err, false
}
defer inflater.Close()
@@ -617,9 +645,9 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
chunk.hash = nil
if _, err = io.Copy(chunk, inflater); err != nil {
return err
return err, false
}
return nil
return nil, rewriteNeeded
}

View File

@@ -43,7 +43,7 @@ func TestErasureCoding(t *testing.T) {
chunk.Reset(false)
chunk.Write(encryptedData)
err = chunk.Decrypt([]byte(""), "")
err, _ = chunk.Decrypt([]byte(""), "")
if err != nil {
t.Errorf("Failed to decrypt the data: %v", err)
return
@@ -110,7 +110,7 @@ func TestChunkBasic(t *testing.T) {
chunk.Reset(false)
chunk.Write(encryptedData)
err = chunk.Decrypt(key, "")
err, _ = chunk.Decrypt(key, "")
if err != nil {
t.Errorf("Failed to decrypt the data: %v", err)
continue

View File

@@ -57,11 +57,14 @@ type ChunkOperator struct {
allowFailures bool // Whether to fail on download error, or continue
NumberOfFailedChunks int64 // The number of chunks that can't be downloaded
rewriteChunks bool // Whether to rewrite corrupted chunks when erasure coding is enabled
UploadCompletionFunc func(chunk *Chunk, chunkIndex int, inCache bool, chunkSize int, uploadSize int)
}
// CreateChunkOperator creates a new ChunkOperator.
func CreateChunkOperator(config *Config, storage Storage, snapshotCache *FileStorage, showStatistics bool, threads int, allowFailures bool) *ChunkOperator {
func CreateChunkOperator(config *Config, storage Storage, snapshotCache *FileStorage, showStatistics bool, rewriteChunks bool, threads int,
allowFailures bool) *ChunkOperator {
operator := &ChunkOperator{
config: config,
@@ -76,6 +79,7 @@ func CreateChunkOperator(config *Config, storage Storage, snapshotCache *FileSto
collectionLock: &sync.Mutex{},
startTime: time.Now().Unix(),
allowFailures: allowFailures,
rewriteChunks: rewriteChunks,
}
// Start the operator goroutines
@@ -331,24 +335,34 @@ func (operator *ChunkOperator) DownloadChunk(threadIndex int, task ChunkTask) {
atomic.AddInt64(&operator.NumberOfFailedChunks, 1)
if operator.allowFailures {
chunk.isBroken = true
task.completionFunc(chunk, task.chunkIndex)
}
}
chunkPath := ""
fossilPath := ""
filePath := ""
const MaxDownloadAttempts = 3
for downloadAttempt := 0; ; downloadAttempt++ {
exist := false
var err error
// Find the chunk by ID first.
chunkPath, exist, _, err := operator.storage.FindChunk(threadIndex, chunkID, false)
chunkPath, exist, _, err = operator.storage.FindChunk(threadIndex, chunkID, false)
if err != nil {
completeFailedChunk()
LOG_WERROR(operator.allowFailures, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
return
}
if !exist {
if exist {
filePath = chunkPath
} else {
// No chunk is found. Have to find it in the fossil pool again.
fossilPath, exist, _, err := operator.storage.FindChunk(threadIndex, chunkID, true)
fossilPath, exist, _, err = operator.storage.FindChunk(threadIndex, chunkID, true)
if err != nil {
completeFailedChunk()
LOG_WERROR(operator.allowFailures, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
@@ -383,20 +397,11 @@ func (operator *ChunkOperator) DownloadChunk(threadIndex int, task ChunkTask) {
return
}
// We can't download the fossil directly. We have to turn it back into a regular chunk and try
// downloading again.
err = operator.storage.MoveFile(threadIndex, fossilPath, chunkPath)
if err != nil {
completeFailedChunk()
LOG_WERROR(operator.allowFailures, "DOWNLOAD_CHUNK", "Failed to resurrect chunk %s: %v", chunkID, err)
return
}
LOG_WARN("DOWNLOAD_RESURRECT", "Fossil %s has been resurrected", chunkID)
continue
filePath = fossilPath
LOG_WARN("DOWNLOAD_FOSSIL", "Chunk %s is a fossil", chunkID)
}
err = operator.storage.DownloadFile(threadIndex, chunkPath, chunk)
err = operator.storage.DownloadFile(threadIndex, filePath, chunk)
if err != nil {
_, isHubic := operator.storage.(*HubicStorage)
// Retry on EOF or if it is a Hubic backend as it may return 404 even when the chunk exists
@@ -412,7 +417,8 @@ func (operator *ChunkOperator) DownloadChunk(threadIndex int, task ChunkTask) {
}
}
err = chunk.Decrypt(operator.config.ChunkKey, task.chunkHash)
rewriteNeeded := false
err, rewriteNeeded = chunk.Decrypt(operator.config.ChunkKey, task.chunkHash)
if err != nil {
if downloadAttempt < MaxDownloadAttempts {
LOG_WARN("DOWNLOAD_RETRY", "Failed to decrypt the chunk %s: %v; retrying", chunkID, err)
@@ -440,6 +446,38 @@ func (operator *ChunkOperator) DownloadChunk(threadIndex int, task ChunkTask) {
}
}
if rewriteNeeded && operator.rewriteChunks {
if filePath != fossilPath {
fossilPath = filePath + ".fsl"
err := operator.storage.MoveFile(threadIndex, chunkPath, fossilPath)
if err != nil {
LOG_WARN("CHUNK_REWRITE", "Failed to fossilize the chunk %s: %v", task.chunkID, err)
} else {
LOG_TRACE("CHUNK_REWRITE", "The existing chunk %s has been marked as a fossil for rewrite", task.chunkID)
operator.collectionLock.Lock()
operator.fossils = append(operator.fossils, fossilPath)
operator.collectionLock.Unlock()
}
}
newChunk := operator.config.GetChunk()
newChunk.Reset(true)
newChunk.Write(chunk.GetBytes())
// Encrypt the chunk only after we know that it must be uploaded.
err = newChunk.Encrypt(operator.config.ChunkKey, chunk.GetHash(), task.isMetadata)
if err == nil {
// Re-upload the chunk
err = operator.storage.UploadFile(threadIndex, chunkPath, newChunk.GetBytes())
if err != nil {
LOG_WARN("CHUNK_REWRITE", "Failed to re-upload the chunk %s: %v", chunkID, err)
} else {
LOG_INFO("CHUNK_REWRITE", "The chunk %s has been re-uploaded", chunkID)
}
}
operator.config.PutChunk(newChunk)
}
break
}

View File

@@ -87,7 +87,7 @@ func TestChunkOperator(t *testing.T) {
totalFileSize += chunk.GetLength()
}
chunkOperator := CreateChunkOperator(config, storage, nil, false, *testThreads, false)
chunkOperator := CreateChunkOperator(config, storage, nil, false, false, *testThreads, false)
chunkOperator.UploadCompletionFunc = func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) {
t.Logf("Chunk %s size %d (%d/%d) uploaded", chunk.GetID(), chunkSize, chunkIndex, len(chunks))
}

View File

@@ -436,7 +436,7 @@ func DownloadConfig(storage Storage, password string) (config *Config, isEncrypt
}
// Decrypt the config file. masterKey == nil means no encryption.
err = configFile.Decrypt(masterKey, "")
err, _ = configFile.Decrypt(masterKey, "")
if err != nil {
return nil, false, fmt.Errorf("Failed to retrieve the config file: %v", err)
}

View File

@@ -1,149 +0,0 @@
// Copyright (c) Acrosync LLC. All rights reserved.
// Free for personal use and commercial trial
// Commercial use requires per-user licenses available from https://duplicacy.com
package duplicacy
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"testing"
crypto_rand "crypto/rand"
"math/rand"
)
func TestHubicClient(t *testing.T) {
hubicClient, err := NewHubicClient("hubic-token.json")
if err != nil {
t.Errorf("Failed to create the Hubic client: %v", err)
return
}
hubicClient.TestMode = true
existingFiles, err := hubicClient.ListEntries("")
for _, file := range existingFiles {
fmt.Printf("name: %s, isDir: %t\n", file.Name, file.Type == "application/directory")
}
testExists, _, _, err := hubicClient.GetFileInfo("test")
if err != nil {
t.Errorf("Failed to list the test directory: %v", err)
return
}
if !testExists {
err = hubicClient.CreateDirectory("test")
if err != nil {
t.Errorf("Failed to create the test directory: %v", err)
return
}
}
test1Exists, _, _, err := hubicClient.GetFileInfo("test/test1")
if err != nil {
t.Errorf("Failed to list the test1 directory: %v", err)
return
}
if !test1Exists {
err = hubicClient.CreateDirectory("test/test1")
if err != nil {
t.Errorf("Failed to create the test1 directory: %v", err)
return
}
}
test2Exists, _, _, err := hubicClient.GetFileInfo("test/test2")
if err != nil {
t.Errorf("Failed to list the test2 directory: %v", err)
return
}
if !test2Exists {
err = hubicClient.CreateDirectory("test/test2")
if err != nil {
t.Errorf("Failed to create the test2 directory: %v", err)
return
}
}
numberOfFiles := 20
maxFileSize := 64 * 1024
for i := 0; i < numberOfFiles; i++ {
content := make([]byte, rand.Int()%maxFileSize+1)
_, err = crypto_rand.Read(content)
if err != nil {
t.Errorf("Error generating random content: %v", err)
return
}
hasher := sha256.New()
hasher.Write(content)
filename := hex.EncodeToString(hasher.Sum(nil))
fmt.Printf("file: %s\n", filename)
err = hubicClient.UploadFile("test/test1/"+filename, content, 100)
if err != nil {
/*if e, ok := err.(ACDError); !ok || e.Status != 409 */ {
t.Errorf("Failed to upload the file %s: %v", filename, err)
return
}
}
}
entries, err := hubicClient.ListEntries("test/test1")
if err != nil {
t.Errorf("Error list randomly generated files: %v", err)
return
}
for _, entry := range entries {
exists, isDir, size, err := hubicClient.GetFileInfo("test/test1/" + entry.Name)
fmt.Printf("%s exists: %t, isDir: %t, size: %d, err: %v\n", "test/test1/"+entry.Name, exists, isDir, size, err)
err = hubicClient.MoveFile("test/test1/"+entry.Name, "test/test2/"+entry.Name)
if err != nil {
t.Errorf("Failed to move %s: %v", entry.Name, err)
return
}
}
entries, err = hubicClient.ListEntries("test/test2")
if err != nil {
t.Errorf("Error list randomly generated files: %v", err)
return
}
for _, entry := range entries {
readCloser, _, err := hubicClient.DownloadFile("test/test2/" + entry.Name)
if err != nil {
t.Errorf("Error downloading file %s: %v", entry.Name, err)
return
}
hasher := sha256.New()
io.Copy(hasher, readCloser)
hash := hex.EncodeToString(hasher.Sum(nil))
if hash != entry.Name {
t.Errorf("File %s, hash %s", entry.Name, hash)
}
readCloser.Close()
}
for _, entry := range entries {
err = hubicClient.DeleteFile("test/test2/" + entry.Name)
if err != nil {
t.Errorf("Failed to delete the file %s: %v", entry.Name, err)
return
}
}
}

View File

@@ -1,145 +0,0 @@
// Copyright (c) Acrosync LLC. All rights reserved.
// Free for personal use and commercial trial
// Commercial use requires per-user licenses available from https://duplicacy.com
package duplicacy
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"testing"
crypto_rand "crypto/rand"
"math/rand"
)
func TestOneDriveClient(t *testing.T) {
oneDriveClient, err := NewOneDriveClient("one-token.json", false)
if err != nil {
t.Errorf("Failed to create the OneDrive client: %v", err)
return
}
oneDriveClient.TestMode = true
existingFiles, err := oneDriveClient.ListEntries("")
for _, file := range existingFiles {
fmt.Printf("name: %s, isDir: %t\n", file.Name, len(file.Folder) != 0)
}
testID, _, _, err := oneDriveClient.GetFileInfo("test")
if err != nil {
t.Errorf("Failed to list the test directory: %v", err)
return
}
if testID == "" {
err = oneDriveClient.CreateDirectory("", "test")
if err != nil {
t.Errorf("Failed to create the test directory: %v", err)
return
}
}
test1ID, _, _, err := oneDriveClient.GetFileInfo("test/test1")
if err != nil {
t.Errorf("Failed to list the test1 directory: %v", err)
return
}
if test1ID == "" {
err = oneDriveClient.CreateDirectory("test", "test1")
if err != nil {
t.Errorf("Failed to create the test1 directory: %v", err)
return
}
}
test2ID, _, _, err := oneDriveClient.GetFileInfo("test/test2")
if err != nil {
t.Errorf("Failed to list the test2 directory: %v", err)
return
}
if test2ID == "" {
err = oneDriveClient.CreateDirectory("test", "test2")
if err != nil {
t.Errorf("Failed to create the test2 directory: %v", err)
return
}
}
numberOfFiles := 20
maxFileSize := 64 * 1024
for i := 0; i < numberOfFiles; i++ {
content := make([]byte, rand.Int()%maxFileSize+1)
_, err = crypto_rand.Read(content)
if err != nil {
t.Errorf("Error generating random content: %v", err)
return
}
hasher := sha256.New()
hasher.Write(content)
filename := hex.EncodeToString(hasher.Sum(nil))
fmt.Printf("file: %s\n", filename)
err = oneDriveClient.UploadFile("test/test1/"+filename, content, 100)
if err != nil {
/*if e, ok := err.(ACDError); !ok || e.Status != 409 */ {
t.Errorf("Failed to upload the file %s: %v", filename, err)
return
}
}
}
entries, err := oneDriveClient.ListEntries("test/test1")
if err != nil {
t.Errorf("Error list randomly generated files: %v", err)
return
}
for _, entry := range entries {
err = oneDriveClient.MoveFile("test/test1/"+entry.Name, "test/test2")
if err != nil {
t.Errorf("Failed to move %s: %v", entry.Name, err)
return
}
}
entries, err = oneDriveClient.ListEntries("test/test2")
if err != nil {
t.Errorf("Error list randomly generated files: %v", err)
return
}
for _, entry := range entries {
readCloser, _, err := oneDriveClient.DownloadFile("test/test2/" + entry.Name)
if err != nil {
t.Errorf("Error downloading file %s: %v", entry.Name, err)
return
}
hasher := sha256.New()
io.Copy(hasher, readCloser)
hash := hex.EncodeToString(hasher.Sum(nil))
if hash != entry.Name {
t.Errorf("File %s, hash %s", entry.Name, hash)
}
readCloser.Close()
}
for _, entry := range entries {
err = oneDriveClient.DeleteFile("test/test2/" + entry.Name)
if err != nil {
t.Errorf("Failed to delete the file %s: %v", entry.Name, err)
return
}
}
}

View File

@@ -15,7 +15,6 @@ import (
"strings"
"time"
"sort"
"bytes"
"github.com/vmihailenco/msgpack"
@@ -52,6 +51,7 @@ type Snapshot struct {
// CreateEmptySnapshot creates an empty snapshot.
func CreateEmptySnapshot(id string) (snapshto *Snapshot) {
return &Snapshot{
Version: 1,
ID: id,
Revision: 0,
StartTime: time.Now().Unix(),
@@ -112,22 +112,21 @@ func (snapshot *Snapshot)ListRemoteFiles(config *Config, chunkOperator *ChunkOpe
}
var chunk *Chunk
reader := sequenceReader{
sequence: snapshot.FileSequence,
buffer: new(bytes.Buffer),
refillFunc: func(chunkHash string) []byte {
if chunk != nil {
config.PutChunk(chunk)
}
chunk = chunkOperator.Download(chunkHash, 0, true)
return chunk.GetBytes()
},
}
reader := NewSequenceReader(snapshot.FileSequence, func(chunkHash string) []byte {
if chunk != nil {
config.PutChunk(chunk)
}
chunk = chunkOperator.Download(chunkHash, 0, true)
return chunk.GetBytes()
})
if snapshot.Version == 0 {
// Normally if Version is 0 then the snapshot is created by CLI v2 but unfortunately CLI 3.0.1 does not set the
// version bit correctly when copying old backups. So we need to check the first byte -- if it is '[' then it is
// the old format. The new format starts with a string encoded in msgpack and the first byte can't be '['.
if snapshot.Version == 0 || reader.GetFirstByte() == '['{
LOG_INFO("SNAPSHOT_VERSION", "snapshot %s at revision %d is encoded in an old version format", snapshot.ID, snapshot.Revision)
files := make([]*Entry, 0)
decoder := json.NewDecoder(&reader)
decoder := json.NewDecoder(reader)
// read open bracket
_, err := decoder.Token()
@@ -156,7 +155,7 @@ func (snapshot *Snapshot)ListRemoteFiles(config *Config, chunkOperator *ChunkOpe
}
}
} else if snapshot.Version == 1 {
decoder := msgpack.NewDecoder(&reader)
decoder := msgpack.NewDecoder(reader)
lastEndChunk := 0
@@ -434,7 +433,7 @@ func (snapshot *Snapshot) MarshalJSON() ([]byte, error) {
object := make(map[string]interface{})
object["version"] = 1
object["version"] = snapshot.Version
object["id"] = snapshot.ID
object["revision"] = snapshot.Revision
object["options"] = snapshot.Options

View File

@@ -249,17 +249,27 @@ func (manager *SnapshotManager) DownloadSnapshot(snapshotID string, revision int
// the memory before passing them to the json unmarshaller.
type sequenceReader struct {
sequence []string
buffer *bytes.Buffer
buffer *bytes.Reader
index int
refillFunc func(hash string) []byte
}
func NewSequenceReader(sequence []string, refillFunc func(hash string) []byte) *sequenceReader {
newData := refillFunc(sequence[0])
return &sequenceReader{
sequence: sequence,
buffer: bytes.NewReader(newData),
index: 1,
refillFunc: refillFunc,
}
}
// Read reads a new chunk using the refill function when there is no more data in the buffer
func (reader *sequenceReader) Read(data []byte) (n int, err error) {
if len(reader.buffer.Bytes()) == 0 {
if reader.buffer.Len() == 0 {
if reader.index < len(reader.sequence) {
newData := reader.refillFunc(reader.sequence[reader.index])
reader.buffer.Write(newData)
reader.buffer = bytes.NewReader(newData)
reader.index++
} else {
return 0, io.EOF
@@ -269,15 +279,25 @@ func (reader *sequenceReader) Read(data []byte) (n int, err error) {
return reader.buffer.Read(data)
}
func (manager *SnapshotManager) CreateChunkOperator(resurrect bool, threads int, allowFailures bool) {
func (reader *sequenceReader) GetFirstByte() byte {
b, err := reader.buffer.ReadByte()
reader.buffer.UnreadByte()
if err != nil {
return 0
} else {
return b
}
}
func (manager *SnapshotManager) CreateChunkOperator(resurrect bool, rewriteChunks bool, threads int, allowFailures bool) {
if manager.chunkOperator == nil {
manager.chunkOperator = CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, resurrect, threads, allowFailures)
manager.chunkOperator = CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, resurrect, rewriteChunks, threads, allowFailures)
}
}
// DownloadSequence returns the content represented by a sequence of chunks.
func (manager *SnapshotManager) DownloadSequence(sequence []string) (content []byte) {
manager.CreateChunkOperator(false, 1, false)
manager.CreateChunkOperator(false, false, 1, false)
for _, chunkHash := range sequence {
chunk := manager.chunkOperator.Download(chunkHash, 0, true)
content = append(content, chunk.GetBytes()...)
@@ -654,7 +674,7 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
LOG_DEBUG("LIST_PARAMETERS", "id: %s, revisions: %v, tag: %s, showFiles: %t, showChunks: %t",
snapshotID, revisionsToList, tag, showFiles, showChunks)
manager.CreateChunkOperator(false, 1, false)
manager.CreateChunkOperator(false, false, 1, false)
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
@@ -760,9 +780,9 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
// CheckSnapshots checks if there is any problem with a snapshot.
func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToCheck []int, tag string, showStatistics bool, showTabular bool,
checkFiles bool, checkChunks, searchFossils bool, resurrect bool, threads int, allowFailures bool) bool {
checkFiles bool, checkChunks, searchFossils bool, resurrect bool, rewriteChunks bool, threads int, allowFailures bool) bool {
manager.CreateChunkOperator(resurrect, threads, allowFailures)
manager.CreateChunkOperator(resurrect, rewriteChunks, threads, allowFailures)
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
@@ -1318,6 +1338,10 @@ func (manager *SnapshotManager) VerifySnapshot(snapshot *Snapshot) bool {
LOG_TRACE("SNAPSHOT_VERIFY", "%s", file.Path)
}
if lastChunk != nil {
manager.config.PutChunk(lastChunk)
}
if corruptedFiles > 0 {
LOG_WARN("SNAPSHOT_VERIFY", "Snapshot %s at revision %d contains %d corrupted files",
snapshot.ID, snapshot.Revision, corruptedFiles)
@@ -1336,7 +1360,7 @@ func (manager *SnapshotManager) RetrieveFile(snapshot *Snapshot, file *Entry, la
return true
}
manager.CreateChunkOperator(false, 1, false)
manager.CreateChunkOperator(false, false, 1, false)
fileHasher := manager.config.NewFileHasher()
alternateHash := false
@@ -1372,6 +1396,11 @@ func (manager *SnapshotManager) RetrieveFile(snapshot *Snapshot, file *Entry, la
}
}
if chunk.isBroken {
*lastChunk = nil
return false
}
output(chunk.GetBytes()[start:end])
if alternateHash {
fileHasher.Write([]byte(hex.EncodeToString([]byte(hash))))
@@ -1465,7 +1494,7 @@ func (manager *SnapshotManager) Diff(top string, snapshotID string, revisions []
LOG_DEBUG("DIFF_PARAMETERS", "top: %s, id: %s, revision: %v, path: %s, compareByHash: %t",
top, snapshotID, revisions, filePath, compareByHash)
manager.CreateChunkOperator(false, 1, false)
manager.CreateChunkOperator(false, false, 1, false)
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
@@ -1690,7 +1719,7 @@ func (manager *SnapshotManager) ShowHistory(top string, snapshotID string, revis
LOG_DEBUG("HISTORY_PARAMETERS", "top: %s, id: %s, revisions: %v, path: %s, showLocalHash: %t",
top, snapshotID, revisions, filePath, showLocalHash)
manager.CreateChunkOperator(false, 1, false)
manager.CreateChunkOperator(false, false, 1, false)
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
@@ -1818,7 +1847,7 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
LOG_WARN("DELETE_OPTIONS", "Tags or retention policy will be ignored if at least one revision is specified")
}
manager.CreateChunkOperator(false, threads, false)
manager.CreateChunkOperator(false, false, threads, false)
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
@@ -2594,12 +2623,29 @@ func (manager *SnapshotManager) DownloadFile(path string, derivationKey string)
derivationKey = derivationKey[len(derivationKey)-64:]
}
err = manager.fileChunk.Decrypt(manager.config.FileKey, derivationKey)
err, rewriteNeeded := manager.fileChunk.Decrypt(manager.config.FileKey, derivationKey)
if err != nil {
LOG_ERROR("DOWNLOAD_DECRYPT", "Failed to decrypt the file %s: %v", path, err)
return nil
}
if rewriteNeeded && manager.chunkOperator.rewriteChunks {
newChunk := manager.config.GetChunk()
newChunk.Reset(true)
newChunk.Write(manager.fileChunk.GetBytes())
err = newChunk.Encrypt(manager.config.FileKey, derivationKey, true)
if err == nil {
err = manager.storage.UploadFile(0, path, newChunk.GetBytes())
if err != nil {
LOG_WARN("DOWNLOAD_REWRITE", "Failed to re-uploaded the file %s: %v", path, err)
} else{
LOG_INFO("DOWNLOAD_REWRITE", "The file %s has been re-uploaded", path)
}
}
manager.config.PutChunk(newChunk)
}
err = manager.snapshotCache.UploadFile(0, path, manager.fileChunk.GetBytes())
if err != nil {
LOG_WARN("DOWNLOAD_FILE_CACHE", "Failed to add the file %s to the snapshot cache: %v", path, err)

View File

@@ -116,7 +116,7 @@ func createTestSnapshotManager(testDir string) *SnapshotManager {
func uploadTestChunk(manager *SnapshotManager, content []byte) string {
chunkOperator := CreateChunkOperator(manager.config, manager.storage, nil, false, *testThreads, false)
chunkOperator := CreateChunkOperator(manager.config, manager.storage, nil, false, false, *testThreads, false)
chunkOperator.UploadCompletionFunc = func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) {
LOG_INFO("UPLOAD_CHUNK", "Chunk %s size %d uploaded", chunk.GetID(), chunkSize)
}
@@ -179,7 +179,7 @@ func createTestSnapshot(manager *SnapshotManager, snapshotID string, revision in
func checkTestSnapshots(manager *SnapshotManager, expectedSnapshots int, expectedFossils int) {
manager.CreateChunkOperator(false, 1, false)
manager.CreateChunkOperator(false, false, 1, false)
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
@@ -625,7 +625,7 @@ func TestPruneNewSnapshots(t *testing.T) {
// Now chunkHash1 wil be resurrected
snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1)
checkTestSnapshots(snapshotManager, 4, 0)
snapshotManager.CheckSnapshots("vm1@host1", []int{2, 3}, "", false, false, false, false, false, false, 1, false)
snapshotManager.CheckSnapshots("vm1@host1", []int{2, 3}, "", false, false, false, false, false, false, false, 1, false)
}
// A fossil collection left by an aborted prune should be ignored if any supposedly deleted snapshot exists
@@ -674,7 +674,7 @@ func TestPruneGhostSnapshots(t *testing.T) {
// Run the prune again but the fossil collection should be igored, since revision 1 still exists
snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1)
checkTestSnapshots(snapshotManager, 3, 2)
snapshotManager.CheckSnapshots("vm1@host1", []int{1, 2, 3}, "", false, false, false, false, true /*searchFossils*/, false, 1, false)
snapshotManager.CheckSnapshots("vm1@host1", []int{1, 2, 3}, "", false, false, false, false, true /*searchFossils*/, false, false, 1, false)
// Prune snapshot 1 again
snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{1}, []string{}, []string{}, false, false, []string{}, false, false, false, 1)
@@ -688,5 +688,5 @@ func TestPruneGhostSnapshots(t *testing.T) {
// Run the prune again and this time the fossil collection will be processed and the fossils removed
snapshotManager.PruneSnapshots("vm1@host1", "vm1@host1", []int{}, []string{}, []string{}, false, false, []string{}, false, false, false, 1)
checkTestSnapshots(snapshotManager, 3, 0)
snapshotManager.CheckSnapshots("vm1@host1", []int{2, 3, 4}, "", false, false, false, false, false, false, 1, false)
snapshotManager.CheckSnapshots("vm1@host1", []int{2, 3, 4}, "", false, false, false, false, false, false, false, 1, false)
}