1
0
mirror of https://github.com/rclone/rclone.git synced 2026-01-03 00:53:43 +00:00

sync: implement --list-cutoff to allow on disk sorting for reduced memory use

Before this change, rclone had to load an entire directory into RAM in
order to sort it so it could be synced.

With directories with millions of entries, this used too much memory.

This fixes the probem by using an on disk sort when there are more
than --list-cutoff entries in a directory.

Fixes #7974
This commit is contained in:
Nick Craig-Wood
2024-12-09 11:30:34 +00:00
parent 0148bd4668
commit 385465bfa9
9 changed files with 493 additions and 18 deletions

View File

@@ -3,16 +3,30 @@ package list
import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"
"github.com/lanrat/extsort"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/lib/errcount"
"golang.org/x/sync/errgroup"
)
// NewObjecter is the minimum facilities we need from the fs.Fs passed into NewSorter.
type NewObjecter interface {
// NewObject finds the Object at remote. If it can't be found
// it returns the error ErrorObjectNotFound.
NewObject(ctx context.Context, remote string) (fs.Object, error)
}
// Sorter implements an efficient mechanism for sorting list entries.
//
// If there are a large number of entries, this may be done on disk
// instead of in memory.
// If there are a large number of entries (above `--list-cutoff`),
// this may be done on disk instead of in memory.
//
// Supply entries with the Add method, call Send at the end to deliver
// the sorted entries and finalise with CleanUp regardless of whether
@@ -21,11 +35,21 @@ import (
// Sorted entries are delivered to the callback supplied to NewSorter
// when the Send method is called.
type Sorter struct {
ctx context.Context
mu sync.Mutex
callback fs.ListRCallback
entries fs.DirEntries
keyFn KeyFn
ctx context.Context // context for everything
ci *fs.ConfigInfo // config we are using
cancel func() // cancel all background operations
mu sync.Mutex // protect the below
f NewObjecter // fs that we are listing
callback fs.ListRCallback // where to send the sorted entries to
entries fs.DirEntries // accumulated entries
keyFn KeyFn // transform an entry into a sort key
cutoff int // number of entries above which we start extsort
extSort bool // true if we are ext sorting
inputChan chan string // for sending data to the ext sort
outputChan chan string // for receiving data from the ext sort
errChan chan error // for getting errors from the ext sort
sorter *extsort.StringSorter // external string sort
errs *errcount.ErrCount // accumulate errors
}
// KeyFn turns an entry into a sort key
@@ -39,17 +63,109 @@ func identityKeyFn(entry fs.DirEntry) string {
// NewSorter creates a new Sorter with callback for sorted entries to
// be delivered to. keyFn is used to process each entry to get a key
// function, if nil then it will just use entry.Remote()
func NewSorter(ctx context.Context, callback fs.ListRCallback, keyFn KeyFn) (*Sorter, error) {
func NewSorter(ctx context.Context, f NewObjecter, callback fs.ListRCallback, keyFn KeyFn) (*Sorter, error) {
ci := fs.GetConfig(ctx)
ctx, cancel := context.WithCancel(ctx)
if keyFn == nil {
keyFn = identityKeyFn
}
return &Sorter{
ctx: ctx,
ci: ci,
cancel: cancel,
f: f,
callback: callback,
keyFn: keyFn,
cutoff: ci.ListCutoff,
errs: errcount.New(),
}, nil
}
// Turn a directory entry into a combined key and data for extsort
func (ls *Sorter) entryToKey(entry fs.DirEntry) string {
// To start with we just use the Remote to recover the object
// To make more efficient we would serialize the object here
remote := entry.Remote()
remote = strings.TrimRight(remote, "/")
if _, isDir := entry.(fs.Directory); isDir {
remote += "/"
}
key := ls.keyFn(entry) + "\x00" + remote
return key
}
// Turn an exsort key back into a directory entry
func (ls *Sorter) keyToEntry(ctx context.Context, key string) (entry fs.DirEntry, err error) {
null := strings.IndexRune(key, '\x00')
if null < 0 {
return nil, errors.New("sorter: failed to deserialize: missing null")
}
remote := key[null+1:]
if remote, isDir := strings.CutSuffix(remote, "/"); isDir {
// Is a directory
//
// Note this creates a very minimal directory entry which should be fine for the
// bucket based remotes this code will be run on.
entry = fs.NewDir(remote, time.Time{})
} else {
obj, err := ls.f.NewObject(ctx, remote)
if err != nil {
fs.Errorf(ls.f, "sorter: failed to re-create object %q: %v", remote, err)
return nil, fmt.Errorf("sorter: failed to re-create object: %w", err)
}
entry = obj
}
return entry, nil
}
func (ls *Sorter) sendEntriesToExtSort(entries fs.DirEntries) (err error) {
for _, entry := range entries {
select {
case ls.inputChan <- ls.entryToKey(entry):
case err = <-ls.errChan:
if err != nil {
return err
}
}
}
select {
case err = <-ls.errChan:
default:
}
return err
}
func (ls *Sorter) startExtSort() (err error) {
fs.Logf(ls.f, "Switching to on disk sorting as more than %d entries in one directory detected", ls.cutoff)
ls.inputChan = make(chan string, 100)
// Options to control the extsort
opt := extsort.Config{
NumWorkers: 8, // small effect
ChanBuffSize: 1024, // small effect
SortedChanBuffSize: 1024, // makes a lot of difference
ChunkSize: 32 * 1024, // tuned for 50 char records (UUID sized)
// Defaults
// ChunkSize: int(1e6), // amount of records to store in each chunk which will be written to disk
// NumWorkers: 2, // maximum number of workers to use for parallel sorting
// ChanBuffSize: 1, // buffer size for merging chunks
// SortedChanBuffSize: 10, // buffer size for passing records to output
// TempFilesDir: "", // empty for use OS default ex: /tmp
}
ls.sorter, ls.outputChan, ls.errChan = extsort.Strings(ls.inputChan, &opt)
go ls.sorter.Sort(ls.ctx)
// Show we are extsorting now
ls.extSort = true
// Send the accumulated entries to the sorter
fs.Debugf(ls.f, "Sending accumulated directory entries to disk")
err = ls.sendEntriesToExtSort(ls.entries)
fs.Debugf(ls.f, "Done sending accumulated directory entries to disk")
clear(ls.entries)
ls.entries = nil
return err
}
// Add entries to the list sorter.
//
// Does not call the callback.
@@ -58,15 +174,133 @@ func NewSorter(ctx context.Context, callback fs.ListRCallback, keyFn KeyFn) (*So
func (ls *Sorter) Add(entries fs.DirEntries) error {
ls.mu.Lock()
defer ls.mu.Unlock()
ls.entries = append(ls.entries, entries...)
if ls.extSort {
err := ls.sendEntriesToExtSort(entries)
if err != nil {
return err
}
} else {
ls.entries = append(ls.entries, entries...)
if len(ls.entries) >= ls.cutoff {
err := ls.startExtSort()
if err != nil {
return err
}
}
}
return nil
}
// Number of entries to batch in list helper
const listHelperBatchSize = 100
// listHelper is used to turn keys into entries concurrently
type listHelper struct {
ls *Sorter // parent
keys []string // keys being built up
entries fs.DirEntries // entries processed concurrently as a batch
errs []error // errors processed concurrently
}
// NewlistHelper should be with the callback passed in
func (ls *Sorter) newListHelper() *listHelper {
return &listHelper{
ls: ls,
entries: make(fs.DirEntries, listHelperBatchSize),
errs: make([]error, listHelperBatchSize),
}
}
// send sends the stored entries to the callback if there are >= max
// entries.
func (lh *listHelper) send(max int) (err error) {
if len(lh.keys) < max {
return nil
}
// Turn this batch into objects in parallel
g, gCtx := errgroup.WithContext(lh.ls.ctx)
g.SetLimit(lh.ls.ci.Checkers)
for i, key := range lh.keys {
i, key := i, key // can remove when go1.22 is minimum version
g.Go(func() error {
lh.entries[i], lh.errs[i] = lh.ls.keyToEntry(gCtx, key)
return nil
})
}
err = g.Wait()
if err != nil {
return err
}
// Account errors and collect OK entries
toSend := lh.entries[:0]
for i := range lh.keys {
entry, err := lh.entries[i], lh.errs[i]
if err != nil {
lh.ls.errs.Add(err)
} else if entry != nil {
toSend = append(toSend, entry)
}
}
// fmt.Println(lh.keys)
// fmt.Println(toSend)
err = lh.ls.callback(toSend)
clear(lh.entries)
clear(lh.errs)
lh.keys = lh.keys[:0]
return err
}
// Add an entry to the stored entries and send them if there are more
// than a certain amount
func (lh *listHelper) Add(key string) error {
lh.keys = append(lh.keys, key)
return lh.send(100)
}
// Flush the stored entries (if any) sending them to the callback
func (lh *listHelper) Flush() error {
return lh.send(1)
}
// Send the sorted entries to the callback.
func (ls *Sorter) Send() error {
func (ls *Sorter) Send() (err error) {
ls.mu.Lock()
defer ls.mu.Unlock()
if ls.extSort {
close(ls.inputChan)
list := ls.newListHelper()
outer:
for {
select {
case key, ok := <-ls.outputChan:
if !ok {
break outer
}
err := list.Add(key)
if err != nil {
return err
}
case err := <-ls.errChan:
if err != nil {
return err
}
}
}
err = list.Flush()
if err != nil {
return err
}
return ls.errs.Err("sorter")
}
// Sort the directory entries by Remote
//
// We use a stable sort here just in case there are
@@ -90,7 +324,10 @@ func (ls *Sorter) CleanUp() {
ls.mu.Lock()
defer ls.mu.Unlock()
ls.cancel()
clear(ls.entries)
ls.entries = nil
ls.extSort = false
}
// SortToChan makes a callback for the Sorter which sends the output