mirror of https://github.com/rclone/rclone.git synced 2025-12-25 04:33:39 +00:00

Compare commits


30 Commits

Author SHA1 Message Date
Nick Craig-Wood
259d0bb8ce webdav: don't delete files on error - fixes #3263
Before this change, rclone deleted the file if the upload failed
halfway through.

After this change we only delete the partial upload if the
--webdav-delete-on-error flag is set.

We set this in the serve webdav tests to make them pass.
2021-01-18 16:23:19 +00:00
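A minimal standalone sketch of the new behaviour, with illustrative names rather than rclone's actual webdav code: cleanup of the partial file only runs when the flag is set.

```go
package main

import (
	"errors"
	"fmt"
)

// uploadAndMaybeClean sketches the changed behaviour: on a failed upload the
// partial remote file is removed only when deleteOnError is set (the
// --webdav-delete-on-error flag); otherwise the partial file is left behind.
func uploadAndMaybeClean(upload, remove func() error, deleteOnError bool) error {
	err := upload()
	if err == nil {
		return nil
	}
	if deleteOnError {
		if rmErr := remove(); rmErr != nil {
			return fmt.Errorf("upload failed (%v); cleanup also failed: %w", err, rmErr)
		}
	}
	return err
}

func main() {
	failing := func() error { return errors.New("connection reset halfway through") }
	noop := func() error { return nil }
	fmt.Println(uploadAndMaybeClean(failing, noop, false)) // partial file kept
	fmt.Println(uploadAndMaybeClean(failing, noop, true))  // partial file removed
}
```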
Nick Craig-Wood
cef51d58ac jottacloud: fix token refresh failed: is not a regular file error
Before this change the jottacloud token renewer would run and give the
error:

    Token refresh failed: is not a regular file

This is because the refresh runs on the root, which isn't a file.

This was fixed by ignoring that specific error.

See: https://forum.rclone.org/t/jottacloud-crypt-3-gb-copy-runs-for-a-week-without-completing/21173
2021-01-12 17:09:44 +00:00
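The fix, visible in the jottacloud diff further down, maps the expected "not a file" result on the root to success inside the renewer callback. A standalone sketch of the pattern (the sentinel here stands in for fs.ErrorNotAFile):

```go
package main

import (
	"errors"
	"fmt"
)

// errNotAFile stands in for rclone's fs.ErrorNotAFile sentinel.
var errNotAFile = errors.New("is not a regular file")

// statRoot stands in for readMetaDataForPath(ctx, ""): the root is a
// directory, so statting it as a file legitimately fails.
func statRoot() error { return errNotAFile }

// renew is the token renewer callback: the expected "not a regular file"
// result on the root means the refresh itself succeeded.
func renew() error {
	err := statRoot()
	if errors.Is(err, errNotAFile) {
		err = nil
	}
	return err
}

func main() {
	fmt.Println("renew error:", renew()) // renew error: <nil>
}
```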
Nick Craig-Wood
e0b5a13a13 jottacloud: fix token renewer to fix long uploads
See: https://forum.rclone.org/t/jottacloud-crypt-3-gb-copy-runs-for-a-week-without-completing/21173
2021-01-11 16:44:11 +00:00
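The companion change brackets Update with tokenRenewer.Start/Stop (see the jottacloud diff below) so the token keeps refreshing for the whole length of an upload. A toy renewer with the same Start/Stop reference counting; it assumes nothing about oauthutil's real implementation:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// renewer refreshes a token on a timer while at least one transaction runs.
type renewer struct {
	mu      sync.Mutex
	active  int
	stop    chan struct{}
	refresh func()
}

// Start begins background refreshing on the first active transaction.
func (r *renewer) Start() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.active++; r.active == 1 {
		r.stop = make(chan struct{})
		go r.loop(r.stop)
	}
}

// Stop ends background refreshing when the last transaction finishes.
func (r *renewer) Stop() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.active--; r.active == 0 {
		close(r.stop)
	}
}

func (r *renewer) loop(stop chan struct{}) {
	ticker := time.NewTicker(50 * time.Millisecond) // stands in for ~1 hour
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			r.refresh()
		case <-stop:
			return
		}
	}
}

func main() {
	r := &renewer{refresh: func() { fmt.Println("token refreshed") }}
	r.Start() // as in Object.Update: Start, then defer Stop
	defer r.Stop()
	time.Sleep(120 * time.Millisecond)
}
```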
Nick Craig-Wood
de21356154 Add Denis Neuling to contributors 2021-01-11 16:44:11 +00:00
Ivan Andreev
35a4de2030 chunker: fix case-insensitive NewObject, test metadata detection #4902
- fix test case FsNewObjectCaseInsensitive (PR #4830)
- continue PR #4917, add comments in metadata detection code
- add warning about metadata detection in user documentation
- change metadata size limits, make room for future development
- hide critical chunker parameters from command line
2021-01-10 22:29:24 +03:00
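On case-insensitive remotes the parsed main file name can differ from the requested remote only by case, so the comparison has to fold case, as the chunker diff below does with strings.EqualFold. The core of the check as a standalone sketch:

```go
package main

import (
	"fmt"
	"strings"
)

// sameMain reports whether a parsed chunk name belongs to the requested
// remote: exact match on case-sensitive remotes, case-folded otherwise.
func sameMain(mainRemote, remote string, caseInsensitive bool) bool {
	if caseInsensitive {
		return strings.EqualFold(mainRemote, remote)
	}
	return mainRemote == remote
}

func main() {
	fmt.Println(sameMain("Dir/File.txt", "dir/file.txt", false)) // false
	fmt.Println(sameMain("Dir/File.txt", "dir/file.txt", true))  // true
}
```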
Ivan Andreev
847625822f chunker: improve detection of incompatible metadata #4917
Before this patch chunker required at least one data chunk
before it would check for a composite object.

Now if chunker finds at least one potential temporary or control
chunk, it marks the found files as a suspected composite object.
When rclone later tries a concrete operation on the object,
it performs the postponed metadata read and decides whether this is
a native composite object, incompatible metadata or just garbage.
2021-01-10 21:55:15 +03:00
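The detection amounts to a lazy three-way classification: native chunker metadata, metadata from a newer rclone, or a foreign file. A toy classifier in that spirit, using the simplejson field names visible in the tests below; this is not the real chunker code:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

var (
	errMetaTooBig  = errors.New("metadata is too big")
	errMetaUnknown = errors.New("unknown metadata, please upgrade rclone")
)

const (
	maxMetadataSize = 1023
	metadataVersion = 1
)

// classify sketches chunker's postponed metadata check: small JSON with the
// required fields is chunker metadata; a higher version number means "made
// by a newer rclone"; anything else is a foreign (non-chunked) object.
func classify(data []byte) error {
	if len(data) > maxMetadataSize {
		return errMetaTooBig
	}
	var meta struct {
		Ver    *int   `json:"ver"`
		Size   *int64 `json:"size"`
		NChunk *int   `json:"nchunks"`
	}
	if json.Unmarshal(data, &meta) != nil || meta.Ver == nil || meta.Size == nil || meta.NChunk == nil {
		return errors.New("not chunker metadata: foreign object")
	}
	if *meta.Ver > metadataVersion {
		return errMetaUnknown
	}
	return nil // native composite object
}

func main() {
	fmt.Println(classify([]byte(`{"ver":1,"size":9,"nchunks":3}`)))   // <nil>
	fmt.Println(classify([]byte(`{"ver":999,"size":9,"nchunks":3}`))) // please upgrade
	fmt.Println(classify([]byte(`hello world`)))                      // foreign object
}
```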
Nick Craig-Wood
3877df4e62 s3: update help for --s3-no-check-bucket #4913 2021-01-10 17:54:19 +00:00
Denis Neuling
ec73d2fb9a azure-blob-storage: utilize streaming capabilities - #1614 2021-01-10 17:02:42 +00:00
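The streaming change hands chunking to the SDK instead of rclone's hand-rolled multipart path. A hedged sketch of the UploadStreamToBlockBlob call shape from github.com/Azure/azure-storage-blob-go (the credential and URL are placeholders):

```go
package main

import (
	"context"
	"log"
	"net/url"
	"os"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

func main() {
	// Illustrative only: anonymous credential and a placeholder blob URL.
	u, _ := url.Parse("https://devstoreaccount1.blob.core.windows.net/container/blob")
	p := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})
	blockBlobURL := azblob.NewBlockBlobURL(*u, p)

	// UploadStreamToBlockBlob chunks the reader internally: BufferSize is the
	// block size and MaxBuffers bounds in-flight blocks (and hence memory).
	_, err := azblob.UploadStreamToBlockBlob(context.Background(), os.Stdin, blockBlobURL,
		azblob.UploadStreamToBlockBlobOptions{BufferSize: 4 << 20, MaxBuffers: 4})
	if err != nil {
		log.Fatal(err)
	}
}
```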
Nick Craig-Wood
a7689d7023 rcserver: fix 500 error when marshalling errors from core/command
Before this change attempting to return an error from core/command
failed with a 500 error and a message about unmarshalable types.

This is because it was attempting to marshal the input parameters,
to which a _response value containing an unmarshalable field had
been added.

This was fixed by using the original parameters in the error response
rather than the one modified during the error handling.

This also adds end to end tests for the streaming facilities as used
in core/command.
2021-01-10 16:34:46 +00:00
Nick Craig-Wood
847a44e7ad fs/rc: add Copy method to rc.Params 2021-01-10 16:34:46 +00:00
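rc.Params is a map type, so the fix above comes down to taking a shallow copy before mutating, leaving the error path free to marshal the caller's untouched input. A standalone sketch of the idea (not the actual rc package code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// params mirrors rclone's rc.Params, a map from string to interface{}.
type params map[string]interface{}

// copyParams returns a shallow copy so later additions (such as an
// http.ResponseWriter stored under "_response") don't leak into the original.
func copyParams(in params) params {
	out := make(params, len(in))
	for k, v := range in {
		out[k] = v
	}
	return out
}

func main() {
	in := params{"command": "version"}
	work := copyParams(in)
	work["_response"] = make(chan int) // stands in for an unmarshalable field

	if _, err := json.Marshal(work); err != nil {
		// Error path: marshal the untouched original, which always succeeds.
		out, _ := json.Marshal(in)
		fmt.Printf("marshal failed (%v); returning original params %s\n", err, out)
	}
}
```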
Nick Craig-Wood
b3710c962e rc: fix core/command giving 500 internal error - fixes #4914
Before this change calling core/command gave the error

    error: response object is required expecting *http.ResponseWriter value for key "_response" (was *http.response)

This was because http.ResponseWriter is an interface, not a concrete type.

Removing the `*` fixes the problem.

This also indicates that this bit of code wasn't properly tested.
2021-01-10 16:34:46 +00:00
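The bug class is generic Go: http.ResponseWriter is itself an interface, so concrete handler values satisfy it directly, and a type assertion to *http.ResponseWriter (a pointer to an interface) can never match. A small demonstration:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	in := map[string]interface{}{}
	var w http.ResponseWriter = httptest.NewRecorder() // concrete *httptest.ResponseRecorder
	in["_response"] = w

	// Wrong: no concrete value is ever a *http.ResponseWriter.
	_, ok := in["_response"].(*http.ResponseWriter)
	fmt.Println("assert *http.ResponseWriter:", ok) // false

	// Right: assert the interface type itself.
	_, ok = in["_response"].(http.ResponseWriter)
	fmt.Println("assert http.ResponseWriter:", ok) // true
}
```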
Nick Craig-Wood
35ccfe1721 Add kice to contributors 2021-01-10 16:34:46 +00:00
kice
ef2bfb9718 onedrive: Support addressing site by server-relative URL (#4761) 2021-01-09 03:26:42 +08:00
Nick Craig-Wood
a97effa27c build: add go1.16beta1 to the build matrix 2021-01-08 12:22:37 +00:00
Nick Craig-Wood
01adee7554 build: raise minimum go version to go1.12 2021-01-08 12:17:09 +00:00
Alex Chen
78a76b0d29 onedrive: remove % and # from the set of encoded characters (#4909)

This fixes #4700, fixes #4184, fixes #2920.
2021-01-08 12:07:17 +00:00
Nick Craig-Wood
e775328523 ftp,sftp: Make --tpslimit apply - fixes #4906 2021-01-08 10:29:57 +00:00
Nick Craig-Wood
50344e7792 accounting: factor --tpslimit code into accounting from fshttp 2021-01-08 10:29:57 +00:00
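With the pacing in the accounting package, non-HTTP backends can call it before every transaction (see the accounting.LimitTPS call in the ftp diff below). A self-contained sketch of such a limiter built on golang.org/x/time/rate; this is an assumption about the mechanism, not necessarily rclone's implementation:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// limiter caps transactions per second, like rclone's --tpslimit.
var limiter = rate.NewLimiter(rate.Limit(2), 1) // 2 TPS, burst of 1

// limitTPS blocks until the next transaction may start, a sketch of what a
// shared accounting.LimitTPS-style helper can look like.
func limitTPS(ctx context.Context) {
	if err := limiter.Wait(ctx); err != nil {
		fmt.Println("context cancelled while waiting:", err)
	}
}

func main() {
	start := time.Now()
	for i := 0; i < 4; i++ {
		limitTPS(context.Background())
		fmt.Printf("transaction %d at %v\n", i, time.Since(start).Round(100*time.Millisecond))
	}
}
```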
Nick Craig-Wood
d58fdb10db onedrive: enhance link creation with expiry, scope, type and password
This change makes the --expire flag in `rclone link` work.

It also adds the new flags

    --onedrive-link-type
    --onedrive-link-scope
    --onedrive-link-password

See: https://forum.rclone.org/t/create-share-link-within-the-organization-only/21498
2021-01-08 09:22:50 +00:00
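These flags map onto the Graph createLink request body; the CreateShareLinkRequest struct added in the onedrive diff below carries the type, scope, password and expiry. A sketch of building that body (the HTTP call itself is omitted):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// CreateShareLinkRequest mirrors the struct added in the onedrive backend.
type CreateShareLinkRequest struct {
	Type     string     `json:"type"`                         // view, edit or embed
	Scope    string     `json:"scope,omitempty"`              // anonymous or organization
	Password string     `json:"password,omitempty"`           // OneDrive Personal only
	Expiry   *time.Time `json:"expirationDateTime,omitempty"` // permission expiry
}

func main() {
	expiry := time.Now().Add(24 * time.Hour).UTC()
	req := CreateShareLinkRequest{
		Type:   "view",         // --onedrive-link-type
		Scope:  "organization", // --onedrive-link-scope
		Expiry: &expiry,        // rclone link --expire 24h
	}
	body, _ := json.MarshalIndent(req, "", "  ")
	fmt.Println(string(body))
}
```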
Nick Craig-Wood
feaacfd226 hdfs: correct username parameter in integration tests 2021-01-08 09:05:25 +00:00
Nick Craig-Wood
e3c238ac95 build: add -buildmode to cross-compile.go
This builds on

768e4c4735 build: Temporary fix for Windows build errors

But passes the -buildmode flag down to the cross-compile.go command
too.
2021-01-08 08:58:50 +00:00
Nick Craig-Wood
752997c5e8 Add Yury Stankevich to contributors 2021-01-08 08:58:50 +00:00
Yury Stankevich
71edc75ca6 HDFS (Hadoop Distributed File System) implementation - #42
This includes an HDFS docker image to use with the integration tests.

Co-authored-by: Ivan Andreev <ivandeex@gmail.com>
Co-authored-by: Nick Craig-Wood <nick@craig-wood.com>
2021-01-07 09:48:51 +00:00
Ivan Andreev
768e4c4735 build: Temporary fix for Windows build errors
Applies a temporary fix similar to https://github.com/grafana/grafana/pull/28557
before go 1.15.6+ fixes https://github.com/golang/go/issues/40795
2021-01-07 09:48:51 +00:00
Nick Craig-Wood
c553ad5158 serve sftp: fix authentication on one connection blocking others - fixes #4882
Before this change, if one connection was authenticating, this would
block any others from authenticating.

This was due to ssh.NewServerConn not being called in a goroutine
after the Accept call.

This is fixed by running the ssh authentication in a goroutine.

Thanks to @FiloSottile for advice on how to fix this.

See: https://github.com/golang/go/issues/43521
2021-01-06 15:34:07 +00:00
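The shape of the fix is the classic accept-loop pattern: the potentially slow SSH handshake must not run inline on the accept loop. A sketch using golang.org/x/crypto/ssh (host key setup is elided; this shows the pattern, not rclone's exact code):

```go
package main

import (
	"log"
	"net"

	"golang.org/x/crypto/ssh"
)

// serve runs the SSH handshake for each accepted connection in its own
// goroutine, so a client stalling in authentication cannot block others.
func serve(listener net.Listener, config *ssh.ServerConfig) {
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Printf("accept: %v", err)
			return
		}
		go func(c net.Conn) {
			// ssh.NewServerConn performs authentication; before the fix it
			// effectively ran inline after Accept, serializing handshakes.
			sconn, chans, reqs, err := ssh.NewServerConn(c, config)
			if err != nil {
				log.Printf("handshake: %v", err)
				_ = c.Close()
				return
			}
			defer sconn.Close()
			go ssh.DiscardRequests(reqs)
			for newChan := range chans {
				_ = newChan.Reject(ssh.Prohibited, "demo server accepts no channels")
			}
		}(conn)
	}
}

func main() {
	// A real server must add a host key with config.AddHostKey.
	listener, err := net.Listen("tcp", "127.0.0.1:2022")
	if err != nil {
		log.Fatal(err)
	}
	serve(listener, &ssh.ServerConfig{NoClientAuth: true})
}
```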
Alex Chen
c66b901320 onedrive: (business only) workaround to replace existing file on server-side copy (#4904) 2021-01-06 10:50:37 +08:00
Nick Craig-Wood
dd67a3d5f5 operations: add size if known to skipped items and JSON log - fixes #4624 2021-01-05 19:44:25 +00:00
Nick Craig-Wood
e972f2c98a log: make it easier to add parameters to JSON logging 2021-01-05 19:44:25 +00:00
Nick Craig-Wood
acbcb1ea9d Add Ilyess Bachiri to contributors 2021-01-05 19:44:25 +00:00
Nick Craig-Wood
d4444375ac Add Kerry Su to contributors 2021-01-05 19:44:25 +00:00
62 changed files with 1783 additions and 461 deletions

.github/workflows/build.yml

@@ -19,7 +19,7 @@ jobs:
strategy:
fail-fast: false
matrix:
job_name: ['linux', 'mac', 'windows_amd64', 'windows_386', 'other_os', 'go1.11', 'go1.12', 'go1.13', 'go1.14']
job_name: ['linux', 'mac', 'windows_amd64', 'windows_386', 'other_os', 'go1.12', 'go1.13', 'go1.14', 'go1.16']
include:
- job_name: linux
@@ -46,6 +46,7 @@ jobs:
go: '1.15.x'
gotags: cmount
build_flags: '-include "^windows/amd64" -cgo'
build_args: '-buildmode exe'
quicktest: true
racequicktest: true
deploy: true
@@ -57,6 +58,7 @@ jobs:
goarch: '386'
cgo: '1'
build_flags: '-include "^windows/386" -cgo'
build_args: '-buildmode exe'
quicktest: true
deploy: true
@@ -67,11 +69,6 @@ jobs:
compile_all: true
deploy: true
- job_name: go1.11
os: ubuntu-latest
go: '1.11.x'
quicktest: true
- job_name: go1.12
os: ubuntu-latest
go: '1.12.x'
@@ -88,6 +85,12 @@ jobs:
quicktest: true
racequicktest: true
- job_name: go1.16
os: ubuntu-latest
go: '1.16.0-beta1'
quicktest: true
racequicktest: true
name: ${{ matrix.job_name }}
runs-on: ${{ matrix.os }}
@@ -109,6 +112,7 @@ jobs:
run: |
echo 'GOTAGS=${{ matrix.gotags }}' >> $GITHUB_ENV
echo 'BUILD_FLAGS=${{ matrix.build_flags }}' >> $GITHUB_ENV
echo 'BUILD_ARGS=${{ matrix.build_args }}' >> $GITHUB_ENV
if [[ "${{ matrix.goarch }}" != "" ]]; then echo 'GOARCH=${{ matrix.goarch }}' >> $GITHUB_ENV ; fi
if [[ "${{ matrix.cgo }}" != "" ]]; then echo 'CGO_ENABLED=${{ matrix.cgo }}' >> $GITHUB_ENV ; fi

Makefile

@@ -46,13 +46,13 @@ endif
.PHONY: rclone test_all vars version
rclone:
go build -v --ldflags "-s -X github.com/rclone/rclone/fs.Version=$(TAG)" $(BUILDTAGS)
go build -v --ldflags "-s -X github.com/rclone/rclone/fs.Version=$(TAG)" $(BUILDTAGS) $(BUILD_ARGS)
mkdir -p `go env GOPATH`/bin/
cp -av rclone`go env GOEXE` `go env GOPATH`/bin/rclone`go env GOEXE`.new
mv -v `go env GOPATH`/bin/rclone`go env GOEXE`.new `go env GOPATH`/bin/rclone`go env GOEXE`
test_all:
go install --ldflags "-s -X github.com/rclone/rclone/fs.Version=$(TAG)" $(BUILDTAGS) github.com/rclone/rclone/fstest/test_all
go install --ldflags "-s -X github.com/rclone/rclone/fs.Version=$(TAG)" $(BUILDTAGS) $(BUILD_ARGS) github.com/rclone/rclone/fstest/test_all
vars:
@echo SHELL="'$(SHELL)'"
@@ -188,10 +188,10 @@ upload_github:
./bin/upload-github $(TAG)
cross: doc
go run bin/cross-compile.go -release current $(BUILDTAGS) $(TAG)
go run bin/cross-compile.go -release current $(BUILDTAGS) $(BUILD_ARGS) $(TAG)
beta:
go run bin/cross-compile.go $(BUILDTAGS) $(TAG)
go run bin/cross-compile.go $(BUILDTAGS) $(BUILD_ARGS) $(TAG)
rclone -v copy build/ memstore:pub-rclone-org/$(TAG)
@echo Beta release ready at https://pub.rclone.org/$(TAG)/
@@ -199,7 +199,7 @@ log_since_last_release:
git log $(LAST_TAG)..
compile_all:
go run bin/cross-compile.go -compile-only $(BUILDTAGS) $(TAG)
go run bin/cross-compile.go -compile-only $(BUILDTAGS) $(BUILD_ARGS) $(TAG)
ci_upload:
sudo chown -R $$USER build
@@ -213,7 +213,7 @@ endif
ci_beta:
git log $(LAST_TAG).. > /tmp/git-log.txt
go run bin/cross-compile.go -release beta-latest -git-log /tmp/git-log.txt $(BUILD_FLAGS) $(BUILDTAGS) $(TAG)
go run bin/cross-compile.go -release beta-latest -git-log /tmp/git-log.txt $(BUILD_FLAGS) $(BUILDTAGS) $(BUILD_ARGS) $(TAG)
rclone --config bin/travis.rclone.conf -v copy --exclude '*beta-latest*' build/ $(BETA_UPLOAD)
ifeq ($(or $(BRANCH_PATH),$(RELEASE_TAG)),)
rclone --config bin/travis.rclone.conf -v copy --include '*beta-latest*' --include version.txt build/ $(BETA_UPLOAD_ROOT)$(BETA_SUBDIR)

README.md

@@ -36,6 +36,7 @@ Rclone *("rsync for cloud storage")* is a command line program to sync files and
* Google Cloud Storage [:page_facing_up:](https://rclone.org/googlecloudstorage/)
* Google Drive [:page_facing_up:](https://rclone.org/drive/)
* Google Photos [:page_facing_up:](https://rclone.org/googlephotos/)
* HDFS (Hadoop Distributed Filesystem) [:page_facing_up:](https://rclone.org/hdfs/)
* HTTP [:page_facing_up:](https://rclone.org/http/)
* Hubic [:page_facing_up:](https://rclone.org/hubic/)
* Jottacloud [:page_facing_up:](https://rclone.org/jottacloud/)

backend/all/all.go

@@ -18,6 +18,7 @@ import (
_ "github.com/rclone/rclone/backend/ftp"
_ "github.com/rclone/rclone/backend/googlecloudstorage"
_ "github.com/rclone/rclone/backend/googlephotos"
_ "github.com/rclone/rclone/backend/hdfs"
_ "github.com/rclone/rclone/backend/http"
_ "github.com/rclone/rclone/backend/hubic"
_ "github.com/rclone/rclone/backend/jottacloud"

backend/azureblob/azureblob.go

@@ -5,9 +5,7 @@
package azureblob
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/json"
@@ -26,7 +24,6 @@ import (
"github.com/Azure/go-autorest/autorest/adal"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
@@ -39,8 +36,6 @@ import (
"github.com/rclone/rclone/lib/env"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
)
const (
@@ -51,15 +46,11 @@ const (
modTimeKey = "mtime"
timeFormatIn = time.RFC3339
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
maxTotalParts = 50000 // in multipart upload
storageDefaultBaseURL = "blob.core.windows.net"
// maxUncommittedSize = 9 << 30 // can't upload bigger than this
defaultChunkSize = 4 * fs.MebiByte
maxChunkSize = 100 * fs.MebiByte
defaultUploadCutoff = 256 * fs.MebiByte
maxUploadCutoff = 256 * fs.MebiByte
defaultAccessTier = azblob.AccessTierNone
maxTryTimeout = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
defaultChunkSize = 4 * fs.MebiByte
maxChunkSize = 100 * fs.MebiByte
defaultAccessTier = azblob.AccessTierNone
maxTryTimeout = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
// Default storage account, key and blob endpoint for emulator support,
// though it is a base64 key checked in here, it is publicly available secret.
emulatorAccount = "devstoreaccount1"
@@ -137,8 +128,7 @@ msi_client_id, or msi_mi_res_id parameters.`,
Advanced: true,
}, {
Name: "upload_cutoff",
Help: "Cutoff for switching to chunked upload (<= 256MB).",
Default: defaultUploadCutoff,
Help: "Cutoff for switching to chunked upload (<= 256MB). (Deprecated)",
Advanced: true,
}, {
Name: "chunk_size",
@@ -241,7 +231,6 @@ type Options struct {
MSIResourceID string `config:"msi_mi_res_id"`
Endpoint string `config:"endpoint"`
SASURL string `config:"sas_url"`
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
ListChunkSize uint `config:"list_chunk"`
AccessTier string `config:"access_tier"`
@@ -397,21 +386,6 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
return
}
func checkUploadCutoff(cs fs.SizeSuffix) error {
if cs > maxUploadCutoff {
return errors.Errorf("%v must be less than or equal to %v", cs, maxUploadCutoff)
}
return nil
}
func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadCutoff(cs)
if err == nil {
old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
}
return
}
// httpClientFactory creates a Factory object that sends HTTP requests
// to an rclone's http.Client.
//
@@ -506,10 +480,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
return nil, err
}
err = checkUploadCutoff(opt.UploadCutoff)
if err != nil {
return nil, errors.Wrap(err, "azure: upload cutoff")
}
err = checkUploadChunkSize(opt.ChunkSize)
if err != nil {
return nil, errors.Wrap(err, "azure: chunk size")
@@ -1510,12 +1480,6 @@ func init() {
}
}
// readSeeker joins an io.Reader and an io.Seeker
type readSeeker struct {
io.Reader
io.Seeker
}
// increment the slice passed in as LSB binary
func increment(xs []byte) {
for i, digit := range xs {
@@ -1528,143 +1492,6 @@ func increment(xs []byte) {
}
}
var warnStreamUpload sync.Once
// uploadMultipart uploads a file using multipart upload
//
// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
// Calculate correct chunkSize
chunkSize := int64(o.fs.opt.ChunkSize)
totalParts := -1
// Note that the max size of file is 4.75 TB (100 MB X 50,000
// blocks) and this is bigger than the max uncommitted block
// size (9.52 TB) so we do not need to part commit block lists
// or garbage collect uncommitted blocks.
//
// See: https://docs.microsoft.com/en-gb/rest/api/storageservices/put-block
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 4MB). With a maximum number of parts (50,000) this will be a file of
// 195GB which seems like a not too unreasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(o, "Streaming uploads using chunk size %v will have maximum file size of %v",
o.fs.opt.ChunkSize, fs.SizeSuffix(chunkSize*maxTotalParts))
})
} else {
// Adjust partSize until the number of parts is small enough.
if size/chunkSize >= maxTotalParts {
// Calculate partition size rounded up to the nearest MB
chunkSize = (((size / maxTotalParts) >> 20) + 1) << 20
}
if chunkSize > int64(maxChunkSize) {
return errors.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), totalParts, fs.SizeSuffix(chunkSize/2))
}
totalParts = int(size / chunkSize)
if size%chunkSize != 0 {
totalParts++
}
}
fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize))
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in, wrap := accounting.UnWrap(in)
// Upload the chunks
var (
g, gCtx = errgroup.WithContext(ctx)
remaining = size // remaining size in file for logging only, -1 if size < 0
position = int64(0) // position in file
memPool = o.fs.getMemoryPool(chunkSize) // pool to get memory from
finished = false // set when we have read EOF
blocks []string // list of blocks for finalize
blockBlobURL = blob.ToBlockBlobURL() // Get BlockBlobURL, we will use default pipeline here
ac = azblob.LeaseAccessConditions{} // Use default lease access conditions
binaryBlockID = make([]byte, 8) // block counter as LSB first 8 bytes
)
for part := 0; !finished; part++ {
// Get a block of memory from the pool and a token which limits concurrency
o.fs.uploadToken.Get()
buf := memPool.Get()
free := func() {
memPool.Put(buf) // return the buf
o.fs.uploadToken.Put() // return the token
}
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
free()
break
}
// Read the chunk
n, err := readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF {
if n == 0 { // end if no data
free()
break
}
finished = true
} else if err != nil {
free()
return errors.Wrap(err, "multipart upload failed to read source")
}
buf = buf[:n]
// increment the blockID and save the blocks for finalize
increment(binaryBlockID)
blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
blocks = append(blocks, blockID)
// Transfer the chunk
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
g.Go(func() (err error) {
defer free()
// Upload the block, with MD5 for check
md5sum := md5.Sum(buf)
transactionalMD5 := md5sum[:]
err = o.fs.pacer.Call(func() (bool, error) {
bufferReader := bytes.NewReader(buf)
wrappedReader := wrap(bufferReader)
rs := readSeeker{wrappedReader, bufferReader}
_, err = blockBlobURL.StageBlock(ctx, blockID, &rs, ac, transactionalMD5, azblob.ClientProvidedKeyOptions{})
return o.fs.shouldRetry(err)
})
if err != nil {
return errors.Wrap(err, "multipart upload failed to upload part")
}
return nil
})
// ready for next block
if size >= 0 {
remaining -= chunkSize
}
position += chunkSize
}
err = g.Wait()
if err != nil {
return err
}
// Finalise the upload session
err = o.fs.pacer.Call(func() (bool, error) {
_, err := blockBlobURL.CommitBlockList(ctx, blocks, *httpHeaders, o.meta, azblob.BlobAccessConditions{}, azblob.AccessTierType(o.fs.opt.AccessTier), nil, azblob.ClientProvidedKeyOptions{})
return o.fs.shouldRetry(err)
})
if err != nil {
return errors.Wrap(err, "multipart upload failed to finalize")
}
return nil
}
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
@@ -1685,7 +1512,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return err
}
size := src.Size()
// Update Mod time
o.updateMetadataWithModTime(src.ModTime(ctx))
if err != nil {
@@ -1695,10 +1522,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
blob := o.getBlobReference()
httpHeaders := azblob.BlobHTTPHeaders{}
httpHeaders.ContentType = fs.MimeType(ctx, src)
// Compute the Content-MD5 of the file, for multiparts uploads it
// Compute the Content-MD5 of the file. As we stream all uploads it
// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
// Note: If multipart, an MD5 checksum will also be computed for each uploaded block
// in order to validate its integrity during transport
if !o.fs.opt.DisableCheckSum {
if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" {
sourceMD5bytes, err := hex.DecodeString(sourceMD5)
@@ -1716,26 +1542,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
Metadata: o.meta,
BlobHTTPHeaders: httpHeaders,
}
// FIXME Until https://github.com/Azure/azure-storage-blob-go/pull/75
// is merged the SDK can't upload a single blob of exactly the chunk
// size, so upload with a multipart upload to work around.
// See: https://github.com/rclone/rclone/issues/2653
multipartUpload := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
if size == int64(o.fs.opt.ChunkSize) {
multipartUpload = true
fs.Debugf(o, "Setting multipart upload for file of chunk size (%d) to work around SDK bug", size)
}
// Don't retry, return a retry error instead
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
if multipartUpload {
// If a large file upload in chunks
err = o.uploadMultipart(ctx, in, size, &blob, &httpHeaders)
} else {
// Write a small blob in one transaction
blockBlobURL := blob.ToBlockBlobURL()
_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
}
// Stream contents of the reader object to the given blob URL
blockBlobURL := blob.ToBlockBlobURL()
_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
return o.fs.shouldRetry(err)
})
if err != nil {

backend/azureblob/azureblob_internal_test.go

@@ -29,13 +29,8 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadChunkSize(cs)
}
func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadCutoff(cs)
}
var (
_ fstests.SetUploadChunkSizer = (*Fs)(nil)
_ fstests.SetUploadCutoffer = (*Fs)(nil)
)
// TestServicePrincipalFileSuccess checks that, given a proper JSON file, we can create a token.

backend/chunker/chunker.go

@@ -97,7 +97,8 @@ var (
//
// And still chunker's primary function is to chunk large files
// rather than serve as a generic metadata container.
const maxMetadataSize = 255
const maxMetadataSize = 1023
const maxMetadataSizeWritten = 255
// Current/highest supported metadata format.
const metadataVersion = 1
@@ -121,6 +122,8 @@ const maxTransactionProbes = 100
// standard chunker errors
var (
ErrChunkOverflow = errors.New("chunk number overflow")
ErrMetaTooBig = errors.New("metadata is too big")
ErrMetaUnknown = errors.New("unknown metadata, please upgrade rclone")
)
// variants of baseMove's parameter delMode
@@ -150,6 +153,7 @@ Normally should contain a ':' and a path, e.g. "myremote:path/to/dir",
}, {
Name: "name_format",
Advanced: true,
Hide: fs.OptionHideCommandLine,
Default: `*.rclone_chunk.###`,
Help: `String format of chunk file names.
The two placeholders are: base file name (*) and chunk number (#...).
@@ -160,12 +164,14 @@ Possible chunk files are ignored if their name does not match given format.`,
}, {
Name: "start_from",
Advanced: true,
Hide: fs.OptionHideCommandLine,
Default: 1,
Help: `Minimum valid chunk number. Usually 0 or 1.
By default chunk numbers start from 1.`,
}, {
Name: "meta_format",
Advanced: true,
Hide: fs.OptionHideCommandLine,
Default: "simplejson",
Help: `Format of the metadata object or "none". By default "simplejson".
Metadata is a small JSON file named after the composite file.`,
@@ -693,43 +699,50 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
switch entry := dirOrObject.(type) {
case fs.Object:
remote := entry.Remote()
if mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(remote); mainRemote != "" {
if xactID != "" {
if revealHidden {
fs.Infof(f, "ignore temporary chunk %q", remote)
}
break
mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(remote)
if mainRemote == "" {
// this is meta object or standalone file
object := f.newObject("", entry, nil)
byRemote[remote] = object
tempEntries = append(tempEntries, object)
break
}
// this is some kind of chunk
// metobject should have been created above if present
isSpecial := xactID != "" || ctrlType != ""
mainObject := byRemote[mainRemote]
if mainObject == nil && f.useMeta && !isSpecial {
fs.Debugf(f, "skip orphan data chunk %q", remote)
break
}
if mainObject == nil && !f.useMeta {
// this is the "nometa" case
// create dummy chunked object without metadata
mainObject = f.newObject(mainRemote, nil, nil)
byRemote[mainRemote] = mainObject
if !badEntry[mainRemote] {
tempEntries = append(tempEntries, mainObject)
}
if ctrlType != "" {
if revealHidden {
fs.Infof(f, "ignore control chunk %q", remote)
}
break
}
if isSpecial {
if revealHidden {
fs.Infof(f, "ignore non-data chunk %q", remote)
}
mainObject := byRemote[mainRemote]
if mainObject == nil && f.useMeta {
fs.Debugf(f, "skip chunk %q without meta object", remote)
break
}
if mainObject == nil {
// useMeta is false - create chunked object without metadata
mainObject = f.newObject(mainRemote, nil, nil)
byRemote[mainRemote] = mainObject
if !badEntry[mainRemote] {
tempEntries = append(tempEntries, mainObject)
}
}
if err := mainObject.addChunk(entry, chunkNo); err != nil {
if f.opt.FailHard {
return nil, err
}
badEntry[mainRemote] = true
// need to read metadata to ensure actual object type
// no need to read if metaobject is too big or absent,
// use the fact that before calling validate()
// the `size` field caches metaobject size, if any
if f.useMeta && mainObject != nil && mainObject.size <= maxMetadataSize {
mainObject.unsure = true
}
break
}
object := f.newObject("", entry, nil)
byRemote[remote] = object
tempEntries = append(tempEntries, object)
if err := mainObject.addChunk(entry, chunkNo); err != nil {
if f.opt.FailHard {
return nil, err
}
badEntry[mainRemote] = true
}
case fs.Directory:
isSubdir[entry.Remote()] = true
wrapDir := fs.NewDirCopy(ctx, entry)
@@ -784,14 +797,22 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
// but opening even a small file can be slow on some backends.
//
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
return f.scanObject(ctx, remote, false)
}
// scanObject is like NewObject with optional quick scan mode.
// The quick mode avoids directory requests other than `List`,
// ignores non-chunked objects and skips chunk size checks.
func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.Object, error) {
if err := f.forbidChunk(false, remote); err != nil {
return nil, errors.Wrap(err, "can't access")
}
var (
o *Object
baseObj fs.Object
err error
o *Object
baseObj fs.Object
err error
sameMain bool
)
if f.useMeta {
@@ -805,6 +826,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
// as a hard limit. Anything larger than that is treated as a
// non-chunked file without even checking its contents, so it's
// paramount to prevent metadata from exceeding the maximum size.
// Anything smaller is additionally checked for format.
o = f.newObject("", baseObj, nil)
if o.size > maxMetadataSize {
return o, nil
@@ -834,18 +856,34 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
return nil, errors.Wrap(err, "can't detect composite file")
}
caseInsensitive := f.features.CaseInsensitive
for _, dirOrObject := range entries {
entry, ok := dirOrObject.(fs.Object)
if !ok {
continue
}
entryRemote := entry.Remote()
if !strings.Contains(entryRemote, remote) {
if !caseInsensitive && !strings.Contains(entryRemote, remote) {
continue // bypass regexp to save cpu
}
mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(entryRemote)
if mainRemote == "" || mainRemote != remote || ctrlType != "" || xactID != "" {
continue // skip non-conforming, temporary and control chunks
if mainRemote == "" {
continue // skip non-chunks
}
if caseInsensitive {
sameMain = strings.EqualFold(mainRemote, remote)
} else {
sameMain = mainRemote == remote
}
if !sameMain {
continue // skip alien chunks
}
if ctrlType != "" || xactID != "" {
if f.useMeta {
// temporary/control chunk calls for lazy metadata read
o.unsure = true
}
continue
}
//fs.Debugf(f, "%q belongs to %q as chunk %d", entryRemote, mainRemote, chunkNo)
if err := o.addChunk(entry, chunkNo); err != nil {
@@ -855,7 +893,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
if o.main == nil && (o.chunks == nil || len(o.chunks) == 0) {
// Scanning hasn't found data chunks with conforming names.
if f.useMeta {
if f.useMeta || quickScan {
// Metadata is required but absent and there are no chunks.
return nil, fs.ErrorObjectNotFound
}
@@ -878,23 +916,48 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
// file without metadata. Validate it and update the total data size.
// As an optimization, skip metadata reading here - we will call
// readMetadata lazily when needed (reading can be expensive).
if err := o.validate(); err != nil {
return nil, err
if !quickScan {
if err := o.validate(); err != nil {
return nil, err
}
}
return o, nil
}
// readMetadata reads composite object metadata and caches results,
// in case of critical errors metadata is not cached.
// Returns ErrMetaUnknown if an unsupported metadata format is detected.
// If object is not chunked but marked by List or NewObject for recheck,
// readMetadata will attempt to parse object as composite with fallback
// to non-chunked representation if the attempt fails.
func (o *Object) readMetadata(ctx context.Context) error {
// return quickly if metadata is absent or has been already cached
if !o.f.useMeta {
o.isFull = true
}
if o.isFull {
return nil
}
if !o.isComposite() || !o.f.useMeta {
if !o.isComposite() && !o.unsure {
// this for sure is a non-chunked standalone file
o.isFull = true
return nil
}
// validate metadata
metaObject := o.main
if metaObject.Size() > maxMetadataSize {
if o.unsure {
// this is not metadata but a foreign object
o.unsure = false
o.chunks = nil // make isComposite return false
o.isFull = true // cache results
return nil
}
return ErrMetaTooBig
}
// size is within limits, perform consistency checks
reader, err := metaObject.Open(ctx)
if err != nil {
return err
@@ -907,8 +970,22 @@ func (o *Object) readMetadata(ctx context.Context) error {
switch o.f.opt.MetaFormat {
case "simplejson":
metaInfo, err := unmarshalSimpleJSON(ctx, metaObject, metadata, true)
if err != nil {
metaInfo, madeByChunker, err := unmarshalSimpleJSON(ctx, metaObject, metadata)
if o.unsure {
o.unsure = false
if !madeByChunker {
// this is not metadata but a foreign object
o.chunks = nil // make isComposite return false
o.isFull = true // cache results
return nil
}
}
switch err {
case nil:
// fall thru
case ErrMetaTooBig, ErrMetaUnknown:
return err // return these errors unwrapped for unit tests
default:
return errors.Wrap(err, "invalid metadata")
}
if o.size != metaInfo.Size() || len(o.chunks) != metaInfo.nChunks {
@@ -918,12 +995,36 @@ func (o *Object) readMetadata(ctx context.Context) error {
o.sha1 = metaInfo.sha1
}
o.isFull = true
o.isFull = true // cache results
return nil
}
// put implements Put, PutStream, PutUnchecked, Update
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption, basePut putFn) (obj fs.Object, err error) {
func (f *Fs) put(
ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption,
basePut putFn, action string, target fs.Object) (obj fs.Object, err error) {
// Perform consistency checks
if err := f.forbidChunk(src, remote); err != nil {
return nil, errors.Wrap(err, action+" refused")
}
if target == nil {
// Get target object with a quick directory scan
// skip metadata check if target object does not exist.
// ignore not-chunked objects, skip chunk size checks.
if obj, err := f.scanObject(ctx, remote, true); err == nil {
target = obj
}
}
if target != nil {
obj := target.(*Object)
if err := obj.readMetadata(ctx); err == ErrMetaUnknown {
// refuse to update a file of unsupported format
return nil, errors.Wrap(err, "refusing to "+action)
}
}
// Prepare to upload
c := f.newChunkingReader(src)
wrapIn := c.wrapStream(ctx, in, src)
@@ -1013,8 +1114,8 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote st
// Check for input that looks like valid metadata
needMeta := len(c.chunks) > 1
if c.readCount <= maxMetadataSize && len(c.chunks) == 1 {
_, err := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead, false)
needMeta = err == nil
_, madeByChunker, _ := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead)
needMeta = madeByChunker
}
// Finalize small object as non-chunked.
@@ -1273,29 +1374,16 @@ func (f *Fs) removeOldChunks(ctx context.Context, remote string) {
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if err := f.forbidChunk(src, src.Remote()); err != nil {
return nil, errors.Wrap(err, "refusing to put")
}
return f.put(ctx, in, src, src.Remote(), options, f.base.Put)
return f.put(ctx, in, src, src.Remote(), options, f.base.Put, "put", nil)
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if err := f.forbidChunk(src, src.Remote()); err != nil {
return nil, errors.Wrap(err, "refusing to upload")
}
return f.put(ctx, in, src, src.Remote(), options, f.base.Features().PutStream)
return f.put(ctx, in, src, src.Remote(), options, f.base.Features().PutStream, "upload", nil)
}
// Update in to the object with the modTime given of the given size
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
if err := o.f.forbidChunk(o, o.Remote()); err != nil {
return errors.Wrap(err, "update refused")
}
if err := o.readMetadata(ctx); err != nil {
// refuse to update a file of unsupported format
return errors.Wrap(err, "refusing to update")
}
basePut := o.f.base.Put
if src.Size() < 0 {
basePut = o.f.base.Features().PutStream
@@ -1303,7 +1391,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return errors.New("wrapped file system does not support streaming uploads")
}
}
oNew, err := o.f.put(ctx, in, src, o.Remote(), options, basePut)
oNew, err := o.f.put(ctx, in, src, o.Remote(), options, basePut, "update", o)
if err == nil {
*o = *oNew.(*Object)
}
@@ -1417,7 +1505,7 @@ func (o *Object) Remove(ctx context.Context) (err error) {
// to corrupt file in hard mode. Hence, refuse to Remove, too.
return errors.Wrap(err, "refuse to corrupt")
}
if err := o.readMetadata(ctx); err != nil {
if err := o.readMetadata(ctx); err == ErrMetaUnknown {
// Proceed but warn user that unexpected things can happen.
fs.Errorf(o, "Removing a file with unsupported metadata: %v", err)
}
@@ -1445,6 +1533,11 @@ func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMo
if err := f.forbidChunk(o, remote); err != nil {
return nil, errors.Wrapf(err, "can't %s", opName)
}
if err := o.readMetadata(ctx); err != nil {
// Refuse to copy/move composite files with invalid or future
// metadata format which might involve unsupported chunk types.
return nil, errors.Wrapf(err, "can't %s this file", opName)
}
if !o.isComposite() {
fs.Debugf(o, "%s non-chunked object...", opName)
oResult, err := do(ctx, o.mainChunk(), remote) // chain operation to a single wrapped chunk
@@ -1453,11 +1546,6 @@ func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMo
}
return f.newObject("", oResult, nil), nil
}
if err := o.readMetadata(ctx); err != nil {
// Refuse to copy/move composite files with invalid or future
// metadata format which might involve unsupported chunk types.
return nil, errors.Wrapf(err, "can't %s this file", opName)
}
fs.Debugf(o, "%s %d data chunks...", opName, len(o.chunks))
mainRemote := o.remote
@@ -1539,6 +1627,8 @@ func (f *Fs) okForServerSide(ctx context.Context, src fs.Object, opName string)
diff = "chunk sizes"
case f.opt.NameFormat != obj.f.opt.NameFormat:
diff = "chunk name formats"
case f.opt.StartFrom != obj.f.opt.StartFrom:
diff = "chunk numbering"
case f.opt.MetaFormat != obj.f.opt.MetaFormat:
diff = "meta formats"
}
@@ -1548,6 +1638,10 @@ func (f *Fs) okForServerSide(ctx context.Context, src fs.Object, opName string)
return
}
if obj.unsure {
// ensure object is composite if need to re-read metadata
_ = obj.readMetadata(ctx)
}
requireMetaHash := obj.isComposite() && f.opt.MetaFormat == "simplejson"
if !requireMetaHash && !f.hashAll {
ok = true // hash is not required for metadata
@@ -1741,6 +1835,7 @@ type Object struct {
chunks []fs.Object // active data chunks if file is composite, or wrapped file as a single chunk if meta format is 'none'
size int64 // cached total size of chunks in a composite file or -1 for non-chunked files
isFull bool // true if metadata has been read
unsure bool // true if need to read metadata to detect object type
md5 string
sha1 string
f *Fs
@@ -1762,6 +1857,9 @@ func (o *Object) addChunk(chunk fs.Object, chunkNo int) error {
copy(newChunks, o.chunks)
o.chunks = newChunks
}
if o.chunks[chunkNo] != nil {
return fmt.Errorf("duplicate chunk number %d", chunkNo+o.f.opt.StartFrom)
}
o.chunks[chunkNo] = chunk
return nil
}
@@ -1891,15 +1989,16 @@ func (o *Object) SetModTime(ctx context.Context, mtime time.Time) error {
// on the level of wrapped remote but chunker is unaware of that.
//
func (o *Object) Hash(ctx context.Context, hashType hash.Type) (string, error) {
if err := o.readMetadata(ctx); err != nil {
return "", err // valid metadata is required to get hash, abort
}
if !o.isComposite() {
// First, chain to the wrapped non-chunked file if possible.
if value, err := o.mainChunk().Hash(ctx, hashType); err == nil && value != "" {
return value, nil
}
}
if err := o.readMetadata(ctx); err != nil {
return "", err // valid metadata is required to get hash, abort
}
// Try hash from metadata if the file is composite or if wrapped remote fails.
switch hashType {
case hash.MD5:
@@ -1924,13 +2023,13 @@ func (o *Object) UnWrap() fs.Object {
// Open opens the file for read. Call Close() on the returned io.ReadCloser
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
if !o.isComposite() {
return o.mainChunk().Open(ctx, options...) // chain to wrapped non-chunked file
}
if err := o.readMetadata(ctx); err != nil {
// refuse to open unsupported format
return nil, errors.Wrap(err, "can't open")
}
if !o.isComposite() {
return o.mainChunk().Open(ctx, options...) // chain to wrapped non-chunked file
}
var openOptions []fs.OpenOption
var offset, limit int64 = 0, -1
@@ -2188,72 +2287,74 @@ func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 s
SHA1: sha1,
}
data, err := json.Marshal(&metadata)
if err == nil && data != nil && len(data) >= maxMetadataSize {
if err == nil && data != nil && len(data) >= maxMetadataSizeWritten {
// be a nitpicker, never produce something you can't consume
return nil, errors.New("metadata can't be this big, please report to rclone developers")
}
return data, err
}
// unmarshalSimpleJSON
// unmarshalSimpleJSON parses metadata.
//
// In case of errors returns a flag telling whether input has been
// produced by incompatible version of rclone vs wasn't metadata at all.
// Only metadata format version 1 is supported atm.
// Future releases will transparently migrate older metadata objects.
// New format will have a higher version number and cannot be correctly
// handled by current implementation.
// The version check below will then explicitly ask user to upgrade rclone.
//
func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte, strictChecks bool) (info *ObjectInfo, err error) {
func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) (info *ObjectInfo, madeByChunker bool, err error) {
// Be strict about JSON format
// to reduce possibility that a random small file resembles metadata.
if data != nil && len(data) > maxMetadataSize {
return nil, errors.New("too big")
if data != nil && len(data) > maxMetadataSizeWritten {
return nil, false, ErrMetaTooBig
}
if data == nil || len(data) < 2 || data[0] != '{' || data[len(data)-1] != '}' {
return nil, errors.New("invalid json")
return nil, false, errors.New("invalid json")
}
var metadata metaSimpleJSON
err = json.Unmarshal(data, &metadata)
if err != nil {
return nil, err
return nil, false, err
}
// Basic fields are strictly required
// to reduce possibility that a random small file resembles metadata.
if metadata.Version == nil || metadata.Size == nil || metadata.ChunkNum == nil {
return nil, errors.New("missing required field")
return nil, false, errors.New("missing required field")
}
// Perform strict checks, avoid corruption of future metadata formats.
if *metadata.Version < 1 {
return nil, errors.New("wrong version")
return nil, false, errors.New("wrong version")
}
if *metadata.Size < 0 {
return nil, errors.New("negative file size")
return nil, false, errors.New("negative file size")
}
if *metadata.ChunkNum < 0 {
return nil, errors.New("negative number of chunks")
return nil, false, errors.New("negative number of chunks")
}
if *metadata.ChunkNum > maxSafeChunkNumber {
return nil, ErrChunkOverflow
return nil, true, ErrChunkOverflow // produced by incompatible version of rclone
}
if metadata.MD5 != "" {
_, err = hex.DecodeString(metadata.MD5)
if len(metadata.MD5) != 32 || err != nil {
return nil, errors.New("wrong md5 hash")
return nil, false, errors.New("wrong md5 hash")
}
}
if metadata.SHA1 != "" {
_, err = hex.DecodeString(metadata.SHA1)
if len(metadata.SHA1) != 40 || err != nil {
return nil, errors.New("wrong sha1 hash")
return nil, false, errors.New("wrong sha1 hash")
}
}
// ChunkNum is allowed to be 0 in future versions
if *metadata.ChunkNum < 1 && *metadata.Version <= metadataVersion {
return nil, errors.New("wrong number of chunks")
return nil, false, errors.New("wrong number of chunks")
}
// Non-strict mode also accepts future metadata versions
if *metadata.Version > metadataVersion && strictChecks {
return nil, fmt.Errorf("version %d is not supported, please upgrade rclone", metadata.Version)
if *metadata.Version > metadataVersion {
return nil, true, ErrMetaUnknown // produced by incompatible version of rclone
}
var nilFs *Fs // nil object triggers appropriate type method
@@ -2261,7 +2362,7 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte,
info.nChunks = *metadata.ChunkNum
info.md5 = metadata.MD5
info.sha1 = metadata.SHA1
return info, nil
return info, true, nil
}
func silentlyRemove(ctx context.Context, o fs.Object) {

backend/chunker/chunker_internal_test.go

@@ -13,6 +13,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/fstests"
@@ -663,6 +664,80 @@ func testMetadataInput(t *testing.T, f *Fs) {
runSubtest(futureMeta, "future")
}
// test that chunker refuses to modify objects with future/unknown metadata
func testFutureProof(t *testing.T, f *Fs) {
if f.opt.MetaFormat == "none" {
t.Skip("this test requires metadata support")
}
saveOpt := f.opt
ctx := context.Background()
f.opt.FailHard = true
const dir = "future"
const file = dir + "/test"
defer func() {
f.opt.FailHard = false
_ = operations.Purge(ctx, f.base, dir)
f.opt = saveOpt
}()
modTime := fstest.Time("2001-02-03T04:05:06.499999999Z")
putPart := func(name string, part int, data, msg string) {
if part > 0 {
name = f.makeChunkName(name, part-1, "", "")
}
item := fstest.Item{Path: name, ModTime: modTime}
_, obj := fstests.PutTestContents(ctx, t, f.base, &item, data, true)
assert.NotNil(t, obj, msg)
}
// simulate chunked object from future
meta := `{"ver":999,"nchunks":3,"size":9,"garbage":"litter","sha1":"0707f2970043f9f7c22029482db27733deaec029"}`
putPart(file, 0, meta, "metaobject")
putPart(file, 1, "abc", "chunk1")
putPart(file, 2, "def", "chunk2")
putPart(file, 3, "ghi", "chunk3")
// List should succeed
ls, err := f.List(ctx, dir)
assert.NoError(t, err)
assert.Equal(t, 1, len(ls))
assert.Equal(t, int64(9), ls[0].Size())
// NewObject should succeed
obj, err := f.NewObject(ctx, file)
assert.NoError(t, err)
assert.Equal(t, file, obj.Remote())
assert.Equal(t, int64(9), obj.Size())
// Hash must fail
_, err = obj.Hash(ctx, hash.SHA1)
assert.Equal(t, ErrMetaUnknown, err)
// Move must fail
mobj, err := operations.Move(ctx, f, nil, file+"2", obj)
assert.Nil(t, mobj)
assert.Error(t, err)
if err != nil {
assert.Contains(t, err.Error(), "please upgrade rclone")
}
// Put must fail
oi := object.NewStaticObjectInfo(file, modTime, 3, true, nil, nil)
buf := bytes.NewBufferString("abc")
_, err = f.Put(ctx, buf, oi)
assert.Error(t, err)
// Rcat must fail
in := ioutil.NopCloser(bytes.NewBufferString("abc"))
robj, err := operations.Rcat(ctx, f, file, in, modTime)
assert.Nil(t, robj)
assert.NotNil(t, err)
if err != nil {
assert.Contains(t, err.Error(), "please upgrade rclone")
}
}
// InternalTest dispatches all internal tests
func (f *Fs) InternalTest(t *testing.T) {
t.Run("PutLarge", func(t *testing.T) {
@@ -686,6 +761,9 @@ func (f *Fs) InternalTest(t *testing.T) {
t.Run("MetadataInput", func(t *testing.T) {
testMetadataInput(t, f)
})
t.Run("FutureProof", func(t *testing.T) {
testFutureProof(t, f)
})
}
var _ fstests.InternalTester = (*Fs)(nil)

backend/ftp/ftp.go

@@ -15,6 +15,7 @@ import (
"github.com/jlaffaye/ftp"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
@@ -258,6 +259,7 @@ func (f *Fs) getFtpConnection(ctx context.Context) (c *ftp.ServerConn, err error
if f.opt.Concurrency > 0 {
f.tokens.Get()
}
accounting.LimitTPS(ctx)
f.poolMu.Lock()
if len(f.pool) > 0 {
c = f.pool[0]

backend/hdfs/fs.go (new file, 257 lines)

@@ -0,0 +1,257 @@
// +build !plan9
package hdfs
import (
"context"
"fmt"
"io"
"os"
"path"
"time"
"github.com/colinmarc/hdfs/v2"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash"
)
// Fs represents a HDFS server
type Fs struct {
name string
root string
features *fs.Features // optional features
opt Options // options for this backend
ci *fs.ConfigInfo // global config
client *hdfs.Client
}
// NewFs constructs an Fs from the path
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
opt := new(Options)
err := configstruct.Set(m, opt)
if err != nil {
return nil, err
}
client, err := hdfs.NewClient(hdfs.ClientOptions{
Addresses: []string{opt.Namenode},
User: opt.Username,
UseDatanodeHostname: false,
})
if err != nil {
return nil, err
}
f := &Fs{
name: name,
root: root,
opt: *opt,
ci: fs.GetConfig(ctx),
client: client,
}
f.features = (&fs.Features{
CanHaveEmptyDirectories: true,
}).Fill(ctx, f)
info, err := f.client.Stat(f.realpath(""))
if err == nil && !info.IsDir() {
f.root = path.Dir(f.root)
return f, fs.ErrorIsFile
}
return f, nil
}
// Name of this fs
func (f *Fs) Name() string {
return f.name
}
// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
return f.root
}
// String returns a description of the FS
func (f *Fs) String() string {
return fmt.Sprintf("hdfs://%s", f.opt.Namenode)
}
// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
return f.features
}
// Precision return the precision of this Fs
func (f *Fs) Precision() time.Duration {
return time.Second
}
// Hashes are not supported
func (f *Fs) Hashes() hash.Set {
return hash.Set(hash.None)
}
// NewObject finds file at remote or return fs.ErrorObjectNotFound
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
realpath := f.realpath(remote)
fs.Debugf(f, "new [%s]", realpath)
info, err := f.ensureFile(realpath)
if err != nil {
return nil, err
}
return &Object{
fs: f,
remote: remote,
size: info.Size(),
modTime: info.ModTime(),
}, nil
}
// List the objects and directories in dir into entries.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
realpath := f.realpath(dir)
fs.Debugf(f, "list [%s]", realpath)
err = f.ensureDirectory(realpath)
if err != nil {
return nil, err
}
list, err := f.client.ReadDir(realpath)
if err != nil {
return nil, err
}
for _, x := range list {
stdName := f.opt.Enc.ToStandardName(x.Name())
remote := path.Join(dir, stdName)
if x.IsDir() {
entries = append(entries, fs.NewDir(remote, x.ModTime()))
} else {
entries = append(entries, &Object{
fs: f,
remote: remote,
size: x.Size(),
modTime: x.ModTime()})
}
}
return entries, nil
}
// Put the object
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
o := &Object{
fs: f,
remote: src.Remote(),
}
err := o.Update(ctx, in, src, options...)
return o, err
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(ctx, in, src, options...)
}
// Mkdir makes a directory
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
fs.Debugf(f, "mkdir [%s]", f.realpath(dir))
return f.client.MkdirAll(f.realpath(dir), 0755)
}
// Rmdir deletes the directory
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
realpath := f.realpath(dir)
fs.Debugf(f, "rmdir [%s]", realpath)
err := f.ensureDirectory(realpath)
if err != nil {
return err
}
// refuse to remove a non-empty directory
list, err := f.client.ReadDir(realpath)
if err != nil {
return err
}
if len(list) > 0 {
return fs.ErrorDirectoryNotEmpty
}
return f.client.Remove(realpath)
}
// Purge deletes all the files in the directory
func (f *Fs) Purge(ctx context.Context, dir string) error {
realpath := f.realpath(dir)
fs.Debugf(f, "purge [%s]", realpath)
err := f.ensureDirectory(realpath)
if err != nil {
return err
}
return f.client.RemoveAll(realpath)
}
// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
info, err := f.client.StatFs()
if err != nil {
return nil, err
}
return &fs.Usage{
Total: fs.NewUsageValue(int64(info.Capacity)),
Used: fs.NewUsageValue(int64(info.Used)),
Free: fs.NewUsageValue(int64(info.Remaining)),
}, nil
}
func (f *Fs) ensureDirectory(realpath string) error {
info, err := f.client.Stat(realpath)
if e, ok := err.(*os.PathError); ok && e.Err == os.ErrNotExist {
return fs.ErrorDirNotFound
}
if err != nil {
return err
}
if !info.IsDir() {
return fs.ErrorDirNotFound
}
return nil
}
func (f *Fs) ensureFile(realpath string) (os.FileInfo, error) {
info, err := f.client.Stat(realpath)
if e, ok := err.(*os.PathError); ok && e.Err == os.ErrNotExist {
return nil, fs.ErrorObjectNotFound
}
if err != nil {
return nil, err
}
if info.IsDir() {
return nil, fs.ErrorObjectNotFound
}
return info, nil
}
func (f *Fs) realpath(dir string) string {
return f.opt.Enc.FromStandardPath(xPath(f.Root(), dir))
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.Abouter = (*Fs)(nil)
)

backend/hdfs/hdfs.go (new file, 58 lines)

@@ -0,0 +1,58 @@
// +build !plan9
package hdfs
import (
"path"
"strings"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/lib/encoder"
)
func init() {
fsi := &fs.RegInfo{
Name: "hdfs",
Description: "Hadoop distributed file system",
NewFs: NewFs,
Options: []fs.Option{{
Name: "namenode",
Help: "hadoop name node and port",
Required: true,
Examples: []fs.OptionExample{{
Value: "namenode:8020",
Help: "Connect to host namenode at port 8020",
}},
}, {
Name: "username",
Help: "hadoop user name",
Required: false,
Examples: []fs.OptionExample{{
Value: "root",
Help: "Connect to hdfs as root",
}},
}, {
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
Advanced: true,
Default: (encoder.Display | encoder.EncodeInvalidUtf8 | encoder.EncodeColon),
}},
}
fs.Register(fsi)
}
// Options for this backend
type Options struct {
Namenode string `config:"namenode"`
Username string `config:"username"`
Enc encoder.MultiEncoder `config:"encoding"`
}
// xPath make correct file path with leading '/'
func xPath(root string, tail string) string {
if !strings.HasPrefix(root, "/") {
root = "/" + root
}
return path.Join(root, tail)
}

backend/hdfs/hdfs_test.go (new file, 20 lines)

@@ -0,0 +1,20 @@
// Test HDFS filesystem interface
// +build !plan9
package hdfs_test
import (
"testing"
"github.com/rclone/rclone/backend/hdfs"
"github.com/rclone/rclone/fstest/fstests"
)
// TestIntegration runs integration tests against the remote
func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: "TestHdfs:",
NilObject: (*hdfs.Object)(nil),
})
}

backend/hdfs/hdfs_unsupported.go (new file, 6 lines)

@@ -0,0 +1,6 @@
// Build for hdfs for unsupported platforms to stop go complaining
// about "no buildable Go source files "
// +build plan9
package hdfs

backend/hdfs/object.go (new file, 177 lines)

@@ -0,0 +1,177 @@
// +build !plan9
package hdfs
import (
"context"
"io"
"path"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/readers"
)
// Object describes an HDFS file
type Object struct {
fs *Fs
remote string
size int64
modTime time.Time
}
// Fs returns the parent Fs
func (o *Object) Fs() fs.Info {
return o.fs
}
// Remote returns the remote path
func (o *Object) Remote() string {
return o.remote
}
// Size returns the size of an object in bytes
func (o *Object) Size() int64 {
return o.size
}
// ModTime returns the modification time of the object
func (o *Object) ModTime(ctx context.Context) time.Time {
return o.modTime
}
// SetModTime sets the modification time of the local fs object
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
realpath := o.fs.realpath(o.Remote())
err := o.fs.client.Chtimes(realpath, modTime, modTime)
if err != nil {
return err
}
o.modTime = modTime
return nil
}
// Storable returns whether this object is storable
func (o *Object) Storable() bool {
return true
}
// Return a string version
func (o *Object) String() string {
if o == nil {
return "<nil>"
}
return o.Remote()
}
// Hash is not supported
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", hash.ErrUnsupported
}
// Open an object for read
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
realpath := o.realpath()
fs.Debugf(o.fs, "open [%s]", realpath)
f, err := o.fs.client.Open(realpath)
if err != nil {
return nil, err
}
var offset, limit int64 = 0, -1
for _, option := range options {
switch x := option.(type) {
case *fs.SeekOption:
offset = x.Offset
case *fs.RangeOption:
offset, limit = x.Decode(o.Size())
}
}
_, err = f.Seek(offset, io.SeekStart)
if err != nil {
return nil, err
}
if limit != -1 {
in = readers.NewLimitedReadCloser(f, limit)
} else {
in = f
}
return in, err
}
// Update object
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
realpath := o.fs.realpath(src.Remote())
dirname := path.Dir(realpath)
fs.Debugf(o.fs, "update [%s]", realpath)
err := o.fs.client.MkdirAll(dirname, 0755)
if err != nil {
return err
}
info, err := o.fs.client.Stat(realpath)
if err == nil {
err = o.fs.client.Remove(realpath)
if err != nil {
return err
}
}
out, err := o.fs.client.Create(realpath)
if err != nil {
return err
}
cleanup := func() {
rerr := o.fs.client.Remove(realpath)
if rerr != nil {
fs.Errorf(o.fs, "failed to remove [%v]: %v", realpath, rerr)
}
}
_, err = io.Copy(out, in)
if err != nil {
cleanup()
return err
}
err = out.Close()
if err != nil {
cleanup()
return err
}
info, err = o.fs.client.Stat(realpath)
if err != nil {
return err
}
err = o.SetModTime(ctx, src.ModTime(ctx))
if err != nil {
return err
}
o.size = info.Size()
return nil
}
// Remove an object
func (o *Object) Remove(ctx context.Context) error {
realpath := o.fs.realpath(o.remote)
fs.Debugf(o.fs, "remove [%s]", realpath)
return o.fs.client.Remove(realpath)
}
func (o *Object) realpath() string {
return o.fs.opt.Enc.FromStandardPath(xPath(o.Fs().Root(), o.remote))
}
// Check the interfaces are satisfied
var (
_ fs.Object = (*Object)(nil)
)
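Object.Open above uses rclone's standard option decoding to map fs.SeekOption and fs.RangeOption onto a seek plus a limited reader. A minimal standalone version of the same offset/limit handling over a local file:

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
)

// readRange seeks to offset and caps the stream at limit bytes (-1 for no
// limit), mirroring how the HDFS Object.Open decodes its open options.
func readRange(f *os.File, offset, limit int64) (io.Reader, error) {
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, err
	}
	if limit >= 0 {
		return io.LimitReader(f, limit), nil
	}
	return f, nil
}

func main() {
	f, err := os.Open("/etc/hosts") // any readable file will do
	if err != nil {
		fmt.Println(err)
		return
	}
	defer f.Close()
	r, err := readRange(f, 0, 16)
	if err != nil {
		fmt.Println(err)
		return
	}
	data, _ := ioutil.ReadAll(r)
	fmt.Printf("%q\n", data)
}
```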

backend/jottacloud/jottacloud.go

@@ -727,6 +727,9 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
// Renew the token in the background
f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
_, err := f.readMetaDataForPath(ctx, "")
if err == fs.ErrorNotAFile {
err = nil
}
return err
})
@@ -1463,6 +1466,8 @@ func readMD5(in io.Reader, size, threshold int64) (md5sum string, out io.Reader,
//
// The new object may have been created if an error is returned
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
o.fs.tokenRenewer.Start()
defer o.fs.tokenRenewer.Stop()
size := src.Size()
md5String, err := src.Hash(ctx, hash.MD5)
if err != nil || md5String == "" {

View File

@@ -253,8 +253,10 @@ type MoveItemRequest struct {
//CreateShareLinkRequest is the request to create a sharing link
//Always Type:view and Scope:anonymous for public sharing
type CreateShareLinkRequest struct {
Type string `json:"type"` //Link type in View, Edit or Embed
Scope string `json:"scope,omitempty"` //Optional. Scope in anonymous, organization
Type string `json:"type"` // Link type in View, Edit or Embed
Scope string `json:"scope,omitempty"` // Scope in anonymous, organization
Password string `json:"password,omitempty"` // The password of the sharing link that is set by the creator. Optional and OneDrive Personal only.
Expiry *time.Time `json:"expirationDateTime,omitempty"` // A String with format of yyyy-MM-ddTHH:mm:ssZ of DateTime indicates the expiration time of the permission.
}
//CreateShareLinkResponse is the response from CreateShareLinkRequest
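
How the new fields serialize; a minimal runnable sketch assuming only the struct above (values invented, and the `omitempty` tags drop unset fields from the JSON body):
```
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/rclone/rclone/backend/onedrive/api"
)

func main() {
	expiry := time.Date(2021, 6, 1, 0, 0, 0, 0, time.UTC)
	req := api.CreateShareLinkRequest{
		Type:     "view",
		Scope:    "anonymous",
		Password: "s3cret", // honoured on OneDrive Personal paid accounts only
		Expiry:   &expiry,
	}
	out, _ := json.Marshal(req)
	fmt.Println(string(out))
	// {"type":"view","scope":"anonymous","password":"s3cret","expirationDateTime":"2021-06-01T00:00:00Z"}
}
```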

View File

@@ -12,6 +12,7 @@ import (
"log"
"net/http"
"path"
"regexp"
"strconv"
"strings"
"sync"
@@ -120,9 +121,18 @@ func init() {
var opts rest.Opts
var finalDriveID string
var siteID string
var relativePath string
switch config.Choose("Your choice",
[]string{"onedrive", "sharepoint", "driveid", "siteid", "search"},
[]string{"OneDrive Personal or Business", "Root Sharepoint site", "Type in driveID", "Type in SiteID", "Search a Sharepoint site"},
[]string{"onedrive", "sharepoint", "url", "search", "driveid", "siteid", "path"},
[]string{
"OneDrive Personal or Business",
"Root Sharepoint site",
"Sharepoint site name or URL (e.g. mysite or https://contoso.sharepoint.com/sites/mysite)",
"Search for a Sharepoint site",
"Type in driveID (advanced)",
"Type in SiteID (advanced)",
"Sharepoint server-relative path (advanced, e.g. /teams/hr)",
},
false) {
case "onedrive":
@@ -143,6 +153,20 @@ func init() {
case "siteid":
fmt.Printf("Paste your Site ID here> ")
siteID = config.ReadLine()
case "url":
fmt.Println("Example: \"https://contoso.sharepoint.com/sites/mysite\" or \"mysite\"")
fmt.Printf("Paste your Site URL here> ")
siteURL := config.ReadLine()
re := regexp.MustCompile(`https://.*\.sharepoint.com/sites/(.*)`)
match := re.FindStringSubmatch(siteURL)
if len(match) == 2 {
relativePath = "/sites/" + match[1]
} else {
relativePath = "/sites/" + siteURL
}
case "path":
fmt.Printf("Enter server-relative URL here> ")
relativePath = config.ReadLine()
case "search":
fmt.Printf("What to search for> ")
searchTerm := config.ReadLine()
@@ -169,6 +193,21 @@ func init() {
}
}
// if we use server-relative URL for finding the drive
if relativePath != "" {
opts = rest.Opts{
Method: "GET",
RootURL: graphURL,
Path: "/sites/root:" + relativePath,
}
site := siteResource{}
_, err := srv.CallJSON(ctx, &opts, nil, &site)
if err != nil {
log.Fatalf("Failed to query available site by relative path: %v", err)
}
siteID = site.SiteID
}
// if we have a siteID we need to ask for the drives
if siteID != "" {
opts = rest.Opts{
@@ -295,6 +334,41 @@ modification time and removes all but the last version.
**NB** Onedrive personal can't currently delete versions so don't use
this flag there.
`,
Advanced: true,
}, {
Name: "link_scope",
Default: "anonymous",
Help: `Set the scope of the links created by the link command.`,
Advanced: true,
Examples: []fs.OptionExample{{
Value: "anonymous",
Help: "Anyone with the link has access, without needing to sign in. This may include people outside of your organization. Anonymous link support may be disabled by an administrator.",
}, {
Value: "organization",
Help: "Anyone signed into your organization (tenant) can use the link to get access. Only available in OneDrive for Business and SharePoint.",
}},
}, {
Name: "link_type",
Default: "view",
Help: `Set the type of the links created by the link command.`,
Advanced: true,
Examples: []fs.OptionExample{{
Value: "view",
Help: "Creates a read-only link to the item.",
}, {
Value: "edit",
Help: "Creates a read-write link to the item.",
}, {
Value: "embed",
Help: "Creates an embeddable link to the item.",
}},
}, {
Name: "link_password",
Default: "",
Help: `Set the password for links created by the link command.
At the time of writing this only works with OneDrive personal paid accounts.
`,
Advanced: true,
}, {
@@ -310,8 +384,6 @@ this flag there.
// | (vertical line) -> '' // FULLWIDTH VERTICAL LINE
// ? (question mark) -> '' // FULLWIDTH QUESTION MARK
// * (asterisk) -> '' // FULLWIDTH ASTERISK
// # (number sign) -> '' // FULLWIDTH NUMBER SIGN
// % (percent sign) -> '' // FULLWIDTH PERCENT SIGN
//
// Folder names cannot begin with a tilde ('~')
// List of replaced characters:
@@ -336,7 +408,6 @@ this flag there.
// https://docs.microsoft.com/en-us/onedrive/developer/rest-api/concepts/addressing-driveitems?view=odsp-graph-online#path-encoding
Default: (encoder.Display |
encoder.EncodeBackSlash |
encoder.EncodeHashPercent |
encoder.EncodeLeftSpace |
encoder.EncodeLeftTilde |
encoder.EncodeRightPeriod |
@@ -355,6 +426,9 @@ type Options struct {
ExposeOneNoteFiles bool `config:"expose_onenote_files"`
ServerSideAcrossConfigs bool `config:"server_side_across_configs"`
NoVersions bool `config:"no_versions"`
LinkScope string `config:"link_scope"`
LinkType string `config:"link_type"`
LinkPassword string `config:"link_password"`
Enc encoder.MultiEncoder `config:"encoding"`
}
@@ -1063,7 +1137,8 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
}
// Copy the object
opts := newOptsCall(srcObj.id, "POST", "/copy")
// The query param is a workaround for OneDrive Business for #4590
opts := newOptsCall(srcObj.id, "POST", "/copy?@microsoft.graph.conflictBehavior=replace")
opts.ExtraHeaders = map[string]string{"Prefer": "respond-async"}
opts.NoResponse = true
@@ -1302,8 +1377,14 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration,
opts := newOptsCall(info.GetID(), "POST", "/createLink")
share := api.CreateShareLinkRequest{
Type: "view",
Scope: "anonymous",
Type: f.opt.LinkType,
Scope: f.opt.LinkScope,
Password: f.opt.LinkPassword,
}
if expire < fs.Duration(time.Hour*24*365*100) {
expiry := time.Now().Add(time.Duration(expire))
share.Expiry = &expiry
}
var resp *http.Response

View File

@@ -1181,6 +1181,10 @@ In Ceph, this can be increased with the "rgw list buckets max chunk" option.
This can be useful when trying to minimise the number of transactions
rclone does if you know the bucket exists already.
It can also be needed if the user you are using does not have bucket
creation permissions. Before v1.52.0 this would have passed silently
due to a bug.
`,
Default: false,
Advanced: true,

View File

@@ -21,6 +21,7 @@ import (
"github.com/pkg/errors"
"github.com/pkg/sftp"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
@@ -349,6 +350,7 @@ func (f *Fs) newSftpClient(conn *ssh.Client, opts ...sftp.ClientOption) (*sftp.C
// Get an SFTP connection from the pool, or open a new one
func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) {
accounting.LimitTPS(ctx)
f.poolMu.Lock()
for len(f.pool) > 0 {
c = f.pool[0]

View File

@@ -86,6 +86,15 @@ func init() {
Name: "bearer_token_command",
Help: "Command to run to get a bearer token",
Advanced: true,
}, {
Name: "delete_on_error",
Default: false,
Help: `Delete a partially uploaded file on upload failure.
Some webdav backends (eg rclone serve webdav) leave behind half-written
files on error. This flag causes them to be deleted if the
upload fails part of the way through.`,
Advanced: true,
}},
})
}
@@ -98,6 +107,7 @@ type Options struct {
Pass string `config:"pass"`
BearerToken string `config:"bearer_token"`
BearerTokenCommand string `config:"bearer_token_command"`
DeleteOnError bool `config:"delete_on_error"`
}
// Fs represents a remote webdav
@@ -1193,14 +1203,16 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return o.fs.shouldRetry(resp, err)
})
if err != nil {
// Give the WebDAV server a chance to get its internal state in order after the
// error. The error may have been local in which case we closed the connection.
// The server may still be dealing with it for a moment. A sleep isn't ideal but I
// haven't been able to think of a better method to find out if the server has
// finished - ncw
time.Sleep(1 * time.Second)
// Remove failed upload
_ = o.Remove(ctx)
if o.fs.opt.DeleteOnError {
// Give the WebDAV server a chance to get its internal state in order after the
// error. The error may have been local in which case we closed the connection.
// The server may still be dealing with it for a moment. A sleep isn't ideal but I
// haven't been able to think of a better method to find out if the server has
// finished - ncw
time.Sleep(1 * time.Second)
// Remove failed upload
_ = o.Remove(ctx)
}
return err
}
// read metadata from remote

View File

@@ -36,6 +36,7 @@ var (
cgo = flag.Bool("cgo", false, "Use cgo for the build")
noClean = flag.Bool("no-clean", false, "Don't clean the build directory before running.")
tags = flag.String("tags", "", "Space separated list of build tags")
buildmode = flag.String("buildmode", "", "Passed to go build -buildmode flag")
compileOnly = flag.Bool("compile-only", false, "Just build the binary, not the zip.")
)
@@ -300,8 +301,15 @@ func compileArch(version, goos, goarch, dir string) bool {
"-trimpath",
"-o", output,
"-tags", *tags,
"..",
}
if *buildmode != "" {
args = append(args,
"-buildmode", *buildmode,
)
}
args = append(args,
"..",
)
env := []string{
"GOOS=" + goos,
"GOARCH=" + stripVersion(goarch),

View File

@@ -42,6 +42,7 @@ docs = [
"googlecloudstorage.md",
"drive.md",
"googlephotos.md",
"hdfs.md",
"http.md",
"hubic.md",
"jottacloud.md",

View File

@@ -78,6 +78,39 @@ func (s *server) getVFS(what string, sshConn *ssh.ServerConn) (VFS *vfs.VFS) {
return VFS
}
// Accept a single connection - run in a goroutine as the ssh
// authentication can block
func (s *server) acceptConnection(nConn net.Conn) {
what := describeConn(nConn)
// Before use, a handshake must be performed on the incoming net.Conn.
sshConn, chans, reqs, err := ssh.NewServerConn(nConn, s.config)
if err != nil {
fs.Errorf(what, "SSH login failed: %v", err)
return
}
fs.Infof(what, "SSH login from %s using %s", sshConn.User(), sshConn.ClientVersion())
// Discard all global out-of-band Requests
go ssh.DiscardRequests(reqs)
c := &conn{
what: what,
vfs: s.getVFS(what, sshConn),
}
if c.vfs == nil {
fs.Infof(what, "Closing unauthenticated connection (couldn't find VFS)")
_ = nConn.Close()
return
}
c.handlers = newVFSHandler(c.vfs)
// Accept all channels
go c.handleChannels(chans)
}
// Accept connections and handle each one in a goroutine
func (s *server) acceptConnections() {
for {
nConn, err := s.listener.Accept()
@@ -88,33 +121,7 @@ func (s *server) acceptConnections() {
fs.Errorf(nil, "Failed to accept incoming connection: %v", err)
continue
}
what := describeConn(nConn)
// Before use, a handshake must be performed on the incoming net.Conn.
sshConn, chans, reqs, err := ssh.NewServerConn(nConn, s.config)
if err != nil {
fs.Errorf(what, "SSH login failed: %v", err)
continue
}
fs.Infof(what, "SSH login from %s using %s", sshConn.User(), sshConn.ClientVersion())
// Discard all global out-of-band Requests
go ssh.DiscardRequests(reqs)
c := &conn{
what: what,
vfs: s.getVFS(what, sshConn),
}
if c.vfs == nil {
fs.Infof(what, "Closing unauthenticated connection (couldn't find VFS)")
_ = nConn.Close()
continue
}
c.handlers = newVFSHandler(c.vfs)
// Accept all channels
go c.handleChannels(chans)
go s.acceptConnection(nConn)
}
}

View File

@@ -62,11 +62,12 @@ func TestWebDav(t *testing.T) {
// Config for the backend we'll use to connect to the server
config := configmap.Simple{
"type": "webdav",
"vendor": "other",
"url": w.Server.URL(),
"user": testUser,
"pass": obscure.MustObscure(testPass),
"type": "webdav",
"vendor": "other",
"url": w.Server.URL(),
"user": testUser,
"pass": obscure.MustObscure(testPass),
"delete_on_error": "true",
}
return config, func() {

View File

@@ -120,6 +120,7 @@ WebDAV or S3, that work out of the box.)
{{< provider name="Google Cloud Storage" home="https://cloud.google.com/storage/" config="/googlecloudstorage/" >}}
{{< provider name="Google Drive" home="https://www.google.com/drive/" config="/drive/" >}}
{{< provider name="Google Photos" home="https://www.google.com/photos/about/" config="/googlephotos/" >}}
{{< provider name="HDFS" home="https://hadoop.apache.org/" config="/hdfs/" >}}
{{< provider name="HTTP" home="https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol" config="/http/" >}}
{{< provider name="Hubic" home="https://hubic.com/" config="/hubic/" >}}
{{< provider name="Jottacloud" home="https://www.jottacloud.com/en/" config="/jottacloud/" >}}

View File

@@ -440,3 +440,8 @@ put them back in again.` >}}
* Claudio Bantaloukas <rockdreamer@gmail.com>
* Benjamin Gustin <gustin.ben@gmail.com>
* Ingo Weiss <ingo@redhat.com>
* Kerry Su <me@sshockwave.net>
* Ilyess Bachiri <ilyess.bachiri@sonder.com>
* Yury Stankevich <urykhy@gmail.com>
* kice <wslikerqs@gmail.com>
* Denis Neuling <denisneuling@gmail.com>

View File

@@ -146,27 +146,6 @@ Container level SAS URLs are useful for temporarily allowing third
parties access to a single container or putting credentials into an
untrusted environment such as a CI build server.
### Multipart uploads ###
Rclone supports multipart uploads with Azure Blob storage. Files
bigger than 256MB will be uploaded using chunked upload by default.
The files will be uploaded in parallel in 4MB chunks (by default).
Note that these chunks are buffered in memory and there may be up to
`--transfers` of them being uploaded at once.
Files can't be split into more than 50,000 chunks, so by default
the largest file that can be uploaded with 4MB chunk size is 195GB.
Above this rclone will double the chunk size until it creates less
than 50,000 chunks. By default this will mean a maximum file size of
3.2TB can be uploaded. This can be raised to 5TB using
`--azureblob-chunk-size 100M`.
Note that rclone doesn't commit the block list until the end of the
upload which means that there is a limit of 9.5TB of multipart uploads
in progress as Azure won't allow more than that amount of uncommitted
blocks.
{{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/azureblob/azureblob.go then run make backenddocs" >}}
### Standard Options
@@ -223,15 +202,6 @@ Leave blank normally.
- Type: string
- Default: ""
#### --azureblob-upload-cutoff
Cutoff for switching to chunked upload (<= 256MB).
- Config: upload_cutoff
- Env Var: RCLONE_AZUREBLOB_UPLOAD_CUTOFF
- Type: SizeSuffix
- Default: 256M
#### --azureblob-chunk-size
Upload chunk size (<= 100MB).

View File

@@ -299,6 +299,9 @@ If wrapped remote is case insensitive, the chunker overlay will inherit
that property (so you can't have a file called "Hello.doc" and "hello.doc"
in the same directory).
Chunker included in rclone releases up to `v1.54` can sometimes fail to
detect metadata produced by recent versions of rclone. We recommend that
users keep rclone up to date to avoid data corruption.
{{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/chunker/chunker.go then run make backenddocs" >}}
### Standard Options

View File

@@ -36,6 +36,7 @@ See the following for detailed instructions for
* [Google Cloud Storage](/googlecloudstorage/)
* [Google Drive](/drive/)
* [Google Photos](/googlephotos/)
* [HDFS](/hdfs/)
* [HTTP](/http/)
* [Hubic](/hubic/)
* [Jottacloud / GetSky.no](/jottacloud/)
@@ -1296,10 +1297,15 @@ facility is `DAEMON`.
### --tpslimit float ###
Limit HTTP transactions per second to this. Default is 0 which is used
to mean unlimited transactions per second.
Limit transactions per second to this number. Default is 0 which is
used to mean unlimited transactions per second.
For example to limit rclone to 10 HTTP transactions per second use
A transaction is roughly defined as an API call; its exact meaning
will depend on the backend. For HTTP based backends it is an HTTP
PUT/GET/POST/etc and its response. For FTP/SFTP it is a round trip
transaction over TCP.
For example to limit rclone to 10 transactions per second use
`--tpslimit 10`, or to 1 transaction every 2 seconds use `--tpslimit
0.5`.
@@ -1310,6 +1316,10 @@ banned or rate limited).
This can be very useful for `rclone mount` to control the behaviour of
applications using it.
This limit applies to all HTTP based backends and to the FTP and SFTP
backends. It does not apply to the local backend or the Tardigrade
backend.
See also `--tpslimit-burst`.
### --tpslimit-burst int ###

199
docs/content/hdfs.md Normal file
View File

@@ -0,0 +1,199 @@
---
title: "HDFS Remote"
description: "Remote for Hadoop Distributed Filesystem"
---
{{< icon "fa fa-globe" >}} HDFS
-------------------------------------------------
[HDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) is a
distributed file-system, part of the [Apache Hadoop](https://hadoop.apache.org/) framework.
Paths are specified as `remote:` or `remote:path/to/dir`.
Here is an example of how to make a remote called `remote`. First run:
rclone config
This will guide you through an interactive setup process:
```
No remotes found - make a new one
n) New remote
s) Set configuration password
q) Quit config
n/s/q> n
name> remote
Type of storage to configure.
Enter a string value. Press Enter for the default ("").
Choose a number from below, or type in your own value
[skip]
XX / Hadoop distributed file system
\ "hdfs"
[skip]
Storage> hdfs
** See help for hdfs backend at: https://rclone.org/hdfs/ **
hadoop name node and port
Enter a string value. Press Enter for the default ("").
Choose a number from below, or type in your own value
1 / Connect to host namenode at port 8020
\ "namenode:8020"
namenode> namenode.hadoop:8020
hadoop user name
Enter a string value. Press Enter for the default ("").
Choose a number from below, or type in your own value
1 / Connect to hdfs as root
\ "root"
username> root
Edit advanced config? (y/n)
y) Yes
n) No (default)
y/n> n
Remote config
--------------------
[remote]
type = hdfs
namenode = namenode.hadoop:8020
username = root
--------------------
y) Yes this is OK (default)
e) Edit this remote
d) Delete this remote
y/e/d> y
Current remotes:
Name Type
==== ====
hadoop hdfs
e) Edit existing remote
n) New remote
d) Delete remote
r) Rename remote
c) Copy remote
s) Set configuration password
q) Quit config
e/n/d/r/c/s/q> q
```
This remote is called `remote` and can now be used like this
See all the top level directories
rclone lsd remote:
List the contents of a directory
rclone ls remote:directory
Sync the remote `directory` to `/home/local/directory`, deleting any excess files.
rclone sync -i remote:directory /home/local/directory
### Setting up your own HDFS instance for testing
You may start with a [manual setup](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html)
or use the docker image from the tests:
If you want to build the docker image
```
git clone https://github.com/rclone/rclone.git
cd rclone/fstest/testserver/images/test-hdfs
docker build --rm -t rclone/test-hdfs .
```
Or you can just use the latest one pushed
```
docker run --rm --name "rclone-hdfs" -p 127.0.0.1:9866:9866 -p 127.0.0.1:8020:8020 --hostname "rclone-hdfs" rclone/test-hdfs
```
**NB** it needs a few seconds to start up.
For this docker image the remote needs to be configured like this:
```
[remote]
type = hdfs
namenode = 127.0.0.1:8020
username = root
```
You can stop this image with `docker kill rclone-hdfs` (**NB** it does not use volumes, so all data
uploaded will be lost.)
### Modified time
Time accurate to 1 second is stored.
### Checksum
No checksums are implemented.
### Usage information
You can use the `rclone about remote:` command which will display filesystem size and current usage.
### Restricted filename characters
In addition to the [default restricted characters set](/overview/#restricted-characters)
the following characters are also replaced:
| Character | Value | Replacement |
| --------- |:-----:|:-----------:|
| : | 0x3A | |
Invalid UTF-8 bytes will also be [replaced](/overview/#invalid-utf8).
### Limitations
- No server-side `Move` or `DirMove`.
- Checksums not implemented.
{{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/hdfs/hdfs.go then run make backenddocs" >}}
### Standard Options
Here are the standard options specific to hdfs (Hadoop distributed file system).
#### --hdfs-namenode
hadoop name node and port
- Config: namenode
- Env Var: RCLONE_HDFS_NAMENODE
- Type: string
- Default: ""
- Examples:
- "namenode:8020"
- Connect to host namenode at port 8020
#### --hdfs-username
hadoop user name
- Config: username
- Env Var: RCLONE_HDFS_USERNAME
- Type: string
- Default: ""
- Examples:
- "root"
- Connect to hdfs as root
### Advanced Options
Here are the advanced options specific to hdfs (Hadoop distributed file system).
#### --hdfs-encoding
This sets the encoding for the backend.
See: the [encoding section in the overview](/overview/#encoding) for more info.
- Config: encoding
- Env Var: RCLONE_HDFS_ENCODING
- Type: MultiEncoder
- Default: Slash,Colon,Del,Ctl,InvalidUtf8,Dot
{{< rem autogenerated options stop >}}

View File

@@ -181,7 +181,7 @@ kill %1
## Install from source ##
Make sure you have at least [Go](https://golang.org/) 1.11
Make sure you have at least [Go](https://golang.org/) 1.12
installed. [Download go](https://golang.org/dl/) if necessary. The
latest release is recommended. Then

View File

@@ -163,8 +163,6 @@ the following characters are also replaced:
| ? | 0x3F | |
| \ | 0x5C | |
| \| | 0x7C | |
| # | 0x23 | |
| % | 0x25 | |
File names can also not end with the following characters.
These only get replaced if they are the last character in the name:

View File

@@ -28,6 +28,7 @@ Here is an overview of the major features of each cloud storage system.
| Google Cloud Storage | MD5 | Yes | No | No | R/W |
| Google Drive | MD5 | Yes | No | Yes | R/W |
| Google Photos | - | No | No | Yes | R |
| HDFS | - | Yes | No | No | - |
| HTTP | - | No | No | No | R |
| Hubic | MD5 | Yes | No | No | R/W |
| Jottacloud | MD5 | Yes | Yes | No | R |
@@ -341,6 +342,7 @@ upon backend specific capabilities.
| Google Cloud Storage | Yes | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No |
| Google Drive | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| Google Photos | No | No | No | No | No | No | No | No | No | No |
| HDFS | Yes | No | No | No | No | No | Yes | No | Yes | Yes |
| HTTP | No | No | No | No | No | No | No | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | Yes |
| Hubic | Yes † | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | Yes | No |
| Jottacloud | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes |

View File

@@ -78,6 +78,7 @@
<a class="dropdown-item" href="/googlecloudstorage/"><i class="fab fa-google"></i> Google Cloud Storage</a>
<a class="dropdown-item" href="/drive/"><i class="fab fa-google"></i> Google Drive</a>
<a class="dropdown-item" href="/googlephotos/"><i class="fas fa-images"></i> Google Photos</a>
<a class="dropdown-item" href="/hdfs/"><i class="fa fa-globe"></i> HDFS (Hadoop Distributed Filesystem)</a>
<a class="dropdown-item" href="/http/"><i class="fa fa-globe"></i> HTTP</a>
<a class="dropdown-item" href="/hubic/"><i class="fa fa-space-shuttle"></i> Hubic</a>
<a class="dropdown-item" href="/jottacloud/"><i class="fa fa-cloud"></i> Jottacloud</a>

View File

@@ -368,7 +368,7 @@ func (s *StatsInfo) Transferred() []TransferSnapshot {
func (s *StatsInfo) Log() {
if s.ci.UseJSONLog {
out, _ := s.RemoteStats()
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v%v\n", s, fs.LogValue("stats", out))
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v%v\n", s, fs.LogValueHide("stats", out))
} else {
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v\n", s)
}

37
fs/accounting/tpslimit.go Normal file
View File

@@ -0,0 +1,37 @@
package accounting
import (
"context"
"github.com/rclone/rclone/fs"
"golang.org/x/time/rate"
)
var (
tpsBucket *rate.Limiter // for limiting number of http transactions per second
)
// StartLimitTPS starts the token bucket for transactions per second
// limiting if necessary
func StartLimitTPS(ctx context.Context) {
ci := fs.GetConfig(ctx)
if ci.TPSLimit > 0 {
tpsBurst := ci.TPSLimitBurst
if tpsBurst < 1 {
tpsBurst = 1
}
tpsBucket = rate.NewLimiter(rate.Limit(ci.TPSLimit), tpsBurst)
fs.Infof(nil, "Starting transaction limiter: max %g transactions/s with burst %d", ci.TPSLimit, tpsBurst)
}
}
// LimitTPS limits the number of transactions per second if enabled.
// It should be called once per transaction.
func LimitTPS(ctx context.Context) {
if tpsBucket != nil {
tbErr := tpsBucket.Wait(ctx)
if tbErr != nil && tbErr != context.Canceled {
fs.Errorf(nil, "HTTP token bucket error: %v", tbErr)
}
}
}
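
A minimal, runnable sketch of the new pair in use, with configuration values chosen as in the test below:
```
package main

import (
	"context"
	"fmt"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/accounting"
)

func main() {
	ctx, ci := fs.AddConfig(context.Background())
	ci.TPSLimit = 2 // at most 2 transactions per second
	ci.TPSLimitBurst = 1
	accounting.StartLimitTPS(ctx) // creates the shared token bucket

	for i := 0; i < 5; i++ {
		accounting.LimitTPS(ctx) // blocks until a token is free
		fmt.Println("transaction", i)
	}
}
```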

View File

@@ -0,0 +1,39 @@
package accounting
import (
"context"
"testing"
"time"
"github.com/rclone/rclone/fs"
"github.com/stretchr/testify/assert"
)
func TestLimitTPS(t *testing.T) {
timeTransactions := func(n int, minTime, maxTime time.Duration) {
start := time.Now()
for i := 0; i < n; i++ {
LimitTPS(context.Background())
}
dt := time.Since(start)
assert.True(t, dt >= minTime && dt <= maxTime, "Expecting time between %v and %v, got %v", minTime, maxTime, dt)
}
t.Run("Off", func(t *testing.T) {
assert.Nil(t, tpsBucket)
timeTransactions(100, 0*time.Millisecond, 100*time.Millisecond)
})
t.Run("On", func(t *testing.T) {
ctx, ci := fs.AddConfig(context.Background())
ci.TPSLimit = 100.0
ci.TPSLimitBurst = 0
StartLimitTPS(ctx)
assert.NotNil(t, tpsBucket)
defer func() {
tpsBucket = nil
}()
timeTransactions(100, 900*time.Millisecond, 2000*time.Millisecond)
})
}

View File

@@ -34,7 +34,6 @@ import (
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/driveletter"
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/fspath"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/lib/random"
@@ -236,7 +235,7 @@ func LoadConfig(ctx context.Context) {
accounting.StartTokenTicker(ctx)
// Start the transactions per second limiter
fshttp.StartHTTPTokenBucket(ctx)
accounting.StartLimitTPS(ctx)
}
var errorConfigFileNotFound = errors.New("config file not found")

View File

@@ -16,9 +16,9 @@ import (
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/lib/structs"
"golang.org/x/net/publicsuffix"
"golang.org/x/time/rate"
)
const (
@@ -29,24 +29,10 @@ const (
var (
transport http.RoundTripper
noTransport = new(sync.Once)
tpsBucket *rate.Limiter // for limiting number of http transactions per second
cookieJar, _ = cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List})
logMutex sync.Mutex
)
// StartHTTPTokenBucket starts the token bucket if necessary
func StartHTTPTokenBucket(ctx context.Context) {
ci := fs.GetConfig(ctx)
if ci.TPSLimit > 0 {
tpsBurst := ci.TPSLimitBurst
if tpsBurst < 1 {
tpsBurst = 1
}
tpsBucket = rate.NewLimiter(rate.Limit(ci.TPSLimit), tpsBurst)
fs.Infof(nil, "Starting HTTP transaction limiter: max %g transactions/s with burst %d", ci.TPSLimit, tpsBurst)
}
}
// A net.Conn that sets a deadline for every Read or Write operation
type timeoutConn struct {
net.Conn
@@ -309,13 +295,8 @@ func cleanAuths(buf []byte) []byte {
// RoundTrip implements the RoundTripper interface.
func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
// Get transactions per second token first if limiting
if tpsBucket != nil {
tbErr := tpsBucket.Wait(req.Context())
if tbErr != nil && tbErr != context.Canceled {
fs.Errorf(nil, "HTTP token bucket error: %v", tbErr)
}
}
// Limit transactions per second if required
accounting.LimitTPS(req.Context())
// Force user agent
req.Header.Set("User-Agent", t.userAgent)
// Set user defined headers

View File

@@ -77,8 +77,9 @@ var LogPrint = func(level LogLevel, text string) {
// LogValueItem describes keyed item for a JSON log entry
type LogValueItem struct {
key string
value interface{}
key string
value interface{}
render bool
}
// LogValue should be used as an argument to any logging calls to
@@ -86,14 +87,31 @@ type LogValueItem struct {
//
// key is the dictionary parameter used to store value.
func LogValue(key string, value interface{}) LogValueItem {
return LogValueItem{key: key, value: value}
return LogValueItem{key: key, value: value, render: true}
}
// String returns an empty string so LogValueItem entries won't show
// in the textual representation of logs. They need to be put in so
// the number of parameters of the log call matches.
// LogValueHide should be used as an argument to any logging calls to
// augment the JSON output with more structured information.
//
// key is the dictionary parameter used to store value.
//
// String() will return a blank string - this is useful for items
// which shouldn't appear in the textual log.
func LogValueHide(key string, value interface{}) LogValueItem {
return LogValueItem{key: key, value: value, render: false}
}
// String returns the representation of value. If render is false this
// is an empty string so LogValueItem entries won't show in the
// textual representation of logs.
func (j LogValueItem) String() string {
return ""
if !j.render {
return ""
}
if do, ok := j.value.(fmt.Stringer); ok {
return do.String()
}
return fmt.Sprint(j.value)
}
// LogPrintf produces a log string from the arguments passed in
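
What the render flag changes, as a minimal runnable sketch (the values are invented for illustration):
```
package main

import (
	"fmt"

	"github.com/rclone/rclone/fs"
)

func main() {
	shown := fs.LogValue("size", fs.SizeSuffix(42))
	hidden := fs.LogValueHide("stats", map[string]int{"transfers": 3})
	// LogValue renders via String(); LogValueHide renders as "" so the
	// value only surfaces in the structured JSON log output.
	fmt.Printf("done %v%v\n", shown, hidden) // prints: done 42
}
```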

View File

@@ -1,6 +1,28 @@
package fs
import "github.com/spf13/pflag"
import (
"fmt"
"testing"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
)
// Check it satisfies the interface
var _ pflag.Value = (*LogLevel)(nil)
var _ fmt.Stringer = LogValueItem{}
type withString struct{}
func (withString) String() string {
return "hello"
}
func TestLogValue(t *testing.T) {
x := LogValue("x", 1)
assert.Equal(t, "1", x.String())
x = LogValue("x", withString{})
assert.Equal(t, "hello", x.String())
x = LogValueHide("x", withString{})
assert.Equal(t, "", x.String())
}

View File

@@ -2123,7 +2123,15 @@ func SkipDestructive(ctx context.Context, subject interface{}, action string) (s
return false
}
if skip {
fs.Logf(subject, "Skipped %s as %s is set", action, flag)
size := int64(-1)
if do, ok := subject.(interface{ Size() int64 }); ok {
size = do.Size()
}
if size >= 0 {
fs.Logf(subject, "Skipped %s as %s is set (size %v)", fs.LogValue("skipped", action), flag, fs.LogValue("size", fs.SizeSuffix(size)))
} else {
fs.Logf(subject, "Skipped %s as %s is set", fs.LogValue("skipped", action), flag)
}
}
return skip
}
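
The size reporting above relies on an anonymous interface assertion; a standalone sketch of that technique, with types invented here:
```
package main

import "fmt"

// object stands in for any subject passed to SkipDestructive.
type object struct{}

func (object) Size() int64 { return 42 }

func main() {
	var subject interface{} = object{}
	// Assert against an anonymous interface: no named type is needed,
	// anything with a Size() int64 method matches.
	if do, ok := subject.(interface{ Size() int64 }); ok {
		fmt.Println("size", do.Size()) // size 42
	}
}
```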

View File

@@ -353,17 +353,22 @@ func init() {
- command - a string with the command name
- arg - a list of arguments for the backend command
- opt - a map of string to string of options
- returnType - one of ("COMBINED_OUTPUT", "STREAM", "STREAM_ONLY_STDOUT", "STREAM_ONLY_STDERR")
- defaults to "COMBINED_OUTPUT" if not set
- the STREAM returnTypes will write the output to the body of the HTTP message
- the COMBINED_OUTPUT will write the output to the "result" parameter
Returns
- result - result from the backend command
- only set when using returnType "COMBINED_OUTPUT"
- error - set if rclone exits with an error code
- returnType - one of ("COMBINED_OUTPUT", "STREAM", "STREAM_ONLY_STDOUT". "STREAM_ONLY_STDERR")
- returnType - one of ("COMBINED_OUTPUT", "STREAM", "STREAM_ONLY_STDOUT", "STREAM_ONLY_STDERR")
For example
rclone rc core/command command=ls -a mydrive:/ -o max-depth=1
rclone rc core/command -a ls -a mydrive:/ -o max-depth=1
Returns
@@ -386,7 +391,6 @@ OR
// rcRunCommand runs an rclone command with the given args and flags
func rcRunCommand(ctx context.Context, in Params) (out Params, err error) {
command, err := in.GetString("command")
if err != nil {
command = ""
@@ -409,7 +413,7 @@ func rcRunCommand(ctx context.Context, in Params) (out Params, err error) {
returnType = "COMBINED_OUTPUT"
}
var httpResponse *http.ResponseWriter
var httpResponse http.ResponseWriter
httpResponse, err = in.GetHTTPResponseWriter()
if err != nil {
return nil, errors.Errorf("response object is required\n" + err.Error())
@@ -460,12 +464,14 @@ func rcRunCommand(ctx context.Context, in Params) (out Params, err error) {
"error": false,
}, nil
} else if returnType == "STREAM_ONLY_STDOUT" {
cmd.Stdout = *httpResponse
cmd.Stdout = httpResponse
} else if returnType == "STREAM_ONLY_STDERR" {
cmd.Stderr = *httpResponse
cmd.Stderr = httpResponse
} else if returnType == "STREAM" {
cmd.Stdout = *httpResponse
cmd.Stderr = *httpResponse
cmd.Stdout = httpResponse
cmd.Stderr = httpResponse
} else {
return nil, errors.Errorf("Unknown returnType %q", returnType)
}
err = cmd.Run()
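
Why dropping the pointer works: http.ResponseWriter is itself an io.Writer, so an exec.Cmd can stream into it directly. A standalone sketch under that observation — the handler, route, and command here are invented for illustration:
```
package main

import (
	"net/http"
	"os/exec"
)

// handler streams a subprocess's combined output straight to the
// client, the same technique core/command uses above.
func handler(w http.ResponseWriter, r *http.Request) {
	cmd := exec.CommandContext(r.Context(), "rclone", "version")
	cmd.Stdout = w // the interface value is an io.Writer - no dereference needed
	cmd.Stderr = w
	_ = cmd.Run()
}

func main() {
	http.HandleFunc("/version", handler)
	_ = http.ListenAndServe("127.0.0.1:8080", nil)
}
```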

View File

@@ -7,6 +7,7 @@ import (
"net/http/httptest"
"os"
"runtime"
"strings"
"testing"
"github.com/stretchr/testify/assert"
@@ -22,6 +23,12 @@ func TestMain(m *testing.M) {
fmt.Printf("rclone %s\n", fs.Version)
os.Exit(0)
}
// Pretend to error if we have an unknown command
if os.Args[len(os.Args)-1] == "unknown_command" {
fmt.Printf("rclone %s\n", fs.Version)
fmt.Fprintf(os.Stderr, "Unknown command\n")
os.Exit(1)
}
os.Exit(m.Run())
}
@@ -136,17 +143,56 @@ func TestCoreQuit(t *testing.T) {
func TestCoreCommand(t *testing.T) {
call := Calls.Get("core/command")
var httpResponse http.ResponseWriter = httptest.NewRecorder()
test := func(command string, returnType string, wantOutput string, fail bool) {
var rec = httptest.NewRecorder()
var w http.ResponseWriter = rec
in := Params{
"command": "version",
"opt": map[string]string{},
"arg": []string{},
"_response": &httpResponse,
in := Params{
"command": command,
"opt": map[string]string{},
"arg": []string{},
"_response": w,
}
if returnType != "" {
in["returnType"] = returnType
} else {
returnType = "COMBINED_OUTPUT"
}
stream := strings.HasPrefix(returnType, "STREAM")
got, err := call.Fn(context.Background(), in)
if stream && fail {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
if !stream {
assert.Equal(t, wantOutput, got["result"])
assert.Equal(t, fail, got["error"])
} else {
assert.Equal(t, wantOutput, rec.Body.String())
}
assert.Equal(t, http.StatusOK, rec.Result().StatusCode)
}
got, err := call.Fn(context.Background(), in)
require.NoError(t, err)
assert.Equal(t, fmt.Sprintf("rclone %s\n", fs.Version), got["result"])
assert.Equal(t, false, got["error"])
version := fmt.Sprintf("rclone %s\n", fs.Version)
errorString := "Unknown command\n"
t.Run("OK", func(t *testing.T) {
test("version", "", version, false)
})
t.Run("Fail", func(t *testing.T) {
test("unknown_command", "", version+errorString, true)
})
t.Run("Combined", func(t *testing.T) {
test("unknown_command", "COMBINED_OUTPUT", version+errorString, true)
})
t.Run("Stderr", func(t *testing.T) {
test("unknown_command", "STREAM_ONLY_STDERR", errorString, true)
})
t.Run("Stdout", func(t *testing.T) {
test("unknown_command", "STREAM_ONLY_STDOUT", version, true)
})
t.Run("Stream", func(t *testing.T) {
test("unknown_command", "STREAM", version+errorString, true)
})
}

View File

@@ -79,6 +79,15 @@ func Reshape(out interface{}, in interface{}) error {
return nil
}
// Copy shallow copies the Params
func (p Params) Copy() (out Params) {
out = make(Params, len(p))
for k, v := range p {
out[k] = v
}
return out
}
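
A minimal sketch of why the rc server wants Copy (used below): handlers mutate the working Params while the saved copy keeps the original request for error reporting. Values invented:
```
package main

import (
	"fmt"

	"github.com/rclone/rclone/fs/rc"
)

func main() {
	in := rc.Params{"command": "version", "_async": true}
	orig := in.Copy()           // shallow copy: same values, new map
	delete(in, "_async")        // handler strips internal keys from the working set
	fmt.Println(in["_async"])   // <nil>
	fmt.Println(orig["_async"]) // true - preserved for error responses
}
```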
// Get gets a parameter from the input
//
// If the parameter isn't found then error will be of type
@@ -112,15 +121,15 @@ func (p Params) GetHTTPRequest() (*http.Request, error) {
//
// If the parameter isn't found then error will be of type
// ErrParamNotFound and the returned value will be nil.
func (p Params) GetHTTPResponseWriter() (*http.ResponseWriter, error) {
func (p Params) GetHTTPResponseWriter() (http.ResponseWriter, error) {
key := "_response"
value, err := p.Get(key)
if err != nil {
return nil, err
}
request, ok := value.(*http.ResponseWriter)
request, ok := value.(http.ResponseWriter)
if !ok {
return nil, ErrParamInvalid{errors.Errorf("expecting *http.ResponseWriter value for key %q (was %T)", key, value)}
return nil, ErrParamInvalid{errors.Errorf("expecting http.ResponseWriter value for key %q (was %T)", key, value)}
}
return request, nil
}
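
A standalone sketch of the corrected contract: callers store the http.ResponseWriter interface value itself under `_response`, no pointer involved (a recorder from httptest, as in the tests below):
```
package main

import (
	"fmt"
	"net/http/httptest"

	"github.com/rclone/rclone/fs/rc"
)

func main() {
	rec := httptest.NewRecorder() // *httptest.ResponseRecorder implements http.ResponseWriter
	in := rc.Params{"_response": rec}
	w, err := in.GetHTTPResponseWriter()
	fmt.Println(w == rec, err) // true <nil>
}
```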

View File

@@ -2,6 +2,8 @@ package rc
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
@@ -61,6 +63,19 @@ func TestReshape(t *testing.T) {
}
func TestParamsCopy(t *testing.T) {
in := Params{
"ok": 1,
"x": "seventeen",
"nil": nil,
}
out := in.Copy()
assert.Equal(t, in, out)
if &in == &out {
t.Error("didn't copy")
}
}
func TestParamsGet(t *testing.T) {
in := Params{
"ok": 1,
@@ -346,3 +361,53 @@ func TestParamsGetStructMissingOK(t *testing.T) {
assert.Equal(t, 4.2, out.Float)
assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error())
}
func TestParamsGetHTTPRequest(t *testing.T) {
in := Params{}
req, err := in.GetHTTPRequest()
assert.Nil(t, req)
assert.Error(t, err)
assert.Equal(t, true, IsErrParamNotFound(err), err.Error())
in = Params{
"_request": 42,
}
req, err = in.GetHTTPRequest()
assert.Nil(t, req)
assert.Error(t, err)
assert.Equal(t, true, IsErrParamInvalid(err), err.Error())
r := new(http.Request)
in = Params{
"_request": r,
}
req, err = in.GetHTTPRequest()
assert.NotNil(t, req)
assert.NoError(t, err)
assert.Equal(t, r, req)
}
func TestParamsGetHTTPResponseWriter(t *testing.T) {
in := Params{}
wr, err := in.GetHTTPResponseWriter()
assert.Nil(t, wr)
assert.Error(t, err)
assert.Equal(t, true, IsErrParamNotFound(err), err.Error())
in = Params{
"_response": 42,
}
wr, err = in.GetHTTPResponseWriter()
assert.Nil(t, wr)
assert.Error(t, err)
assert.Equal(t, true, IsErrParamInvalid(err), err.Error())
var w http.ResponseWriter = httptest.NewRecorder()
in = Params{
"_response": w,
}
wr, err = in.GetHTTPResponseWriter()
assert.NotNil(t, wr)
assert.NoError(t, err)
assert.Equal(t, w, wr)
}

View File

@@ -186,7 +186,7 @@ func writeError(path string, in rc.Params, w http.ResponseWriter, err error, sta
})
if err != nil {
// can't return the error at this point
fs.Errorf(nil, "rc: failed to write JSON output: %v", err)
fs.Errorf(nil, "rc: writeError: failed to write JSON output from %#v: %v", in, err)
}
}
@@ -270,6 +270,9 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string)
writeError(path, in, w, errors.Errorf("authentication must be set up on the rc server to use %q or the --rc-no-auth flag must be in use", path), http.StatusForbidden)
return
}
inOrig := in.Copy()
if call.NeedsRequest {
// Add the request to RC
in["_request"] = r
@@ -282,7 +285,7 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string)
// Check to see if it is async or not
isAsync, err := in.GetBool("_async")
if rc.NotErrParamNotFound(err) {
writeError(path, in, w, err, http.StatusBadRequest)
writeError(path, inOrig, w, err, http.StatusBadRequest)
return
}
delete(in, "_async") // remove the async parameter after parsing so vfs operations don't get confused
@@ -297,7 +300,7 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string)
w.Header().Add("x-rclone-jobid", fmt.Sprintf("%d", jobID))
}
if err != nil {
writeError(path, in, w, err, http.StatusInternalServerError)
writeError(path, inOrig, w, err, http.StatusInternalServerError)
return
}
if out == nil {
@@ -308,8 +311,8 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string)
err = rc.WriteJSON(w, out)
if err != nil {
// can't return the error at this point - but have a go anyway
writeError(path, in, w, err, http.StatusInternalServerError)
fs.Errorf(nil, "rc: failed to write JSON output: %v", err)
writeError(path, inOrig, w, err, http.StatusInternalServerError)
fs.Errorf(nil, "rc: handlePost: failed to write JSON output: %v", err)
}
}

View File

@@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"regexp"
"strings"
"testing"
@@ -17,6 +18,7 @@ import (
"github.com/stretchr/testify/require"
_ "github.com/rclone/rclone/backend/local"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/rc"
)
@@ -28,6 +30,21 @@ const (
remoteURL = "[" + testFs + "]/" // initial URL path to fetch from that remote
)
func TestMain(m *testing.M) {
// Pretend to be rclone version if we have a version string parameter
if os.Args[len(os.Args)-1] == "version" {
fmt.Printf("rclone %s\n", fs.Version)
os.Exit(0)
}
// Pretend to error if we have an unknown command
if os.Args[len(os.Args)-1] == "unknown_command" {
fmt.Printf("rclone %s\n", fs.Version)
fmt.Fprintf(os.Stderr, "Unknown command\n")
os.Exit(1)
}
os.Exit(m.Run())
}
// Test the RC server runs and we can do HTTP fetches from it.
// We'll do the majority of the testing with the httptest framework
func TestRcServer(t *testing.T) {
@@ -456,6 +473,73 @@ func TestRC(t *testing.T) {
testServer(t, tests, &opt)
}
func TestRCWithAuth(t *testing.T) {
tests := []testRun{{
Name: "core-command",
URL: "core/command",
Method: "POST",
Body: `command=version`,
ContentType: "application/x-www-form-urlencoded",
Status: http.StatusOK,
Expected: fmt.Sprintf(`{
"error": false,
"result": "rclone %s\n"
}
`, fs.Version),
}, {
Name: "core-command-bad-returnType",
URL: "core/command",
Method: "POST",
Body: `command=version&returnType=POTATO`,
ContentType: "application/x-www-form-urlencoded",
Status: http.StatusInternalServerError,
Expected: `{
"error": "Unknown returnType \"POTATO\"",
"input": {
"command": "version",
"returnType": "POTATO"
},
"path": "core/command",
"status": 500
}
`,
}, {
Name: "core-command-stream",
URL: "core/command",
Method: "POST",
Body: `command=version&returnType=STREAM`,
ContentType: "application/x-www-form-urlencoded",
Status: http.StatusOK,
Expected: fmt.Sprintf(`rclone %s
{}
`, fs.Version),
}, {
Name: "core-command-stream-error",
URL: "core/command",
Method: "POST",
Body: `command=unknown_command&returnType=STREAM`,
ContentType: "application/x-www-form-urlencoded",
Status: http.StatusOK,
Expected: fmt.Sprintf(`rclone %s
Unknown command
{
"error": "exit status 1",
"input": {
"command": "unknown_command",
"returnType": "STREAM"
},
"path": "core/command",
"status": 500
}
`, fs.Version),
}}
opt := newTestOpt()
opt.Serve = true
opt.Files = testFs
opt.NoAuth = true
testServer(t, tests, &opt)
}
func TestMethods(t *testing.T) {
tests := []testRun{{
Name: "options",

View File

@@ -1,7 +1,7 @@
//+build !go1.11
//+build !go1.12
package fs
// Upgrade to Go version 1.11 to compile rclone - latest stable go
// Upgrade to Go version 1.12 to compile rclone - latest stable go
// compiler recommended.
func init() { Go_version_1_11_required_for_compilation() }
func init() { Go_version_1_12_required_for_compilation() }

View File

@@ -297,3 +297,8 @@ backends:
- backend: "zoho"
remote: "TestZoho:"
fastlist: false
- backend: "hdfs"
remote: "TestHdfs:"
fastlist: false
ignore:
- TestSyncUTFNorm

View File

@@ -0,0 +1,42 @@
# A very minimal hdfs server for integration testing rclone
FROM debian:stretch
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends openjdk-8-jdk \
&& rm -rf /var/lib/apt/lists/*
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends net-tools curl python
ENV HADOOP_VERSION 3.2.1
ENV HADOOP_URL https://www.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz
RUN set -x \
&& curl -fSL "$HADOOP_URL" -o /tmp/hadoop.tar.gz \
&& tar -xvf /tmp/hadoop.tar.gz -C /opt/ \
&& rm /tmp/hadoop.tar.gz*
RUN ln -s /opt/hadoop-$HADOOP_VERSION/etc/hadoop /etc/hadoop
RUN mkdir /opt/hadoop-$HADOOP_VERSION/logs
RUN mkdir /hadoop-data
RUN mkdir -p /hadoop/dfs/name
RUN mkdir -p /hadoop/dfs/data
ENV HADOOP_HOME=/opt/hadoop-$HADOOP_VERSION
ENV HADOOP_CONF_DIR=/etc/hadoop
ENV MULTIHOMED_NETWORK=1
ENV USER=root
ENV PATH $HADOOP_HOME/bin/:$PATH
ADD core-site.xml /etc/hadoop/core-site.xml
ADD hdfs-site.xml /etc/hadoop/hdfs-site.xml
ADD httpfs-site.xml /etc/hadoop/httpfs-site.xml
ADD kms-site.xml /etc/hadoop/kms-site.xml
ADD mapred-site.xml /etc/hadoop/mapred-site.xml
ADD yarn-site.xml /etc/hadoop/yarn-site.xml
ADD run.sh /run.sh
RUN chmod a+x /run.sh
CMD ["/run.sh"]

View File

@@ -0,0 +1,32 @@
# Test HDFS
This is a docker image for rclone's integration tests which runs an
HDFS filesystem.
## Build
```
docker build --rm -t rclone/test-hdfs .
docker push rclone/test-hdfs
```
## Test
configure remote:
```
[TestHdfs]
type = hdfs
namenode = 127.0.0.1:8020
username = root
```
run tests:
```
cd backend/hdfs
GO111MODULE=on go test -v
```
stop docker image:
```
docker kill rclone-hdfs
```

View File

@@ -0,0 +1,6 @@
<configuration>
<property><name>fs.defaultFS</name><value>hdfs://localhost:8020</value></property>
<property><name>hadoop.http.staticuser.user</name><value>root</value></property>
<property><name>hadoop.proxyuser.root.groups</name><value>root,nogroup</value></property>
<property><name>hadoop.proxyuser.root.hosts</name><value>*</value></property>
</configuration>

View File

@@ -0,0 +1,14 @@
<configuration>
<property><name>dfs.client.use.datanode.hostname</name><value>true</value></property>
<property><name>dfs.datanode.data.dir</name><value>file:///hadoop/dfs/data</value></property>
<property><name>dfs.datanode.use.datanode.hostname</name><value>true</value></property>
<property><name>dfs.namenode.accesstime.precision</name><value>3600000</value></property>
<property><name>dfs.namenode.http-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.https-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.name.dir</name><value>file:///hadoop/dfs/name</value></property>
<property><name>dfs.namenode.rpc-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.safemode.extension</name><value>5000</value></property>
<property><name>dfs.namenode.servicerpc-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.replication</name><value>2</value></property>
<property><name>nfs.dump.dir</name><value>/tmp</value></property>
</configuration>

View File

@@ -0,0 +1,2 @@
<configuration>
</configuration>

View File

@@ -0,0 +1,2 @@
<configuration>
</configuration>

View File

@@ -0,0 +1,5 @@
<configuration>
<property><name>mapreduce.framework.name</name><value>yarn</value></property>
<property><name>yarn.nodemanager.bind-host</name><value>0.0.0.0</value></property>
</configuration>

View File

@@ -0,0 +1,8 @@
#!/bin/bash
echo format namenode
hdfs namenode -format test
hdfs namenode &
hdfs datanode &
exec sleep infinity

View File

@@ -0,0 +1,14 @@
<configuration>
<property><name>yarn.log-aggregation-enable</name><value>true</value></property>
<property><name>yarn.log.server.url</name><value>http://localhost:8188/applicationhistory/logs/</value></property>
<property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property>
<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
<property><name>yarn.nodemanager.bind-host</name><value>0.0.0.0</value></property>
<property><name>yarn.nodemanager.bind-host</name><value>0.0.0.0</value></property>
<property><name>yarn.nodemanager.remote-app-log-dir</name><value>/app-logs</value></property>
<property><name>yarn.timeline-service.bind-host</name><value>0.0.0.0</value></property>
<property><name>yarn.timeline-service.enabled</name><value>true</value></property>
<property><name>yarn.timeline-service.generic-application-history.enabled</name><value>true</value></property>
<property><name>yarn.timeline-service.hostname</name><value>historyserver.hadoop</value></property>
<property><name>yarn.timeline-service.leveldb-timeline-store.path</name><value>/hadoop/yarn/timeline</value></property>
</configuration>

View File

@@ -0,0 +1,24 @@
#!/bin/bash
set -e
NAME=rclone-hdfs
. $(dirname "$0")/docker.bash
start() {
docker run --rm -d --name "rclone-hdfs" -p 127.0.0.1:9866:9866 -p 127.0.0.1:8020:8020 --hostname "rclone-hdfs" rclone/test-hdfs
sleep 10
echo type=hdfs
echo namenode=127.0.0.1:8020
echo username=root
}
stop() {
if status ; then
docker kill $NAME
echo "$NAME stopped"
fi
}
. $(dirname "$0")/run.bash

1
go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/billziss-gh/cgofuse v1.4.0
github.com/buengese/sgzip v0.1.0
github.com/calebcase/tmpfile v1.0.2 // indirect
github.com/colinmarc/hdfs/v2 v2.1.1
github.com/coreos/go-semver v0.3.0
github.com/dropbox/dropbox-sdk-go-unofficial v5.6.0+incompatible
github.com/gabriel-vasile/mimetype v1.1.1

26
go.sum
View File

@@ -140,6 +140,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/colinmarc/hdfs/v2 v2.1.1 h1:x0hw/m+o3UE20Scso/KCkvYNc9Di39TBlCfGMkJ1/a0=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -222,6 +224,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -303,7 +306,9 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v0.0.0-20180228145832-27454136f036/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
@@ -325,6 +330,8 @@ github.com/iguanesolutions/go-systemd/v5 v5.0.0/go.mod h1:VPlzL6z0rXd3HU7oLkMoEq
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930 h1:v4CYlQ+HeysPHsr2QFiEO60gKqnvn1xwvuKhhAhuEkk=
github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jlaffaye/ftp v0.0.0-20190624084859-c1312a7102bf/go.mod h1:lli8NYPQOFy3O++YmYbqVgOcQ1JPCwdOy+5zSjKJ9qY=
github.com/jlaffaye/ftp v0.0.0-20201112195030-9aae4d151126 h1:ly2C51IMpCCV8RpTDRXgzG/L9iZXb8ePEixaew/HwBs=
@@ -458,6 +465,7 @@ github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIw
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pengsrc/go-shared v0.2.1-0.20190131101655-1999055a4a14 h1:XeOYlK9W1uCmhjJSsY78Mcuh7MVkNjTzmHx1yBzizSU=
@@ -649,6 +657,7 @@ go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
goftp.io/server v0.4.0 h1:hqsVdwd1/l6QtYxD9pxca9mEAJYZ7+FPCnmeXKXHQNw=
goftp.io/server v0.4.0/go.mod h1:hFZeR656ErRt3ojMKt7H10vQ5nuWV1e0YeUTeorlR6k=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180723164146-c126467f60eb/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -663,7 +672,9 @@ golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -738,6 +749,7 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb h1:mUVeFHoDKis5nxCAzoAi7E8Ghb86EXh/RK6wtvJIqRY=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201029055024-942e2f445f3c h1:rpcgRPA7OvNEOdprt2Wx8/Re2cBTd8NPo/lvo3AyMqk=
golang.org/x/net v0.0.0-20201029055024-942e2f445f3c/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
@@ -808,6 +820,7 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211 h1:9UQO31fZ+0aKQOFldThf7BKPMJTiBfWycGh/u3UoO88=
golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418 h1:HlFl4V6pEMziuLXyRkm5BIYq1y1GAbb02pRlWvI54OM=
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -902,6 +915,7 @@ google.golang.org/api v0.28.0 h1:jMF5hhVfMkTZwHW1SDpKq5CkgWLXOb31Foaca9Zr3oM=
google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
google.golang.org/api v0.33.0 h1:+gL0XvACeMIvpwLZ5rQZzLn5cwOsgg8dIcfJ2SYfBVw=
google.golang.org/api v0.33.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/api v0.34.0 h1:k40adF3uR+6x/+hO5Dh4ZFUqFp67vxvbpafFiJxl10A=
google.golang.org/api v0.34.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
@@ -948,6 +962,7 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201019141844-1ed22bb0c154 h1:bFFRpT+e8JJVY7lMMfvezL1ZIwqiwmPl2bsE2yx4HqM=
google.golang.org/genproto v0.0.0-20201019141844-1ed22bb0c154/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201028140639-c77dae4b0522 h1:7RoRaOmOAXwqnurgQ5g5/d0yCi9ha2UxuTZULXudK7A=
google.golang.org/genproto v0.0.0-20201028140639-c77dae4b0522/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
@@ -971,6 +986,7 @@ google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1 h1:DGeFlSan2f+WEtCERJ4J9GJWk15TxUi8QGagfI87Xyc=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
@@ -1001,6 +1017,16 @@ gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI=
gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 h1:0709Jtq/6QXEuWRfAm260XqlpcwL1vxtO1tUE2qK8Z4=
gopkg.in/jcmturner/gokrb5.v7 v7.3.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=