Mirror of https://github.com/rclone/rclone.git (synced 2026-01-23 12:53:28 +00:00)

Compare commits: resume ... fix-deadlo
14 Commits
SHA1: 8c8a38711b, 386deb3633, a351484997, 099eff8891, c4cb167d4a, 38e100ab19, db95a0d6c3, df07964db3, fbc4c4ad9a, 4454b3e1ae, f9321fccbb, 3c2252b7c0, 51c952654c, 80e47be65f
@@ -19,7 +19,7 @@ import (
"path"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"

@@ -50,8 +50,6 @@ const (
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
storageDefaultBaseURL = "blob.core.windows.net"
defaultChunkSize = 4 * fs.Mebi
-maxChunkSize = 100 * fs.Mebi
-uploadConcurrency = 4
defaultAccessTier = azblob.AccessTierNone
maxTryTimeout = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
// Default storage account, key and blob endpoint for emulator support,

@@ -134,12 +132,33 @@ msi_client_id, or msi_mi_res_id parameters.`,
Advanced: true,
}, {
Name: "chunk_size",
-Help: `Upload chunk size (<= 100 MiB).
+Help: `Upload chunk size.

Note that this is stored in memory and there may be up to
-"--transfers" chunks stored at once in memory.`,
+"--transfers" * "--azureblob-upload-concurrency" chunks stored at once
+in memory.`,
Default: defaultChunkSize,
Advanced: true,
}, {
+Name: "upload_concurrency",
+Help: `Concurrency for multipart uploads.
+
+This is the number of chunks of the same file that are uploaded
+concurrently.
+
+If you are uploading small numbers of large files over high-speed
+links and these uploads do not fully utilize your bandwidth, then
+increasing this may help to speed up the transfers.
+
+In tests, upload speed increases almost linearly with upload
+concurrency. For example to fill a gigabit pipe it may be necessary to
+raise this to 64. Note that this will use more memory.
+
+Note that chunks are stored in memory and there may be up to
+"--transfers" * "--azureblob-upload-concurrency" chunks stored at once
+in memory.`,
+Default: 16,
+Advanced: true,
+}, {
Name: "list_chunk",
Help: `Size of blob list.
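A rough usage sketch of the two options above, assuming a remote named `azblob:` (the remote name, container and file are placeholders, not taken from this changeset):

    rclone copy ./big.bin azblob:container/path --azureblob-chunk-size 8M --azureblob-upload-concurrency 32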
@@ -257,6 +276,7 @@ type Options struct {
Endpoint string `config:"endpoint"`
SASURL string `config:"sas_url"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
+UploadConcurrency int `config:"upload_concurrency"`
ListChunkSize uint `config:"list_chunk"`
AccessTier string `config:"access_tier"`
ArchiveTierDelete bool `config:"archive_tier_delete"`

@@ -416,9 +436,6 @@ func checkUploadChunkSize(cs fs.SizeSuffix) error {
if cs < minChunkSize {
return fmt.Errorf("%s is less than %s", cs, minChunkSize)
}
-if cs > maxChunkSize {
-return fmt.Errorf("%s is greater than %s", cs, maxChunkSize)
-}
return nil
}

@@ -1667,10 +1684,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op

putBlobOptions := azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(o.fs.opt.ChunkSize),
-MaxBuffers: uploadConcurrency,
+MaxBuffers: o.fs.opt.UploadConcurrency,
Metadata: o.meta,
BlobHTTPHeaders: httpHeaders,
-TransferManager: o.fs.newPoolWrapper(uploadConcurrency),
+TransferManager: o.fs.newPoolWrapper(o.fs.opt.UploadConcurrency),
}

// Don't retry, return a retry error instead

@@ -17,12 +17,10 @@ import (
// TestIntegration runs integration tests against the remote
func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
-RemoteName: "TestAzureBlob:",
-NilObject: (*Object)(nil),
-TiersToTest: []string{"Hot", "Cool"},
-ChunkedUpload: fstests.ChunkedUploadConfig{
-MaxChunkSize: maxChunkSize,
-},
+RemoteName: "TestAzureBlob:",
+NilObject: (*Object)(nil),
+TiersToTest: []string{"Hot", "Cool"},
+ChunkedUpload: fstests.ChunkedUploadConfig{},
})
}

@@ -17,7 +17,7 @@ import (
"path"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/b2/api"

@@ -13,7 +13,7 @@ import (
gohash "hash"
"io"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/backend/b2/api"
"github.com/rclone/rclone/fs"

@@ -23,7 +23,7 @@ import (
"path"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

@@ -13,7 +13,7 @@ import (
"io"
"net/http"
"strconv"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/box/api"
backend/cache/cache.go (vendored)
@@ -16,7 +16,7 @@ import (
"sort"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"syscall"
"time"

backend/cache/cache_test.go (vendored)
@@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: "TestCache:",
NilObject: (*cache.Object)(nil),
-UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt", "Resume"},
+UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt"},
UnimplementableObjectMethods: []string{"MimeType", "ID", "GetTier", "SetTier"},
SkipInvalidUTF8: true, // invalid UTF-8 confuses the cache
})

backend/cache/handle.go (vendored)
@@ -11,7 +11,7 @@ import (
"path"
"runtime"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

backend/cache/object.go (vendored)
@@ -8,7 +8,7 @@ import (
"fmt"
"io"
"path"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

backend/cache/plex.go (vendored)
@@ -12,7 +12,7 @@ import (
"net/http"
"net/url"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

cache "github.com/patrickmn/go-cache"

backend/cache/storage_persistent.go (vendored)
@@ -14,7 +14,7 @@ import (
"path"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"
@@ -6,8 +6,6 @@ import (
"context"
"crypto/md5"
"crypto/sha1"
-"encoding"
-"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"

@@ -15,14 +13,13 @@ import (
gohash "hash"
"io"
"io/ioutil"
"log"
"math/rand"
"path"
"regexp"
"sort"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -382,8 +379,6 @@ type Fs struct {
features *fs.Features // optional features
dirSort bool // reserved for future, ignored
useNoRename bool // can be set with the transactions option
-hashState string // set in resume(), used to restore hash state
-resumeXactID string // set in resume(), allows reuse of xactID upon resume
}

// configure sets up chunker for given name format, meta format and hash type.

@@ -1157,41 +1152,7 @@ func (f *Fs) put(

// Prepare to upload
c := f.newChunkingReader(src)
-// Prepare for resume if resumable
-var resumeOpt *fs.OptionResume
-// partialHashState will be used in wrapStream to restore hash state
-var partialHashState []byte
-for _, option := range options {
-switch option.(type) {
-case *fs.OptionResume:
-resumeOpt = option.(*fs.OptionResume)
-if resumeOpt.Pos != 0 {
-numChunksOnRemote := resumeOpt.Pos / int64(f.opt.ChunkSize)
-// Checks for existing chunks on the remote
-for i := 0; i < int(numChunksOnRemote); i++ {
-existingChunkName := f.makeChunkName(remote, i, "", f.resumeXactID)
-existingChunk, err := f.base.NewObject(ctx, existingChunkName)
-// If NewObject returns an error the chunk likely doesn't exist on the remote and we cannot resume
-if err != nil {
-resumeOpt.Pos = 0
-c.chunks = nil
-break
-}
-c.chunks = append(c.chunks, existingChunk)
-}
-fs.Debugf(f, "Resuming at chunk number: %d", numChunksOnRemote)
-partialHashState, _ = base64.StdEncoding.DecodeString(f.hashState)
-// Discard bytes that already exist on remote
-written, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
-if err != nil {
-return nil, err
-}
-c.accountBytes(written)
-c.sizeLeft = c.sizeTotal - c.readCount
-}
-}
-}
-wrapIn := c.wrapStream(ctx, in, src, partialHashState)
+wrapIn := c.wrapStream(ctx, in, src)

var metaObject fs.Object
defer func() {

@@ -1201,22 +1162,13 @@ func (f *Fs) put(
}()

baseRemote := remote
-var xactID string
-if resumeOpt != nil && resumeOpt.Pos != 0 {
-xactID = f.resumeXactID
-} else {
-xactID, err = f.newXactID(ctx, baseRemote)
-if err != nil {
-return nil, err
-}
+xactID, errXact := f.newXactID(ctx, baseRemote)
+if errXact != nil {
+return nil, errXact
}

// Transfer chunks data
for c.chunkNo = 0; !c.done; c.chunkNo++ {
-// skip to chunk we can resume from if resumeOpt is set
-if c.chunkNo == 0 && resumeOpt != nil && resumeOpt.Pos != 0 {
-c.chunkNo = int(resumeOpt.Pos) / int(f.opt.ChunkSize)
-}
if c.chunkNo > maxSafeChunkNumber {
return nil, ErrChunkOverflow
}

@@ -1278,41 +1230,6 @@ func (f *Fs) put(
c.chunkLimit = c.chunkSize

c.chunks = append(c.chunks, chunk)

-// If an OptionResume was passed than we should call SetID so a resume can be attempted in event of a failure
-// ID keeps track of the first chunk that should be uploaded if a resume is attempted
-if resumeOpt != nil {
-// Publish hash state to control chunk
-marshaler, ok := c.hasher.(encoding.BinaryMarshaler)
-if !ok {
-return nil, fmt.Errorf("The hash type does not implement encoding.BinaryMarshaler")
-}
-state, err := marshaler.MarshalBinary()
-if err != nil {
-return nil, err
-}
-hashType := f.opt.HashType
-data, err := marshalPartialHashJSON(ctx, hashType, base64.StdEncoding.EncodeToString(state))
-if err != nil {
-return nil, err
-}
-controlChunkName := f.makeChunkName(remote, -1, "phash", xactID)
-controlInfo := f.wrapInfo(src, controlChunkName, int64(len(data)))
-controlChunk, err := basePut(ctx, bytes.NewReader(data), controlInfo)
-defer func() {
-_ = controlChunk.Remove(ctx)
-}()
-if err != nil {
-return nil, err
-}
-positionStr := strconv.Itoa(c.chunkNo + 1) // stores the number of chunks uploaded
-chunkSizeStr := strconv.FormatInt(c.chunkSize, 10)
-startFromStr := strconv.FormatInt(int64(f.opt.StartFrom), 10)
-err = resumeOpt.SetID(ctx, chunkSizeStr+","+startFromStr+","+positionStr+","+xactID, f.opt.HashType, base64.StdEncoding.EncodeToString(state))
-if err != nil {
-return nil, err
-}
-}
}

// Validate uploaded size
@@ -1439,7 +1356,7 @@ func (f *Fs) newChunkingReader(src fs.ObjectInfo) *chunkingReader {
return c
}

-func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, partialHashState []byte) io.Reader {
+func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo) io.Reader {
baseIn, wrapBack := accounting.UnWrap(in)

switch {

@@ -1474,15 +1391,6 @@ func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.Ob
}

if c.hasher != nil {
-// Restores hash state during a resume
-if partialHashState != nil {
-unmarshaler, ok := c.hasher.(encoding.BinaryUnmarshaler)
-if ok {
-if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
-log.Fatal("unable to unmarshal hash:", err)
-}
-}
-}
baseIn = io.TeeReader(baseIn, c.hasher)
}
c.baseReader = baseIn

@@ -2602,34 +2510,6 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte)
return info, true, nil
}

-// Format for partial hash control chunks
-type partialHashJSON struct {
-HashType string `json:"htype"`
-PartialHash string `json:"phash"`
-}
-
-// marshalPartialHashJSON
-//
-// Creates a JSON containing the hashType being used and the partial hash state. This will be stored in
-// a control chunk and used for resume functionality.
-//
-func marshalPartialHashJSON(ctx context.Context, hashType, partialHash string) ([]byte, error) {
-controlData := partialHashJSON{
-HashType: hashType,
-PartialHash: partialHash,
-}
-data, err := json.Marshal(&controlData)
-return data, err
-}
-
-// unmarshalPartialHashJSON parses partial hash control chunk.
-//
-func unmarshalPartialHashJSON(ctx context.Context, data []byte) (hashType, partialHashState string, err error) {
-var partialHashData partialHashJSON
-err = json.Unmarshal(data, &partialHashData)
-return partialHashData.HashType, partialHashData.PartialHash, err
-}
-
func silentlyRemove(ctx context.Context, o fs.Object) {
_ = o.Remove(ctx) // ignore error
}

@@ -2664,58 +2544,9 @@ func (f *Fs) CanQuickRename() bool {
return f.base.Features().Move != nil
}

-// Resume checks whether the (remote, ID) pair is valid and returns
-// the point the file should be resumed from or an error.
-func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
-idSlice := strings.Split(ID, ",")
-cachedChunkSize, err := strconv.ParseInt(idSlice[0], 10, 64)
-cachedStartFrom, err := strconv.ParseInt(idSlice[1], 10, 64)
-cachedChunkNo, err := strconv.ParseInt(idSlice[2], 10, 64)
-cachedXactID := idSlice[3]
-if err != nil {
-return 0, err
-}
-if cachedChunkSize != int64(f.opt.ChunkSize) {
-return 0, errors.New("ChunkSize doesn't match for file we are trying to resume")
-}
-if f.opt.StartFrom != int(cachedStartFrom) {
-return 0, errors.New("StartFrom doesn't match for file we are trying to resume")
-}
-// Check partial hash control chunk
-controlChunkName := f.makeChunkName(remote, -1, "phash", cachedXactID)
-hashControlChunk, err := f.base.NewObject(ctx, controlChunkName)
-if err != nil {
-return 0, err
-}
-reader, err := hashControlChunk.Open(ctx)
-data, err := ioutil.ReadAll(reader)
-_ = reader.Close() // ensure file handle is freed on windows
-if err != nil {
-return 0, err
-}
-remoteHashType, remoteHashState, err := unmarshalPartialHashJSON(ctx, data)
-if remoteHashType == hashName && remoteHashState == hashState {
-if f.opt.HashType != remoteHashType {
-fs.Debugf(f, "Resume skipped, mismatch hash types. prev: %s, curr: %s", remoteHashType, f.opt.HashType)
-return 0, nil
-}
-pos := cachedChunkNo * cachedChunkSize
-if err != nil {
-return 0, err
-}
-f.hashState = hashState
-f.resumeXactID = cachedXactID
-return pos, nil
-}
-
-// No valid control chunks found, rewind from start
-return 0, nil
-}
-
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
-_ fs.Resumer = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
@@ -13,7 +13,7 @@ import (
"io"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"
"unicode/utf8"

@@ -23,7 +23,7 @@ func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: *fstest.RemoteName,
NilObject: (*crypt.Object)(nil),
-UnimplementableFsMethods: []string{"OpenWriterAt", "Resume"},
+UnimplementableFsMethods: []string{"OpenWriterAt"},
UnimplementableObjectMethods: []string{"MimeType"},
})
}

@@ -21,7 +21,7 @@ import (
"sort"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"text/template"
"time"

@@ -10,7 +10,7 @@ import (
"context"
"errors"
"fmt"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/dropbox/dropbox-sdk-go-unofficial/v6/dropbox/async"

@@ -25,7 +25,7 @@ import (
"net/url"
"path"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

@@ -12,7 +12,7 @@ import (
"path"
"runtime"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/jlaffaye/ftp"

@@ -5,7 +5,7 @@ package googlephotos
import (
"path"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/backend/googlephotos/api"
)

@@ -15,7 +15,7 @@ import (
"regexp"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/googlephotos/api"

@@ -9,7 +9,7 @@ import (
"io"
"path"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -15,7 +15,7 @@ import (
"path"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"
@@ -12,9 +12,8 @@ import (
"path"
"path/filepath"
"runtime"
-"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"
"unicode/utf8"

@@ -25,7 +24,6 @@ import (
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
-"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/readers"

@@ -232,7 +230,6 @@ type Fs struct {
precision time.Duration // precision of local filesystem
warnedMu sync.Mutex // used for locking access to 'warned'.
warned map[string]struct{} // whether we have warned about this string
-hashState map[string]string // set in resume(), used to restore hash state

// do os.Lstat or os.Stat
lstat func(name string) (os.FileInfo, error)

@@ -270,12 +267,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
}

f := &Fs{
-name: name,
-opt: *opt,
-warned: make(map[string]struct{}),
-hashState: make(map[string]string),
-dev: devUnset,
-lstat: os.Lstat,
+name: name,
+opt: *opt,
+warned: make(map[string]struct{}),
+dev: devUnset,
+lstat: os.Lstat,
}
f.root = cleanRootPath(root, f.opt.NoUNC, f.opt.Enc)
f.features = (&fs.Features{

@@ -1119,7 +1115,6 @@ func (nwc nopWriterCloser) Close() error {
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
var out io.WriteCloser
var hasher *hash.MultiHasher
-var resumeOpt *fs.OptionResume

for _, option := range options {
switch x := option.(type) {

@@ -1130,32 +1125,6 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return err
}
}
-case *fs.OptionResume:
-resumeOpt = option.(*fs.OptionResume)
-if resumeOpt.Pos != 0 {
-fs.Logf(o, "Resuming at byte position: %d", resumeOpt.Pos)
-// Discard bytes that already exist on backend
-_, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
-if err != nil {
-return err
-}
-hashType := o.fs.Hashes().GetOne()
-if resumeOpt.Hash != "" {
-if err = hashType.Set(resumeOpt.Hash); err != nil {
-return err
-}
-if !o.fs.Hashes().Contains(hashType) {
-return fmt.Errorf("unsupported resume hash: %q", resumeOpt.Hash)
-}
-}
-hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(hashType))
-if err != nil {
-return err
-}
-if err := hasher.RestoreHashState(hashType, o.fs.hashState[o.remote]); err != nil {
-return err
-}
-}
-}
}

@@ -1169,12 +1138,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// If it is a translated link, just read in the contents, and
// then create a symlink
if !o.translatedLink {
-var f *os.File
-if resumeOpt != nil && resumeOpt.Pos != 0 {
-f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_APPEND, 0666)
-} else {
-f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
-}
+f, err := file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
if runtime.GOOS == "windows" && os.IsPermission(err) {
// If permission denied on Windows might be trying to update a

@@ -1188,7 +1152,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return err
}
}
-if !o.fs.opt.NoPreAllocate && resumeOpt != nil && resumeOpt.Pos == 0 {
+if !o.fs.opt.NoPreAllocate {
// Pre-allocate the file for performance reasons
err = file.PreAllocate(src.Size(), f)
if err != nil {
@@ -1209,46 +1173,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
in = io.TeeReader(in, hasher)
}

-var cacheingWg sync.WaitGroup // Used to halt code execution while resume cache is written
-var copyWg sync.WaitGroup // Ensure that io.Copy has returned before writing resume data
-copyWg.Add(1)
-// Context for read so that we can handle io.copy being interrupted
-ctxr, cancel := context.WithCancel(ctx)
-// Create exit handler during Copy so that resume data can be written if interrupted
-var atexitOnce sync.Once
-atexitHandle := atexit.Register(func() {
-atexitOnce.Do(func() {
-if resumeOpt == nil || hasher == nil {
-return
-}
-// If OptionResume was passed, call SetID to prepare for future resumes
-// ID is the number of bytes written to the destination
-// Stops the copy so cache is consistent with remote
-cacheingWg.Add(1)
-cancel()
-copyWg.Wait()
-fs.Infof(o, "Updating resume cache")
-fileInfo, _ := o.fs.lstat(o.path)
-writtenStr := strconv.FormatInt(fileInfo.Size(), 10)
-hashType := hasher.Hashes().GetOne()
-hashState, err := hasher.GetHashState(hashType)
-if err == nil {
-err = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState)
-}
-if err != nil {
-fs.Logf(o, "Updating resume cache failed: %v", err)
-}
-})
-})
-cr := readers.NewContextReader(ctxr, in)
-_, err = io.Copy(out, cr)
-copyWg.Done()
-atexit.Unregister(atexitHandle)
-if errors.Is(err, context.Canceled) {
-// If resume data is being written we want to wait here for the program to exit
-cacheingWg.Wait()
-}
-
+_, err = io.Copy(out, in)
closeErr := out.Close()
if err == nil {
err = closeErr

@@ -1413,44 +1338,9 @@ func cleanRootPath(s string, noUNC bool, enc encoder.MultiEncoder) string {
return s
}

-// Resume checks whether the (remote, ID) pair is valid and returns
-// the point the file should be resumed from or an error.
-func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
-cachedPos, err := strconv.ParseInt(ID, 10, 64)
-if err != nil {
-return 0, err
-}
-// Compare hash of partial file on remote with partial hash in cache
-remoteObject, err := f.NewObject(ctx, remote)
-if err != nil {
-return 0, err
-}
-if remoteObject.Size() != cachedPos {
-return 0, errors.New("size on remote does not match resume cache")
-}
-hashType := hash.NameToType(hashName)
-remoteHash, err := remoteObject.Hash(ctx, hashType)
-if err != nil {
-return 0, err
-}
-cachedHash, err := hash.SumPartialHash(hashName, hashState)
-if err != nil {
-return 0, err
-}
-// Hashes match, attempt resume
-if cachedHash == remoteHash {
-f.hashState[remote] = hashState
-return cachedPos, nil
-}
-// No valid position found, restart from beginning
-fs.Infof(remote, "Not resuming as cached hash state did not match hash state on remote")
-return 0, nil
-}
-
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
-_ fs.Resumer = &Fs{}
_ fs.Purger = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Mover = &Fs{}
@@ -3,7 +3,7 @@ package local
import (
"io/ioutil"
"os"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"testing"
"time"

@@ -12,7 +12,7 @@ import (
"sort"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"encoding/hex"

@@ -22,7 +22,7 @@ import (
"io"
"path"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -11,7 +11,7 @@ import (
"io/ioutil"
"path"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -16,7 +16,7 @@ import (
"regexp"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/onedrive/api"

@@ -13,7 +13,7 @@ import (
"hash"
"io"
"sort"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/lib/atexit"

@@ -19,7 +19,7 @@ import (
"sort"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/aws/aws-sdk-go/aws"

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"net/url"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -10,7 +10,7 @@ import (
"path"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/coreos/go-semver/semver"

@@ -17,7 +17,7 @@ import (
"regexp"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"
@@ -42,7 +42,8 @@ const (
hashCommandNotSupported = "none"
minSleep = 100 * time.Millisecond
maxSleep = 2 * time.Second
-decayConstant = 2 // bigger for slower decay, exponential
+decayConstant = 2 // bigger for slower decay, exponential
+keepAliveInterval = time.Minute // send keepalives every this long while running commands
)

var (

@@ -339,6 +340,32 @@ func (c *conn) wait() {
c.err <- c.sshClient.Conn.Wait()
}

+// Send a keepalive over the ssh connection
+func (c *conn) sendKeepAlive() {
+_, _, err := c.sshClient.SendRequest("keepalive@openssh.com", true, nil)
+if err != nil {
+fs.Debugf(nil, "Failed to send keep alive: %v", err)
+}
+}
+
+// Send keepalives every interval over the ssh connection until done is closed
+func (c *conn) sendKeepAlives(interval time.Duration) (done chan struct{}) {
+done = make(chan struct{})
+go func() {
+t := time.NewTicker(interval)
+defer t.Stop()
+for {
+select {
+case <-t.C:
+c.sendKeepAlive()
+case <-done:
+return
+}
+}
+}()
+return done
+}
+
// Closes the connection
func (c *conn) close() error {
sftpErr := c.sftpClient.Close()

@@ -1098,6 +1125,9 @@ func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) {
}
defer f.putSftpConnection(&c, err)

+// Send keepalives while the connection is open
+defer close(c.sendKeepAlives(keepAliveInterval))
+
session, err := c.sshClient.NewSession()
if err != nil {
return nil, fmt.Errorf("run: get SFTP session: %w", err)

@@ -1110,10 +1140,12 @@ func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) {
session.Stdout = &stdout
session.Stderr = &stderr

+fs.Debugf(f, "Running remote command: %s", cmd)
err = session.Run(cmd)
if err != nil {
-return nil, fmt.Errorf("failed to run %q: %s: %w", cmd, stderr.Bytes(), err)
+return nil, fmt.Errorf("failed to run %q: %s: %w", cmd, bytes.TrimSpace(stderr.Bytes()), err)
}
+fs.Debugf(f, "Remote command result: %s", bytes.TrimSpace(stdout.Bytes()))

return stdout.Bytes(), nil
}

@@ -1230,8 +1262,6 @@ func (o *Object) Remote() string {
// Hash returns the selected checksum of the file
// If no checksum is available it returns ""
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
-o.fs.addSession() // Show session in use
-defer o.fs.removeSession()
if o.fs.opt.DisableHashCheck {
return "", nil
}

@@ -1255,36 +1285,16 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", hash.ErrUnsupported
}

-c, err := o.fs.getSftpConnection(ctx)
-if err != nil {
-return "", fmt.Errorf("Hash get SFTP connection: %w", err)
-}
-session, err := c.sshClient.NewSession()
-o.fs.putSftpConnection(&c, err)
-if err != nil {
-return "", fmt.Errorf("Hash put SFTP connection: %w", err)
-}
-
-var stdout, stderr bytes.Buffer
-session.Stdout = &stdout
-session.Stderr = &stderr
escapedPath := shellEscape(o.path())
if o.fs.opt.PathOverride != "" {
escapedPath = shellEscape(path.Join(o.fs.opt.PathOverride, o.remote))
}
-err = session.Run(hashCmd + " " + escapedPath)
fs.Debugf(nil, "sftp cmd = %s", escapedPath)
+b, err := o.fs.run(ctx, hashCmd+" "+escapedPath)
if err != nil {
-_ = session.Close()
-fs.Debugf(o, "Failed to calculate %v hash: %v (%s)", r, err, bytes.TrimSpace(stderr.Bytes()))
-return "", nil
+return "", fmt.Errorf("failed to calculate %v hash: %w", r, err)
}

-_ = session.Close()
-b := stdout.Bytes()
fs.Debugf(nil, "sftp output = %q", b)
str := parseHash(b)
fs.Debugf(nil, "sftp hash = %q", str)
if r == hash.MD5 {
o.md5sum = &str
} else if r == hash.SHA1 {
@@ -3,7 +3,7 @@

package sftp

-import "sync"
+import "github.com/rclone/rclone/lib/sync"

// stringLock locks for string IDs passed in
type stringLock struct {

@@ -5,7 +5,7 @@ package sftp

import (
"fmt"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"testing"
"time"

@@ -13,7 +13,7 @@ import (
"fmt"
"io"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/backend/sharefile/api"
"github.com/rclone/rclone/fs"

@@ -23,7 +23,7 @@ import (
"regexp"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/sugarsync/api"

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"io"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/union/upstream"

@@ -3,7 +3,7 @@ package policy
import (
"context"
"path"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/backend/union/upstream"
"github.com/rclone/rclone/fs"

@@ -3,7 +3,7 @@ package policy
import (
"context"
"path"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/union/upstream"

@@ -9,7 +9,7 @@ import (
"path"
"path/filepath"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/union/policy"

@@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
}
fstests.Run(t, &fstests.Opt{
RemoteName: *fstest.RemoteName,
-UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles", "Resume"},
+UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
UnimplementableObjectMethods: []string{"MimeType"},
})
}

@@ -9,7 +9,7 @@ import (
"path"
"path/filepath"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

@@ -6,7 +6,7 @@ import (
"regexp"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -21,7 +21,7 @@ import (
"path"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/backend/webdav/api"
@@ -66,6 +66,11 @@ func init() {
})
},
Options: append(oauthutil.SharedOptions, []fs.Option{{
+Name: "hard_delete",
+Help: "Delete files permanently rather than putting them into the trash.",
+Default: false,
+Advanced: true,
+}, {
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
Advanced: true,

@@ -79,8 +84,9 @@ func init() {

// Options defines the configuration for this backend
type Options struct {
-Token string `config:"token"`
-Enc encoder.MultiEncoder `config:"encoding"`
+Token string `config:"token"`
+HardDelete bool `config:"hard_delete"`
+Enc encoder.MultiEncoder `config:"encoding"`
}

// Fs represents a remote yandex

@@ -630,7 +636,7 @@ func (f *Fs) purgeCheck(ctx context.Context, dir string, check bool) error {
}
}
//delete directory
-return f.delete(ctx, root, false)
+return f.delete(ctx, root, f.opt.HardDelete)
}

// Rmdir deletes the container

@@ -1141,7 +1147,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op

// Remove an object
func (o *Object) Remove(ctx context.Context) error {
-return o.fs.delete(ctx, o.filePath(), false)
+return o.fs.delete(ctx, o.filePath(), o.fs.opt.HardDelete)
}

// MimeType of an Object if known, "" otherwise
@@ -19,7 +19,7 @@ import (
"runtime"
"sort"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"text/template"
"time"

@@ -19,7 +19,7 @@ import (
"runtime/pprof"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -9,7 +9,7 @@ import (
"io"
"os"
"path"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

@@ -12,7 +12,7 @@ import (
"os"
"path/filepath"
"sort"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"
)

@@ -7,7 +7,7 @@ import (
"os"
"runtime"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/cmd"

@@ -5,7 +5,7 @@ import (
"errors"
"log"
"sort"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -5,7 +5,7 @@ import (
"context"
"fmt"
"path"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/walk"

@@ -6,7 +6,7 @@ import (
"bytes"
"fmt"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -3,7 +3,7 @@ package rcd
import (
"context"
"log"
-"sync"
+"github.com/rclone/rclone/lib/sync"

sysdnotify "github.com/iguanesolutions/go-systemd/v5/notify"
"github.com/rclone/rclone/cmd"

@@ -10,7 +10,7 @@ import (
"path/filepath"
"reflect"
"sort"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

sysdnotify "github.com/iguanesolutions/go-systemd/v5/notify"

@@ -14,7 +14,7 @@ import (
"os"
"os/user"
"strconv"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/cmd"

@@ -5,7 +5,7 @@ package restic

import (
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
)

@@ -16,7 +16,7 @@ import (
"sort"
"strconv"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/cmd"

@@ -3,7 +3,7 @@ package memory
import (
"context"
"runtime"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/cmd/test"
@@ -550,3 +550,6 @@ put them back in again.` >}}
* Fredric Arklid <fredric.arklid@consid.se>
* Andy Jackson <Andrew.Jackson@bl.uk>
* Sinan Tan <i@tinytangent.com>
+* deinferno <14363193+deinferno@users.noreply.github.com>
+* rsapkf <rsapkfff@pm.me>
+* Will Holtz <wholtz@gmail.com>

@@ -81,6 +81,14 @@ key. It is stored using RFC3339 Format time with nanosecond
precision. The metadata is supplied during directory listings so
there is no overhead to using it.

+### Performance
+
+When uploading large files, increasing the value of
+`--azureblob-upload-concurrency` will increase performance at the cost
+of using more memory. The default of 16 is set quite conservatively to
+use less memory. It maybe be necessary raise it to 64 or higher to
+fully utilize a 1 GBit/s link with a single file transfer.
+
### Restricted filename characters

In addition to the [default restricted characters set](/overview/#restricted-characters)
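As an illustrative command only (the remote, container and file names are placeholders, not part of the documented change), a single large transfer over a fast link might be tuned like this:

    rclone copy ./large.iso azureblob:container/backup --transfers 1 --azureblob-upload-concurrency 64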
@@ -107,8 +107,9 @@ At the end of the non interactive process, rclone will return a result
with `State` as empty string.

If `--all` is passed then rclone will ask all the config questions,
-not just the post config questions. Any parameters are used as
-defaults for questions as usual.
+not just the post config questions. Parameters that are supplied on
+the command line or from environment variables are used as defaults
+for questions as usual.

Note that `bin/config.py` in the rclone source implements this protocol
as a readable demonstration.
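For example, a scripted run that still asks every question, taking defaults from any supplied parameters, might look like the following (the remote name and backend are placeholders, and the exact flag combination is an assumption rather than part of this change):

    rclone config create mydrive drive --all --non-interactive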
@@ -80,7 +80,7 @@ List all the files in your pCloud

    rclone ls remote:

-To copy a local directory to an pCloud directory called backup
+To copy a local directory to a pCloud directory called backup

    rclone copy /home/source remote:backup

@@ -620,7 +620,7 @@ issue](https://github.com/pkg/sftp/issues/156) is fixed.
Note that since SFTP isn't HTTP based the following flags don't work
with it: `--dump-headers`, `--dump-bodies`, `--dump-auth`

-Note that `--timeout` isn't supported (but `--contimeout` is).
+Note that `--timeout` and `--contimeout` are both supported.

## C14 {#c14}

@@ -175,6 +175,15 @@ Leave blank to use the provider defaults.
- Type: string
- Default: ""

+#### --yandex-hard-delete
+
+Delete files permanently rather than putting them into the trash.
+
+- Config: hard_delete
+- Env Var: RCLONE_YANDEX_HARD_DELETE
+- Type: bool
+- Default: false
+
#### --yandex-encoding

This sets the encoding for the backend.
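An illustrative invocation enabling the new behaviour for one run (remote name and path are placeholders):

    rclone delete yandex:old-files --yandex-hard-delete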
@@ -6,7 +6,7 @@ import (
"errors"
"fmt"
"io"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"
"unicode/utf8"

@@ -2,7 +2,7 @@ package accounting

import (
"context"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
)

@@ -6,7 +6,7 @@ import (
"fmt"
"sort"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -2,7 +2,7 @@ package accounting

import (
"context"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs/rc"

@@ -4,11 +4,11 @@ import (
"context"
"errors"
"fmt"
-"sync"
"time"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc"
+"github.com/rclone/rclone/lib/sync"
"golang.org/x/time/rate"
)

@@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"io"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -5,7 +5,7 @@ import (
"fmt"
"sort"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc"

@@ -6,7 +6,7 @@ import (
"context"
"errors"
"io"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -9,7 +9,7 @@ import (
"io/ioutil"
"math/rand"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"testing"
"testing/iotest"
"time"
fs/cache/cache.go (vendored)
@@ -4,7 +4,7 @@ package cache
import (
"context"
"runtime"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/filter"

@@ -4,7 +4,7 @@ import (
"context"
"errors"
"io"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"

@@ -130,8 +130,6 @@ type ConfigInfo struct {
FsCacheExpireDuration time.Duration
FsCacheExpireInterval time.Duration
DisableHTTP2 bool
-MaxResumeCacheSize SizeSuffix
-ResumeCutoff SizeSuffix
HumanReadable bool
KvLockTime time.Duration // maximum time to keep key-value database locked by process
}

@@ -165,8 +163,6 @@ func NewConfig() *ConfigInfo {
c.TPSLimitBurst = 1
c.MaxTransfer = -1
c.MaxBacklog = 10000
-c.MaxResumeCacheSize = SizeSuffix(100 * 1024)
-c.ResumeCutoff = -1
// We do not want to set the default here. We use this variable being empty as part of the fall-through of options.
// c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - "
c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)

@@ -8,7 +8,7 @@ import (
"os"
"path/filepath"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/Unknwon/goconfig"
"github.com/rclone/rclone/fs"

@@ -132,8 +132,6 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
flags.BoolVarP(flagSet, &ci.RefreshTimes, "refresh-times", "", ci.RefreshTimes, "Refresh the modtime of remote files")
flags.BoolVarP(flagSet, &ci.NoConsole, "no-console", "", ci.NoConsole, "Hide console window (supported on Windows only)")
flags.StringVarP(flagSet, &dscp, "dscp", "", "", "Set DSCP value to connections, value or name, e.g. CS1, LE, DF, AF21")
-flags.FVarP(flagSet, &ci.MaxResumeCacheSize, "max-resume-cache-size", "", "The maximum size of the cache used to store data necessary for resuming uploads. When the storage grows beyond this size, the oldest resume data will be deleted. (default 100k")
-flags.FVarP(flagSet, &ci.ResumeCutoff, "resume-cutoff", "", "If set, attempt to resume all partial uploads larger than this size. (default off)")
flags.DurationVarP(flagSet, &ci.FsCacheExpireDuration, "fs-cache-expire-duration", "", ci.FsCacheExpireDuration, "Cache remotes for this long (0 to disable caching)")
flags.DurationVarP(flagSet, &ci.FsCacheExpireInterval, "fs-cache-expire-interval", "", ci.FsCacheExpireInterval, "Interval to check for expired remotes")
flags.BoolVarP(flagSet, &ci.DisableHTTP2, "disable-http2", "", ci.DisableHTTP2, "Disable HTTP/2 in the global transport")

@@ -2,7 +2,7 @@ package config

import (
"encoding/json"
-"sync"
+"github.com/rclone/rclone/lib/sync"
)

// defaultStorage implements config.Storage, providing in-memory config.

@@ -163,10 +163,6 @@ type Features struct {
// Shutdown the backend, closing any background tasks and any
// cached connections.
Shutdown func(ctx context.Context) error
-
-// Resume checks whether the (remote, ID) pair is valid and returns
-// the point the file should be resumed from or an error.
-Resume func(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
}

// Disable nil's out the named feature. If it isn't found then it

@@ -294,9 +290,6 @@ func (ft *Features) Fill(ctx context.Context, f Fs) *Features {
if do, ok := f.(Shutdowner); ok {
ft.Shutdown = do.Shutdown
}
-if do, ok := f.(Resumer); ok {
-ft.Resume = do.Resume
-}
return ft.DisableList(GetConfig(ctx).DisableFeatures)
}

@@ -643,13 +636,6 @@ type Shutdowner interface {
Shutdown(ctx context.Context) error
}

-// Resumer is an optional interface for Fs
-type Resumer interface {
-// Resume checks whether the (remote, ID) pair is valid and returns
-// the point the file should be resumed from or an error.
-Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
-}
-
// ObjectsChan is a channel of Objects
type ObjectsChan chan Object

@@ -6,7 +6,7 @@ import (
"io/ioutil"
"os"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"testing"
"time"
fs/fs.go
@@ -48,7 +48,6 @@ var (
ErrorNotImplemented = errors.New("optional feature not implemented")
ErrorCommandNotFound = errors.New("command not found")
ErrorFileNameTooLong = errors.New("file name too long")
-ErrorCantResume = errors.New("can't resume file upload")
)

// CheckClose is a utility function used to check the return from

@@ -7,7 +7,7 @@ import (
"fmt"
"os"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"testing"
"time"

@@ -5,7 +5,7 @@ import (
"net"
"runtime"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -12,7 +12,7 @@ import (
"net/http"
"net/http/cookiejar"
"net/http/httputil"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"time"

"github.com/rclone/rclone/fs"

@@ -4,7 +4,6 @@ import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
-"encoding"
"encoding/base64"
"encoding/hex"
"errors"

@@ -229,14 +228,6 @@ func (m *MultiHasher) Write(p []byte) (n int, err error) {
return n, err
}

-// Hashes returns accumulated hash types.
-func (m *MultiHasher) Hashes() (set Set) {
-for ht := range m.h {
-set.Add(ht)
-}
-return
-}
-
// Sums returns the sums of all accumulated hashes as hex encoded
// strings.
func (m *MultiHasher) Sums() map[Type]string {

@@ -273,67 +264,6 @@ func (m *MultiHasher) Size() int64 {
return m.size
}

-// GetHashState returns the partial hash state for the given hash type encoded as a string
-func (m *MultiHasher) GetHashState(hashType Type) (string, error) {
-h, ok := m.h[hashType]
-if !ok {
-return "", ErrUnsupported
-}
-marshaler, ok := h.(encoding.BinaryMarshaler)
-if !ok {
-return "", errors.New(hashType.String() + " does not implement encoding.BinaryMarshaler")
-}
-data, err := marshaler.MarshalBinary()
-if err != nil {
-return "", err
-}
-return base64.StdEncoding.EncodeToString(data), nil
-}
-
-// RestoreHashState restores the partial hash state for the passed hash type
-func (m *MultiHasher) RestoreHashState(hashType Type, hashState string) error {
-partialHashState, err := base64.StdEncoding.DecodeString(hashState)
-if err != nil {
-return err
-}
-unmarshaler, ok := m.h[hashType].(encoding.BinaryUnmarshaler)
-if ok {
-if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
-return err
-}
-}
-return nil
-}
-
-// SumPartialHash returns the hash of the partial hash state
-func SumPartialHash(hashName, hashState string) (string, error) {
-partialHashDef, ok := name2hash[hashName]
-if !ok {
-return "", ErrUnsupported
-}
-partialHash := partialHashDef.newFunc()
-partialHashState, err := base64.StdEncoding.DecodeString(hashState)
-if err != nil {
-return "", err
-}
-unmarshaler, ok := partialHash.(encoding.BinaryUnmarshaler)
-if ok {
-if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
-return "", err
-}
-}
-return hex.EncodeToString(partialHash.Sum(nil)), nil
-}
-
-// NameToType returns the requested hash type or None if the hash type isn't supported
-func NameToType(hashName string) Type {
-hashDef, ok := name2hash[hashName]
-if !ok {
-return None
-}
-return hashDef.hashType
-}
-
// A Set Indicates one or more hash types.
type Set int
@@ -7,7 +7,7 @@ import (
"path"
"sort"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/dirtree"

@@ -7,7 +7,7 @@ import (
"errors"
"fmt"
"strings"
-"sync"
+"github.com/rclone/rclone/lib/sync"
"testing"

_ "github.com/rclone/rclone/backend/local"

@@ -3,19 +3,13 @@
package fs

import (
"context"
-"encoding/json"
"errors"
"fmt"
"net/http"
-"os"
-"path/filepath"
-"sort"
"strconv"
"strings"

"github.com/rclone/rclone/fs/hash"
-"github.com/rclone/rclone/lib/cacheroot"
)

// OpenOption is an interface describing options for Open
// OpenOption is an interface describing options for Open
|
||||
@@ -236,145 +230,6 @@ func (o *HashesOption) Mandatory() bool {
return false
}

// OptionResume defines an option for resuming a Put/Upload
type OptionResume struct {
ID string // resume this ID if set
Pos int64 // and resume from this position
Hash string
Src Object
F Fs
Remote string
CacheCleaned bool
CacheDir string
}

// SetID will be called by backend's Put/Update function if the object's upload
// could be resumed upon failure
//
// SetID takes the passed resume ID, hash name and hash state, together with the
// Fingerprint of the object, and stores them in --cache-dir so that future Copy
// operations can resume the upload if it fails
func (o *OptionResume) SetID(ctx context.Context, ID, hashName, hashState string) error {
ci := GetConfig(ctx)
// Get the Fingerprint of the src object so that future Copy operations can ensure the
// object hasn't changed before resuming an upload
fingerprint := Fingerprint(ctx, o.Src, true)
data, err := marshalResumeJSON(ctx, fingerprint, ID, hashName, hashState)
if err != nil {
return fmt.Errorf("failed to marshal data JSON: %w", err)
}
if len(data) < int(ci.MaxResumeCacheSize) {
// Each remote will have its own directory for cached resume files
dirPath, _, err := cacheroot.CreateCacheRoot(o.CacheDir, o.F.Name(), o.F.Root(), "resume")
if err != nil {
return err
}
err = os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
return fmt.Errorf("failed to create cache directory %v: %w", dirPath, err)
}
// Write resume data to disk
cachePath := filepath.Join(dirPath, o.Remote)
cacheFile, err := os.Create(cachePath)
if err != nil {
return fmt.Errorf("failed to create cache file %v: %w", cachePath, err)
}
defer func() {
_ = cacheFile.Close()
}()
_, errWrite := cacheFile.Write(data)
if errWrite != nil {
return fmt.Errorf("failed to write JSON to file: %w", errWrite)
}
}
if !o.CacheCleaned {
rootCacheDir := filepath.Join(o.CacheDir, "resume")
if err := cleanResumeCache(ctx, rootCacheDir); err != nil {
return fmt.Errorf("failed to clean resume cache: %w", err)
}
}
o.CacheCleaned = true
return nil
}

// ResumeJSON is a struct for storing resume info in cache
type ResumeJSON struct {
Fingerprint string `json:"fprint"`
ID string `json:"id"`
HashName string `json:"hname"`
HashState string `json:"hstate"`
}

func marshalResumeJSON(ctx context.Context, fprint, id, hashName, hashState string) ([]byte, error) {
resumedata := ResumeJSON{
Fingerprint: fprint,
ID: id,
HashName: hashName,
HashState: hashState,
}
data, err := json.Marshal(&resumedata)
return data, err
}
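A rough sketch of the backend side of this contract: once a backend's Put/Update knows its remote upload ID and has a serialized partial-hash state, it hands them to OptionResume.SetID so a later Copy can pick the upload back up. Only SetID and its argument order come from the code above; the helper, its parameters, and the choice of md5 are illustrative:

package example

import (
    "context"
    "encoding/base64"

    "github.com/rclone/rclone/fs"
)

// uploadWithResume is a hypothetical helper showing where a backend would
// record resume information while a chunked upload is in flight.
func uploadWithResume(ctx context.Context, resumeOpt *fs.OptionResume, uploadID string, partialHashState []byte) error {
    if resumeOpt == nil {
        return nil // caller did not ask for resume support
    }
    // SetID stores the upload ID, hash name and base64 hash state (plus the
    // source fingerprint) in --cache-dir for a later Copy to pick up.
    hashState := base64.StdEncoding.EncodeToString(partialHashState)
    return resumeOpt.SetID(ctx, uploadID, "md5", hashState)
}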
// cleanResumeCache checks the size of the resume cache and removes the oldest resume files if more than limit
func cleanResumeCache(ctx context.Context, rootCacheDir string) error {
ci := GetConfig(ctx)
var paths []string
pathsWithInfo := make(map[string]os.FileInfo)
totalCacheSize := int64(0)
walkErr := filepath.Walk(rootCacheDir,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
// Empty subdirectories in the resume cache dir can be removed
removeErr := os.Remove(path)
if err != nil && !os.IsNotExist(removeErr) {
return fmt.Errorf("failed to remove empty subdirectory: %s: %w", path, err)
}
return nil
}
paths = append(paths, path)
pathsWithInfo[path] = info
totalCacheSize += info.Size()
return nil
})
if walkErr != nil {
return fmt.Errorf("error walking through cache when cleaning cache dir: %w", walkErr)
}
if totalCacheSize > int64(ci.MaxResumeCacheSize) {
sort.Slice(paths, func(i, j int) bool {
return pathsWithInfo[paths[i]].ModTime().Before(pathsWithInfo[paths[j]].ModTime())
})
for _, p := range paths {
if totalCacheSize < int64(ci.MaxResumeCacheSize) {
break
}
if err := os.Remove(p); err != nil {
return fmt.Errorf("error removing oldest cache file: %s: %w", p, err)
}
totalCacheSize -= pathsWithInfo[p].Size()
Debugf(p, "Successfully removed oldest cache file")
}
}
return nil
}
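The eviction policy above is simply: once the cache grows past MaxResumeCacheSize, delete files oldest-modification-time first until it fits again. A standalone sketch of that idea, detached from rclone's config plumbing (the function name and size limit are made up):

package example

import (
    "os"
    "path/filepath"
    "sort"
)

// pruneOldest removes the oldest files under dir until the combined size of
// the remaining files is at most maxBytes. It mirrors the strategy used by
// cleanResumeCache above, without the rclone config lookup.
func pruneOldest(dir string, maxBytes int64) error {
    var paths []string
    infos := map[string]os.FileInfo{}
    total := int64(0)
    err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
        if err != nil || info.IsDir() {
            return err
        }
        paths = append(paths, path)
        infos[path] = info
        total += info.Size()
        return nil
    })
    if err != nil {
        return err
    }
    // Oldest modification time first.
    sort.Slice(paths, func(i, j int) bool {
        return infos[paths[i]].ModTime().Before(infos[paths[j]].ModTime())
    })
    for _, p := range paths {
        if total <= maxBytes {
            break
        }
        if err := os.Remove(p); err != nil {
            return err
        }
        total -= infos[p].Size()
    }
    return nil
}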
// Header formats the option as an http header
func (o *OptionResume) Header() (key string, value string) {
return "", ""
}

// String formats the option into human readable form
func (o *OptionResume) String() string {
return fmt.Sprintf("OptionResume(ID:%v, Pos:%v)", o.ID, o.Pos)
}

// Mandatory returns whether the option must be parsed or can be ignored
func (o *OptionResume) Mandatory() bool {
return false
}

// NullOption defines an Option which does nothing
type NullOption struct {
}
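Like the other fs.OpenOption implementations, OptionResume is recognised by backends with a type switch over the options passed to Put/Update. A minimal sketch of that pattern; the helper itself is hypothetical and not part of rclone:

package example

import "github.com/rclone/rclone/fs"

// findResumeOption picks an *fs.OptionResume out of the options a backend's
// Put/Update receives, returning nil if the caller did not request resume support.
func findResumeOption(options []fs.OpenOption) *fs.OptionResume {
    for _, option := range options {
        switch opt := option.(type) {
        case *fs.OptionResume:
            return opt
        }
    }
    return nil
}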
@@ -10,7 +10,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"

"github.com/rclone/rclone/fs"
@@ -1,23 +0,0 @@
//go:build !windows
// +build !windows

package operations

import (
"os"
"os/exec"
"syscall"
)

func sendInterrupt() error {
p, err := os.FindProcess(syscall.Getpid())
if err != nil {
return err
}
err = p.Signal(os.Interrupt)
return err
}

func setupCmd(cmd *exec.Cmd) {
// Only needed for windows
}
@@ -1,32 +0,0 @@
//go:build windows
// +build windows

package operations

import (
"os/exec"
"syscall"
)

// Credit: https://github.com/golang/go/blob/6125d0c4265067cdb67af1340bf689975dd128f4/src/os/signal/signal_windows_test.go#L18
func sendInterrupt() error {
d, e := syscall.LoadDLL("kernel32.dll")
if e != nil {
return e
}
p, e := d.FindProc("GenerateConsoleCtrlEvent")
if e != nil {
return e
}
r, _, e := p.Call(syscall.CTRL_BREAK_EVENT, uintptr(syscall.Getpid()))
if r == 0 {
return e
}
return nil
}

func setupCmd(cmd *exec.Cmd) {
(*cmd).SysProcAttr = &syscall.SysProcAttr{
CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
}
}
@@ -18,7 +18,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"

@@ -33,7 +33,6 @@ import (
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/cacheroot"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
"github.com/rclone/rclone/lib/readers"
@@ -365,11 +364,6 @@ func CommonHash(ctx context.Context, fa, fb fs.Info) (hash.Type, *fs.HashesOptio
// be nil.
func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
ci := fs.GetConfig(ctx)
var resumeOpt *fs.OptionResume
if f.Features().Resume != nil {
resumeOpt = createResumeOpt(ctx, f, remote, src)
}

tr := accounting.Stats(ctx).NewTransfer(src)
defer func() {
tr.Done(ctx, err)
@@ -411,7 +405,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
if err == nil {
dst = newDst
in.ServerSideCopyEnd(dst.Size()) // account the bytes for the server-side transfer
err = in.Close()
_ = in.Close()
} else {
_ = in.Close()
}
@@ -467,10 +461,6 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
wrappedSrc = NewOverrideRemote(src, remote)
}
options := []fs.OpenOption{hashOption}
// Append OptionResume if it was set
if resumeOpt != nil {
options = append(options, resumeOpt)
}
for _, option := range ci.UploadHeaders {
options = append(options, option)
}
@@ -485,17 +475,6 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
if err == nil {
newDst = dst
err = closeErr
cacheParent := config.GetCacheDir()
// Remove resume cache file (if one was created) when Put/Upload is successful
cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
if err != nil {
return nil, err
}
cacheFile := filepath.Join(cacheDir, remote)
removeErr := os.Remove(cacheFile)
if removeErr != nil && !os.IsNotExist(removeErr) {
return nil, fmt.Errorf("failed to remove resume cache file after upload: %w", removeErr)
}
}
}
}
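On the success path above, Copy recomputes the resume cache location and deletes the cached entry so a finished upload can never be "resumed". A sketch of that cleanup as a standalone helper, using the same cacheroot layout as the code above (the helper name is made up):

package example

import (
    "fmt"
    "os"
    "path/filepath"

    "github.com/rclone/rclone/lib/cacheroot"
)

// removeResumeCache deletes the cached resume entry for remote, ignoring the
// case where no resume file was ever written.
func removeResumeCache(cacheParent, fsName, fsRoot, remote string) error {
    cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, fsName, fsRoot, "resume")
    if err != nil {
        return err
    }
    cacheFile := filepath.Join(cacheDir, remote)
    if removeErr := os.Remove(cacheFile); removeErr != nil && !os.IsNotExist(removeErr) {
        return fmt.Errorf("failed to remove resume cache file after upload: %w", removeErr)
    }
    return nil
}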
@@ -619,6 +598,8 @@ func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.
}
}
// Move dst <- src
in := tr.Account(ctx, nil) // account the transfer
in.ServerSideCopyStart()
newDst, err = doMove(ctx, src, remote)
switch err {
case nil:
@@ -627,13 +608,16 @@ func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.
} else {
fs.Infof(src, "Moved (server-side)")
}

in.ServerSideCopyEnd(newDst.Size()) // account the bytes for the server-side transfer
_ = in.Close()
return newDst, nil
case fs.ErrorCantMove:
fs.Debugf(src, "Can't move, switching to copy")
_ = in.Close()
default:
err = fs.CountError(err)
fs.Errorf(src, "Couldn't move: %v", err)
_ = in.Close()
return newDst, err
}
}
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"io"
"sync"
"github.com/rclone/rclone/lib/sync"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
@@ -1,73 +0,0 @@
package operations

import (
"context"
"encoding/json"
"io/ioutil"
"os"
"path/filepath"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/lib/cacheroot"
)

// createResumeOpt creates an OptionResume that will be passed to Put/Upload
func createResumeOpt(ctx context.Context, f fs.Fs, remote string, src fs.Object) (resumeOpt *fs.OptionResume) {
ci := fs.GetConfig(ctx)
cacheParent := config.GetCacheDir()
resumeOpt = &fs.OptionResume{ID: "", Pos: 0, Src: src, F: f, Remote: remote, CacheCleaned: false, CacheDir: cacheParent}
if ci.ResumeCutoff >= 0 {
cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
if err != nil {
return nil
}
cacheFile := filepath.Join(cacheDir, remote)
resumeID, hashName, hashState, attemptResume := readResumeCache(ctx, f, src, cacheFile)
if attemptResume {
fs.Debugf(f, "Existing resume cache file found: %s. A resume will now be attempted.", cacheFile)
position, resumeErr := f.Features().Resume(ctx, remote, resumeID, hashName, hashState)
if resumeErr != nil {
fs.Errorf(src, "Resume canceled: %v", resumeErr)
} else if position > int64(ci.ResumeCutoff) {
resumeOpt.Pos = position
resumeOpt.Hash = hashName
}
}
}
return resumeOpt
}

// readResumeCache checks to see if a resume ID has been cached for the source object.
// If it finds one it returns it along with true to signal a resume can be attempted
func readResumeCache(ctx context.Context, f fs.Fs, src fs.Object, cacheName string) (resumeID, hashName, hashState string, attemptResume bool) {
existingCacheFile, statErr := os.Open(cacheName)
defer func() {
_ = existingCacheFile.Close()
}()
if !os.IsNotExist(statErr) {
rawData, readErr := ioutil.ReadAll(existingCacheFile)
if readErr == nil {
existingFingerprint, resumeID, hashName, hashState, unmarshalErr := unmarshalResumeJSON(ctx, rawData)
if unmarshalErr != nil {
fs.Debugf(f, "Failed to unmarshal Resume JSON: %s. Resume will not be attempted.", unmarshalErr.Error())
} else if existingFingerprint != "" {
// Check if the src object has changed by comparing new Fingerprint to Fingerprint in cache file
fingerprint := fs.Fingerprint(ctx, src, true)
if existingFingerprint == fingerprint {
return resumeID, hashName, hashState, true
}
}
}
}
return "", "", "", false
}

func unmarshalResumeJSON(ctx context.Context, data []byte) (fprint, id, hashName, hashState string, err error) {
var resumedata fs.ResumeJSON
err = json.Unmarshal(data, &resumedata)
if err != nil {
return "", "", "", "", err
}
return resumedata.Fingerprint, resumedata.ID, resumedata.HashName, resumedata.HashState, nil
}
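For reference, the cache entry unmarshalled above is just the fs.ResumeJSON struct serialized with its short JSON tags. A small sketch of what such an entry looks like on disk; all field values here are invented, only the key names come from the struct tags:

package main

import (
    "encoding/json"
    "fmt"

    "github.com/rclone/rclone/fs"
)

func main() {
    entry := fs.ResumeJSON{
        Fingerprint: "1024,2021-01-01 00:00:00 +0000 UTC",
        ID:          "example-upload-id",
        HashName:    "md5",
        HashState:   "<base64 of the partial hash state>",
    }
    data, err := json.Marshal(&entry)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(data))
    // {"fprint":"...","id":"example-upload-id","hname":"md5","hstate":"..."}
}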
@@ -1,163 +0,0 @@
package operations

import (
"bytes"
"context"
"io"
"io/ioutil"
"log"
"math/rand"
"os"
"os/exec"
"runtime"
"strings"
"sync"
"testing"
"time"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type interruptReader struct {
once sync.Once
r io.Reader
}

// Read sends an OS specific interrupt signal and then reads 1 byte at a time
func (r *interruptReader) Read(b []byte) (n int, err error) {
r.once.Do(func() {
_ = sendInterrupt()
})
buffer := make([]byte, 1)
n, err = r.r.Read(buffer)
b[0] = buffer[0]
// Simulate duration of a larger read without needing to test with a large file
// Allows for the interrupt to be handled before Copy completes
time.Sleep(time.Microsecond * 10)
return n, err
}

// this is a wrapper for a mockobject with a custom Open function
//
// n indicates the number of bytes to read before sending an
// interrupt signal
type resumeTestObject struct {
fs.Object
n int64
}

// Open opens the file for read. Call Close() on the returned io.ReadCloser
//
// The Reader will signal an interrupt after reading n bytes, then continue to read 1 byte at a time.
// If TestResume is successful, the interrupt will be processed and reads will be cancelled before running
// out of bytes to read
func (o *resumeTestObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
rc, err := o.Object.Open(ctx, options...)
if err != nil {
return nil, err
}
r := io.MultiReader(&io.LimitedReader{R: rc, N: o.n}, &interruptReader{r: rc})
// Wrap with Close in a new readCloser
rc = readCloser{Reader: r, Closer: rc}
return rc, nil
}

func makeContent(t *testing.T, size int) []byte {
content := make([]byte, size)
r := rand.New(rand.NewSource(42))
_, err := io.ReadFull(r, content)
assert.NoError(t, err)
return content
}

func TestResume(t *testing.T) {
ctx := context.Background()
r := fstest.NewRun(t)
defer r.Finalise()
ci := fs.GetConfig(ctx)
ci.ResumeCutoff = 0

// Contents for the mock object
var (
// Test contents must be large enough that io.Copy does not complete during the first Rclone Copy operation
resumeTestContents = makeContent(t, 1024)
expectedContents = resumeTestContents
)

// Create mockobjects with given breaks
createTestSrc := func(interrupt int64) (fs.Object, fs.Object) {
srcOrig := mockobject.New("potato").WithContent(resumeTestContents, mockobject.SeekModeNone)
srcOrig.SetFs(r.Flocal)
src := &resumeTestObject{
Object: srcOrig,
n: interrupt,
}
return src, srcOrig
}

checkContents := func(obj fs.Object, contents string) {
assert.NotNil(t, obj)
assert.Equal(t, int64(len(contents)), obj.Size())

r, err := obj.Open(ctx)
assert.NoError(t, err)
assert.NotNil(t, r)
if r == nil {
return
}
data, err := ioutil.ReadAll(r)
assert.NoError(t, err)
assert.Equal(t, contents, string(data))
_ = r.Close()
}

srcBreak, srcNoBreak := createTestSrc(2)

// Run first Copy only in a subprocess so that it can be interrupted without ending the test
// adapted from: https://stackoverflow.com/questions/26225513/how-to-test-os-exit-scenarios-in-go
if os.Getenv("RUNTEST") == "1" {
remoteRoot := os.Getenv("REMOTEROOT")
remoteFs, err := fs.NewFs(ctx, remoteRoot)
require.NoError(t, err)
_, _ = Copy(ctx, remoteFs, nil, "testdst", srcBreak)
// This should never be reached as the subroutine should exit during Copy
require.True(t, false, "Problem with test, first Copy operation should've been interrupted before completion")
return
}
// Start the subprocess
cmd := exec.Command(os.Args[0], "-test.run=TestResume")
cmd.Env = append(os.Environ(), "RUNTEST=1", "REMOTEROOT="+r.Fremote.Root())
cmd.Stdout = os.Stdout
setupCmd(cmd)
err := cmd.Run()

e, ok := err.(*exec.ExitError)

// Exit code after signal will be (128+signum) on Linux or (signum) on Windows
expectedErrorString := "exit status 1"
if runtime.GOOS == "windows" {
expectedErrorString = "exit status 2"
}
assert.True(t, ok)
assert.Contains(t, e.Error(), expectedErrorString)

var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()

// Start copy again, but with no breaks
newDst, err := Copy(ctx, r.Fremote, nil, "testdst", srcNoBreak)
assert.NoError(t, err)

// Checks to see if a resume was initiated
// Resumed byte position can vary slightly depending how long it takes atexit to process the interrupt
assert.True(t, strings.Contains(buf.String(), "Resuming at byte position: "), "The upload did not resume when restarted. Message: %q", buf.String())

checkContents(newDst, string(expectedContents))
}
@@ -7,7 +7,7 @@ import (
"errors"
"fmt"
"runtime/debug"
"sync"
"github.com/rclone/rclone/lib/sync"
"sync/atomic"
"time"
Some files were not shown because too many files have changed in this diff