Commit "multipart all tests done" (https://github.com/rclone/rclone.git)
@@ -208,6 +208,22 @@ type MultiPartCompleteResponse struct {
 	Location string `json:"location"`
 }
 
+// MultiPartEntriesRequest is the input to POST /s3/entries
+type MultiPartEntriesRequest struct {
+	ClientMime      string      `json:"clientMime"`
+	ClientName      string      `json:"clientName"`
+	Filename        string      `json:"filename"`
+	Size            int64       `json:"size"`
+	ClientExtension string      `json:"clientExtension"`
+	ParentID        json.Number `json:"parent_id"`
+	RelativePath    string      `json:"relativePath"`
+}
+
+// MultiPartEntriesResponse is the result of POST /s3/entries
+type MultiPartEntriesResponse struct {
+	FileEntry Item `json:"fileEntry"`
+}
+
 // MultiPartAbort is the input of POST /s3/multipart/abort
 type MultiPartAbort struct {
 	UploadID string `json:"uploadId"`
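For orientation (not part of the commit), the sketch below shows the JSON body that MultiPartEntriesRequest marshals to. The struct is repeated so the example is self-contained, and every value is invented for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirror of api.MultiPartEntriesRequest from the hunk above,
// repeated here so this sketch compiles on its own.
type MultiPartEntriesRequest struct {
	ClientMime      string      `json:"clientMime"`
	ClientName      string      `json:"clientName"`
	Filename        string      `json:"filename"`
	Size            int64       `json:"size"`
	ClientExtension string      `json:"clientExtension"`
	ParentID        json.Number `json:"parent_id"`
	RelativePath    string      `json:"relativePath"`
}

func main() {
	// All field values here are invented for illustration.
	req := MultiPartEntriesRequest{
		ClientMime:      "text/plain",
		ClientName:      "notes.txt",
		Filename:        "0f8fad5b-d9cb-469f-a165-70867728950e", // server-side upload name (uuid)
		Size:            123456,
		ClientExtension: "txt",
		ParentID:        json.Number("42"),
		RelativePath:    "docs/notes.txt",
	}
	b, _ := json.MarshalIndent(req, "", "  ")
	fmt.Println(string(b)) // the wire format POSTed to /s3/entries
}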
@@ -26,6 +26,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/rclone/rclone/backend/drime/api"
@@ -150,6 +151,14 @@ and these uploads do not fully utilize your bandwidth, then increasing
 this may help to speed up the transfers.`,
 			Default:  4,
 			Advanced: true,
 		}, {
+			Name: "upload_cutoff",
+			Help: `Cutoff for switching to chunked upload.
+
+Any files larger than this will be uploaded in chunks of chunk_size.
+The minimum is 0 and the maximum is 5 GiB.`,
+			Default:  defaultUploadCutoff,
+			Advanced: true,
+		}, {
 			Name:     config.ConfigEncoding,
 			Help:     config.ConfigEncodingHelp,
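Once registered, the new option is reachable as upload_cutoff in the remote's config section or, following rclone's usual backend-flag convention, as --drime-upload-cutoff on the command line (for example, rclone copy /src remote: --drime-upload-cutoff 50M). Files at or below the cutoff keep using the existing single-request upload path.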
@@ -188,6 +197,7 @@ type Options struct {
 	UploadConcurrency int                  `config:"upload_concurrency"`
 	ChunkSize         fs.SizeSuffix        `config:"chunk_size"`
 	HardDelete        bool                 `config:"hard_delete"`
+	UploadCutoff      fs.SizeSuffix        `config:"upload_cutoff"`
 	ListChunk         int                  `config:"list_chunk"`
 	Enc               encoder.MultiEncoder `config:"encoding"`
 }
@@ -219,6 +229,28 @@ type Object struct {
 
 // ------------------------------------------------------------
 
+func checkUploadChunkSize(cs fs.SizeSuffix) error {
+	if cs < minChunkSize {
+		return fmt.Errorf("%s is less than %s", cs, minChunkSize)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadChunkSize(cs)
+	if err == nil {
+		old, f.opt.ChunkSize = f.opt.ChunkSize, cs
+	}
+	return
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	// there is no bound to validate for the cutoff, so err stays nil here
+	if err == nil {
+		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+	}
+	return
+}
+
 // Name of the remote (as passed into NewFs)
 func (f *Fs) Name() string {
 	return f.name
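The setters hand back the previous value so a caller can swap a setting in and restore it afterwards, which is exactly what the integration tests rely on. A minimal sketch of that pattern, assuming it sits in the drime package next to the code above; withChunkSize is a hypothetical helper, not part of the commit:

// withChunkSize runs do with a temporary chunk size and restores the old one after.
func withChunkSize(f *Fs, cs fs.SizeSuffix, do func() error) error {
	old, err := f.setUploadChunkSize(cs)
	if err != nil {
		return err // cs was below minChunkSize
	}
	defer func() { _, _ = f.setUploadChunkSize(old) }() // put the previous value back
	return do()
}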
@@ -335,6 +367,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		return nil, err
 	}
 
+	err = checkUploadChunkSize(opt.ChunkSize)
+	if err != nil {
+		return nil, fmt.Errorf("drime: chunk size: %w", err)
+	}
+
 	root = parsePath(root)
 
 	client := fshttp.NewClient(ctx)
@@ -1015,12 +1052,21 @@ func (f *Fs) Hashes() hash.Set {
 var warnStreamUpload sync.Once
 
 type drimeChunkWriter struct {
-	uploadID         string
-	key              string
-	chunkSize        int64
-	size             int64
-	f                *Fs
-	o                *Object
+	uploadID  string
+	key       string
+	chunkSize int64
+	size      int64
+	f         *Fs
+	o         *Object
+	written   atomic.Int64
+
+	uploadName   string // uuid
+	leaf         string
+	mime         string
+	extension    string
+	parentID     json.Number
+	relativePath string
 
 	completedParts   []api.CompletedPart
 	completedPartsMu sync.Mutex
 }
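Because chunks are uploaded concurrently, the writer keeps two kinds of shared state: completedParts behind completedPartsMu, and written as a lock-free atomic byte counter. An illustrative sketch of that discipline, assuming the drime package context; recordPart is a hypothetical helper, and the real bookkeeping happens inline in WriteChunk further down:

// recordPart shows the locking discipline the struct implies.
func (s *drimeChunkWriter) recordPart(p api.CompletedPart, n int64) {
	s.completedPartsMu.Lock()
	s.completedParts = append(s.completedParts, p) // parts list needs the mutex
	s.completedPartsMu.Unlock()
	s.written.Add(n) // byte counter is atomic, no lock required
}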
@@ -1060,11 +1106,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
 		chunkSize = chunksize.Calculator(src, size, maxUploadParts, chunkSize)
 	}
 
+	createSize := max(0, size)
+
 	// Initiate multipart upload
 	req := api.MultiPartCreateRequest{
 		Filename:     leaf,
 		Mime:         fs.MimeType(ctx, src),
-		Size:         size,
+		Size:         createSize,
 		Extension:    strings.TrimPrefix(path.Ext(leaf), `.`),
 		ParentID:     json.Number(directoryID),
 		RelativePath: f.opt.Enc.FromStandardPath(path.Join(f.root, remote)),
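For streaming uploads src.Size() reports -1, so max(0, size) keeps the create request non-negative while the real size is only settled at Close time. A tiny self-contained illustration of the clamp (the builtin max needs Go 1.21+):

package main

import "fmt"

func main() {
	size := int64(-1)          // unknown stream size, as rclone reports it
	createSize := max(0, size) // the create request carries 0, not -1
	fmt.Println(createSize)    // 0
}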
@@ -1087,13 +1135,27 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
 		return info, nil, fmt.Errorf("failed to initiate multipart upload: %w", err)
 	}
 
+	mime := fs.MimeType(ctx, src)
+	ext := strings.TrimPrefix(path.Ext(leaf), ".")
+	// must have file extension for multipart upload
+	if ext == "" {
+		ext = "bin"
+	}
+	rel := f.opt.Enc.FromStandardPath(path.Join(f.root, remote))
+
 	chunkWriter := &drimeChunkWriter{
-		uploadID:  resp.UploadID,
-		key:       resp.Key,
-		chunkSize: int64(chunkSize),
-		size:      size,
-		f:         f,
-		o:         o,
+		uploadID:     resp.UploadID,
+		key:          resp.Key,
+		chunkSize:    int64(chunkSize),
+		size:         size,
+		f:            f,
+		o:            o,
+		uploadName:   path.Base(resp.Key),
+		leaf:         leaf,
+		mime:         mime,
+		extension:    ext,
+		parentID:     json.Number(directoryID),
+		relativePath: rel,
 	}
 	info = fs.ChunkWriterInfo{
 		ChunkSize: int64(chunkSize),
@@ -1184,6 +1246,10 @@ func (s *drimeChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, read
 		PartNumber: int32(chunkNumber),
 		ETag:       etag,
 	})
+
+	// Count size written for unknown file sizes
+	s.written.Add(chunkSize)
+
 	return chunkSize, nil
 }
@@ -1216,6 +1282,36 @@ func (s *drimeChunkWriter) Close(ctx context.Context) error {
 		return fmt.Errorf("failed to complete multipart upload: %w", err)
 	}
 
+	finalSize := s.size
+	if finalSize < 0 {
+		finalSize = s.written.Load()
+	}
+
+	// s3/entries request to create drime object from multipart upload
+	req := api.MultiPartEntriesRequest{
+		ClientMime:      s.mime,
+		ClientName:      s.leaf,
+		Filename:        s.uploadName,
+		Size:            finalSize,
+		ClientExtension: s.extension,
+		ParentID:        s.parentID,
+		RelativePath:    s.relativePath,
+	}
+
+	entriesOpts := rest.Opts{
+		Method: "POST",
+		Path:   "/s3/entries",
+	}
+
+	var res api.MultiPartEntriesResponse
+	err = s.f.pacer.Call(func() (bool, error) {
+		// resp is the raw *http.Response, used only for the retry decision
+		resp, err := s.f.srv.CallJSON(ctx, &entriesOpts, req, &res)
+		return shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return fmt.Errorf("failed to create entry after multipart upload: %w", err)
+	}
+
 	return nil
 }
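Read end to end, the flow these hunks implement is: OpenChunkWriter initiates the multipart upload and stashes the metadata needed later (upload name, leaf, mime, extension, parent ID, relative path); WriteChunk uploads parts concurrently, recording each {PartNumber, ETag} pair and adding to the byte counter; Close completes the multipart upload and then POSTs /s3/entries with the final size, taken from s.size or, for streams of unknown length, from the atomic counter, to turn the finished upload into a Drime file entry.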
@@ -1,10 +1,10 @@
-// Test Drime filesystem interface
-package drime_test
+// Drime filesystem interface
+package drime
 
 import (
 	"testing"
 
-	"github.com/rclone/rclone/backend/drime"
+	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/fstest/fstests"
 )
@@ -12,6 +12,22 @@ import (
 func TestIntegration(t *testing.T) {
 	fstests.Run(t, &fstests.Opt{
 		RemoteName: "TestDrime:",
-		NilObject:  (*drime.Object)(nil),
+		NilObject:  (*Object)(nil),
+		ChunkedUpload: fstests.ChunkedUploadConfig{
+			MinChunkSize: minChunkSize,
+		},
 	})
 }
+
+func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadChunkSize(cs)
+}
+
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadCutoff(cs)
+}
+
+var (
+	_ fstests.SetUploadChunkSizer = (*Fs)(nil)
+	_ fstests.SetUploadCutoffer   = (*Fs)(nil)
+)
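The exported SetUploadChunkSize and SetUploadCutoff wrappers exist so the fstests harness can vary these settings between test cases, and the trailing var block turns that contract into a compile-time check. Going by the method shapes above, the asserted interfaces have roughly this form (see fstest/fstests for the authoritative definitions):

type SetUploadChunkSizer interface {
	// SetUploadChunkSize changes the chunk size, returning the old value.
	SetUploadChunkSize(fs.SizeSuffix) (fs.SizeSuffix, error)
}

type SetUploadCutoffer interface {
	// SetUploadCutoff changes the upload cutoff, returning the old value.
	SetUploadCutoff(fs.SizeSuffix) (fs.SizeSuffix, error)
}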
|
||||
Reference in New Issue
Block a user