1
0
mirror of https://github.com/rclone/rclone.git synced 2026-01-23 12:53:28 +00:00

Compare commits

..

14 Commits

Author SHA1 Message Date
Nick Craig-Wood
8c8a38711b Replace sync with lib/sync for deadlock detection with deadlock build tag 2021-11-22 18:15:07 +00:00
Nick Craig-Wood
386deb3633 lib/sync: a wrapper for inserting go-deadlock into the rclone build 2021-11-22 18:08:12 +00:00
Nick Craig-Wood
a351484997 sftp: fix timeout on hashing large files by sending keepalives
Before this fix the SFTP sessions could timeout when doing hashes if
they took longer than the --timeout parameter.

This patch sends keepalive packets every minute while a shell command
is running to keep the connection open.

See: https://forum.rclone.org/t/rclone-check-over-sftp-failure-to-calculate-md5-hash-for-large-files/27487
2021-11-22 15:26:29 +00:00
Nick Craig-Wood
099eff8891 sftp: refactor so we only have one way of running remote commands
This also returns errors from running ssh Hash commands which we
didn't do before.
2021-11-22 15:26:29 +00:00
albertony
c4cb167d4a Add rsapkf and Will Holtz to contributors 2021-11-21 19:26:05 +01:00
Will Holtz
38e100ab19 docs/config: more explicit doc for config create --all with params 2021-11-21 19:22:19 +01:00
rsapkf
db95a0d6c3 docs/pcloud: fix typo 2021-11-21 19:16:19 +01:00
Nick Craig-Wood
df07964db3 azureblob: raise --azureblob-upload-concurrency to 16 by default
After speed testing it was discovered that upload speed goes up pretty
much linearly with upload concurrency. This patch changes the default
from 4 to 16 which means that rclone will use 16 * 4M = 64M per
transfer which is OK even for low memory devices.

This adds a note that performance may be increased by increasing
upload concurrency.

See: https://forum.rclone.org/t/performance-of-rclone-vs-azcopy/27437/9
2021-11-18 16:09:02 +00:00
Nick Craig-Wood
fbc4c4ad9a azureblob: remove 100MB upper limit on chunk_size as it is no longer needed 2021-11-18 16:09:02 +00:00
Nick Craig-Wood
4454b3e1ae azureblob: implement --azureblob-upload-concurrency parameter to speed uploads
See: https://forum.rclone.org/t/performance-of-rclone-vs-azcopy/27437
2021-11-18 16:08:57 +00:00
Nick Craig-Wood
f9321fccbb Add deinferno to contributors 2021-11-18 15:51:45 +00:00
Ole Frost
3c2252b7c0 fs/operations: add server-side moves to stats
Fixes #5430
2021-11-18 12:20:56 +00:00
Cnly
51c952654c fstests: treat accountUpgradeRequired as success for OneDrive PublicLink 2021-11-17 17:35:17 +00:00
deinferno
80e47be65f yandex: add permanent deletion support 2021-11-17 16:57:41 +00:00
149 changed files with 482 additions and 1089 deletions

View File

