1
0
mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-06 00:03:38 +00:00

Improvements for the copy command

* Added a `-download-threads` option for specifying the number of downloading
  threads
* Show progress log messages during copy
This commit is contained in:
Gilbert Chen
2020-09-24 14:56:19 -04:00
parent 6841c989c6
commit 947006411b
2 changed files with 54 additions and 42 deletions

View File

@@ -1171,9 +1171,14 @@ func copySnapshots(context *cli.Context) {
os.Exit(ArgumentExitCode)
}
threads := context.Int("threads")
if threads < 1 {
threads = 1
uploadingThreads := context.Int("threads")
if uploadingThreads < 1 {
uploadingThreads = 1
}
downloadingThreads := context.Int("download-threads")
if downloadingThreads < 1 {
downloadingThreads = 1
}
repository, source := getRepositoryPreference(context, context.String("from"))
@@ -1181,7 +1186,7 @@ func copySnapshots(context *cli.Context) {
runScript(context, source.Name, "pre")
duplicacy.LOG_INFO("STORAGE_SET", "Source storage set to %s", source.StorageURL)
sourceStorage := duplicacy.CreateStorage(*source, false, threads)
sourceStorage := duplicacy.CreateStorage(*source, false, downloadingThreads)
if sourceStorage == nil {
return
}
@@ -1211,7 +1216,7 @@ func copySnapshots(context *cli.Context) {
}
duplicacy.LOG_INFO("STORAGE_SET", "Destination storage set to %s", destination.StorageURL)
destinationStorage := duplicacy.CreateStorage(*destination, false, threads)
destinationStorage := duplicacy.CreateStorage(*destination, false, uploadingThreads)
if destinationStorage == nil {
return
}
@@ -1236,7 +1241,7 @@ func copySnapshots(context *cli.Context) {
snapshotID = context.String("id")
}
sourceManager.CopySnapshots(destinationManager, snapshotID, revisions, threads)
sourceManager.CopySnapshots(destinationManager, snapshotID, revisions, uploadingThreads, downloadingThreads)
runScript(context, source.Name, "post")
}
@@ -1999,6 +2004,12 @@ func main() {
Usage: "number of uploading threads",
Argument: "<n>",
},
cli.IntFlag{
Name: "download-threads",
Value: 1,
Usage: "number of downloading threads",
Argument: "<n>",
},
cli.StringFlag{
Name: "key",
Usage: "the RSA private key to decrypt file chunks from the source storage",

View File

@@ -1600,7 +1600,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
// CopySnapshots copies the specified snapshots from one storage to the other.
func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapshotID string,
revisionsToBeCopied []int, threads int) bool {
revisionsToBeCopied []int, uploadingThreads int, downloadingThreads int) bool {
if !manager.config.IsCompatiableWith(otherManager.config) {
LOG_ERROR("CONFIG_INCOMPATIBLE", "Two storages are not compatible for the copy operation")
@@ -1745,63 +1745,64 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
LOG_DEBUG("SNAPSHOT_COPY", "Found %d chunks on destination storage", len(otherChunks))
chunksToCopy := 0
chunksToSkip := 0
var chunksToCopy []string
for chunkHash := range chunks {
otherChunkID := otherManager.config.GetChunkIDFromHash(chunkHash)
if _, found := otherChunks[otherChunkID]; found {
chunksToSkip++
} else {
chunksToCopy++
if _, found := otherChunks[otherChunkID]; !found {
chunksToCopy = append(chunksToCopy, chunkHash)
}
}
LOG_DEBUG("SNAPSHOT_COPY", "Chunks to copy = %d, to skip = %d, total = %d", chunksToCopy, chunksToSkip, chunksToCopy+chunksToSkip)
LOG_DEBUG("SNAPSHOT_COPY", "Total chunks in source snapshot revisions = %d\n", len(chunks))
LOG_INFO("SNAPSHOT_COPY", "Chunks to copy: %d, to skip: %d, total: %d", len(chunksToCopy), len(chunks) - len(chunksToCopy), len(chunks))
chunkDownloader := CreateChunkDownloader(manager.config, manager.storage, nil, false, threads, false)
chunkDownloader := CreateChunkDownloader(manager.config, manager.storage, nil, false, downloadingThreads, false)
chunkUploader := CreateChunkUploader(otherManager.config, otherManager.storage, nil, threads,
var uploadedBytes int64
startTime := time.Now()
copiedChunks := 0
chunkUploader := CreateChunkUploader(otherManager.config, otherManager.storage, nil, uploadingThreads,
func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) {
if skipped {
LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) exists at the destination", chunk.GetID(), chunkIndex, len(chunks))
} else {
LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) copied to the destination", chunk.GetID(), chunkIndex, len(chunks))
action := "Skipped"
if !skipped {
copiedChunks++
action = "Copied"
}
atomic.AddInt64(&uploadedBytes, int64(chunkSize))
elapsedTime := time.Now().Sub(startTime).Seconds()
speed := int64(float64(atomic.LoadInt64(&uploadedBytes)) / elapsedTime)
remainingTime := int64(float64(len(chunksToCopy) - chunkIndex - 1) / float64(chunkIndex + 1) * elapsedTime)
percentage := float64(chunkIndex + 1) / float64(len(chunksToCopy)) * 100.0
LOG_INFO("COPY_PROGRESS", "%s chunk %s (%d/%d) %sB/s %s %.1f%%",
action, chunk.GetID(), chunkIndex + 1, len(chunksToCopy),
PrettySize(speed), PrettyTime(remainingTime), percentage)
otherManager.config.PutChunk(chunk)
})
chunkUploader.Start()
totalCopied := 0
totalSkipped := 0
chunkIndex := 0
for chunkHash, isSnapshot := range chunks {
chunkIndex++
for _, chunkHash := range chunksToCopy {
chunkDownloader.AddChunk(chunkHash)
}
for i, chunkHash := range chunksToCopy {
chunkID := manager.config.GetChunkIDFromHash(chunkHash)
newChunkID := otherManager.config.GetChunkIDFromHash(chunkHash)
if _, found := otherChunks[newChunkID]; !found {
LOG_DEBUG("SNAPSHOT_COPY", "Copying chunk %s to %s", chunkID, newChunkID)
i := chunkDownloader.AddChunk(chunkHash)
chunk := chunkDownloader.WaitForChunk(i)
newChunk := otherManager.config.GetChunk()
newChunk.Reset(true)
newChunk.Write(chunk.GetBytes())
newChunk.isSnapshot = isSnapshot
chunkUploader.StartChunk(newChunk, chunkIndex)
totalCopied++
} else {
LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) skipped at the destination", chunkID, chunkIndex, len(chunks))
totalSkipped++
}
LOG_DEBUG("SNAPSHOT_COPY", "Copying chunk %s to %s", chunkID, newChunkID)
chunk := chunkDownloader.WaitForChunk(i)
newChunk := otherManager.config.GetChunk()
newChunk.Reset(true)
newChunk.Write(chunk.GetBytes())
newChunk.isSnapshot = chunks[chunkHash]
chunkUploader.StartChunk(newChunk, i)
}
chunkDownloader.Stop()
chunkUploader.Stop()
LOG_INFO("SNAPSHOT_COPY", "Copy complete, %d total chunks, %d chunks copied, %d skipped", totalCopied+totalSkipped, totalCopied, totalSkipped)
LOG_INFO("SNAPSHOT_COPY", "Copied %d new chunks and skipped %d existing chunks", copiedChunks, len(chunks) - copiedChunks)
for _, snapshot := range snapshots {
if revisionMap[snapshot.ID][snapshot.Revision] == false {