mirror of
https://github.com/rclone/rclone.git
synced 2025-12-06 00:03:32 +00:00
Added support for reading and writing zstd-compressed archives in seekable format using "github.com/klauspost/compress/zstd" and "github.com/SaveTheRbtz/zstd-seekable-format-go/pkg". Bumped Go version from 1.24.0 to 1.24.4 due to requirements of "github.com/SaveTheRbtz/zstd-seekable-format-go/pkg".
1522 lines
46 KiB
Go
1522 lines
46 KiB
Go
// Package compress provides wrappers for Fs and Object which implement compression.
|
||
package compress
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"encoding/base64"
|
||
"encoding/binary"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"os"
|
||
"path"
|
||
"regexp"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/buengese/sgzip"
|
||
"github.com/gabriel-vasile/mimetype"
|
||
|
||
"github.com/rclone/rclone/fs"
|
||
"github.com/rclone/rclone/fs/accounting"
|
||
"github.com/rclone/rclone/fs/chunkedreader"
|
||
"github.com/rclone/rclone/fs/config/configmap"
|
||
"github.com/rclone/rclone/fs/config/configstruct"
|
||
"github.com/rclone/rclone/fs/fspath"
|
||
"github.com/rclone/rclone/fs/hash"
|
||
"github.com/rclone/rclone/fs/list"
|
||
"github.com/rclone/rclone/fs/log"
|
||
"github.com/rclone/rclone/fs/object"
|
||
"github.com/rclone/rclone/fs/operations"
|
||
)
|
||
|
||
// Globals
const (
	initialChunkSize = 262144  // Initial and max sizes of chunks when reading parts of the file. Currently
	maxChunkSize     = 8388608 // at 256 KiB and 8 MiB.
	chunkStreams     = 0       // Streams to use for reading

	bufferSize          = 8388608 // buffer size used while compressing/spooling — confirm exact use at call sites
	heuristicBytes      = 1048576 // number of leading bytes sampled by checkCompressAndType to decide compressibility
	minCompressionRatio = 1.1     // minimum ratio for a file to be considered compressible (used by mode handlers)

	// File extensions used for the stored objects: data files get the
	// compression-specific extension, metadata always lives in a .json sidecar.
	gzFileExt           = ".gz"
	zstdFileExt         = ".zst"
	metaFileExt         = ".json"
	uncompressedFileExt = ".bin"
)
|
||
|
||
// Compression modes, stored in ObjectMetadata.Mode.
// The numeric gaps (1, 3 unused) are presumably historical values kept for
// on-disk metadata compatibility — do not renumber.
const (
	Uncompressed = 0
	Gzip         = 2
	Zstd         = 4
)
|
||
|
||
// nameRegexp splits a data-file name (minus its extension) into the original
// file name and the 11-character base64url-encoded original size suffix.
var nameRegexp = regexp.MustCompile(`^(.+?)\.([A-Za-z0-9-_]{11})$`)
|
||
|
||
// Register with Fs
func init() {
	// Build compression mode options.
	compressionModeOptions := []fs.OptionExample{
		{ // Default compression mode option.
			Value: "gzip",
			Help:  "Standard gzip compression with fastest parameters.",
		},
		{
			Value: "zstd",
			Help:  "Zstandard compression — fast modern algorithm offering adjustable speed-to-compression tradeoffs.",
		},
	}

	// Register our remote
	fs.Register(&fs.RegInfo{
		Name:        "compress",
		Description: "Compress a remote",
		NewFs:       NewFs,
		MetadataInfo: &fs.MetadataInfo{
			Help: `Any metadata supported by the underlying remote is read and written.`,
		},
		Options: []fs.Option{{
			Name:     "remote",
			Help:     "Remote to compress.",
			Required: true,
		}, {
			Name:     "mode",
			Help:     "Compression mode.",
			Default:  "gzip",
			Examples: compressionModeOptions,
		}, {
			// Level semantics depend on the selected mode; see help text.
			Name: "level",
			Help: `GZIP (levels -2 to 9):
- -2 — Huffman encoding only. Only use if you know what you're doing.
- -1 (default) — recommended; equivalent to level 5.
- 0 — turns off compression.
- 1–9 — increase compression at the cost of speed. Going past 6 generally offers very little return.

ZSTD (levels 0 to 4):
- 0 — turns off compression entirely.
- 1 — fastest compression with the lowest ratio.
- 2 (default) — good balance of speed and compression.
- 3 — better compression, but uses about 2–3x more CPU than the default.
- 4 — best possible compression ratio (highest CPU cost).

Notes:
- Choose GZIP for wide compatibility; ZSTD for better speed/ratio tradeoffs.
- Negative gzip levels: -2 = Huffman-only, -1 = default (≈ level 5).`,
			Required: true,
		}, {
			Name: "ram_cache_limit",
			Help: `Some remotes don't allow the upload of files with unknown size.
In this case the compressed file will need to be cached to determine
it's size.

Files smaller than this limit will be cached in RAM, files larger than
this limit will be cached on disk.`,
			Default:  fs.SizeSuffix(20 * 1024 * 1024),
			Advanced: true,
		}},
	})
}
|
||
|
||
// compressionModeHandler defines the interface for handling different compression modes.
// One implementation exists per mode (gzip, zstd, uncompressed, unknown) and is
// selected once in NewFs.
type compressionModeHandler interface {
	// processFileNameGetFileExtension returns the file extension for the given compression mode
	processFileNameGetFileExtension(compressionMode int) string

	// newObjectGetOriginalSize returns the original (uncompressed) file size from the metadata
	newObjectGetOriginalSize(meta *ObjectMetadata) (int64, error)

	// isCompressible checks the compression ratio of the provided data and returns true if the ratio exceeds
	// the configured threshold
	isCompressible(r io.Reader, compressionMode int) (bool, error)

	// putCompress compresses the input data and uploads it to the remote, returning the new object and its metadata
	putCompress(
		ctx context.Context,
		f *Fs,
		in io.Reader,
		src fs.ObjectInfo,
		options []fs.OpenOption,
		mimeType string,
	) (fs.Object, *ObjectMetadata, error)

	// openGetReadCloser opens a compressed object and returns a ReadCloser in the Open method.
	// offset/limit describe the requested uncompressed byte range; cr is the chunked reader
	// over the compressed data and closer must be closed when the returned reader is done.
	openGetReadCloser(
		ctx context.Context,
		o *Object,
		offset int64,
		limit int64,
		cr chunkedreader.ChunkedReader,
		closer io.Closer,
		options ...fs.OpenOption,
	) (rc io.ReadCloser, err error)

	// putUncompressGetNewMetadata returns metadata in the putUncompress method for a specific compression algorithm
	putUncompressGetNewMetadata(o fs.Object, mode int, md5 string, mimeType string, sum []byte) (fs.Object, *ObjectMetadata, error)

	// newMetadata generates a metadata object for sgzip.GzipMetadata or SzstdMetadata.
	// Warning: This function panics if cmeta is not of the expected type.
	newMetadata(size int64, mode int, cmeta any, md5 string, mimeType string) *ObjectMetadata
}
|
||
|
||
// Options defines the configuration for this backend
type Options struct {
	Remote          string        `config:"remote"`          // remote to wrap and compress
	CompressionMode string        `config:"mode"`            // "gzip" or "zstd"
	CompressionLevel int          `config:"level"`           // mode-specific level (see registration help)
	RAMCacheLimit   fs.SizeSuffix `config:"ram_cache_limit"` // files below this size are spooled in RAM, larger on disk
}
|
||
|
||
/*** FILESYSTEM FUNCTIONS ***/
|
||
|
||
// Fs represents a wrapped fs.Fs
type Fs struct {
	fs.Fs                // the wrapped remote; embedded so unmodified methods pass through
	wrapper  fs.Fs       // outer Fs wrapping us, if any
	name     string      // name of this remote
	root     string      // root path within the wrapped remote
	opt      Options     // parsed options
	mode     int         // compression mode id
	features *fs.Features // optional features
	modeHandler compressionModeHandler // compression mode handler
}
|
||
|
||
// NewFs constructs an Fs from the path, container:path
|
||
func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs, error) {
|
||
// Parse config into Options struct
|
||
opt := new(Options)
|
||
err := configstruct.Set(m, opt)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
remote := opt.Remote
|
||
if strings.HasPrefix(remote, name+":") {
|
||
return nil, errors.New("can't point press remote at itself - check the value of the remote setting")
|
||
}
|
||
|
||
wInfo, wName, wPath, wConfig, err := fs.ConfigFs(remote)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to parse remote %q to wrap: %w", remote, err)
|
||
}
|
||
|
||
// Strip trailing slashes if they exist in rpath
|
||
rpath = strings.TrimRight(rpath, "\\/")
|
||
|
||
// First, check for a file
|
||
// If a metadata file was found, return an error. Otherwise, check for a directory
|
||
remotePath := fspath.JoinRootPath(wPath, makeMetadataName(rpath))
|
||
wrappedFs, err := wInfo.NewFs(ctx, wName, remotePath, wConfig)
|
||
if err != fs.ErrorIsFile {
|
||
remotePath = fspath.JoinRootPath(wPath, rpath)
|
||
wrappedFs, err = wInfo.NewFs(ctx, wName, remotePath, wConfig)
|
||
}
|
||
if err != nil && err != fs.ErrorIsFile {
|
||
return nil, fmt.Errorf("failed to make remote %s:%q to wrap: %w", wName, remotePath, err)
|
||
}
|
||
|
||
compressionMode := compressionModeFromName(opt.CompressionMode)
|
||
var modeHandler compressionModeHandler
|
||
|
||
switch compressionMode {
|
||
case Gzip:
|
||
modeHandler = &gzipModeHandler{}
|
||
case Zstd:
|
||
modeHandler = &zstdModeHandler{}
|
||
case Uncompressed:
|
||
modeHandler = &uncompressedModeHandler{}
|
||
default:
|
||
modeHandler = &unknownModeHandler{}
|
||
}
|
||
|
||
// Create the wrapping fs
|
||
f := &Fs{
|
||
Fs: wrappedFs,
|
||
name: name,
|
||
root: rpath,
|
||
opt: *opt,
|
||
mode: compressionMode,
|
||
modeHandler: modeHandler,
|
||
}
|
||
// Correct root if definitely pointing to a file
|
||
if err == fs.ErrorIsFile {
|
||
f.root = path.Dir(f.root)
|
||
if f.root == "." || f.root == "/" {
|
||
f.root = ""
|
||
}
|
||
}
|
||
// the features here are ones we could support, and they are
|
||
// ANDed with the ones from wrappedFs
|
||
f.features = (&fs.Features{
|
||
CaseInsensitive: true,
|
||
DuplicateFiles: false,
|
||
ReadMimeType: false,
|
||
WriteMimeType: false,
|
||
GetTier: true,
|
||
SetTier: true,
|
||
BucketBased: true,
|
||
CanHaveEmptyDirectories: true,
|
||
ReadMetadata: true,
|
||
WriteMetadata: true,
|
||
UserMetadata: true,
|
||
ReadDirMetadata: true,
|
||
WriteDirMetadata: true,
|
||
WriteDirSetModTime: true,
|
||
UserDirMetadata: true,
|
||
DirModTimeUpdatesOnWrite: true,
|
||
PartialUploads: true,
|
||
}).Fill(ctx, f).Mask(ctx, wrappedFs).WrapsFs(f, wrappedFs)
|
||
// We support reading MIME types no matter the wrapped fs
|
||
f.features.ReadMimeType = true
|
||
// We can only support putstream if we have serverside copy or move
|
||
if !operations.CanServerSideMove(wrappedFs) {
|
||
f.features.Disable("PutStream")
|
||
}
|
||
// Enable ListP always
|
||
f.features.ListP = f.ListP
|
||
|
||
return f, err
|
||
}
|
||
|
||
// compressionModeFromName converts a compression mode name to its int representation.
|
||
func compressionModeFromName(name string) int {
|
||
switch name {
|
||
case "gzip":
|
||
return Gzip
|
||
case "zstd":
|
||
return Zstd
|
||
default:
|
||
return Uncompressed
|
||
}
|
||
}
|
||
|
||
// int64ToBase64 renders an int64 as an 11-character URL-safe base64 string
// (little-endian byte order, no padding).
func int64ToBase64(number int64) string {
	var raw [8]byte
	binary.LittleEndian.PutUint64(raw[:], uint64(number))
	return base64.RawURLEncoding.EncodeToString(raw[:])
}
|
||
|
||
// base64ToInt64 decodes an 11-character URL-safe base64 string (as produced by
// int64ToBase64) back to an int64.
//
// Fix: the original indexed 8 bytes unconditionally, so a malformed (short)
// but valid base64 input made binary.LittleEndian.Uint64 panic. Now it
// returns an error instead.
func base64ToInt64(str string) (int64, error) {
	intBytes, err := base64.RawURLEncoding.DecodeString(str)
	if err != nil {
		return 0, err
	}
	if len(intBytes) != 8 {
		return 0, fmt.Errorf("expected 8 bytes of size data, got %d", len(intBytes))
	}
	return int64(binary.LittleEndian.Uint64(intBytes)), nil
}
|
||
|
||
// Processes a file name for a compressed file. Returns the original file name, the extension, and the size of the original file.
// Returns -2 for the original size if the file is uncompressed.
func processFileName(compressedFileName string, modeHandler compressionModeHandler) (origFileName string, extension string, origSize int64, err error) {
	// Separate the filename and size from the extension
	extensionPos := strings.LastIndex(compressedFileName, ".")
	if extensionPos == -1 {
		return "", "", 0, errors.New("file name has no extension")
	}
	extension = compressedFileName[extensionPos:]
	nameWithSize := compressedFileName[:extensionPos]
	// Uncompressed data files carry no size suffix; -2 tells the caller to
	// use the stored object's own size instead.
	if extension == uncompressedFileExt {
		return nameWithSize, extension, -2, nil
	}
	// Compressed names look like "<name>.<11-char base64 size>.<ext>".
	match := nameRegexp.FindStringSubmatch(nameWithSize)
	if match == nil || len(match) != 3 {
		return "", "", 0, errors.New("invalid filename")
	}
	size, err := base64ToInt64(match[2])
	if err != nil {
		return "", "", 0, errors.New("could not decode size")
	}
	// NOTE(review): compressionModeFromName expects mode names ("gzip"/"zstd"),
	// but here it is fed the file extension ("gz"/"zst"), which always maps to
	// Uncompressed — confirm processFileNameGetFileExtension is meant to
	// receive that, or whether `extension` should be returned directly.
	ext := modeHandler.processFileNameGetFileExtension(compressionModeFromName(compressedFileName[extensionPos+1:]))
	return match[1], ext, size, nil
}
|
||
|
||
// Generates the file name for a metadata file
|
||
func makeMetadataName(remote string) (newRemote string) {
|
||
return remote + metaFileExt
|
||
}
|
||
|
||
// Checks whether a file is a metadata file
|
||
func isMetadataFile(filename string) bool {
|
||
return strings.HasSuffix(filename, metaFileExt)
|
||
}
|
||
|
||
// Checks whether a file is a metadata file and returns the original
|
||
// file name and a flag indicating whether it was a metadata file or
|
||
// not.
|
||
func unwrapMetadataFile(filename string) (string, bool) {
|
||
if !isMetadataFile(filename) {
|
||
return "", false
|
||
}
|
||
return filename[:len(filename)-len(metaFileExt)], true
|
||
}
|
||
|
||
// makeDataName generates the file name for a data file with specified compression mode
|
||
func makeDataName(remote string, size int64, mode int) (newRemote string) {
|
||
switch mode {
|
||
case Gzip:
|
||
newRemote = remote + "." + int64ToBase64(size) + gzFileExt
|
||
case Zstd:
|
||
newRemote = remote + "." + int64ToBase64(size) + zstdFileExt
|
||
default:
|
||
newRemote = remote + uncompressedFileExt
|
||
}
|
||
|
||
return newRemote
|
||
}
|
||
|
||
// dataName generates the file name for data file
|
||
func (f *Fs) dataName(remote string, size int64, compressed bool) (name string) {
|
||
if !compressed {
|
||
return makeDataName(remote, size, Uncompressed)
|
||
}
|
||
return makeDataName(remote, size, f.mode)
|
||
}
|
||
|
||
// addData parses an object and adds it to the DirEntries
|
||
func (f *Fs) addData(entries *fs.DirEntries, o fs.Object) {
|
||
origFileName, _, size, err := processFileName(o.Remote(), f.modeHandler)
|
||
if err != nil {
|
||
fs.Errorf(o, "Error on parsing file name: %v", err)
|
||
return
|
||
}
|
||
if size == -2 { // File is uncompressed
|
||
size = o.Size()
|
||
}
|
||
metaName := makeMetadataName(origFileName)
|
||
*entries = append(*entries, f.newObjectSizeAndNameOnly(o, metaName, size))
|
||
}
|
||
|
||
// addDir adds a dir to the dir entries
|
||
func (f *Fs) addDir(entries *fs.DirEntries, dir fs.Directory) {
|
||
*entries = append(*entries, f.newDir(dir))
|
||
}
|
||
|
||
// newDir returns a dir. Directories need no name/size rewriting, so the
// wrapped directory is passed through unchanged.
func (f *Fs) newDir(dir fs.Directory) fs.Directory {
	return dir // We're using the same dir
}
|
||
|
||
// processEntries parses the file names and adds metadata to the dir entries
|
||
func (f *Fs) processEntries(entries fs.DirEntries) (newEntries fs.DirEntries, err error) {
|
||
newEntries = entries[:0] // in place filter
|
||
for _, entry := range entries {
|
||
switch x := entry.(type) {
|
||
case fs.Object:
|
||
if !isMetadataFile(x.Remote()) {
|
||
f.addData(&newEntries, x) // Only care about data files for now; metadata files are redundant.
|
||
}
|
||
case fs.Directory:
|
||
f.addDir(&newEntries, x)
|
||
default:
|
||
return nil, fmt.Errorf("unknown object type %T", entry)
|
||
}
|
||
}
|
||
return newEntries, nil
|
||
}
|
||
|
||
// List the objects and directories in dir into entries. The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// Implemented in terms of ListP via the list helper.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	return list.WithListP(ctx, dir, f)
}
|
||
|
||
// ListP lists the objects and directories of the Fs starting
|
||
// from dir non recursively into out.
|
||
//
|
||
// dir should be "" to start from the root, and should not
|
||
// have trailing slashes.
|
||
//
|
||
// This should return ErrDirNotFound if the directory isn't
|
||
// found.
|
||
//
|
||
// It should call callback for each tranche of entries read.
|
||
// These need not be returned in any particular order. If
|
||
// callback returns an error then the listing will stop
|
||
// immediately.
|
||
func (f *Fs) ListP(ctx context.Context, dir string, callback fs.ListRCallback) error {
|
||
wrappedCallback := func(entries fs.DirEntries) error {
|
||
entries, err := f.processEntries(entries)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return callback(entries)
|
||
}
|
||
listP := f.Fs.Features().ListP
|
||
if listP == nil {
|
||
entries, err := f.Fs.List(ctx, dir)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return wrappedCallback(entries)
|
||
}
|
||
return listP(ctx, dir, wrappedCallback)
|
||
}
|
||
|
||
// ListR lists the objects and directories of the Fs starting
|
||
// from dir recursively into out.
|
||
//
|
||
// dir should be "" to start from the root, and should not
|
||
// have trailing slashes.
|
||
//
|
||
// This should return ErrDirNotFound if the directory isn't
|
||
// found.
|
||
//
|
||
// It should call callback for each tranche of entries read.
|
||
// These need not be returned in any particular order. If
|
||
// callback returns an error then the listing will stop
|
||
// immediately.
|
||
//
|
||
// Don't implement this unless you have a more efficient way
|
||
// of listing recursively that doing a directory traversal.
|
||
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||
return f.Fs.Features().ListR(ctx, dir, func(entries fs.DirEntries) error {
|
||
newEntries, err := f.processEntries(entries)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return callback(newEntries)
|
||
})
|
||
}
|
||
|
||
// NewObject finds the Object at remote.
//
// It first fetches and decodes the metadata sidecar, derives the original
// size from it, then opens the matching data object (whose name embeds that
// size for compressed modes).
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	// Read metadata from metadata object
	mo, err := f.Fs.NewObject(ctx, makeMetadataName(remote))
	if err != nil {
		return nil, err
	}
	meta, err := readMetadata(ctx, mo)
	if err != nil {
		return nil, fmt.Errorf("error decoding metadata: %w", err)
	}
	// Original size is mode-specific (stored differently per algorithm).
	size, err := f.modeHandler.newObjectGetOriginalSize(meta)
	if err != nil {
		return nil, fmt.Errorf("error reading metadata: %w", err)
	}
	// Create our Object — the data file name is derived from size and mode.
	o, err := f.Fs.NewObject(ctx, makeDataName(remote, size, meta.Mode))
	if err != nil {
		return nil, err
	}
	return f.newObject(o, mo, meta), nil
}
|
||
|
||
// checkCompressAndType checks if an object is compressible and determines it's mime type
|
||
// returns a multireader with the bytes that were read to determine mime type
|
||
func checkCompressAndType(in io.Reader, compressionMode int, modeHandler compressionModeHandler) (newReader io.Reader, compressible bool, mimeType string, err error) {
|
||
in, wrap := accounting.UnWrap(in)
|
||
buf := make([]byte, heuristicBytes)
|
||
n, err := in.Read(buf)
|
||
buf = buf[:n]
|
||
if err != nil && err != io.EOF {
|
||
return nil, false, "", err
|
||
}
|
||
mime := mimetype.Detect(buf)
|
||
compressible, err = modeHandler.isCompressible(bytes.NewReader(buf), compressionMode)
|
||
if err != nil {
|
||
return nil, false, "", err
|
||
}
|
||
in = io.MultiReader(bytes.NewReader(buf), in)
|
||
return wrap(in), compressible, mime.String(), nil
|
||
}
|
||
|
||
// verifyObjectHash verifies the Objects hash
|
||
func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.MultiHasher, ht hash.Type) error {
|
||
srcHash := hasher.Sums()[ht]
|
||
dstHash, err := o.Hash(ctx, ht)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to read destination hash: %w", err)
|
||
}
|
||
if srcHash != "" && dstHash != "" && srcHash != dstHash {
|
||
// remove object
|
||
err = o.Remove(ctx)
|
||
if err != nil {
|
||
fs.Errorf(o, "Failed to remove corrupted object: %v", err)
|
||
}
|
||
return fmt.Errorf("corrupted on transfer: %v compressed hashes differ src(%s) %q vs dst(%s) %q", ht, f.Fs, srcHash, o.Fs(), dstHash)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// putFn is the signature shared by Put and PutStream on the wrapped remote,
// letting helpers take either as a parameter.
type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)

