Add a -chunks option (together with -threads) to the check command so that every chunk
referenced by the selected snapshots is passed to the chunk downloader for verification.

Notes on this revision of the patch:
  * WaitForCompletion offers the next task to the download queue and waits for a completion
    event in the same select statement.  The earlier loop blocked on a completion event before
    every send once a task was active, so a downloader that starts idle (as the one created in
    CheckSnapshots does) never had more than one download in flight, whatever -threads was.
  * CheckSnapshots logs the new checkChunks and threads parameters and has its own doc comment.
  * Leading whitespace and blank context lines were reconstructed from the hunk line counts and
    gofmt conventions; verify against the base revision before applying.
  * The "index" lines are carried over from the original patch; blob ids were not recomputed.

diff --git a/duplicacy/duplicacy_main.go b/duplicacy/duplicacy_main.go
index 9618451..ae7c492 100644
--- a/duplicacy/duplicacy_main.go
+++ b/duplicacy/duplicacy_main.go
@@ -894,7 +894,12 @@ func checkSnapshots(context *cli.Context) {
 
 	runScript(context, preference.Name, "pre")
 
-	storage := duplicacy.CreateStorage(*preference, false, 1)
+	threads := context.Int("threads")
+	if threads < 1 {
+		threads = 1
+	}
+
+	storage := duplicacy.CreateStorage(*preference, false, threads)
 	if storage == nil {
 		return
 	}
@@ -922,11 +927,12 @@ func checkSnapshots(context *cli.Context) {
 	showStatistics := context.Bool("stats")
 	showTabular := context.Bool("tabular")
 	checkFiles := context.Bool("files")
+	checkChunks := context.Bool("chunks")
 	searchFossils := context.Bool("fossils")
 	resurrect := context.Bool("resurrect")
 
 	backupManager.SetupSnapshotCache(preference.Name)
-	backupManager.SnapshotManager.CheckSnapshots(id, revisions, tag, showStatistics, showTabular, checkFiles, searchFossils, resurrect)
+	backupManager.SnapshotManager.CheckSnapshots(id, revisions, tag, showStatistics, showTabular, checkFiles, checkChunks, searchFossils, resurrect, threads)
 
 	runScript(context, preference.Name, "post")
 }
@@ -1589,6 +1595,10 @@ func main() {
 					Name:  "files",
 					Usage: "verify the integrity of every file",
 				},
+				cli.BoolFlag{
+					Name:  "chunks",
+					Usage: "verify the integrity of every chunk",
+				},
 				cli.BoolFlag{
 					Name:  "stats",
 					Usage: "show deduplication statistics (imply -all and all revisions)",
@@ -1607,6 +1617,12 @@ func main() {
 					Usage:    "the RSA private key to decrypt file chunks",
 					Argument: "",
 				},
+				cli.IntFlag{
+					Name:     "threads",
+					Value:    1,
+					Usage:    "number of threads used to verify chunks",
+					Argument: "",
+				},
 			},
 			Usage:     "Check the integrity of snapshots",
 			ArgsUsage: " ",
diff --git a/src/duplicacy_chunkdownloader.go b/src/duplicacy_chunkdownloader.go
index 5cbef08..468b2cf 100644
--- a/src/duplicacy_chunkdownloader.go
+++ b/src/duplicacy_chunkdownloader.go
@@ -126,6 +126,7 @@ func (downloader *ChunkDownloader) AddFiles(snapshot *Snapshot, files []*Entry)
 
-// AddChunk adds a single chunk the download list.
+// AddChunk adds a single chunk to the download list.
 func (downloader *ChunkDownloader) AddChunk(chunkHash string) int {
+
 	task := ChunkDownloadTask{
 		chunkIndex: len(downloader.taskList),
 		chunkHash:  chunkHash,
@@ -253,6 +254,63 @@ func (downloader *ChunkDownloader) WaitForChunk(chunkIndex int) (chunk *Chunk) {
 	return downloader.taskList[chunkIndex].chunk
 }
 
+// WaitForCompletion queues every task that has not been started yet and waits until all chunks
+// have been downloaded.
+func (downloader *ChunkDownloader) WaitForCompletion() {
+
+	// Tasks in completedTasks have not been counted by numberOfActiveChunks
+	downloader.numberOfActiveChunks -= len(downloader.completedTasks)
+
+	// Find the completed task with the largest index; we'll start from the next index
+	for index := range downloader.completedTasks {
+		if downloader.lastChunkIndex < index {
+			downloader.lastChunkIndex = index
+		}
+	}
+
+	// Loop until every task has been queued and there isn't a download task in progress
+	for downloader.numberOfActiveChunks > 0 || downloader.lastChunkIndex+1 < len(downloader.taskList) {
+
+		// Skip tasks that have already been passed to the download queue
+		hasPendingTask := downloader.lastChunkIndex+1 < len(downloader.taskList)
+		if hasPendingTask && downloader.taskList[downloader.lastChunkIndex+1].isDownloading {
+			downloader.lastChunkIndex++
+			continue
+		}
+
+		// A nil channel is never ready in a select statement, so the operation that doesn't
+		// apply to this iteration is disabled by replacing its channel with nil.  The loop
+		// condition guarantees that at least one of the two channels stays enabled.
+		taskQueue := downloader.taskQueue
+		var task ChunkDownloadTask
+		if hasPendingTask {
+			task = downloader.taskList[downloader.lastChunkIndex+1]
+		} else {
+			taskQueue = nil
+		}
+		completionChannel := downloader.completionChannel
+		if downloader.numberOfActiveChunks == 0 {
+			completionChannel = nil
+		}
+
+		// Queue the next task as soon as the download queue accepts it, instead of waiting for
+		// a completion event before every send, which would keep only one download in flight
+		// no matter how many threads were requested.
+		select {
+		case taskQueue <- task:
+			downloader.taskList[downloader.lastChunkIndex+1].isDownloading = true
+			downloader.numberOfDownloadingChunks++
+			downloader.numberOfActiveChunks++
+			downloader.lastChunkIndex++
+		case completion := <-completionChannel:
+			downloader.config.PutChunk(completion.chunk)
+			downloader.numberOfActiveChunks--
+			downloader.numberOfDownloadedChunks++
+			downloader.numberOfDownloadingChunks--
+		}
+	}
+}
+
 // Stop terminates all downloading goroutines
 func (downloader *ChunkDownloader) Stop() {
 	for downloader.numberOfDownloadingChunks > 0 {
diff --git a/src/duplicacy_snapshotmanager.go b/src/duplicacy_snapshotmanager.go
index b46dc46..be3c635 100644
--- a/src/duplicacy_snapshotmanager.go
+++ b/src/duplicacy_snapshotmanager.go
@@ -653,6 +653,51 @@ func (manager *SnapshotManager) GetSnapshotChunks(snapshot *Snapshot, keepChunkH
 	return chunks
 }
 
+// GetSnapshotChunkHashes adds the id of every chunk referenced by the snapshot to chunkIDs and, if chunkHashes is not nil, the chunk hashes to *chunkHashes.
+func (manager *SnapshotManager) GetSnapshotChunkHashes(snapshot *Snapshot, chunkHashes *map[string]bool, chunkIDs map[string]bool) {
+
+	for _, chunkHash := range snapshot.FileSequence {
+		if chunkHashes != nil {
+			(*chunkHashes)[chunkHash] = true
+		}
+		chunkIDs[manager.config.GetChunkIDFromHash(chunkHash)] = true
+	}
+
+	for _, chunkHash := range snapshot.ChunkSequence {
+		if chunkHashes != nil {
+			(*chunkHashes)[chunkHash] = true
+		}
+		chunkIDs[manager.config.GetChunkIDFromHash(chunkHash)] = true
+	}
+
+	for _, chunkHash := range snapshot.LengthSequence {
+		if chunkHashes != nil {
+			(*chunkHashes)[chunkHash] = true
+		}
+		chunkIDs[manager.config.GetChunkIDFromHash(chunkHash)] = true
+	}
+
+	if len(snapshot.ChunkHashes) == 0 {
+
+		description := manager.DownloadSequence(snapshot.ChunkSequence)
+		err := snapshot.LoadChunks(description)
+		if err != nil {
+			LOG_ERROR("SNAPSHOT_CHUNK", "Failed to load chunks for snapshot %s at revision %d: %v",
+				snapshot.ID, snapshot.Revision, err)
+			return
+		}
+	}
+
+	for _, chunkHash := range snapshot.ChunkHashes {
+		if chunkHashes != nil {
+			(*chunkHashes)[chunkHash] = true
+		}
+		chunkIDs[manager.config.GetChunkIDFromHash(chunkHash)] = true
+	}
+
+	snapshot.ClearChunks()
+}
+
 // ListSnapshots shows the information about a snapshot.
 func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList []int, tag string,
 	showFiles bool, showChunks bool) int {
@@ -757,7 +802,9 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
 
-// ListSnapshots shows the information about a snapshot.
+// CheckSnapshots checks the selected snapshots for missing chunks and, when checkChunks is set without checkFiles, passes every referenced chunk to the chunk downloader for verification.
 func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToCheck []int, tag string, showStatistics bool, showTabular bool,
-	checkFiles bool, searchFossils bool, resurrect bool) bool {
+	checkFiles bool, checkChunks bool, searchFossils bool, resurrect bool, threads int) bool {
+
+	manager.chunkDownloader = CreateChunkDownloader(manager.config, manager.storage, manager.snapshotCache, false, threads)
 
-	LOG_DEBUG("LIST_PARAMETERS", "id: %s, revisions: %v, tag: %s, showStatistics: %t, showTabular: %t, checkFiles: %t, searchFossils: %t, resurrect: %t",
-		snapshotID, revisionsToCheck, tag, showStatistics, showTabular, checkFiles, searchFossils, resurrect)
+	LOG_DEBUG("LIST_PARAMETERS", "id: %s, revisions: %v, tag: %s, showStatistics: %t, showTabular: %t, checkFiles: %t, checkChunks: %t, searchFossils: %t, resurrect: %t, threads: %d",
+		snapshotID, revisionsToCheck, tag, showStatistics, showTabular, checkFiles, checkChunks, searchFossils, resurrect, threads)
@@ -839,6 +886,12 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
 	}
 	LOG_INFO("SNAPSHOT_CHECK", "Total chunk size is %s in %d chunks", PrettyNumber(totalChunkSize), len(chunkSizeMap))
 
+	var allChunkHashes *map[string]bool
+	if checkChunks && !checkFiles {
+		m := make(map[string]bool)
+		allChunkHashes = &m
+	}
+
 	for snapshotID = range snapshotMap {
 
 		for _, snapshot := range snapshotMap[snapshotID] {
@@ -850,9 +903,7 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
 			}
 
 			chunks := make(map[string]bool)
-			for _, chunkID := range manager.GetSnapshotChunks(snapshot, false) {
-				chunks[chunkID] = true
-			}
+			manager.GetSnapshotChunkHashes(snapshot, allChunkHashes, chunks)
 
 			missingChunks := 0
 			for chunkID := range chunks {
@@ -946,6 +997,14 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
 		manager.ShowStatistics(snapshotMap, chunkSizeMap, chunkUniqueMap, chunkSnapshotMap)
 	}
 
+	if checkChunks && !checkFiles {
+		LOG_INFO("SNAPSHOT_VERIFY", "Verifying %d chunks", len(*allChunkHashes))
+		for chunkHash := range *allChunkHashes {
+			manager.chunkDownloader.AddChunk(chunkHash)
+		}
+		manager.chunkDownloader.WaitForCompletion()
+		LOG_INFO("SNAPSHOT_VERIFY", "All %d chunks have been successfully verified", len(*allChunkHashes))
+	}
 	return true
 }
 