mirror of
https://github.com/gilbertchen/duplicacy
synced 2025-12-10 21:33:19 +00:00
A few fixes for multi-threaded pruning
This commit is contained in:
@@ -69,12 +69,19 @@ func CreateChunkOperator(storage Storage, threads int) *ChunkOperator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (operator *ChunkOperator) Stop() {
|
func (operator *ChunkOperator) Stop() {
|
||||||
|
if atomic.LoadInt64(&operator.numberOfActiveTasks) < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for atomic.LoadInt64(&operator.numberOfActiveTasks) > 0 {
|
for atomic.LoadInt64(&operator.numberOfActiveTasks) > 0 {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
}
|
}
|
||||||
for i := 0; i < operator.threads; i++ {
|
for i := 0; i < operator.threads; i++ {
|
||||||
operator.stopChannel <- false
|
operator.stopChannel <- false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Assign -1 to numberOfActiveTasks so Stop() can be called multiple times
|
||||||
|
atomic.AddInt64(&operator.numberOfActiveTasks, int64(-1))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (operator *ChunkOperator) AddTask(operation int, chunkID string, filePath string) {
|
func (operator *ChunkOperator) AddTask(operation int, chunkID string, filePath string) {
|
||||||
@@ -134,7 +141,6 @@ func (operator *ChunkOperator) Run(threadIndex int, task ChunkOperatorTask) {
|
|||||||
LOG_DEBUG("CHUNK_FIND", "Chunk %s exists in the storage", task.chunkID)
|
LOG_DEBUG("CHUNK_FIND", "Chunk %s exists in the storage", task.chunkID)
|
||||||
}
|
}
|
||||||
} else if task.operation == ChunkOperationDelete {
|
} else if task.operation == ChunkOperationDelete {
|
||||||
// In exclusive mode, we assume no other restore operation is running concurrently.
|
|
||||||
err := operator.storage.DeleteFile(threadIndex, task.filePath)
|
err := operator.storage.DeleteFile(threadIndex, task.filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
LOG_WARN("CHUNK_DELETE", "Failed to remove the file %s: %v", task.filePath, err)
|
LOG_WARN("CHUNK_DELETE", "Failed to remove the file %s: %v", task.filePath, err)
|
||||||
@@ -160,7 +166,7 @@ func (operator *ChunkOperator) Run(threadIndex int, task ChunkOperatorTask) {
|
|||||||
LOG_ERROR("CHUNK_DELETE", "Failed to fossilize the chunk %s: %v", task.chunkID, err)
|
LOG_ERROR("CHUNK_DELETE", "Failed to fossilize the chunk %s: %v", task.chunkID, err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG_TRACE("CHUNK_FOSSILIZE", "Fossilized chunk %s", task.chunkID)
|
LOG_TRACE("CHUNK_FOSSILIZE", "The chunk %s has been marked as a fossil", task.chunkID)
|
||||||
operator.fossilsLock.Lock()
|
operator.fossilsLock.Lock()
|
||||||
operator.fossils = append(operator.fossils, fossilPath)
|
operator.fossils = append(operator.fossils, fossilPath)
|
||||||
operator.fossilsLock.Unlock()
|
operator.fossilsLock.Unlock()
|
||||||
|
|||||||
@@ -1571,6 +1571,7 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
manager.chunkOperator = CreateChunkOperator(manager.storage, threads)
|
manager.chunkOperator = CreateChunkOperator(manager.storage, threads)
|
||||||
|
defer manager.chunkOperator.Stop()
|
||||||
|
|
||||||
prefPath := GetDuplicacyPreferencePath()
|
prefPath := GetDuplicacyPreferencePath()
|
||||||
logDir := path.Join(prefPath, "logs")
|
logDir := path.Join(prefPath, "logs")
|
||||||
@@ -1934,7 +1935,7 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Save the fossil collection if it is not empty.
|
// Save the fossil collection if it is not empty.
|
||||||
if !collection.IsEmpty() && !dryRun {
|
if !collection.IsEmpty() && !dryRun && !exclusive {
|
||||||
collection.EndTime = time.Now().Unix()
|
collection.EndTime = time.Now().Unix()
|
||||||
|
|
||||||
collectionNumber := maxCollectionNumber + 1
|
collectionNumber := maxCollectionNumber + 1
|
||||||
@@ -2135,19 +2136,33 @@ func (manager *SnapshotManager) pruneSnapshotsExhaustive(referencedFossils map[s
|
|||||||
// This is a fossil. If it is unreferenced, it can be a result of failing to save the fossil
|
// This is a fossil. If it is unreferenced, it can be a result of failing to save the fossil
|
||||||
// collection file after making it a fossil.
|
// collection file after making it a fossil.
|
||||||
if _, found := referencedFossils[file]; !found {
|
if _, found := referencedFossils[file]; !found {
|
||||||
if dryRun {
|
|
||||||
LOG_INFO("FOSSIL_UNREFERENCED", "Found unreferenced fossil %s", file)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
chunk := strings.Replace(file, "/", "", -1)
|
chunk := strings.Replace(file, "/", "", -1)
|
||||||
chunk = strings.Replace(chunk, ".fsl", "", -1)
|
chunk = strings.Replace(chunk, ".fsl", "", -1)
|
||||||
|
|
||||||
if _, found := referencedChunks[chunk]; found {
|
if _, found := referencedChunks[chunk]; found {
|
||||||
|
|
||||||
|
if dryRun {
|
||||||
|
LOG_INFO("FOSSIL_REFERENCED", "Found referenced fossil %s", file)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
manager.chunkOperator.Resurrect(chunk, chunkDir + file)
|
manager.chunkOperator.Resurrect(chunk, chunkDir + file)
|
||||||
|
fmt.Fprintf(logFile, "Found referenced fossil %s\n", file)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
collection.AddFossil(chunkDir + file)
|
|
||||||
LOG_DEBUG("FOSSIL_FIND", "Found unreferenced fossil %s", file)
|
if dryRun {
|
||||||
|
LOG_INFO("FOSSIL_UNREFERENCED", "Found unreferenced fossil %s", file)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if exclusive {
|
||||||
|
manager.chunkOperator.Delete(chunk, chunkDir + file)
|
||||||
|
} else {
|
||||||
|
collection.AddFossil(chunkDir + file)
|
||||||
|
LOG_DEBUG("FOSSIL_FIND", "Found unreferenced fossil %s", file)
|
||||||
|
}
|
||||||
fmt.Fprintf(logFile, "Found unreferenced fossil %s\n", file)
|
fmt.Fprintf(logFile, "Found unreferenced fossil %s\n", file)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user