From 509b9b96a6a40679fb9912dbff41bfe484943cad Mon Sep 17 00:00:00 2001 From: buengese Date: Fri, 31 Jan 2020 19:18:19 +0100 Subject: [PATCH] press: rebase and fix the broken tests ... add additional error handling for metadata upload ... fix metadata wrapping ... make sure not to leave any orphaned data when overwriting existing files with Put, Copy or Move ... restructure some minor things to make them clearer ... remove xz compression --- backend/all/all.go | 1 + backend/press/alg_exec.go | 98 ---- backend/press/compression.go | 27 - backend/press/compression_test.go | 5 - backend/press/press.go | 784 ++++++++++++++++-------------- backend/press/press_test.go | 86 ++-- 6 files changed, 467 insertions(+), 534 deletions(-) delete mode 100644 backend/press/alg_exec.go diff --git a/backend/all/all.go b/backend/all/all.go index 3b693a1cd..fd26706aa 100644 --- a/backend/all/all.go +++ b/backend/all/all.go @@ -28,6 +28,7 @@ import ( _ "github.com/rclone/rclone/backend/opendrive" _ "github.com/rclone/rclone/backend/pcloud" _ "github.com/rclone/rclone/backend/premiumizeme" + _ "github.com/rclone/rclone/backend/press" _ "github.com/rclone/rclone/backend/putio" _ "github.com/rclone/rclone/backend/qingstor" _ "github.com/rclone/rclone/backend/s3" diff --git a/backend/press/alg_exec.go b/backend/press/alg_exec.go deleted file mode 100644 index be91911d6..000000000 --- a/backend/press/alg_exec.go +++ /dev/null @@ -1,98 +0,0 @@ -package press - -// This file implements shell exec algorithms that require binaries. -import ( - "bytes" - "io" - "os/exec" -) - -// XZ command -const xzcommand = "xz" // Name of xz binary (if available) - -// ExecHeader - Header we add to an exec file. We don't need this. -var ExecHeader = []byte{} - -// Function that checks whether XZ is present in the system -func checkXZ() bool { - _, err := exec.LookPath("xz") - if err != nil { - return false - } - return true -} - -// Function that gets binary paths if needed -func getBinPaths(c *Compression, mode int) (err error) { - err = nil - if mode == XZMin || mode == XZDefault { - c.BinPath, err = exec.LookPath(xzcommand) - } - return err -} - -// Function that compresses a block using a shell command without wrapping in gzip. Requires an binary corresponding with the command. -func (c *Compression) compressBlockExec(in []byte, out io.Writer, binaryPath string, args []string) (compressedSize uint32, uncompressedSize int64, err error) { - // Initialize compression subprocess - subprocess := exec.Command(binaryPath, args...) - stdin, err := subprocess.StdinPipe() - if err != nil { - return 0, 0, err - } - - // Run subprocess that creates compressed file - stdinError := make(chan error) - go func() { - _, err := stdin.Write(in) - _ = stdin.Close() - stdinError <- err - }() - - // Get output - output, err := subprocess.Output() - if err != nil { - return 0, 0, err - } - - // Copy over - n, err := io.Copy(out, bytes.NewReader(output)) - if err != nil { - return uint32(n), int64(len(in)), err - } - - // Check if there was an error and return - err = <-stdinError - - return uint32(n), int64(len(in)), err -} - -// Utility function to decompress a block range using a shell command which wasn't wrapped in gzip -func decompressBlockRangeExec(in io.Reader, out io.Writer, binaryPath string, args []string) (n int, err error) { - // Decompress actual compression - // Initialize decompression subprocess - subprocess := exec.Command(binaryPath, args...) - stdin, err := subprocess.StdinPipe() - if err != nil { - return 0, err - } - - // Run subprocess that copies over compressed block - stdinError := make(chan error) - go func() { - _, err := io.Copy(stdin, in) - _ = stdin.Close() - stdinError <- err - }() - - // Get output, copy, and return - output, err := subprocess.Output() - if err != nil { - return 0, err - } - n64, err := io.Copy(out, bytes.NewReader(output)) - if err != nil { - return int(n64), err - } - err = <-stdinError - return int(n64), err -} diff --git a/backend/press/compression.go b/backend/press/compression.go index 589199c74..eb99bd2ff 100644 --- a/backend/press/compression.go +++ b/backend/press/compression.go @@ -36,8 +36,6 @@ const ( GzipMax = 3 LZ4 = 4 Snappy = 5 - XZMin = 6 - XZDefault = 7 ) // Errors @@ -72,10 +70,6 @@ func NewCompressionPreset(preset string) (*Compression, error) { return NewCompression(GzipMin, 131070) // GZIP-min compression (fast) case "gzip-default": return NewCompression(GzipDefault, 131070) // GZIP-default compression (medium) - case "xz-min": - return NewCompression(XZMin, 524288) // XZ-min compression (slow) - case "xz-default": - return NewCompression(XZDefault, 1048576) // XZ-default compression (very slow) } return nil, errors.New("Compression mode doesn't exist") } @@ -93,10 +87,6 @@ func NewCompressionPresetNumber(preset int) (*Compression, error) { return NewCompression(GzipMin, 131070) // GZIP-min compression (fast) case GzipDefault: return NewCompression(GzipDefault, 131070) // GZIP-default compression (medium) - case XZMin: - return NewCompression(XZMin, 524288) // XZ-min compression (slow) - case XZDefault: - return NewCompression(XZDefault, 1048576) // XZ-default compression (very slow) } return nil, errors.New("Compression mode doesn't exist") } @@ -115,8 +105,6 @@ func NewCompressionAdvanced(mode int, bs uint32, hb int64, threads int, mcr floa c.HeuristicBytes = hb c.NumThreads = threads c.MaxCompressionRatio = mcr - // Get binary path if needed - err = getBinPaths(c, mode) return c, err } @@ -131,8 +119,6 @@ func (c *Compression) GetFileExtension() string { switch c.CompressionMode { case GzipStore, GzipMin, GzipDefault, GzipMax: return ".gz" - case XZMin, XZDefault: - return ".xzgz" case LZ4: return ".lz4" case Snappy: @@ -142,7 +128,6 @@ func (c *Compression) GetFileExtension() string { } // GetFileCompressionInfo gets a file extension along with compressibility of file -// It is currently not being used but may be usable in the future. func (c *Compression) GetFileCompressionInfo(reader io.Reader) (compressable bool, extension string, err error) { // Use our compression algorithm to do a heuristic on the first few bytes var emulatedBlock, emulatedBlockCompressed bytes.Buffer @@ -170,8 +155,6 @@ func (c *Compression) getHeader() []byte { switch c.CompressionMode { case GzipStore, GzipMin, GzipDefault, GzipMax: return GzipHeader - case XZMin, XZDefault: - return ExecHeader case LZ4: return LZ4Header case Snappy: @@ -185,8 +168,6 @@ func (c *Compression) getFooter() []byte { switch c.CompressionMode { case GzipStore, GzipMin, GzipDefault, GzipMax: return []byte{} - case XZMin, XZDefault: - return []byte{} case LZ4: return LZ4Footer case Snappy: @@ -207,10 +188,6 @@ func (c *Compression) compressBlock(in []byte, out io.Writer) (compressedSize ui return c.compressBlockGz(in, out, 6) case GzipMax: return c.compressBlockGz(in, out, 9) - case XZDefault: - return c.compressBlockExec(in, out, c.BinPath, []string{"-c"}) - case XZMin: - return c.compressBlockExec(in, out, c.BinPath, []string{"-c1"}) case LZ4: return c.compressBlockLz4(in, out) case Snappy: @@ -349,10 +326,6 @@ func (d *Decompressor) decompressBlock(in io.Reader, out io.Writer) (n int, err switch d.c.CompressionMode { // Select decompression function based off compression mode case GzipStore, GzipMin, GzipDefault, GzipMax: return decompressBlockRangeGz(in, out) - case XZMin: - return decompressBlockRangeExec(in, out, d.c.BinPath, []string{"-dc1"}) - case XZDefault: - return decompressBlockRangeExec(in, out, d.c.BinPath, []string{"-dc"}) case LZ4: return decompressBlockLz4(in, out, int64(d.c.BlockSize)) case Snappy: diff --git a/backend/press/compression_test.go b/backend/press/compression_test.go index 51927e00d..c4520b0e1 100644 --- a/backend/press/compression_test.go +++ b/backend/press/compression_test.go @@ -123,11 +123,6 @@ func getCompressibleString(size int) string { func TestCompression(t *testing.T) { testCases := []string{"lz4", "snappy", "gzip-min"} - if checkXZ() { - testCases = append(testCases, "xz-min") - } else { - t.Log("XZ binary not found on current system. Not testing xz.") - } for _, tc := range testCases { t.Run(tc, func(t *testing.T) { testSmallLarge(t, tc) diff --git a/backend/press/press.go b/backend/press/press.go index 3313495a2..1393dce6f 100644 --- a/backend/press/press.go +++ b/backend/press/press.go @@ -5,6 +5,7 @@ import ( "bufio" "bytes" "compress/gzip" + "context" "crypto/md5" "encoding/binary" "encoding/gob" @@ -17,24 +18,18 @@ import ( "github.com/gabriel-vasile/mimetype" - "github.com/ncw/rclone/fs" - "github.com/ncw/rclone/fs/accounting" - "github.com/ncw/rclone/fs/chunkedreader" - "github.com/ncw/rclone/fs/config/configmap" - "github.com/ncw/rclone/fs/config/configstruct" - "github.com/ncw/rclone/fs/fspath" - "github.com/ncw/rclone/fs/hash" - "github.com/ncw/rclone/fs/operations" // Used for Rcat "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/chunkedreader" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/config/configstruct" + "github.com/rclone/rclone/fs/fspath" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/operations" + // Used for Rcat ) -/** -NOTES: -Filenames are now . -Hashes and mime types now supported -Metadata files now used to store metadata and point to actual files -**/ - // Globals // Register with Fs func init() { @@ -53,16 +48,6 @@ func init() { Help: "Standard gzip compression with default parameters.", }, } - if checkXZ() { // If XZ is on the system, append compression mode options that are only available with the XZ binary installed - compressionModeOptions = append(compressionModeOptions, []fs.OptionExample{{ - Value: "xz-min", - Help: "Slow but powerful compression with reasonable speed.", - }, { - Value: "xz-default", - Help: "Slowest but best compression.", - }, - }...) - } // Register our remote fs.Register(&fs.RegInfo{ @@ -106,18 +91,21 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) { if err != nil { return nil, err } - c, err := newCompressionForConfig(opt) - if err != nil { - return nil, err - } + remote := opt.Remote if strings.HasPrefix(remote, name+":") { return nil, errors.New("can't point press remote at itself - check the value of the remote setting") } + wInfo, wName, wPath, wConfig, err := fs.ConfigFs(remote) if err != nil { return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote) } + + c, err := newCompressionForConfig(opt) + if err != nil { + return nil, err + } // Strip trailing slashes if they exist in rpath rpath = strings.TrimRight(rpath, "\\/") @@ -148,13 +136,16 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) { DuplicateFiles: true, ReadMimeType: false, WriteMimeType: false, + GetTier: true, + SetTier: true, BucketBased: true, CanHaveEmptyDirectories: true, - SetTier: true, - GetTier: true, }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) // We support reading MIME types no matter the wrapped fs f.features.ReadMimeType = true + if wrappedFs.Features().Move == nil && wrappedFs.Features().Copy == nil { + f.features.PutStream = nil + } return f, err } @@ -253,42 +244,6 @@ type Fs struct { c *Compression } -// Name of the remote (as passed into NewFs) -func (f *Fs) Name() string { - return f.name -} - -// Root of the remote (as passed into NewFs) -func (f *Fs) Root() string { - return f.root -} - -// Features returns the optional features of this Fs -func (f *Fs) Features() *fs.Features { - return f.features -} - -// String returns a description of the FS -func (f *Fs) String() string { - return fmt.Sprintf("Compressed drive '%s:%s'", f.name, f.root) -} - -// Get an object from a metadata file -/*func (f *Fs) addMeta(entries *fs.DirEntries, mo fs.Object) { - meta := readMetadata(mo) - origFileName, err := processMetadataName(mo.Remote()) - if err != nil { - fs.Errorf(mo, "Not a metadata file: %v", err) - return - } - o, err := f.Fs.NewObject(generateDataNameFromCompressionMode(origFileName, meta.Size, meta.CompressionMode)) - if err != nil { - fs.Errorf(mo, "Metadata corrupted: %v", err) - return - } - *entries = append(*entries, f.newObject(o, mo, meta)) -}*/ - // Get an Object from a data DirEntry func (f *Fs) addData(entries *fs.DirEntries, o fs.Object) { origFileName, _, size, err := processFileName(o.Remote()) @@ -344,8 +299,8 @@ func (f *Fs) processEntries(entries fs.DirEntries) (newEntries fs.DirEntries, er // This should return ErrDirNotFound if the directory isn't // found. // List entries and process them -func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { - entries, err = f.Fs.List(dir) +func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { + entries, err = f.Fs.List(ctx, dir) if err != nil { return nil, err } @@ -368,8 +323,8 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { // // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. -func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) { - return f.Fs.Features().ListR(dir, func(entries fs.DirEntries) error { +func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { + return f.Fs.Features().ListR(ctx, dir, func(entries fs.DirEntries) error { newEntries, err := f.processEntries(entries) if err != nil { return err @@ -379,18 +334,18 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) { } // NewObject finds the Object at remote. -func (f *Fs) NewObject(remote string) (fs.Object, error) { +func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { // Read metadata from metadata object - mo, err := f.Fs.NewObject(generateMetadataName(remote)) + mo, err := f.Fs.NewObject(ctx, generateMetadataName(remote)) if err != nil { return nil, err } - meta := readMetadata(mo) + meta := readMetadata(ctx, mo) if meta == nil { return nil, errors.New("error decoding metadata") } // Create our Object - o, err := f.Fs.NewObject(generateDataNameFromCompressionMode(remote, meta.Size, meta.CompressionMode)) + o, err := f.Fs.NewObject(ctx, generateDataNameFromCompressionMode(remote, meta.Size, meta.CompressionMode)) return f.newObject(o, mo, meta), err } @@ -407,23 +362,23 @@ func (c *Compression) checkFileCompressibilityAndType(in io.Reader) (newReader i if err != nil { return nil, false, "", err } - mimeType, _ = mimetype.Detect(b.Bytes()) + mime := mimetype.Detect(b.Bytes()) in = io.MultiReader(bytes.NewReader(b.Bytes()), in) in = wrap(in) - return in, compressible, mimeType, nil + return in, compressible, mime.String(), nil } // Verifies an object hash -func (f *Fs) verifyObjectHash(o fs.Object, hasher *hash.MultiHasher, ht hash.Type) (err error) { +func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.MultiHasher, ht hash.Type) (err error) { srcHash := hasher.Sums()[ht] var dstHash string - dstHash, err = o.Hash(ht) + dstHash, err = o.Hash(ctx, ht) if err != nil { return errors.Wrap(err, "failed to read destination hash") } if srcHash != "" && dstHash != "" && srcHash != dstHash { // remove object - err = o.Remove() + err = o.Remove(ctx) if err != nil { fs.Errorf(o, "Failed to remove corrupted object: %v", err) } @@ -432,7 +387,7 @@ func (f *Fs) verifyObjectHash(o fs.Object, hasher *hash.MultiHasher, ht hash.Typ return nil } -type putFn func(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) +type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) type blockDataAndError struct { err error @@ -440,7 +395,7 @@ type blockDataAndError struct { } // Put a compressed version of a file. Returns a wrappable object and metadata. -func (f *Fs) putCompress(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) { +func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) { // Unwrap reader accounting in, wrap := accounting.UnWrap(in) @@ -483,24 +438,23 @@ func (f *Fs) putCompress(in io.Reader, src fs.ObjectInfo, options []fs.OpenOptio } // Transfer the data - // o, err := put(wrappedIn, f.renameObjectInfo(src, f.c.generateDataName(src.Remote(), src.Size(), true), -1), options...) - o, err := operations.Rcat(f.Fs, f.c.generateDataName(src.Remote(), src.Size(), true), ioutil.NopCloser(wrappedIn), src.ModTime()) + //o, err := put(ctx, wrappedIn, f.wrapInfo(src, f.c.generateDataName(src.Remote(), src.Size(), true), src.Size()), options...) + o, err := operations.Rcat(ctx, f.Fs, f.c.generateDataName(src.Remote(), src.Size(), true), ioutil.NopCloser(wrappedIn), src.ModTime(ctx)) if err != nil { if o != nil { - removeErr := o.Remove() + removeErr := o.Remove(ctx) if removeErr != nil { fs.Errorf(o, "Failed to remove partially transferred object: %v", err) } } return nil, nil, err } - // Check whether we got an error during compression result := <-compressionResult err = result.err if err != nil { if o != nil { - removeErr := o.Remove() + removeErr := o.Remove(ctx) if removeErr != nil { fs.Errorf(o, "Failed to remove partially compressed object: %v", err) } @@ -511,11 +465,11 @@ func (f *Fs) putCompress(in io.Reader, src fs.ObjectInfo, options []fs.OpenOptio // Generate metadata blockData := result.blockData _, _, decompressedSize := parseBlockData(blockData, f.c.BlockSize) - meta := newMetadata(decompressedSize, f.c.CompressionMode, blockData, metaHasher.Sum([]byte{}), mimeType) + meta := newMetadata(decompressedSize, f.c.CompressionMode, blockData, metaHasher.Sum(nil), mimeType) // Check the hashes of the compressed data if we were comparing them if ht != hash.None && hasher != nil { - err := f.verifyObjectHash(o, hasher, ht) + err := f.verifyObjectHash(ctx, o, hasher, ht) if err != nil { return nil, nil, err } @@ -525,7 +479,7 @@ func (f *Fs) putCompress(in io.Reader, src fs.ObjectInfo, options []fs.OpenOptio } // Put an uncompressed version of a file. Returns a wrappable object and metadata. -func (f *Fs) putUncompress(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) { +func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) { // Unwrap the accounting, add our metadata hasher, then wrap it back on in, wrap := accounting.UnWrap(in) metaHasher := md5.New() @@ -548,11 +502,11 @@ func (f *Fs) putUncompress(in io.Reader, src fs.ObjectInfo, options []fs.OpenOpt wrappedIn = wrap(wrappedIn) } // Put the object - o, err := put(wrappedIn, f.renameObjectInfo(src, f.c.generateDataName(src.Remote(), src.Size(), false), src.Size()), options...) + o, err := put(ctx, wrappedIn, f.wrapInfo(src, f.c.generateDataName(src.Remote(), src.Size(), false), src.Size()), options...) //o, err := operations.Rcat(f, f.c.generateDataName(src.Remote(), src.Size(), false), wrappedIn, src.ModTime()) if err != nil { if o != nil { - removeErr := o.Remove() + removeErr := o.Remove(ctx) if removeErr != nil { fs.Errorf(o, "Failed to remove partially transferred object: %v", err) } @@ -561,7 +515,7 @@ func (f *Fs) putUncompress(in io.Reader, src fs.ObjectInfo, options []fs.OpenOpt } // Check the hashes of the compressed data if we were comparing them if ht != hash.None && hasher != nil { - err := f.verifyObjectHash(o, hasher, ht) + err := f.verifyObjectHash(ctx, o, hasher, ht) if err != nil { return nil, nil, err } @@ -571,7 +525,7 @@ func (f *Fs) putUncompress(in io.Reader, src fs.ObjectInfo, options []fs.OpenOpt } // This function will write a metadata struct to a metadata Object for an src. Returns a wrappable metadata object. -func (f *Fs) putMetadata(meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn, verifyCompressedObject bool) (mo fs.Object, err error) { +func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn, verifyCompressedObject bool) (mo fs.Object, err error) { // Generate the metadata contents var b bytes.Buffer gzipWriter := gzip.NewWriter(&b) @@ -586,63 +540,48 @@ func (f *Fs) putMetadata(meta *ObjectMetadata, src fs.ObjectInfo, options []fs.O } var metaReader io.Reader metaReader = bytes.NewReader(b.Bytes()) - // If verifyCompressedObject is on, find a hash the destination supports to compute a hash of - // the compressed data. - ht := f.Fs.Hashes().GetOne() - var hasher *hash.MultiHasher - if ht != hash.None && verifyCompressedObject { - hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(ht)) - if err != nil { - return nil, err - } - metaReader = io.TeeReader(metaReader, hasher) - } + // Put the data - mo, err = put(metaReader, f.renameObjectInfo(src, generateMetadataName(src.Remote()), int64(b.Len())), options...) + mo, err = put(ctx, metaReader, f.wrapInfo(src, generateMetadataName(src.Remote()), int64(b.Len())), options...) if err != nil { - removeErr := mo.Remove() + removeErr := mo.Remove(ctx) if removeErr != nil { fs.Errorf(mo, "Failed to remove partially transferred object: %v", err) } return nil, err } - // Check the hashes of the compressed data if we were comparing them - if ht != hash.None && hasher != nil { - err := f.verifyObjectHash(mo, hasher, ht) - if err != nil { - return nil, err - } - } return mo, nil } // This function will put both the data and metadata for an Object. // putData is the function used for data, while putMeta is the function used for metadata. -func (f *Fs) putWithCustomFunctions(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, putData putFn, putMeta putFn, verifyCompressedObject bool) (*Object, error) { - // Check compressibility of file - in, compressible, mimeType, err := f.c.checkFileCompressibilityAndType(in) - if err != nil { - return nil, err - } +func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, + putData putFn, putMeta putFn, compressible bool, mimeType string, verifyCompressedObject bool) (*Object, error) { // Put file then metadata var dataObject fs.Object var meta *ObjectMetadata + var err error if compressible { - dataObject, meta, err = f.putCompress(in, src, options, putData, mimeType, verifyCompressedObject) + dataObject, meta, err = f.putCompress(ctx, in, src, options, putData, mimeType, verifyCompressedObject) } else { - dataObject, meta, err = f.putUncompress(in, src, options, putData, mimeType, verifyCompressedObject) + dataObject, meta, err = f.putUncompress(ctx, in, src, options, putData, mimeType, verifyCompressedObject) } if err != nil { return nil, err } - mo, err := f.putMetadata(meta, src, options, putMeta, verifyCompressedObject) - return f.newObject(dataObject, mo, meta), err -} -// This function will put both the data and metadata for an Object, using the default f.Fs.Put for metadata and checking file hashes. -func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (*Object, error) { - return f.putWithCustomFunctions(in, src, options, put, f.Fs.Put, true) + mo, err := f.putMetadata(ctx, meta, src, options, putMeta, verifyCompressedObject) + + // meta data upload may fail. in this case we try to remove the original object + if err != nil { + removeError := dataObject.Remove(ctx) + if removeError != nil { + return nil, removeError + } + return nil, err + } + return f.newObject(dataObject, mo, meta), err } // Put in to the remote path with the modTime given of the given size @@ -650,27 +589,95 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p // May create the object even if it returns an error - if so // will return the object and the error, otherwise will return // nil and the error -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return f.put(in, src, options, f.Fs.Put) +func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + // If there's already an existent objects we need to make sure to explcitly update it to make sure we don't leave + // orphaned data. Alternatively we could also deleted (which would simpler) but has the disadvantage that it + // destroys all server-side versioning. + o, err := f.NewObject(ctx, src.Remote()) + if err == fs.ErrorObjectNotFound { + // Get our file compressibility + in, compressible, mimeType, err := f.c.checkFileCompressibilityAndType(in) + if err != nil { + return nil, err + } + return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, compressible, mimeType, true) + } + if err != nil { + return nil, err + } + return o, o.Update(ctx, in, src, options...) } // PutStream uploads to the remote path with the modTime given of indeterminate size -func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return f.put(in, src, options, f.Fs.Features().PutStream) +func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + oldObj, err := f.NewObject(ctx, src.Remote()) + if err != nil && err != fs.ErrorObjectNotFound { + return nil, err + } + found := err == nil + + in, compressible, mimeType, err := f.c.checkFileCompressibilityAndType(in) + if err != nil { + return nil, err + } + newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType, true) + if err != nil { + return nil, err + } + + // Our transfer is now complete if we allready had an object with the same name we can safely remove it now + // this is necessary to make sure we don't leave the remote in an inconsistent state. + if found { + err = oldObj.(*Object).Object.Remove(ctx) + if err != nil { + return nil, errors.Wrap(err, "Could remove original object") + } + } + + moveFs, ok := f.Fs.(fs.Mover) + var wrapObj fs.Object + if ok { + wrapObj, err = moveFs.Move(ctx, newObj.Object, f.c.generateDataName(src.Remote(), newObj.size, compressible)) + if err != nil { + return nil, errors.Wrap(err, "Couldn't rename streamed object.") + } + } + + // If we don't have move we'll need to resort to serverside copy and remove + copyFs, ok := f.Fs.(fs.Copier) + if ok { + wrapObj, err := copyFs.Copy(ctx, newObj.Object, f.c.generateDataName(src.Remote(), newObj.size, compressible)) + if err != nil { + return nil, errors.Wrap(err, "Could't copy streamed object.") + } + // Remove the original + err = newObj.Remove(ctx) + if err != nil { + return wrapObj, errors.Wrap(err, "Couldn't remove original streamed object. Remote may be in an incositent state.") + } + } + + newObj.Object = wrapObj + + return newObj, nil } +// Temporarely disabled. There might be a way to implement this correctly but with the current handling metadata duplicate objects +// will break stuff. Right no I can't think of a way to make this work. + // PutUnchecked uploads the object // // This will create a duplicate if we upload a new file without // checking to see if there is one already - use Put() for that. -func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { +/*func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { // If PutUnchecked is supported, do it. + // I'm unsure about this. With the current metadata model this might actually break things. Needs some manual testing. do := f.Fs.Features().PutUnchecked if do == nil { return nil, errors.New("can't PutUnchecked") } - return f.putWithCustomFunctions(in, src, options, do, do, false) -} + return f.putWithCustomFunctions(ctx, in, src, options, do, do, false) +}*/ // Hashes returns the supported hash sets. func (f *Fs) Hashes() hash.Set { @@ -680,15 +687,15 @@ func (f *Fs) Hashes() hash.Set { // Mkdir makes the directory (container, bucket) // // Shouldn't return an error if it already exists -func (f *Fs) Mkdir(dir string) error { - return f.Fs.Mkdir(dir) +func (f *Fs) Mkdir(ctx context.Context, dir string) error { + return f.Fs.Mkdir(ctx, dir) } // Rmdir removes the directory (container, bucket) if empty // // Return an error if it doesn't exist or isn't empty -func (f *Fs) Rmdir(dir string) error { - return f.Fs.Rmdir(dir) +func (f *Fs) Rmdir(ctx context.Context, dir string) error { + return f.Fs.Rmdir(ctx, dir) } // Purge all files in the root and the root directory @@ -697,12 +704,12 @@ func (f *Fs) Rmdir(dir string) error { // quicker than just running Remove() on the result of List() // // Return an error if it doesn't exist -func (f *Fs) Purge() error { +func (f *Fs) Purge(ctx context.Context) error { do := f.Fs.Features().Purge if do == nil { return fs.ErrorCantPurge } - return do() + return do(ctx) } // Copy src to this remote using server side copy operations. @@ -714,7 +721,7 @@ func (f *Fs) Purge() error { // Will only be called if src.Fs().Name() == f.Name() // // If it isn't possible then return fs.ErrorCantCopy -func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { +func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { do := f.Fs.Features().Copy if do == nil { return nil, fs.ErrorCantCopy @@ -723,19 +730,32 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { if !ok { return nil, fs.ErrorCantCopy } + // We might be trying to overwrite a file with a newer version but due to size difference the name + // is different. Therefore we have to remove the old file first (if it exists). + dstFile, err := f.NewObject(ctx, remote) + if err != nil && err != fs.ErrorObjectNotFound { + return nil, err + } + if err == nil { + err := dstFile.Remove(ctx) + if err != nil { + return nil, err + } + } + // Copy over metadata - err := o.loadMetadataObjectIfNotLoaded() + err = o.loadMetadataIfNotLoaded(ctx) if err != nil { return nil, err } newFilename := generateMetadataName(remote) - moResult, err := do(o.mo, newFilename) + moResult, err := do(ctx, o.mo, newFilename) if err != nil { return nil, err } // Copy over data newFilename = generateDataNameFromCompressionMode(remote, src.Size(), o.meta.CompressionMode) - oResult, err := do(o.Object, newFilename) + oResult, err := do(ctx, o.Object, newFilename) if err != nil { return nil, err } @@ -751,7 +771,7 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { // Will only be called if src.Fs().Name() == f.Name() // // If it isn't possible then return fs.ErrorCantMove -func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { +func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { do := f.Fs.Features().Move if do == nil { return nil, fs.ErrorCantMove @@ -760,19 +780,33 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { if !ok { return nil, fs.ErrorCantMove } + // We might be trying to overwrite a file with a newer version but due to size difference the name + // is different. Therefore we have to remove the old file first (if it exists). + dstFile, err := f.NewObject(ctx, remote) + if err != nil && err != fs.ErrorObjectNotFound { + return nil, err + } + if err == nil { + err := dstFile.Remove(ctx) + if err != nil { + return nil, err + } + } + // Move metadata - err := o.loadMetadataObjectIfNotLoaded() + err = o.loadMetadataIfNotLoaded(ctx) if err != nil { return nil, err } newFilename := generateMetadataName(remote) - moResult, err := do(o.mo, newFilename) + moResult, err := do(ctx, o.mo, newFilename) if err != nil { return nil, err } + // Move data newFilename = generateDataNameFromCompressionMode(remote, src.Size(), o.meta.CompressionMode) - oResult, err := do(o.Object, newFilename) + oResult, err := do(ctx, o.Object, newFilename) if err != nil { return nil, err } @@ -787,7 +821,7 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { // If it isn't possible then return fs.ErrorCantDirMove // // If destination exists then return fs.ErrorDirExists -func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { +func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { do := f.Fs.Features().DirMove if do == nil { return fs.ErrorCantDirMove @@ -797,28 +831,28 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { fs.Debugf(srcFs, "Can't move directory - not same remote type") return fs.ErrorCantDirMove } - return do(srcFs.Fs, srcRemote, dstRemote) + return do(ctx, srcFs.Fs, srcRemote, dstRemote) } // CleanUp the trash in the Fs // // Implement this if you have a way of emptying the trash or // otherwise cleaning up old versions of files. -func (f *Fs) CleanUp() error { +func (f *Fs) CleanUp(ctx context.Context) error { do := f.Fs.Features().CleanUp if do == nil { return errors.New("can't CleanUp") } - return do() + return do(ctx) } // About gets quota information from the Fs -func (f *Fs) About() (*fs.Usage, error) { +func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { do := f.Fs.Features().About if do == nil { return nil, errors.New("About not supported") } - return do() + return do(ctx) } // UnWrap returns the Fs that this Fs is wrapping @@ -838,16 +872,16 @@ func (f *Fs) SetWrapper(wrapper fs.Fs) { // MergeDirs merges the contents of all the directories passed // in into the first one and rmdirs the other directories. -func (f *Fs) MergeDirs(dirs []fs.Directory) error { +func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error { do := f.Fs.Features().MergeDirs if do == nil { return errors.New("MergeDirs not supported") } out := make([]fs.Directory, len(dirs)) for i, dir := range dirs { - out[i] = fs.NewDirCopy(dir).SetRemote(dir.Remote()) + out[i] = fs.NewDirCopy(ctx, dir).SetRemote(dir.Remote()) } - return do(out) + return do(ctx, out) } // DirCacheFlush resets the directory cache - used in testing @@ -862,7 +896,7 @@ func (f *Fs) DirCacheFlush() { // ChangeNotify calls the passed function with a path // that has had changes. If the implementation // uses polling, it should adhere to the given interval. -func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { +func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { do := f.Fs.Features().ChangeNotify if do == nil { return @@ -885,21 +919,21 @@ func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollIntervalCha } notifyFunc(wrappedPath, entryType) } - do(wrappedNotifyFunc, pollIntervalChan) + do(ctx, wrappedNotifyFunc, pollIntervalChan) } // PublicLink generates a public link to the remote path (usually readable by anyone) -func (f *Fs) PublicLink(remote string) (string, error) { +func (f *Fs) PublicLink(ctx context.Context, remote string) (string, error) { do := f.Fs.Features().PublicLink if do == nil { return "", errors.New("PublicLink not supported") } - o, err := f.NewObject(remote) + o, err := f.NewObject(ctx, remote) if err != nil { // assume it is a directory - return do(remote) + return do(ctx, remote) } - return do(o.(*Object).Object.Remote()) + return do(ctx, o.(*Object).Object.Remote()) } /*** OBJECT FUNCTIONS ***/ @@ -935,9 +969,9 @@ func newMetadata(size int64, compressionMode int, blockData []uint32, hash []byt } // This function will read the metadata from a metadata object. -func readMetadata(mo fs.Object) (meta *ObjectMetadata) { +func readMetadata(ctx context.Context, mo fs.Object) (meta *ObjectMetadata) { // Open our meradata object - rc, err := mo.Open() + rc, err := mo.Open(ctx) if err != nil { return nil } @@ -964,6 +998,104 @@ func readMetadata(mo fs.Object) (meta *ObjectMetadata) { return meta } +// Remove removes this object +func (o *Object) Remove(ctx context.Context) error { + err := o.loadMetadataObjectIfNotLoaded(ctx) + if err != nil { + return err + } + err = o.mo.Remove(ctx) + objErr := o.Object.Remove(ctx) + if err != nil { + return err + } + return objErr +} + +// ReadCloserWrapper combines a Reader and a Closer to a ReadCloser +type ReadCloserWrapper struct { + dataSource io.Reader + closer io.Closer +} + +func combineReaderAndCloser(dataSource io.Reader, closer io.Closer) *ReadCloserWrapper { + rc := new(ReadCloserWrapper) + rc.dataSource = dataSource + rc.closer = closer + return rc +} + +// Read function +func (w *ReadCloserWrapper) Read(p []byte) (n int, err error) { + return w.dataSource.Read(p) +} + +// Close function +func (w *ReadCloserWrapper) Close() error { + return w.closer.Close() +} + +// Update in to the object with the modTime given of the given size +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { + err = o.loadMetadataIfNotLoaded(ctx) // Loads metadata object too + if err != nil { + return err + } + // Function that updates metadata object + updateMeta := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return o.mo, o.mo.Update(ctx, in, src, options...) + } + + // Get our file compressibility + in, compressible, mimeType, err := o.f.c.checkFileCompressibilityAndType(in) + if err != nil { + return err + } + + // Since we're encoding the original filesize in the name we'll need to make sure that this name is updated before the actual update + var newObject *Object + origName := o.Remote() + if o.meta.CompressionMode != Uncompressed || compressible { + // If we aren't, we must either move-then-update or reupload-then-remove the object, and update the metadata. + // Check if this FS supports moving + moveFs, ok := o.f.Fs.(fs.Mover) + if ok { // If this fs supports moving, use move-then-update. This may help keep some versioning alive. + // First, move the object + var movedObject fs.Object + movedObject, err = moveFs.Move(ctx, o.Object, o.f.c.generateDataName(o.Remote(), src.Size(), compressible)) + if err != nil { + return err + } + // Create function that updates moved object, then update + update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return movedObject, movedObject.Update(ctx, in, src, options...) + } + newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType, true) + } else { // If this fs does not support moving, fall back to reuploading the object then removing the old one. + newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType, true) + removeErr := o.Object.Remove(ctx) // Note: We must do remove later so a failed update doesn't destroy data. + if removeErr != nil { + return removeErr + } + } + } else { + // Function that updates object + update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return o.Object, o.Object.Update(ctx, in, src, options...) + } + // If we are, just update the object and metadata + newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType, true) + } + if err != nil { + return err + } + // Update object metadata and return + o.Object = newObject.Object + o.meta = newObject.meta + o.size = newObject.size + return nil +} + // This will initialize the variables of a new press Object. The metadata object, mo, and metadata struct, meta, must be specified. func (f *Fs) newObject(o fs.Object, mo fs.Object, meta *ObjectMetadata) *Object { return &Object{ @@ -989,21 +1121,21 @@ func (f *Fs) newObjectSizeAndNameOnly(o fs.Object, moName string, size int64) *O } // This loads the metadata of a press Object if it's not loaded yet -func (o *Object) loadMetadataIfNotLoaded() (err error) { - err = o.loadMetadataObjectIfNotLoaded() +func (o *Object) loadMetadataIfNotLoaded(ctx context.Context) (err error) { + err = o.loadMetadataObjectIfNotLoaded(ctx) if err != nil { return err } if o.meta == nil { - o.meta = readMetadata(o.mo) + o.meta = readMetadata(ctx, o.mo) } return err } // This loads the metadata object of a press Object if it's not loaded yet -func (o *Object) loadMetadataObjectIfNotLoaded() (err error) { +func (o *Object) loadMetadataObjectIfNotLoaded(ctx context.Context) (err error) { if o.mo == nil { - o.mo, err = o.f.Fs.NewObject(o.moName) + o.mo, err = o.f.Fs.NewObject(ctx, o.moName) } return err } @@ -1021,20 +1153,6 @@ func (o *Object) String() string { return o.Remote() } -// Remove removes this object -func (o *Object) Remove() error { - err := o.loadMetadataObjectIfNotLoaded() - if err != nil { - return err - } - err = o.mo.Remove() - objErr := o.Object.Remove() - if err != nil { - return err - } - return objErr -} - // Remote returns the remote path func (o *Object) Remote() string { origFileName, _, _, err := processFileName(o.Object.Remote()) @@ -1053,10 +1171,19 @@ func (o *Object) Size() int64 { return o.meta.Size } +// MimeType returns the MIME type of the file +func (o *Object) MimeType(ctx context.Context) string { + err := o.loadMetadataIfNotLoaded(ctx) + if err != nil { + return "error/error" + } + return o.meta.MimeType +} + // Hash returns the selected checksum of the file // If no checksum is available it returns "" -func (o *Object) Hash(ht hash.Type) (string, error) { - err := o.loadMetadataIfNotLoaded() +func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) { + err := o.loadMetadataIfNotLoaded(ctx) if err != nil { return "", err } @@ -1066,13 +1193,23 @@ func (o *Object) Hash(ht hash.Type) (string, error) { return hex.EncodeToString(o.meta.Hash), nil } -// MimeType returns the MIME type of the file -func (o *Object) MimeType() string { - err := o.loadMetadataIfNotLoaded() - if err != nil { - return "error/error" +// SetTier performs changing storage tier of the Object if +// multiple storage classes supported +func (o *Object) SetTier(tier string) error { + do, ok := o.Object.(fs.SetTierer) + if !ok { + return errors.New("press: underlying remote does not support SetTier") } - return o.meta.MimeType + return do.SetTier(tier) +} + +// GetTier returns storage tier or class of the Object +func (o *Object) GetTier() string { + do, ok := o.Object.(fs.GetTierer) + if !ok { + return "" + } + return do.GetTier() } // UnWrap returns the wrapped Object @@ -1080,38 +1217,15 @@ func (o *Object) UnWrap() fs.Object { return o.Object } -// ReadCloserWrapper combines a Reader and a Closer to a ReadCloser -type ReadCloserWrapper struct { - dataSource io.Reader - closer io.Closer -} - -func combineReaderAndCloser(dataSource io.Reader, closer io.Closer) *ReadCloserWrapper { - rc := new(ReadCloserWrapper) - rc.dataSource = dataSource - rc.closer = closer - return rc -} - -// Read function -func (w *ReadCloserWrapper) Read(p []byte) (n int, err error) { - return w.dataSource.Read(p) -} - -// Close function -func (w *ReadCloserWrapper) Close() error { - return w.closer.Close() -} - // Open opens the file for read. Call Close() on the returned io.ReadCloser. Note that this call requires quite a bit of overhead. -func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) { - err = o.loadMetadataIfNotLoaded() +func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) { + err = o.loadMetadataIfNotLoaded(ctx) if err != nil { return nil, err } // If we're uncompressed, just pass this to the underlying object if o.meta.CompressionMode == Uncompressed { - return o.Object.Open(options...) + return o.Object.Open(ctx, options...) } // Get offset and limit from OpenOptions, pass the rest to the underlying remote var openOptions []fs.OpenOption = []fs.OpenOption{&fs.SeekOption{Offset: 0}} @@ -1127,7 +1241,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) { } } // Get a chunkedreader for the wrapped object - chunkedReader := chunkedreader.New(o.Object, initialChunkSize, maxChunkSize) + chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize) // Get file handle c, err := NewCompressionPresetNumber(o.meta.CompressionMode) if err != nil { @@ -1155,60 +1269,76 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) { return combineReaderAndCloser(fileReader, chunkedReader), nil } -// Update in to the object with the modTime given of the given size -func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { - err = o.loadMetadataIfNotLoaded() // Loads metadata object too - if err != nil { - return err +// ObjectInfo describes a wrapped fs.ObjectInfo for being the source +type ObjectInfo struct { + src fs.ObjectInfo + fs *Fs + remote string + size int64 +} + +func (f *Fs) wrapInfo(src fs.ObjectInfo, newRemote string, size int64) *ObjectInfo { + return &ObjectInfo{ + src: src, + fs: f, + remote: newRemote, + size: size, } - // Function that updates metadata object - updateMeta := func(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return o.mo, o.mo.Update(in, src, options...) +} + +// Fs returns read only access to the Fs that this object is part of +func (o *ObjectInfo) Fs() fs.Info { + if o.fs == nil { + panic("stub ObjectInfo") } - // Get our file compressibility - in, compressible, _, err := o.f.c.checkFileCompressibilityAndType(in) - if err != nil { - return err + return o.fs +} + +// String returns string representation +func (o *ObjectInfo) String() string { + return o.src.String() +} + +// Storable returns whether object is storable +func (o *ObjectInfo) Storable() bool { + return o.src.Storable() +} + +// Remote returns the remote path +func (o *ObjectInfo) Remote() string { + if o.remote != "" { + return o.remote } - // Check if we're updating an uncompressed file with an uncompressible object - var newObject *Object - origName := o.Remote() - if o.meta.CompressionMode != Uncompressed || compressible { - // If we aren't, we must either move-then-update or reupload-then-remove the object, and update the metadata. - // Check if this FS supports moving - moveFs, ok := o.f.Fs.(fs.Mover) - if ok { // If this fs supports moving, use move-then-update. This may help keep some versioning alive. - // First, move the object - var movedObject fs.Object - movedObject, err = moveFs.Move(o.Object, o.f.c.generateDataName(o.Remote(), src.Size(), compressible)) - if err != nil { - return err - } - // Create function that updates moved object, then update - update := func(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return movedObject, movedObject.Update(in, src, options...) - } - newObject, err = o.f.putWithCustomFunctions(in, src, options, update, updateMeta, true) - } else { // If this fs does not support moving, fall back to reuploading the object then removing the old one. - newObject, err = o.f.putWithCustomFunctions(in, o.f.renameObjectInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, true) - removeErr := o.Object.Remove() // Note: We must do remove later so a failed update doesn't destroy data. - if removeErr != nil { - return removeErr - } - } - } else { - // Function that updates object - update := func(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return o.Object, o.Object.Update(in, src, options...) - } - // If we are, just update the object and metadata - newObject, err = o.f.putWithCustomFunctions(in, src, options, update, updateMeta, true) + return o.src.Remote() +} + +// Size returns the size of the file +func (o *ObjectInfo) Size() int64 { + if o.size != -1 { + return o.size } - // Update object metadata and return - o.Object = newObject.Object - o.meta = newObject.meta - o.size = newObject.size - return err + return o.src.Size() +} + +// ModTime returns the modification time +func (o *ObjectInfo) ModTime(ctx context.Context) time.Time { + return o.src.ModTime(ctx) +} + +// Hash returns the selected checksum of the file +// If no checksum is available it returns "" +func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) { + if ht != hash.MD5 { + return "", hash.ErrUnsupported + } + if o.Size() != o.src.Size() { + return "", hash.ErrUnsupported + } + value, err := o.src.Hash(ctx, ht) + if err == hash.ErrUnsupported { + return "", hash.ErrUnsupported + } + return value, err } // ID returns the ID of the Object if known, or "" if not @@ -1220,108 +1350,29 @@ func (o *Object) ID() string { return do.ID() } -// SetTier performs changing storage tier of the Object if -// multiple storage classes supported -func (o *Object) SetTier(tier string) error { - do, ok := o.Object.(fs.SetTierer) - if !ok { - return errors.New("press: underlying remote does not support SetTier") - } - return do.SetTier(tier) +// Name of the remote (as passed into NewFs) +func (f *Fs) Name() string { + return f.name } -// GetTier returns storage tier or class of the Object -func (o *Object) GetTier() string { - do, ok := o.Object.(fs.GetTierer) - if !ok { - return "" - } - return do.GetTier() +// Root of the remote (as passed into NewFs) +func (f *Fs) Root() string { + return f.root } -// RenamedObjectInfo is the renamed representation of an ObjectInfo -type RenamedObjectInfo struct { - fs.ObjectInfo - remote string - size int64 +// Features returns the optional features of this Fs +func (f *Fs) Features() *fs.Features { + return f.features } -func (f *Fs) renameObjectInfo(src fs.ObjectInfo, newRemote string, size int64) *RenamedObjectInfo { - return &RenamedObjectInfo{ - ObjectInfo: src, - remote: newRemote, - size: size, - } +// Return a string version +func (f *Fs) String() string { + return fmt.Sprintf("Compressed: %s:%s", f.name, f.root) } -// Remote gets the remote of the RenamedObjectInfo -func (o *RenamedObjectInfo) Remote() string { - return o.remote -} - -// Size is unknown -func (o *RenamedObjectInfo) Size() int64 { - return o.size -} - -// ObjectInfo describes a wrapped fs.ObjectInfo for being the source -type ObjectInfo struct { - fs.ObjectInfo - f *Fs - meta *ObjectMetadata -} - -// Gets a new ObjectInfo from an src and a metadata struct -func (f *Fs) newObjectInfo(src fs.ObjectInfo) *ObjectInfo { - return &ObjectInfo{ - ObjectInfo: src, - f: f, - meta: nil, - } -} - -// Fs returns read only access to the Fs that this object is part of -func (o *ObjectInfo) Fs() fs.Info { - return o.f -} - -// Remote returns the remote path -func (o *ObjectInfo) Remote() string { - origFileName, _, _, err := processFileName(o.ObjectInfo.Remote()) - if err != nil { - fs.Errorf(o, "Could not get remote path for: %s", o.ObjectInfo.Remote()) - return o.ObjectInfo.Remote() - } - return origFileName -} - -// Size returns the size of the file -func (o *ObjectInfo) Size() int64 { - _, _, size, err := processFileName(o.ObjectInfo.Remote()) - if err != nil { - fs.Errorf(o, "Could not get size for: %s", o.ObjectInfo.Remote()) - return -1 - } - if size == -2 { // File is uncompressed - return o.ObjectInfo.Size() - } - return size -} - -// Hash returns the selected checksum of the file -// If no checksum is available it returns "" -func (o *ObjectInfo) Hash(ht hash.Type) (string, error) { - if o.meta == nil { - mo, err := o.f.NewObject(generateMetadataName(o.Remote())) - if err != nil { - return "", err - } - o.meta = readMetadata(mo) - } - if ht&hash.MD5 == 0 { - return "", hash.ErrUnsupported - } - return hex.EncodeToString(o.meta.Hash), nil +// Precision returns the precision of this Fs +func (f *Fs) Precision() time.Duration { + return f.Fs.Precision() } // Check the interfaces are satisfied @@ -1331,7 +1382,6 @@ var ( _ fs.Copier = (*Fs)(nil) _ fs.Mover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil) - _ fs.PutUncheckeder = (*Fs)(nil) _ fs.PutStreamer = (*Fs)(nil) _ fs.CleanUpper = (*Fs)(nil) _ fs.UnWrapper = (*Fs)(nil) @@ -1343,10 +1393,10 @@ var ( _ fs.ChangeNotifier = (*Fs)(nil) _ fs.PublicLinker = (*Fs)(nil) _ fs.ObjectInfo = (*ObjectInfo)(nil) + _ fs.GetTierer = (*Object)(nil) + _ fs.SetTierer = (*Object)(nil) _ fs.Object = (*Object)(nil) _ fs.ObjectUnWrapper = (*Object)(nil) _ fs.IDer = (*Object)(nil) - _ fs.SetTierer = (*Object)(nil) - _ fs.GetTierer = (*Object)(nil) _ fs.MimeTyper = (*Object)(nil) ) diff --git a/backend/press/press_test.go b/backend/press/press_test.go index db4110ca7..f65290669 100644 --- a/backend/press/press_test.go +++ b/backend/press/press_test.go @@ -6,9 +6,9 @@ import ( "path/filepath" "testing" - _ "github.com/ncw/rclone/backend/local" - "github.com/ncw/rclone/fstest" - "github.com/ncw/rclone/fstest/fstests" + _ "github.com/rclone/rclone/backend/local" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/fstest/fstests" ) // TestIntegration runs integration tests against the remote @@ -17,10 +17,21 @@ func TestIntegration(t *testing.T) { t.Skip("Skipping as -remote not set") } fstests.Run(t, &fstests.Opt{ - RemoteName: *fstest.RemoteName, - NilObject: (*Object)(nil), - UnimplementableFsMethods: []string{"OpenWriterAt"}, - UnimplementableObjectMethods: []string{}, + RemoteName: *fstest.RemoteName, + NilObject: (*Object)(nil), + UnimplementableFsMethods: []string{ + "OpenWriterAt", + "MergeDirs", + "DirCacheFlush", + "PutUnchecked", + "PutStream", + "UserInfo", + "Disconnect", + }, + UnimplementableObjectMethods: []string{ + "GetTier", + "SetTier", + }, }) } @@ -32,10 +43,21 @@ func TestRemoteLz4(t *testing.T) { tempdir := filepath.Join(os.TempDir(), "rclone-press-test-lz4") name := "TestPressLz4" fstests.Run(t, &fstests.Opt{ - RemoteName: name + ":", - NilObject: (*Object)(nil), - UnimplementableFsMethods: []string{"OpenWriterAt"}, - UnimplementableObjectMethods: []string{}, + RemoteName: name + ":", + NilObject: (*Object)(nil), + UnimplementableFsMethods: []string{ + "OpenWriterAt", + "MergeDirs", + "DirCacheFlush", + "PutUnchecked", + "PutStream", + "UserInfo", + "Disconnect", + }, + UnimplementableObjectMethods: []string{ + "GetTier", + "SetTier", + }, ExtraConfig: []fstests.ExtraConfigItem{ {Name: name, Key: "type", Value: "press"}, {Name: name, Key: "remote", Value: tempdir}, @@ -52,10 +74,21 @@ func TestRemoteGzip(t *testing.T) { tempdir := filepath.Join(os.TempDir(), "rclone-press-test-gzip") name := "TestPressGzip" fstests.Run(t, &fstests.Opt{ - RemoteName: name + ":", - NilObject: (*Object)(nil), - UnimplementableFsMethods: []string{"OpenWriterAt"}, - UnimplementableObjectMethods: []string{}, + RemoteName: name + ":", + NilObject: (*Object)(nil), + UnimplementableFsMethods: []string{ + "OpenWriterAt", + "MergeDirs", + "DirCacheFlush", + "PutUnchecked", + "PutStream", + "UserInfo", + "Disconnect", + }, + UnimplementableObjectMethods: []string{ + "GetTier", + "SetTier", + }, ExtraConfig: []fstests.ExtraConfigItem{ {Name: name, Key: "type", Value: "press"}, {Name: name, Key: "remote", Value: tempdir}, @@ -64,25 +97,4 @@ func TestRemoteGzip(t *testing.T) { }) } -// TestRemoteXZ tests XZ compression -func TestRemoteXZ(t *testing.T) { - if !checkXZ() { - t.Skip("XZ binary not found on current system") - } - if *fstest.RemoteName != "" { - t.Skip("Skipping as -remote set") - } - tempdir := filepath.Join(os.TempDir(), "rclone-press-test-xz") - name := "TestPressXZ" - fstests.Run(t, &fstests.Opt{ - RemoteName: name + ":", - NilObject: (*Object)(nil), - UnimplementableFsMethods: []string{"OpenWriterAt"}, - UnimplementableObjectMethods: []string{}, - ExtraConfig: []fstests.ExtraConfigItem{ - {Name: name, Key: "type", Value: "press"}, - {Name: name, Key: "remote", Value: tempdir}, - {Name: name, Key: "compression_mode", Value: "xz-min"}, - }, - }) -} +// TODO: Snappy needs testing