// compressionResult carries the outcome of a background compression pass:
// the algorithm-specific metadata and any error that occurred.
type compressionResult[T sgzip.GzipMetadata | SzstdMetadata] struct {
	err  error
	meta T
}
|
||
|
||
// replicating some of operations.Rcat functionality because we want to support remotes without streaming
|
||
// support and of course cannot know the size of a compressed file before compressing it.
|
||
func (f *Fs) rcat(ctx context.Context, dstFileName string, in io.ReadCloser, modTime time.Time, options []fs.OpenOption) (o fs.Object, err error) {
|
||
|
||
// cache small files in memory and do normal upload
|
||
buf := make([]byte, f.opt.RAMCacheLimit)
|
||
if n, err := io.ReadFull(in, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
|
||
src := object.NewStaticObjectInfo(dstFileName, modTime, int64(len(buf[:n])), false, nil, f.Fs)
|
||
return f.Fs.Put(ctx, bytes.NewBuffer(buf[:n]), src, options...)
|
||
}
|
||
|
||
// Need to include what we already read
|
||
in = &ReadCloserWrapper{
|
||
Reader: io.MultiReader(bytes.NewReader(buf), in),
|
||
Closer: in,
|
||
}
|
||
|
||
canStream := f.Fs.Features().PutStream != nil
|
||
if canStream {
|
||
src := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, f.Fs)
|
||
return f.Fs.Features().PutStream(ctx, in, src, options...)
|
||
}
|
||
|
||
fs.Debugf(f, "Target remote doesn't support streaming uploads, creating temporary local file")
|
||
tempFile, err := os.CreateTemp("", "rclone-press-")
|
||
defer func() {
|
||
// these errors should be relatively uncritical and the upload should've succeeded so it's okay-ish
|
||
// to ignore them
|
||
_ = tempFile.Close()
|
||
_ = os.Remove(tempFile.Name())
|
||
}()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to create temporary local FS to spool file: %w", err)
|
||
}
|
||
if _, err = io.Copy(tempFile, in); err != nil {
|
||
return nil, fmt.Errorf("failed to write temporary local file: %w", err)
|
||
}
|
||
if _, err = tempFile.Seek(0, 0); err != nil {
|
||
return nil, fmt.Errorf("failed to seek temporary local file: %w", err)
|
||
}
|
||
finfo, err := tempFile.Stat()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to stat temporary local file: %w", err)
|
||
}
|
||
return f.Fs.Put(ctx, tempFile, object.NewStaticObjectInfo(dstFileName, modTime, finfo.Size(), false, nil, f.Fs))
|
||
}
|
||
|
||
// Put a compressed version of a file. Returns a wrappable object and metadata.
// All algorithm-specific work is delegated to the configured mode handler.
func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, mimeType string) (fs.Object, *ObjectMetadata, error) {
	return f.modeHandler.putCompress(ctx, f, in, src, options, mimeType)
}
|
||
|
||
// Put an uncompressed version of a file. Returns a wrappable object and metadata.
|
||
func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, put putFn, options []fs.OpenOption, mimeType string) (fs.Object, *ObjectMetadata, error) {
|
||
// Unwrap the accounting, add our metadata hasher, then wrap it back on
|
||
in, wrap := accounting.UnWrap(in)
|
||
|
||
hs := hash.NewHashSet(hash.MD5)
|
||
ht := f.Fs.Hashes().GetOne()
|
||
if !hs.Contains(ht) {
|
||
hs.Add(ht)
|
||
}
|
||
metaHasher, err := hash.NewMultiHasherTypes(hs)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
in = io.TeeReader(in, metaHasher)
|
||
wrappedIn := wrap(in)
|
||
|
||
// Put the object
|
||
o, err := put(ctx, wrappedIn, f.wrapInfo(src, makeDataName(src.Remote(), src.Size(), Uncompressed), src.Size()), options...)
|
||
if err != nil {
|
||
if o != nil {
|
||
removeErr := o.Remove(ctx)
|
||
if removeErr != nil {
|
||
fs.Errorf(o, "Failed to remove partially transferred object: %v", err)
|
||
}
|
||
}
|
||
return nil, nil, err
|
||
}
|
||
// Check the hashes of the compressed data if we were comparing them
|
||
if ht != hash.None {
|
||
err := f.verifyObjectHash(ctx, o, metaHasher, ht)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
}
|
||
|
||
// Return our object and metadata
|
||
sum, err := metaHasher.Sum(hash.MD5)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
|
||
return f.modeHandler.putUncompressGetNewMetadata(o, Uncompressed, hex.EncodeToString(sum), mimeType, sum)
|
||
}
|
||
|
||
// This function will write a metadata struct to a metadata Object for an src. Returns a wrappable metadata object.
|
||
func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (mo fs.Object, err error) {
|
||
// Generate the metadata contents
|
||
data, err := json.Marshal(meta)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
metaReader := bytes.NewReader(data)
|
||
|
||
// Put the data
|
||
mo, err = put(ctx, metaReader, f.wrapInfo(src, makeMetadataName(src.Remote()), int64(len(data))), options...)
|
||
if err != nil {
|
||
if mo != nil {
|
||
removeErr := mo.Remove(ctx)
|
||
if removeErr != nil {
|
||
fs.Errorf(mo, "Failed to remove partially transferred object: %v", err)
|
||
}
|
||
}
|
||
return nil, err
|
||
}
|
||
|
||
return mo, nil
|
||
}
|
||
|
||
// This function will put both the data and metadata for an Object.
|
||
// putData is the function used for data, while putMeta is the function used for metadata.
|
||
// The putData function will only be used when the object is not compressible if the
|
||
// data is compressible this parameter will be ignored.
|
||
func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption,
|
||
putData putFn, putMeta putFn, compressible bool, mimeType string) (*Object, error) {
|
||
// Put file then metadata
|
||
var dataObject fs.Object
|
||
var meta *ObjectMetadata
|
||
var err error
|
||
if compressible {
|
||
dataObject, meta, err = f.putCompress(ctx, in, src, options, mimeType)
|
||
} else {
|
||
dataObject, meta, err = f.putUncompress(ctx, in, src, putData, options, mimeType)
|
||
}
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
mo, err := f.putMetadata(ctx, meta, src, options, putMeta)
|
||
|
||
// meta data upload may fail. in this case we try to remove the original object
|
||
if err != nil {
|
||
removeError := dataObject.Remove(ctx)
|
||
if removeError != nil {
|
||
return nil, removeError
|
||
}
|
||
return nil, err
|
||
}
|
||
return f.newObject(dataObject, mo, meta), nil
|
||
}
|
||
|
||
// Put in to the remote path with the modTime given of the given size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	// If there's already an existent objects we need to make sure to explicitly update it to make sure we don't leave
	// orphaned data. Alternatively we could also deleted (which would simpler) but has the disadvantage that it
	// destroys all server-side versioning.
	o, err := f.NewObject(ctx, src.Remote())
	if err == fs.ErrorObjectNotFound {
		// Get our file compressibility.
		// Note: `in` and `err` are deliberately re-declared in this scope;
		// the outer err still holds fs.ErrorObjectNotFound.
		in, compressible, mimeType, err := checkCompressAndType(in, f.mode, f.modeHandler)
		if err != nil {
			return nil, err
		}
		return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, compressible, mimeType)
	}
	if err != nil {
		return nil, err
	}
	// Object already exists — update in place so data/metadata stay paired.
	return o, o.Update(ctx, in, src, options...)
}
|
||
|
||
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// The upload happens first; only after it succeeds is any pre-existing object
// removed, and (for compressed data) the new object is renamed to embed its
// now-known size.
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	// Probe for an existing object so we can clean it up afterwards.
	oldObj, err := f.NewObject(ctx, src.Remote())
	if err != nil && err != fs.ErrorObjectNotFound {
		return nil, err
	}
	found := err == nil

	in, compressible, mimeType, err := checkCompressAndType(in, f.mode, f.modeHandler)
	if err != nil {
		return nil, err
	}
	newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType)
	if err != nil {
		return nil, err
	}

	// Our transfer is now complete. We have to make sure to remove the old object because our new object will
	// have a different name except when both the old and the new object where uncompressed.
	if found && (oldObj.(*Object).meta.Mode != Uncompressed || compressible) {
		err = oldObj.(*Object).Object.Remove(ctx)
		if err != nil {
			return nil, fmt.Errorf("couldn't remove original object: %w", err)
		}
	}

	// If our new object is compressed we have to rename it with the correct size.
	// Uncompressed objects don't store the size in the name so we they'll already have the correct name.
	if compressible {
		wrapObj, err := operations.Move(ctx, f.Fs, nil, f.dataName(src.Remote(), newObj.size, compressible), newObj.Object)
		if err != nil {
			return nil, fmt.Errorf("couldn't rename streamed object: %w", err)
		}
		newObj.Object = wrapObj
	}
	return newObj, nil
}
|
||
|
||
// Temporarily disabled. There might be a way to implement this correctly but with the current handling metadata duplicate objects
|
||
// will break stuff. Right no I can't think of a way to make this work.
|
||
|
||
// PutUnchecked uploads the object
|
||
//
|
||
// This will create a duplicate if we upload a new file without
|
||
// checking to see if there is one already - use Put() for that.
|
||
|
||
// Hashes returns the supported hash sets.
//
// Only MD5 is advertised: putUncompress computes an MD5 of the content and
// stores it in the metadata, so it is available regardless of the wrapped
// remote's own hash support.
func (f *Fs) Hashes() hash.Set {
	return hash.Set(hash.MD5)
}
|
||
|
||
// Mkdir makes the directory (container, bucket)
//
// Shouldn't return an error if it already exists.
// Directories need no compression handling, so this is a pure pass-through.
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	return f.Fs.Mkdir(ctx, dir)
}
|
||
|
||
// MkdirMetadata makes the root directory of the Fs object
|
||
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
|
||
if do := f.Fs.Features().MkdirMetadata; do != nil {
|
||
return do(ctx, dir, metadata)
|
||
}
|
||
return nil, fs.ErrorNotImplemented
|
||
}
|
||
|
||
// Rmdir removes the directory (container, bucket) if empty
//
// Return an error if it doesn't exist or isn't empty.
// Pure pass-through to the wrapped remote.
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	return f.Fs.Rmdir(ctx, dir)
}
|
||
|
||
// Purge all files in the root and the root directory
|
||
//
|
||
// Implement this if you have a way of deleting all the files
|
||
// quicker than just running Remove() on the result of List()
|
||
//
|
||
// Return an error if it doesn't exist
|
||
func (f *Fs) Purge(ctx context.Context, dir string) error {
|
||
do := f.Fs.Features().Purge
|
||
if do == nil {
|
||
return fs.ErrorCantPurge
|
||
}
|
||
return do(ctx, dir)
|
||
}
|
||
|
||
// Copy src to this remote using server side copy operations.
//
// This is stored with the remote path given.
//
// It returns the destination Object and a possible error.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	do := f.Fs.Features().Copy
	if do == nil {
		return nil, fs.ErrorCantCopy
	}
	// Only our own objects carry the metadata sidecar we need to copy.
	o, ok := src.(*Object)
	if !ok {
		return nil, fs.ErrorCantCopy
	}
	// We might be trying to overwrite a file with a newer version but due to size difference the name
	// is different. Therefore we have to remove the old file first (if it exists).
	dstFile, err := f.NewObject(ctx, remote)
	if err != nil && err != fs.ErrorObjectNotFound {
		return nil, err
	}
	if err == nil {
		err := dstFile.Remove(ctx)
		if err != nil {
			return nil, err
		}
	}

	// Copy over metadata
	err = o.loadMetadataIfNotLoaded(ctx)
	if err != nil {
		return nil, err
	}
	newFilename := makeMetadataName(remote)
	moResult, err := do(ctx, o.mo, newFilename)
	if err != nil {
		return nil, err
	}
	// Copy over data — the destination name re-embeds size and mode.
	newFilename = makeDataName(remote, src.Size(), o.meta.Mode)
	oResult, err := do(ctx, o.Object, newFilename)
	if err != nil {
		return nil, err
	}
	return f.newObject(oResult, moResult, o.meta), nil
}
|
||
|
||
// Move src to this remote using server side move operations.
|
||
//
|
||
// This is stored with the remote path given.
|
||
//
|
||
// It returns the destination Object and a possible error.
|
||
//
|
||
// Will only be called if src.Fs().Name() == f.Name()
|
||
//
|
||
// If it isn't possible then return fs.ErrorCantMove
|
||
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||
do := f.Fs.Features().Move
|
||
if do == nil {
|
||
return nil, fs.ErrorCantMove
|
||
}
|
||
o, ok := src.(*Object)
|
||
if !ok {
|
||
return nil, fs.ErrorCantMove
|
||
}
|
||
// We might be trying to overwrite a file with a newer version but due to size difference the name
|
||
// is different. Therefore we have to remove the old file first (if it exists).
|
||
dstFile, err := f.NewObject(ctx, remote)
|
||
if err != nil && err != fs.ErrorObjectNotFound {
|
||
return nil, err
|
||
}
|
||
if err == nil {
|
||
err := dstFile.Remove(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
// Move metadata
|
||
err = o.loadMetadataIfNotLoaded(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
newFilename := makeMetadataName(remote)
|
||
moResult, err := do(ctx, o.mo, newFilename)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Move data
|
||
newFilename = makeDataName(remote, src.Size(), o.meta.Mode)
|
||
oResult, err := do(ctx, o.Object, newFilename)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return f.newObject(oResult, moResult, o.meta), nil
|
||
}
|
||
|
||
// DirMove moves src, srcRemote to this remote at dstRemote
|
||
// using server side move operations.
|
||
//
|
||
// Will only be called if src.Fs().Name() == f.Name()
|
||
//
|
||
// If it isn't possible then return fs.ErrorCantDirMove
|
||
//
|
||
// If destination exists then return fs.ErrorDirExists
|
||
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
|
||
do := f.Fs.Features().DirMove
|
||
if do == nil {
|
||
return fs.ErrorCantDirMove
|
||
}
|
||
srcFs, ok := src.(*Fs)
|
||
if !ok {
|
||
fs.Debugf(srcFs, "Can't move directory - not same remote type")
|
||
return fs.ErrorCantDirMove
|
||
}
|
||
return do(ctx, srcFs.Fs, srcRemote, dstRemote)
|
||
}
|
||
|
||
// DirSetModTime sets the directory modtime for dir
|
||
func (f *Fs) DirSetModTime(ctx context.Context, dir string, modTime time.Time) error {
|
||
if do := f.Fs.Features().DirSetModTime; do != nil {
|
||
return do(ctx, dir, modTime)
|
||
}
|
||
return fs.ErrorNotImplemented
|
||
}
|
||
|
||
// CleanUp the trash in the Fs
|
||
//
|
||
// Implement this if you have a way of emptying the trash or
|
||
// otherwise cleaning up old versions of files.
|
||
func (f *Fs) CleanUp(ctx context.Context) error {
|
||
do := f.Fs.Features().CleanUp
|
||
if do == nil {
|
||
return errors.New("not supported by underlying remote")
|
||
}
|
||
return do(ctx)
|
||
}
|
||
|
||
// About gets quota information from the Fs
|
||
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
||
do := f.Fs.Features().About
|
||
if do == nil {
|
||
return nil, errors.New("not supported by underlying remote")
|
||
}
|
||
return do(ctx)
|
||
}
|
||
|
||
// UnWrap returns the Fs that this Fs is wrapping
func (f *Fs) UnWrap() fs.Fs {
	// The wrapped remote is the embedded Fs
	return f.Fs
}
|
||
|
||
// WrapFs returns the Fs that is wrapping this Fs
func (f *Fs) WrapFs() fs.Fs {
	// Set via SetWrapper; may be nil if nothing wraps this Fs
	return f.wrapper
}
|
||
|
||
// SetWrapper sets the Fs that is wrapping this Fs
func (f *Fs) SetWrapper(wrapper fs.Fs) {
	f.wrapper = wrapper
}
|
||
|
||
// MergeDirs merges the contents of all the directories passed
|
||
// in into the first one and rmdirs the other directories.
|
||
func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error {
|
||
do := f.Fs.Features().MergeDirs
|
||
if do == nil {
|
||
return errors.New("MergeDirs not supported")
|
||
}
|
||
out := make([]fs.Directory, len(dirs))
|
||
for i, dir := range dirs {
|
||
out[i] = fs.NewDirCopy(ctx, dir).SetRemote(dir.Remote())
|
||
}
|
||
return do(ctx, out)
|
||
}
|
||
|
||
// DirCacheFlush resets the directory cache - used in testing
|
||
// as an optional interface
|
||
func (f *Fs) DirCacheFlush() {
|
||
do := f.Fs.Features().DirCacheFlush
|
||
if do != nil {
|
||
do()
|
||
}
|
||
}
|
||
|
||
// ChangeNotify calls the passed function with a path
|
||
// that has had changes. If the implementation
|
||
// uses polling, it should adhere to the given interval.
|
||
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
|
||
do := f.Fs.Features().ChangeNotify
|
||
if do == nil {
|
||
return
|
||
}
|
||
wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
|
||
fs.Logf(f, "path %q entryType %d", path, entryType)
|
||
var (
|
||
wrappedPath string
|
||
isMetadataFile bool
|
||
)
|
||
switch entryType {
|
||
case fs.EntryDirectory:
|
||
wrappedPath = path
|
||
case fs.EntryObject:
|
||
// Note: All we really need to do to monitor the object is to check whether the metadata changed,
|
||
// as the metadata contains the hash. This will work unless there's a hash collision and the sizes stay the same.
|
||
wrappedPath, isMetadataFile = unwrapMetadataFile(path)
|
||
if !isMetadataFile {
|
||
return
|
||
}
|
||
default:
|
||
fs.Errorf(path, "press ChangeNotify: ignoring unknown EntryType %d", entryType)
|
||
return
|
||
}
|
||
notifyFunc(wrappedPath, entryType)
|
||
}
|
||
do(ctx, wrappedNotifyFunc, pollIntervalChan)
|
||
}
|
||
|
||
// PublicLink generates a public link to the remote path (usually readable by anyone)
|
||
func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration, unlink bool) (string, error) {
|
||
do := f.Fs.Features().PublicLink
|
||
if do == nil {
|
||
return "", errors.New("can't PublicLink: not supported by underlying remote")
|
||
}
|
||
o, err := f.NewObject(ctx, remote)
|
||
if err != nil {
|
||
// assume it is a directory
|
||
return do(ctx, remote, duration, unlink)
|
||
}
|
||
return do(ctx, o.(*Object).Object.Remote(), duration, unlink)
|
||
}
|
||
|
||
/*** OBJECT FUNCTIONS ***/