@@ -19,7 +19,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/Azure/azure-pipeline-go/pipeline"
@@ -50,8 +50,6 @@ const (
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
storageDefaultBaseURL = "blob.core.windows.net"
defaultChunkSize = 4 * fs.Mebi
maxChunkSize = 100 * fs.Mebi
uploadConcurrency = 4
defaultAccessTier = azblob.AccessTierNone
maxTryTimeout = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
// Default storage account, key and blob endpoint for emulator support,
@@ -134,12 +132,33 @@ msi_client_id, or msi_mi_res_id parameters.`,
Advanced: true,
}, {
Name: "chunk_size",
Help: `Upload chunk size (<= 100 MiB).
Help: `Upload chunk size.
Note that this is stored in memory and there may be up to
"--transfers" chunks stored at once in memory.`,
"--transfers" * "--azureblob-upload-concurrency" chunks stored at once
in memory.`,
Default: defaultChunkSize,
Advanced: true,
}, {
Name: "upload_concurrency",
Help: `Concurrency for multipart uploads.
This is the number of chunks of the same file that are uploaded
concurrently.
If you are uploading small numbers of large files over high-speed
links and these uploads do not fully utilize your bandwidth, then
increasing this may help to speed up the transfers.
In tests, upload speed increases almost linearly with upload
concurrency. For example to fill a gigabit pipe it may be necessary to
raise this to 64. Note that this will use more memory.
Note that chunks are stored in memory and there may be up to
"--transfers" * "--azureblob-upload-concurrency" chunks stored at once
in memory.`,
Default: 16,
Advanced: true,
}, {
Name: "list_chunk",
Help: `Size of blob list.
@@ -257,6 +276,7 @@ type Options struct {
Endpoint string `config:"endpoint"`
SASURL string `config:"sas_url"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
UploadConcurrency int `config:"upload_concurrency"`
ListChunkSize uint `config:"list_chunk"`
AccessTier string `config:"access_tier"`
ArchiveTierDelete bool `config:"archive_tier_delete"`
@@ -416,9 +436,6 @@ func checkUploadChunkSize(cs fs.SizeSuffix) error {
if cs < minChunkSize {
return fmt.Errorf("%s is less than %s", cs, minChunkSize)
}
if cs > maxChunkSize {
return fmt.Errorf("%s is greater than %s", cs, maxChunkSize)
}
return nil
}
@@ -1667,10 +1684,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
putBlobOptions := azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(o.fs.opt.ChunkSize),
MaxBuffers: uploadConcurrency,
MaxBuffers: o.fs.opt.UploadConcurrency,
Metadata: o.meta,
BlobHTTPHeaders: httpHeaders,
TransferManager: o.fs.newPoolWrapper(uploadConcurrency),
TransferManager: o.fs.newPoolWrapper(o.fs.opt.UploadConcurrency),
}
// Don't retry, return a retry error instead

View File

@@ -17,12 +17,10 @@ import (
// TestIntegration runs integration tests against the remote
func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: "TestAzureBlob:",
NilObject: (*Object)(nil),
TiersToTest: []string{"Hot", "Cool"},
ChunkedUpload: fstests.ChunkedUploadConfig{
MaxChunkSize: maxChunkSize,
},
RemoteName: "TestAzureBlob:",
NilObject: (*Object)(nil),
TiersToTest: []string{"Hot", "Cool"},
ChunkedUpload: fstests.ChunkedUploadConfig{},
})
}

View File

@@ -17,7 +17,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/b2/api"

View File

@@ -13,7 +13,7 @@ import (
gohash "hash"
"io"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/backend/b2/api"
"github.com/rclone/rclone/fs"

View File

@@ -23,7 +23,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

View File

@@ -13,7 +13,7 @@ import (
"io"
"net/http"
"strconv"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/box/api"

View File

@@ -16,7 +16,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"syscall"
"time"

View File

@@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: "TestCache:",
NilObject: (*cache.Object)(nil),
UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt", "Resume"},
UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt"},
UnimplementableObjectMethods: []string{"MimeType", "ID", "GetTier", "SetTier"},
SkipInvalidUTF8: true, // invalid UTF-8 confuses the cache
})

View File

@@ -11,7 +11,7 @@ import (
"path"
"runtime"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -8,7 +8,7 @@ import (
"fmt"
"io"
"path"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -12,7 +12,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
cache "github.com/patrickmn/go-cache"

View File

@@ -14,7 +14,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -6,8 +6,6 @@ import (
"context"
"crypto/md5"
"crypto/sha1"
"encoding"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
@@ -15,14 +13,13 @@ import (
gohash "hash"
"io"
"io/ioutil"
"log"
"math/rand"
"path"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"
@@ -382,8 +379,6 @@ type Fs struct {
features *fs.Features // optional features
dirSort bool // reserved for future, ignored
useNoRename bool // can be set with the transactions option
hashState string // set in resume(), used to restore hash state
resumeXactID string // set in resume(), allows reuse of xactID upon resume
}
// configure sets up chunker for given name format, meta format and hash type.
@@ -1157,41 +1152,7 @@ func (f *Fs) put(
// Prepare to upload
c := f.newChunkingReader(src)
// Prepare for resume if resumable
var resumeOpt *fs.OptionResume
// partialHashState will be used in wrapStream to restore hash state
var partialHashState []byte
for _, option := range options {
switch option.(type) {
case *fs.OptionResume:
resumeOpt = option.(*fs.OptionResume)
if resumeOpt.Pos != 0 {
numChunksOnRemote := resumeOpt.Pos / int64(f.opt.ChunkSize)
// Checks for existing chunks on the remote
for i := 0; i < int(numChunksOnRemote); i++ {
existingChunkName := f.makeChunkName(remote, i, "", f.resumeXactID)
existingChunk, err := f.base.NewObject(ctx, existingChunkName)
// If NewObject returns an error the chunk likely doesn't exist on the remote and we cannot resume
if err != nil {
resumeOpt.Pos = 0
c.chunks = nil
break
}
c.chunks = append(c.chunks, existingChunk)
}
fs.Debugf(f, "Resuming at chunk number: %d", numChunksOnRemote)
partialHashState, _ = base64.StdEncoding.DecodeString(f.hashState)
// Discard bytes that already exist on remote
written, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
if err != nil {
return nil, err
}
c.accountBytes(written)
c.sizeLeft = c.sizeTotal - c.readCount
}
}
}
wrapIn := c.wrapStream(ctx, in, src, partialHashState)
wrapIn := c.wrapStream(ctx, in, src)
var metaObject fs.Object
defer func() {
@@ -1201,22 +1162,13 @@ func (f *Fs) put(
}()
baseRemote := remote
var xactID string
if resumeOpt != nil && resumeOpt.Pos != 0 {
xactID = f.resumeXactID
} else {
xactID, err = f.newXactID(ctx, baseRemote)
if err != nil {
return nil, err
}
xactID, errXact := f.newXactID(ctx, baseRemote)
if errXact != nil {
return nil, errXact
}
// Transfer chunks data
for c.chunkNo = 0; !c.done; c.chunkNo++ {
// skip to chunk we can resume from if resumeOpt is set
if c.chunkNo == 0 && resumeOpt != nil && resumeOpt.Pos != 0 {
c.chunkNo = int(resumeOpt.Pos) / int(f.opt.ChunkSize)
}
if c.chunkNo > maxSafeChunkNumber {
return nil, ErrChunkOverflow
}
@@ -1278,41 +1230,6 @@ func (f *Fs) put(
c.chunkLimit = c.chunkSize
c.chunks = append(c.chunks, chunk)
// If an OptionResume was passed than we should call SetID so a resume can be attempted in event of a failure
// ID keeps track of the first chunk that should be uploaded if a resume is attempted
if resumeOpt != nil {
// Publish hash state to control chunk
marshaler, ok := c.hasher.(encoding.BinaryMarshaler)
if !ok {
return nil, fmt.Errorf("The hash type does not implement encoding.BinaryMarshaler")
}
state, err := marshaler.MarshalBinary()
if err != nil {
return nil, err
}
hashType := f.opt.HashType
data, err := marshalPartialHashJSON(ctx, hashType, base64.StdEncoding.EncodeToString(state))
if err != nil {
return nil, err
}
controlChunkName := f.makeChunkName(remote, -1, "phash", xactID)
controlInfo := f.wrapInfo(src, controlChunkName, int64(len(data)))
controlChunk, err := basePut(ctx, bytes.NewReader(data), controlInfo)
defer func() {
_ = controlChunk.Remove(ctx)
}()
if err != nil {
return nil, err
}
positionStr := strconv.Itoa(c.chunkNo + 1) // stores the number of chunks uploaded
chunkSizeStr := strconv.FormatInt(c.chunkSize, 10)
startFromStr := strconv.FormatInt(int64(f.opt.StartFrom), 10)
err = resumeOpt.SetID(ctx, chunkSizeStr+","+startFromStr+","+positionStr+","+xactID, f.opt.HashType, base64.StdEncoding.EncodeToString(state))
if err != nil {
return nil, err
}
}
}
// Validate uploaded size
@@ -1439,7 +1356,7 @@ func (f *Fs) newChunkingReader(src fs.ObjectInfo) *chunkingReader {
return c
}
func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, partialHashState []byte) io.Reader {
func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo) io.Reader {
baseIn, wrapBack := accounting.UnWrap(in)
switch {
@@ -1474,15 +1391,6 @@ func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.Ob
}
if c.hasher != nil {
// Restores hash state during a resume
if partialHashState != nil {
unmarshaler, ok := c.hasher.(encoding.BinaryUnmarshaler)
if ok {
if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
log.Fatal("unable to unmarshal hash:", err)
}
}
}
baseIn = io.TeeReader(baseIn, c.hasher)
}
c.baseReader = baseIn
@@ -2602,34 +2510,6 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte)
return info, true, nil
}
// Format for partial hash control chunks
type partialHashJSON struct {
HashType string `json:"htype"`
PartialHash string `json:"phash"`
}
// marshalPartialHashJSON
//
// Creates a JSON containing the hashType being used and the partial hash state. This will be stored in
// a control chunk and used for resume functionality.
//
func marshalPartialHashJSON(ctx context.Context, hashType, partialHash string) ([]byte, error) {
controlData := partialHashJSON{
HashType: hashType,
PartialHash: partialHash,
}
data, err := json.Marshal(&controlData)
return data, err
}
// unmarshalPartialHashJSON parses partial hash control chunk.
//
func unmarshalPartialHashJSON(ctx context.Context, data []byte) (hashType, partialHashState string, err error) {
var partialHashData partialHashJSON
err = json.Unmarshal(data, &partialHashData)
return partialHashData.HashType, partialHashData.PartialHash, err
}
func silentlyRemove(ctx context.Context, o fs.Object) {
_ = o.Remove(ctx) // ignore error
}
@@ -2664,58 +2544,9 @@ func (f *Fs) CanQuickRename() bool {
return f.base.Features().Move != nil
}
// Resume checks whether the (remote, ID) pair is valid and returns
// the point the file should be resumed from or an error.
func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
idSlice := strings.Split(ID, ",")
cachedChunkSize, err := strconv.ParseInt(idSlice[0], 10, 64)
cachedStartFrom, err := strconv.ParseInt(idSlice[1], 10, 64)
cachedChunkNo, err := strconv.ParseInt(idSlice[2], 10, 64)
cachedXactID := idSlice[3]
if err != nil {
return 0, err
}
if cachedChunkSize != int64(f.opt.ChunkSize) {
return 0, errors.New("ChunkSize doesn't match for file we are trying to resume")
}
if f.opt.StartFrom != int(cachedStartFrom) {
return 0, errors.New("StartFrom doesn't match for file we are trying to resume")
}
// Check partial hash control chunk
controlChunkName := f.makeChunkName(remote, -1, "phash", cachedXactID)
hashControlChunk, err := f.base.NewObject(ctx, controlChunkName)
if err != nil {
return 0, err
}
reader, err := hashControlChunk.Open(ctx)
data, err := ioutil.ReadAll(reader)
_ = reader.Close() // ensure file handle is freed on windows
if err != nil {
return 0, err
}
remoteHashType, remoteHashState, err := unmarshalPartialHashJSON(ctx, data)
if remoteHashType == hashName && remoteHashState == hashState {
if f.opt.HashType != remoteHashType {
fs.Debugf(f, "Resume skipped, mismatch hash types. prev: %s, curr: %s", remoteHashType, f.opt.HashType)
return 0, nil
}
pos := cachedChunkNo * cachedChunkSize
if err != nil {
return 0, err
}
f.hashState = hashState
f.resumeXactID = cachedXactID
return pos, nil
}
// No valid control chunks found, rewind from start
return 0, nil
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Resumer = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)

View File

@@ -13,7 +13,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"unicode/utf8"

View File

@@ -23,7 +23,7 @@ func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: *fstest.RemoteName,
NilObject: (*crypt.Object)(nil),
UnimplementableFsMethods: []string{"OpenWriterAt", "Resume"},
UnimplementableFsMethods: []string{"OpenWriterAt"},
UnimplementableObjectMethods: []string{"MimeType"},
})
}

View File

@@ -21,7 +21,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"text/template"
"time"

View File

@@ -10,7 +10,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/dropbox/dropbox-sdk-go-unofficial/v6/dropbox/async"

View File

@@ -25,7 +25,7 @@ import (
"net/url"
"path"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

View File

@@ -12,7 +12,7 @@ import (
"path"
"runtime"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/jlaffaye/ftp"

View File

@@ -5,7 +5,7 @@ package googlephotos
import (
"path"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/backend/googlephotos/api"
)

View File

@@ -15,7 +15,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/googlephotos/api"

View File

@@ -9,7 +9,7 @@ import (
"io"
"path"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -15,7 +15,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -12,9 +12,8 @@ import (
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"unicode/utf8"
@@ -25,7 +24,6 @@ import (
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/readers"
@@ -232,7 +230,6 @@ type Fs struct {
precision time.Duration // precision of local filesystem
warnedMu sync.Mutex // used for locking access to 'warned'.
warned map[string]struct{} // whether we have warned about this string
hashState map[string]string // set in resume(), used to restore hash state
// do os.Lstat or os.Stat
lstat func(name string) (os.FileInfo, error)
@@ -270,12 +267,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
}
f := &Fs{
name: name,
opt: *opt,
warned: make(map[string]struct{}),
hashState: make(map[string]string),
dev: devUnset,
lstat: os.Lstat,
name: name,
opt: *opt,
warned: make(map[string]struct{}),
dev: devUnset,
lstat: os.Lstat,
}
f.root = cleanRootPath(root, f.opt.NoUNC, f.opt.Enc)
f.features = (&fs.Features{
@@ -1119,7 +1115,6 @@ func (nwc nopWriterCloser) Close() error {
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
var out io.WriteCloser
var hasher *hash.MultiHasher
var resumeOpt *fs.OptionResume
for _, option := range options {
switch x := option.(type) {
@@ -1130,32 +1125,6 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return err
}
}
case *fs.OptionResume:
resumeOpt = option.(*fs.OptionResume)
if resumeOpt.Pos != 0 {
fs.Logf(o, "Resuming at byte position: %d", resumeOpt.Pos)
// Discard bytes that already exist on backend
_, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
if err != nil {
return err
}
hashType := o.fs.Hashes().GetOne()
if resumeOpt.Hash != "" {
if err = hashType.Set(resumeOpt.Hash); err != nil {
return err
}
if !o.fs.Hashes().Contains(hashType) {
return fmt.Errorf("unsupported resume hash: %q", resumeOpt.Hash)
}
}
hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(hashType))
if err != nil {
return err
}
if err := hasher.RestoreHashState(hashType, o.fs.hashState[o.remote]); err != nil {
return err
}
}
}
}
@@ -1169,12 +1138,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// If it is a translated link, just read in the contents, and
// then create a symlink
if !o.translatedLink {
var f *os.File
if resumeOpt != nil && resumeOpt.Pos != 0 {
f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_APPEND, 0666)
} else {
f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
}
f, err := file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
if runtime.GOOS == "windows" && os.IsPermission(err) {
// If permission denied on Windows might be trying to update a
@@ -1188,7 +1152,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return err
}
}
if !o.fs.opt.NoPreAllocate && resumeOpt != nil && resumeOpt.Pos == 0 {
if !o.fs.opt.NoPreAllocate {
// Pre-allocate the file for performance reasons
err = file.PreAllocate(src.Size(), f)
if err != nil {
@@ -1209,46 +1173,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
in = io.TeeReader(in, hasher)
}
var cacheingWg sync.WaitGroup // Used to halt code execution while resume cache is written
var copyWg sync.WaitGroup // Ensure that io.Copy has returned before writing resume data
copyWg.Add(1)
// Context for read so that we can handle io.copy being interrupted
ctxr, cancel := context.WithCancel(ctx)
// Create exit handler during Copy so that resume data can be written if interrupted
var atexitOnce sync.Once
atexitHandle := atexit.Register(func() {
atexitOnce.Do(func() {
if resumeOpt == nil || hasher == nil {
return
}
// If OptionResume was passed, call SetID to prepare for future resumes
// ID is the number of bytes written to the destination
// Stops the copy so cache is consistent with remote
cacheingWg.Add(1)
cancel()
copyWg.Wait()
fs.Infof(o, "Updating resume cache")
fileInfo, _ := o.fs.lstat(o.path)
writtenStr := strconv.FormatInt(fileInfo.Size(), 10)
hashType := hasher.Hashes().GetOne()
hashState, err := hasher.GetHashState(hashType)
if err == nil {
err = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState)
}
if err != nil {
fs.Logf(o, "Updating resume cache failed: %v", err)
}
})
})
cr := readers.NewContextReader(ctxr, in)
_, err = io.Copy(out, cr)
copyWg.Done()
atexit.Unregister(atexitHandle)
if errors.Is(err, context.Canceled) {
// If resume data is being written we want to wait here for the program to exit
cacheingWg.Wait()
}
_, err = io.Copy(out, in)
closeErr := out.Close()
if err == nil {
err = closeErr
@@ -1413,44 +1338,9 @@ func cleanRootPath(s string, noUNC bool, enc encoder.MultiEncoder) string {
return s
}
// Resume checks whether the (remote, ID) pair is valid and returns
// the point the file should be resumed from or an error.
func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
cachedPos, err := strconv.ParseInt(ID, 10, 64)
if err != nil {
return 0, err
}
// Compare hash of partial file on remote with partial hash in cache
remoteObject, err := f.NewObject(ctx, remote)
if err != nil {
return 0, err
}
if remoteObject.Size() != cachedPos {
return 0, errors.New("size on remote does not match resume cache")
}
hashType := hash.NameToType(hashName)
remoteHash, err := remoteObject.Hash(ctx, hashType)
if err != nil {
return 0, err
}
cachedHash, err := hash.SumPartialHash(hashName, hashState)
if err != nil {
return 0, err
}
// Hashes match, attempt resume
if cachedHash == remoteHash {
f.hashState[remote] = hashState
return cachedPos, nil
}
// No valid position found, restart from beginning
fs.Infof(remote, "Not resuming as cached hash state did not match hash state on remote")
return 0, nil
}
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Resumer = &Fs{}
_ fs.Purger = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Mover = &Fs{}

View File

@@ -3,7 +3,7 @@ package local
import (
"io/ioutil"
"os"
"sync"
"github.com/rclone/rclone/lib/sync"
"testing"
"time"

View File

@@ -12,7 +12,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"encoding/hex"

View File

@@ -22,7 +22,7 @@ import (
"io"
"path"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -11,7 +11,7 @@ import (
"io/ioutil"
"path"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -16,7 +16,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/onedrive/api"

View File

@@ -13,7 +13,7 @@ import (
"hash"
"io"
"sort"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/lib/atexit"

View File

@@ -19,7 +19,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/aws/aws-sdk-go/aws"

View File

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"net/url"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -10,7 +10,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/coreos/go-semver/semver"

View File

@@ -17,7 +17,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"
@@ -42,7 +42,8 @@ const (
hashCommandNotSupported = "none"
minSleep = 100 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
decayConstant = 2 // bigger for slower decay, exponential
keepAliveInterval = time.Minute // send keepalives every this long while running commands
)
var (
@@ -339,6 +340,32 @@ func (c *conn) wait() {
c.err <- c.sshClient.Conn.Wait()
}
// Send a keepalive over the ssh connection
func (c *conn) sendKeepAlive() {
_, _, err := c.sshClient.SendRequest("keepalive@openssh.com", true, nil)
if err != nil {
fs.Debugf(nil, "Failed to send keep alive: %v", err)
}
}
// Send keepalives every interval over the ssh connection until done is closed
func (c *conn) sendKeepAlives(interval time.Duration) (done chan struct{}) {
done = make(chan struct{})
go func() {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-t.C:
c.sendKeepAlive()
case <-done:
return
}
}
}()
return done
}
// Closes the connection
func (c *conn) close() error {
sftpErr := c.sftpClient.Close()
@@ -1098,6 +1125,9 @@ func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) {
}
defer f.putSftpConnection(&c, err)
// Send keepalives while the connection is open
defer close(c.sendKeepAlives(keepAliveInterval))
session, err := c.sshClient.NewSession()
if err != nil {
return nil, fmt.Errorf("run: get SFTP session: %w", err)
@@ -1110,10 +1140,12 @@ func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) {
session.Stdout = &stdout
session.Stderr = &stderr
fs.Debugf(f, "Running remote command: %s", cmd)
err = session.Run(cmd)
if err != nil {
return nil, fmt.Errorf("failed to run %q: %s: %w", cmd, stderr.Bytes(), err)
return nil, fmt.Errorf("failed to run %q: %s: %w", cmd, bytes.TrimSpace(stderr.Bytes()), err)
}
fs.Debugf(f, "Remote command result: %s", bytes.TrimSpace(stdout.Bytes()))
return stdout.Bytes(), nil
}
@@ -1230,8 +1262,6 @@ func (o *Object) Remote() string {
// Hash returns the selected checksum of the file
// If no checksum is available it returns ""
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
o.fs.addSession() // Show session in use
defer o.fs.removeSession()
if o.fs.opt.DisableHashCheck {
return "", nil
}
@@ -1255,36 +1285,16 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", hash.ErrUnsupported
}
c, err := o.fs.getSftpConnection(ctx)
if err != nil {
return "", fmt.Errorf("Hash get SFTP connection: %w", err)
}
session, err := c.sshClient.NewSession()
o.fs.putSftpConnection(&c, err)
if err != nil {
return "", fmt.Errorf("Hash put SFTP connection: %w", err)
}
var stdout, stderr bytes.Buffer
session.Stdout = &stdout
session.Stderr = &stderr
escapedPath := shellEscape(o.path())
if o.fs.opt.PathOverride != "" {
escapedPath = shellEscape(path.Join(o.fs.opt.PathOverride, o.remote))
}
err = session.Run(hashCmd + " " + escapedPath)
fs.Debugf(nil, "sftp cmd = %s", escapedPath)
b, err := o.fs.run(ctx, hashCmd+" "+escapedPath)
if err != nil {
_ = session.Close()
fs.Debugf(o, "Failed to calculate %v hash: %v (%s)", r, err, bytes.TrimSpace(stderr.Bytes()))
return "", nil
return "", fmt.Errorf("failed to calculate %v hash: %w", r, err)
}
_ = session.Close()
b := stdout.Bytes()
fs.Debugf(nil, "sftp output = %q", b)
str := parseHash(b)
fs.Debugf(nil, "sftp hash = %q", str)
if r == hash.MD5 {
o.md5sum = &str
} else if r == hash.SHA1 {

View File

@@ -3,7 +3,7 @@
package sftp
import "sync"
import "github.com/rclone/rclone/lib/sync"
// stringLock locks for string IDs passed in
type stringLock struct {

View File

@@ -5,7 +5,7 @@ package sftp
import (
"fmt"
"sync"
"github.com/rclone/rclone/lib/sync"
"testing"
"time"

View File

@@ -13,7 +13,7 @@ import (
"fmt"
"io"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/backend/sharefile/api"
"github.com/rclone/rclone/fs"

View File

@@ -23,7 +23,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/sugarsync/api"

View File

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/union/upstream"

View File

@@ -3,7 +3,7 @@ package policy
import (
"context"
"path"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/backend/union/upstream"
"github.com/rclone/rclone/fs"

View File

@@ -3,7 +3,7 @@ package policy
import (
"context"
"path"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/union/upstream"

View File

@@ -9,7 +9,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/union/policy"

View File

@@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
}
fstests.Run(t, &fstests.Opt{
RemoteName: *fstest.RemoteName,
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles", "Resume"},
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
UnimplementableObjectMethods: []string{"MimeType"},
})
}

View File

@@ -9,7 +9,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

View File

@@ -6,7 +6,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -21,7 +21,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/backend/webdav/api"

View File

@@ -66,6 +66,11 @@ func init() {
})
},
Options: append(oauthutil.SharedOptions, []fs.Option{{
Name: "hard_delete",
Help: "Delete files permanently rather than putting them into the trash.",
Default: false,
Advanced: true,
}, {
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
Advanced: true,
@@ -79,8 +84,9 @@ func init() {
// Options defines the configuration for this backend
type Options struct {
Token string `config:"token"`
Enc encoder.MultiEncoder `config:"encoding"`
Token string `config:"token"`
HardDelete bool `config:"hard_delete"`
Enc encoder.MultiEncoder `config:"encoding"`
}
// Fs represents a remote yandex
@@ -630,7 +636,7 @@ func (f *Fs) purgeCheck(ctx context.Context, dir string, check bool) error {
}
}
//delete directory
return f.delete(ctx, root, false)
return f.delete(ctx, root, f.opt.HardDelete)
}
// Rmdir deletes the container
@@ -1141,7 +1147,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// Remove an object
func (o *Object) Remove(ctx context.Context) error {
return o.fs.delete(ctx, o.filePath(), false)
return o.fs.delete(ctx, o.filePath(), o.fs.opt.HardDelete)
}
// MimeType of an Object if known, "" otherwise

View File

@@ -19,7 +19,7 @@ import (
"runtime"
"sort"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"text/template"
"time"

View File

@@ -19,7 +19,7 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -9,7 +9,7 @@ import (
"io"
"os"
"path"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

View File

@@ -12,7 +12,7 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
)

View File

@@ -7,7 +7,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/cmd"

View File

@@ -5,7 +5,7 @@ import (
"errors"
"log"
"sort"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -5,7 +5,7 @@ import (
"context"
"fmt"
"path"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/walk"

View File

@@ -6,7 +6,7 @@ import (
"bytes"
"fmt"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -3,7 +3,7 @@ package rcd
import (
"context"
"log"
"sync"
"github.com/rclone/rclone/lib/sync"
sysdnotify "github.com/iguanesolutions/go-systemd/v5/notify"
"github.com/rclone/rclone/cmd"

View File

@@ -10,7 +10,7 @@ import (
"path/filepath"
"reflect"
"sort"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
sysdnotify "github.com/iguanesolutions/go-systemd/v5/notify"

View File

@@ -14,7 +14,7 @@ import (
"os"
"os/user"
"strconv"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/cmd"

View File

@@ -5,7 +5,7 @@ package restic
import (
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
)

View File

@@ -16,7 +16,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/cmd"

View File

@@ -3,7 +3,7 @@ package memory
import (
"context"
"runtime"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/cmd/test"

View File

@@ -550,3 +550,6 @@ put them back in again.` >}}
* Fredric Arklid <fredric.arklid@consid.se>
* Andy Jackson <Andrew.Jackson@bl.uk>
* Sinan Tan <i@tinytangent.com>
* deinferno <14363193+deinferno@users.noreply.github.com>
* rsapkf <rsapkfff@pm.me>
* Will Holtz <wholtz@gmail.com>

