1
0
mirror of https://github.com/rclone/rclone.git synced 2026-02-09 13:10:01 +00:00

filelu: add multipart upload support with configurable cutoff

This commit is contained in:
kingston125
2026-02-04 17:26:17 -05:00
committed by Nick Craig-Wood
parent 37f6336636
commit eef0b39a2c
4 changed files with 192 additions and 26 deletions

View File

@@ -21,6 +21,11 @@ import (
"github.com/rclone/rclone/lib/rest"
)
const (
defaultUploadCutoff = fs.SizeSuffix(500 * 1024 * 1024)
defaultChunkSize = fs.SizeSuffix(64 * 1024 * 1024)
)
// Register the backend with Rclone
func init() {
fs.Register(&fs.RegInfo{
@@ -33,6 +38,17 @@ func init() {
Required: true,
Sensitive: true,
},
{
Name: "upload_cutoff",
Help: "Cutoff for switching to chunked upload. Any files larger than this will be uploaded in chunks of chunk_size.",
Default: defaultUploadCutoff,
Advanced: true,
}, {
Name: "chunk_size",
Help: "Chunk size to use for uploading. Used for multipart uploads.",
Default: defaultChunkSize,
Advanced: true,
},
{
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
@@ -72,8 +88,10 @@ func init() {
// Options defines the configuration for the FileLu backend
type Options struct {
Key string `config:"key"`
Enc encoder.MultiEncoder `config:"encoding"`
Key string `config:"key"`
Enc encoder.MultiEncoder `config:"encoding"`
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
}
// Fs represents the FileLu file system
@@ -189,7 +207,6 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {
return f.deleteFolder(ctx, fullPath)
}
// List returns a list of files and folders
// List returns a list of files and folders for the given directory
func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
// Compose full path for API call
@@ -250,23 +267,11 @@ func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
// Put uploads a file directly to the destination folder in the FileLu storage system.
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if src.Size() == 0 {
return nil, fs.ErrorCantUploadEmptyFiles
o := &Object{
fs: f,
remote: src.Remote(),
}
err := f.uploadFile(ctx, in, src.Remote())
if err != nil {
return nil, err
}
newObject := &Object{
fs: f,
remote: src.Remote(),
size: src.Size(),
modTime: src.ModTime(ctx),
}
fs.Infof(f, "Put: Successfully uploaded new file %q", src.Remote())
return newObject, nil
return o, o.Update(ctx, in, src, options...)
}
// Move moves the file to the specified location

View File

@@ -16,6 +16,59 @@ import (
"github.com/rclone/rclone/lib/rest"
)
// multipartInit starts a new multipart upload and returns server details.
func (f *Fs) multipartInit(ctx context.Context, folderPath, filename string) (*api.MultipartInitResponse, error) {
opts := rest.Opts{
Method: "GET",
Path: "/multipart/init",
Parameters: url.Values{
"key": {f.opt.Key},
"filename": {filename},
"folder_path": {folderPath},
},
}
var result api.MultipartInitResponse
err := f.pacer.Call(func() (bool, error) {
_, err := f.srv.CallJSON(ctx, &opts, nil, &result)
return fserrors.ShouldRetry(err), err
})
if err != nil {
return nil, err
}
if result.Status != 200 {
return nil, fmt.Errorf("multipart init error: %s", result.Msg)
}
return &result, nil
}
// completeMultipart finalizes the multipart upload on the file server.
func (f *Fs) completeMultipart(ctx context.Context, server string, uploadID string, sessID string, objectPath string) error {
req, err := http.NewRequestWithContext(ctx, "POST", server, nil)
if err != nil {
return err
}
req.Header.Set("X-RC-Upload-Id", uploadID)
req.Header.Set("X-Sess-ID", sessID)
req.Header.Set("X-Object-Path", objectPath)
resp, err := f.client.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 202 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("completeMultipart failed %d: %s", resp.StatusCode, string(body))
}
return nil
}
// createFolder creates a folder at the specified path.
func (f *Fs) createFolder(ctx context.Context, dirPath string) (*api.CreateFolderResponse, error) {
encodedDir := f.fromStandardPath(dirPath)

View File

@@ -1,6 +1,7 @@
package filelu
import (
"bytes"
"context"
"encoding/json"
"errors"
@@ -15,6 +16,105 @@ import (
"github.com/rclone/rclone/fs"
)
// multipartUpload uploads a file in fixed-size chunks using the multipart API.
func (f *Fs) multipartUpload(ctx context.Context, in io.Reader, remote string) error {
dir := path.Dir(remote)
if dir == "." {
dir = ""
}
if dir != "" {
_ = f.Mkdir(ctx, dir)
}
folder := strings.Trim(dir, "/")
if folder != "" {
folder = "/" + folder
}
file := path.Base(remote)
initResp, err := f.multipartInit(ctx, folder, file)
if err != nil {
return fmt.Errorf("multipart init failed: %w", err)
}
uploadID := initResp.Result.UploadID
sessID := initResp.Result.SessID
server := initResp.Result.Server
objectPath := initResp.Result.ObjectPath
chunkSize := int(f.opt.ChunkSize)
buf := make([]byte, 0, chunkSize)
tmp := make([]byte, 1024*1024)
partNo := 1
for {
n, errRead := in.Read(tmp)
if n > 0 {
buf = append(buf, tmp[:n]...)
// If buffer reached chunkSize, upload a full part
if len(buf) >= chunkSize {
err = f.uploadPart(ctx, server, uploadID, sessID, objectPath, partNo, bytes.NewReader(buf))
if err != nil {
return fmt.Errorf("upload part %d failed: %w", partNo, err)
}
partNo++
buf = buf[:0]
}
}
if errRead == io.EOF {
break
}
if errRead != nil {
return fmt.Errorf("read failed: %w", errRead)
}
}
if len(buf) > 0 {
err = f.uploadPart(ctx, server, uploadID, sessID, objectPath, partNo, bytes.NewReader(buf))
if err != nil {
return fmt.Errorf("upload part %d failed: %w", partNo, err)
}
}
err = f.completeMultipart(ctx, server, uploadID, sessID, objectPath)
if err != nil {
return fmt.Errorf("complete multipart failed: %w", err)
}
return nil
}
// uploadPart sends a single multipart chunk to the upload server.
func (f *Fs) uploadPart(ctx context.Context, server, uploadID, sessID, objectPath string, partNo int, r io.Reader) error {
url := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", server, partNo, uploadID)
req, err := http.NewRequestWithContext(ctx, "PUT", url, r)
if err != nil {
return err
}
req.Header.Set("X-RC-Upload-Id", uploadID)
req.Header.Set("X-RC-Part-No", fmt.Sprintf("%d", partNo))
req.Header.Set("X-Sess-ID", sessID)
req.Header.Set("X-Object-Path", objectPath)
resp, err := f.client.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
return fmt.Errorf("uploadPart failed: %s", resp.Status)
}
return nil
}
// uploadFile uploads a file to FileLu
func (f *Fs) uploadFile(ctx context.Context, fileContent io.Reader, fileFullPath string) error {
directory := path.Dir(fileFullPath)

View File

@@ -140,15 +140,23 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo
// Update updates the object with new data
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
if src.Size() <= 0 {
return fs.ErrorCantUploadEmptyFiles
size := src.Size()
if size <= int64(o.fs.opt.UploadCutoff) {
err := o.fs.uploadFile(ctx, in, o.remote)
if err != nil {
return err
}
} else {
fullPath := path.Join(o.fs.root, o.remote)
err := o.fs.multipartUpload(ctx, in, fullPath)
if err != nil {
return fmt.Errorf("failed to upload file: %w", err)
}
}
err := o.fs.uploadFile(ctx, in, o.remote)
if err != nil {
return fmt.Errorf("failed to upload file: %w", err)
}
o.size = src.Size()
o.size = size
o.modTime = src.ModTime(ctx)
return nil
}