// ObjectMetadata describes the metadata for an Object.
//
// It is serialized to JSON and stored in a sidecar metadata file
// alongside the (possibly compressed) data file.
type ObjectMetadata struct {
	Mode                    int                 // Compression mode of the file.
	Size                    int64               // Size of the object.
	MD5                     string              // MD5 hash of the file.
	MimeType                string              // Mime type of the file
	CompressionMetadataGzip *sgzip.GzipMetadata // Metadata for Gzip compression
	CompressionMetadataZstd *SzstdMetadata      // Metadata for Zstd compression
}
|
||
|
||
// Object with external metadata
//
// It wraps the data object of the underlying remote and pairs it with
// a separate metadata object; the metadata is loaded lazily on demand.
type Object struct {
	fs.Object                 // Wraps around data object for this object
	f         *Fs             // Filesystem object is in
	mo        fs.Object       // Metadata object for this object (nil until loaded)
	moName    string          // Metadata file name for this object
	size      int64           // Size of this object
	meta      *ObjectMetadata // Metadata struct for this object (nil if not loaded)
}
|
||
|
||
// This function will read the metadata from a metadata object.
func readMetadata(ctx context.Context, mo fs.Object) (meta *ObjectMetadata, err error) {
	// Open our metadata object
	rc, err := mo.Open(ctx)
	if err != nil {
		return nil, err
	}
	// Ensure the reader is closed; a close error is propagated via err
	defer fs.CheckClose(rc, &err)
	jr := json.NewDecoder(rc)
	meta = new(ObjectMetadata)
	if err = jr.Decode(meta); err != nil {
		return nil, err
	}
	return meta, nil
}
|
||
|
||
// Remove removes this object
|
||
func (o *Object) Remove(ctx context.Context) error {
|
||
err := o.loadMetadataObjectIfNotLoaded(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
err = o.mo.Remove(ctx)
|
||
objErr := o.Object.Remove(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return objErr
|
||
}
|
||
|
||
// ReadCloserWrapper combines a Reader and a Closer to a ReadCloser
//
// Useful when the stream being read is not the same value that has to
// be closed (e.g. a decompressor wrapped around a chunked reader).
type ReadCloserWrapper struct {
	io.Reader
	io.Closer
}
|
||
|
||
// Update in to the object with the modTime given of the given size
//
// Because the compressed size is encoded in the data file's name, an
// update may produce a differently named data object; in that case the
// old data object is removed after the new one has been written.
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
	err = o.loadMetadataIfNotLoaded(ctx) // Loads metadata object too
	if err != nil {
		return err
	}
	// Function that updates metadata object
	updateMeta := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
		return o.mo, o.mo.Update(ctx, in, src, options...)
	}

	// Peek at the input to decide whether it is worth compressing and what its mime type is
	in, compressible, mimeType, err := checkCompressAndType(in, o.meta.Mode, o.f.modeHandler)
	if err != nil {
		return err
	}

	// Since we are storing the filesize in the name the new object may have different name than the old
	// We'll make sure to delete the old object in this case
	var newObject *Object
	origName := o.Remote()
	if o.meta.Mode != Uncompressed || compressible {
		// Write a fresh data object via Put (name may change), then drop the old one
		newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType)
		if err != nil {
			return err
		}
		if newObject.Object.Remote() != o.Object.Remote() {
			if removeErr := o.Object.Remove(ctx); removeErr != nil {
				return removeErr
			}
		}
	} else {
		// We can only support update when BOTH the old and the new object are uncompressed because only then
		// the filesize will be known beforehand and name will stay the same
		update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
			return o.Object, o.Object.Update(ctx, in, src, options...)
		}
		// If we are, just update the object and metadata
		newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType)
		if err != nil {
			return err
		}
	}
	// Update object metadata and return
	o.Object = newObject.Object
	o.meta = newObject.meta
	o.size = newObject.size
	return nil
}
|
||
|
||
// This will initialize the variables of a new press Object. The metadata object, mo, and metadata struct, meta, must be specified.
|
||
func (f *Fs) newObject(o fs.Object, mo fs.Object, meta *ObjectMetadata) *Object {
|
||
if o == nil {
|
||
log.Trace(nil, "newObject(%#v, %#v, %#v) called with nil o", o, mo, meta)
|
||
}
|
||
return &Object{
|
||
Object: o,
|
||
f: f,
|
||
mo: mo,
|
||
moName: mo.Remote(),
|
||
size: meta.Size,
|
||
meta: meta,
|
||
}
|
||
}
|
||
|
||
// This initializes the variables of a press Object with only the size. The metadata will be loaded later on demand.
|
||
func (f *Fs) newObjectSizeAndNameOnly(o fs.Object, moName string, size int64) *Object {
|
||
if o == nil {
|
||
log.Trace(nil, "newObjectSizeAndNameOnly(%#v, %#v, %#v) called with nil o", o, moName, size)
|
||
}
|
||
return &Object{
|
||
Object: o,
|
||
f: f,
|
||
mo: nil,
|
||
moName: moName,
|
||
size: size,
|
||
meta: nil,
|
||
}
|
||
}
|
||
|
||
// Shutdown the backend, closing any background tasks and any
|
||
// cached connections.
|
||
func (f *Fs) Shutdown(ctx context.Context) error {
|
||
do := f.Fs.Features().Shutdown
|
||
if do == nil {
|
||
return nil
|
||
}
|
||
return do(ctx)
|
||
}
|
||
|
||
// This loads the metadata of a press Object if it's not loaded yet
|
||
func (o *Object) loadMetadataIfNotLoaded(ctx context.Context) (err error) {
|
||
err = o.loadMetadataObjectIfNotLoaded(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if o.meta == nil {
|
||
o.meta, err = readMetadata(ctx, o.mo)
|
||
}
|
||
return err
|
||
}
|
||
|
||
// This loads the metadata object of a press Object if it's not loaded yet
|
||
func (o *Object) loadMetadataObjectIfNotLoaded(ctx context.Context) (err error) {
|
||
if o.mo == nil {
|
||
o.mo, err = o.f.Fs.NewObject(ctx, o.moName)
|
||
}
|
||
return err
|
||
}
|
||
|
||
// Fs returns read only access to the Fs that this object is part of
func (o *Object) Fs() fs.Info {
	return o.f
}
|
||
|
||
// Return a string version
func (o *Object) String() string {
	// Guard against a nil receiver so logging a nil object is safe
	if o == nil {
		return "<nil>"
	}
	return o.Remote()
}
|
||
|
||
// Remote returns the remote path
|
||
func (o *Object) Remote() string {
|
||
origFileName, _, _, err := processFileName(o.Object.Remote(), o.f.modeHandler)
|
||
if err != nil {
|
||
fs.Errorf(o.f, "Could not get remote path for: %s", o.Object.Remote())
|
||
return o.Object.Remote()
|
||
}
|
||
return origFileName
|
||
}
|
||
|
||
// Size returns the size of the file
|
||
func (o *Object) Size() int64 {
|
||
if o.meta == nil {
|
||
return o.size
|
||
}
|
||
return o.meta.Size
|
||
}
|
||
|
||
// MimeType returns the MIME type of the file
|
||
func (o *Object) MimeType(ctx context.Context) string {
|
||
err := o.loadMetadataIfNotLoaded(ctx)
|
||
if err != nil {
|
||
return "error/error"
|
||
}
|
||
return o.meta.MimeType
|
||
}
|
||
|
||
// Metadata returns metadata for an object
|
||
//
|
||
// It should return nil if there is no Metadata
|
||
func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
|
||
err := o.loadMetadataIfNotLoaded(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
do, ok := o.mo.(fs.Metadataer)
|
||
if !ok {
|
||
return nil, nil
|
||
}
|
||
return do.Metadata(ctx)
|
||
}
|
||
|
||
// SetMetadata sets metadata for an Object
|
||
//
|
||
// It should return fs.ErrorNotImplemented if it can't set metadata
|
||
func (o *Object) SetMetadata(ctx context.Context, metadata fs.Metadata) error {
|
||
do, ok := o.Object.(fs.SetMetadataer)
|
||
if !ok {
|
||
return fs.ErrorNotImplemented
|
||
}
|
||
return do.SetMetadata(ctx, metadata)
|
||
}
|
||
|
||
// Hash returns the selected checksum of the file
|
||
// If no checksum is available it returns ""
|
||
func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) {
|
||
if ht != hash.MD5 {
|
||
return "", hash.ErrUnsupported
|
||
}
|
||
err := o.loadMetadataIfNotLoaded(ctx)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return o.meta.MD5, nil
|
||
}
|
||
|
||
// SetTier performs changing storage tier of the Object if
|
||
// multiple storage classes supported
|
||
func (o *Object) SetTier(tier string) error {
|
||
do, ok := o.Object.(fs.SetTierer)
|
||
mdo, mok := o.mo.(fs.SetTierer)
|
||
if !(ok && mok) {
|
||
return errors.New("press: underlying remote does not support SetTier")
|
||
}
|
||
if err := mdo.SetTier(tier); err != nil {
|
||
return err
|
||
}
|
||
return do.SetTier(tier)
|
||
}
|
||
|
||
// GetTier returns storage tier or class of the Object
|
||
func (o *Object) GetTier() string {
|
||
do, ok := o.mo.(fs.GetTierer)
|
||
if !ok {
|
||
return ""
|
||
}
|
||
return do.GetTier()
|
||
}
|
||
|
||
// UnWrap returns the wrapped Object
func (o *Object) UnWrap() fs.Object {
	// The embedded fs.Object is the underlying data object
	return o.Object
}
|
||
|
||
// Open opens the file for read. Call Close() on the returned io.ReadCloser. Note that this call requires quite a bit of overhead.
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
	err = o.loadMetadataIfNotLoaded(ctx)
	if err != nil {
		return nil, err
	}
	// If we're uncompressed, just pass this to the underlying object
	if o.meta.Mode == Uncompressed {
		return o.Object.Open(ctx, options...)
	}
	// Get offset and limit from OpenOptions, pass the rest to the underlying remote
	// offset/limit are in terms of the uncompressed data; -1 limit means "to the end"
	var offset, limit int64 = 0, -1
	for _, option := range options {
		switch x := option.(type) {
		case *fs.SeekOption:
			offset = x.Offset
		case *fs.RangeOption:
			offset, limit = x.Decode(o.Size())
		}
	}
	// Get a chunkedreader for the wrapped object
	chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize, chunkStreams)
	var retCloser io.Closer = chunkedReader
	// The mode handler wraps the chunked reader in the right decompressor
	return o.f.modeHandler.openGetReadCloser(ctx, o, offset, limit, chunkedReader, retCloser, options...)
}
|
||
|
||
// ObjectInfo describes a wrapped fs.ObjectInfo for being the source
//
// It overrides the remote name and size of the wrapped info so the
// underlying remote sees the mangled name / compressed size.
type ObjectInfo struct {
	src    fs.ObjectInfo // The wrapped source info
	fs     *Fs           // Fs this info belongs to (nil in stubs)
	remote string        // Override for Remote(); "" falls back to src
	size   int64         // Override for Size()
}
|
||
|
||
func (f *Fs) wrapInfo(src fs.ObjectInfo, newRemote string, size int64) *ObjectInfo {
|
||
return &ObjectInfo{
|
||
src: src,
|
||
fs: f,
|
||
remote: newRemote,
|
||
size: size,
|
||
}
|
||
}
|
||
|
||
// Fs returns read only access to the Fs that this object is part of
|
||
func (o *ObjectInfo) Fs() fs.Info {
|
||
if o.fs == nil {
|
||
panic("stub ObjectInfo")
|
||
}
|
||
return o.fs
|
||
}
|
||
|
||
// String returns string representation
func (o *ObjectInfo) String() string {
	return o.src.String()
}
|
||
|
||
// Storable returns whether object is storable
func (o *ObjectInfo) Storable() bool {
	return o.src.Storable()
}
|
||
|
||
// Remote returns the remote path
|
||
func (o *ObjectInfo) Remote() string {
|
||
if o.remote != "" {
|
||
return o.remote
|
||
}
|
||
return o.src.Remote()
|
||
}
|
||
|
||
// Size returns the size of the file
func (o *ObjectInfo) Size() int64 {
	// Always the overridden size set by wrapInfo
	return o.size
}
|
||
|
||
// ModTime returns the modification time
func (o *ObjectInfo) ModTime(ctx context.Context) time.Time {
	return o.src.ModTime(ctx)
}
|
||
|
||
// Hash returns the selected checksum of the file
// If no checksum is available it returns ""
func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) {
	// The data is transformed (compressed) on the way in, so the source
	// hash would not match what is stored
	return "", nil // cannot know the checksum
}
|
||
|
||
// ID returns the ID of the Object if known, or "" if not
|
||
func (o *ObjectInfo) ID() string {
|
||
do, ok := o.src.(fs.IDer)
|
||
if !ok {
|
||
return ""
|
||
}
|
||
return do.ID()
|
||
}
|
||
|
||
// MimeType returns the content type of the Object if
|
||
// known, or "" if not
|
||
func (o *ObjectInfo) MimeType(ctx context.Context) string {
|
||
do, ok := o.src.(fs.MimeTyper)
|
||
if !ok {
|
||
return ""
|
||
}
|
||
return do.MimeType(ctx)
|
||
}
|
||
|
||
// UnWrap returns the Object that this Object is wrapping or
// nil if it isn't wrapping anything
func (o *ObjectInfo) UnWrap() fs.Object {
	return fs.UnWrapObjectInfo(o.src)
}
|
||
|
||
// Metadata returns metadata for an object
|
||
//
|
||
// It should return nil if there is no Metadata
|
||
func (o *ObjectInfo) Metadata(ctx context.Context) (fs.Metadata, error) {
|
||
do, ok := o.src.(fs.Metadataer)
|
||
if !ok {
|
||
return nil, nil
|
||
}
|
||
return do.Metadata(ctx)
|
||
}
|
||
|
||
// GetTier returns storage tier or class of the Object
|
||
func (o *ObjectInfo) GetTier() string {
|
||
do, ok := o.src.(fs.GetTierer)
|
||
if !ok {
|
||
return ""
|
||
}
|
||
return do.GetTier()
|
||
}
|
||
|
||
// ID returns the ID of the Object if known, or "" if not
|
||
func (o *Object) ID() string {
|
||
do, ok := o.Object.(fs.IDer)
|
||
if !ok {
|
||
return ""
|
||
}
|
||
return do.ID()
|
||
}
|
||
|
||
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
	return f.name
}
|
||
|
||
// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
	return f.root
}
|
||
|
||
// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	return f.features
}
|
||
|
||
// Return a string version
func (f *Fs) String() string {
	return fmt.Sprintf("Compressed: %s:%s", f.name, f.root)
}
|
||
|
||
// Precision returns the precision of this Fs
func (f *Fs) Precision() time.Duration {
	// Same as the wrapped remote - modtimes pass straight through
	return f.Fs.Precision()
}
|
||
|
||
// Check the interfaces are satisfied
//
// These compile-time assertions fail the build if Fs, Object or
// ObjectInfo stop implementing any of the optional interfaces below.
var (
	_ fs.Fs              = (*Fs)(nil)
	_ fs.Purger          = (*Fs)(nil)
	_ fs.Copier          = (*Fs)(nil)
	_ fs.Mover           = (*Fs)(nil)
	_ fs.DirMover        = (*Fs)(nil)
	_ fs.DirSetModTimer  = (*Fs)(nil)
	_ fs.MkdirMetadataer = (*Fs)(nil)
	_ fs.PutStreamer     = (*Fs)(nil)
	_ fs.CleanUpper      = (*Fs)(nil)
	_ fs.UnWrapper       = (*Fs)(nil)
	_ fs.ListRer         = (*Fs)(nil)
	_ fs.Abouter         = (*Fs)(nil)
	_ fs.Wrapper         = (*Fs)(nil)
	_ fs.MergeDirser     = (*Fs)(nil)
	_ fs.DirCacheFlusher = (*Fs)(nil)
	_ fs.ChangeNotifier  = (*Fs)(nil)
	_ fs.PublicLinker    = (*Fs)(nil)
	_ fs.Shutdowner      = (*Fs)(nil)
	_ fs.FullObjectInfo  = (*ObjectInfo)(nil)
	_ fs.FullObject      = (*Object)(nil)
)
|