Mirror of https://github.com/rclone/rclone.git (synced 2025-12-15 07:43:35 +00:00)
Compare commits (4 commits): dependabot ... fix-dropbo
| Author | SHA1 | Date |
|---|---|---|
| | c741c02fb6 | |
| | 56e8d75cab | |
| | e74f5b8906 | |
| | e4a7686444 | |
backend/dropbox/dropbox.go

@@ -22,6 +22,7 @@ of path_display and all will be well.
 */
 
 import (
+    "bytes"
     "context"
     "fmt"
     "io"
@@ -29,9 +30,11 @@ import (
     "path"
     "regexp"
     "strings"
+    "sync"
     "time"
 
     "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox"
+    "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/async"
     "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/auth"
     "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/common"
     "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files"
@@ -47,6 +50,7 @@ import (
     "github.com/rclone/rclone/fs/config/obscure"
     "github.com/rclone/rclone/fs/fserrors"
     "github.com/rclone/rclone/fs/hash"
+    "github.com/rclone/rclone/lib/atexit"
     "github.com/rclone/rclone/lib/encoder"
     "github.com/rclone/rclone/lib/oauthutil"
     "github.com/rclone/rclone/lib/pacer"
@@ -61,6 +65,7 @@ const (
     minSleep = 10 * time.Millisecond
     maxSleep = 2 * time.Second
     decayConstant = 2 // bigger for slower decay, exponential
+    maxBatchSize = 1000
     // Upload chunk size - setting too small makes uploads slow.
     // Chunks are buffered into memory for retries.
     //
@@ -142,6 +147,23 @@ memory. It can be set smaller if you are tight on memory.`, maxChunkSize),
             Help:     "Impersonate this user when using a business account.",
             Default:  "",
             Advanced: true,
+        }, {
+            Name: "batch",
+            Help: `Enable batching of files if non-zero.
+
+This sets the batch size of files to upload. It has to be at most 1000. A
+sensible setting is probably 1000 if you are using this feature.
+
+Rclone will close any outstanding batches when it exits.
+
+Setting this is a great idea if you are uploading lots of small files as it will
+make them a lot quicker. You can use --transfers 32 to maximise throughput.
+
+It has the downside that rclone can't check the hash of the file after upload,
+so using "rclone check" after the transfer completes is recommended.
+`,
+            Default:  0,
+            Advanced: true,
         }, {
             Name: config.ConfigEncoding,
             Help: config.ConfigEncodingHelp,
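A quick usage sketch (an assumption, not part of the diff: rclone's usual option-to-flag mapping would expose this option as --dropbox-batch, or as batch = ... in the remote's config section). Uploading a directory of many small files might then look like:

    rclone copy /path/to/small/files remote:dir --dropbox-batch 1000 --transfers 32
    rclone check /path/to/small/files remote:dir

with rclone check run afterwards because, as the help text notes, hashes cannot be verified at upload time when batching.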
@@ -163,6 +185,7 @@ memory. It can be set smaller if you are tight on memory.`, maxChunkSize),
 type Options struct {
     ChunkSize fs.SizeSuffix `config:"chunk_size"`
     Impersonate string `config:"impersonate"`
+    Batch int `config:"batch"`
     Enc encoder.MultiEncoder `config:"encoding"`
 }
 
@@ -180,6 +203,7 @@ type Fs struct {
     slashRootSlash string // root with "/" prefix and postfix, lowercase
     pacer *fs.Pacer // To pace the API calls
     ns string // The namespace we are using or "" for none
+    batcher *batcher // batch builder
 }
 
 // Object describes a dropbox object
@@ -195,6 +219,165 @@ type Object struct {
 
 // ------------------------------------------------------------
 
+// batcher holds info about the current items waiting for upload
+type batcher struct {
+    f *Fs // Fs this batch is part of
+    mu sync.Mutex // lock for vars below
+    commitMu sync.Mutex // lock for waiting for batch
+    maxBatch int // maximum size for batch
+    active int // number of batches being sent
+    items []*files.UploadSessionFinishArg // current uncommitted files
+    atexit atexit.FnHandle // atexit handle
+}
+
+// newBatcher creates a new batcher structure
+func newBatcher(f *Fs, maxBatch int) *batcher {
+    return &batcher{
+        f: f,
+        maxBatch: maxBatch,
+    }
+}
+
+// Start starts adding an item to a batch returning true if it was
+// successfully started
+//
+// This should be paired with End
+func (b *batcher) Start() bool {
+    if b.maxBatch <= 0 {
+        return false
+    }
+    b.mu.Lock()
+    defer b.mu.Unlock()
+    b.active++
+    // FIXME set a timer or something
+    return true
+}
+
+// End ends adding an item
+func (b *batcher) End(started bool) error {
+    if !started {
+        return nil
+    }
+    b.mu.Lock()
+    defer b.mu.Unlock()
+    b.active--
+    if len(b.items) < b.maxBatch {
+        return nil
+    }
+    return b._commit(false)
+}
+
+// Waits for the batch to complete - call with commitMu held
+func (b *batcher) _waitForBatchResult(res *files.UploadSessionFinishBatchLaunch) (batchResult *files.UploadSessionFinishBatchResult, err error) {
+    if res.AsyncJobId == "" {
+        return res.Complete, nil
+    }
+    var batchStatus *files.UploadSessionFinishBatchJobStatus
+    sleepTime := time.Second
+    const maxTries = 120
+    for try := 1; try <= maxTries; try++ {
+        err = b.f.pacer.Call(func() (bool, error) {
+            batchStatus, err = b.f.srv.UploadSessionFinishBatchCheck(&async.PollArg{
+                AsyncJobId: res.AsyncJobId,
+            })
+            return shouldRetry(err)
+        })
+        if err != nil {
+            fs.Errorf(b.f, "failed to wait for batch: %v", err)
+            return nil, err
+        }
+        if batchStatus.Tag == "complete" {
+            break
+        }
+        fs.Debugf(b.f, "sleeping for %v to wait for batch to complete, try %d/%d", sleepTime, try, maxTries)
+        time.Sleep(sleepTime)
+    }
+    return batchStatus.Complete, nil
+}
+
+// commit a batch - call with mu held
+//
+// if finalizing is true then it doesn't unregister Finalize as this
+// causes a deadlock during finalization.
+func (b *batcher) _commit(finalizing bool) (err error) {
+    b.commitMu.Lock()
+    batch := "batch"
+    if finalizing {
+        batch = "last batch"
+    }
+    fs.Debugf(b.f, "committing %s length %d", batch, len(b.items))
+    var arg = &files.UploadSessionFinishBatchArg{
+        Entries: b.items,
+    }
+    var res *files.UploadSessionFinishBatchLaunch
+    err = b.f.pacer.Call(func() (bool, error) {
+        res, err = b.f.srv.UploadSessionFinishBatch(arg)
+        // If error is insufficient space then don't retry
+        if e, ok := err.(files.UploadSessionFinishAPIError); ok {
+            if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {
+                err = fserrors.NoRetryError(err)
+                return false, err
+            }
+        }
+        // retry all other errors
+        return err != nil, err
+    })
+    if err != nil {
+        b.commitMu.Unlock()
+        return err
+    }
+
+    // Clear batch
+    b.items = nil
+
+    // If finalizing, don't unregister or get result
+    if finalizing {
+        b.commitMu.Unlock()
+        return nil
+    }
+
+    // Unregister the atexit since queue is empty
+    atexit.Unregister(b.atexit)
+    b.atexit = nil
+
+    // Wait for the batch to finish in the background - the next commit
+    // will block on commitMu until it does
+    go func() {
+        defer b.commitMu.Unlock()
+        _, err := b._waitForBatchResult(res)
+        if err != nil {
+            fs.Errorf(b.f, "Error waiting for batch to finish: %v", err)
+        }
+    }()
+
+    return nil
+}
+
+// Add adds a finished item to the batch
+func (b *batcher) Add(commitInfo *files.UploadSessionFinishArg) {
+    fs.Debugf(b.f, "adding %q to batch", commitInfo.Commit.Path)
+    b.mu.Lock()
+    defer b.mu.Unlock()
+    b.items = append(b.items, commitInfo)
+    if b.atexit == nil {
+        b.atexit = atexit.Register(b.Finalize)
+    }
+}
+
+// Finalize finishes any pending batches
+func (b *batcher) Finalize() {
+    b.mu.Lock()
+    defer b.mu.Unlock()
+    if len(b.items) == 0 {
+        return
+    }
+    err := b._commit(true)
+    if err != nil {
+        fs.Errorf(b.f, "Failed to finalize last batch: %v", err)
+    }
+}
+
+// ------------------------------------------------------------
+
 // Name of the remote (as passed into NewFs)
 func (f *Fs) Name() string {
     return f.name
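At its core the batcher above is a mutex-guarded accumulate-and-flush queue around Dropbox's UploadSessionFinishBatch call: items collect until maxBatch is reached (or until the atexit handler fires) and are then committed in one request. A minimal self-contained sketch of that pattern - the type, names, and the flush callback here are illustrative stand-ins, not code from the commit:

    package main

    import (
        "fmt"
        "sync"
    )

    // queue is a mutex-guarded accumulate-and-flush batch, shaped like the
    // batcher type above but with the Dropbox call swapped for a plain function.
    type queue struct {
        mu       sync.Mutex
        maxBatch int
        items    []string
        flush    func(items []string) error // stand-in for UploadSessionFinishBatch
    }

    // Add appends an item and commits the batch once it is full.
    func (q *queue) Add(item string) error {
        q.mu.Lock()
        defer q.mu.Unlock()
        q.items = append(q.items, item)
        if len(q.items) < q.maxBatch {
            return nil
        }
        return q.commit()
    }

    // Finalize commits whatever is left, e.g. from an atexit handler.
    func (q *queue) Finalize() error {
        q.mu.Lock()
        defer q.mu.Unlock()
        if len(q.items) == 0 {
            return nil
        }
        return q.commit()
    }

    // commit must be called with mu held; it only clears the batch on success.
    func (q *queue) commit() error {
        if err := q.flush(q.items); err != nil {
            return err
        }
        q.items = nil
        return nil
    }

    func main() {
        q := &queue{maxBatch: 3, flush: func(items []string) error {
            fmt.Println("committing batch:", items)
            return nil
        }}
        for _, name := range []string{"a.txt", "b.txt", "c.txt", "d.txt"} {
            _ = q.Add(name)
        }
        _ = q.Finalize() // flushes the leftover d.txt
    }

The real batcher additionally serialises commits with commitMu and, when Dropbox answers with an async job id, polls UploadSessionFinishBatchCheck in a background goroutine until the batch reports complete.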
@@ -230,7 +413,7 @@ func shouldRetry(err error) (bool, error) {
     switch e := err.(type) {
     case auth.RateLimitAPIError:
         if e.RateLimitError.RetryAfter > 0 {
-            fs.Debugf(baseErrString, "Too many requests or write operations. Trying again in %d seconds.", e.RateLimitError.RetryAfter)
+            fs.Logf(baseErrString, "Too many requests or write operations. Trying again in %d seconds.", e.RateLimitError.RetryAfter)
             err = pacer.RetryAfterError(err, time.Duration(e.RateLimitError.RetryAfter)*time.Second)
         }
         return true, err
@@ -273,6 +456,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
     if err != nil {
         return nil, errors.Wrap(err, "dropbox: chunk size")
     }
+    if opt.Batch > maxBatchSize || opt.Batch < 0 {
+        return nil, errors.Errorf("dropbox: batch must be <= %d and >= 0 - it is currently %d", maxBatchSize, opt.Batch)
+    }
 
     // Convert the old token if it exists. The old token was just
     // just a string, the new one is a JSON blob
@@ -297,6 +483,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
         opt: *opt,
         pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
     }
+    f.batcher = newBatcher(f, f.opt.Batch)
     config := dropbox.Config{
         LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo
         Client: oAuthClient, // maybe???
@@ -1044,6 +1231,13 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
 // unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an
 // avoidable request to the Dropbox API that does not carry payload.
 func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
+    batching := o.fs.batcher.Start()
+    defer func() {
+        batchErr := o.fs.batcher.End(batching)
+        if err == nil {
+            err = batchErr
+        }
+    }()
     chunkSize := int64(o.fs.opt.ChunkSize)
     chunks := 0
     if size != -1 {
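The Start/End pair leans on the named (entry, err) return values of uploadChunked: the deferred End always runs, but its error is only adopted when the upload body itself succeeded, so an earlier upload failure is not lost. A tiny standalone illustration of that defer idiom, with made-up errors standing in for the real upload and batch commit:

    package main

    import (
        "errors"
        "fmt"
    )

    // upload mirrors the shape of uploadChunked above: a named error return plus
    // a deferred "End" whose error must not clobber an earlier upload failure.
    func upload(workErr, endErr error) (err error) {
        defer func() {
            if err == nil { // keep the original error if the work already failed
                err = endErr
            }
        }()
        return workErr
    }

    func main() {
        fmt.Println(upload(nil, errors.New("batch commit failed")))             // batch commit failed
        fmt.Println(upload(errors.New("upload failed"), errors.New("ignored"))) // upload failed
    }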
@@ -1057,11 +1251,15 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
             fs.Debugf(o, "Streaming chunk %d/%d", cur, cur)
         } else if chunks == 0 {
             fs.Debugf(o, "Streaming chunk %d/unknown", cur)
-        } else {
+        } else if chunks != 1 {
             fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks)
         }
     }
 
+    appendArg := files.UploadSessionAppendArg{
+        Close: chunks == 1,
+    }
+
     // write the first chunk
     fmtChunk(1, false)
     var res *files.UploadSessionStartResult
@@ -1071,7 +1269,10 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
         if _, err = chunk.Seek(0, io.SeekStart); err != nil {
             return false, nil
         }
-        res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, chunk)
+        arg := files.UploadSessionStartArg{
+            Close: appendArg.Close,
+        }
+        res, err = o.fs.srv.UploadSessionStart(&arg, chunk)
         return shouldRetry(err)
     })
     if err != nil {
@@ -1082,22 +1283,34 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
         SessionId: res.SessionId,
         Offset: 0,
     }
-    appendArg := files.UploadSessionAppendArg{
-        Cursor: &cursor,
-        Close: false,
-    }
+    appendArg.Cursor = &cursor
 
-    // write more whole chunks (if any)
+    // write more whole chunks (if any, and if !batching), if
+    // batching write the last chunk also.
     currentChunk := 2
     for {
-        if chunks > 0 && currentChunk >= chunks {
-            // if the size is known, only upload full chunks. Remaining bytes are uploaded with
-            // the UploadSessionFinish request.
-            break
-        } else if chunks == 0 && in.BytesRead()-cursor.Offset < uint64(chunkSize) {
-            // if the size is unknown, upload as long as we can read full chunks from the reader.
-            // The UploadSessionFinish request will not contain any payload.
-            break
+        if chunks > 0 {
+            // Size known
+            if currentChunk == chunks {
+                // Last chunk
+                if !batching {
+                    // if the size is known, only upload full chunks. Remaining bytes are uploaded with
+                    // the UploadSessionFinish request.
+                    break
+                }
+                appendArg.Close = true
+            } else if currentChunk > chunks {
+                break
+            }
+        } else {
+            // Size unknown
+            lastReadWasShort := in.BytesRead()-cursor.Offset < uint64(chunkSize)
+            if lastReadWasShort {
+                // if the size is unknown, upload as long as we can read full chunks from the reader.
+                // The UploadSessionFinish request will not contain any payload.
+                // This is also what we want if batching
+                break
+            }
         }
         cursor.Offset = in.BytesRead()
         fmtChunk(currentChunk, false)
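Spelling out the size-known branch above: without batching the loop still stops before the final chunk, which is sent with the UploadSessionFinish request as before; with batching the final chunk is uploaded here too, with Close set on the append, so the commit can be deferred to the batch. A throwaway sketch of that decision, using hypothetical chunk numbers (not code from the commit):

    package main

    import "fmt"

    // lastChunkDecision restates the size-known branch of the rewritten loop:
    // what happens once currentChunk reaches (or passes) the total chunk count.
    func lastChunkDecision(currentChunk, chunks int, batching bool) string {
        if currentChunk == chunks {
            if !batching {
                return "stop; remaining bytes go out with UploadSessionFinish"
            }
            return "upload the last chunk too, with Close set on the append"
        }
        if currentChunk > chunks {
            return "stop; all data already uploaded"
        }
        return "upload a full chunk and continue"
    }

    func main() {
        fmt.Println(lastChunkDecision(2, 3, false)) // upload a full chunk and continue
        fmt.Println(lastChunkDecision(3, 3, false)) // stop; remaining bytes go out with UploadSessionFinish
        fmt.Println(lastChunkDecision(3, 3, true))  // upload the last chunk too, with Close set on the append
    }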
@@ -1123,6 +1336,26 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
         Cursor: &cursor,
         Commit: commitInfo,
     }
+    // If we are batching then we should have written all the data now,
+    // so store the commit info for a batch commit
+    if batching {
+        // If we haven't closed the session then we need to
+        if !appendArg.Close {
+            fs.Debugf(o, "Closing session")
+            var empty bytes.Buffer
+            err = o.fs.pacer.Call(func() (bool, error) {
+                err = o.fs.srv.UploadSessionAppendV2(&appendArg, &empty)
+                // after the first chunk is uploaded, we retry everything
+                return err != nil, err
+            })
+            if err != nil {
+                return nil, err
+            }
+        }
+        o.fs.batcher.Add(args)
+        return nil, nil
+    }
+
     fmtChunk(currentChunk, true)
     chunk = readers.NewRepeatableReaderBuffer(in, buf)
     err = o.fs.pacer.Call(func() (bool, error) {
@@ -1165,7 +1398,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
     size := src.Size()
     var err error
     var entry *files.FileMetadata
-    if size > int64(o.fs.opt.ChunkSize) || size == -1 {
+    if size > int64(o.fs.opt.ChunkSize) || size == -1 || o.fs.opt.Batch > 0 {
         entry, err = o.uploadChunked(in, commitInfo, size)
     } else {
         err = o.fs.pacer.CallNoRetry(func() (bool, error) {
@@ -1176,6 +1409,13 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
     if err != nil {
         return errors.Wrap(err, "upload failed")
     }
+    // If we haven't received data back from batch upload then fake it
+    if entry == nil {
+        o.bytes = size
+        o.modTime = commitInfo.ClientModified
+        o.hash = "" // we don't have this
+        return nil
+    }
     return o.setMetadataFromEntry(entry)
 }
 
fs/accounting/stats.go

@@ -272,7 +272,7 @@ func (s *StatsInfo) String() string {
         }
     }
 
-    _, _ = fmt.Fprintf(buf, "%s%10s / %s, %s, %s, ETA %s%s\n",
+    _, _ = fmt.Fprintf(buf, "%s%10s / %s, %s, %s, ETA %s%s",
         dateString,
         fs.SizeSuffix(s.bytes),
         fs.SizeSuffix(totalSize).Unit("Bytes"),
@@ -283,6 +283,7 @@ func (s *StatsInfo) String() string {
     )
 
     if !fs.Config.StatsOneLine {
+        _, _ = buf.WriteRune('\n')
         errorDetails := ""
         switch {
         case s.fatalError:
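The intent of the stats.go change appears to be that the summary line no longer carries a hard-coded newline; the newline is only written when the multi-line (non stats-one-line) layout follows, as the two hunks above show. A small stand-alone demonstration of the pattern, with made-up values:

    package main

    import (
        "bytes"
        "fmt"
    )

    // render shows the effect of moving the newline behind the StatsOneLine
    // check: the header is printed without "\n" and only the multi-line layout
    // appends one before the detail lines.
    func render(oneLine bool) string {
        var buf bytes.Buffer
        _, _ = fmt.Fprintf(&buf, "%10s / %s, ETA %s", "1.2M", "3.4M", "5s")
        if !oneLine {
            _, _ = buf.WriteRune('\n')
            buf.WriteString("Errors: 0")
        }
        return buf.String()
    }

    func main() {
        fmt.Printf("%q\n", render(true))  // no trailing newline in one-line mode
        fmt.Printf("%q\n", render(false)) // newline inserted before the detail lines
    }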
@@ -291,6 +292,7 @@ func (s *StatsInfo) String() string {
             errorDetails = " (retrying may help)"
         case s.errors != 0:
             errorDetails = " (no need to retry)"
+
         }
 
         // Add only non zero stats