mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-06 00:03:38 +00:00

Compare commits

...

5 Commits

Author SHA1 Message Date
sevimo123
5ea5ae09c8 Merge 488abc3d6c into bb214b6e04 2024-11-27 04:58:35 +00:00
Gilbert Chen
bb214b6e04 Bump version to 3.2.4 2024-10-30 12:05:18 -04:00
Gilbert Chen
6bca9fccdd maxCollectionNumber must be increased even in collect-only mode
This fixed a bug that caused collect-only mode to keep overwriting
collection 1 during every prune
2024-10-29 22:46:46 -04:00
Gilbert Chen
a06d925e53 Remove 'incomplete_files' in deleteIncompleteSnapshot()
This file is used for storing the on-disk entry list.  When an error occurs,
this file is renamed to 'incomplete_snapshot' for fast resuming on next run.
But if there is no error this file should be removed.
2024-10-29 21:04:51 -04:00
Victor Mozgin
488abc3d6c Multithreaded chunk listing for OneDrive (with batching and configuration) 2023-02-22 06:06:04 -05:00
6 changed files with 395 additions and 26 deletions

View File

@@ -2263,7 +2263,7 @@ func main() {
app.Name = "duplicacy"
app.HelpName = "duplicacy"
app.Usage = "A new generation cloud backup tool based on lock-free deduplication"
app.Version = "3.2.3" + " (" + GitCommit + ")"
app.Version = "3.2.4" + " (" + GitCommit + ")"
// Exit with code 2 if an invalid command is provided
app.CommandNotFound = func(context *cli.Context, command string) {

View File

@@ -559,7 +559,7 @@ func loadIncompleteSnapshot(snapshotID string, cachePath string) *EntryList {
// Delete the two incomplete files.
func deleteIncompleteSnapshot(cachePath string) {
- for _, file := range []string{"incomplete_snapshot", "incomplete_chunks"} {
+ for _, file := range []string{"incomplete_snapshot", "incomplete_chunks", "incomplete_files"} {
filePath := path.Join(cachePath, file)
if _, err := os.Stat(filePath); err == nil {
err = os.Remove(filePath)
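
The commit message above describes the lifecycle of this file: the on-disk entry list lives in 'incomplete_files' while a backup runs, is renamed to 'incomplete_snapshot' when the run fails so the next run can resume quickly, and should simply be removed after a clean run, which is what this change adds. A rough sketch of that lifecycle, assuming a hypothetical helper (finishEntryList is not a real function in this change):

package duplicacy

import (
	"os"
	"path"
)

// finishEntryList sketches the lifecycle from the commit message: on failure the
// partial entry list is kept (renamed) for a fast resume; on success it is removed.
func finishEntryList(cachePath string, runErr error) error {
	onDisk := path.Join(cachePath, "incomplete_files")
	if runErr != nil {
		// keep the partial list around so the next run can resume quickly
		return os.Rename(onDisk, path.Join(cachePath, "incomplete_snapshot"))
	}
	// clean run: the partial list is no longer needed
	return os.Remove(onDisk)
}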

View File

@@ -46,12 +46,24 @@ type OneDriveClient struct {
IsConnected bool
TestMode bool
IsBusiness bool
MaxBatchReqs int
RefreshTokenURL string
APIURL string
BatchURL string
StripBatchPrefixURL string
}
- func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, client_secret string, drive_id string) (*OneDriveClient, error) {
+ func NewOneDriveClient(
+ tokenFile string,
+ isBusiness bool,
+ max_batch_requests int,
+ client_id string,
+ client_secret string,
+ drive_id string,
+ ) (*OneDriveClient, error) {
description, err := ioutil.ReadFile(tokenFile)
if err != nil {
@@ -64,12 +76,13 @@ func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, clie
}
client := &OneDriveClient{
HTTPClient: http.DefaultClient,
TokenFile: tokenFile,
Token: token,
OAConfig: nil,
TokenLock: &sync.Mutex{},
IsBusiness: isBusiness,
MaxBatchReqs: max_batch_requests,
}
if (client_id != "") {
@@ -92,9 +105,12 @@ func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, clie
if drive_id != "" {
client.APIURL = "https://graph.microsoft.com/v1.0/drives/"+drive_id
}
client.BatchURL = "https://graph.microsoft.com/v1.0/$batch"
client.StripBatchPrefixURL = "https://graph.microsoft.com/v1.0/"
} else {
client.RefreshTokenURL = "https://duplicacy.com/one_refresh"
client.APIURL = "https://api.onedrive.com/v1.0/drive"
client.BatchURL = "<Batching is only supported by Graph API>"
}
client.RefreshToken(false)
@@ -272,6 +288,12 @@ func (client *OneDriveClient) RefreshToken(force bool) (err error) {
return nil
}
type OneDriveListReqItem struct {
Path string
URL string
}
type OneDriveEntry struct {
ID string
Name string
@@ -279,9 +301,15 @@ type OneDriveEntry struct {
Size int64
}
type ErrorResponse struct {
Code string `json:"code"`
Message string `json:"message"`
}
type OneDriveListEntriesOutput struct {
Entries []OneDriveEntry `json:"value"`
NextLink string `json:"@odata.nextLink"`
Error ErrorResponse `json:"error"`
}
func (client *OneDriveClient) ListEntries(path string) ([]OneDriveEntry, error) {
@@ -324,6 +352,174 @@ func (client *OneDriveClient) ListEntries(path string) ([]OneDriveEntry, error)
return entries, nil
}
func (client *OneDriveClient) ListPathToURL(path string) (url string) {
url = client.APIURL + "/root:/" + path + ":/children"
if path == "" {
url = client.APIURL + "/root/children"
}
if client.TestMode {
url += "?top=8"
} else {
url += "?top=1000"
}
url += "&select=name,size,folder"
return url
}
type BatchRequestItem struct {
Id string `json:"id"`
Method string `json:"method"`
URL string `json:"url"`
}
type BatchRequest struct {
Requests []BatchRequestItem `json:"requests"`
}
type BatchResponseItem struct {
Id string `json:"id"`
Status int `json:"status"`
Headers map[string]string `json:"headers"`
Body OneDriveListEntriesOutput `json:"body"`
}
type BatchResponse struct {
Responses []BatchResponseItem `json:"responses"`
}
func (client *OneDriveClient) ListEntriesBatch(
prefix string,
batchReqs []OneDriveListReqItem,
) (
entriesPerReq []OneDriveListEntriesOutput,
newReqs []OneDriveListReqItem,
err error,
) {
nReqs := len(batchReqs)
entriesPerReq = make([]OneDriveListEntriesOutput, nReqs, nReqs)
newReqs = make([]OneDriveListReqItem, 0, 0)
for i, req := range batchReqs {
if req.URL == "" {
batchReqs[i].URL = client.ListPathToURL(prefix + "/" + batchReqs[i].Path)
}
}
if client.IsBusiness && nReqs > 1 {
// OneDrive Business uses Graph API which supports batching
breq := BatchRequest{}
breq.Requests = make([]BatchRequestItem, len(batchReqs), len(batchReqs))
for i, req := range batchReqs {
breq.Requests[i].Id = strconv.Itoa(i+1)
breq.Requests[i].Method = "GET"
breq.Requests[i].URL = req.URL[len(client.StripBatchPrefixURL):]
}
tracestr := fmt.Sprintf("Batch payload: %d requests", len(breq.Requests))
for _, req := range breq.Requests {
tracestr = tracestr + fmt.Sprintf("\n\t\t%s %s", req.Method, req.URL)
}
LOG_TRACE("ONEDRIVE_BATCH", tracestr)
readCloser, _, err := client.call(client.BatchURL, "POST", breq, "application/json")
if err != nil {
return nil, nil, err
}
defer readCloser.Close()
bresp := &BatchResponse{}
if err = json.NewDecoder(readCloser).Decode(&bresp); err != nil {
return nil, nil, err
}
if len(bresp.Responses) != len(batchReqs) {
return nil, nil, fmt.Errorf("Batch response length mismatch with requests length: %d != %d", len(bresp.Responses), len(batchReqs))
}
throttleDelay := 0
for _, resp := range bresp.Responses {
nresp, err := strconv.Atoi(resp.Id)
if err != nil {
return nil, nil, err
}
nresp-- // ids are 1-based in the response payload
if resp.Status == 200 { // success
entriesPerReq[nresp] = resp.Body
if entriesPerReq[nresp].NextLink != "" {
newReqs = append(
newReqs,
OneDriveListReqItem{
Path: batchReqs[nresp].Path,
URL: entriesPerReq[nresp].NextLink,
},
)
}
} else if resp.Status == 429 { // throttled
var backoff int
backoffList, found := resp.Headers["Retry-After"]
if found && len(backoffList)>0 {
backoff, _ = strconv.Atoi(backoffList)
backoff *= 1000 // s to ms
} else {
backoff = 300000 // 5 minutes by default
}
if backoff > throttleDelay {
throttleDelay = backoff
}
LOG_INFO("ONEDRIVE_RETRY", "Batch item response code: %d; suggested retry is %d milliseconds", resp.Status, backoff)
// Retry the same URL
newReqs = append(newReqs, batchReqs[nresp])
} else if resp.Status == 400 || resp.Status == 401 {
// Some errors are expected, e.g. unauthorized / expired token
// Retry the same URL
newReqs = append(newReqs, batchReqs[nresp])
} else { // unexpected error
errmsg := resp.Body.Error.Message
//LOG_TRACE("ONEDRIVE_BATCH", "Unexpected batch response error %d: %s / %s", resp.Status, http.StatusText(resp.Status), errmsg)
return nil, nil, fmt.Errorf("Unexpected batch response error %d: %s / %s", resp.Status, http.StatusText(resp.Status), errmsg)
}
}
if throttleDelay > 0 {
LOG_INFO("ONEDRIVE_RETRY", "Batch request throttled; retry after %d milliseconds", throttleDelay)
time.Sleep(time.Duration(throttleDelay) * time.Millisecond)
throttleDelay = 0
}
} else {
// Run without batching
for i, req := range batchReqs {
readCloser, _, err := client.call(req.URL, "GET", 0, "")
if err != nil {
return nil, nil, err
}
defer readCloser.Close()
if err = json.NewDecoder(readCloser).Decode(&entriesPerReq[i]); err != nil {
return nil, nil, err
}
if entriesPerReq[i].NextLink != "" {
newReqs = append(
newReqs,
OneDriveListReqItem{
Path: batchReqs[i].Path,
URL: entriesPerReq[i].NextLink,
},
)
}
}
}
return entriesPerReq, newReqs, nil
}
func (client *OneDriveClient) GetFileInfo(path string) (string, bool, int64, error) {
url := client.APIURL + "/root:/" + path
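
ListEntriesBatch above returns work as well as results: next-page links and throttled (or transiently failing) items come back in newReqs, so a caller keeps resubmitting until the queue drains. A minimal, illustrative driver loop (listAll is a hypothetical helper, not part of this change; it would sit in the same package as the client):

// listAll drains a queue of listing requests through ListEntriesBatch, feeding
// any follow-up requests (next pages, throttled items) back into the queue
// until nothing is left.
func listAll(client *OneDriveClient, prefix string, reqs []OneDriveListReqItem) ([]OneDriveEntry, error) {
	all := []OneDriveEntry{}
	for len(reqs) > 0 {
		outputs, newReqs, err := client.ListEntriesBatch(prefix, reqs)
		if err != nil {
			return nil, err
		}
		for _, out := range outputs {
			all = append(all, out.Entries...)
		}
		reqs = newReqs // next pages and throttled retries from this round
	}
	return all, nil
}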

View File

@@ -8,24 +8,41 @@ import (
"fmt"
"path"
"strings"
"sync"
)
type OneDriveStorage struct {
StorageBase
- client *OneDriveClient
- storageDir string
- numberOfThread int
+ client *OneDriveClient
+ storageDir string
+ numberOfThreads int
}
// CreateOneDriveStorage creates an OneDrive storage object.
- func CreateOneDriveStorage(tokenFile string, isBusiness bool, storagePath string, threads int, client_id string, client_secret string, drive_id string) (storage *OneDriveStorage, err error) {
+ func CreateOneDriveStorage(
+ tokenFile string,
+ isBusiness bool,
+ storagePath string,
+ threads int,
+ max_batch_requests int,
+ client_id string,
+ client_secret string,
+ drive_id string,
+ ) (storage *OneDriveStorage, err error) {
for len(storagePath) > 0 && storagePath[len(storagePath)-1] == '/' {
storagePath = storagePath[:len(storagePath)-1]
}
- client, err := NewOneDriveClient(tokenFile, isBusiness, client_id, client_secret, drive_id)
+ client, err := NewOneDriveClient(
+ tokenFile,
+ isBusiness,
+ max_batch_requests,
+ client_id,
+ client_secret,
+ drive_id,
+ )
if err != nil {
return nil, err
}
@@ -44,9 +61,9 @@ func CreateOneDriveStorage(tokenFile string, isBusiness bool, storagePath string
}
storage = &OneDriveStorage{
- client: client,
- storageDir: storagePath,
- numberOfThread: threads,
+ client: client,
+ storageDir: storagePath,
+ numberOfThreads: threads,
}
for _, path := range []string{"chunks", "fossils", "snapshots"} {
@@ -78,8 +95,9 @@ func (storage *OneDriveStorage) convertFilePath(filePath string) string {
return filePath
}
// ListFiles return the list of files and subdirectories under 'dir' (non-recursively)
- func (storage *OneDriveStorage) ListFiles(threadIndex int, dir string) ([]string, []int64, error) {
+ func (storage *OneDriveStorage) ListFilesNotThreaded(threadIndex int, dir string) ([]string, []int64, error) {
for len(dir) > 0 && dir[len(dir)-1] == '/' {
dir = dir[:len(dir)-1]
@@ -145,6 +163,140 @@ func (storage *OneDriveStorage) ListFiles(threadIndex int, dir string) ([]string
}
// ListFiles return the list of files and subdirectories under 'dir' (non-recursively)
func (storage *OneDriveStorage) ListFiles(threadIndex int, dir string) ([]string, []int64, error) {
for len(dir) > 0 && dir[len(dir)-1] == '/' {
dir = dir[:len(dir)-1]
}
if dir == "snapshots" {
// Not threaded
entries, err := storage.client.ListEntries(storage.storageDir + "/" + dir)
if err != nil {
return nil, nil, err
}
subDirs := []string{}
for _, entry := range entries {
if len(entry.Folder) > 0 {
subDirs = append(subDirs, entry.Name+"/")
}
}
return subDirs, nil, nil
} else if strings.HasPrefix(dir, "snapshots/") || strings.HasPrefix(dir, "benchmark") {
// Not threaded
entries, err := storage.client.ListEntries(storage.storageDir + "/" + dir)
if err != nil {
return nil, nil, err
}
files := []string{}
for _, entry := range entries {
if len(entry.Folder) == 0 {
files = append(files, entry.Name)
}
}
return files, nil, nil
} else {
// Batched and threaded
lock := sync.Mutex {}
allFiles := []string{}
allSizes := []int64{}
errorChannel := make(chan error)
requestChannel := make(chan OneDriveListReqItem)
activeWorkers := 0
requests := []OneDriveListReqItem{
{Path:"chunks", URL:""},
{Path:"fossils", URL:""},
}
maxRequestsPerThread := 1
if storage.client.MaxBatchReqs > 1 {
// OneDrive Business works through Graph API
// which supports batch requests (20 is the max)
maxRequestsPerThread = storage.client.MaxBatchReqs
}
for len(requests) > 0 || activeWorkers > 0 {
if len(requests) > 0 && activeWorkers < storage.numberOfThreads {
n_batchReqs := len(requests)
if n_batchReqs > maxRequestsPerThread {
n_batchReqs = maxRequestsPerThread
}
// Dequeue n_batch_reqs from the request queue
batchReqs := requests[:n_batchReqs]
requests = requests[n_batchReqs:]
activeWorkers++
go func(batchReqs []OneDriveListReqItem) {
// Will do non-batching if disabled / not supported
entriesPerReq, newReqs, err := storage.client.ListEntriesBatch(storage.storageDir, batchReqs)
if err != nil {
errorChannel <- err
return
}
// send paging requests first
for _, pageReq := range newReqs {
requestChannel <- pageReq
}
files := []string {}
sizes := []int64 {}
for i, entries := range entriesPerReq {
LOG_DEBUG("ONE_STORAGE", "Listing %s; %d items returned", batchReqs[i].Path, len(entries.Entries))
for _, entry := range entries.Entries {
if len(entry.Folder) == 0 {
name := entry.Name
if strings.HasPrefix(batchReqs[i].Path, "fossils") {
name = batchReqs[i].Path + "/" + name + ".fsl"
name = name[len("fossils/"):]
} else {
name = batchReqs[i].Path + "/" + name
name = name[len("chunks/"):]
}
files = append(files, name)
sizes = append(sizes, entry.Size)
} else {
recurseDirRequest := OneDriveListReqItem{
Path: batchReqs[i].Path + "/" + entry.Name,
URL: "",
}
requestChannel <- recurseDirRequest
}
}
}
lock.Lock()
allFiles = append(allFiles, files...)
allSizes = append(allSizes, sizes...)
lock.Unlock()
requestChannel <- OneDriveListReqItem{Path:"", URL:""}
} (batchReqs)
}
if activeWorkers > 0 {
select {
case err := <- errorChannel:
return nil, nil, err
case request := <- requestChannel:
if request.Path == "" {
activeWorkers--
} else {
requests = append(requests, request)
}
}
}
}
return allFiles, allSizes, nil
}
}
// DeleteFile deletes the file or directory at 'filePath'.
func (storage *OneDriveStorage) DeleteFile(threadIndex int, filePath string) (err error) {
filePath = storage.convertFilePath(filePath)
@@ -211,13 +363,13 @@ func (storage *OneDriveStorage) DownloadFile(threadIndex int, filePath string, c
defer readCloser.Close()
- _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/storage.numberOfThread)
+ _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/storage.numberOfThreads)
return err
}
// UploadFile writes 'content' to the file at 'filePath'.
func (storage *OneDriveStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
- err = storage.client.UploadFile(storage.storageDir+"/"+filePath, content, storage.UploadRateLimit/storage.numberOfThread)
+ err = storage.client.UploadFile(storage.storageDir+"/"+filePath, content, storage.UploadRateLimit/storage.numberOfThreads)
if e, ok := err.(OneDriveError); ok && e.Status == 409 {
LOG_TRACE("ONEDRIVE_UPLOAD", "File %s already exists", filePath)

View File

@@ -2006,10 +2006,6 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
// deletable.
for _, collectionName := range collections {
- if collectOnly {
- continue
- }
matched := collectionRegex.FindStringSubmatch(collectionName)
if matched == nil {
continue
@@ -2020,6 +2016,10 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
maxCollectionNumber = collectionNumber
}
+ if collectOnly {
+ continue
+ }
collectionFile := path.Join(collectionDir, collectionName)
manager.fileChunk.Reset(false)
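
The fix only moves the collectOnly early-exit below the counter update, so a collect-only prune still records the highest existing collection number instead of writing collection 1 again on the next run. Condensed into a stand-alone sketch (maxCollection is a made-up name and the strconv.Atoi(matched[1]) parsing is assumed; the real code lives in PruneSnapshots), the corrected ordering is:

package duplicacy

import (
	"regexp"
	"strconv"
)

// maxCollection walks the fossil collection names, always records the highest
// number seen, and only skips the deletion work when collectOnly is set.
func maxCollection(collections []string, collectionRegex *regexp.Regexp, collectOnly bool) int {
	maxCollectionNumber := 0
	for _, collectionName := range collections {
		matched := collectionRegex.FindStringSubmatch(collectionName)
		if matched == nil {
			continue
		}
		collectionNumber, _ := strconv.Atoi(matched[1])
		if collectionNumber > maxCollectionNumber {
			maxCollectionNumber = collectionNumber
		}
		if collectOnly {
			continue // before the fix this check came first, so the counter never advanced
		}
		// ... load and delete the fossil collection here (omitted) ...
	}
	return maxCollectionNumber
}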

View File

@@ -669,7 +669,28 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
client_secret = GetPassword(preference, matched[1] + "_client_secret", prompt, true, resetPassword)
}
- oneDriveStorage, err := CreateOneDriveStorage(tokenFile, matched[1] == "odb", storagePath, threads, client_id, client_secret, drive_id)
+ // OneDrive Business uses Graph API which supports request batching
+ // "disabled" - (-1) disabled
+ // "max" - ( 0) enabled, max requests per batch (normally, 20)
+ // "<nn>" - (nn) enabled, specified requests per batch
+ max_batch_requests := -1
+ if matched[1] == "odb" {
+ max_batch_requests_str := GetPasswordFromPreference(preference, matched[1] + "_max_batch_requests")
+ if max_batch_requests_str == "max" {
+ max_batch_requests = 20
+ } else if max_batch_requests_str != "" {
+ n, err := strconv.Atoi(max_batch_requests_str)
+ if err == nil {
+ max_batch_requests = n
+ if max_batch_requests > 20 {
+ max_batch_requests = 20
+ }
+ }
+ }
+ }
+ oneDriveStorage, err := CreateOneDriveStorage(tokenFile, matched[1] == "odb", storagePath, threads, max_batch_requests, client_id, client_secret, drive_id)
if err != nil {
LOG_ERROR("STORAGE_CREATE", "Failed to load the OneDrive storage at %s: %v", storageURL, err)
return nil
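
The new odb_max_batch_requests preference is read as a string and mapped onto the integer handed to CreateOneDriveStorage: unset or unparsable values leave batching disabled (-1), "max" selects the Graph API batch limit of 20, and an explicit number is capped at 20. The helper below only restates that mapping for clarity (parseMaxBatchRequests is not a function in this change); the value itself is stored like any other storage-specific key, for example with duplicacy's set command.

package duplicacy

import "strconv"

// parseMaxBatchRequests restates the mapping used above:
//   "" or unparsable -> -1 (batching disabled)
//   "max"            -> 20 (Graph API $batch limit)
//   "<nn>"           -> nn, capped at 20
func parseMaxBatchRequests(value string) int {
	if value == "max" {
		return 20
	}
	if n, err := strconv.Atoi(value); err == nil {
		if n > 20 {
			return 20
		}
		return n
	}
	return -1
}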