mirror of https://github.com/rclone/rclone.git synced 2026-01-06 02:23:24 +00:00

Compare commits


13 Commits

Author SHA1 Message Date
Nick Craig-Wood
1e207173d9 accounting: add SetMaxCompletedTransfers method to fix bisync race #8815
Before this change bisync adjusted the global MaxCompletedTransfers
variable, which caused data races.

This adds a SetMaxCompletedTransfers method and uses it in bisync.

The MaxCompletedTransfers global becomes the default. It can still be
changed externally when rclone is used as a library, and the commit
history indicates that MaxCompletedTransfers was added for exactly
that purpose, so we try not to break it here.
2025-09-24 10:26:07 +01:00
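For library users, a minimal sketch of how the two knobs fit together
(the calls are the ones added in this commit; the values are
illustrative):

    package main

    import (
        "context"

        "github.com/rclone/rclone/fs/accounting"
    )

    func main() {
        ctx := context.Background()
        // Package-level default: set once at startup, before any
        // concurrent work begins.
        accounting.MaxCompletedTransfers = 200
        // Per-StatsInfo override: taken under the stats mutex, so it is
        // safe to call concurrently. -1 keeps every completed transfer.
        accounting.Stats(ctx).SetMaxCompletedTransfers(-1)
    }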
Nick Craig-Wood
05279e3918 accounting: add RemoveDoneTransfers method to fix bisync race #8815
Before this change bisync adjusted MaxCompletedTransfers in order
to clear the done transfers from the stats.

This didn't work (it only cleared one transfer) and was part of a
race on MaxCompletedTransfers.

This fixes the problem by introducing a new method RemoveDoneTransfers
to clear the done transfers explicitly and calling it in bisync.
2025-09-24 10:25:29 +01:00
nielash
42a601fbf2 bisync: use t.TempDir() in tests to fix sporadic race #8815 2025-09-24 10:24:30 +01:00
Nick Craig-Wood
9b5e6a7e91 bisync: fix race when CaptureOutput is used concurrently #8815
Before this change CaptureOutput could trip the race detector when
used concurrently, in particular when goroutines doing the logging
outlast the return from `fun()`.

This fixes the problem with a mutex.
2025-09-24 10:24:30 +01:00
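A minimal sketch of the failure mode (the late goroutine and the sleep
are illustrative, not from the commit): a goroutine started inside
`fun()` can still be logging after `fun()` returns, so without the
mutex its writes race with the final read of the buffer.

    package main

    import (
        "time"

        "github.com/rclone/rclone/cmd/bisync/bilib"
        "github.com/rclone/rclone/fs"
    )

    func main() {
        out := bilib.CaptureOutput(func() {
            go func() {
                time.Sleep(time.Millisecond)
                // This write may land after fun() has returned; the mutex
                // added in this commit serialises it with the final read.
                fs.Logf(nil, "late message")
            }()
        })
        _ = out
    }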
Nick Craig-Wood
5d6d79e7d4 pacer: fix deadlock with --max-connections
If the pacer was called recursively while --max-connections was in
use, it could deadlock when all the connection tokens were held at
the time of the recursive call (which was likely).

This affected the azureblob backend because when it receives an
InvalidBlockOrBlob error it attempts to clear the condition before
retrying. This in turn involves recursively calling the pacer.

This fixes the problem by skipping the --max-connections check if the
pacer is called recursively.

Recursion is detected by stack inspection, which isn't ideal, but the
alternative would be to add a ctx parameter to all >1,000 pacer calls.
The benchmark shows stack inspection takes about 55ns per stack frame,
so even a 100-frame stack costs only ~5.5µs, which is relatively cheap.
2025-09-22 17:39:27 +01:00
Nick Craig-Wood
11de074cbf Revert "azureblob: fix deadlock with --max-connections with InvalidBlockOrBlob errors"
This reverts commit 0c1902cc6037d81eaf95e931172879517a25d529.

This turns out not to be sufficient, so we need a better approach.
2025-09-22 17:39:27 +01:00
Nick Craig-Wood
e9ab177a32 Add Youfu Zhang to contributors 2025-09-22 17:39:27 +01:00
Nick Craig-Wood
f3f4fba98d Add Matt LaPaglia to contributors 2025-09-22 17:39:27 +01:00
Sudipto Baral
03fccdd67b smb: optimize smb mount performance by avoiding stat checks during initialization
add isPathDir function and tests for the trailing slash optimization
2025-09-22 15:33:44 +01:00
Youfu Zhang
231083647e pikpak: fix unnecessary retries by using URL expire parameter - fixes #8601
Before this change, rclone would unnecessarily retry downloads when
the `Link.Expire` field was unreliable but the download URL contained
a valid expire query parameter. This primarily affects cases where
media links are unavailable or when `no_media_link` is enabled.

The `Link.Valid()` method now primarily checks the URL's expire query
parameter (as a Unix timestamp) and falls back to the Expire field
only when URL parsing fails. This eliminates the `error no link`
retry loops while maintaining backward compatibility.

Signed-off-by: Youfu Zhang <zhangyoufu@gmail.com>
2025-09-19 12:46:26 +09:00
dougal
0e203a7546 serve http: fix logging of the URL on start 2025-09-18 14:49:58 +01:00
Matt LaPaglia
a7dd787569 docs: fix typo 2025-09-16 14:27:10 +02:00
dougal
689555033e b2: fix 1TB+ uploads
Before this change the chunk size defaulted to the 96M minimum which,
due to b2's 10,000 part rule, capped uploads at just below 1TB.

Now the calculated chunk size is used, so chunks can be up to 5GB,
giving a maximum file size of 50TB.

Fixes #8460
2025-09-15 13:05:20 +01:00
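The arithmetic behind those limits, as a quick sketch (the 10,000 part
limit is b2's; the chunk sizes are the ones named above):

    package main

    import "fmt"

    func main() {
        const maxParts = 10000 // b2 allows at most 10,000 parts per large file
        for _, c := range []struct {
            name  string
            bytes int64
        }{
            {"96M old minimum", 96 << 20},
            {"5G maximum", 5 << 30},
        } {
            fmt.Printf("%-15s -> max file %.2f TiB\n", c.name,
                float64(c.bytes*maxParts)/(1<<40))
        }
        // Prints ~0.92 TiB for 96M chunks (the just-below-1TB ceiling)
        // and ~48.83 TiB (about 50TB) for 5G chunks.
    }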
20 changed files with 377 additions and 112 deletions

View File

@@ -2797,8 +2797,6 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
 		blockList  blockblob.GetBlockListResponse
 		properties *blob.GetPropertiesResponse
 		options    *blockblob.CommitBlockListOptions
-		// Use temporary pacer as this can be called recursively which can cause a deadlock with --max-connections
-		pacer = fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant)))
 	)
 	properties, err = o.readMetaDataAlways(ctx)
@@ -2810,7 +2808,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
 	if objectExists {
 		// Get the committed block list
-		err = pacer.Call(func() (bool, error) {
+		err = o.fs.pacer.Call(func() (bool, error) {
 			blockList, err = blockBlobSVC.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
 			return o.fs.shouldRetry(ctx, err)
 		})
@@ -2852,7 +2850,7 @@ func (o *Object) clearUncommittedBlocks(ctx context.Context) (err error) {
 	// Commit only the committed blocks
 	fs.Debugf(o, "Committing %d blocks to remove uncommitted blocks", len(blockIDs))
-	err = pacer.Call(func() (bool, error) {
+	err = o.fs.pacer.Call(func() (bool, error) {
 		_, err := blockBlobSVC.CommitBlockList(ctx, blockIDs, options)
 		return o.fs.shouldRetry(ctx, err)
 	})

View File

@@ -2224,13 +2224,17 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
 		return info, nil, err
 	}
+	up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil, options...)
+	if err != nil {
+		return info, nil, err
+	}
 	info = fs.ChunkWriterInfo{
-		ChunkSize:   int64(f.opt.ChunkSize),
+		ChunkSize:   up.chunkSize,
 		Concurrency: o.fs.opt.UploadConcurrency,
 		//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
 	}
-	up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil, options...)
-	return info, up, err
+	return info, up, nil
 }

 // Remove an object

View File

@@ -5,6 +5,7 @@ package api
 import (
 	"fmt"
+	"net/url"
 	"reflect"
 	"strconv"
 	"time"
@@ -136,8 +137,25 @@ type Link struct {
 }

 // Valid reports whether l is non-nil, has an URL, and is not expired.
+// It primarily checks the URL's expire query parameter, falling back to the Expire field.
 func (l *Link) Valid() bool {
-	return l != nil && l.URL != "" && time.Now().Add(10*time.Second).Before(time.Time(l.Expire))
+	if l == nil || l.URL == "" {
+		return false
+	}
+	// Primary validation: check URL's expire query parameter
+	if u, err := url.Parse(l.URL); err == nil {
+		if expireStr := u.Query().Get("expire"); expireStr != "" {
+			// Try parsing as Unix timestamp (seconds)
+			if expireInt, err := strconv.ParseInt(expireStr, 10, 64); err == nil {
+				expireTime := time.Unix(expireInt, 0)
+				return time.Now().Add(10 * time.Second).Before(expireTime)
+			}
+		}
+	}
+	// Fallback validation: use the Expire field if URL parsing didn't work
+	return time.Now().Add(10 * time.Second).Before(time.Time(l.Expire))
 }

 // URL is a basic form of URL

View File

@@ -0,0 +1,99 @@
package api
import (
"fmt"
"testing"
"time"
)
// TestLinkValid tests the Link.Valid method for various scenarios
func TestLinkValid(t *testing.T) {
tests := []struct {
name string
link *Link
expected bool
desc string
}{
{
name: "nil link",
link: nil,
expected: false,
desc: "nil link should be invalid",
},
{
name: "empty URL",
link: &Link{URL: ""},
expected: false,
desc: "empty URL should be invalid",
},
{
name: "valid URL with future expire parameter",
link: &Link{
URL: fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(time.Hour).Unix()),
},
expected: true,
desc: "URL with future expire parameter should be valid",
},
{
name: "expired URL with past expire parameter",
link: &Link{
URL: fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(-time.Hour).Unix()),
},
expected: false,
desc: "URL with past expire parameter should be invalid",
},
{
name: "URL expire parameter takes precedence over Expire field",
link: &Link{
URL: fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(time.Hour).Unix()),
Expire: Time(time.Now().Add(-time.Hour)), // Fallback is expired
},
expected: true,
desc: "URL expire parameter should take precedence over Expire field",
},
{
name: "URL expire parameter within 10 second buffer should be invalid",
link: &Link{
URL: fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(5*time.Second).Unix()),
},
expected: false,
desc: "URL expire parameter within 10 second buffer should be invalid",
},
{
name: "fallback to Expire field when no URL expire parameter",
link: &Link{
URL: "https://example.com/file",
Expire: Time(time.Now().Add(time.Hour)),
},
expected: true,
desc: "should fallback to Expire field when URL has no expire parameter",
},
{
name: "fallback to Expire field when URL expire parameter is invalid",
link: &Link{
URL: "https://example.com/file?expire=invalid",
Expire: Time(time.Now().Add(time.Hour)),
},
expected: true,
desc: "should fallback to Expire field when URL expire parameter is unparseable",
},
{
name: "invalid when both URL expire and Expire field are expired",
link: &Link{
URL: fmt.Sprintf("https://example.com/file?expire=%d", time.Now().Add(-time.Hour).Unix()),
Expire: Time(time.Now().Add(-time.Hour)),
},
expected: false,
desc: "should be invalid when both URL expire and Expire field are expired",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.link.Valid()
if result != tt.expected {
t.Errorf("Link.Valid() = %v, expected %v. %s", result, tt.expected, tt.desc)
}
})
}
}

View File

@@ -192,6 +192,9 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		return nil, err
 	}
+
+	// if root is empty or ends with / (must be a directory)
+	isRootDir := isPathDir(root)
 	root = strings.Trim(root, "/")
 	f := &Fs{
@@ -218,6 +221,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 	if share == "" || dir == "" {
 		return f, nil
 	}
+
+	// Skip stat check if root is already a directory
+	if isRootDir {
+		return f, nil
+	}
 	cn, err := f.getConnection(ctx, share)
 	if err != nil {
 		return nil, err
@@ -894,6 +902,11 @@ func ensureSuffix(s, suffix string) string {
 	return s + suffix
 }
+
+// isPathDir determines if a path represents a directory based on trailing slash
+func isPathDir(path string) bool {
+	return path == "" || strings.HasSuffix(path, "/")
+}

 func trimPathPrefix(s, prefix string) string {
 	// we need to clean the paths to make tests pass!
 	s = betterPathClean(s)

View File

@@ -0,0 +1,41 @@
// Unit tests for internal SMB functions
package smb
import "testing"
// TestIsPathDir tests the isPathDir function logic
func TestIsPathDir(t *testing.T) {
tests := []struct {
path string
expected bool
}{
// Empty path should be considered a directory
{"", true},
// Paths with trailing slash should be directories
{"/", true},
{"share/", true},
{"share/dir/", true},
{"share/dir/subdir/", true},
// Paths without trailing slash should not be directories
{"share", false},
{"share/dir", false},
{"share/dir/file", false},
{"share/dir/subdir/file", false},
// Edge cases
{"share//", true},
{"share///", true},
{"share/dir//", true},
}
for _, tt := range tests {
t.Run(tt.path, func(t *testing.T) {
result := isPathDir(tt.path)
if result != tt.expected {
t.Errorf("isPathDir(%q) = %v, want %v", tt.path, result, tt.expected)
}
})
}
}

View File

@@ -4,15 +4,19 @@ package bilib
 import (
 	"bytes"
 	"log/slog"
+	"sync"

 	"github.com/rclone/rclone/fs/log"
 )

 // CaptureOutput runs a function capturing its output at log level INFO.
 func CaptureOutput(fun func()) []byte {
+	var mu sync.Mutex
 	buf := &bytes.Buffer{}
 	oldLevel := log.Handler.SetLevel(slog.LevelInfo)
 	log.Handler.SetOutput(func(level slog.Level, text string) {
+		mu.Lock()
+		defer mu.Unlock()
 		buf.WriteString(text)
 	})
 	defer func() {
@@ -20,5 +24,7 @@ func CaptureOutput(fun func()) []byte {
 		log.Handler.SetLevel(oldLevel)
 	}()
 	fun()
+	mu.Lock()
+	defer mu.Unlock()
 	return buf.Bytes()
 }

View File

@@ -330,7 +330,7 @@ func testBisync(ctx context.Context, t *testing.T, path1, path2 string) {
 	baseDir, err := os.Getwd()
 	require.NoError(t, err, "get current directory")
 	randName := time.Now().Format("150405") + random.String(2) // some bucket backends don't like dots, keep this short to avoid linux errors
-	tempDir := filepath.Join(os.TempDir(), randName)
+	tempDir := filepath.Join(t.TempDir(), randName)
 	workDir := filepath.Join(tempDir, "workdir")
 	b := &bisyncTest{

View File

@@ -707,8 +707,7 @@ func (b *bisyncRun) modifyListing(ctx context.Context, src fs.Fs, dst fs.Fs, res
 		prettyprint(dstList.list, "dstList", fs.LogLevelDebug)

 		// clear stats so we only do this once
-		accounting.MaxCompletedTransfers = 0
-		accounting.Stats(ctx).PruneTransfers()
+		accounting.Stats(ctx).RemoveDoneTransfers()
 	}

 	if b.DebugName != "" {

View File

@@ -245,10 +245,8 @@ func (b *bisyncRun) fastCopy(ctx context.Context, fsrc, fdst fs.Fs, files bilib.
 		}
 	}

-	b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
-	if accounting.MaxCompletedTransfers != -1 {
-		accounting.MaxCompletedTransfers = -1 // we need a complete list in the event of graceful shutdown
-	}
+	b.SyncCI = fs.GetConfig(ctxCopy)                       // allows us to request graceful shutdown
+	accounting.Stats(ctxCopy).SetMaxCompletedTransfers(-1) // we need a complete list in the event of graceful shutdown
 	ctxCopy, b.CancelSync = context.WithCancel(ctxCopy)
 	b.testFn()
 	err := sync.Sync(ctxCopy, fdst, fsrc, b.opt.CreateEmptySrcDirs)

View File

@@ -208,6 +208,7 @@ func newServer(ctx context.Context, f fs.Fs, opt *Options, vfsOpt *vfscommon.Opt
 // Serve HTTP until the server is shutdown
 func (s *HTTP) Serve() error {
 	s.server.Serve()
+	fs.Logf(s.f, "HTTP Server started on %s", s.server.URLs())
 	s.server.Wait()
 	return nil
 }

View File

@@ -1013,3 +1013,5 @@ put them back in again.` >}}
 - Robin Rolf <imer@imer.cc>
 - Jean-Christophe Cura <jcaspes@gmail.com>
 - russcoss <russcoss@outlook.com>
+- Matt LaPaglia <mlapaglia@gmail.com>
+- Youfu Zhang <1315097+zhangyoufu@users.noreply.github.com>

View File

@@ -21,7 +21,7 @@ you started to share on Windows. On smbd, it's the section title in `smb.conf`
 (usually in `/etc/samba/`) file.
 You can find shares by querying the root if you're unsure (e.g. `rclone lsd remote:`).

-You can't access to the shared printers from rclone, obviously.
+You can't access the shared printers from rclone, obviously.

 You can't use Anonymous access for logging in. You have to use the `guest` user
 with an empty password instead. The rclone client tries to avoid 8.3 names when

View File

@@ -22,48 +22,52 @@ const (
 	averageStopAfter = time.Minute
 )

-// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
+// MaxCompletedTransfers specifies the default maximum number of
+// completed transfers in startedTransfers list. This can be adjusted
+// for a given StatsInfo by calling the SetMaxCompletedTransfers
+// method.
 var MaxCompletedTransfers = 100

 // StatsInfo accounts all transfers
 // N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
 // to correctly count the updated fields
 type StatsInfo struct {
-	mu                  sync.RWMutex
-	ctx                 context.Context
-	ci                  *fs.ConfigInfo
-	bytes               int64
-	errors              int64
-	lastError           error
-	fatalError          bool
-	retryError          bool
-	retryAfter          time.Time
-	checks              int64
-	checking            *transferMap
-	checkQueue          int
-	checkQueueSize      int64
-	transfers           int64
-	transferring        *transferMap
-	transferQueue       int
-	transferQueueSize   int64
-	listed              int64
-	renames             int64
-	renameQueue         int
-	renameQueueSize     int64
-	deletes             int64
-	deletesSize         int64
-	deletedDirs         int64
-	inProgress          *inProgress
-	startedTransfers    []*Transfer   // currently active transfers
-	oldTimeRanges       timeRanges    // a merged list of time ranges for the transfers
-	oldDuration         time.Duration // duration of transfers we have culled
-	group               string
-	startTime           time.Time // the moment these stats were initialized or reset
-	average             averageValues
-	serverSideCopies    int64
-	serverSideCopyBytes int64
-	serverSideMoves     int64
-	serverSideMoveBytes int64
+	mu                    sync.RWMutex
+	ctx                   context.Context
+	ci                    *fs.ConfigInfo
+	bytes                 int64
+	errors                int64
+	lastError             error
+	fatalError            bool
+	retryError            bool
+	retryAfter            time.Time
+	checks                int64
+	checking              *transferMap
+	checkQueue            int
+	checkQueueSize        int64
+	transfers             int64
+	transferring          *transferMap
+	transferQueue         int
+	transferQueueSize     int64
+	listed                int64
+	renames               int64
+	renameQueue           int
+	renameQueueSize       int64
+	deletes               int64
+	deletesSize           int64
+	deletedDirs           int64
+	inProgress            *inProgress
+	startedTransfers      []*Transfer   // currently active transfers
+	oldTimeRanges         timeRanges    // a merged list of time ranges for the transfers
+	oldDuration           time.Duration // duration of transfers we have culled
+	group                 string
+	startTime             time.Time // the moment these stats were initialized or reset
+	average               averageValues
+	serverSideCopies      int64
+	serverSideCopyBytes   int64
+	serverSideMoves       int64
+	serverSideMoveBytes   int64
+	maxCompletedTransfers int
 }

 type averageValues struct {
@@ -81,17 +85,26 @@ type averageValues struct {
 func NewStats(ctx context.Context) *StatsInfo {
 	ci := fs.GetConfig(ctx)
 	s := &StatsInfo{
-		ctx:          ctx,
-		ci:           ci,
-		checking:     newTransferMap(ci.Checkers, "checking"),
-		transferring: newTransferMap(ci.Transfers, "transferring"),
-		inProgress:   newInProgress(ctx),
-		startTime:    time.Now(),
-		average:      averageValues{},
+		ctx:                   ctx,
+		ci:                    ci,
+		checking:              newTransferMap(ci.Checkers, "checking"),
+		transferring:          newTransferMap(ci.Transfers, "transferring"),
+		inProgress:            newInProgress(ctx),
+		startTime:             time.Now(),
+		average:               averageValues{},
+		maxCompletedTransfers: MaxCompletedTransfers,
 	}
 	return s
 }

+// SetMaxCompletedTransfers sets the maximum number of completed transfers to keep.
+func (s *StatsInfo) SetMaxCompletedTransfers(n int) *StatsInfo {
+	s.mu.Lock()
+	s.maxCompletedTransfers = n
+	s.mu.Unlock()
+	return s
+}
+
 // RemoteStats returns stats for rc
 //
 // If short is true then the transfers and checkers won't be added.
@@ -912,22 +925,31 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
 }

 // PruneTransfers makes sure there aren't too many old transfers by removing
-// single finished transfer.
-func (s *StatsInfo) PruneTransfers() {
-	if MaxCompletedTransfers < 0 {
-		return
-	}
+// a single finished transfer. Returns true if it removed a transfer.
+func (s *StatsInfo) PruneTransfers() bool {
 	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.maxCompletedTransfers < 0 {
+		return false
+	}
+	removed := false
 	// remove a transfer from the start if we are over quota
-	if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
+	if len(s.startedTransfers) > s.maxCompletedTransfers+s.ci.Transfers {
 		for i, tr := range s.startedTransfers {
 			if tr.IsDone() {
 				s._removeTransfer(tr, i)
+				removed = true
 				break
 			}
 		}
 	}
-	s.mu.Unlock()
+	return removed
+}
+
+// RemoveDoneTransfers removes all Done transfers.
+func (s *StatsInfo) RemoveDoneTransfers() {
+	for s.PruneTransfers() {
+	}
 }

 // AddServerSideMove counts a server side move

View File

@@ -465,3 +465,27 @@ func TestPruneTransfers(t *testing.T) {
 		})
 	}
 }
+
+func TestRemoveDoneTransfers(t *testing.T) {
+	ctx := context.Background()
+	s := NewStats(ctx)
+	const transfers = 10
+	for i := int64(1); i <= int64(transfers); i++ {
+		s.AddTransfer(&Transfer{
+			startedAt:   time.Unix(i, 0),
+			completedAt: time.Unix(i+1, 0),
+		})
+	}
+	s.mu.Lock()
+	assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
+	assert.Equal(t, transfers, len(s.startedTransfers))
+	s.mu.Unlock()
+	s.RemoveDoneTransfers()
+	s.mu.Lock()
+	assert.Equal(t, time.Duration(transfers)*time.Second, s._totalDuration())
+	assert.Equal(t, transfers, len(s.startedTransfers))
+	s.mu.Unlock()
+}

View File

@@ -208,7 +208,7 @@ func init() {
 		{name: "rmdir", title: "Remove an empty directory or container"},
 		{name: "purge", title: "Remove a directory or container and all of its contents"},
 		{name: "rmdirs", title: "Remove all the empty directories in the path", help: "- leaveRoot - boolean, set to true not to delete the root\n"},
-		{name: "delete", title: "Remove files in the path", help: "- rmdirs - boolean, set to true to remove empty directories\n- leaveRoot - boolean if rmdirs is set, set to true not to delete the root\n", noRemote: true},
+		{name: "delete", title: "Remove files in the path", noRemote: true},
 		{name: "deletefile", title: "Remove the single file pointed to"},
 		{name: "copyurl", title: "Copy the URL to the object", help: "- url - string, URL to read from\n - autoFilename - boolean, set to true to retrieve destination file name from url\n"},
 		{name: "uploadfile", title: "Upload file using multiform/form-data", help: "- each part in body represents a file to be uploaded\n", needsRequest: true, noCommand: true},
@@ -267,22 +267,7 @@ func rcSingleCommand(ctx context.Context, in rc.Params, name string, noRemote bo
 		}
 		return nil, Rmdirs(ctx, f, remote, leaveRoot)
 	case "delete":
-		rmdirs, err := in.GetBool("rmdirs")
-		if rc.NotErrParamNotFound(err) {
-			return nil, err
-		}
-		leaveRoot, err := in.GetBool("leaveRoot")
-		if rc.NotErrParamNotFound(err) {
-			return nil, err
-		}
-		err = Delete(ctx, f)
-		if err != nil {
-			return nil, err
-		}
-		if !rmdirs {
-			return nil, nil
-		}
-		return nil, Rmdirs(ctx, f, remote, leaveRoot)
+		return nil, Delete(ctx, f)
 	case "deletefile":
 		o, err := f.NewObject(ctx, remote)
 		if err != nil {

View File

@@ -159,32 +159,21 @@ func TestRcCopyurl(t *testing.T) {
 // operations/delete: Remove files in the path
 func TestRcDelete(t *testing.T) {
-	ctx := context.Background()
 	r, call := rcNewRun(t, "operations/delete")

-	file1 := r.WriteObject(ctx, "subdir/file1", "subdir/file1 contents", t1)
-	file2 := r.WriteObject(ctx, "file2", "file2 contents", t1)
-	fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1, file2}, []string{"subdir"}, fs.GetModifyWindow(ctx, r.Fremote))
+	file1 := r.WriteObject(context.Background(), "small", "1234567890", t2)                                                                                            // 10 bytes
+	file2 := r.WriteObject(context.Background(), "medium", "------------------------------------------------------------", t1)                                        // 60 bytes
+	file3 := r.WriteObject(context.Background(), "large", "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", t1) // 100 bytes
+	r.CheckRemoteItems(t, file1, file2, file3)

 	in := rc.Params{
 		"fs": r.FremoteName,
 	}
-	out, err := call.Fn(ctx, in)
+	out, err := call.Fn(context.Background(), in)
 	require.NoError(t, err)
 	assert.Equal(t, rc.Params(nil), out)

-	fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{}, []string{"subdir"}, fs.GetModifyWindow(ctx, r.Fremote))
-
-	// Now try with rmdirs=true and leaveRoot=true
-	in["rmdirs"] = true
-	in["leaveRoot"] = true
-	out, err = call.Fn(ctx, in)
-	require.NoError(t, err)
-	assert.Equal(t, rc.Params(nil), out)
-
-	fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{}, []string{}, fs.GetModifyWindow(ctx, r.Fremote))
-	// FIXME don't have an easy way of checking the root still exists or not
+	r.CheckRemoteItems(t)
 }

 // operations/deletefile: Remove the single file pointed to

View File

@@ -523,8 +523,6 @@ func (s *Server) initTLS() error {
 func (s *Server) Serve() {
 	s.wg.Add(len(s.instances))
 	for _, ii := range s.instances {
-		// TODO: decide how/when to log listening url
-		// log.Printf("listening on %s", ii.url)
 		go ii.serve(&s.wg)
 	}
 	// Install an atexit handler to shutdown gracefully

View File

@@ -4,6 +4,8 @@ package pacer
 import (
 	"errors"
 	"fmt"
+	"runtime"
+	"strings"
 	"sync"
 	"time"
@@ -153,13 +155,13 @@ func (p *Pacer) ModifyCalculator(f func(Calculator)) {
 // This must be called as a pair with endCall.
 //
 // This waits for the pacer token
-func (p *Pacer) beginCall() {
+func (p *Pacer) beginCall(limitConnections bool) {
 	// pacer starts with a token in and whenever we take one out
 	// XXX ms later we put another in. We could do this with a
 	// Ticker more accurately, but then we'd have to work out how
 	// not to run it when it wasn't needed
 	<-p.pacer
-	if p.maxConnections > 0 {
+	if limitConnections {
 		<-p.connTokens
 	}
@@ -176,8 +178,8 @@ func (p *Pacer) beginCall() {
 //
 // This should calculate a new sleepTime. It takes a boolean as to
 // whether the operation should be retried or not.
-func (p *Pacer) endCall(retry bool, err error) {
-	if p.maxConnections > 0 {
+func (p *Pacer) endCall(retry bool, err error, limitConnections bool) {
+	if limitConnections {
 		p.connTokens <- struct{}{}
 	}
 	p.mu.Lock()
@@ -191,13 +193,44 @@ func (p *Pacer) endCall(retry bool, err error) {
 	p.mu.Unlock()
 }

+// Detect the pacer being called reentrantly.
+//
+// This looks for Pacer.call in the call stack and returns true if it
+// is found.
+//
+// Ideally we would do this by passing a context about but there are
+// an awful lot of Pacer calls!
+//
+// This is only needed when p.maxConnections > 0 which isn't a common
+// configuration so adding a bit of extra slowdown here is not a
+// problem.
+func pacerReentered() bool {
+	var pcs [48]uintptr
+	n := runtime.Callers(3, pcs[:]) // skip runtime.Callers, pacerReentered and call
+	frames := runtime.CallersFrames(pcs[:n])
+	for {
+		f, more := frames.Next()
+		if strings.HasSuffix(f.Function, "(*Pacer).call") {
+			return true
+		}
+		if !more {
+			break
+		}
+	}
+	return false
+}
+
 // call implements Call but with settable retries
 func (p *Pacer) call(fn Paced, retries int) (err error) {
 	var retry bool
+	limitConnections := false
+	if p.maxConnections > 0 && !pacerReentered() {
+		limitConnections = true
+	}
 	for i := 1; i <= retries; i++ {
-		p.beginCall()
+		p.beginCall(limitConnections)
 		retry, err = p.invoker(i, retries, fn)
-		p.endCall(retry, err)
+		p.endCall(retry, err, limitConnections)
 		if !retry {
 			break
 		}

View File

@@ -108,7 +108,7 @@ func waitForPace(p *Pacer, duration time.Duration) (when time.Time) {
 func TestBeginCall(t *testing.T) {
 	p := New(MaxConnectionsOption(10), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
 	emptyTokens(p)
-	go p.beginCall()
+	go p.beginCall(true)
 	if !waitForPace(p, 10*time.Millisecond).IsZero() {
 		t.Errorf("beginSleep fired too early #1")
 	}
@@ -131,7 +131,7 @@ func TestBeginCall(t *testing.T) {
 func TestBeginCallZeroConnections(t *testing.T) {
 	p := New(MaxConnectionsOption(0), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
 	emptyTokens(p)
-	go p.beginCall()
+	go p.beginCall(false)
 	if !waitForPace(p, 10*time.Millisecond).IsZero() {
 		t.Errorf("beginSleep fired too early #1")
 	}
@@ -257,7 +257,7 @@ func TestEndCall(t *testing.T) {
 	p := New(MaxConnectionsOption(5))
 	emptyTokens(p)
 	p.state.ConsecutiveRetries = 1
-	p.endCall(true, nil)
+	p.endCall(true, nil, true)
 	assert.Equal(t, 1, len(p.connTokens))
 	assert.Equal(t, 2, p.state.ConsecutiveRetries)
 }
@@ -266,7 +266,7 @@ func TestEndCallZeroConnections(t *testing.T) {
 	p := New(MaxConnectionsOption(0))
 	emptyTokens(p)
 	p.state.ConsecutiveRetries = 1
-	p.endCall(false, nil)
+	p.endCall(false, nil, false)
 	assert.Equal(t, 0, len(p.connTokens))
 	assert.Equal(t, 0, p.state.ConsecutiveRetries)
 }
@@ -353,6 +353,41 @@ func TestCallParallel(t *testing.T) {
 	wait.Broadcast()
 }

+func BenchmarkPacerReentered(b *testing.B) {
+	for b.Loop() {
+		_ = pacerReentered()
+	}
+}
+
+func BenchmarkPacerReentered100(b *testing.B) {
+	var fn func(level int)
+	fn = func(level int) {
+		if level > 0 {
+			fn(level - 1)
+			return
+		}
+		for b.Loop() {
+			_ = pacerReentered()
+		}
+	}
+	fn(100)
+}
+
+func TestCallMaxConnectionsRecursiveDeadlock(t *testing.T) {
+	p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
+	p.SetMaxConnections(1)
+	dp := &dummyPaced{retry: false}
+	err := p.Call(func() (bool, error) {
+		// check we have taken the connection token
+		// no tokens left means deadlock on the recursive call
+		assert.Equal(t, 0, len(p.connTokens))
+		return false, p.Call(dp.fn)
+	})
+	assert.Equal(t, 1, dp.called)
+	assert.Equal(t, errFoo, err)
+}
+
 func TestRetryAfterError_NonNilErr(t *testing.T) {
 	orig := errors.New("test failure")
 	dur := 2 * time.Second