1
0
mirror of https://github.com/rclone/rclone.git synced 2025-12-06 00:03:32 +00:00

chunker: reduce length of temporary suffix

This commit is contained in:
Ivan Andreev
2019-12-04 13:43:58 +03:00
committed by Nick Craig-Wood
parent 50bb9b7bdd
commit 41ba1bba2b
3 changed files with 384 additions and 206 deletions

View File

@@ -12,11 +12,13 @@ import (
gohash "hash"
"io"
"io/ioutil"
"math/rand"
"path"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
@@ -34,46 +36,57 @@ import (
// and optional metadata object. If it's present,
// meta object is named after the original file.
//
// The only supported metadata format is simplejson atm.
// It supports only per-file meta objects that are rudimentary,
// used mostly for consistency checks (lazily for performance reasons).
// Other formats can be developed that use an external meta store
// free of these limitations, but this needs some support from
// rclone core (eg. metadata store interfaces).
//
// The following types of chunks are supported:
// data and control, active and temporary.
// Chunk type is identified by matching chunk file name
// based on the chunk name format configured by user.
//
// Both data and control chunks can be either temporary or
// active (non-temporary).
// Both data and control chunks can be either temporary (aka hidden)
// or active (non-temporary aka normal aka permanent).
// An operation creates temporary chunks while it runs.
// By completion it removes temporary and leaves active
// (aka normal aka permanent) chunks.
// By completion it removes temporary and leaves active chunks.
//
// Temporary (aka hidden) chunks have a special hardcoded suffix
// in addition to the configured name pattern. The suffix comes last
// to prevent name collisions with non-temporary chunks.
// Temporary suffix includes so called transaction number usually
// abbreviated as `xactNo` below, a generic non-negative integer
// Temporary chunks have a special hardcoded suffix in addition
// to the configured name pattern.
// Temporary suffix includes so called transaction identifier
// (abbreviated as `xactID` below), a generic non-negative base-36 "number"
// used by parallel operations to share a composite object.
// Chunker also accepts the longer decimal temporary suffix (obsolete),
// which is transparently converted to the new format. In its maximum
// length of 13 decimals it makes a 7-digit base-36 number.
//
// Chunker can tell data chunks from control chunks by the characters
// located in the "hash placeholder" position of configured format.
// Data chunks have decimal digits there.
// Control chunks have a short lowercase literal prepended by underscore
// in that position.
// Control chunks have in that position a short lowercase alphanumeric
// string (starting with a letter) prepended by underscore.
//
// Metadata format v1 does not define any control chunk types,
// they are currently ignored aka reserved.
// In future they can be used to implement resumable uploads etc.
//
const (
ctrlTypeRegStr = `[a-z]{3,9}`
tempChunkFormat = `%s..tmp_%010d`
tempChunkRegStr = `\.\.tmp_([0-9]{10,19})`
ctrlTypeRegStr = `[a-z][a-z0-9]{2,6}`
tempSuffixFormat = `_%04s`
tempSuffixRegStr = `_([0-9a-z]{4,9})`
tempSuffixRegOld = `\.\.tmp_([0-9]{10,13})`
)
var (
ctrlTypeRegexp = regexp.MustCompile(`^` + ctrlTypeRegStr + `$`)
// regular expressions to validate control type and temporary suffix
ctrlTypeRegexp = regexp.MustCompile(`^` + ctrlTypeRegStr + `$`)
tempSuffixRegexp = regexp.MustCompile(`^` + tempSuffixRegStr + `$`)
)
// Normally metadata is a small piece of JSON (about 100-300 bytes).
// The size of valid metadata size must never exceed this limit.
// The size of valid metadata must never exceed this limit.
// Current maximum provides a reasonable room for future extensions.
//
// Please refrain from increasing it, this can cause old rclone versions
@@ -101,6 +114,9 @@ const revealHidden = false
// Prevent memory overflow due to specially crafted chunk name
const maxSafeChunkNumber = 10000000
// Number of attempts to find unique transaction identifier
const maxTransactionProbes = 100
// standard chunker errors
var (
ErrChunkOverflow = errors.New("chunk number overflow")
@@ -113,13 +129,6 @@ const (
delFailed = 2 // move, then delete and try again if failed
)
// Note: metadata logic is tightly coupled with chunker code in many
// places, eg. in checks whether a file should have meta object or is
// eligible for chunking.
// If more metadata formats (or versions of a format) are added in future,
// it may be advisable to factor it into a "metadata strategy" interface
// similar to chunkingReader or linearReader below.
// Register with Fs
func init() {
fs.Register(&fs.RegInfo{
@@ -261,7 +270,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
// detects a composite file because it finds the first chunk!
// (yet can't satisfy fstest.CheckListing, will ignore)
if err == nil && !f.useMeta && strings.Contains(rpath, "/") {
firstChunkPath := f.makeChunkName(remotePath, 0, "", -1)
firstChunkPath := f.makeChunkName(remotePath, 0, "", "")
_, testErr := baseInfo.NewFs(baseName, firstChunkPath, baseConfig)
if testErr == fs.ErrorIsFile {
err = testErr
@@ -310,12 +319,16 @@ type Fs struct {
dataNameFmt string // name format of data chunks
ctrlNameFmt string // name format of control chunks
nameRegexp *regexp.Regexp // regular expression to match chunk names
xactIDRand *rand.Rand // generator of random transaction identifiers
xactIDMutex sync.Mutex // mutex for the source of randomness
opt Options // copy of Options
features *fs.Features // optional features
dirSort bool // reserved for future, ignored
}
// configure must be called only from NewFs or by unit tests
// configure sets up chunker for given name format, meta format and hash type.
// It also seeds the source of random transaction identifiers.
// configure must be called only from NewFs or by unit tests.
func (f *Fs) configure(nameFormat, metaFormat, hashType string) error {
if err := f.setChunkNameFormat(nameFormat); err != nil {
return errors.Wrapf(err, "invalid name format '%s'", nameFormat)
@@ -326,6 +339,10 @@ func (f *Fs) configure(nameFormat, metaFormat, hashType string) error {
if err := f.setHashType(hashType); err != nil {
return err
}
randomSeed := time.Now().UnixNano()
f.xactIDRand = rand.New(rand.NewSource(randomSeed))
return nil
}
@@ -414,13 +431,13 @@ func (f *Fs) setChunkNameFormat(pattern string) error {
}
reDataOrCtrl := fmt.Sprintf("(?:(%s)|_(%s))", reDigits, ctrlTypeRegStr)
// this must be non-greedy or else it can eat up temporary suffix
// this must be non-greedy or else it could eat up temporary suffix
const mainNameRegStr = "(.+?)"
strRegex := regexp.QuoteMeta(pattern)
strRegex = reHashes.ReplaceAllLiteralString(strRegex, reDataOrCtrl)
strRegex = strings.Replace(strRegex, "\\*", mainNameRegStr, -1)
strRegex = fmt.Sprintf("^%s(?:%s)?$", strRegex, tempChunkRegStr)
strRegex = fmt.Sprintf("^%s(?:%s|%s)?$", strRegex, tempSuffixRegStr, tempSuffixRegOld)
f.nameRegexp = regexp.MustCompile(strRegex)
// craft printf formats for active data/control chunks
@@ -435,34 +452,36 @@ func (f *Fs) setChunkNameFormat(pattern string) error {
return nil
}
// makeChunkName produces chunk name (or path) for given file.
// makeChunkName produces chunk name (or path) for a given file.
//
// mainPath can be name, relative or absolute path of main file.
// filePath can be name, relative or absolute path of main file.
//
// chunkNo must be a zero based index of data chunk.
// Negative chunkNo eg. -1 indicates a control chunk.
// ctrlType is type of control chunk (must be valid).
// ctrlType must be "" for data chunks.
//
// xactNo is a transaction number.
// Negative xactNo eg. -1 indicates an active chunk,
// otherwise produce temporary chunk name.
// xactID is a transaction identifier. Empty xactID denotes active chunk,
// otherwise temporary chunk name is produced.
//
func (f *Fs) makeChunkName(mainPath string, chunkNo int, ctrlType string, xactNo int64) string {
dir, mainName := path.Split(mainPath)
var name string
func (f *Fs) makeChunkName(filePath string, chunkNo int, ctrlType, xactID string) string {
dir, parentName := path.Split(filePath)
var name, tempSuffix string
switch {
case chunkNo >= 0 && ctrlType == "":
name = fmt.Sprintf(f.dataNameFmt, mainName, chunkNo+f.opt.StartFrom)
name = fmt.Sprintf(f.dataNameFmt, parentName, chunkNo+f.opt.StartFrom)
case chunkNo < 0 && ctrlTypeRegexp.MatchString(ctrlType):
name = fmt.Sprintf(f.ctrlNameFmt, mainName, ctrlType)
name = fmt.Sprintf(f.ctrlNameFmt, parentName, ctrlType)
default:
panic("makeChunkName: invalid argument") // must not produce something we can't consume
}
if xactNo >= 0 {
name = fmt.Sprintf(tempChunkFormat, name, xactNo)
if xactID != "" {
tempSuffix = fmt.Sprintf(tempSuffixFormat, xactID)
if !tempSuffixRegexp.MatchString(tempSuffix) {
panic("makeChunkName: invalid argument")
}
}
return dir + name
return dir + name + tempSuffix
}
// parseChunkName checks whether given file path belongs to
@@ -470,20 +489,21 @@ func (f *Fs) makeChunkName(mainPath string, chunkNo int, ctrlType string, xactNo
//
// filePath can be name, relative or absolute path of a file.
//
// Returned mainPath is a non-empty string if valid chunk name
// is detected or "" if it's not a chunk.
// Returned parentPath is path of the composite file owning the chunk.
// It's a non-empty string if valid chunk name is detected
// or "" if it's not a chunk.
// Other returned values depend on detected chunk type:
// data or control, active or temporary:
//
// data chunk - the returned chunkNo is non-negative and ctrlType is ""
// control chunk - the chunkNo is -1 and ctrlType is non-empty string
// active chunk - the returned xactNo is -1
// temporary chunk - the xactNo is non-negative integer
func (f *Fs) parseChunkName(filePath string) (mainPath string, chunkNo int, ctrlType string, xactNo int64) {
// control chunk - the chunkNo is -1 and ctrlType is a non-empty string
// active chunk - the returned xactID is ""
// temporary chunk - the xactID is a non-empty string
func (f *Fs) parseChunkName(filePath string) (parentPath string, chunkNo int, ctrlType, xactID string) {
dir, name := path.Split(filePath)
match := f.nameRegexp.FindStringSubmatch(name)
if match == nil || match[1] == "" {
return "", -1, "", -1
return "", -1, "", ""
}
var err error
@@ -494,19 +514,26 @@ func (f *Fs) parseChunkName(filePath string) (mainPath string, chunkNo int, ctrl
}
if chunkNo -= f.opt.StartFrom; chunkNo < 0 {
fs.Infof(f, "invalid data chunk number in file %q", name)
return "", -1, "", -1
return "", -1, "", ""
}
}
xactNo = -1
if match[4] != "" {
if xactNo, err = strconv.ParseInt(match[4], 10, 64); err != nil || xactNo < 0 {
fs.Infof(f, "invalid transaction number in file %q", name)
return "", -1, "", -1
xactID = match[4]
}
if match[5] != "" {
// old-style temporary suffix
number, err := strconv.ParseInt(match[5], 10, 64)
if err != nil || number < 0 {
fs.Infof(f, "invalid old-style transaction number in file %q", name)
return "", -1, "", ""
}
// convert old-style transaction number to base-36 transaction ID
xactID = fmt.Sprintf(tempSuffixFormat, strconv.FormatInt(number, 36))
xactID = xactID[1:] // strip leading underscore
}
mainPath = dir + match[1]
parentPath = dir + match[1]
ctrlType = match[3]
return
}
@@ -514,17 +541,74 @@ func (f *Fs) parseChunkName(filePath string) (mainPath string, chunkNo int, ctrl
// forbidChunk prints error message or raises error if file is chunk.
// First argument sets log prefix, use `false` to suppress message.
func (f *Fs) forbidChunk(o interface{}, filePath string) error {
if mainPath, _, _, _ := f.parseChunkName(filePath); mainPath != "" {
if parentPath, _, _, _ := f.parseChunkName(filePath); parentPath != "" {
if f.opt.FailHard {
return fmt.Errorf("chunk overlap with %q", mainPath)
return fmt.Errorf("chunk overlap with %q", parentPath)
}
if boolVal, isBool := o.(bool); !isBool || boolVal {
fs.Errorf(o, "chunk overlap with %q", mainPath)
fs.Errorf(o, "chunk overlap with %q", parentPath)
}
}
return nil
}
// newXactID produces a sufficiently random transaction identifier.
//
// The temporary suffix mask allows identifiers consisting of 4-9
// base-36 digits (ie. digits 0-9 or lowercase letters a-z).
// The identifiers must be unique between transactions running on
// the single file in parallel.
//
// Currently the function produces 6-character identifiers.
// Together with underscore this makes a 7-character temporary suffix.
//
// The first 4 characters isolate groups of transactions by time intervals.
// The maximum length of interval is base-36 "zzzz" ie. 1,679,615 seconds.
// The function rather takes a maximum prime closest to this number
// (see https://primes.utm.edu) as the interval length to better safeguard
// against repeating pseudo-random sequences in cases when rclone is
// invoked from a periodic scheduler like unix cron.
// Thus, the interval is slightly more than 19 days 10 hours 33 minutes.
//
// The remaining 2 base-36 digits (in the range from 0 to 1295 inclusive)
// are taken from the local random source.
// This provides about 0.1% collision probability for two parallel
// operations started at the same second and working on the same file.
//
// Non-empty filePath argument enables probing for existing temporary chunk
// to further eliminate collisions.
func (f *Fs) newXactID(ctx context.Context, filePath string) (xactID string, err error) {
const closestPrimeZzzzSeconds = 1679609
const maxTwoBase36Digits = 1295
unixSec := time.Now().Unix()
if unixSec < 0 {
unixSec = -unixSec // unlikely but the number must be positive
}
circleSec := unixSec % closestPrimeZzzzSeconds
first4chars := strconv.FormatInt(circleSec, 36)
for tries := 0; tries < maxTransactionProbes; tries++ {
f.xactIDMutex.Lock()
randomness := f.xactIDRand.Int63n(maxTwoBase36Digits + 1)
f.xactIDMutex.Unlock()
last2chars := strconv.FormatInt(randomness, 36)
xactID = fmt.Sprintf("%04s%02s", first4chars, last2chars)
if filePath == "" {
return
}
probeChunk := f.makeChunkName(filePath, 0, "", xactID)
_, probeErr := f.base.NewObject(ctx, probeChunk)
if probeErr != nil {
return
}
}
return "", fmt.Errorf("can't setup transaction for %s", filePath)
}
// List the objects and directories in dir into entries.
// The entries can be returned in any order but should be
// for a complete directory.
@@ -602,8 +686,8 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
switch entry := dirOrObject.(type) {
case fs.Object:
remote := entry.Remote()
if mainRemote, chunkNo, ctrlType, xactNo := f.parseChunkName(remote); mainRemote != "" {
if xactNo != -1 {
if mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(remote); mainRemote != "" {
if xactID != "" {
if revealHidden {
fs.Infof(f, "ignore temporary chunk %q", remote)
}
@@ -686,7 +770,7 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
//
// Please note that every NewObject invocation will scan the whole directory.
// Using here something like fs.DirCache might improve performance
// (but will make logic more complex, though).
// (yet making the logic more complex).
//
// Note that chunker prefers analyzing file names rather than reading
// the content of meta object assuming that directory scans are fast
@@ -752,8 +836,8 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
if !strings.Contains(entryRemote, remote) {
continue // bypass regexp to save cpu
}
mainRemote, chunkNo, ctrlType, xactNo := f.parseChunkName(entryRemote)
if mainRemote == "" || mainRemote != remote || ctrlType != "" || xactNo != -1 {
mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(entryRemote)
if mainRemote == "" || mainRemote != remote || ctrlType != "" || xactID != "" {
continue // skip non-conforming, temporary and control chunks
}
//fs.Debugf(f, "%q belongs to %q as chunk %d", entryRemote, mainRemote, chunkNo)
@@ -786,7 +870,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
// This is either a composite object with metadata or a non-chunked
// file without metadata. Validate it and update the total data size.
// As an optimization, skip metadata reading here - we will call
// readMetadata lazily when needed.
// readMetadata lazily when needed (reading can be expensive).
if err := o.validate(); err != nil {
return nil, err
}
@@ -843,14 +927,11 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote st
}
}()
// Use system timer as a trivial source of transaction numbers,
// don't try hard to safeguard against chunk collisions between
// parallel transactions.
xactNo := time.Now().Unix()
if xactNo < 0 {
xactNo = -xactNo // unlikely but transaction number must be positive
}
baseRemote := remote
xactID, errXact := f.newXactID(ctx, baseRemote)
if errXact != nil {
return nil, errXact
}
// Transfer chunks data
for c.chunkNo = 0; !c.done; c.chunkNo++ {
@@ -858,7 +939,7 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote st
return nil, ErrChunkOverflow
}
tempRemote := f.makeChunkName(baseRemote, c.chunkNo, "", xactNo)
tempRemote := f.makeChunkName(baseRemote, c.chunkNo, "", xactID)
size := c.sizeLeft
if size > c.chunkSize {
size = c.chunkSize
@@ -962,7 +1043,7 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote st
// Rename data chunks from temporary to final names
for chunkNo, chunk := range c.chunks {
chunkRemote := f.makeChunkName(baseRemote, chunkNo, "", -1)
chunkRemote := f.makeChunkName(baseRemote, chunkNo, "", "")
chunkMoved, errMove := f.baseMove(ctx, chunk, chunkRemote, delFailed)
if errMove != nil {
return nil, errMove
@@ -1221,11 +1302,6 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo,
return f.newObject("", o, nil), nil
}
// Precision returns the precision of this Fs
func (f *Fs) Precision() time.Duration {
return f.base.Precision()
}
// Hashes returns the supported hash sets.
// Chunker advertises a hash type if and only if it can be calculated
// for files of any size, non-chunked or composite.
@@ -1613,8 +1689,8 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT
wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
//fs.Debugf(f, "ChangeNotify: path %q entryType %d", path, entryType)
if entryType == fs.EntryObject {
mainPath, _, _, xactNo := f.parseChunkName(path)
if mainPath != "" && xactNo == -1 {
mainPath, _, _, xactID := f.parseChunkName(path)
if mainPath != "" && xactID == "" {
path = mainPath
}
}
@@ -2063,7 +2139,7 @@ type metaSimpleJSON struct {
// Current implementation creates metadata in three cases:
// - for files larger than chunk size
// - if file contents can be mistaken as meta object
// - if consistent hashing is on but wrapped remote can't provide given hash
// - if consistent hashing is On but wrapped remote can't provide given hash
//
func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 string) ([]byte, error) {
version := metadataVersion
@@ -2177,6 +2253,11 @@ func (f *Fs) String() string {
return fmt.Sprintf("Chunked '%s:%s'", f.name, f.root)
}
// Precision returns the precision of this Fs
func (f *Fs) Precision() time.Duration {
return f.base.Precision()
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)