
Add Multipart upload WIP

dougal
2025-12-15 17:44:29 +00:00
parent 8e94a154ed
commit 672d9469eb
2 changed files with 352 additions and 11 deletions


@@ -158,3 +158,56 @@ type CopyResponse struct {
Status string `json:"status"`
Entries []Item `json:"entries"`
}
// MultiPartCreateRequest is the input of POST /s3/multipart/create
type MultiPartCreateRequest struct {
Filename string `json:"filename"`
Mime string `json:"mime"`
Size int64 `json:"size"`
Extension string `json:"extension"`
}
// MultiPartCreateResponse is returned by POST /s3/multipart/create
type MultiPartCreateResponse struct {
UploadID string `json:"uploadId"`
Key string `json:"key"`
}
// CompletedPart is the type for a completed part when making a multipart upload.
type CompletedPart struct {
ETag string `json:"ETag"`
PartNumber int32 `json:"PartNumber"`
}
// MultiPartGetURLsRequest is the input of POST /s3/multipart/batch-sign-part-urls
type MultiPartGetURLsRequest struct {
UploadID string `json:"uploadId"`
Key string `json:"key"`
PartNumbers []int `json:"partNumbers"`
}
// MultiPartGetURLsResponse is the result of POST /s3/multipart/batch-sign-part-urls
type MultiPartGetURLsResponse struct {
URLs []struct {
URL string `json:"url"`
PartNumber int32 `json:"partNumber"`
} `json:"urls"`
}
// MultiPartCompleteRequest is the input to POST /s3/multipart/complete
type MultiPartCompleteRequest struct {
UploadID string `json:"uploadId"`
Key string `json:"key"`
Parts []CompletedPart `json:"parts"`
}
// MultiPartCompleteResponse is the result of POST /s3/multipart/complete
type MultiPartCompleteResponse struct {
Location string `json:"location"`
}
// MultiPartAbort is the input of POST /s3/multipart/abort
type MultiPartAbort struct {
UploadID string `json:"uploadId"`
Key string `json:"key"`
}
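Taken together, these types describe the full multipart lifecycle: create an upload, sign URLs for each part, PUT the parts, then complete (or abort). Below is a minimal sketch of that sequence, written as if inside the api package; the endpoint paths come from the comments above, but the `call` helper (an assumed authenticated JSON POST against the API root) and the literal values are illustrative, not the backend's real plumbing.

// multipartSketch walks the multipart endpoints in order. call is a
// hypothetical helper that POSTs in as JSON to path and decodes into out.
func multipartSketch(ctx context.Context, call func(ctx context.Context, path string, in, out any) error) error {
	// 1. Create the upload and remember its ID and key
	var created MultiPartCreateResponse
	err := call(ctx, "/s3/multipart/create", &MultiPartCreateRequest{
		Filename:  "example.bin",
		Mime:      "application/octet-stream",
		Size:      10 << 20,
		Extension: ".bin",
	}, &created)
	if err != nil {
		return err
	}
	// 2. Ask for a signed URL for part 1 (the endpoint accepts a batch)
	var urls MultiPartGetURLsResponse
	err = call(ctx, "/s3/multipart/batch-sign-part-urls", &MultiPartGetURLsRequest{
		UploadID:    created.UploadID,
		Key:         created.Key,
		PartNumbers: []int{1},
	}, &urls)
	if err != nil {
		return err
	}
	// 3. PUT the part body to urls.URLs[0].URL and record its ETag (elided)
	parts := []CompletedPart{{ETag: "etag-from-put-response", PartNumber: 1}}
	// 4. Complete the upload; on any failure POST /s3/multipart/abort instead
	var done MultiPartCompleteResponse
	return call(ctx, "/s3/multipart/complete", &MultiPartCompleteRequest{
		UploadID: created.UploadID,
		Key:      created.Key,
		Parts:    parts,
	}, &done)
}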


@@ -22,12 +22,15 @@ import (
"net/http"
"net/url"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/rclone/rclone/backend/drime/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
@@ -42,11 +45,14 @@ import (
)
const (
minSleep = 10 * time.Millisecond
maxSleep = 20 * time.Second
decayConstant = 1 // bigger for slower decay, exponential
baseURL = "https://app.drime.cloud/"
rootURL = baseURL + "api/v1"
maxUploadParts = 10000 // maximum allowed number of parts in a multi-part upload
minChunkSize = fs.SizeSuffix(1024 * 1024 * 5)
defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
)
// Register with Fs
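These constants imply the streaming ceiling quoted in the option help further down: with the 5 MiB minimum chunk size and at most 10,000 parts, an upload of unknown length tops out at 5 MiB × 10,000 = 50,000 MiB ≈ 48.8 GiB, which the help text rounds to 48 GiB.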
@@ -92,6 +98,58 @@ Leave this blank normally, rclone will fill it in automatically.
Help: "Delete files permanently rather than putting them into the trash.",
Default: false,
Advanced: true,
}, {
Name: "upload_cutoff",
Help: `Cutoff for switching to chunked upload.
Any files larger than this will be uploaded in chunks of chunk_size.
The minimum is 0 and the maximum is 5 GiB.`,
Default: defaultUploadCutoff,
Advanced: true,
}, {
Name: "chunk_size",
Help: `Chunk size to use for uploading.
When uploading files larger than upload_cutoff or files with unknown
size (e.g. from "rclone rcat" or uploaded with "rclone mount" or google
photos or google docs) they will be uploaded as multipart uploads
using this chunk size.
Note that "--drime-upload-concurrency" chunks of this size are buffered
in memory per transfer.
If you are transferring large files over high-speed links and you have
enough memory, then increasing this will speed up the transfers.
Rclone will automatically increase the chunk size when uploading a
large file of known size to stay below the 10,000 chunks limit.
Files of unknown size are uploaded with the configured
chunk_size. Since the default chunk size is 5 MiB and there can be at
most 10,000 chunks, this means that by default the maximum size of
a file you can stream upload is 48 GiB. If you wish to stream upload
larger files then you will need to increase chunk_size.
Increasing the chunk size decreases the accuracy of the progress
statistics displayed with the "-P" flag. Rclone treats a chunk as sent
when it has been buffered in memory, when in fact it may still be
uploading. A bigger chunk size means a bigger buffer and progress
reporting that deviates further from the truth.
`,
Default: minChunkSize,
Advanced: true,
}, {
Name: "upload_concurrency",
Help: `Concurrency for multipart uploads.
This is the number of chunks of the same file that are uploaded
concurrently for multipart uploads.
If you are uploading small numbers of large files over high-speed links
and these uploads do not fully utilize your bandwidth, then increasing
this may help to speed up the transfers.`,
Default: 4,
Advanced: true,
}, {
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
@@ -124,12 +182,14 @@ base32768isOK = true // make sure maxFileLength for 2 byte unicode chars is the
// Options defines the configuration for this backend
type Options struct {
AccessToken string `config:"access_token"`
RootFolderID string `config:"root_folder_id"`
WorkspaceID string `config:"workspace_id"`
UploadConcurrency int `config:"upload_concurrency"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
HardDelete bool `config:"hard_delete"`
ListChunk int `config:"list_chunk"`
Enc encoder.MultiEncoder `config:"encoding"`
}
// Fs represents a remote drime
@@ -952,6 +1012,233 @@ func (f *Fs) Hashes() hash.Set {
return hash.Set(hash.None)
}
var warnStreamUpload sync.Once
// drimeChunkWriter implements fs.ChunkWriter for multipart uploads
type drimeChunkWriter struct {
uploadID string
key string
chunkSize int64
size int64
f *Fs
o *Object
completedParts []api.CompletedPart
completedPartsMu sync.Mutex
}
// OpenChunkWriter returns the chunk size and a ChunkWriter
//
// Pass in the remote and the src object
// You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
// Temporary Object under construction
o := &Object{
fs: f,
remote: remote,
}
size := src.Size()
fs.FixRangeOption(options, size)
// calculate size of parts
chunkSize := f.opt.ChunkSize
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 5 MiB). With the maximum number of parts (10,000) this limits a
// streamed file to about 48.8 GiB.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
chunkSize, fs.SizeSuffix(int64(chunkSize)*int64(maxUploadParts)))
})
} else {
chunkSize = chunksize.Calculator(src, size, maxUploadParts, chunkSize)
}
// Initiate multipart upload
req := api.MultiPartCreateRequest{
Filename: f.opt.Enc.FromStandardName(path.Base(remote)),
Mime: fs.MimeType(ctx, src),
Size: size,
Extension: f.opt.Enc.FromStandardName(path.Ext(remote)),
}
var resp api.MultiPartCreateResponse
opts := rest.Opts{
Method: "POST",
Path: "/s3/multipart/create",
Options: options,
}
err = o.fs.pacer.Call(func() (bool, error) {
res, err := o.fs.srv.CallJSON(ctx, &opts, req, &resp)
return shouldRetry(ctx, res, err)
})
if err != nil {
return info, nil, fmt.Errorf("failed to initiate multipart upload: %w", err)
}
chunkWriter := &drimeChunkWriter{
uploadID: resp.UploadID,
key: resp.Key,
chunkSize: int64(chunkSize),
size: size,
f: f,
o: o,
}
info = fs.ChunkWriterInfo{
ChunkSize: int64(chunkSize),
Concurrency: f.opt.UploadConcurrency,
LeavePartsOnError: false,
}
return info, chunkWriter, err
}
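For files of known size, the chunksize.Calculator call above grows the chunk only when the configured one cannot fit the file into maxUploadParts pieces. A quick worked example with the defaults: a 1 GiB file keeps 5 MiB chunks (1024 / 5 ≈ 205 parts), while a 100 GiB file needs at least 100 GiB / 10,000 ≈ 10.3 MiB per part, so the calculator raises the chunk size above the configured 5 MiB.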
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func (s *drimeChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) {
// convert the zero-based chunk number to a 1-based part number
// (parts run from 1 to maxUploadParts)
chunkNumber++
// find size of chunk
chunkSize, err := reader.Seek(0, io.SeekEnd)
if err != nil {
return 0, fmt.Errorf("failed to seek chunk: %w", err)
}
if chunkSize == 0 {
return 0, nil
}
partOpts := rest.Opts{
Method: "POST",
Path: "/s3/multipart/batch-sign-part-urls",
}
req := api.MultiPartGetURLsRequest{
UploadID: s.uploadID,
Key: s.key,
PartNumbers: []int{
chunkNumber,
},
}
var resp api.MultiPartGetURLsResponse
err = s.f.pacer.Call(func() (bool, error) {
res, err := s.f.srv.CallJSON(ctx, &partOpts, req, &resp)
return shouldRetry(ctx, res, err)
})
if err != nil {
return 0, fmt.Errorf("failed to get part URL: %w", err)
}
if len(resp.URLs) != 1 {
return 0, fmt.Errorf("expecting 1 URL but got %d", len(resp.URLs))
}
partURL := resp.URLs[0].URL
opts := rest.Opts{
Method: "PUT",
RootURL: partURL,
Body: reader,
ContentType: "application/octet-stream",
ContentLength: &chunkSize,
NoResponse: true,
ExtraHeaders: map[string]string{
// the part URL is pre-signed, so suppress the client's auth token
"Authorization": "",
},
}
var uploadRes *http.Response
err = s.f.pacer.Call(func() (bool, error) {
_, err = reader.Seek(0, io.SeekStart)
if err != nil {
return false, fmt.Errorf("failed to seek chunk: %w", err)
}
uploadRes, err = s.f.srv.Call(ctx, &opts)
return shouldRetry(ctx, uploadRes, err)
})
if err != nil {
return 0, fmt.Errorf("failed to upload part %d: %w", chunkNumber, err)
}
// Get the ETag from the response headers; NoResponse above means the
// body has already been drained and closed for us
etag := strings.Trim(uploadRes.Header.Get("ETag"), `"`)
s.completedPartsMu.Lock()
defer s.completedPartsMu.Unlock()
s.completedParts = append(s.completedParts, api.CompletedPart{
PartNumber: int32(chunkNumber),
ETag: etag,
})
return chunkSize, nil
}
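Concretely, the batch-sign round trip above serialises (per the json tags in the API types) to payloads shaped like the following; the uploadId, key, and URL are placeholder values:

{"uploadId": "01ABC...", "key": "uploads/example.bin", "partNumbers": [1]}

{"urls": [{"url": "https://storage.example.com/uploads/example.bin?partNumber=1&signature=...", "partNumber": 1}]}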
// Close completes the chunked writer, finalising the file.
func (s *drimeChunkWriter) Close(ctx context.Context) error {
// Parts can finish out of order when uploaded concurrently, so sort
// them by part number before completing the upload
sort.Slice(s.completedParts, func(i, j int) bool {
return s.completedParts[i].PartNumber < s.completedParts[j].PartNumber
})
completeBody := api.MultiPartCompleteRequest{
UploadID: s.uploadID,
Key: s.key,
Parts: s.completedParts,
}
completeOpts := rest.Opts{
Method: "POST",
Path: "/s3/multipart/complete",
}
var response api.MultiPartCompleteResponse
err := s.f.pacer.Call(func() (bool, error) {
res, err := s.f.srv.CallJSON(ctx, &completeOpts, completeBody, &response)
return shouldRetry(ctx, res, err)
})
if err != nil {
return fmt.Errorf("failed to complete multipart upload: %w", err)
}
return nil
}
// Abort chunk write
//
// You can and should call Abort without calling Close.
func (s *drimeChunkWriter) Abort(ctx context.Context) error {
opts := rest.Opts{
Method: "POST",
Path: "/s3/multipart/abort",
NoResponse: true,
}
req := api.MultiPartAbort{
UploadID: s.uploadID,
Key: s.key,
}
err := s.f.pacer.Call(func() (bool, error) {
res, err := s.f.srv.CallJSON(ctx, &opts, req, nil)
return shouldRetry(ctx, res, err)
})
if err != nil {
return fmt.Errorf("failed to abort multipart upload: %w", err)
}
return nil
}
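To see where these methods fit, here is a simplified sketch of the driver loop a caller could run against any fs.ChunkWriter. Rclone's real multipart helper adds concurrency, pacing, and accounting; the sequential loop and buffer handling below are illustrative assumptions, not the library's actual code.

// uploadViaChunkWriter drives an fs.OpenChunkWriter sequentially: open,
// write each chunk in order, then Close on success or Abort on failure.
func uploadViaChunkWriter(ctx context.Context, f fs.OpenChunkWriter, remote string, src fs.ObjectInfo, in io.Reader) (err error) {
	info, writer, err := f.OpenChunkWriter(ctx, remote, src)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			_ = writer.Abort(ctx) // best-effort cleanup of the upload
		}
	}()
	buf := make([]byte, info.ChunkSize)
	for chunk := 0; ; chunk++ {
		n, readErr := io.ReadFull(in, buf)
		if n > 0 {
			// WriteChunk takes a ReadSeeker so the part can be retried
			if _, err = writer.WriteChunk(ctx, chunk, bytes.NewReader(buf[:n])); err != nil {
				return err
			}
		}
		if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
			break // final (possibly short) chunk written
		}
		if readErr != nil {
			err = readErr
			return err
		}
	}
	return writer.Close(ctx)
}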
// ------------------------------------------------------------
// Fs returns the parent Fs
@@ -1149,6 +1436,7 @@ var (
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.OpenChunkWriter = (*Fs)(nil)
_ fs.Object = (*Object)(nil)
_ fs.IDer = (*Object)(nil)
_ fs.MimeTyper = (*Object)(nil)