diff --git a/src/duplicacy_s3cstorage.go b/src/duplicacy_s3cstorage.go new file mode 100644 index 0000000..518b798 --- /dev/null +++ b/src/duplicacy_s3cstorage.go @@ -0,0 +1,212 @@ +// Copyright (c) Acrosync LLC. All rights reserved. +// Licensed under the Fair Source License 0.9 (https://fair.io/) +// User Limitation: 5 users + +package duplicacy + +import ( + "time" + "github.com/gilbertchen/goamz/aws" + "github.com/gilbertchen/goamz/s3" +) + +// S3CStorage is a storage backend for s3 compatible storages that require V2 Signing. +type S3CStorage struct { + RateLimitedStorage + + buckets []*s3.Bucket + storageDir string +} + +// CreateS3CStorage creates a amazon s3 storage object. +func CreateS3CStorage(regionName string, endpoint string, bucketName string, storageDir string, + accessKey string, secretKey string, threads int) (storage *S3CStorage, err error) { + + var region aws.Region + + if endpoint == "" { + if regionName == "" { + regionName = "us-east-1" + } + region = aws.Regions[regionName] + } else { + region = aws.Region { Name: regionName, S3Endpoint:"https://" + endpoint } + } + + auth := aws.Auth{ AccessKey: accessKey, SecretKey: secretKey } + + var buckets []*s3.Bucket + for i := 0; i < threads; i++ { + s3Client := s3.New(auth, region) + s3Client.AttemptStrategy = aws.AttemptStrategy{ + Min: 8, + Total: 300 * time.Second, + Delay: 1000 * time.Millisecond, + } + + bucket := s3Client.Bucket(bucketName) + buckets = append(buckets, bucket) + } + + if len(storageDir) > 0 && storageDir[len(storageDir) - 1] != '/' { + storageDir += "/" + } + + storage = &S3CStorage { + buckets: buckets, + storageDir: storageDir, + } + + return storage, nil +} + +// ListFiles return the list of files and subdirectories under 'dir' (non-recursively) +func (storage *S3CStorage) ListFiles(threadIndex int, dir string) (files []string, sizes []int64, err error) { + if len(dir) > 0 && dir[len(dir) - 1] != '/' { + dir += "/" + } + + dirLength := len(storage.storageDir + dir) + if dir == "snapshots/" { + results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "/", "", 100) + if err != nil { + return nil, nil, err + } + + for _, subDir := range results.CommonPrefixes { + files = append(files, subDir[dirLength:]) + } + return files, nil, nil + } else if dir == "chunks/" { + marker := "" + for { + results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "", marker, 1000) + if err != nil { + return nil, nil, err + } + + for _, object := range results.Contents { + files = append(files, object.Key[dirLength:]) + sizes = append(sizes, object.Size) + } + + if !results.IsTruncated { + break + } + + marker = results.Contents[len(results.Contents) - 1].Key + } + return files, sizes, nil + + } else { + + results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "", "", 1000) + if err != nil { + return nil, nil, err + } + + for _, object := range results.Contents { + files = append(files, object.Key[dirLength:]) + } + return files, nil, nil + } +} + +// DeleteFile deletes the file or directory at 'filePath'. +func (storage *S3CStorage) DeleteFile(threadIndex int, filePath string) (err error) { + return storage.buckets[threadIndex].Del(storage.storageDir + filePath) +} + +// MoveFile renames the file. +func (storage *S3CStorage) MoveFile(threadIndex int, from string, to string) (err error) { + + options := s3.CopyOptions { ContentType: "application/duplicacy" } + _, err = storage.buckets[threadIndex].PutCopy(storage.storageDir + to, s3.Private, options, storage.buckets[threadIndex].Name + "/" + storage.storageDir + from) + if err != nil { + return nil + } + + return storage.DeleteFile(threadIndex, from) +} + +// CreateDirectory creates a new directory. +func (storage *S3CStorage) CreateDirectory(threadIndex int, dir string) (err error) { + return nil +} + +// GetFileInfo returns the information about the file or directory at 'filePath'. +func (storage *S3CStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) { + + response, err := storage.buckets[threadIndex].Head(storage.storageDir + filePath, nil) + if err != nil { + if e, ok := err.(*s3.Error); ok && (e.StatusCode == 403 || e.StatusCode == 404) { + return false, false, 0, nil + } else { + return false, false, 0, err + } + } + + if response.StatusCode == 403 || response.StatusCode == 404 { + return false, false, 0, nil + } else { + return true, false, response.ContentLength, nil + } +} + +// FindChunk finds the chunk with the specified id. If 'isFossil' is true, it will search for chunk files with +// the suffix '.fsl'. +func (storage *S3CStorage) FindChunk(threadIndex int, chunkID string, isFossil bool) (filePath string, exist bool, size int64, err error) { + + filePath = "chunks/" + chunkID + if isFossil { + filePath += ".fsl" + } + + exist, _, size, err = storage.GetFileInfo(threadIndex, filePath) + + if err != nil { + return "", false, 0, err + } else { + return filePath, exist, size, err + } + +} + +// DownloadFile reads the file at 'filePath' into the chunk. +func (storage *S3CStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) { + + readCloser, err := storage.buckets[threadIndex].GetReader(storage.storageDir + filePath) + if err != nil { + return err + } + + defer readCloser.Close() + + _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit / len(storage.buckets)) + return err + +} + +// UploadFile writes 'content' to the file at 'filePath'. +func (storage *S3CStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) { + + options := s3.Options { } + reader := CreateRateLimitedReader(content, storage.UploadRateLimit / len(storage.buckets)) + return storage.buckets[threadIndex].PutReader(storage.storageDir + filePath, reader, int64(len(content)), "application/duplicacy", s3.Private, options) +} + +// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when +// managing snapshots. +func (storage *S3CStorage) IsCacheNeeded () (bool) { return true } + +// If the 'MoveFile' method is implemented. +func (storage *S3CStorage) IsMoveFileImplemented() (bool) { return true } + +// If the storage can guarantee strong consistency. +func (storage *S3CStorage) IsStrongConsistent() (bool) { return false } + +// If the storage supports fast listing of files names. +func (storage *S3CStorage) IsFastListing() (bool) { return true } + +// Enable the test mode. +func (storage *S3CStorage) EnableTestMode() {} diff --git a/src/duplicacy_storage.go b/src/duplicacy_storage.go index 4f145e5..33225f1 100644 --- a/src/duplicacy_storage.go +++ b/src/duplicacy_storage.go @@ -293,7 +293,7 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor SavePassword(preference, "ssh_password", password) } return sftpStorage - } else if matched[1] == "s3" || matched[1] == "minio" { + } else if matched[1] == "s3" || matched[1] == "s3c" || matched[1] == "minio"{ // urlRegex := regexp.MustCompile(`^(\w+)://([\w\-]+@)?([^/]+)(/(.+))?`) @@ -319,17 +319,26 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor accessKey := GetPassword(preference, "s3_id", "Enter S3 Access Key ID:", true, resetPassword) secretKey := GetPassword(preference, "s3_secret", "Enter S3 Secret Access Key:", true, resetPassword) - isMinioCompatible := matched[1] == "minio" + var err error - s3Storage, err := CreateS3Storage(region, endpoint, bucket, storageDir, accessKey, secretKey, threads, isMinioCompatible) - if err != nil { - LOG_ERROR("STORAGE_CREATE", "Failed to load the S3 storage at %s: %v", storageURL, err) - return nil + if matched[1] == "s3c" { + storage, err = CreateS3CStorage(region, endpoint, bucket, storageDir, accessKey, secretKey, threads) + if err != nil { + LOG_ERROR("STORAGE_CREATE", "Failed to load the S3C storage at %s: %v", storageURL, err) + return nil + } + } else { + isMinioCompatible := matched[1] == "minio" + storage, err = CreateS3Storage(region, endpoint, bucket, storageDir, accessKey, secretKey, threads, isMinioCompatible) + if err != nil { + LOG_ERROR("STORAGE_CREATE", "Failed to load the S3 storage at %s: %v", storageURL, err) + return nil + } } SavePassword(preference, "s3_id", accessKey) SavePassword(preference, "s3_secret", secretKey) - return s3Storage + return storage } else if matched[1] == "dropbox" { storageDir := matched[3] + matched[5] token := GetPassword(preference, "dropbox_token", "Enter Dropbox access token:", true, resetPassword) diff --git a/src/duplicacy_storage_test.go b/src/duplicacy_storage_test.go index 2b06c3b..2a65ea0 100644 --- a/src/duplicacy_storage_test.go +++ b/src/duplicacy_storage_test.go @@ -66,6 +66,8 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) { return CreateSFTPStorageWithPassword(storage["server"], port, storage["username"], storage["directory"], storage["password"], threads) } else if testStorageName == "s3" { return CreateS3Storage(storage["region"], storage["endpoint"], storage["bucket"], storage["directory"], storage["access_key"], storage["secret_key"], threads, false) + } else if testStorageName == "s3c" { + return CreateS3CStorage(storage["region"], storage["endpoint"], storage["bucket"], storage["directory"], storage["access_key"], storage["secret_key"], threads) } else if testStorageName == "minio" { return CreateS3Storage(storage["region"], storage["endpoint"], storage["bucket"], storage["directory"], storage["access_key"], storage["secret_key"], threads, true) } else if testStorageName == "dropbox" {