diff --git a/duplicacy/duplicacy_main.go b/duplicacy/duplicacy_main.go index 5f3736c..dbf2b5a 100644 --- a/duplicacy/duplicacy_main.go +++ b/duplicacy/duplicacy_main.go @@ -1171,9 +1171,14 @@ func copySnapshots(context *cli.Context) { os.Exit(ArgumentExitCode) } - threads := context.Int("threads") - if threads < 1 { - threads = 1 + uploadingThreads := context.Int("threads") + if uploadingThreads < 1 { + uploadingThreads = 1 + } + + downloadingThreads := context.Int("download-threads") + if downloadingThreads < 1 { + downloadingThreads = 1 } repository, source := getRepositoryPreference(context, context.String("from")) @@ -1181,7 +1186,7 @@ func copySnapshots(context *cli.Context) { runScript(context, source.Name, "pre") duplicacy.LOG_INFO("STORAGE_SET", "Source storage set to %s", source.StorageURL) - sourceStorage := duplicacy.CreateStorage(*source, false, threads) + sourceStorage := duplicacy.CreateStorage(*source, false, downloadingThreads) if sourceStorage == nil { return } @@ -1211,7 +1216,7 @@ func copySnapshots(context *cli.Context) { } duplicacy.LOG_INFO("STORAGE_SET", "Destination storage set to %s", destination.StorageURL) - destinationStorage := duplicacy.CreateStorage(*destination, false, threads) + destinationStorage := duplicacy.CreateStorage(*destination, false, uploadingThreads) if destinationStorage == nil { return } @@ -1236,7 +1241,7 @@ func copySnapshots(context *cli.Context) { snapshotID = context.String("id") } - sourceManager.CopySnapshots(destinationManager, snapshotID, revisions, threads) + sourceManager.CopySnapshots(destinationManager, snapshotID, revisions, uploadingThreads, downloadingThreads) runScript(context, source.Name, "post") } @@ -1999,6 +2004,12 @@ func main() { Usage: "number of uploading threads", Argument: "", }, + cli.IntFlag{ + Name: "download-threads", + Value: 1, + Usage: "number of downloading threads", + Argument: "", + }, cli.StringFlag{ Name: "key", Usage: "the RSA private key to decrypt file chunks from the source storage", diff --git a/src/duplicacy_backupmanager.go b/src/duplicacy_backupmanager.go index dd04ba3..f460625 100644 --- a/src/duplicacy_backupmanager.go +++ b/src/duplicacy_backupmanager.go @@ -1600,7 +1600,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun // CopySnapshots copies the specified snapshots from one storage to the other. func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapshotID string, - revisionsToBeCopied []int, threads int) bool { + revisionsToBeCopied []int, uploadingThreads int, downloadingThreads int) bool { if !manager.config.IsCompatiableWith(otherManager.config) { LOG_ERROR("CONFIG_INCOMPATIBLE", "Two storages are not compatible for the copy operation") @@ -1745,63 +1745,64 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho LOG_DEBUG("SNAPSHOT_COPY", "Found %d chunks on destination storage", len(otherChunks)) - chunksToCopy := 0 - chunksToSkip := 0 + var chunksToCopy []string for chunkHash := range chunks { otherChunkID := otherManager.config.GetChunkIDFromHash(chunkHash) - if _, found := otherChunks[otherChunkID]; found { - chunksToSkip++ - } else { - chunksToCopy++ + if _, found := otherChunks[otherChunkID]; !found { + chunksToCopy = append(chunksToCopy, chunkHash) } } - LOG_DEBUG("SNAPSHOT_COPY", "Chunks to copy = %d, to skip = %d, total = %d", chunksToCopy, chunksToSkip, chunksToCopy+chunksToSkip) - LOG_DEBUG("SNAPSHOT_COPY", "Total chunks in source snapshot revisions = %d\n", len(chunks)) + LOG_INFO("SNAPSHOT_COPY", "Chunks to copy: %d, to skip: %d, total: %d", len(chunksToCopy), len(chunks) - len(chunksToCopy), len(chunks)) - chunkDownloader := CreateChunkDownloader(manager.config, manager.storage, nil, false, threads, false) + chunkDownloader := CreateChunkDownloader(manager.config, manager.storage, nil, false, downloadingThreads, false) - chunkUploader := CreateChunkUploader(otherManager.config, otherManager.storage, nil, threads, + var uploadedBytes int64 + startTime := time.Now() + + copiedChunks := 0 + chunkUploader := CreateChunkUploader(otherManager.config, otherManager.storage, nil, uploadingThreads, func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) { - if skipped { - LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) exists at the destination", chunk.GetID(), chunkIndex, len(chunks)) - } else { - LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) copied to the destination", chunk.GetID(), chunkIndex, len(chunks)) + action := "Skipped" + if !skipped { + copiedChunks++ + action = "Copied" } + + atomic.AddInt64(&uploadedBytes, int64(chunkSize)) + + elapsedTime := time.Now().Sub(startTime).Seconds() + speed := int64(float64(atomic.LoadInt64(&uploadedBytes)) / elapsedTime) + remainingTime := int64(float64(len(chunksToCopy) - chunkIndex - 1) / float64(chunkIndex + 1) * elapsedTime) + percentage := float64(chunkIndex + 1) / float64(len(chunksToCopy)) * 100.0 + LOG_INFO("COPY_PROGRESS", "%s chunk %s (%d/%d) %sB/s %s %.1f%%", + action, chunk.GetID(), chunkIndex + 1, len(chunksToCopy), + PrettySize(speed), PrettyTime(remainingTime), percentage) otherManager.config.PutChunk(chunk) }) chunkUploader.Start() - totalCopied := 0 - totalSkipped := 0 - chunkIndex := 0 - - for chunkHash, isSnapshot := range chunks { - chunkIndex++ + for _, chunkHash := range chunksToCopy { + chunkDownloader.AddChunk(chunkHash) + } + for i, chunkHash := range chunksToCopy { chunkID := manager.config.GetChunkIDFromHash(chunkHash) newChunkID := otherManager.config.GetChunkIDFromHash(chunkHash) - if _, found := otherChunks[newChunkID]; !found { - LOG_DEBUG("SNAPSHOT_COPY", "Copying chunk %s to %s", chunkID, newChunkID) - i := chunkDownloader.AddChunk(chunkHash) - chunk := chunkDownloader.WaitForChunk(i) - newChunk := otherManager.config.GetChunk() - newChunk.Reset(true) - newChunk.Write(chunk.GetBytes()) - newChunk.isSnapshot = isSnapshot - chunkUploader.StartChunk(newChunk, chunkIndex) - totalCopied++ - } else { - LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) skipped at the destination", chunkID, chunkIndex, len(chunks)) - totalSkipped++ - } + LOG_DEBUG("SNAPSHOT_COPY", "Copying chunk %s to %s", chunkID, newChunkID) + chunk := chunkDownloader.WaitForChunk(i) + newChunk := otherManager.config.GetChunk() + newChunk.Reset(true) + newChunk.Write(chunk.GetBytes()) + newChunk.isSnapshot = chunks[chunkHash] + chunkUploader.StartChunk(newChunk, i) } chunkDownloader.Stop() chunkUploader.Stop() - LOG_INFO("SNAPSHOT_COPY", "Copy complete, %d total chunks, %d chunks copied, %d skipped", totalCopied+totalSkipped, totalCopied, totalSkipped) + LOG_INFO("SNAPSHOT_COPY", "Copied %d new chunks and skipped %d existing chunks", copiedChunks, len(chunks) - copiedChunks) for _, snapshot := range snapshots { if revisionMap[snapshot.ID][snapshot.Revision] == false {