Compare commits

1 Commit

Author: Nick Craig-Wood
Commit: 4d97a6d2c1
Date:   2020-09-08 15:35:36 +01:00

local: when overwriting a symlink with a file delete the link first

If the destination file was a symlink, rclone would update the
pointed-to file instead of replacing the symlink with a file.

rsync replaces files with symlinks and symlinks with files silently, so
rclone should do the same.

Fixes #3400
2 changed files with 30 additions and 261 deletions
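The heart of the fix is a pre-write check with Lstat: if the destination path is currently a symlink, delete it so the subsequent open creates a regular file instead of writing through the link. A minimal standalone sketch of that pattern (the function name writeReplacingSymlink is ours, not rclone's; rclone's actual version is in the local.go hunk below):

```go
package main

import (
	"fmt"
	"os"
)

// writeReplacingSymlink writes data to path. If path is currently a
// symlink it is removed first, so the write creates a regular file
// rather than truncating and rewriting the link's target.
func writeReplacingSymlink(path string, data []byte) error {
	// Lstat, unlike Stat, does not follow symlinks, so it tells us
	// what path itself is.
	if fi, err := os.Lstat(path); err == nil && fi.Mode()&os.ModeSymlink != 0 {
		if err := os.Remove(path); err != nil {
			return fmt.Errorf("remove symlink: %w", err)
		}
	}
	// Without the removal above, O_TRUNC would follow the link and
	// clobber the target file instead.
	return os.WriteFile(path, data, 0666)
}

func main() {
	if err := writeReplacingSymlink("example.txt", []byte("hello")); err != nil {
		fmt.Println("write failed:", err)
	}
}
```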

backend/dropbox/dropbox.go

@@ -22,7 +22,6 @@ of path_display and all will be well.
*/
import (
"bytes"
"context"
"fmt"
"io"
@@ -30,11 +29,9 @@ import (
"path"
"regexp"
"strings"
"sync"
"time"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/async"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/auth"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/common"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files"
@@ -50,7 +47,6 @@ import (
"github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/oauthutil"
"github.com/rclone/rclone/lib/pacer"
@@ -65,7 +61,6 @@ const (
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
maxBatchSize = 1000
// Upload chunk size - setting too small makes uploads slow.
// Chunks are buffered into memory for retries.
//
@@ -147,23 +142,6 @@ memory. It can be set smaller if you are tight on memory.`, maxChunkSize),
Help: "Impersonate this user when using a business account.",
Default: "",
Advanced: true,
}, {
Name: "batch",
Help: `Enable batching of files if non-zero.
This sets the batch size of files to upload. It has to be less than 1000. A
sensible setting is probably 1000 if you are using this feature.
Rclone will close any outstanding batches when it exits.
Setting this is a great idea if you are uploading lots of small files as it will
make them a lot quicker. You can use --transfers 32 to maximise throughput.
It has the downside that rclone can't check the hash of the file after upload,
so using "rclone check" after the transfer completes is recommended.
`,
Default: 0,
Advanced: true,
}, {
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
@@ -185,7 +163,6 @@ so using "rclone check" after the transfer completes is recommended.
type Options struct {
ChunkSize fs.SizeSuffix `config:"chunk_size"`
Impersonate string `config:"impersonate"`
Batch int `config:"batch"`
Enc encoder.MultiEncoder `config:"encoding"`
}
@@ -203,7 +180,6 @@ type Fs struct {
slashRootSlash string // root with "/" prefix and postfix, lowercase
pacer *fs.Pacer // To pace the API calls
ns string // The namespace we are using or "" for none
batcher *batcher // batch builder
}
// Object describes a dropbox object
@@ -219,165 +195,6 @@ type Object struct {
// ------------------------------------------------------------
// batcher holds info about the current items waiting for upload
type batcher struct {
f *Fs // Fs this batch is part of
mu sync.Mutex // lock for vars below
commitMu sync.Mutex // lock for waiting for batch
maxBatch int // maximum size for batch
active int // number of batches being sent
items []*files.UploadSessionFinishArg // current uncommitted files
atexit atexit.FnHandle // atexit handle
}
// newBatcher creates a new batcher structure
func newBatcher(f *Fs, maxBatch int) *batcher {
return &batcher{
f: f,
maxBatch: maxBatch,
}
}
// Start starts adding an item to a batch returning true if it was
// successfully started
//
// This should be paired with End
func (b *batcher) Start() bool {
if b.maxBatch <= 0 {
return false
}
b.mu.Lock()
defer b.mu.Unlock()
b.active++
// FIXME set a timer or something
return true
}
// End ends adding an item
func (b *batcher) End(started bool) error {
if !started {
return nil
}
b.mu.Lock()
defer b.mu.Unlock()
b.active--
if len(b.items) < b.maxBatch {
return nil
}
return b._commit(false)
}
// Waits for the batch to complete - call with batchMu held
func (b *batcher) _waitForBatchResult(res *files.UploadSessionFinishBatchLaunch) (batchResult *files.UploadSessionFinishBatchResult, err error) {
if res.AsyncJobId == "" {
return res.Complete, nil
}
var batchStatus *files.UploadSessionFinishBatchJobStatus
sleepTime := time.Second
const maxTries = 120
for try := 1; try <= maxTries; try++ {
err = b.f.pacer.Call(func() (bool, error) {
batchStatus, err = b.f.srv.UploadSessionFinishBatchCheck(&async.PollArg{
AsyncJobId: res.AsyncJobId,
})
return shouldRetry(err)
})
if err != nil {
fs.Errorf(b.f, "failed to wait for batch: %v", err)
break
}
if batchStatus.Tag == "complete" {
break
}
fs.Debugf(b.f, "sleeping for %v to wait for batch to complete, try %d/%d", sleepTime, try, maxTries)
time.Sleep(sleepTime)
}
return batchStatus.Complete, nil
}
// commit a batch - call with batchMu held
//
// if finalizing is true then it doesn't unregister Finalize as this
// causes a deadlock during finalization.
func (b *batcher) _commit(finalizing bool) (err error) {
b.commitMu.Lock()
batch := "batch"
if finalizing {
batch = "last batch"
}
fs.Debugf(b.f, "comitting %s length %d", batch, len(b.items))
var arg = &files.UploadSessionFinishBatchArg{
Entries: b.items,
}
var res *files.UploadSessionFinishBatchLaunch
err = b.f.pacer.Call(func() (bool, error) {
res, err = b.f.srv.UploadSessionFinishBatch(arg)
// If error is insufficient space then don't retry
if e, ok := err.(files.UploadSessionFinishAPIError); ok {
if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {
err = fserrors.NoRetryError(err)
return false, err
}
}
// after the first chunk is uploaded, we retry everything
return err != nil, err
})
if err != nil {
b.commitMu.Unlock()
return err
}
// Clear batch
b.items = nil
// If finalizing, don't unregister or get result
if finalizing {
b.commitMu.Unlock()
return nil
}
// Unregister the atexit since queue is empty
atexit.Unregister(b.atexit)
b.atexit = nil
// Wait for the batch to finish before we proceed in the background
go func() {
defer b.commitMu.Unlock()
_, err = b._waitForBatchResult(res)
if err != nil {
fs.Errorf(b.f, "Error waiting for batch to finish: %v", err)
}
}()
return nil
}
// Add adds a finished item to the batch
func (b *batcher) Add(commitInfo *files.UploadSessionFinishArg) {
fs.Debugf(b.f, "adding %q to batch", commitInfo.Commit.Path)
b.mu.Lock()
defer b.mu.Unlock()
b.items = append(b.items, commitInfo)
if b.atexit == nil {
b.atexit = atexit.Register(b.Finalize)
}
}
// Finalize finishes any pending batches
func (b *batcher) Finalize() {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.items) == 0 {
return
}
err := b._commit(true)
if err != nil {
fs.Errorf(b.f, "Failed to finalize last batch: %v", err)
}
}
// ------------------------------------------------------------
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
@@ -413,7 +230,7 @@ func shouldRetry(err error) (bool, error) {
switch e := err.(type) {
case auth.RateLimitAPIError:
if e.RateLimitError.RetryAfter > 0 {
fs.Logf(baseErrString, "Too many requests or write operations. Trying again in %d seconds.", e.RateLimitError.RetryAfter)
fs.Debugf(baseErrString, "Too many requests or write operations. Trying again in %d seconds.", e.RateLimitError.RetryAfter)
err = pacer.RetryAfterError(err, time.Duration(e.RateLimitError.RetryAfter)*time.Second)
}
return true, err
@@ -456,9 +273,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
if err != nil {
return nil, errors.Wrap(err, "dropbox: chunk size")
}
if opt.Batch > maxBatchSize || opt.Batch < 0 {
return nil, errors.Errorf("dropbox: batch must be < %d and >= 0 - it is currently %d", maxBatchSize, opt.Batch)
}
// Convert the old token if it exists. The old token was just
// just a string, the new one is a JSON blob
@@ -483,7 +297,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
opt: *opt,
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.batcher = newBatcher(f, f.opt.Batch)
config := dropbox.Config{
LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo
Client: oAuthClient, // maybe???
@@ -1231,13 +1044,6 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
// unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an
// avoidable request to the Dropbox API that does not carry payload.
func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
batching := o.fs.batcher.Start()
defer func() {
batchErr := o.fs.batcher.End(batching)
if err != nil {
err = batchErr
}
}()
chunkSize := int64(o.fs.opt.ChunkSize)
chunks := 0
if size != -1 {
@@ -1251,15 +1057,11 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
fs.Debugf(o, "Streaming chunk %d/%d", cur, cur)
} else if chunks == 0 {
fs.Debugf(o, "Streaming chunk %d/unknown", cur)
} else if chunks != 1 {
} else {
fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks)
}
}
appendArg := files.UploadSessionAppendArg{
Close: chunks == 1,
}
// write the first chunk
fmtChunk(1, false)
var res *files.UploadSessionStartResult
@@ -1269,10 +1071,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
if _, err = chunk.Seek(0, io.SeekStart); err != nil {
return false, nil
}
arg := files.UploadSessionStartArg{
Close: appendArg.Close,
}
res, err = o.fs.srv.UploadSessionStart(&arg, chunk)
res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, chunk)
return shouldRetry(err)
})
if err != nil {
@@ -1283,34 +1082,22 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
SessionId: res.SessionId,
Offset: 0,
}
appendArg.Cursor = &cursor
appendArg := files.UploadSessionAppendArg{
Cursor: &cursor,
Close: false,
}
// write more whole chunks (if any, and if !batching), if
// batching write the last chunk also.
// write more whole chunks (if any)
currentChunk := 2
for {
if chunks > 0 {
// Size known
if currentChunk == chunks {
// Last chunk
if !batching {
// if the size is known, only upload full chunks. Remaining bytes are uploaded with
// the UploadSessionFinish request.
break
}
appendArg.Close = true
} else if currentChunk > chunks {
break
}
} else {
// Size unknown
lastReadWasShort := in.BytesRead()-cursor.Offset < uint64(chunkSize)
if lastReadWasShort {
// if the size is unknown, upload as long as we can read full chunks from the reader.
// The UploadSessionFinish request will not contain any payload.
// This is also what we want if batching
break
}
if chunks > 0 && currentChunk >= chunks {
// if the size is known, only upload full chunks. Remaining bytes are uploaded with
// the UploadSessionFinish request.
break
} else if chunks == 0 && in.BytesRead()-cursor.Offset < uint64(chunkSize) {
// if the size is unknown, upload as long as we can read full chunks from the reader.
// The UploadSessionFinish request will not contain any payload.
break
}
cursor.Offset = in.BytesRead()
fmtChunk(currentChunk, false)
@@ -1336,26 +1123,6 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
Cursor: &cursor,
Commit: commitInfo,
}
// If we are batching then we should have written all the data now
// store the commit info now for a batch commit
if batching {
// If we haven't closed the session then we need to
if !appendArg.Close {
fs.Debugf(o, "Closing session")
var empty bytes.Buffer
err = o.fs.pacer.Call(func() (bool, error) {
err = o.fs.srv.UploadSessionAppendV2(&appendArg, &empty)
// after the first chunk is uploaded, we retry everything
return err != nil, err
})
if err != nil {
return nil, err
}
}
o.fs.batcher.Add(args)
return nil, nil
}
fmtChunk(currentChunk, true)
chunk = readers.NewRepeatableReaderBuffer(in, buf)
err = o.fs.pacer.Call(func() (bool, error) {
@@ -1398,7 +1165,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
size := src.Size()
var err error
var entry *files.FileMetadata
if size > int64(o.fs.opt.ChunkSize) || size == -1 || o.fs.opt.Batch > 0 {
if size > int64(o.fs.opt.ChunkSize) || size == -1 {
entry, err = o.uploadChunked(in, commitInfo, size)
} else {
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
@@ -1409,13 +1176,6 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return errors.Wrap(err, "upload failed")
}
// If we haven't received data back from batch upload then fake it
if entry == nil {
o.bytes = size
o.modTime = commitInfo.ClientModified
o.hash = "" // we don't have this
return nil
}
return o.setMetadataFromEntry(entry)
}
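For orientation, the hunks above all manipulate Dropbox's three-phase upload session (start, append, finish). A rough sketch of that flow, using the same dropbox-sdk-go-unofficial calls that appear in the diff; pacing, retries, and short-read accounting are left out, and assuming each phase receives a full chunk is a simplification (the real code advances the cursor by in.BytesRead()):

```go
package sketch

import (
	"io"

	"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files"
)

// uploadSessionSketch shows the start/append/finish flow from the diff.
// srv corresponds to Fs.srv; commit describes the destination path.
func uploadSessionSketch(srv files.Client, in io.Reader, commit *files.CommitInfo, chunkSize int64) (*files.FileMetadata, error) {
	// Phase 1: open a session with the first chunk.
	res, err := srv.UploadSessionStart(&files.UploadSessionStartArg{}, io.LimitReader(in, chunkSize))
	if err != nil {
		return nil, err
	}
	cursor := files.UploadSessionCursor{SessionId: res.SessionId, Offset: uint64(chunkSize)}

	// Phase 2: append further whole chunks, advancing Offset by the
	// bytes sent (simplified here to exactly one full chunk).
	if err := srv.UploadSessionAppendV2(&files.UploadSessionAppendArg{Cursor: &cursor}, io.LimitReader(in, chunkSize)); err != nil {
		return nil, err
	}
	cursor.Offset += uint64(chunkSize)

	// Phase 3: finish; the remaining bytes ride along with the commit
	// info, so no extra empty request is needed.
	args := &files.UploadSessionFinishArg{Cursor: &cursor, Commit: commit}
	return srv.UploadSessionFinish(args, in)
}
```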

backend/local/local.go

@@ -990,7 +990,7 @@ func (o *Object) openTranslatedLink(offset, limit int64) (lrc io.ReadCloser, err
// Read the link and return its destination as the contents of the object
linkdst, err := os.Readlink(o.path)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "read link")
}
return readers.NewLimitedReadCloser(ioutil.NopCloser(strings.NewReader(linkdst[offset:])), limit), nil
}
@@ -1033,7 +1033,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
fd, err := file.Open(o.path)
if err != nil {
return
return nil, errors.Wrap(err, "Open")
}
wrappedFd := readers.NewLimitedReadCloser(newFadviseReadCloser(o, fd, offset, limit), limit)
if offset != 0 {
@@ -1098,6 +1098,15 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// If it is a translated link, just read in the contents, and
// then create a symlink
if !o.translatedLink {
fi, err := os.Lstat(o.path)
// if object is currently a symlink remove it
if err == nil && fi.Mode()&os.ModeSymlink != 0 {
fs.Debugf(o, "Deleting as is symlink before updating as file")
err = os.Remove(o.path)
if err != nil {
fs.Debugf(o, "Failed to removing symlink: %v", err)
}
}
f, err := file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
if runtime.GOOS == "windows" && os.IsPermission(err) {
@@ -1106,10 +1115,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// See: https://stackoverflow.com/questions/13215716/ioerror-errno-13-permission-denied-when-trying-to-open-hidden-file-in-w-mod
f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil {
return err
return errors.Wrap(err, "Update Windows")
}
} else {
return err
return errors.Wrap(err, "Update")
}
}
// Pre-allocate the file for performance reasons
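
To see the behavioural difference this hunk makes, here is a small standalone demo (ours, not one of rclone's tests): create a target file and a symlink to it, overwrite via the link path using the symlink-aware pattern, then confirm the target kept its contents while the link became a regular file.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	dir, err := os.MkdirTemp("", "symlink-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	target := filepath.Join(dir, "target.txt")
	link := filepath.Join(dir, "link.txt")
	must(os.WriteFile(target, []byte("original"), 0666))
	must(os.Symlink(target, link))

	// The fixed behaviour: remove the symlink before writing, so the
	// write lands in a new regular file at the link's path.
	if fi, err := os.Lstat(link); err == nil && fi.Mode()&os.ModeSymlink != 0 {
		must(os.Remove(link))
	}
	must(os.WriteFile(link, []byte("new contents"), 0666))

	data, _ := os.ReadFile(target)
	fmt.Printf("target still holds: %q\n", data) // "original"

	fi, _ := os.Lstat(link)
	fmt.Println("link is still a symlink:", fi.Mode()&os.ModeSymlink != 0) // false
}

func must(err error) {
	if err != nil {
		panic(err)
	}
}
```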