mirror of
https://github.com/rclone/rclone.git
synced 2026-02-25 00:53:06 +00:00
286 lines
7.2 KiB
Go
286 lines
7.2 KiB
Go
package filelu
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"strings"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/lib/rest"
|
|
)
|
|
|
|
// multipartUpload uploads a file in fixed-size chunks using the multipart API.
|
|
func (f *Fs) multipartUpload(ctx context.Context, in io.Reader, remote string) error {
|
|
dir := path.Dir(remote)
|
|
if dir == "." {
|
|
dir = ""
|
|
}
|
|
|
|
if dir != "" {
|
|
_ = f.Mkdir(ctx, dir)
|
|
}
|
|
|
|
folder := strings.Trim(dir, "/")
|
|
if folder != "" {
|
|
folder = "/" + folder
|
|
}
|
|
|
|
file := path.Base(remote)
|
|
|
|
initResp, err := f.multipartInit(ctx, folder, file)
|
|
if err != nil {
|
|
return fmt.Errorf("multipart init failed: %w", err)
|
|
}
|
|
|
|
uploadID := initResp.Result.UploadID
|
|
sessID := initResp.Result.SessID
|
|
server := initResp.Result.Server
|
|
objectPath := initResp.Result.ObjectPath
|
|
|
|
chunkSize := int(f.opt.ChunkSize)
|
|
buf := make([]byte, 0, chunkSize)
|
|
tmp := make([]byte, 1024*1024)
|
|
partNo := 1
|
|
|
|
for {
|
|
n, errRead := in.Read(tmp)
|
|
if n > 0 {
|
|
buf = append(buf, tmp[:n]...)
|
|
|
|
// If buffer reached chunkSize, upload a full part
|
|
if len(buf) >= chunkSize {
|
|
err = f.uploadPart(ctx, server, uploadID, sessID, objectPath, partNo, bytes.NewReader(buf))
|
|
if err != nil {
|
|
return fmt.Errorf("upload part %d failed: %w", partNo, err)
|
|
}
|
|
partNo++
|
|
buf = buf[:0]
|
|
}
|
|
}
|
|
|
|
if errRead == io.EOF {
|
|
break
|
|
}
|
|
if errRead != nil {
|
|
return fmt.Errorf("read failed: %w", errRead)
|
|
}
|
|
}
|
|
|
|
if len(buf) > 0 {
|
|
err = f.uploadPart(ctx, server, uploadID, sessID, objectPath, partNo, bytes.NewReader(buf))
|
|
if err != nil {
|
|
return fmt.Errorf("upload part %d failed: %w", partNo, err)
|
|
}
|
|
}
|
|
|
|
err = f.completeMultipart(ctx, server, uploadID, sessID, objectPath)
|
|
if err != nil {
|
|
return fmt.Errorf("complete multipart failed: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// uploadPart sends a single multipart chunk to the upload server.
|
|
func (f *Fs) uploadPart(ctx context.Context, server, uploadID, sessID, objectPath string, partNo int, r io.Reader) error {
|
|
url := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", server, partNo, uploadID)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "PUT", url, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req.Header.Set("X-RC-Upload-Id", uploadID)
|
|
req.Header.Set("X-RC-Part-No", fmt.Sprintf("%d", partNo))
|
|
req.Header.Set("X-Sess-ID", sessID)
|
|
req.Header.Set("X-Object-Path", objectPath)
|
|
|
|
resp, err := f.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode != 200 {
|
|
return fmt.Errorf("uploadPart failed: %s", resp.Status)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// uploadFile uploads a file to FileLu
|
|
func (f *Fs) uploadFile(ctx context.Context, fileContent io.Reader, fileFullPath string) error {
|
|
directory := path.Dir(fileFullPath)
|
|
fileName := path.Base(fileFullPath)
|
|
if directory == "." {
|
|
directory = ""
|
|
}
|
|
destinationFolderPath := path.Join(f.root, directory)
|
|
if destinationFolderPath != "" {
|
|
destinationFolderPath = "/" + strings.Trim(destinationFolderPath, "/")
|
|
}
|
|
|
|
existingEntries, err := f.List(ctx, path.Dir(fileFullPath))
|
|
if err != nil {
|
|
if errors.Is(err, fs.ErrorDirNotFound) {
|
|
err = f.Mkdir(ctx, path.Dir(fileFullPath))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create directory: %w", err)
|
|
}
|
|
} else {
|
|
return fmt.Errorf("failed to list existing files: %w", err)
|
|
}
|
|
}
|
|
|
|
for _, entry := range existingEntries {
|
|
if entry.Remote() == fileFullPath {
|
|
_, ok := entry.(fs.Object)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// If the file exists but is different, remove it
|
|
filePath := "/" + strings.Trim(destinationFolderPath+"/"+fileName, "/")
|
|
err = f.deleteFile(ctx, filePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete existing file: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
uploadURL, sessID, err := f.getUploadServer(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to retrieve upload server: %w", err)
|
|
}
|
|
|
|
// Since the fileCode isn't used, just handle the error
|
|
if _, err := f.uploadFileWithDestination(ctx, uploadURL, sessID, fileName, fileContent, destinationFolderPath); err != nil {
|
|
return fmt.Errorf("failed to upload file: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *Fs) getUploadServer(ctx context.Context) (string, string, error) {
|
|
|
|
opts := rest.Opts{
|
|
Method: "GET",
|
|
Path: "/upload/server",
|
|
Parameters: url.Values{
|
|
"key": {f.opt.Key},
|
|
},
|
|
}
|
|
|
|
var result struct {
|
|
Status int `json:"status"`
|
|
SessID string `json:"sess_id"`
|
|
Result string `json:"result"`
|
|
Msg string `json:"msg"`
|
|
}
|
|
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
_, err := f.srv.CallJSON(ctx, &opts, nil, &result)
|
|
if err != nil {
|
|
return shouldRetry(err), fmt.Errorf("failed to get upload server: %w", err)
|
|
}
|
|
return false, nil
|
|
})
|
|
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
|
|
if result.Status != 200 {
|
|
return "", "", fmt.Errorf("API error: %s", result.Msg)
|
|
}
|
|
|
|
return result.Result, result.SessID, nil
|
|
}
|
|
|
|
// uploadFileWithDestination uploads a file directly to a specified folder using file content reader.
|
|
func (f *Fs) uploadFileWithDestination(ctx context.Context, uploadURL, sessID, fileName string, fileContent io.Reader, dirPath string) (string, error) {
|
|
destinationPath := f.fromStandardPath(dirPath)
|
|
encodedFileName := f.fromStandardPath(fileName)
|
|
pr, pw := io.Pipe()
|
|
writer := multipart.NewWriter(pw)
|
|
isDeletionRequired := false
|
|
go func() {
|
|
defer func() {
|
|
if err := pw.Close(); err != nil {
|
|
fs.Logf(nil, "Failed to close: %v", err)
|
|
}
|
|
}()
|
|
_ = writer.WriteField("sess_id", sessID)
|
|
_ = writer.WriteField("utype", "prem")
|
|
_ = writer.WriteField("fld_path", destinationPath)
|
|
|
|
part, err := writer.CreateFormFile("file_0", encodedFileName)
|
|
if err != nil {
|
|
pw.CloseWithError(fmt.Errorf("failed to create form file: %w", err))
|
|
return
|
|
}
|
|
|
|
if _, err := io.Copy(part, fileContent); err != nil {
|
|
isDeletionRequired = true
|
|
pw.CloseWithError(fmt.Errorf("failed to copy file content: %w", err))
|
|
return
|
|
}
|
|
|
|
if err := writer.Close(); err != nil {
|
|
pw.CloseWithError(fmt.Errorf("failed to close writer: %w", err))
|
|
}
|
|
}()
|
|
|
|
var fileCode string
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
req, err := http.NewRequestWithContext(ctx, "POST", uploadURL, pr)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to create upload request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", writer.FormDataContentType())
|
|
|
|
resp, err := f.client.Do(req)
|
|
if err != nil {
|
|
return shouldRetry(err), fmt.Errorf("failed to send upload request: %w", err)
|
|
}
|
|
defer respBodyClose(resp.Body)
|
|
|
|
var result []struct {
|
|
FileCode string `json:"file_code"`
|
|
FileStatus string `json:"file_status"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return false, fmt.Errorf("failed to parse upload response: %w", err)
|
|
}
|
|
|
|
if len(result) == 0 || result[0].FileStatus != "OK" {
|
|
return false, fmt.Errorf("upload failed with status: %s", result[0].FileStatus)
|
|
}
|
|
|
|
fileCode = result[0].FileCode
|
|
return shouldRetryHTTP(resp.StatusCode), nil
|
|
})
|
|
|
|
if err != nil && isDeletionRequired {
|
|
// Attempt to delete the file if upload fails
|
|
_ = f.deleteFile(ctx, destinationPath+"/"+fileName)
|
|
}
|
|
|
|
return fileCode, err
|
|
}
|
|
|
|
// respBodyClose closes responseBody and reports a failure to do so.
//
// It is meant to be deferred, which is why the error is printed rather than
// returned.
// NOTE(review): the message goes to standard output via fmt.Printf, unlike the
// fs.Logf call used for close failures elsewhere in this file - confirm this
// is intended.
func respBodyClose(responseBody io.Closer) {
	err := responseBody.Close()
	if err == nil {
		return
	}
	fmt.Printf("Error closing response body: %v\n", err)
}
|