From eef0b39a2caf03dc8b14cf2eb69d6c210499aa90 Mon Sep 17 00:00:00 2001 From: kingston125 Date: Wed, 4 Feb 2026 17:26:17 -0500 Subject: [PATCH] filelu: add multipart upload support with configurable cutoff --- backend/filelu/filelu.go | 43 ++++++----- backend/filelu/filelu_client.go | 53 +++++++++++++ backend/filelu/filelu_file_uploader.go | 100 +++++++++++++++++++++++++ backend/filelu/filelu_object.go | 22 ++++-- 4 files changed, 192 insertions(+), 26 deletions(-) diff --git a/backend/filelu/filelu.go b/backend/filelu/filelu.go index 306f9f80e..e97e13c22 100644 --- a/backend/filelu/filelu.go +++ b/backend/filelu/filelu.go @@ -21,6 +21,11 @@ import ( "github.com/rclone/rclone/lib/rest" ) +const ( + defaultUploadCutoff = fs.SizeSuffix(500 * 1024 * 1024) + defaultChunkSize = fs.SizeSuffix(64 * 1024 * 1024) +) + // Register the backend with Rclone func init() { fs.Register(&fs.RegInfo{ @@ -33,6 +38,17 @@ func init() { Required: true, Sensitive: true, }, + { + Name: "upload_cutoff", + Help: "Cutoff for switching to chunked upload. Any files larger than this will be uploaded in chunks of chunk_size.", + Default: defaultUploadCutoff, + Advanced: true, + }, { + Name: "chunk_size", + Help: "Chunk size to use for uploading. Used for multipart uploads.", + Default: defaultChunkSize, + Advanced: true, + }, { Name: config.ConfigEncoding, Help: config.ConfigEncodingHelp, @@ -72,8 +88,10 @@ func init() { // Options defines the configuration for the FileLu backend type Options struct { - Key string `config:"key"` - Enc encoder.MultiEncoder `config:"encoding"` + Key string `config:"key"` + Enc encoder.MultiEncoder `config:"encoding"` + UploadCutoff fs.SizeSuffix `config:"upload_cutoff"` + ChunkSize fs.SizeSuffix `config:"chunk_size"` } // Fs represents the FileLu file system @@ -189,7 +207,6 @@ func (f *Fs) Purge(ctx context.Context, dir string) error { return f.deleteFolder(ctx, fullPath) } -// List returns a list of files and folders // List returns a list of files and folders for the given directory func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { // Compose full path for API call @@ -250,23 +267,11 @@ func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { // Put uploads a file directly to the destination folder in the FileLu storage system. func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - if src.Size() == 0 { - return nil, fs.ErrorCantUploadEmptyFiles + o := &Object{ + fs: f, + remote: src.Remote(), } - - err := f.uploadFile(ctx, in, src.Remote()) - if err != nil { - return nil, err - } - - newObject := &Object{ - fs: f, - remote: src.Remote(), - size: src.Size(), - modTime: src.ModTime(ctx), - } - fs.Infof(f, "Put: Successfully uploaded new file %q", src.Remote()) - return newObject, nil + return o, o.Update(ctx, in, src, options...) } // Move moves the file to the specified location diff --git a/backend/filelu/filelu_client.go b/backend/filelu/filelu_client.go index 0beb1085c..beb9726ad 100644 --- a/backend/filelu/filelu_client.go +++ b/backend/filelu/filelu_client.go @@ -16,6 +16,59 @@ import ( "github.com/rclone/rclone/lib/rest" ) +// multipartInit starts a new multipart upload and returns server details. +func (f *Fs) multipartInit(ctx context.Context, folderPath, filename string) (*api.MultipartInitResponse, error) { + opts := rest.Opts{ + Method: "GET", + Path: "/multipart/init", + Parameters: url.Values{ + "key": {f.opt.Key}, + "filename": {filename}, + "folder_path": {folderPath}, + }, + } + + var result api.MultipartInitResponse + + err := f.pacer.Call(func() (bool, error) { + _, err := f.srv.CallJSON(ctx, &opts, nil, &result) + return fserrors.ShouldRetry(err), err + }) + if err != nil { + return nil, err + } + + if result.Status != 200 { + return nil, fmt.Errorf("multipart init error: %s", result.Msg) + } + + return &result, nil +} + +// completeMultipart finalizes the multipart upload on the file server. +func (f *Fs) completeMultipart(ctx context.Context, server string, uploadID string, sessID string, objectPath string) error { + req, err := http.NewRequestWithContext(ctx, "POST", server, nil) + if err != nil { + return err + } + + req.Header.Set("X-RC-Upload-Id", uploadID) + req.Header.Set("X-Sess-ID", sessID) + req.Header.Set("X-Object-Path", objectPath) + + resp, err := f.client.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != 202 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("completeMultipart failed %d: %s", resp.StatusCode, string(body)) + } + return nil +} + // createFolder creates a folder at the specified path. func (f *Fs) createFolder(ctx context.Context, dirPath string) (*api.CreateFolderResponse, error) { encodedDir := f.fromStandardPath(dirPath) diff --git a/backend/filelu/filelu_file_uploader.go b/backend/filelu/filelu_file_uploader.go index 6da56ebdd..489d45faf 100644 --- a/backend/filelu/filelu_file_uploader.go +++ b/backend/filelu/filelu_file_uploader.go @@ -1,6 +1,7 @@ package filelu import ( + "bytes" "context" "encoding/json" "errors" @@ -15,6 +16,105 @@ import ( "github.com/rclone/rclone/fs" ) +// multipartUpload uploads a file in fixed-size chunks using the multipart API. +func (f *Fs) multipartUpload(ctx context.Context, in io.Reader, remote string) error { + dir := path.Dir(remote) + if dir == "." { + dir = "" + } + + if dir != "" { + _ = f.Mkdir(ctx, dir) + } + + folder := strings.Trim(dir, "/") + if folder != "" { + folder = "/" + folder + } + + file := path.Base(remote) + + initResp, err := f.multipartInit(ctx, folder, file) + if err != nil { + return fmt.Errorf("multipart init failed: %w", err) + } + + uploadID := initResp.Result.UploadID + sessID := initResp.Result.SessID + server := initResp.Result.Server + objectPath := initResp.Result.ObjectPath + + chunkSize := int(f.opt.ChunkSize) + buf := make([]byte, 0, chunkSize) + tmp := make([]byte, 1024*1024) + partNo := 1 + + for { + n, errRead := in.Read(tmp) + if n > 0 { + buf = append(buf, tmp[:n]...) + + // If buffer reached chunkSize, upload a full part + if len(buf) >= chunkSize { + err = f.uploadPart(ctx, server, uploadID, sessID, objectPath, partNo, bytes.NewReader(buf)) + if err != nil { + return fmt.Errorf("upload part %d failed: %w", partNo, err) + } + partNo++ + buf = buf[:0] + } + } + + if errRead == io.EOF { + break + } + if errRead != nil { + return fmt.Errorf("read failed: %w", errRead) + } + } + + if len(buf) > 0 { + err = f.uploadPart(ctx, server, uploadID, sessID, objectPath, partNo, bytes.NewReader(buf)) + if err != nil { + return fmt.Errorf("upload part %d failed: %w", partNo, err) + } + } + + err = f.completeMultipart(ctx, server, uploadID, sessID, objectPath) + if err != nil { + return fmt.Errorf("complete multipart failed: %w", err) + } + + return nil +} + +// uploadPart sends a single multipart chunk to the upload server. +func (f *Fs) uploadPart(ctx context.Context, server, uploadID, sessID, objectPath string, partNo int, r io.Reader) error { + url := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", server, partNo, uploadID) + + req, err := http.NewRequestWithContext(ctx, "PUT", url, r) + if err != nil { + return err + } + + req.Header.Set("X-RC-Upload-Id", uploadID) + req.Header.Set("X-RC-Part-No", fmt.Sprintf("%d", partNo)) + req.Header.Set("X-Sess-ID", sessID) + req.Header.Set("X-Object-Path", objectPath) + + resp, err := f.client.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != 200 { + return fmt.Errorf("uploadPart failed: %s", resp.Status) + } + + return nil +} + // uploadFile uploads a file to FileLu func (f *Fs) uploadFile(ctx context.Context, fileContent io.Reader, fileFullPath string) error { directory := path.Dir(fileFullPath) diff --git a/backend/filelu/filelu_object.go b/backend/filelu/filelu_object.go index bd138223b..edc2cff56 100644 --- a/backend/filelu/filelu_object.go +++ b/backend/filelu/filelu_object.go @@ -140,15 +140,23 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo // Update updates the object with new data func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - if src.Size() <= 0 { - return fs.ErrorCantUploadEmptyFiles + size := src.Size() + + if size <= int64(o.fs.opt.UploadCutoff) { + err := o.fs.uploadFile(ctx, in, o.remote) + if err != nil { + return err + } + } else { + fullPath := path.Join(o.fs.root, o.remote) + err := o.fs.multipartUpload(ctx, in, fullPath) + if err != nil { + return fmt.Errorf("failed to upload file: %w", err) + } } - err := o.fs.uploadFile(ctx, in, o.remote) - if err != nil { - return fmt.Errorf("failed to upload file: %w", err) - } - o.size = src.Size() + o.size = size + o.modTime = src.ModTime(ctx) return nil }