diff --git a/backend/drime/api/types.go b/backend/drime/api/types.go
index 8e7e0f595..c9bf50699 100644
--- a/backend/drime/api/types.go
+++ b/backend/drime/api/types.go
@@ -158,3 +158,56 @@ type CopyResponse struct {
 	Status  string `json:"status"`
 	Entries []Item `json:"entries"`
 }
+
+// MultiPartCreateRequest is the input of POST /s3/multipart/create
+type MultiPartCreateRequest struct {
+	Filename  string `json:"filename"`
+	Mime      string `json:"mime"`
+	Size      int64  `json:"size"`
+	Extension string `json:"extension"`
+}
+
+// MultiPartCreateResponse is returned by POST /s3/multipart/create
+type MultiPartCreateResponse struct {
+	UploadID string `json:"uploadId"`
+	Key      string `json:"key"`
+}
+
+// CompletedPart is the type for a completed part of a multipart upload.
+type CompletedPart struct {
+	ETag       string `json:"ETag"`
+	PartNumber int32  `json:"PartNumber"`
+}
+
+// MultiPartGetURLsRequest is the input of POST /s3/multipart/batch-sign-part-urls
+type MultiPartGetURLsRequest struct {
+	UploadID    string `json:"uploadId"`
+	Key         string `json:"key"`
+	PartNumbers []int  `json:"partNumbers"`
+}
+
+// MultiPartGetURLsResponse is the result of POST /s3/multipart/batch-sign-part-urls
+type MultiPartGetURLsResponse struct {
+	URLs []struct {
+		URL        string `json:"url"`
+		PartNumber int32  `json:"partNumber"`
+	} `json:"urls"`
+}
+
+// MultiPartCompleteRequest is the input of POST /s3/multipart/complete
+type MultiPartCompleteRequest struct {
+	UploadID string          `json:"uploadId"`
+	Key      string          `json:"key"`
+	Parts    []CompletedPart `json:"parts"`
+}
+
+// MultiPartCompleteResponse is the result of POST /s3/multipart/complete
+type MultiPartCompleteResponse struct {
+	Location string `json:"location"`
+}
+
+// MultiPartAbort is the input of POST /s3/multipart/abort
+type MultiPartAbort struct {
+	UploadID string `json:"uploadId"`
+	Key      string `json:"key"`
+}
diff --git a/backend/drime/drime.go b/backend/drime/drime.go
index 31c3a2fd6..a8949a31c 100644
--- a/backend/drime/drime.go
+++ b/backend/drime/drime.go
@@ -22,12 +22,15 @@ import (
 	"net/http"
 	"net/url"
 	"path"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/rclone/rclone/backend/drime/api"
 	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/chunksize"
 	"github.com/rclone/rclone/fs/config"
 	"github.com/rclone/rclone/fs/config/configmap"
 	"github.com/rclone/rclone/fs/config/configstruct"
@@ -42,11 +45,14 @@ import (
 )
 
 const (
-	minSleep      = 10 * time.Millisecond
-	maxSleep      = 20 * time.Second
-	decayConstant = 1 // bigger for slower decay, exponential
-	baseURL       = "https://app.drime.cloud/"
-	rootURL       = baseURL + "api/v1"
+	minSleep            = 10 * time.Millisecond
+	maxSleep            = 20 * time.Second
+	decayConstant       = 1 // bigger for slower decay, exponential
+	baseURL             = "https://app.drime.cloud/"
+	rootURL             = baseURL + "api/v1"
+	maxUploadParts      = 10000 // maximum allowed number of parts in a multipart upload
+	minChunkSize        = fs.SizeSuffix(1024 * 1024 * 5)
+	defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
 )
 
 // Register with Fs
@@ -92,6 +98,58 @@ Leave this blank normally, rclone will fill it in automatically.
 		Help:     "Delete files permanently rather than putting them into the trash.",
 		Default:  false,
 		Advanced: true,
+	}, {
+		Name: "upload_cutoff",
+		Help: `Cutoff for switching to chunked upload.
+
+Any files larger than this will be uploaded in chunks of chunk_size.
+The minimum is 0 and the maximum is 5 GiB.`,
+		Default:  defaultUploadCutoff,
+		Advanced: true,
+	}, {
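+		// The 5 MiB default below matches the usual S3 multipart
+		// minimum part size. This is an assumption carried over from
+		// S3 semantics rather than a documented Drime limit; parts
+		// other than the last are expected to be at least this big.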
+		Name: "chunk_size",
+		Help: `Chunk size to use for uploading.
+
+When uploading files larger than upload_cutoff or files with unknown
+size (e.g. from "rclone rcat" or uploaded with "rclone mount" or google
+photos or google docs) they will be uploaded as multipart uploads
+using this chunk size.
+
+Note that "--drime-upload-concurrency" chunks of this size are buffered
+in memory per transfer.
+
+If you are transferring large files over high-speed links and you have
+enough memory, then increasing this will speed up the transfers.
+
+Rclone will automatically increase the chunk size when uploading a
+large file of known size to stay below the 10,000 chunks limit.
+
+Files of unknown size are uploaded with the configured
+chunk_size. Since the default chunk size is 5 MiB and there can be at
+most 10,000 chunks, this means that by default the maximum size of
+a file you can stream upload is 48 GiB. If you wish to stream upload
+larger files then you will need to increase chunk_size.
+
+Increasing the chunk size decreases the accuracy of the progress
+statistics displayed with the "-P" flag. Rclone treats a chunk as sent
+once it has been buffered, when in fact it may still be uploading.
+A bigger chunk size means a bigger buffer and progress reporting
+that deviates more from the truth.
+`,
+		Default:  minChunkSize,
+		Advanced: true,
+	}, {
+		Name: "upload_concurrency",
+		Help: `Concurrency for multipart uploads.
+
+This is the number of chunks of the same file that are uploaded
+concurrently for multipart uploads.
+
+If you are uploading small numbers of large files over high-speed links
+and these uploads do not fully utilize your bandwidth, then increasing
+this may help to speed up the transfers.`,
+		Default:  4,
+		Advanced: true,
 	}, {
 		Name:     config.ConfigEncoding,
 		Help:     config.ConfigEncodingHelp,
@@ -124,12 +182,14 @@ base32768isOK = true // make sure maxFileLength for 2 byte unicode chars is the
 
 // Options defines the configuration for this backend
 type Options struct {
-	AccessToken  string               `config:"access_token"`
-	RootFolderID string               `config:"root_folder_id"`
-	WorkspaceID  string               `config:"workspace_id"`
-	HardDelete   bool                 `config:"hard_delete"`
-	ListChunk    int                  `config:"list_chunk"`
-	Enc          encoder.MultiEncoder `config:"encoding"`
+	AccessToken       string               `config:"access_token"`
+	RootFolderID      string               `config:"root_folder_id"`
+	WorkspaceID       string               `config:"workspace_id"`
+	UploadCutoff      fs.SizeSuffix        `config:"upload_cutoff"`
+	UploadConcurrency int                  `config:"upload_concurrency"`
+	ChunkSize         fs.SizeSuffix        `config:"chunk_size"`
+	HardDelete        bool                 `config:"hard_delete"`
+	ListChunk         int                  `config:"list_chunk"`
+	Enc               encoder.MultiEncoder `config:"encoding"`
 }
 
 // Fs represents a remote drime
@@ -952,6 +1012,233 @@ func (f *Fs) Hashes() hash.Set {
 	return hash.Set(hash.None)
 }
 
+// warnStreamUpload ensures the streaming upload size warning is only logged once
+var warnStreamUpload sync.Once
+
+type drimeChunkWriter struct {
+	uploadID         string
+	key              string
+	chunkSize        int64
+	size             int64
+	f                *Fs
+	o                *Object
+	completedParts   []api.CompletedPart
+	completedPartsMu sync.Mutex
+}
+
+// OpenChunkWriter returns the chunk size and a ChunkWriter
+//
+// Pass in the remote and the src object
+// You can also use options to hint at the desired chunk size
+func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
+	// Temporary Object under construction
+	o := &Object{
+		fs:     f,
+		remote: remote,
+	}
+
+	size := src.Size()
+	fs.FixRangeOption(options, size)
+
+	// calculate size of parts
+	chunkSize := f.opt.ChunkSize
+
+	// size can be -1 here meaning we don't know the size of the incoming
+	// file. We use chunkSize buffers here (default 5 MiB), so with the
+	// maximum number of parts (10,000) a streamed file can be at most
+	// about 48.8 GiB.
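+	// Illustrative example: a 100 GiB file at the default 5 MiB chunk
+	// size would need 20,480 parts, so for files of known size
+	// chunksize.Calculator below raises the chunk size until the upload
+	// fits within maxUploadParts.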
+	if size == -1 {
+		warnStreamUpload.Do(func() {
+			fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
+				chunkSize, fs.SizeSuffix(int64(chunkSize)*int64(maxUploadParts)))
+		})
+	} else {
+		chunkSize = chunksize.Calculator(src, size, maxUploadParts, chunkSize)
+	}
+
+	// Initiate multipart upload
+	req := api.MultiPartCreateRequest{
+		Filename:  f.opt.Enc.FromStandardName(path.Base(remote)),
+		Mime:      fs.MimeType(ctx, src),
+		Size:      size,
+		Extension: f.opt.Enc.FromStandardName(path.Ext(remote)),
+	}
+
+	var resp api.MultiPartCreateResponse
+
+	opts := rest.Opts{
+		Method:  "POST",
+		Path:    "/s3/multipart/create",
+		Options: options,
+	}
+
+	err = o.fs.pacer.Call(func() (bool, error) {
+		res, err := o.fs.srv.CallJSON(ctx, &opts, req, &resp)
+		return shouldRetry(ctx, res, err)
+	})
+
+	if err != nil {
+		return info, nil, fmt.Errorf("failed to initiate multipart upload: %w", err)
+	}
+
+	chunkWriter := &drimeChunkWriter{
+		uploadID:  resp.UploadID,
+		key:       resp.Key,
+		chunkSize: int64(chunkSize),
+		size:      size,
+		f:         f,
+		o:         o,
+	}
+	info = fs.ChunkWriterInfo{
+		ChunkSize:         int64(chunkSize),
+		Concurrency:       f.opt.UploadConcurrency,
+		LeavePartsOnError: false,
+	}
+	return info, chunkWriter, err
+}
+
+// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
+func (s *drimeChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) {
+	// part numbers are 1-based and run from 1 to maxUploadParts (10,000)
+	chunkNumber++
+
+	// find the size of the chunk by seeking to its end
+	chunkSize, err := reader.Seek(0, io.SeekEnd)
+	if err != nil {
+		return 0, fmt.Errorf("failed to seek chunk: %w", err)
+	}
+
+	if chunkSize == 0 {
+		return 0, nil
+	}
+
+	// ask the server to sign an upload URL for this part
+	partOpts := rest.Opts{
+		Method: "POST",
+		Path:   "/s3/multipart/batch-sign-part-urls",
+	}
+
+	req := api.MultiPartGetURLsRequest{
+		UploadID: s.uploadID,
+		Key:      s.key,
+		PartNumbers: []int{
+			chunkNumber,
+		},
+	}
+
+	var resp api.MultiPartGetURLsResponse
+
+	err = s.f.pacer.Call(func() (bool, error) {
+		res, err := s.f.srv.CallJSON(ctx, &partOpts, req, &resp)
+		return shouldRetry(ctx, res, err)
+	})
+
+	if err != nil {
+		return 0, fmt.Errorf("failed to get part URL: %w", err)
+	}
+
+	if len(resp.URLs) != 1 {
+		return 0, fmt.Errorf("expecting 1 URL but got %d", len(resp.URLs))
+	}
+	partURL := resp.URLs[0].URL
+
+	// PUT the chunk to the signed URL, clearing the Authorization
+	// header as the URL is already signed
+	opts := rest.Opts{
+		Method:        "PUT",
+		RootURL:       partURL,
+		Body:          reader,
+		ContentType:   "application/octet-stream",
+		ContentLength: &chunkSize,
+		NoResponse:    true,
+		ExtraHeaders: map[string]string{
+			"Authorization": "",
+		},
+	}
+
+	var uploadRes *http.Response
+
+	err = s.f.pacer.Call(func() (bool, error) {
+		// rewind the chunk for this (re)try
+		_, err = reader.Seek(0, io.SeekStart)
+		if err != nil {
+			return false, fmt.Errorf("failed to seek chunk: %w", err)
+		}
+		uploadRes, err = s.f.srv.Call(ctx, &opts)
+		return shouldRetry(ctx, uploadRes, err)
+	})
+
+	if err != nil {
+		return 0, fmt.Errorf("failed to upload part %d: %w", chunkNumber, err)
+	}
+
+	// Get the ETag from the response - it is needed to complete the upload
+	etag := uploadRes.Header.Get("ETag")
+	etag = strings.Trim(etag, `"`)
+	fs.CheckClose(uploadRes.Body, &err)
+
+	s.completedPartsMu.Lock()
+	defer s.completedPartsMu.Unlock()
+	s.completedParts = append(s.completedParts, api.CompletedPart{
+		PartNumber: int32(chunkNumber),
+		ETag:       etag,
+	})
+	return chunkSize, nil
+}
+
+// Close completes the chunked writer, finalising the file.
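+//
+// Parts are sorted by part number before completing the upload because
+// concurrent WriteChunk calls may have appended them out of order.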
+func (s *drimeChunkWriter) Close(ctx context.Context) error {
+	// Complete multipart upload
+	sort.Slice(s.completedParts, func(i, j int) bool {
+		return s.completedParts[i].PartNumber < s.completedParts[j].PartNumber
+	})
+
+	completeBody := api.MultiPartCompleteRequest{
+		UploadID: s.uploadID,
+		Key:      s.key,
+		Parts:    s.completedParts,
+	}
+
+	completeOpts := rest.Opts{
+		Method: "POST",
+		Path:   "/s3/multipart/complete",
+	}
+
+	var response api.MultiPartCompleteResponse
+
+	err := s.f.pacer.Call(func() (bool, error) {
+		res, err := s.f.srv.CallJSON(ctx, &completeOpts, completeBody, &response)
+		return shouldRetry(ctx, res, err)
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to complete multipart upload: %w", err)
+	}
+
+	return nil
+}
+
+// Abort chunk write
+//
+// You can and should call Abort without calling Close.
+func (s *drimeChunkWriter) Abort(ctx context.Context) error {
+	opts := rest.Opts{
+		Method:     "POST",
+		Path:       "/s3/multipart/abort",
+		NoResponse: true,
+	}
+
+	req := api.MultiPartAbort{
+		UploadID: s.uploadID,
+		Key:      s.key,
+	}
+
+	err := s.f.pacer.Call(func() (bool, error) {
+		res, err := s.f.srv.CallJSON(ctx, &opts, req, nil)
+		return shouldRetry(ctx, res, err)
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to abort multipart upload: %w", err)
+	}
+
+	return nil
+}
+
 // ------------------------------------------------------------
 
 // Fs returns the parent Fs
@@ -1149,6 +1436,7 @@ var (
 	_ fs.Mover           = (*Fs)(nil)
 	_ fs.DirMover        = (*Fs)(nil)
 	_ fs.DirCacheFlusher = (*Fs)(nil)
+	_ fs.OpenChunkWriter = (*Fs)(nil)
 	_ fs.Object          = (*Object)(nil)
 	_ fs.IDer            = (*Object)(nil)
 	_ fs.MimeTyper       = (*Object)(nil)
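
A quick way to exercise the new multipart path (illustrative only: "drime:"
stands for any configured Drime remote, and rclone derives the flag names
from the backend option names registered above):

    rclone copy -P --drime-chunk-size 16Mi --drime-upload-concurrency 8 big.iso drime:backup

In recent rclone versions, transfers above --multi-thread-cutoff (256 MiB by
default) are routed through OpenChunkWriter, so a file of a few hundred MiB
should be enough to see the chunked upload in action.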