1
0
mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-11 13:53:16 +00:00

Implement multi-threaded pruning

This commit is contained in:
Gilbert Chen
2018-06-04 21:52:07 -04:00
parent 7e021f26d3
commit dd53b4797e
5 changed files with 248 additions and 94 deletions

View File

@@ -175,6 +175,7 @@ type SnapshotManager struct {
snapshotCache *FileStorage
chunkDownloader *ChunkDownloader
chunkOperator *ChunkOperator
}
// CreateSnapshotManager creates a snapshot manager
@@ -1503,36 +1504,11 @@ func (manager *SnapshotManager) ShowHistory(top string, snapshotID string, revis
}
// fossilizeChunk turns the chunk into a fossil.
func (manager *SnapshotManager) fossilizeChunk(chunkID string, filePath string,
exclusive bool, collection *FossilCollection) bool {
func (manager *SnapshotManager) fossilizeChunk(chunkID string, filePath string, exclusive bool) bool {
if exclusive {
err := manager.storage.DeleteFile(0, filePath)
if err != nil {
LOG_ERROR("CHUNK_DELETE", "Failed to remove the chunk %s: %v", chunkID, err)
return false
} else {
LOG_TRACE("CHUNK_DELETE", "Deleted chunk file %s", chunkID)
}
manager.chunkOperator.Delete(chunkID, filePath)
} else {
fossilPath := filePath + ".fsl"
err := manager.storage.MoveFile(0, filePath, fossilPath)
if err != nil {
if _, exist, _, _ := manager.storage.FindChunk(0, chunkID, true); exist {
err := manager.storage.DeleteFile(0, filePath)
if err == nil {
LOG_TRACE("CHUNK_DELETE", "Deleted chunk file %s as the fossil already exists", chunkID)
}
} else {
LOG_ERROR("CHUNK_DELETE", "Failed to fossilize the chunk %s: %v", chunkID, err)
return false
}
} else {
LOG_TRACE("CHUNK_FOSSILIZE", "Fossilized chunk %s", chunkID)
}
collection.AddFossil(fossilPath)
manager.chunkOperator.Fossilize(chunkID, filePath)
}
return true
@@ -1541,6 +1517,7 @@ func (manager *SnapshotManager) fossilizeChunk(chunkID string, filePath string,
// resurrectChunk turns the fossil back into a chunk
func (manager *SnapshotManager) resurrectChunk(fossilPath string, chunkID string) bool {
manager.chunkOperator.Resurrect(chunkID, fossilPath)
chunkPath, exist, _, err := manager.storage.FindChunk(0, chunkID, false)
if err != nil {
LOG_ERROR("CHUNK_FIND", "Failed to locate the path for the chunk %s: %v", chunkID, err)
@@ -1581,7 +1558,7 @@ func (manager *SnapshotManager) resurrectChunk(fossilPath string, chunkID string
func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string, revisionsToBeDeleted []int,
tags []string, retentions []string,
exhaustive bool, exclusive bool, ignoredIDs []string,
dryRun bool, deleteOnly bool, collectOnly bool) bool {
dryRun bool, deleteOnly bool, collectOnly bool, threads int) bool {
LOG_DEBUG("DELETE_PARAMETERS",
"id: %s, revisions: %v, tags: %v, retentions: %v, exhaustive: %t, exclusive: %t, "+
@@ -1593,6 +1570,8 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
LOG_WARN("DELETE_OPTIONS", "Tags or retention policy will be ignored if at least one revision is specified")
}
manager.chunkOperator = CreateChunkOperator(manager.storage, threads)
prefPath := GetDuplicacyPreferencePath()
logDir := path.Join(prefPath, "logs")
err := os.MkdirAll(logDir, 0700)
@@ -1796,20 +1775,15 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
continue
}
manager.resurrectChunk(fossil, chunk)
manager.chunkOperator.Resurrect(chunk, fossil)
fmt.Fprintf(logFile, "Resurrected fossil %s (collection %s)\n", chunk, collectionName)
} else {
if dryRun {
LOG_INFO("FOSSIL_DELETE", "The chunk %s would be permanently removed", chunk)
} else {
err = manager.storage.DeleteFile(0, fossil)
if err != nil {
LOG_WARN("FOSSIL_DELETE", "The chunk %s could not be removed: %v", chunk, err)
} else {
LOG_INFO("FOSSIL_DELETE", "The chunk %s has been permanently removed", chunk)
fmt.Fprintf(logFile, "Deleted fossil %s (collection %s)\n", chunk, collectionName)
}
manager.chunkOperator.Delete(chunk, fossil)
fmt.Fprintf(logFile, "Deleted fossil %s (collection %s)\n", chunk, collectionName)
}
}
}
@@ -1820,8 +1794,7 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
LOG_INFO("TEMPORARY_DELETE", "The temporary file %s would be deleted", temporary)
} else {
// Fail silently, since temporary files are supposed to be renamed or deleted after upload is done
_ = manager.storage.DeleteFile(0, temporary)
LOG_INFO("TEMPORARY_DELETE", "The temporary file %s has been deleted", temporary)
manager.chunkOperator.Delete("", temporary)
fmt.Fprintf(logFile, "Deleted temporary %s (collection %s)\n", temporary, collectionName)
}
}
@@ -1949,12 +1922,17 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
if exhaustive {
success = manager.pruneSnapshotsExhaustive(referencedFossils, allSnapshots, collection, logFile, dryRun, exclusive)
} else {
success = manager.pruneSnapshots(allSnapshots, collection, logFile, dryRun, exclusive)
success = manager.pruneSnapshotsNonExhaustive(allSnapshots, collection, logFile, dryRun, exclusive)
}
if !success {
return false
}
manager.chunkOperator.Stop()
for _, fossil := range manager.chunkOperator.fossils {
collection.AddFossil(fossil)
}
// Save the fossil collection if it is not empty.
if !collection.IsEmpty() && !dryRun {
collection.EndTime = time.Now().Unix()
@@ -2028,7 +2006,7 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
// pruneSnapshots in non-exhaustive mode, only chunks that exist in the
// snapshots to be deleted but not other are identified as unreferenced chunks.
func (manager *SnapshotManager) pruneSnapshots(allSnapshots map[string][]*Snapshot, collection *FossilCollection, logFile io.Writer, dryRun, exclusive bool) bool {
func (manager *SnapshotManager) pruneSnapshotsNonExhaustive(allSnapshots map[string][]*Snapshot, collection *FossilCollection, logFile io.Writer, dryRun, exclusive bool) bool {
targetChunks := make(map[string]bool)
// Now build all chunks referened by snapshot not deleted
@@ -2085,18 +2063,7 @@ func (manager *SnapshotManager) pruneSnapshots(allSnapshots map[string][]*Snapsh
continue
}
chunkPath, exist, _, err := manager.storage.FindChunk(0, chunk, false)
if err != nil {
LOG_ERROR("CHUNK_FIND", "Failed to locate the path for the chunk %s: %v", chunk, err)
return false
}
if !exist {
LOG_WARN("CHUNK_MISSING", "The chunk %s does not exist", chunk)
continue
}
manager.fossilizeChunk(chunk, chunkPath, exclusive, collection)
manager.fossilizeChunk(chunk, "", exclusive)
if exclusive {
fmt.Fprintf(logFile, "Deleted chunk %s (exclusive mode)\n", chunk)
} else {
@@ -2158,12 +2125,7 @@ func (manager *SnapshotManager) pruneSnapshotsExhaustive(referencedFossils map[s
if exclusive {
// In exclusive mode, we assume no other restore operation is running concurrently.
err := manager.storage.DeleteFile(0, chunkDir+file)
if err != nil {
LOG_ERROR("CHUNK_TEMPORARY", "Failed to remove the temporary file %s: %v", file, err)
return false
}
LOG_DEBUG("CHUNK_TEMPORARY", "Deleted temporary file %s", file)
manager.chunkOperator.Delete("", chunkDir+file)
fmt.Fprintf(logFile, "Deleted temporary %s\n", file)
} else {
collection.AddTemporary(file)
@@ -2182,7 +2144,7 @@ func (manager *SnapshotManager) pruneSnapshotsExhaustive(referencedFossils map[s
chunk = strings.Replace(chunk, ".fsl", "", -1)
if _, found := referencedChunks[chunk]; found {
manager.resurrectChunk(chunkDir+file, chunk)
manager.chunkOperator.Resurrect(chunk, chunkDir + file)
} else {
collection.AddFossil(chunkDir + file)
LOG_DEBUG("FOSSIL_FIND", "Found unreferenced fossil %s", file)
@@ -2206,7 +2168,7 @@ func (manager *SnapshotManager) pruneSnapshotsExhaustive(referencedFossils map[s
continue
}
manager.fossilizeChunk(chunk, chunkDir+file, exclusive, collection)
manager.fossilizeChunk(chunk, chunkDir+file, exclusive)
if exclusive {
fmt.Fprintf(logFile, "Deleted chunk %s (exclusive mode)\n", chunk)
} else {
@@ -2222,14 +2184,8 @@ func (manager *SnapshotManager) pruneSnapshotsExhaustive(referencedFossils map[s
}
// This is a redundant chunk file (for instance D3/495A8D and D3/49/5A8D )
err := manager.storage.DeleteFile(0, chunkDir+file)
if err != nil {
LOG_WARN("CHUNK_DELETE", "Failed to remove the redundant chunk file %s: %v", file, err)
} else {
LOG_TRACE("CHUNK_DELETE", "Removed the redundant chunk file %s", file)
fmt.Fprintf(logFile, "Deleted redundant chunk %s\n", file)
}
manager.chunkOperator.Delete(chunk, chunkDir+file)
fmt.Fprintf(logFile, "Deleted redundant chunk %s\n", file)
} else {
referencedChunks[chunk] = true
LOG_DEBUG("CHUNK_KEEP", "Chunk %s is referenced", chunk)