1
0
mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-15 07:43:21 +00:00

Add -rewrite to the check command to fix corrupted chunks

This option is useful only when erasure coding is enabled. It downloads
and re-uploads chunks that contain corruption but are still
recoverable. It can also be used to fix chunks that were created by
version 3.0.1 on arm64 machines with incorrectly computed shard hashes.
This commit is contained in:
Gilbert Chen
2022-11-15 11:47:02 -05:00
parent 6a7a2c8048
commit bc2d762e41
8 changed files with 130 additions and 62 deletions

View File

@@ -981,10 +981,11 @@ func checkSnapshots(context *cli.Context) {
checkChunks := context.Bool("chunks") checkChunks := context.Bool("chunks")
searchFossils := context.Bool("fossils") searchFossils := context.Bool("fossils")
resurrect := context.Bool("resurrect") resurrect := context.Bool("resurrect")
rewrite := context.Bool("rewrite")
persist := context.Bool("persist") persist := context.Bool("persist")
backupManager.SetupSnapshotCache(preference.Name) backupManager.SetupSnapshotCache(preference.Name)
backupManager.SnapshotManager.CheckSnapshots(id, revisions, tag, showStatistics, showTabular, checkFiles, checkChunks, searchFossils, resurrect, threads, persist) backupManager.SnapshotManager.CheckSnapshots(id, revisions, tag, showStatistics, showTabular, checkFiles, checkChunks, searchFossils, resurrect, rewrite, threads, persist)
runScript(context, preference.Name, "post") runScript(context, preference.Name, "post")
} }
@@ -1676,6 +1677,10 @@ func main() {
Name: "resurrect", Name: "resurrect",
Usage: "turn referenced fossils back into chunks", Usage: "turn referenced fossils back into chunks",
}, },
cli.BoolFlag{
Name: "rewrite",
Usage: "rewrite chunks with recoverable corruption",
},
cli.BoolFlag{ cli.BoolFlag{
Name: "files", Name: "files",
Usage: "verify the integrity of every file", Usage: "verify the integrity of every file",

View File

@@ -223,7 +223,7 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
localListingChannel := make(chan *Entry) localListingChannel := make(chan *Entry)
remoteListingChannel := make(chan *Entry) remoteListingChannel := make(chan *Entry)
chunkOperator := CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, showStatistics, threads, false) chunkOperator := CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, showStatistics, false, threads, false)
var skippedDirectories []string var skippedDirectories []string
var skippedFiles []string var skippedFiles []string
@@ -673,7 +673,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
localListingChannel := make(chan *Entry) localListingChannel := make(chan *Entry)
remoteListingChannel := make(chan *Entry) remoteListingChannel := make(chan *Entry)
chunkOperator := CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, showStatistics, threads, false) chunkOperator := CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, showStatistics, false, threads, allowFailures)
LOG_INFO("RESTORE_INDEXING", "Indexing %s", top) LOG_INFO("RESTORE_INDEXING", "Indexing %s", top)
go func() { go func() {
@@ -1715,13 +1715,13 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
LOG_INFO("SNAPSHOT_COPY", "Chunks to copy: %d, to skip: %d, total: %d", len(chunksToCopy), len(chunks) - len(chunksToCopy), len(chunks)) LOG_INFO("SNAPSHOT_COPY", "Chunks to copy: %d, to skip: %d, total: %d", len(chunksToCopy), len(chunks) - len(chunksToCopy), len(chunks))
chunkDownloader := CreateChunkOperator(manager.config, manager.storage, nil, false, downloadingThreads, false) chunkDownloader := CreateChunkOperator(manager.config, manager.storage, nil, false, false, downloadingThreads, false)
var uploadedBytes int64 var uploadedBytes int64
startTime := time.Now() startTime := time.Now()
copiedChunks := 0 copiedChunks := 0
chunkUploader := CreateChunkOperator(otherManager.config, otherManager.storage, nil, false, uploadingThreads, false) chunkUploader := CreateChunkOperator(otherManager.config, otherManager.storage, nil, false, false, uploadingThreads, false)
chunkUploader.UploadCompletionFunc = func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) { chunkUploader.UploadCompletionFunc = func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) {
action := "Skipped" action := "Skipped"
if !skipped { if !skipped {

View File

@@ -377,8 +377,9 @@ func init() {
// Decrypt decrypts the encrypted data stored in the chunk buffer. If derivationKey is not nil, the actual // Decrypt decrypts the encrypted data stored in the chunk buffer. If derivationKey is not nil, the actual
// encryption key will be HMAC-SHA256(encryptionKey, derivationKey). // encryption key will be HMAC-SHA256(encryptionKey, derivationKey).
func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err error) { func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err error, rewriteNeeded bool) {
rewriteNeeded = false
var offset int var offset int
encryptedBuffer := AllocateChunkBuffer() encryptedBuffer := AllocateChunkBuffer()
@@ -394,13 +395,13 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
// The chunk was encoded with erasure coding // The chunk was encoded with erasure coding
if len(encryptedBuffer.Bytes()) < bannerLength + 14 { if len(encryptedBuffer.Bytes()) < bannerLength + 14 {
return fmt.Errorf("Erasure coding header truncated (%d bytes)", len(encryptedBuffer.Bytes())) return fmt.Errorf("Erasure coding header truncated (%d bytes)", len(encryptedBuffer.Bytes())), false
} }
// Check the header checksum // Check the header checksum
header := encryptedBuffer.Bytes()[bannerLength: bannerLength + 14] header := encryptedBuffer.Bytes()[bannerLength: bannerLength + 14]
if header[12] != header[0] ^ header[2] ^ header[4] ^ header[6] ^ header[8] ^ header[10] || if header[12] != header[0] ^ header[2] ^ header[4] ^ header[6] ^ header[8] ^ header[10] ||
header[13] != header[1] ^ header[3] ^ header[5] ^ header[7] ^ header[9] ^ header[11] { header[13] != header[1] ^ header[3] ^ header[5] ^ header[7] ^ header[9] ^ header[11] {
return fmt.Errorf("Erasure coding header corrupted (%x)", header) return fmt.Errorf("Erasure coding header corrupted (%x)", header), false
} }
// Read the parameters // Read the parameters
@@ -420,7 +421,7 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
} else if len(encryptedBuffer.Bytes()) > minimumLength { } else if len(encryptedBuffer.Bytes()) > minimumLength {
LOG_WARN("CHUNK_ERASURECODE", "Chunk is truncated (%d out of %d bytes)", len(encryptedBuffer.Bytes()), expectedLength) LOG_WARN("CHUNK_ERASURECODE", "Chunk is truncated (%d out of %d bytes)", len(encryptedBuffer.Bytes()), expectedLength)
} else { } else {
return fmt.Errorf("Not enough chunk data for recovery; chunk size: %d bytes, data size: %d, parity: %d/%d", chunkSize, len(encryptedBuffer.Bytes()), dataShards, parityShards) return fmt.Errorf("Not enough chunk data for recovery; chunk size: %d bytes, data size: %d, parity: %d/%d", chunkSize, len(encryptedBuffer.Bytes()), dataShards, parityShards), false
} }
// Where the hashes start // Where the hashes start
@@ -443,11 +444,11 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
// Now verify the hash // Now verify the hash
hasher, err := highwayhash.New(hashKey) hasher, err := highwayhash.New(hashKey)
if err != nil { if err != nil {
return err return err, false
} }
_, err = hasher.Write(encryptedBuffer.Bytes()[start: start + shardSize]) _, err = hasher.Write(encryptedBuffer.Bytes()[start: start + shardSize])
if err != nil { if err != nil {
return err return err, false
} }
matched := bytes.Compare(hasher.Sum(nil), encryptedBuffer.Bytes()[hashOffset + i * 32: hashOffset + (i + 1) * 32]) == 0 matched := bytes.Compare(hasher.Sum(nil), encryptedBuffer.Bytes()[hashOffset + i * 32: hashOffset + (i + 1) * 32]) == 0
@@ -461,6 +462,7 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
if matched && !wrongHashDetected { if matched && !wrongHashDetected {
LOG_WARN("CHUNK_ERASURECODE", "Hash for shard %d was calculated with a wrong version of highwayhash", i) LOG_WARN("CHUNK_ERASURECODE", "Hash for shard %d was calculated with a wrong version of highwayhash", i)
wrongHashDetected = true wrongHashDetected = true
rewriteNeeded = true
} }
} }
} }
@@ -469,6 +471,7 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
if !matched { if !matched {
if i < dataShards { if i < dataShards {
recoveryNeeded = true recoveryNeeded = true
rewriteNeeded = true
} }
} else { } else {
// The shard is good // The shard is good
@@ -488,7 +491,7 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
encryptedBuffer.Read(encryptedBuffer.Bytes()[:dataOffset]) encryptedBuffer.Read(encryptedBuffer.Bytes()[:dataOffset])
} else { } else {
if availableShards < dataShards { if availableShards < dataShards {
return fmt.Errorf("Not enough chunk data for recover; only %d out of %d shards are complete", availableShards, dataShards + parityShards) return fmt.Errorf("Not enough chunk data for recover; only %d out of %d shards are complete", availableShards, dataShards + parityShards), false
} }
// Show the validity of shards using a string of * and - // Show the validity of shards using a string of * and -
@@ -504,11 +507,11 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
LOG_WARN("CHUNK_ERASURECODE", "Recovering a %d byte chunk from %d byte shards: %s", chunkSize, shardSize, slots) LOG_WARN("CHUNK_ERASURECODE", "Recovering a %d byte chunk from %d byte shards: %s", chunkSize, shardSize, slots)
encoder, err := reedsolomon.New(dataShards, parityShards) encoder, err := reedsolomon.New(dataShards, parityShards)
if err != nil { if err != nil {
return err return err, false
} }
err = encoder.Reconstruct(data) err = encoder.Reconstruct(data)
if err != nil { if err != nil {
return err return err, false
} }
LOG_DEBUG("CHUNK_ERASURECODE", "Chunk data successfully recovered") LOG_DEBUG("CHUNK_ERASURECODE", "Chunk data successfully recovered")
buffer := AllocateChunkBuffer() buffer := AllocateChunkBuffer()
@@ -541,28 +544,28 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
} }
if len(encryptedBuffer.Bytes()) < bannerLength + 12 { if len(encryptedBuffer.Bytes()) < bannerLength + 12 {
return fmt.Errorf("No enough encrypted data (%d bytes) provided", len(encryptedBuffer.Bytes())) return fmt.Errorf("No enough encrypted data (%d bytes) provided", len(encryptedBuffer.Bytes())), false
} }
if string(encryptedBuffer.Bytes()[:bannerLength-1]) != ENCRYPTION_BANNER[:bannerLength-1] { if string(encryptedBuffer.Bytes()[:bannerLength-1]) != ENCRYPTION_BANNER[:bannerLength-1] {
return fmt.Errorf("The storage doesn't seem to be encrypted") return fmt.Errorf("The storage doesn't seem to be encrypted"), false
} }
encryptionVersion := encryptedBuffer.Bytes()[bannerLength-1] encryptionVersion := encryptedBuffer.Bytes()[bannerLength-1]
if encryptionVersion != 0 && encryptionVersion != ENCRYPTION_VERSION_RSA { if encryptionVersion != 0 && encryptionVersion != ENCRYPTION_VERSION_RSA {
return fmt.Errorf("Unsupported encryption version %d", encryptionVersion) return fmt.Errorf("Unsupported encryption version %d", encryptionVersion), false
} }
if encryptionVersion == ENCRYPTION_VERSION_RSA { if encryptionVersion == ENCRYPTION_VERSION_RSA {
if chunk.config.rsaPrivateKey == nil { if chunk.config.rsaPrivateKey == nil {
LOG_ERROR("CHUNK_DECRYPT", "An RSA private key is required to decrypt the chunk") LOG_ERROR("CHUNK_DECRYPT", "An RSA private key is required to decrypt the chunk")
return fmt.Errorf("An RSA private key is required to decrypt the chunk") return fmt.Errorf("An RSA private key is required to decrypt the chunk"), false
} }
encryptedKeyLength := binary.LittleEndian.Uint16(encryptedBuffer.Bytes()[bannerLength:bannerLength+2]) encryptedKeyLength := binary.LittleEndian.Uint16(encryptedBuffer.Bytes()[bannerLength:bannerLength+2])
if len(encryptedBuffer.Bytes()) < bannerLength + 14 + int(encryptedKeyLength) { if len(encryptedBuffer.Bytes()) < bannerLength + 14 + int(encryptedKeyLength) {
return fmt.Errorf("No enough encrypted data (%d bytes) provided", len(encryptedBuffer.Bytes())) return fmt.Errorf("No enough encrypted data (%d bytes) provided", len(encryptedBuffer.Bytes())), false
} }
encryptedKey := encryptedBuffer.Bytes()[bannerLength + 2:bannerLength + 2 + int(encryptedKeyLength)] encryptedKey := encryptedBuffer.Bytes()[bannerLength + 2:bannerLength + 2 + int(encryptedKeyLength)]
@@ -570,19 +573,19 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
decryptedKey, err := rsa.DecryptOAEP(sha256.New(), rand.Reader, chunk.config.rsaPrivateKey, encryptedKey, nil) decryptedKey, err := rsa.DecryptOAEP(sha256.New(), rand.Reader, chunk.config.rsaPrivateKey, encryptedKey, nil)
if err != nil { if err != nil {
return err return err, false
} }
key = decryptedKey key = decryptedKey
} }
aesBlock, err := aes.NewCipher(key) aesBlock, err := aes.NewCipher(key)
if err != nil { if err != nil {
return err return err, false
} }
gcm, err := cipher.NewGCM(aesBlock) gcm, err := cipher.NewGCM(aesBlock)
if err != nil { if err != nil {
return err return err, false
} }
offset = bannerLength + gcm.NonceSize() offset = bannerLength + gcm.NonceSize()
@@ -592,7 +595,7 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
encryptedBuffer.Bytes()[offset:], nil) encryptedBuffer.Bytes()[offset:], nil)
if err != nil { if err != nil {
return err return err, false
} }
paddingLength := int(decryptedBytes[len(decryptedBytes)-1]) paddingLength := int(decryptedBytes[len(decryptedBytes)-1])
@@ -600,14 +603,14 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
paddingLength = 256 paddingLength = 256
} }
if len(decryptedBytes) <= paddingLength { if len(decryptedBytes) <= paddingLength {
return fmt.Errorf("Incorrect padding length %d out of %d bytes", paddingLength, len(decryptedBytes)) return fmt.Errorf("Incorrect padding length %d out of %d bytes", paddingLength, len(decryptedBytes)), false
} }
for i := 0; i < paddingLength; i++ { for i := 0; i < paddingLength; i++ {
padding := decryptedBytes[len(decryptedBytes)-1-i] padding := decryptedBytes[len(decryptedBytes)-1-i]
if padding != byte(paddingLength) { if padding != byte(paddingLength) {
return fmt.Errorf("Incorrect padding of length %d: %x", paddingLength, return fmt.Errorf("Incorrect padding of length %d: %x", paddingLength,
decryptedBytes[len(decryptedBytes)-paddingLength:]) decryptedBytes[len(decryptedBytes)-paddingLength:]), false
} }
} }
@@ -621,18 +624,18 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
chunk.buffer.Reset() chunk.buffer.Reset()
decompressed, err := lz4.Decode(chunk.buffer.Bytes(), encryptedBuffer.Bytes()[4:]) decompressed, err := lz4.Decode(chunk.buffer.Bytes(), encryptedBuffer.Bytes()[4:])
if err != nil { if err != nil {
return err return err, false
} }
chunk.buffer.Write(decompressed) chunk.buffer.Write(decompressed)
chunk.hasher = chunk.config.NewKeyedHasher(chunk.config.HashKey) chunk.hasher = chunk.config.NewKeyedHasher(chunk.config.HashKey)
chunk.hasher.Write(decompressed) chunk.hasher.Write(decompressed)
chunk.hash = nil chunk.hash = nil
return nil return nil, rewriteNeeded
} }
inflater, err := zlib.NewReader(encryptedBuffer) inflater, err := zlib.NewReader(encryptedBuffer)
if err != nil { if err != nil {
return err return err, false
} }
defer inflater.Close() defer inflater.Close()
@@ -642,9 +645,9 @@ func (chunk *Chunk) Decrypt(encryptionKey []byte, derivationKey string) (err err
chunk.hash = nil chunk.hash = nil
if _, err = io.Copy(chunk, inflater); err != nil { if _, err = io.Copy(chunk, inflater); err != nil {
return err return err, false
} }
return nil return nil, rewriteNeeded
} }

