1
0
mirror of https://github.com/gilbertchen/duplicacy synced 2025-12-18 09:13:17 +00:00

Add a storage backend for Storj

The url format is storj://satellite/bucket/path.  You can get the
satellite along with the api access key when requesting an Access
Grant of type API Access.
This commit is contained in:
Gilbert Chen
2022-09-30 10:30:06 -04:00
parent cde660ee9f
commit 6009f64b66
3 changed files with 208 additions and 0 deletions

View File

@@ -714,6 +714,23 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
}
SavePassword(preference, "fabric_token", token)
return smeStorage
} else if matched[1] == "storj" {
satellite := matched[2] + matched[3]
bucket := matched[5]
storageDir := ""
index := strings.Index(bucket, "/")
if index >= 0 {
storageDir = bucket[index + 1:]
bucket = bucket[:index]
}
apiKey := GetPassword(preference, "storj_key", "Enter the API access key:", true, resetPassword)
passphrase := GetPassword(preference, "storj_passphrase", "Enter the passphrase:", true, resetPassword)
storjStorage, err := CreateStorjStorage(satellite, apiKey, passphrase, bucket, storageDir, threads)
if err != nil {
LOG_ERROR("STORAGE_CREATE", "Failed to load the Storj storage at %s: %v", storageURL, err)
return nil
}
return storjStorage
} else {
LOG_ERROR("STORAGE_CREATE", "The storage type '%s' is not supported", matched[1])
return nil

View File

@@ -169,6 +169,13 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) {
}
storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err
} else if *testStorageName == "storj" {
storage, err := CreateStorjStorage(config["satellite"], config["key"], config["passphrase"], config["bucket"], config["storage_path"], threads)
if err != nil {
return nil, err
}
storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err
}
return nil, fmt.Errorf("Invalid storage named: %s", *testStorageName)

View File

@@ -0,0 +1,184 @@
// Copyright (c) Acrosync LLC. All rights reserved.
// Free for personal use and commercial trial
// Commercial use requires per-user licenses available from https://duplicacy.com
package duplicacy
import (
"fmt"
"io"
"context"
"storj.io/uplink"
)
// StorjStorage is a storage backend for Storj.
type StorjStorage struct {
StorageBase
project *uplink.Project
bucket string
storageDir string
numberOfThreads int
}
// CreateStorjStorage creates a Storj storage.
func CreateStorjStorage(satellite string, apiKey string, passphrase string,
bucket string, storageDir string, threads int) (storage *StorjStorage, err error) {
ctx := context.Background()
access, err := uplink.RequestAccessWithPassphrase(ctx, satellite, apiKey, passphrase)
if err != nil {
return nil, fmt.Errorf("cannot request the access grant: %v", err)
}
project, err := uplink.OpenProject(ctx, access)
if err != nil {
return nil, fmt.Errorf("cannot open the project: %v", err)
}
_, err = project.StatBucket(ctx, bucket)
if err != nil {
return nil, fmt.Errorf("cannot found the bucket: %v", err)
}
if storageDir != "" && storageDir[len(storageDir) - 1] != '/' {
storageDir += "/"
}
storage = &StorjStorage {
project: project,
bucket: bucket,
storageDir: storageDir,
numberOfThreads: threads,
}
storage.DerivedStorage = storage
storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, nil
}
// ListFiles return the list of files and subdirectories under 'dir' (non-recursively).
func (storage *StorjStorage) ListFiles(threadIndex int, dir string) (
files []string, sizes []int64, err error) {
fullPath := storage.storageDir + dir
if fullPath != "" && fullPath[len(fullPath) - 1] != '/' {
fullPath += "/"
}
options := uplink.ListObjectsOptions {
Prefix: fullPath,
System: true, // request SystemMetadata which includes ContentLength
}
objects := storage.project.ListObjects(context.Background(), storage.bucket, &options)
for objects.Next() {
if objects.Err() != nil {
return nil, nil, objects.Err()
}
item := objects.Item()
name := item.Key[len(fullPath):]
size := item.System.ContentLength
if item.IsPrefix {
if name != "" && name[len(name) - 1] != '/' {
name += "/"
size = 0
}
}
files = append(files, name)
sizes = append(sizes, size)
}
return files, sizes, nil
}
// DeleteFile deletes the file or directory at 'filePath'.
func (storage *StorjStorage) DeleteFile(threadIndex int, filePath string) (err error) {
_, err = storage.project.DeleteObject(context.Background(), storage.bucket,
storage.storageDir + filePath)
return err
}
// MoveFile renames the file.
func (storage *StorjStorage) MoveFile(threadIndex int, from string, to string) (err error) {
err = storage.project.MoveObject(context.Background(), storage.bucket,
storage.storageDir + from, storage.bucket, storage.storageDir + to, nil)
return err
}
// CreateDirectory creates a new directory.
func (storage *StorjStorage) CreateDirectory(threadIndex int, dir string) (err error) {
return nil
}
// GetFileInfo returns the information about the file or directory at 'filePath'.
func (storage *StorjStorage) GetFileInfo(threadIndex int, filePath string) (
exist bool, isDir bool, size int64, err error) {
info, err := storage.project.StatObject(context.Background(), storage.bucket,
storage.storageDir + filePath)
if info == nil {
return false, false, 0, nil
} else if err != nil {
return false, false, 0, err
} else {
return true, info.IsPrefix, info.System.ContentLength, nil
}
}
// DownloadFile reads the file at 'filePath' into the chunk.
func (storage *StorjStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
file, err := storage.project.DownloadObject(context.Background(), storage.bucket,
storage.storageDir + filePath, nil)
if err != nil {
return err
}
defer file.Close()
if _, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads); err != nil {
return err
}
return nil
}
// UploadFile writes 'content' to the file at 'filePath'
func (storage *StorjStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
file, err := storage.project.UploadObject(context.Background(), storage.bucket,
storage.storageDir + filePath, nil)
if err != nil {
return err
}
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
_, err = io.Copy(file, reader)
if err != nil {
return err
}
err = file.Commit()
if err != nil {
return err
}
return nil
}
// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
// managing snapshots.
func (storage *StorjStorage) IsCacheNeeded() bool { return true }
// If the 'MoveFile' method is implemented.
func (storage *StorjStorage) IsMoveFileImplemented() bool { return true }
// If the storage can guarantee strong consistency.
func (storage *StorjStorage) IsStrongConsistent() bool { return false }
// If the storage supports fast listing of files names.
func (storage *StorjStorage) IsFastListing() bool { return true }
// Enable the test mode.
func (storage *StorjStorage) EnableTestMode() {}