1
0
mirror of https://github.com/rclone/rclone.git synced 2025-12-06 00:03:32 +00:00

chunker: partially implement no-rename transactions (#4675)

Some storage providers e.g. S3 don't have an efficient rename operation.
Before this change, when chunker finished an upload, the server-side copy
and delete operations that renamed temporary chunks to their final names
could take a significant amount of time.
This PR records transaction identifier (versioning) in the metadata of
chunker composite objects striving to remove the need for rename
operations on such backends.
This approach will be triggered be the new "transactions" configuration
option, which can be "rename" (the default) or "norename".
We implement the new approach for uploads (Put operations).
The chunker Move operation still uses the rename operation of
underlying backend. Filling this gap is left for a later PR.

Co-authored-by: Ivan Andreev <ivandeex@gmail.com>
This commit is contained in:
Maxwell Calman
2020-10-30 15:30:04 -05:00
committed by Ivan Andreev
parent b029fb591f
commit 9cc8ff4dd4
4 changed files with 317 additions and 53 deletions

View File

@@ -47,7 +47,8 @@ import (
// The following types of chunks are supported:
// data and control, active and temporary.
// Chunk type is identified by matching chunk file name
// based on the chunk name format configured by user.
// based on the chunk name format configured by user and transaction
// style being used.
//
// Both data and control chunks can be either temporary (aka hidden)
// or active (non-temporary aka normal aka permanent).
@@ -63,6 +64,12 @@ import (
// which is transparently converted to the new format. In its maximum
// length of 13 decimals it makes a 7-digit base-36 number.
//
// When transactions is set to the norename style, data chunks will
// keep their temporary chunk names (with the transacion identifier
// suffix). To distinguish them from temporary chunks, the txn field
// of the metadata file is set to match the transaction identifier of
// the data chunks.
//
// Chunker can tell data chunks from control chunks by the characters
// located in the "hash placeholder" position of configured format.
// Data chunks have decimal digits there.
@@ -101,7 +108,7 @@ const maxMetadataSize = 1023
const maxMetadataSizeWritten = 255
// Current/highest supported metadata format.
const metadataVersion = 1
const metadataVersion = 2
// optimizeFirstChunk enables the following optimization in the Put:
// If a single chunk is expected, put the first chunk using the
@@ -224,6 +231,31 @@ It has the following fields: ver, size, nchunks, md5, sha1.`,
Help: "Warn user, skip incomplete file and proceed.",
},
},
}, {
Name: "transactions",
Advanced: true,
Default: "rename",
Help: `Choose how chunker should handle temporary files during transactions.`,
Hide: fs.OptionHideCommandLine,
Examples: []fs.OptionExample{
{
Value: "rename",
Help: "Rename temporary files after a successful transaction.",
}, {
Value: "norename",
Help: `Leave temporary file names and write transaction ID to metadata file.
Metadata is required for no rename transactions (meta format cannot be "none").
If you are using norename transactions you should be careful not to downgrade Rclone
as older versions of Rclone don't support this transaction style and will misinterpret
files manipulated by norename transactions.
This method is EXPERIMENTAL, don't use on production systems.`,
}, {
Value: "auto",
Help: `Rename or norename will be used depending on capabilities of the backend.
If meta format is set to "none", rename transactions will always be used.
This method is EXPERIMENTAL, don't use on production systems.`,
},
},
}},
})
}
@@ -271,7 +303,7 @@ func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs,
cache.PinUntilFinalized(f.base, f)
f.dirSort = true // processEntries requires that meta Objects prerun data chunks atm.
if err := f.configure(opt.NameFormat, opt.MetaFormat, opt.HashType); err != nil {
if err := f.configure(opt.NameFormat, opt.MetaFormat, opt.HashType, opt.Transactions); err != nil {
return nil, err
}
@@ -309,13 +341,14 @@ func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs,
// Options defines the configuration for this backend
type Options struct {
Remote string `config:"remote"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
NameFormat string `config:"name_format"`
StartFrom int `config:"start_from"`
MetaFormat string `config:"meta_format"`
HashType string `config:"hash_type"`
FailHard bool `config:"fail_hard"`
Remote string `config:"remote"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
NameFormat string `config:"name_format"`
StartFrom int `config:"start_from"`
MetaFormat string `config:"meta_format"`
HashType string `config:"hash_type"`
FailHard bool `config:"fail_hard"`
Transactions string `config:"transactions"`
}
// Fs represents a wrapped fs.Fs
@@ -337,12 +370,13 @@ type Fs struct {
opt Options // copy of Options
features *fs.Features // optional features
dirSort bool // reserved for future, ignored
useNoRename bool // can be set with the transactions option
}
// configure sets up chunker for given name format, meta format and hash type.
// It also seeds the source of random transaction identifiers.
// configure must be called only from NewFs or by unit tests.
func (f *Fs) configure(nameFormat, metaFormat, hashType string) error {
func (f *Fs) configure(nameFormat, metaFormat, hashType, transactionMode string) error {
if err := f.setChunkNameFormat(nameFormat); err != nil {
return errors.Wrapf(err, "invalid name format '%s'", nameFormat)
}
@@ -352,6 +386,9 @@ func (f *Fs) configure(nameFormat, metaFormat, hashType string) error {
if err := f.setHashType(hashType); err != nil {
return err
}
if err := f.setTransactionMode(transactionMode); err != nil {
return err
}
randomSeed := time.Now().UnixNano()
f.xactIDRand = rand.New(rand.NewSource(randomSeed))
@@ -411,6 +448,27 @@ func (f *Fs) setHashType(hashType string) error {
return nil
}
func (f *Fs) setTransactionMode(transactionMode string) error {
switch transactionMode {
case "rename":
f.useNoRename = false
case "norename":
if !f.useMeta {
return errors.New("incompatible transaction options")
}
f.useNoRename = true
case "auto":
f.useNoRename = !f.CanQuickRename()
if f.useNoRename && !f.useMeta {
f.useNoRename = false
return errors.New("using norename transactions requires metadata")
}
default:
return fmt.Errorf("unsupported transaction mode '%s'", transactionMode)
}
return nil
}
// setChunkNameFormat converts pattern based chunk name format
// into Printf format and Regular expressions for data and
// control chunks.
@@ -693,6 +751,7 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
byRemote := make(map[string]*Object)
badEntry := make(map[string]bool)
isSubdir := make(map[string]bool)
txnByRemote := map[string]string{}
var tempEntries fs.DirEntries
for _, dirOrObject := range sortedEntries {
@@ -705,12 +764,18 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
object := f.newObject("", entry, nil)
byRemote[remote] = object
tempEntries = append(tempEntries, object)
if f.useNoRename {
txnByRemote[remote], err = object.readXactID(ctx)
if err != nil {
return nil, err
}
}
break
}
// this is some kind of chunk
// metobject should have been created above if present
isSpecial := xactID != "" || ctrlType != ""
mainObject := byRemote[mainRemote]
isSpecial := xactID != txnByRemote[mainRemote] || ctrlType != ""
if mainObject == nil && f.useMeta && !isSpecial {
fs.Debugf(f, "skip orphan data chunk %q", remote)
break
@@ -809,10 +874,11 @@ func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.
}
var (
o *Object
baseObj fs.Object
err error
sameMain bool
o *Object
baseObj fs.Object
currentXactID string
err error
sameMain bool
)
if f.useMeta {
@@ -856,7 +922,14 @@ func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.
return nil, errors.Wrap(err, "can't detect composite file")
}
if f.useNoRename {
currentXactID, err = o.readXactID(ctx)
if err != nil {
return nil, err
}
}
caseInsensitive := f.features.CaseInsensitive
for _, dirOrObject := range entries {
entry, ok := dirOrObject.(fs.Object)
if !ok {
@@ -878,7 +951,7 @@ func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.
if !sameMain {
continue // skip alien chunks
}
if ctrlType != "" || xactID != "" {
if ctrlType != "" || xactID != currentXactID {
if f.useMeta {
// temporary/control chunk calls for lazy metadata read
o.unsure = true
@@ -993,12 +1066,57 @@ func (o *Object) readMetadata(ctx context.Context) error {
}
o.md5 = metaInfo.md5
o.sha1 = metaInfo.sha1
o.xactID = metaInfo.xactID
}
o.isFull = true // cache results
o.xIDCached = true
return nil
}
// readXactID returns the transaction ID stored in the passed metadata object
func (o *Object) readXactID(ctx context.Context) (xactID string, err error) {
// if xactID has already been read and cahced return it now
if o.xIDCached {
return o.xactID, nil
}
// Avoid reading metadata for backends that don't use xactID to identify permanent chunks
if !o.f.useNoRename {
return "", errors.New("readXactID requires norename transactions")
}
if o.main == nil {
return "", errors.New("readXactID requires valid metaobject")
}
if o.main.Size() > maxMetadataSize {
return "", nil // this was likely not a metadata object, return empty xactID but don't throw error
}
reader, err := o.main.Open(ctx)
if err != nil {
return "", err
}
data, err := ioutil.ReadAll(reader)
_ = reader.Close() // ensure file handle is freed on windows
if err != nil {
return "", err
}
switch o.f.opt.MetaFormat {
case "simplejson":
if data != nil && len(data) > maxMetadataSizeWritten {
return "", nil // this was likely not a metadata object, return empty xactID but don't throw error
}
var metadata metaSimpleJSON
err = json.Unmarshal(data, &metadata)
if err != nil {
return "", nil // this was likely not a metadata object, return empty xactID but don't throw error
}
xactID = metadata.XactID
}
o.xactID = xactID
o.xIDCached = true
return xactID, nil
}
// put implements Put, PutStream, PutUnchecked, Update
func (f *Fs) put(
ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption,
@@ -1151,14 +1269,17 @@ func (f *Fs) put(
// If previous object was chunked, remove its chunks
f.removeOldChunks(ctx, baseRemote)
// Rename data chunks from temporary to final names
for chunkNo, chunk := range c.chunks {
chunkRemote := f.makeChunkName(baseRemote, chunkNo, "", "")
chunkMoved, errMove := f.baseMove(ctx, chunk, chunkRemote, delFailed)
if errMove != nil {
return nil, errMove
if !f.useNoRename {
// The transaction suffix will be removed for backends with quick rename operations
for chunkNo, chunk := range c.chunks {
chunkRemote := f.makeChunkName(baseRemote, chunkNo, "", "")
chunkMoved, errMove := f.baseMove(ctx, chunk, chunkRemote, delFailed)
if errMove != nil {
return nil, errMove
}
c.chunks[chunkNo] = chunkMoved
}
c.chunks[chunkNo] = chunkMoved
xactID = ""
}
if !f.useMeta {
@@ -1178,7 +1299,7 @@ func (f *Fs) put(
switch f.opt.MetaFormat {
case "simplejson":
c.updateHashes()
metadata, err = marshalSimpleJSON(ctx, sizeTotal, len(c.chunks), c.md5, c.sha1)
metadata, err = marshalSimpleJSON(ctx, sizeTotal, len(c.chunks), c.md5, c.sha1, xactID)
}
if err == nil {
metaInfo := f.wrapInfo(src, baseRemote, int64(len(metadata)))
@@ -1190,6 +1311,7 @@ func (f *Fs) put(
o := f.newObject("", metaObject, c.chunks)
o.size = sizeTotal
o.xactID = xactID
return o, nil
}
@@ -1593,7 +1715,7 @@ func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMo
var metadata []byte
switch f.opt.MetaFormat {
case "simplejson":
metadata, err = marshalSimpleJSON(ctx, newObj.size, len(newChunks), md5, sha1)
metadata, err = marshalSimpleJSON(ctx, newObj.size, len(newChunks), md5, sha1, o.xactID)
if err == nil {
metaInfo := f.wrapInfo(metaObject, "", int64(len(metadata)))
err = newObj.main.Update(ctx, bytes.NewReader(metadata), metaInfo)
@@ -1809,7 +1931,13 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT
//fs.Debugf(f, "ChangeNotify: path %q entryType %d", path, entryType)
if entryType == fs.EntryObject {
mainPath, _, _, xactID := f.parseChunkName(path)
if mainPath != "" && xactID == "" {
metaXactID := ""
if f.useNoRename {
metaObject, _ := f.base.NewObject(ctx, mainPath)
dummyObject := f.newObject("", metaObject, nil)
metaXactID, _ = dummyObject.readXactID(ctx)
}
if mainPath != "" && xactID == metaXactID {
path = mainPath
}
}
@@ -1830,15 +1958,17 @@ func (f *Fs) Shutdown(ctx context.Context) error {
// Object represents a composite file wrapping one or more data chunks
type Object struct {
remote string
main fs.Object // meta object if file is composite, or wrapped non-chunked file, nil if meta format is 'none'
chunks []fs.Object // active data chunks if file is composite, or wrapped file as a single chunk if meta format is 'none'
size int64 // cached total size of chunks in a composite file or -1 for non-chunked files
isFull bool // true if metadata has been read
unsure bool // true if need to read metadata to detect object type
md5 string
sha1 string
f *Fs
remote string
main fs.Object // meta object if file is composite, or wrapped non-chunked file, nil if meta format is 'none'
chunks []fs.Object // active data chunks if file is composite, or wrapped file as a single chunk if meta format is 'none'
size int64 // cached total size of chunks in a composite file or -1 for non-chunked files
isFull bool // true if metadata has been read
xIDCached bool // true if xactID has been read
unsure bool // true if need to read metadata to detect object type
xactID string // transaction ID for "norename" or empty string for "renamed" chunks
md5 string
sha1 string
f *Fs
}
func (o *Object) addChunk(chunk fs.Object, chunkNo int) error {
@@ -2166,6 +2296,7 @@ type ObjectInfo struct {
src fs.ObjectInfo
fs *Fs
nChunks int // number of data chunks
xactID string // transaction ID for "norename" or empty string for "renamed" chunks
size int64 // overrides source size by the total size of data chunks
remote string // overrides remote name
md5 string // overrides MD5 checksum
@@ -2264,8 +2395,9 @@ type metaSimpleJSON struct {
Size *int64 `json:"size"` // total size of data chunks
ChunkNum *int `json:"nchunks"` // number of data chunks
// optional extra fields
MD5 string `json:"md5,omitempty"`
SHA1 string `json:"sha1,omitempty"`
MD5 string `json:"md5,omitempty"`
SHA1 string `json:"sha1,omitempty"`
XactID string `json:"txn,omitempty"` // transaction ID for norename transactions
}
// marshalSimpleJSON
@@ -2275,16 +2407,20 @@ type metaSimpleJSON struct {
// - if file contents can be mistaken as meta object
// - if consistent hashing is On but wrapped remote can't provide given hash
//
func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 string) ([]byte, error) {
func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1, xactID string) ([]byte, error) {
version := metadataVersion
if xactID == "" && version == 2 {
version = 1
}
metadata := metaSimpleJSON{
// required core fields
Version: &version,
Size: &size,
ChunkNum: &nChunks,
// optional extra fields
MD5: md5,
SHA1: sha1,
MD5: md5,
SHA1: sha1,
XactID: xactID,
}
data, err := json.Marshal(&metadata)
if err == nil && data != nil && len(data) >= maxMetadataSizeWritten {
@@ -2362,6 +2498,7 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte)
info.nChunks = *metadata.ChunkNum
info.md5 = metadata.MD5
info.sha1 = metadata.SHA1
info.xactID = metadata.XactID
return info, true, nil
}
@@ -2394,6 +2531,11 @@ func (f *Fs) Precision() time.Duration {
return f.base.Precision()
}
// CanQuickRename returns true if the Fs supports a quick rename operation
func (f *Fs) CanQuickRename() bool {
return f.base.Features().Move != nil
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)