View File

@@ -43,7 +43,7 @@ func TestErasureCoding(t *testing.T) {
chunk.Reset(false) chunk.Reset(false)
chunk.Write(encryptedData) chunk.Write(encryptedData)
err = chunk.Decrypt([]byte(""), "") err, _ = chunk.Decrypt([]byte(""), "")
if err != nil { if err != nil {
t.Errorf("Failed to decrypt the data: %v", err) t.Errorf("Failed to decrypt the data: %v", err)
return return
@@ -110,7 +110,7 @@ func TestChunkBasic(t *testing.T) {
chunk.Reset(false) chunk.Reset(false)
chunk.Write(encryptedData) chunk.Write(encryptedData)
err = chunk.Decrypt(key, "") err, _ = chunk.Decrypt(key, "")
if err != nil { if err != nil {
t.Errorf("Failed to decrypt the data: %v", err) t.Errorf("Failed to decrypt the data: %v", err)
continue continue

View File

@@ -57,11 +57,14 @@ type ChunkOperator struct {
allowFailures bool // Whether to fail on download error, or continue allowFailures bool // Whether to fail on download error, or continue
NumberOfFailedChunks int64 // The number of chunks that can't be downloaded NumberOfFailedChunks int64 // The number of chunks that can't be downloaded
rewriteChunks bool // Whether to rewrite corrupted chunks when erasure coding is enabled
UploadCompletionFunc func(chunk *Chunk, chunkIndex int, inCache bool, chunkSize int, uploadSize int) UploadCompletionFunc func(chunk *Chunk, chunkIndex int, inCache bool, chunkSize int, uploadSize int)
} }
// CreateChunkOperator creates a new ChunkOperator. // CreateChunkOperator creates a new ChunkOperator.
func CreateChunkOperator(config *Config, storage Storage, snapshotCache *FileStorage, showStatistics bool, threads int, allowFailures bool) *ChunkOperator { func CreateChunkOperator(config *Config, storage Storage, snapshotCache *FileStorage, showStatistics bool, rewriteChunks bool, threads int,
allowFailures bool) *ChunkOperator {
operator := &ChunkOperator{ operator := &ChunkOperator{
config: config, config: config,
@@ -76,6 +79,7 @@ func CreateChunkOperator(config *Config, storage Storage, snapshotCache *FileSto
collectionLock: &sync.Mutex{}, collectionLock: &sync.Mutex{},
startTime: time.Now().Unix(), startTime: time.Now().Unix(),
allowFailures: allowFailures, allowFailures: allowFailures,
rewriteChunks: rewriteChunks,
} }
// Start the operator goroutines // Start the operator goroutines
@@ -331,24 +335,34 @@ func (operator *ChunkOperator) DownloadChunk(threadIndex int, task ChunkTask) {
atomic.AddInt64(&operator.NumberOfFailedChunks, 1) atomic.AddInt64(&operator.NumberOfFailedChunks, 1)
if operator.allowFailures { if operator.allowFailures {
chunk.isBroken = true
task.completionFunc(chunk, task.chunkIndex) task.completionFunc(chunk, task.chunkIndex)
} }
} }
chunkPath := ""
fossilPath := ""
filePath := ""
const MaxDownloadAttempts = 3 const MaxDownloadAttempts = 3
for downloadAttempt := 0; ; downloadAttempt++ { for downloadAttempt := 0; ; downloadAttempt++ {
exist := false
var err error
// Find the chunk by ID first. // Find the chunk by ID first.
chunkPath, exist, _, err := operator.storage.FindChunk(threadIndex, chunkID, false) chunkPath, exist, _, err = operator.storage.FindChunk(threadIndex, chunkID, false)
if err != nil { if err != nil {
completeFailedChunk() completeFailedChunk()
LOG_WERROR(operator.allowFailures, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err) LOG_WERROR(operator.allowFailures, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
return return
} }
if !exist { if exist {
filePath = chunkPath
} else {
// No chunk is found. Have to find it in the fossil pool again. // No chunk is found. Have to find it in the fossil pool again.
fossilPath, exist, _, err := operator.storage.FindChunk(threadIndex, chunkID, true) fossilPath, exist, _, err = operator.storage.FindChunk(threadIndex, chunkID, true)
if err != nil { if err != nil {
completeFailedChunk() completeFailedChunk()
LOG_WERROR(operator.allowFailures, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err) LOG_WERROR(operator.allowFailures, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
@@ -383,20 +397,11 @@ func (operator *ChunkOperator) DownloadChunk(threadIndex int, task ChunkTask) {
return return
} }
// We can't download the fossil directly. We have to turn it back into a regular chunk and try filePath = fossilPath
// downloading again. LOG_WARN("DOWNLOAD_FOSSIL", "Chunk %s is a fossil", chunkID)
err = operator.storage.MoveFile(threadIndex, fossilPath, chunkPath)
if err != nil {
completeFailedChunk()
LOG_WERROR(operator.allowFailures, "DOWNLOAD_CHUNK", "Failed to resurrect chunk %s: %v", chunkID, err)
return
} }
LOG_WARN("DOWNLOAD_RESURRECT", "Fossil %s has been resurrected", chunkID) err = operator.storage.DownloadFile(threadIndex, filePath, chunk)
continue
}
err = operator.storage.DownloadFile(threadIndex, chunkPath, chunk)
if err != nil { if err != nil {
_, isHubic := operator.storage.(*HubicStorage) _, isHubic := operator.storage.(*HubicStorage)
// Retry on EOF or if it is a Hubic backend as it may return 404 even when the chunk exists // Retry on EOF or if it is a Hubic backend as it may return 404 even when the chunk exists
@@ -412,7 +417,8 @@ func (operator *ChunkOperator) DownloadChunk(threadIndex int, task ChunkTask) {
} }
} }
err = chunk.Decrypt(operator.config.ChunkKey, task.chunkHash) rewriteNeeded := false
err, rewriteNeeded = chunk.Decrypt(operator.config.ChunkKey, task.chunkHash)
if err != nil { if err != nil {
if downloadAttempt < MaxDownloadAttempts { if downloadAttempt < MaxDownloadAttempts {
LOG_WARN("DOWNLOAD_RETRY", "Failed to decrypt the chunk %s: %v; retrying", chunkID, err) LOG_WARN("DOWNLOAD_RETRY", "Failed to decrypt the chunk %s: %v; retrying", chunkID, err)
@@ -440,6 +446,38 @@ func (operator *ChunkOperator) DownloadChunk(threadIndex int, task ChunkTask) {
} }
} }
if rewriteNeeded && operator.rewriteChunks {
if filePath != fossilPath {
fossilPath = filePath + ".fsl"
err := operator.storage.MoveFile(threadIndex, chunkPath, fossilPath)
if err != nil {
LOG_WARN("CHUNK_REWRITE", "Failed to fossilize the chunk %s: %v", task.chunkID, err)
} else {
LOG_TRACE("CHUNK_REWRITE", "The existing chunk %s has been marked as a fossil for rewrite", task.chunkID)
operator.collectionLock.Lock()
operator.fossils = append(operator.fossils, fossilPath)
operator.collectionLock.Unlock()
}
}
newChunk := operator.config.GetChunk()
newChunk.Reset(true)
newChunk.Write(chunk.GetBytes())
// Encrypt the chunk only after we know that it must be uploaded.
err = newChunk.Encrypt(operator.config.ChunkKey, chunk.GetHash(), task.isMetadata)
if err == nil {
// Re-upload the chunk
err = operator.storage.UploadFile(threadIndex, chunkPath, newChunk.GetBytes())
if err != nil {
LOG_WARN("CHUNK_REWRITE", "Failed to re-upload the chunk %s: %v", chunkID, err)
} else {
LOG_INFO("CHUNK_REWRITE", "The chunk %s has been re-uploaded", chunkID)
}
}
operator.config.PutChunk(newChunk)
}
break break
} }

View File

@@ -87,7 +87,7 @@ func TestChunkOperator(t *testing.T) {
totalFileSize += chunk.GetLength() totalFileSize += chunk.GetLength()
} }
chunkOperator := CreateChunkOperator(config, storage, nil, false, *testThreads, false) chunkOperator := CreateChunkOperator(config, storage, nil, false, false, *testThreads, false)
chunkOperator.UploadCompletionFunc = func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) { chunkOperator.UploadCompletionFunc = func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) {
t.Logf("Chunk %s size %d (%d/%d) uploaded", chunk.GetID(), chunkSize, chunkIndex, len(chunks)) t.Logf("Chunk %s size %d (%d/%d) uploaded", chunk.GetID(), chunkSize, chunkIndex, len(chunks))
} }

View File

@@ -436,7 +436,7 @@ func DownloadConfig(storage Storage, password string) (config *Config, isEncrypt
} }
// Decrypt the config file. masterKey == nil means no encryption. // Decrypt the config file. masterKey == nil means no encryption.
err = configFile.Decrypt(masterKey, "") err, _ = configFile.Decrypt(masterKey, "")
if err != nil { if err != nil {
return nil, false, fmt.Errorf("Failed to retrieve the config file: %v", err) return nil, false, fmt.Errorf("Failed to retrieve the config file: %v", err)
} }

View File

@@ -269,15 +269,15 @@ func (reader *sequenceReader) Read(data []byte) (n int, err error) {
return reader.buffer.Read(data) return reader.buffer.Read(data)
} }
func (manager *SnapshotManager) CreateChunkOperator(resurrect bool, threads int, allowFailures bool) { func (manager *SnapshotManager) CreateChunkOperator(resurrect bool, rewriteChunks bool, threads int, allowFailures bool) {
if manager.chunkOperator == nil { if manager.chunkOperator == nil {
manager.chunkOperator = CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, resurrect, threads, allowFailures) manager.chunkOperator = CreateChunkOperator(manager.config, manager.storage, manager.snapshotCache, resurrect, rewriteChunks, threads, allowFailures)
} }
} }
// DownloadSequence returns the content represented by a sequence of chunks. // DownloadSequence returns the content represented by a sequence of chunks.
func (manager *SnapshotManager) DownloadSequence(sequence []string) (content []byte) { func (manager *SnapshotManager) DownloadSequence(sequence []string) (content []byte) {
manager.CreateChunkOperator(false, 1, false) manager.CreateChunkOperator(false, false, 1, false)
for _, chunkHash := range sequence { for _, chunkHash := range sequence {
chunk := manager.chunkOperator.Download(chunkHash, 0, true) chunk := manager.chunkOperator.Download(chunkHash, 0, true)
content = append(content, chunk.GetBytes()...) content = append(content, chunk.GetBytes()...)
@@ -654,7 +654,7 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
LOG_DEBUG("LIST_PARAMETERS", "id: %s, revisions: %v, tag: %s, showFiles: %t, showChunks: %t", LOG_DEBUG("LIST_PARAMETERS", "id: %s, revisions: %v, tag: %s, showFiles: %t, showChunks: %t",
snapshotID, revisionsToList, tag, showFiles, showChunks) snapshotID, revisionsToList, tag, showFiles, showChunks)
manager.CreateChunkOperator(false, 1, false) manager.CreateChunkOperator(false, false, 1, false)
defer func() { defer func() {
manager.chunkOperator.Stop() manager.chunkOperator.Stop()
manager.chunkOperator = nil manager.chunkOperator = nil
@@ -760,9 +760,9 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
// CheckSnapshots checks if there is any problem with a snapshot. // CheckSnapshots checks if there is any problem with a snapshot.
func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToCheck []int, tag string, showStatistics bool, showTabular bool, func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToCheck []int, tag string, showStatistics bool, showTabular bool,
checkFiles bool, checkChunks, searchFossils bool, resurrect bool, threads int, allowFailures bool) bool { checkFiles bool, checkChunks, searchFossils bool, resurrect bool, rewriteChunks bool, threads int, allowFailures bool) bool {
manager.CreateChunkOperator(resurrect, threads, allowFailures) manager.CreateChunkOperator(resurrect, rewriteChunks, threads, allowFailures)
defer func() { defer func() {
manager.chunkOperator.Stop() manager.chunkOperator.Stop()
manager.chunkOperator = nil manager.chunkOperator = nil
@@ -1336,7 +1336,7 @@ func (manager *SnapshotManager) RetrieveFile(snapshot *Snapshot, file *Entry, la
return true return true
} }
manager.CreateChunkOperator(false, 1, false) manager.CreateChunkOperator(false, false, 1, false)
fileHasher := manager.config.NewFileHasher() fileHasher := manager.config.NewFileHasher()
alternateHash := false alternateHash := false
@@ -1372,6 +1372,11 @@ func (manager *SnapshotManager) RetrieveFile(snapshot *Snapshot, file *Entry, la
} }
} }
if chunk.isBroken {
*lastChunk = nil
return false
}
output(chunk.GetBytes()[start:end]) output(chunk.GetBytes()[start:end])
if alternateHash { if alternateHash {
fileHasher.Write([]byte(hex.EncodeToString([]byte(hash)))) fileHasher.Write([]byte(hex.EncodeToString([]byte(hash))))
@@ -1465,7 +1470,7 @@ func (manager *SnapshotManager) Diff(top string, snapshotID string, revisions []
LOG_DEBUG("DIFF_PARAMETERS", "top: %s, id: %s, revision: %v, path: %s, compareByHash: %t", LOG_DEBUG("DIFF_PARAMETERS", "top: %s, id: %s, revision: %v, path: %s, compareByHash: %t",
top, snapshotID, revisions, filePath, compareByHash) top, snapshotID, revisions, filePath, compareByHash)
manager.CreateChunkOperator(false, 1, false) manager.CreateChunkOperator(false, false, 1, false)
defer func() { defer func() {
manager.chunkOperator.Stop() manager.chunkOperator.Stop()
manager.chunkOperator = nil manager.chunkOperator = nil
@@ -1690,7 +1695,7 @@ func (manager *SnapshotManager) ShowHistory(top string, snapshotID string, revis
LOG_DEBUG("HISTORY_PARAMETERS", "top: %s, id: %s, revisions: %v, path: %s, showLocalHash: %t", LOG_DEBUG("HISTORY_PARAMETERS", "top: %s, id: %s, revisions: %v, path: %s, showLocalHash: %t",
top, snapshotID, revisions, filePath, showLocalHash) top, snapshotID, revisions, filePath, showLocalHash)
manager.CreateChunkOperator(false, 1, false) manager.CreateChunkOperator(false, false, 1, false)
defer func() { defer func() {
manager.chunkOperator.Stop() manager.chunkOperator.Stop()
manager.chunkOperator = nil manager.chunkOperator = nil
@@ -1818,7 +1823,7 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
LOG_WARN("DELETE_OPTIONS", "Tags or retention policy will be ignored if at least one revision is specified") LOG_WARN("DELETE_OPTIONS", "Tags or retention policy will be ignored if at least one revision is specified")
} }
manager.CreateChunkOperator(false, threads, false) manager.CreateChunkOperator(false, false, threads, false)
defer func() { defer func() {
manager.chunkOperator.Stop() manager.chunkOperator.Stop()
manager.chunkOperator = nil manager.chunkOperator = nil
@@ -2594,12 +2599,29 @@ func (manager *SnapshotManager) DownloadFile(path string, derivationKey string)
derivationKey = derivationKey[len(derivationKey)-64:] derivationKey = derivationKey[len(derivationKey)-64:]
} }
err = manager.fileChunk.Decrypt(manager.config.FileKey, derivationKey) err, rewriteNeeded := manager.fileChunk.Decrypt(manager.config.FileKey, derivationKey)
if err != nil { if err != nil {
LOG_ERROR("DOWNLOAD_DECRYPT", "Failed to decrypt the file %s: %v", path, err) LOG_ERROR("DOWNLOAD_DECRYPT", "Failed to decrypt the file %s: %v", path, err)
return nil return nil
} }
if rewriteNeeded && manager.chunkOperator.rewriteChunks {
newChunk := manager.config.GetChunk()
newChunk.Reset(true)
newChunk.Write(manager.fileChunk.GetBytes())
err = newChunk.Encrypt(manager.config.FileKey, derivationKey, true)
if err == nil {
err = manager.storage.UploadFile(0, path, newChunk.GetBytes())
if err != nil {
LOG_WARN("DOWNLOAD_REWRITE", "Failed to re-uploaded the file %s: %v", path, err)
} else{
LOG_INFO("DOWNLOAD_REWRITE", "The file %s has been re-uploaded", path)
}
}
manager.config.PutChunk(newChunk)
}
err = manager.snapshotCache.UploadFile(0, path, manager.fileChunk.GetBytes()) err = manager.snapshotCache.UploadFile(0, path, manager.fileChunk.GetBytes())
if err != nil { if err != nil {
LOG_WARN("DOWNLOAD_FILE_CACHE", "Failed to add the file %s to the snapshot cache: %v", path, err) LOG_WARN("DOWNLOAD_FILE_CACHE", "Failed to add the file %s to the snapshot cache: %v", path, err)