diff --git a/backend/drime/api/types.go b/backend/drime/api/types.go index 5d5855102..e1501dbd9 100644 --- a/backend/drime/api/types.go +++ b/backend/drime/api/types.go @@ -208,6 +208,22 @@ type MultiPartCompleteResponse struct { Location string `json:"location"` } +// MultiPartEntriesRequest is the input to POST /s3/entries +type MultiPartEntriesRequest struct { + ClientMime string `json:"clientMime"` + ClientName string `json:"clientName"` + Filename string `json:"filename"` + Size int64 `json:"size"` + ClientExtension string `json:"clientExtension"` + ParentID json.Number `json:"parent_id"` + RelativePath string `json:"relativePath"` +} + +// MultiPartEntriesResponse is the result of POST /s3/entries +type MultiPartEntriesResponse struct { + FileEntry Item `json:"fileEntry"` +} + // MultiPartAbort is the input of POST /s3/multipart/abort type MultiPartAbort struct { UploadID string `json:"uploadId"` diff --git a/backend/drime/drime.go b/backend/drime/drime.go index 01d392b36..1cd817378 100644 --- a/backend/drime/drime.go +++ b/backend/drime/drime.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/rclone/rclone/backend/drime/api" @@ -150,6 +151,14 @@ and these uploads do not fully utilize your bandwidth, then increasing this may help to speed up the transfers.`, Default: 4, Advanced: true, + }, { + Name: "upload_cutoff", + Help: `Cutoff for switching to chunked upload. + +Any files larger than this will be uploaded in chunks of chunk_size. +The minimum is 0 and the maximum is 5 GiB.`, + Default: defaultUploadCutoff, + Advanced: true, }, { Name: config.ConfigEncoding, Help: config.ConfigEncodingHelp, @@ -188,6 +197,7 @@ type Options struct { UploadConcurrency int `config:"upload_concurrency"` ChunkSize fs.SizeSuffix `config:"chunk_size"` HardDelete bool `config:"hard_delete"` + UploadCutoff fs.SizeSuffix `config:"upload_cutoff"` ListChunk int `config:"list_chunk"` Enc encoder.MultiEncoder `config:"encoding"` } @@ -219,6 +229,28 @@ type Object struct { // ------------------------------------------------------------ +func checkUploadChunkSize(cs fs.SizeSuffix) error { + if cs < minChunkSize { + return fmt.Errorf("%s is less than %s", cs, minChunkSize) + } + return nil +} + +func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { + err = checkUploadChunkSize(cs) + if err == nil { + old, f.opt.ChunkSize = f.opt.ChunkSize, cs + } + return +} + +func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { + if err == nil { + old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs + } + return +} + // Name of the remote (as passed into NewFs) func (f *Fs) Name() string { return f.name @@ -335,6 +367,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return nil, err } + err = checkUploadChunkSize(opt.ChunkSize) + if err != nil { + return nil, fmt.Errorf("drime: chunk size: %w", err) + } + root = parsePath(root) client := fshttp.NewClient(ctx) @@ -1015,12 +1052,21 @@ func (f *Fs) Hashes() hash.Set { var warnStreamUpload sync.Once type drimeChunkWriter struct { - uploadID string - key string - chunkSize int64 - size int64 - f *Fs - o *Object + uploadID string + key string + chunkSize int64 + size int64 + f *Fs + o *Object + written atomic.Int64 + + uploadName string // uuid + leaf string + mime string + extension string + parentID json.Number + relativePath string + completedParts []api.CompletedPart completedPartsMu sync.Mutex } @@ -1060,11 +1106,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn chunkSize = chunksize.Calculator(src, size, maxUploadParts, chunkSize) } + createSize := max(0, size) + // Initiate multipart upload req := api.MultiPartCreateRequest{ Filename: leaf, Mime: fs.MimeType(ctx, src), - Size: size, + Size: createSize, Extension: strings.TrimPrefix(path.Ext(leaf), `.`), ParentID: json.Number(directoryID), RelativePath: f.opt.Enc.FromStandardPath(path.Join(f.root, remote)), @@ -1087,13 +1135,27 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn return info, nil, fmt.Errorf("failed to initiate multipart upload: %w", err) } + mime := fs.MimeType(ctx, src) + ext := strings.TrimPrefix(path.Ext(leaf), ".") + // must have file extension for multipart upload + if ext == "" { + ext = "bin" + } + rel := f.opt.Enc.FromStandardPath(path.Join(f.root, remote)) + chunkWriter := &drimeChunkWriter{ - uploadID: resp.UploadID, - key: resp.Key, - chunkSize: int64(chunkSize), - size: size, - f: f, - o: o, + uploadID: resp.UploadID, + key: resp.Key, + chunkSize: int64(chunkSize), + size: size, + f: f, + o: o, + uploadName: path.Base(resp.Key), + leaf: leaf, + mime: mime, + extension: ext, + parentID: json.Number(directoryID), + relativePath: rel, } info = fs.ChunkWriterInfo{ ChunkSize: int64(chunkSize), @@ -1184,6 +1246,10 @@ func (s *drimeChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, read PartNumber: int32(chunkNumber), ETag: etag, }) + + // Count size written for unkown file sizes + s.written.Add(chunkSize) + return chunkSize, nil } @@ -1216,6 +1282,36 @@ func (s *drimeChunkWriter) Close(ctx context.Context) error { return fmt.Errorf("failed to complete multipart upload: %w", err) } + finalSize := s.size + if finalSize < 0 { + finalSize = s.written.Load() + } + + // s3/entries request to create drime object from multipart upload + req := api.MultiPartEntriesRequest{ + ClientMime: s.mime, + ClientName: s.leaf, + Filename: s.uploadName, + Size: finalSize, + ClientExtension: s.extension, + ParentID: s.parentID, + RelativePath: s.relativePath, + } + + entriesOpts := rest.Opts{ + Method: "POST", + Path: "/s3/entries", + } + + var res api.MultiPartEntriesResponse + err = s.f.pacer.Call(func() (bool, error) { + res, err := s.f.srv.CallJSON(ctx, &entriesOpts, req, &res) + return shouldRetry(ctx, res, err) + }) + if err != nil { + return fmt.Errorf("failed to create entry after multipart upload: %w", err) + } + return nil } diff --git a/backend/drime/drime_test.go b/backend/drime/drime_test.go index 10086d582..37b72d6c7 100644 --- a/backend/drime/drime_test.go +++ b/backend/drime/drime_test.go @@ -1,10 +1,10 @@ -// Test Drime filesystem interface -package drime_test +// Drime filesystem interface +package drime import ( "testing" - "github.com/rclone/rclone/backend/drime" + "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fstest/fstests" ) @@ -12,6 +12,22 @@ import ( func TestIntegration(t *testing.T) { fstests.Run(t, &fstests.Opt{ RemoteName: "TestDrime:", - NilObject: (*drime.Object)(nil), + NilObject: (*Object)(nil), + ChunkedUpload: fstests.ChunkedUploadConfig{ + MinChunkSize: minChunkSize, + }, }) } + +func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) { + return f.setUploadChunkSize(cs) +} + +func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) { + return f.setUploadCutoff(cs) +} + +var ( + _ fstests.SetUploadChunkSizer = (*Fs)(nil) + _ fstests.SetUploadCutoffer = (*Fs)(nil) +)