Mirror of https://github.com/rclone/rclone.git, synced 2026-01-07 19:13:19 +00:00

Compare commits: fix-rc-dis...fix-rc-del (1 commit)

| Author | SHA1 | Date |
|---|---|---|
| | 7e9eae8c90 | |
```diff
@@ -2797,6 +2797,8 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
 		blockList  blockblob.GetBlockListResponse
 		properties *blob.GetPropertiesResponse
 		options    *blockblob.CommitBlockListOptions
+		// Use temporary pacer as this can be called recursively which can cause a deadlock with --max-connections
+		pacer = fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant)))
 	)
 
 	properties, err = o.readMetaDataAlways(ctx)
@@ -2808,7 +2810,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
 
 	if objectExists {
 		// Get the committed block list
-		err = o.fs.pacer.Call(func() (bool, error) {
+		err = pacer.Call(func() (bool, error) {
 			blockList, err = blockBlobSVC.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
 			return o.fs.shouldRetry(ctx, err)
 		})
@@ -2850,7 +2852,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
 
 	// Commit only the committed blocks
 	fs.Debugf(o, "Committing %d blocks to remove uncommitted blocks", len(blockIDs))
-	err = o.fs.pacer.Call(func() (bool, error) {
+	err = pacer.Call(func() (bool, error) {
 		_, err := blockBlobSVC.CommitBlockList(ctx, blockIDs, options)
 		return o.fs.shouldRetry(ctx, err)
 	})
```
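Aside, not part of the diff: the change above sidesteps a deadlock. `clearUncommittedBlocks` can be called recursively, and with `--max-connections` the outer call already holds the only connection token, so an inner call through the same pacer would block forever. A minimal runnable sketch of both the problem and the temporary-pacer workaround, using a toy token limiter rather than rclone's actual pacer API:

```go
package main

import (
	"fmt"
	"time"
)

// limiter is a stand-in for a pacer with --max-connections 1:
// a buffered channel used as a counting semaphore.
type limiter chan struct{}

func newLimiter(n int) limiter {
	l := make(limiter, n)
	for i := 0; i < n; i++ {
		l <- struct{}{}
	}
	return l
}

// call takes a token, runs fn, and puts the token back.
func (l limiter) call(fn func()) {
	<-l // blocks forever if fn re-enters l.call while the token is held
	defer func() { l <- struct{}{} }()
	fn()
}

func main() {
	// 1. Deadlock variant, guarded by a timeout so the demo terminates.
	a := newLimiter(1)
	done := make(chan struct{})
	go func() {
		a.call(func() {
			a.call(func() {}) // recursive call on the same limiter: no token left
		})
		close(done)
	}()
	select {
	case <-done:
		fmt.Println("recursive call completed (unexpected)")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("recursive call deadlocked, as expected")
	}

	// 2. The workaround mirrored by the diff: the inner call uses its
	// own temporary limiter with a separate token pool.
	b := newLimiter(1)
	b.call(func() {
		inner := newLimiter(1)
		inner.call(func() {})
	})
	fmt.Println("temporary inner limiter avoids the deadlock")
}
```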
```diff
@@ -2224,17 +2224,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
 		return info, nil, err
 	}
 
-	up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil, options...)
-	if err != nil {
-		return info, nil, err
-	}
-
 	info = fs.ChunkWriterInfo{
-		ChunkSize:   up.chunkSize,
+		ChunkSize:   int64(f.opt.ChunkSize),
 		Concurrency: o.fs.opt.UploadConcurrency,
 		//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
 	}
-	return info, up, nil
+	up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil, options...)
+	return info, up, err
 }
 
 // Remove an object
```
```diff
@@ -5,7 +5,6 @@ package api
 
 import (
 	"fmt"
-	"net/url"
 	"reflect"
 	"strconv"
 	"time"
@@ -137,25 +136,8 @@ type Link struct {
 }
 
 // Valid reports whether l is non-nil, has an URL, and is not expired.
-// It primarily checks the URL's expire query parameter, falling back to the Expire field.
 func (l *Link) Valid() bool {
-	if l == nil || l.URL == "" {
-		return false
-	}
-
-	// Primary validation: check URL's expire query parameter
-	if u, err := url.Parse(l.URL); err == nil {
-		if expireStr := u.Query().Get("expire"); expireStr != "" {
-			// Try parsing as Unix timestamp (seconds)
-			if expireInt, err := strconv.ParseInt(expireStr, 10, 64); err == nil {
-				expireTime := time.Unix(expireInt, 0)
-				return time.Now().Add(10 * time.Second).Before(expireTime)
-			}
-		}
-	}
-
-	// Fallback validation: use the Expire field if URL parsing didn't work
-	return time.Now().Add(10 * time.Second).Before(time.Time(l.Expire))
+	return l != nil && l.URL != "" && time.Now().Add(10*time.Second).Before(time.Time(l.Expire))
 }
 
 // URL is a basic form of URL
```
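For reference, a self-contained sketch of the validation semantics that remain after this change: a link counts as valid only if it is non-nil, has a URL, and its `Expire` field is more than 10 seconds in the future. The `Time` and `Link` types are re-declared locally so the example runs standalone; they only mirror the shapes visible in the diff.

```go
package main

import (
	"fmt"
	"time"
)

type Time time.Time

type Link struct {
	URL    string
	Expire Time
}

// Valid mirrors the simplified check kept by the diff.
func (l *Link) Valid() bool {
	return l != nil && l.URL != "" && time.Now().Add(10*time.Second).Before(time.Time(l.Expire))
}

func main() {
	fresh := &Link{URL: "https://example.com/f", Expire: Time(time.Now().Add(time.Hour))}
	nearExpiry := &Link{URL: "https://example.com/f", Expire: Time(time.Now().Add(5 * time.Second))}
	fmt.Println(fresh.Valid())      // true
	fmt.Println(nearExpiry.Valid()) // false: inside the 10 second safety buffer
	var nilLink *Link
	fmt.Println(nilLink.Valid()) // false: nil receiver is handled
}
```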
```diff
@@ -1,99 +0,0 @@
-package api
-
-import (
-	"fmt"
-	"testing"
-	"time"
-)
-
-// TestLinkValid tests the Link.Valid method for various scenarios
-func TestLinkValid(t *testing.T) {
-	tests := []struct {
-		name     string
-		link     *Link
-		expected bool
-		desc     string
-	}{
-		{
-			name:     "nil link",
-			link:     nil,
-			expected: false,
-			desc:     "nil link should be invalid",
-		},
-		{
-			name:     "empty URL",
-			link:     &Link{URL: ""},
-			expected: false,
-			desc:     "empty URL should be invalid",
-		},
-		{
-			name: "valid URL with future expire parameter",
-			link: &Link{
-				URL: fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(time.Hour).Unix()),
-			},
-			expected: true,
-			desc:     "URL with future expire parameter should be valid",
-		},
-		{
-			name: "expired URL with past expire parameter",
-			link: &Link{
-				URL: fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(-time.Hour).Unix()),
-			},
-			expected: false,
-			desc:     "URL with past expire parameter should be invalid",
-		},
-		{
-			name: "URL expire parameter takes precedence over Expire field",
-			link: &Link{
-				URL:    fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(time.Hour).Unix()),
-				Expire: Time(time.Now().Add(-time.Hour)), // Fallback is expired
-			},
-			expected: true,
-			desc:     "URL expire parameter should take precedence over Expire field",
-		},
-		{
-			name: "URL expire parameter within 10 second buffer should be invalid",
-			link: &Link{
-				URL: fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(5*time.Second).Unix()),
-			},
-			expected: false,
-			desc:     "URL expire parameter within 10 second buffer should be invalid",
-		},
-		{
-			name: "fallback to Expire field when no URL expire parameter",
-			link: &Link{
-				URL:    "https://example.com/file",
-				Expire: Time(time.Now().Add(time.Hour)),
-			},
-			expected: true,
-			desc:     "should fallback to Expire field when URL has no expire parameter",
-		},
-		{
-			name: "fallback to Expire field when URL expire parameter is invalid",
-			link: &Link{
-				URL:    "https://example.com/file?expire=invalid",
-				Expire: Time(time.Now().Add(time.Hour)),
-			},
-			expected: true,
-			desc:     "should fallback to Expire field when URL expire parameter is unparseable",
-		},
-		{
-			name: "invalid when both URL expire and Expire field are expired",
-			link: &Link{
-				URL:    fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(-time.Hour).Unix()),
-				Expire: Time(time.Now().Add(-time.Hour)),
-			},
-			expected: false,
-			desc:     "should be invalid when both URL expire and Expire field are expired",
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			result := tt.link.Valid()
-			if result != tt.expected {
-				t.Errorf("Link.Valid() = %v, expected %v. %s", result, tt.expected, tt.desc)
-			}
-		})
-	}
-}
```
```diff
@@ -192,9 +192,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		return nil, err
 	}
 
-	// if root is empty or ends with / (must be a directory)
-	isRootDir := isPathDir(root)
-
 	root = strings.Trim(root, "/")
 
 	f := &Fs{
@@ -221,11 +218,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 	if share == "" || dir == "" {
 		return f, nil
 	}
-
-	// Skip stat check if root is already a directory
-	if isRootDir {
-		return f, nil
-	}
 	cn, err := f.getConnection(ctx, share)
 	if err != nil {
 		return nil, err
@@ -902,11 +894,6 @@ func ensureSuffix(s, suffix string) string {
 	return s + suffix
 }
 
-// isPathDir determines if a path represents a directory based on trailing slash
-func isPathDir(path string) bool {
-	return path == "" || strings.HasSuffix(path, "/")
-}
-
 func trimPathPrefix(s, prefix string) string {
 	// we need to clean the paths to make tests pass!
 	s = betterPathClean(s)
```
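Not part of the diff: the removed helper's behaviour is easy to demo in isolation. A minimal standalone sketch of the trailing-slash heuristic it used (empty path or anything ending in "/" is treated as a directory):

```go
package main

import (
	"fmt"
	"strings"
)

// isPathDir reproduces the deleted smb helper's logic.
func isPathDir(path string) bool {
	return path == "" || strings.HasSuffix(path, "/")
}

func main() {
	for _, p := range []string{"", "share/", "share/dir", "share//"} {
		fmt.Printf("isPathDir(%q) = %v\n", p, isPathDir(p))
	}
}
```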
```diff
@@ -1,41 +0,0 @@
-// Unit tests for internal SMB functions
-package smb
-
-import "testing"
-
-// TestIsPathDir tests the isPathDir function logic
-func TestIsPathDir(t *testing.T) {
-	tests := []struct {
-		path     string
-		expected bool
-	}{
-		// Empty path should be considered a directory
-		{"", true},
-
-		// Paths with trailing slash should be directories
-		{"/", true},
-		{"share/", true},
-		{"share/dir/", true},
-		{"share/dir/subdir/", true},
-
-		// Paths without trailing slash should not be directories
-		{"share", false},
-		{"share/dir", false},
-		{"share/dir/file", false},
-		{"share/dir/subdir/file", false},
-
-		// Edge cases
-		{"share//", true},
-		{"share///", true},
-		{"share/dir//", true},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.path, func(t *testing.T) {
-			result := isPathDir(tt.path)
-			if result != tt.expected {
-				t.Errorf("isPathDir(%q) = %v, want %v", tt.path, result, tt.expected)
-			}
-		})
-	}
-}
```
```diff
@@ -208,7 +208,6 @@ func newServer(ctx context.Context, f fs.Fs, opt *Options, vfsOpt *vfscommon.Opt
 // Serve HTTP until the server is shutdown
 func (s *HTTP) Serve() error {
 	s.server.Serve()
-	fs.Logf(s.f, "HTTP Server started on %s", s.server.URLs())
 	s.server.Wait()
 	return nil
 }
```
```diff
@@ -1013,5 +1013,3 @@ put them back in again.` >}}
 - Robin Rolf <imer@imer.cc>
 - Jean-Christophe Cura <jcaspes@gmail.com>
 - russcoss <russcoss@outlook.com>
-- Matt LaPaglia <mlapaglia@gmail.com>
-- Youfu Zhang <1315097+zhangyoufu@users.noreply.github.com>
```
```diff
@@ -21,7 +21,7 @@ you started to share on Windows. On smbd, it's the section title in `smb.conf`
 (usually in `/etc/samba/`) file.
 You can find shares by querying the root if you're unsure (e.g. `rclone lsd remote:`).
 
-You can't access the shared printers from rclone, obviously.
+You can't access to the shared printers from rclone, obviously.
 
 You can't use Anonymous access for logging in. You have to use the `guest` user
 with an empty password instead. The rclone client tries to avoid 8.3 names when
```
```diff
@@ -1281,48 +1281,6 @@ type readCloser struct {
 	io.Closer
 }
 
-// CatFile outputs the file to the io.Writer
-//
-// if offset == 0 it will be ignored
-// if offset > 0 then the file will be seeked to that offset
-// if offset < 0 then the file will be seeked that far from the end
-//
-// if count < 0 then it will be ignored
-// if count >= 0 then only that many characters will be output
-func CatFile(ctx context.Context, o fs.Object, offset, count int64, w io.Writer) (err error) {
-	ci := fs.GetConfig(ctx)
-	tr := accounting.Stats(ctx).NewTransfer(o, nil)
-	defer func() {
-		tr.Done(ctx, err)
-	}()
-	opt := fs.RangeOption{Start: offset, End: -1}
-	size := o.Size()
-	if opt.Start < 0 {
-		opt.Start += size
-	}
-	if count >= 0 {
-		opt.End = opt.Start + count - 1
-	}
-	var options []fs.OpenOption
-	if opt.Start > 0 || opt.End >= 0 {
-		options = append(options, &opt)
-	}
-	for _, option := range ci.DownloadHeaders {
-		options = append(options, option)
-	}
-	var in io.ReadCloser
-	in, err = Open(ctx, o, options...)
-	if err != nil {
-		return err
-	}
-	if count >= 0 {
-		in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
-	}
-	in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
-	_, err = io.Copy(w, in)
-	return err
-}
-
 // Cat any files to the io.Writer
 //
 // if offset == 0 it will be ignored
```
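Aside: the offset/count contract documented on the removed `CatFile` maps onto a byte range in a small, fixed way. A standalone sketch of just that normalization (the helper name is illustrative, not rclone's):

```go
package main

import "fmt"

// rangeFor mirrors the fs.RangeOption computation in the diff:
// a negative offset seeks from the end, count < 0 means "to the end",
// otherwise the range is [start, start+count-1] inclusive.
func rangeFor(offset, count, size int64) (start, end int64) {
	start, end = offset, -1
	if start < 0 {
		start += size
	}
	if count >= 0 {
		end = start + count - 1
	}
	return start, end
}

func main() {
	fmt.Println(rangeFor(0, -1, 100))  // 0 -1: whole file
	fmt.Println(rangeFor(10, 5, 100))  // 10 14: five bytes from offset 10
	fmt.Println(rangeFor(-4, -1, 100)) // 96 -1: last four bytes
}
```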
```diff
@@ -1333,14 +1291,46 @@ func CatFile(ctx context.Context, o fs.Object, offset, count int64, w io.Writer)
 // if count >= 0 then only that many characters will be output
 func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []byte) error {
 	var mu sync.Mutex
+	ci := fs.GetConfig(ctx)
 	return ListFn(ctx, f, func(o fs.Object) {
+		var err error
+		tr := accounting.Stats(ctx).NewTransfer(o, nil)
+		defer func() {
+			tr.Done(ctx, err)
+		}()
+		opt := fs.RangeOption{Start: offset, End: -1}
+		size := o.Size()
+		if opt.Start < 0 {
+			opt.Start += size
+		}
+		if count >= 0 {
+			opt.End = opt.Start + count - 1
+		}
+		var options []fs.OpenOption
+		if opt.Start > 0 || opt.End >= 0 {
+			options = append(options, &opt)
+		}
+		for _, option := range ci.DownloadHeaders {
+			options = append(options, option)
+		}
+		var in io.ReadCloser
+		in, err = Open(ctx, o, options...)
+		if err != nil {
+			err = fs.CountError(ctx, err)
+			fs.Errorf(o, "Failed to open: %v", err)
+			return
+		}
+		if count >= 0 {
+			in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
+		}
+		in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
 		// take the lock just before we output stuff, so at the last possible moment
 		mu.Lock()
 		defer mu.Unlock()
-		err := CatFile(ctx, o, offset, count, w)
+		_, err = io.Copy(w, in)
 		if err != nil {
 			err = fs.CountError(ctx, err)
 			fs.Errorf(o, "Failed to send to output: %v", err)
 			return
 		}
 		if len(sep) > 0 {
 			_, err = w.Write(sep)
```
```diff
@@ -208,7 +208,7 @@ func init() {
 		{name: "rmdir", title: "Remove an empty directory or container"},
 		{name: "purge", title: "Remove a directory or container and all of its contents"},
 		{name: "rmdirs", title: "Remove all the empty directories in the path", help: "- leaveRoot - boolean, set to true not to delete the root\n"},
-		{name: "delete", title: "Remove files in the path", noRemote: true},
+		{name: "delete", title: "Remove files in the path", help: "- rmdirs - boolean, set to true to remove empty directories\n- leaveRoot - boolean if rmdirs is set, set to true not to delete the root\n", noRemote: true},
 		{name: "deletefile", title: "Remove the single file pointed to"},
 		{name: "copyurl", title: "Copy the URL to the object", help: "- url - string, URL to read from\n - autoFilename - boolean, set to true to retrieve destination file name from url\n"},
 		{name: "uploadfile", title: "Upload file using multiform/form-data", help: "- each part in body represents a file to be uploaded\n", needsRequest: true, noCommand: true},
```
```diff
@@ -267,7 +267,22 @@ func rcSingleCommand(ctx context.Context, in rc.Params, name string, noRemote bo
 		}
 		return nil, Rmdirs(ctx, f, remote, leaveRoot)
 	case "delete":
-		return nil, Delete(ctx, f)
+		rmdirs, err := in.GetBool("rmdirs")
+		if rc.NotErrParamNotFound(err) {
+			return nil, err
+		}
+		leaveRoot, err := in.GetBool("leaveRoot")
+		if rc.NotErrParamNotFound(err) {
+			return nil, err
+		}
+		err = Delete(ctx, f)
+		if err != nil {
+			return nil, err
+		}
+		if !rmdirs {
+			return nil, nil
+		}
+		return nil, Rmdirs(ctx, f, remote, leaveRoot)
 	case "deletefile":
 		o, err := f.NewObject(ctx, remote)
 		if err != nil {
```
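Not part of the diff: a sketch of the optional-boolean-parameter pattern the new delete case relies on. The real `fs/rc` helpers are visible above; this version uses stand-in types so it runs standalone, with a benign "not found" error that callers treat as "use the zero value" while still propagating genuine errors.

```go
package main

import (
	"errors"
	"fmt"
)

var errParamNotFound = errors.New("param not found")

type params map[string]any

func (p params) getBool(key string) (bool, error) {
	v, ok := p[key]
	if !ok {
		return false, fmt.Errorf("%q: %w", key, errParamNotFound)
	}
	b, ok := v.(bool)
	if !ok {
		return false, fmt.Errorf("%q: expected bool", key)
	}
	return b, nil
}

// notErrParamNotFound reports errors that are NOT the benign not-found case.
func notErrParamNotFound(err error) bool {
	return err != nil && !errors.Is(err, errParamNotFound)
}

func main() {
	in := params{"rmdirs": true}
	rmdirs, err := in.getBool("rmdirs")
	if notErrParamNotFound(err) {
		panic(err)
	}
	leaveRoot, err := in.getBool("leaveRoot") // absent: defaults to false
	if notErrParamNotFound(err) {
		panic(err)
	}
	fmt.Println(rmdirs, leaveRoot) // true false
}
```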
```diff
@@ -948,52 +963,3 @@ func rcHashsum(ctx context.Context, in rc.Params) (out rc.Params, err error) {
 	}
 	return out, err
 }
-
-func init() {
-	rc.Add(rc.Call{
-		Path:         "operations/discard",
-		AuthRequired: true,
-		Fn:           rcDiscard,
-		Title:        "Read and discard bytes from a file",
-		Help: `This takes the following parameters:
-
-- fs - a remote name string e.g. "drive:"
-- remote - a file within that remote e.g. "file.txt"
-- offset - offset to start reading from, start if unset, from end if -ve
-- count - bytes to read, all if unset
-
-This is similar to the [cat](/commands/rclone_cat/) with the --discard flag.
-
-It can be used for reading files into the VFS cache.
-`,
-	})
-}
-
-// Cat a file with --discard
-func rcDiscard(ctx context.Context, in rc.Params) (out rc.Params, err error) {
-	f, remote, err := rc.GetFsAndRemote(ctx, in)
-	if err != nil {
-		return nil, err
-	}
-	o, err := f.NewObject(ctx, remote)
-	if err != nil {
-		return nil, err
-	}
-	offset, err := in.GetInt64("offset")
-	if rc.IsErrParamNotFound(err) {
-		offset = 0
-	} else if err != nil {
-		return nil, err
-	}
-	count, err := in.GetInt64("count")
-	if rc.IsErrParamNotFound(err) {
-		count = -1
-	} else if err != nil {
-		return nil, err
-	}
-	err = CatFile(ctx, o, offset, count, io.Discard)
-	if err != nil {
-		return nil, err
-	}
-	return nil, nil
-}
```
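Aside: what the removed `rcDiscard` boiled down to is "read the bytes and throw them away", for example to warm a cache. A standalone sketch of that core, using `os.Open` on a local file as a stand-in for opening a remote object, with the same offset/count conventions (negative offset counts from the end, count < 0 means read everything):

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// discardFile reads count bytes starting at offset and discards them,
// returning how many bytes were actually read.
func discardFile(name string, offset, count int64) (int64, error) {
	f, err := os.Open(name)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	if offset != 0 {
		whence := io.SeekStart
		if offset < 0 {
			whence = io.SeekEnd // negative offset counts back from the end
		}
		if _, err := f.Seek(offset, whence); err != nil {
			return 0, err
		}
	}
	var r io.Reader = f
	if count >= 0 {
		r = io.LimitReader(r, count) // count < 0 means read to EOF
	}
	return io.Copy(io.Discard, r)
}

func main() {
	tmp, err := os.CreateTemp("", "discard")
	if err != nil {
		panic(err)
	}
	defer os.Remove(tmp.Name())
	tmp.WriteString("file contents to be discarded")
	tmp.Close()

	n, err := discardFile(tmp.Name(), -9, 4) // 4 bytes starting 9 from the end
	fmt.Println(n, err)                      // 4 <nil>
}
```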
```diff
@@ -14,14 +14,12 @@ import (
 	"time"
 
 	"github.com/rclone/rclone/fs"
-	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/fs/cache"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/fs/operations"
 	"github.com/rclone/rclone/fs/rc"
 	"github.com/rclone/rclone/fstest"
 	"github.com/rclone/rclone/lib/diskusage"
-	"github.com/rclone/rclone/lib/random"
 	"github.com/rclone/rclone/lib/rest"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
```
```diff
@@ -161,21 +159,32 @@ func TestRcCopyurl(t *testing.T) {
 
 // operations/delete: Remove files in the path
 func TestRcDelete(t *testing.T) {
+	ctx := context.Background()
 	r, call := rcNewRun(t, "operations/delete")
+	file1 := r.WriteObject(ctx, "subdir/file1", "subdir/file1 contents", t1)
+	file2 := r.WriteObject(ctx, "file2", "file2 contents", t1)
 
-	file1 := r.WriteObject(context.Background(), "small", "1234567890", t2)                                                    // 10 bytes
-	file2 := r.WriteObject(context.Background(), "medium", "------------------------------------------------------------", t1) // 60 bytes
-	file3 := r.WriteObject(context.Background(), "large", "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", t1) // 100 bytes
-	r.CheckRemoteItems(t, file1, file2, file3)
+	fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1, file2}, []string{"subdir"}, fs.GetModifyWindow(ctx, r.Fremote))
 
 	in := rc.Params{
 		"fs": r.FremoteName,
 	}
-	out, err := call.Fn(context.Background(), in)
+	out, err := call.Fn(ctx, in)
 	require.NoError(t, err)
 	assert.Equal(t, rc.Params(nil), out)
 
-	r.CheckRemoteItems(t)
+	fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{}, []string{"subdir"}, fs.GetModifyWindow(ctx, r.Fremote))
+
+	// Now try with rmdirs=true and leaveRoot=true
+	in["rmdirs"] = true
+	in["leaveRoot"] = true
+	out, err = call.Fn(ctx, in)
+	require.NoError(t, err)
+	assert.Equal(t, rc.Params(nil), out)
+
+	fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{}, []string{}, fs.GetModifyWindow(ctx, r.Fremote))
+
+	// FIXME don't have an easy way of checking the root still exists or not
 }
 
 // operations/deletefile: Remove the single file pointed to
```
```diff
@@ -868,66 +877,3 @@ func TestRcHashsumFile(t *testing.T) {
 	assert.Equal(t, "md5", out["hashType"])
 	assert.Equal(t, []string{"0ef726ce9b1a7692357ff70dd321d595  hashsum-file1"}, out["hashsum"])
 }
-
-// operations/discard: read and discard the contents of a file
-func TestRcDiscard(t *testing.T) {
-	ctx := context.Background()
-	r, call := rcNewRun(t, "operations/discard")
-	r.Mkdir(ctx, r.Fremote)
-
-	fileContents := "file contents to be discarded"
-	file := r.WriteBoth(ctx, "discard-file", fileContents, t1)
-	r.CheckLocalItems(t, file)
-	r.CheckRemoteItems(t, file)
-
-	for _, tt := range []struct {
-		name string
-		in   rc.Params
-		want int64
-	}{{
-		name: "full read",
-		in: rc.Params{
-			"fs":     r.FremoteName,
-			"remote": file.Path,
-		},
-		want: int64(len(fileContents)),
-	}, {
-		name: "start",
-		in: rc.Params{
-			"fs":     r.FremoteName,
-			"remote": file.Path,
-			"count":  2,
-		},
-		want: 2,
-	}, {
-		name: "offset",
-		in: rc.Params{
-			"fs":     r.FremoteName,
-			"remote": file.Path,
-			"offset": 1,
-			"count":  3,
-		},
-		want: 3,
-	}, {
-		name: "end",
-		in: rc.Params{
-			"fs":     r.FremoteName,
-			"remote": file.Path,
-			"offset": -1,
-			"count":  4,
-		},
-		want: 1,
-	}} {
-		t.Run(tt.name, func(t *testing.T) {
-			group := random.String(8)
-			ctx := accounting.WithStatsGroup(ctx, group)
-
-			out, err := call.Fn(ctx, tt.in)
-			require.NoError(t, err)
-			assert.Equal(t, rc.Params(nil), out)
-
-			stats := accounting.Stats(ctx)
-			assert.Equal(t, tt.want, stats.GetBytes())
-		})
-	}
-}
```
```diff
@@ -523,6 +523,8 @@ func (s *Server) initTLS() error {
 func (s *Server) Serve() {
 	s.wg.Add(len(s.instances))
 	for _, ii := range s.instances {
+		// TODO: decide how/when to log listening url
+		// log.Printf("listening on %s", ii.url)
 		go ii.serve(&s.wg)
 	}
 	// Install an atexit handler to shutdown gracefully
```
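Aside: `Serve` above fans out one goroutine per listener instance and pairs it with a `WaitGroup` that a later `Wait()` blocks on. A minimal sketch of that pattern; the types here are stand-ins, not lib/http's API:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type instance struct{ url string }

// serve is a stand-in for serving HTTP on one listener; it must call
// wg.Done() exactly once when it exits.
func (ii *instance) serve(wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(10 * time.Millisecond)
	fmt.Println("served", ii.url)
}

func main() {
	instances := []*instance{{"http://127.0.0.1:8080"}, {"http://[::1]:8080"}}
	var wg sync.WaitGroup
	wg.Add(len(instances)) // one Add per listener, before any goroutine starts
	for _, ii := range instances {
		go ii.serve(&wg)
	}
	wg.Wait() // corresponds to s.server.Wait() in the HTTP wrapper above
}
```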
```diff
@@ -4,8 +4,6 @@ package pacer
 import (
 	"errors"
 	"fmt"
-	"runtime"
-	"strings"
 	"sync"
 	"time"
 
@@ -155,13 +153,13 @@ func (p *Pacer) ModifyCalculator(f func(Calculator)) {
 // This must be called as a pair with endCall.
 //
 // This waits for the pacer token
-func (p *Pacer) beginCall(limitConnections bool) {
+func (p *Pacer) beginCall() {
 	// pacer starts with a token in and whenever we take one out
 	// XXX ms later we put another in. We could do this with a
 	// Ticker more accurately, but then we'd have to work out how
 	// not to run it when it wasn't needed
 	<-p.pacer
-	if limitConnections {
+	if p.maxConnections > 0 {
 		<-p.connTokens
 	}
 
@@ -178,8 +176,8 @@ func (p *Pacer) beginCall(limitConnections bool) {
 //
 // This should calculate a new sleepTime. It takes a boolean as to
 // whether the operation should be retried or not.
-func (p *Pacer) endCall(retry bool, err error, limitConnections bool) {
-	if limitConnections {
+func (p *Pacer) endCall(retry bool, err error) {
+	if p.maxConnections > 0 {
 		p.connTokens <- struct{}{}
 	}
 	p.mu.Lock()
@@ -193,44 +191,13 @@ func (p *Pacer) endCall(retry bool, err error, limitConnections bool) {
 	p.mu.Unlock()
 }
 
-// Detect the pacer being called reentrantly.
-//
-// This looks for Pacer.call in the call stack and returns true if it
-// is found.
-//
-// Ideally we would do this by passing a context about but there are
-// an awful lot of Pacer calls!
-//
-// This is only needed when p.maxConnections > 0 which isn't a common
-// configuration so adding a bit of extra slowdown here is not a
-// problem.
-func pacerReentered() bool {
-	var pcs [48]uintptr
-	n := runtime.Callers(3, pcs[:]) // skip runtime.Callers, pacerReentered and call
-	frames := runtime.CallersFrames(pcs[:n])
-	for {
-		f, more := frames.Next()
-		if strings.HasSuffix(f.Function, "(*Pacer).call") {
-			return true
-		}
-		if !more {
-			break
-		}
-	}
-	return false
-}
-
 // call implements Call but with settable retries
 func (p *Pacer) call(fn Paced, retries int) (err error) {
 	var retry bool
-	limitConnections := false
-	if p.maxConnections > 0 && !pacerReentered() {
-		limitConnections = true
-	}
 	for i := 1; i <= retries; i++ {
-		p.beginCall(limitConnections)
+		p.beginCall()
 		retry, err = p.invoker(i, retries, fn)
-		p.endCall(retry, err, limitConnections)
+		p.endCall(retry, err)
 		if !retry {
 			break
 		}
```
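Aside: the removed `pacerReentered` detected re-entry by walking the call stack. A standalone sketch of that general technique, using only `runtime.Callers` and `runtime.CallersFrames`; the function names here are illustrative, not rclone's:

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
)

// onStack reports whether a function whose fully qualified name ends in
// suffix is anywhere on the current goroutine's call stack.
func onStack(suffix string) bool {
	var pcs [48]uintptr
	n := runtime.Callers(2, pcs[:]) // skip runtime.Callers and onStack itself
	frames := runtime.CallersFrames(pcs[:n])
	for {
		f, more := frames.Next()
		if strings.HasSuffix(f.Function, suffix) {
			return true
		}
		if !more {
			break
		}
	}
	return false
}

func inner() bool { return onStack("main.outer") }
func outer() bool { return inner() }

func main() {
	fmt.Println(outer())               // true: outer is on the stack when inner runs
	fmt.Println(onStack("main.outer")) // false: called directly from main
}
```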
```diff
@@ -108,7 +108,7 @@ func waitForPace(p *Pacer, duration time.Duration) (when time.Time) {
 func TestBeginCall(t *testing.T) {
 	p := New(MaxConnectionsOption(10), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
 	emptyTokens(p)
-	go p.beginCall(true)
+	go p.beginCall()
 	if !waitForPace(p, 10*time.Millisecond).IsZero() {
 		t.Errorf("beginSleep fired too early #1")
 	}
@@ -131,7 +131,7 @@ func TestBeginCall(t *testing.T) {
 func TestBeginCallZeroConnections(t *testing.T) {
 	p := New(MaxConnectionsOption(0), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
 	emptyTokens(p)
-	go p.beginCall(false)
+	go p.beginCall()
 	if !waitForPace(p, 10*time.Millisecond).IsZero() {
 		t.Errorf("beginSleep fired too early #1")
 	}
@@ -257,7 +257,7 @@ func TestEndCall(t *testing.T) {
 	p := New(MaxConnectionsOption(5))
 	emptyTokens(p)
 	p.state.ConsecutiveRetries = 1
-	p.endCall(true, nil, true)
+	p.endCall(true, nil)
 	assert.Equal(t, 1, len(p.connTokens))
 	assert.Equal(t, 2, p.state.ConsecutiveRetries)
 }
@@ -266,7 +266,7 @@ func TestEndCallZeroConnections(t *testing.T) {
 	p := New(MaxConnectionsOption(0))
 	emptyTokens(p)
 	p.state.ConsecutiveRetries = 1
-	p.endCall(false, nil, false)
+	p.endCall(false, nil)
 	assert.Equal(t, 0, len(p.connTokens))
 	assert.Equal(t, 0, p.state.ConsecutiveRetries)
 }
@@ -353,41 +353,6 @@ func TestCallParallel(t *testing.T) {
 	wait.Broadcast()
 }
 
-func BenchmarkPacerReentered(b *testing.B) {
-	for b.Loop() {
-		_ = pacerReentered()
-	}
-}
-
-func BenchmarkPacerReentered100(b *testing.B) {
-	var fn func(level int)
-	fn = func(level int) {
-		if level > 0 {
-			fn(level - 1)
-			return
-		}
-		for b.Loop() {
-			_ = pacerReentered()
-		}
-
-	}
-	fn(100)
-}
-
-func TestCallMaxConnectionsRecursiveDeadlock(t *testing.T) {
-	p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
-	p.SetMaxConnections(1)
-	dp := &dummyPaced{retry: false}
-	err := p.Call(func() (bool, error) {
-		// check we have taken the connection token
-		// no tokens left means deadlock on the recursive call
-		assert.Equal(t, 0, len(p.connTokens))
-		return false, p.Call(dp.fn)
-	})
-	assert.Equal(t, 1, dp.called)
-	assert.Equal(t, errFoo, err)
-}
-
 func TestRetryAfterError_NonNilErr(t *testing.T) {
 	orig := errors.New("test failure")
 	dur := 2 * time.Second
```