View File

@@ -81,6 +81,14 @@ key. It is stored using RFC3339 Format time with nanosecond
precision. The metadata is supplied during directory listings so
there is no overhead to using it.
### Performance
When uploading large files, increasing the value of
`--azureblob-upload-concurrency` will increase performance at the cost
of using more memory. The default of 16 is set quite conservatively to
use less memory. It maybe be necessary raise it to 64 or higher to
fully utilize a 1 GBit/s link with a single file transfer.
### Restricted filename characters
In addition to the [default restricted characters set](/overview/#restricted-characters)

View File

@@ -107,8 +107,9 @@ At the end of the non interactive process, rclone will return a result
with `State` as empty string.
If `--all` is passed then rclone will ask all the config questions,
not just the post config questions. Any parameters are used as
defaults for questions as usual.
not just the post config questions. Parameters that are supplied on
the command line or from environment variables are used as defaults
for questions as usual.
Note that `bin/config.py` in the rclone source implements this protocol
as a readable demonstration.

View File

@@ -80,7 +80,7 @@ List all the files in your pCloud
rclone ls remote:
To copy a local directory to an pCloud directory called backup
To copy a local directory to a pCloud directory called backup
rclone copy /home/source remote:backup

View File

@@ -620,7 +620,7 @@ issue](https://github.com/pkg/sftp/issues/156) is fixed.
Note that since SFTP isn't HTTP based the following flags don't work
with it: `--dump-headers`, `--dump-bodies`, `--dump-auth`
Note that `--timeout` isn't supported (but `--contimeout` is).
Note that `--timeout` and `--contimeout` are both supported.
## C14 {#c14}

View File

@@ -175,6 +175,15 @@ Leave blank to use the provider defaults.
- Type: string
- Default: ""
#### --yandex-hard-delete
Delete files permanently rather than putting them into the trash.
- Config: hard_delete
- Env Var: RCLONE_YANDEX_HARD_DELETE
- Type: bool
- Default: false
#### --yandex-encoding
This sets the encoding for the backend.

View File

@@ -6,7 +6,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"unicode/utf8"

View File

@@ -2,7 +2,7 @@ package accounting
import (
"context"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
)

View File

@@ -6,7 +6,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -2,7 +2,7 @@ package accounting
import (
"context"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs/rc"

View File

@@ -4,11 +4,11 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/lib/sync"
"golang.org/x/time/rate"
)

View File

@@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"io"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc"

View File

@@ -6,7 +6,7 @@ import (
"context"
"errors"
"io"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -9,7 +9,7 @@ import (
"io/ioutil"
"math/rand"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"testing"
"testing/iotest"
"time"

2
fs/cache/cache.go vendored
View File

@@ -4,7 +4,7 @@ package cache
import (
"context"
"runtime"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/filter"

View File

@@ -4,7 +4,7 @@ import (
"context"
"errors"
"io"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"

View File

@@ -130,8 +130,6 @@ type ConfigInfo struct {
FsCacheExpireDuration time.Duration
FsCacheExpireInterval time.Duration
DisableHTTP2 bool
MaxResumeCacheSize SizeSuffix
ResumeCutoff SizeSuffix
HumanReadable bool
KvLockTime time.Duration // maximum time to keep key-value database locked by process
}
@@ -165,8 +163,6 @@ func NewConfig() *ConfigInfo {
c.TPSLimitBurst = 1
c.MaxTransfer = -1
c.MaxBacklog = 10000
c.MaxResumeCacheSize = SizeSuffix(100 * 1024)
c.ResumeCutoff = -1
// We do not want to set the default here. We use this variable being empty as part of the fall-through of options.
// c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - "
c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)

View File

@@ -8,7 +8,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/Unknwon/goconfig"
"github.com/rclone/rclone/fs"

View File

@@ -132,8 +132,6 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
flags.BoolVarP(flagSet, &ci.RefreshTimes, "refresh-times", "", ci.RefreshTimes, "Refresh the modtime of remote files")
flags.BoolVarP(flagSet, &ci.NoConsole, "no-console", "", ci.NoConsole, "Hide console window (supported on Windows only)")
flags.StringVarP(flagSet, &dscp, "dscp", "", "", "Set DSCP value to connections, value or name, e.g. CS1, LE, DF, AF21")
flags.FVarP(flagSet, &ci.MaxResumeCacheSize, "max-resume-cache-size", "", "The maximum size of the cache used to store data necessary for resuming uploads. When the storage grows beyond this size, the oldest resume data will be deleted. (default 100k")
flags.FVarP(flagSet, &ci.ResumeCutoff, "resume-cutoff", "", "If set, attempt to resume all partial uploads larger than this size. (default off)")
flags.DurationVarP(flagSet, &ci.FsCacheExpireDuration, "fs-cache-expire-duration", "", ci.FsCacheExpireDuration, "Cache remotes for this long (0 to disable caching)")
flags.DurationVarP(flagSet, &ci.FsCacheExpireInterval, "fs-cache-expire-interval", "", ci.FsCacheExpireInterval, "Interval to check for expired remotes")
flags.BoolVarP(flagSet, &ci.DisableHTTP2, "disable-http2", "", ci.DisableHTTP2, "Disable HTTP/2 in the global transport")

View File

@@ -2,7 +2,7 @@ package config
import (
"encoding/json"
"sync"
"github.com/rclone/rclone/lib/sync"
)
// defaultStorage implements config.Storage, providing in-memory config.

View File

@@ -163,10 +163,6 @@ type Features struct {
// Shutdown the backend, closing any background tasks and any
// cached connections.
Shutdown func(ctx context.Context) error
// Resume checks whether the (remote, ID) pair is valid and returns
// the point the file should be resumed from or an error.
Resume func(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
}
// Disable nil's out the named feature. If it isn't found then it
@@ -294,9 +290,6 @@ func (ft *Features) Fill(ctx context.Context, f Fs) *Features {
if do, ok := f.(Shutdowner); ok {
ft.Shutdown = do.Shutdown
}
if do, ok := f.(Resumer); ok {
ft.Resume = do.Resume
}
return ft.DisableList(GetConfig(ctx).DisableFeatures)
}
@@ -643,13 +636,6 @@ type Shutdowner interface {
Shutdown(ctx context.Context) error
}
// Resumer is an optional interface for Fs
type Resumer interface {
// Resume checks whether the (remote, ID) pair is valid and returns
// the point the file should be resumed from or an error.
Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
}
// ObjectsChan is a channel of Objects
type ObjectsChan chan Object

View File

@@ -6,7 +6,7 @@ import (
"io/ioutil"
"os"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"testing"
"time"

View File

@@ -48,7 +48,6 @@ var (
ErrorNotImplemented = errors.New("optional feature not implemented")
ErrorCommandNotFound = errors.New("command not found")
ErrorFileNameTooLong = errors.New("file name too long")
ErrorCantResume = errors.New("can't resume file upload")
)
// CheckClose is a utility function used to check the return from

View File

@@ -7,7 +7,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"testing"
"time"

View File

@@ -5,7 +5,7 @@ import (
"net"
"runtime"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -12,7 +12,7 @@ import (
"net/http"
"net/http/cookiejar"
"net/http/httputil"
"sync"
"github.com/rclone/rclone/lib/sync"
"time"
"github.com/rclone/rclone/fs"

View File

@@ -4,7 +4,6 @@ import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"encoding"
"encoding/base64"
"encoding/hex"
"errors"
@@ -229,14 +228,6 @@ func (m *MultiHasher) Write(p []byte) (n int, err error) {
return n, err
}
// Hashes returns accumulated hash types.
func (m *MultiHasher) Hashes() (set Set) {
for ht := range m.h {
set.Add(ht)
}
return
}
// Sums returns the sums of all accumulated hashes as hex encoded
// strings.
func (m *MultiHasher) Sums() map[Type]string {
@@ -273,67 +264,6 @@ func (m *MultiHasher) Size() int64 {
return m.size
}
// GetHashState returns the partial hash state for the given hash type encoded as a string
func (m *MultiHasher) GetHashState(hashType Type) (string, error) {
h, ok := m.h[hashType]
if !ok {
return "", ErrUnsupported
}
marshaler, ok := h.(encoding.BinaryMarshaler)
if !ok {
return "", errors.New(hashType.String() + " does not implement encoding.BinaryMarshaler")
}
data, err := marshaler.MarshalBinary()
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(data), nil
}
// RestoreHashState restores the partial hash state for the passed hash type
func (m *MultiHasher) RestoreHashState(hashType Type, hashState string) error {
partialHashState, err := base64.StdEncoding.DecodeString(hashState)
if err != nil {
return err
}
unmarshaler, ok := m.h[hashType].(encoding.BinaryUnmarshaler)
if ok {
if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
return err
}
}
return nil
}
// SumPartialHash returns the hash of the partial hash state
func SumPartialHash(hashName, hashState string) (string, error) {
partialHashDef, ok := name2hash[hashName]
if !ok {
return "", ErrUnsupported
}
partialHash := partialHashDef.newFunc()
partialHashState, err := base64.StdEncoding.DecodeString(hashState)
if err != nil {
return "", err
}
unmarshaler, ok := partialHash.(encoding.BinaryUnmarshaler)
if ok {
if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
return "", err
}
}
return hex.EncodeToString(partialHash.Sum(nil)), nil
}
// NameToType returns the requested hash type or None if the hash type isn't supported
func NameToType(hashName string) Type {
hashDef, ok := name2hash[hashName]
if !ok {
return None
}
return hashDef.hashType
}
// A Set Indicates one or more hash types.
type Set int

View File

@@ -7,7 +7,7 @@ import (
"path"
"sort"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/dirtree"

View File

@@ -7,7 +7,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"testing"
_ "github.com/rclone/rclone/backend/local"

View File

@@ -3,19 +3,13 @@
package fs
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/cacheroot"
)
// OpenOption is an interface describing options for Open
@@ -236,145 +230,6 @@ func (o *HashesOption) Mandatory() bool {
return false
}
// OptionResume defines a Put/Upload for doing resumes
type OptionResume struct {
ID string // resume this ID if set
Pos int64 // and resume from this position
Hash string
Src Object
F Fs
Remote string
CacheCleaned bool
CacheDir string
}
// SetID will be called by backend's Put/Update function if the object's upload
// could be resumed upon failure
//
// SetID takes the passed resume ID, hash state, hash name and Fingerprint of the object and stores it in
// --cache-dir so that future Copy operations can resume the upload if it fails
func (o *OptionResume) SetID(ctx context.Context, ID, hashName, hashState string) error {
ci := GetConfig(ctx)
// Get the Fingerprint of the src object so that future Copy operations can ensure the
// object hasn't changed before resuming an upload
fingerprint := Fingerprint(ctx, o.Src, true)
data, err := marshalResumeJSON(ctx, fingerprint, ID, hashName, hashState)
if err != nil {
return fmt.Errorf("failed to marshal data JSON: %w", err)
}
if len(data) < int(ci.MaxResumeCacheSize) {
// Each remote will have its own directory for cached resume files
dirPath, _, err := cacheroot.CreateCacheRoot(o.CacheDir, o.F.Name(), o.F.Root(), "resume")
if err != nil {
return err
}
err = os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
return fmt.Errorf("failed to create cache directory %v: %w", dirPath, err)
}
// Write resume data to disk
cachePath := filepath.Join(dirPath, o.Remote)
cacheFile, err := os.Create(cachePath)
if err != nil {
return fmt.Errorf("failed to create cache file %v: %w", cachePath, err)
}
defer func() {
_ = cacheFile.Close()
}()
_, errWrite := cacheFile.Write(data)
if errWrite != nil {
return fmt.Errorf("failed to write JSON to file: %w", errWrite)
}
}
if !o.CacheCleaned {
rootCacheDir := filepath.Join(o.CacheDir, "resume")
if err := cleanResumeCache(ctx, rootCacheDir); err != nil {
return fmt.Errorf("failed to clean resume cache: %w", err)
}
}
o.CacheCleaned = true
return nil
}
// ResumeJSON is a struct for storing resume info in cache
type ResumeJSON struct {
Fingerprint string `json:"fprint"`
ID string `json:"id"`
HashName string `json:"hname"`
HashState string `json:"hstate"`
}
func marshalResumeJSON(ctx context.Context, fprint, id, hashName, hashState string) ([]byte, error) {
resumedata := ResumeJSON{
Fingerprint: fprint,
ID: id,
HashName: hashName,
HashState: hashState,
}
data, err := json.Marshal(&resumedata)
return data, err
}
// cleanCache checks the size of the resume cache and removes the oldest resume files if more than limit
func cleanResumeCache(ctx context.Context, rootCacheDir string) error {
ci := GetConfig(ctx)
var paths []string
pathsWithInfo := make(map[string]os.FileInfo)
totalCacheSize := int64(0)
walkErr := filepath.Walk(rootCacheDir,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
// Empty subdirectories in the resume cache dir can be removed
removeErr := os.Remove(path)
if err != nil && !os.IsNotExist(removeErr) {
return fmt.Errorf("failed to remove empty subdirectory: %s: %w", path, err)
}
return nil
}
paths = append(paths, path)
pathsWithInfo[path] = info
totalCacheSize += info.Size()
return nil
})
if walkErr != nil {
return fmt.Errorf("error walking through cache when cleaning cache dir: %w", walkErr)
}
if totalCacheSize > int64(ci.MaxResumeCacheSize) {
sort.Slice(paths, func(i, j int) bool {
return pathsWithInfo[paths[i]].ModTime().Before(pathsWithInfo[paths[j]].ModTime())
})
for _, p := range paths {
if totalCacheSize < int64(ci.MaxResumeCacheSize) {
break
}
if err := os.Remove(p); err != nil {
return fmt.Errorf("error removing oldest cache file: %s: %w", p, err)
}
totalCacheSize -= pathsWithInfo[p].Size()
Debugf(p, "Successfully removed oldest cache file")
}
}
return nil
}
// Header formats the option as an http header
func (o *OptionResume) Header() (key string, value string) {
return "", ""
}
// String formats the option into human readable form
func (o *OptionResume) String() string {
return fmt.Sprintf("OptionResume(ID:%v, Pos:%v)", o.ID, o.Pos)
}
// Mandatory returns whether the option must be parsed or can be ignored
func (o *OptionResume) Mandatory() bool {
return false
}
// NullOption defines an Option which does nothing
type NullOption struct {
}

View File

@@ -10,7 +10,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"github.com/rclone/rclone/fs"

View File

@@ -1,23 +0,0 @@
//go:build !windows
// +build !windows
package operations
import (
"os"
"os/exec"
"syscall"
)
func sendInterrupt() error {
p, err := os.FindProcess(syscall.Getpid())
if err != nil {
return err
}
err = p.Signal(os.Interrupt)
return err
}
func setupCmd(cmd *exec.Cmd) {
// Only needed for windows
}

View File

@@ -1,32 +0,0 @@
//go:build windows
// +build windows
package operations
import (
"os/exec"
"syscall"
)
// Credit: https://github.com/golang/go/blob/6125d0c4265067cdb67af1340bf689975dd128f4/src/os/signal/signal_windows_test.go#L18
func sendInterrupt() error {
d, e := syscall.LoadDLL("kernel32.dll")
if e != nil {
return e
}
p, e := d.FindProc("GenerateConsoleCtrlEvent")
if e != nil {
return e
}
r, _, e := p.Call(syscall.CTRL_BREAK_EVENT, uintptr(syscall.Getpid()))
if r == 0 {
return e
}
return nil
}
func setupCmd(cmd *exec.Cmd) {
(*cmd).SysProcAttr = &syscall.SysProcAttr{
CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
}
}

View File

@@ -18,7 +18,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"
@@ -33,7 +33,6 @@ import (
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/cacheroot"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
"github.com/rclone/rclone/lib/readers"
@@ -365,11 +364,6 @@ func CommonHash(ctx context.Context, fa, fb fs.Info) (hash.Type, *fs.HashesOptio
// be nil.
func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
ci := fs.GetConfig(ctx)
var resumeOpt *fs.OptionResume
if f.Features().Resume != nil {
resumeOpt = createResumeOpt(ctx, f, remote, src)
}
tr := accounting.Stats(ctx).NewTransfer(src)
defer func() {
tr.Done(ctx, err)
@@ -411,7 +405,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
if err == nil {
dst = newDst
in.ServerSideCopyEnd(dst.Size()) // account the bytes for the server-side transfer
err = in.Close()
_ = in.Close()
} else {
_ = in.Close()
}
@@ -467,10 +461,6 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
wrappedSrc = NewOverrideRemote(src, remote)
}
options := []fs.OpenOption{hashOption}
// Appends OptionResume if it was set
if resumeOpt != nil {
options = append(options, resumeOpt)
}
for _, option := range ci.UploadHeaders {
options = append(options, option)
}
@@ -485,17 +475,6 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
if err == nil {
newDst = dst
err = closeErr
cacheParent := config.GetCacheDir()
// Remove resume cache file (if one was created) when Put/Upload is successful
cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
if err != nil {
return nil, err
}
cacheFile := filepath.Join(cacheDir, remote)
removeErr := os.Remove(cacheFile)
if err != nil && !os.IsNotExist(removeErr) {
return nil, fmt.Errorf("failed to remove resume cache file after upload: %w", err)
}
}
}
}
@@ -619,6 +598,8 @@ func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.
}
}
// Move dst <- src
in := tr.Account(ctx, nil) // account the transfer
in.ServerSideCopyStart()
newDst, err = doMove(ctx, src, remote)
switch err {
case nil:
@@ -627,13 +608,16 @@ func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.
} else {
fs.Infof(src, "Moved (server-side)")
}
in.ServerSideCopyEnd(newDst.Size()) // account the bytes for the server-side transfer
_ = in.Close()
return newDst, nil
case fs.ErrorCantMove:
fs.Debugf(src, "Can't move, switching to copy")
_ = in.Close()
default:
err = fs.CountError(err)
fs.Errorf(src, "Couldn't move: %v", err)
_ = in.Close()
return newDst, err
}
}

View File

@@ -4,7 +4,7 @@ import (
"context"
"errors"
"io"
"sync"
"github.com/rclone/rclone/lib/sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"

View File

@@ -1,73 +0,0 @@
package operations
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/lib/cacheroot"
)
// Creates an OptionResume that will be passed to Put/Upload
func createResumeOpt(ctx context.Context, f fs.Fs, remote string, src fs.Object) (resumeOpt *fs.OptionResume) {
ci := fs.GetConfig(ctx)
cacheParent := config.GetCacheDir()
resumeOpt = &fs.OptionResume{ID: "", Pos: 0, Src: src, F: f, Remote: remote, CacheCleaned: false, CacheDir: cacheParent}
if ci.ResumeCutoff >= 0 {
cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
if err != nil {
return nil
}
cacheFile := filepath.Join(cacheDir, remote)
resumeID, hashName, hashState, attemptResume := readResumeCache(ctx, f, src, cacheFile)
if attemptResume {
fs.Debugf(f, "Existing resume cache file found: %s. A resume will now be attempted.", cacheFile)
position, resumeErr := f.Features().Resume(ctx, remote, resumeID, hashName, hashState)
if resumeErr != nil {
fs.Errorf(src, "Resume canceled: %v", resumeErr)
} else if position > int64(ci.ResumeCutoff) {
resumeOpt.Pos = position
resumeOpt.Hash = hashName
}
}
}
return resumeOpt
}
// readResumeCache checks to see if a resume ID has been cached for the source object.
// If it finds one it returns it along with true to signal a resume can be attempted
func readResumeCache(ctx context.Context, f fs.Fs, src fs.Object, cacheName string) (resumeID, hashName, hashState string, attemptResume bool) {
existingCacheFile, statErr := os.Open(cacheName)
defer func() {
_ = existingCacheFile.Close()
}()
if !os.IsNotExist(statErr) {
rawData, readErr := ioutil.ReadAll(existingCacheFile)
if readErr == nil {
existingFingerprint, resumeID, hashName, hashState, unmarshalErr := unmarshalResumeJSON(ctx, rawData)
if unmarshalErr != nil {
fs.Debugf(f, "Failed to unmarshal Resume JSON: %s. Resume will not be attempted.", unmarshalErr.Error())
} else if existingFingerprint != "" {
// Check if the src object has changed by comparing new Fingerprint to Fingerprint in cache file
fingerprint := fs.Fingerprint(ctx, src, true)
if existingFingerprint == fingerprint {
return resumeID, hashName, hashState, true
}
}
}
}
return "", "", "", false
}
func unmarshalResumeJSON(ctx context.Context, data []byte) (fprint, id, hashName, hashState string, err error) {
var resumedata fs.ResumeJSON
err = json.Unmarshal(data, &resumedata)
if err != nil {
return "", "", "", "", err
}
return resumedata.Fingerprint, resumedata.ID, resumedata.HashName, resumedata.HashState, nil
}

View File

@@ -1,163 +0,0 @@
package operations
import (
"bytes"
"context"
"io"
"io/ioutil"
"log"
"math/rand"
"os"
"os/exec"
"runtime"
"strings"
"sync"
"testing"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type interruptReader struct {
once sync.Once
r io.Reader
}
// Read sends an OS specific interrupt signal and then reads 1 byte at a time
func (r *interruptReader) Read(b []byte) (n int, err error) {
r.once.Do(func() {
_ = sendInterrupt()
})
buffer := make([]byte, 1)
n, err = r.r.Read(buffer)
b[0] = buffer[0]
// Simulate duration of a larger read without needing to test with a large file
// Allows for the interrupt to be handled before Copy completes
time.Sleep(time.Microsecond * 10)
return n, err
}
// this is a wrapper for a mockobject with a custom Open function
//
// n indicates the number of bytes to read before sending an
// interrupt signal
type resumeTestObject struct {
fs.Object
n int64
}
// Open opens the file for read. Call Close() on the returned io.ReadCloser
//
// The Reader will signal an interrupt after reading n bytes, then continue to read 1 byte at a time.
// If TestResume is successful, the interrupt will be processed and reads will be cancelled before running
// out of bytes to read
func (o *resumeTestObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
rc, err := o.Object.Open(ctx, options...)
if err != nil {
return nil, err
}
r := io.MultiReader(&io.LimitedReader{R: rc, N: o.n}, &interruptReader{r: rc})
// Wrap with Close in a new readCloser
rc = readCloser{Reader: r, Closer: rc}
return rc, nil
}
func makeContent(t *testing.T, size int) []byte {
content := make([]byte, size)
r := rand.New(rand.NewSource(42))
_, err := io.ReadFull(r, content)
assert.NoError(t, err)
return content
}
func TestResume(t *testing.T) {
ctx := context.Background()
r := fstest.NewRun(t)
defer r.Finalise()
ci := fs.GetConfig(ctx)
ci.ResumeCutoff = 0
// Contents for the mock object
var (
// Test contents must be large enough that io.Copy does not complete during the first Rclone Copy operation
resumeTestContents = makeContent(t, 1024)
expectedContents = resumeTestContents
)
// Create mockobjects with given breaks
createTestSrc := func(interrupt int64) (fs.Object, fs.Object) {
srcOrig := mockobject.New("potato").WithContent(resumeTestContents, mockobject.SeekModeNone)
srcOrig.SetFs(r.Flocal)
src := &resumeTestObject{
Object: srcOrig,
n: interrupt,
}
return src, srcOrig
}
checkContents := func(obj fs.Object, contents string) {
assert.NotNil(t, obj)
assert.Equal(t, int64(len(contents)), obj.Size())
r, err := obj.Open(ctx)
assert.NoError(t, err)
assert.NotNil(t, r)
if r == nil {
return
}
data, err := ioutil.ReadAll(r)
assert.NoError(t, err)
assert.Equal(t, contents, string(data))
_ = r.Close()
}
srcBreak, srcNoBreak := createTestSrc(2)
// Run first Copy only in a subprocess so that it can be interrupted without ending the test
// adapted from: https://stackoverflow.com/questions/26225513/how-to-test-os-exit-scenarios-in-go
if os.Getenv("RUNTEST") == "1" {
remoteRoot := os.Getenv("REMOTEROOT")
remoteFs, err := fs.NewFs(ctx, remoteRoot)
require.NoError(t, err)
_, _ = Copy(ctx, remoteFs, nil, "testdst", srcBreak)
// This should never be reached as the subroutine should exit during Copy
require.True(t, false, "Problem with test, first Copy operation should've been interrupted before completion")
return
}
// Start the subprocess
cmd := exec.Command(os.Args[0], "-test.run=TestResume")
cmd.Env = append(os.Environ(), "RUNTEST=1", "REMOTEROOT="+r.Fremote.Root())
cmd.Stdout = os.Stdout
setupCmd(cmd)
err := cmd.Run()
e, ok := err.(*exec.ExitError)
// Exit code after signal will be (128+signum) on Linux or (signum) on Windows
expectedErrorString := "exit status 1"
if runtime.GOOS == "windows" {
expectedErrorString = "exit status 2"
}
assert.True(t, ok)
assert.Contains(t, e.Error(), expectedErrorString)
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()
// Start copy again, but with no breaks
newDst, err := Copy(ctx, r.Fremote, nil, "testdst", srcNoBreak)
assert.NoError(t, err)
// Checks to see if a resume was initiated
// Resumed byte position can vary slightly depending how long it takes atexit to process the interrupt
assert.True(t, strings.Contains(buf.String(), "Resuming at byte position: "), "The upload did not resume when restarted. Message: %q", buf.String())
checkContents(newDst, string(expectedContents))
}

View File

@@ -7,7 +7,7 @@ import (
"errors"
"fmt"
"runtime/debug"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

Some files were not shown because too many files have changed in this diff Show More