1
0
mirror of https://github.com/rclone/rclone.git synced 2026-01-07 11:03:15 +00:00

s3: implement --s3-version-at flag - Fixes #1776

This commit is contained in:
Nick Craig-Wood
2022-07-26 17:58:57 +01:00
parent 1542a979f9
commit 4344a3e2ea
3 changed files with 372 additions and 61 deletions

View File

@@ -1989,6 +1989,20 @@ circumstances or for testing.
Help: "Include old versions in directory listings.",
Default: false,
Advanced: true,
}, {
Name: "version_at",
Help: `Show file versions as they were at the specified time.
The parameter should be a date, "2006-01-02", datetime "2006-01-02
15:04:05" or a duration for that long ago, eg "100d" or "1h".
Note that when using this no file write operations are permitted,
so you can't upload files or delete them.
See [the time option docs](/docs/#time-option) for valid formats.
`,
Default: fs.Time{},
Advanced: true,
},
}})
}
@@ -2011,6 +2025,11 @@ const (
maxExpireDuration = fs.Duration(7 * 24 * time.Hour) // max expiry is 1 week
)
// errNotWithVersionAt is the error returned by operations which would
// modify or delete files while --s3-version-at is in use.
var errNotWithVersionAt = errors.New("can't modify or delete files in --s3-version-at mode")
// system metadata keys which this backend owns
var systemMetadataInfo = map[string]fs.MetadataHelp{
"cache-control": {
@@ -2107,6 +2126,7 @@ type Options struct {
UseMultipartEtag fs.Tristate `config:"use_multipart_etag"`
UsePresignedRequest bool `config:"use_presigned_request"`
Versions bool `config:"versions"`
VersionAt fs.Time `config:"version_at"`
}
// Fs represents a remote s3 server
@@ -2599,6 +2619,9 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
if err != nil {
return nil, fmt.Errorf("s3: upload cutoff: %w", err)
}
if opt.Versions && opt.VersionAt.IsSet() {
return nil, errors.New("s3: cant use --s3-versions and --s3-version-at at the same time")
}
if opt.ACL == "" {
opt.ACL = "private"
}
@@ -2697,11 +2720,15 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
// It may return info == nil and err == nil if a HEAD would be more appropriate
func (f *Fs) getMetaDataListing(ctx context.Context, wantRemote string) (info *s3.Object, versionID *string, err error) {
bucket, bucketPath := f.split(wantRemote)
timestamp, bucketPath := version.Remove(bucketPath)
// If the path had no version string return no info, to force caller to look it up
if timestamp.IsZero() {
return nil, nil, nil
// Strip the version string off if using versions
if f.opt.Versions {
var timestamp time.Time
timestamp, bucketPath = version.Remove(bucketPath)
// If the path had no version string return no info, to force caller to look it up
if timestamp.IsZero() {
return nil, nil, nil
}
}
err = f.list(ctx, listOpt{
@@ -2710,6 +2737,7 @@ func (f *Fs) getMetaDataListing(ctx context.Context, wantRemote string) (info *s
recurse: true,
withVersions: f.opt.Versions,
findFile: true,
versionAt: f.opt.VersionAt,
}, func(gotRemote string, object *s3.Object, objectVersionID *string, isDirectory bool) error {
if isDirectory {
return nil
@@ -2741,8 +2769,8 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Obje
fs: f,
remote: remote,
}
if info == nil && f.opt.Versions && version.Match(remote) {
// If versions, have to read the listing to find the version ID
if info == nil && ((f.opt.Versions && version.Match(remote)) || f.opt.VersionAt.IsSet()) {
// If versions, have to read the listing to find the correct version ID
info, versionID, err = f.getMetaDataListing(ctx, remote)
if err != nil {
return nil, err
@@ -2822,7 +2850,7 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error {
// Common interface for bucket listers
type bucketLister interface {
List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error)
List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error)
URLEncodeListings(bool)
}
@@ -2843,7 +2871,7 @@ func (f *Fs) newV1List(req *s3.ListObjectsV2Input) bucketLister {
}
// List a bucket with V1 listing
func (ls *v1List) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
func (ls *v1List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
respv1, err := ls.f.c.ListObjectsWithContext(ctx, &ls.req)
if err != nil {
return nil, nil, err
@@ -2899,7 +2927,7 @@ func (f *Fs) newV2List(req *s3.ListObjectsV2Input) bucketLister {
}
// Do a V2 listing
func (ls *v2List) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
func (ls *v2List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
resp, err = ls.f.c.ListObjectsV2WithContext(ctx, &ls.req)
ls.req.ContinuationToken = resp.NextContinuationToken
return resp, nil, err
@@ -2916,25 +2944,88 @@ func (ls *v2List) URLEncodeListings(encode bool) {
// Versions bucket lister
type versionsList struct {
f *Fs
req s3.ListObjectVersionsInput
f *Fs
req s3.ListObjectVersionsInput
versionAt time.Time // set if we want only versions before this
usingVersionAt bool // set if we need to use versionAt
hidden bool // set to see hidden versions
lastKeySent string // last Key sent to the receiving function
}
// Create a new Versions bucket lister
func (f *Fs) newVersionsList(req *s3.ListObjectsV2Input) bucketLister {
func (f *Fs) newVersionsList(req *s3.ListObjectsV2Input, hidden bool, versionAt time.Time) bucketLister {
l := &versionsList{
f: f,
f: f,
versionAt: versionAt,
usingVersionAt: !versionAt.IsZero(),
hidden: hidden,
}
// Convert v2 req into withVersions req
structs.SetFrom(&l.req, req)
return l
}
// isDeleteMarker is a sentinel pointer: any s3.Object or
// s3.ObjectVersion with this as their Size are delete markers.
//
// It is assigned to and compared against the Size field as a pointer,
// so what matters is the pointer identity, not the value pointed to.
var isDeleteMarker = new(int64)
// versionLess reports whether a should be placed before b.
//
// Entries are ordered by Key ascending. For equal keys the one with
// the later LastModified comes first, and if the times are equal too
// then a comes first only when it has IsLatest set.
//
// An a which is nil, or has a nil Key or LastModified, always sorts
// first; failing that, a b with the same problem sorts first.
func versionLess(a, b *s3.ObjectVersion) bool {
	switch {
	case a == nil || a.Key == nil || a.LastModified == nil:
		return true
	case b == nil || b.Key == nil || b.LastModified == nil:
		return false
	}
	// Different keys are decided alphabetically
	if keyA, keyB := *a.Key, *b.Key; keyA != keyB {
		return keyA < keyB
	}
	// Same key - the newer of the two goes first
	if delta := a.LastModified.Sub(*b.LastModified); delta != 0 {
		return delta > 0
	}
	// Same key and time - fall back to the IsLatest flag
	return aws.BoolValue(a.IsLatest)
}
// mergeDeleteMarkers combines oldVersions and deleteMarkers into a
// single slice of s3.ObjectVersion.
//
// S3 delivers these sorted by key and then by LastModified with the
// newest first, but annoyingly the SDK hands them back as two
// separate slices, so they need merging back together again.
//
// Each s3.DeleteMarkerEntry is converted into an s3.ObjectVersion
// with Size set to isDeleteMarker so the two kinds can be told apart.
// It is then placed after every not yet merged version which
// versionLess orders before it.
func mergeDeleteMarkers(oldVersions []*s3.ObjectVersion, deleteMarkers []*s3.DeleteMarkerEntry) (newVersions []*s3.ObjectVersion) {
	merged := make([]*s3.ObjectVersion, 0, len(oldVersions)+len(deleteMarkers))
	next := 0 // index of the first version not yet merged
	for _, entry := range deleteMarkers {
		marker := &s3.ObjectVersion{}
		structs.SetFrom(marker, entry)
		marker.Size = isDeleteMarker
		// Emit the versions which sort ahead of this marker
		for ; next < len(oldVersions) && versionLess(oldVersions[next], marker); next++ {
			merged = append(merged, oldVersions[next])
		}
		merged = append(merged, marker)
	}
	// Append the versions left over once the delete markers run out
	return append(merged, oldVersions[next:]...)
}
// List a bucket with versions
func (ls *versionsList) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
func (ls *versionsList) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
respVersions, err := ls.f.c.ListObjectVersionsWithContext(ctx, &ls.req)
if err != nil {
return nil, nil, err
@@ -2956,16 +3047,36 @@ func (ls *versionsList) List(ctx context.Context, hidden bool) (resp *s3.ListObj
resp = new(s3.ListObjectsV2Output)
structs.SetFrom(resp, respVersions)
// Merge in delete Markers as s3.ObjectVersion if we need them
if ls.hidden || ls.usingVersionAt {
respVersions.Versions = mergeDeleteMarkers(respVersions.Versions, respVersions.DeleteMarkers)
}
// Convert the Versions and the DeleteMarkers into an array of s3.Object
//
// These are returned in the order that they are stored with the most recent first.
// With the annoyance that the Versions and DeleteMarkers are split into two
objs := make([]*s3.Object, 0, len(respVersions.Versions))
for _, objVersion := range respVersions.Versions {
if ls.usingVersionAt {
if objVersion.LastModified.After(ls.versionAt) {
// Ignore versions that were created after the specified time
continue
}
if *objVersion.Key == ls.lastKeySent {
// Ignore versions before the already returned version
continue
}
}
ls.lastKeySent = *objVersion.Key
// Don't send delete markers if we don't want hidden things
if !ls.hidden && objVersion.Size == isDeleteMarker {
continue
}
var obj = new(s3.Object)
structs.SetFrom(obj, objVersion)
// Adjust the file names
if !aws.BoolValue(objVersion.IsLatest) {
if !ls.usingVersionAt && !aws.BoolValue(objVersion.IsLatest) {
if obj.Key != nil && objVersion.LastModified != nil {
*obj.Key = version.Add(*obj.Key, *objVersion.LastModified)
}
@@ -2974,25 +3085,6 @@ func (ls *versionsList) List(ctx context.Context, hidden bool) (resp *s3.ListObj
versionIDs = append(versionIDs, objVersion.VersionId)
}
// If hidden is set, put the delete markers in too, but set
// their sizes to a sentinel delete marker size
if hidden {
for _, deleteMarker := range respVersions.DeleteMarkers {
var obj = new(s3.Object)
structs.SetFrom(obj, deleteMarker)
obj.Size = isDeleteMarker
// Adjust the file names
if !aws.BoolValue(deleteMarker.IsLatest) {
if obj.Key != nil && deleteMarker.LastModified != nil {
*obj.Key = version.Add(*obj.Key, *deleteMarker.LastModified)
}
}
objs = append(objs, obj)
versionIDs = append(versionIDs, deleteMarker.VersionId)
}
}
resp.Contents = objs
return resp, versionIDs, nil
}
@@ -3015,14 +3107,15 @@ var errEndList = errors.New("end list")
// list options
type listOpt struct {
bucket string // bucket to list
directory string // directory with bucket
prefix string // prefix to remove from listing
addBucket bool // if set, the bucket is added to the start of the remote
recurse bool // if set, recurse to read sub directories
withVersions bool // if set, versions are produced
hidden bool // if set, return delete markers as objects with size == isDeleteMarker
findFile bool // if set, it will look for files called (bucket, directory)
bucket string // bucket to list
directory string // directory with bucket
prefix string // prefix to remove from listing
addBucket bool // if set, the bucket is added to the start of the remote
recurse bool // if set, recurse to read sub directories
withVersions bool // if set, versions are produced
hidden bool // if set, return delete markers as objects with size == isDeleteMarker
findFile bool // if set, it will look for files called (bucket, directory)
versionAt fs.Time // if set only show versions <= this time
}
// list lists the objects into the function supplied with the opt
@@ -3067,8 +3160,8 @@ func (f *Fs) list(ctx context.Context, opt listOpt, fn listFn) error {
}
var listBucket bucketLister
switch {
case opt.withVersions:
listBucket = f.newVersionsList(&req)
case opt.withVersions || opt.versionAt.IsSet():
listBucket = f.newVersionsList(&req, opt.hidden, time.Time(opt.versionAt))
case f.opt.ListVersion == 1:
listBucket = f.newV1List(&req)
default:
@@ -3080,7 +3173,7 @@ func (f *Fs) list(ctx context.Context, opt listOpt, fn listFn) error {
var versionIDs []*string
err = f.pacer.Call(func() (bool, error) {
listBucket.URLEncodeListings(urlEncodeListings)
resp, versionIDs, err = listBucket.List(ctx, opt.hidden)
resp, versionIDs, err = listBucket.List(ctx)
if err != nil && !urlEncodeListings {
if awsErr, ok := err.(awserr.RequestFailure); ok {
if origErr := awsErr.OrigErr(); origErr != nil {
@@ -3215,6 +3308,7 @@ func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addB
prefix: prefix,
addBucket: addBucket,
withVersions: f.opt.Versions,
versionAt: f.opt.VersionAt,
}, func(remote string, object *s3.Object, versionID *string, isDirectory bool) error {
entry, err := f.itemToDirEntry(ctx, remote, object, versionID, isDirectory)
if err != nil {
@@ -3300,6 +3394,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
addBucket: addBucket,
recurse: true,
withVersions: f.opt.Versions,
versionAt: f.opt.VersionAt,
}, func(remote string, object *s3.Object, versionID *string, isDirectory bool) error {
entry, err := f.itemToDirEntry(ctx, remote, object, versionID, isDirectory)
if err != nil {
@@ -3609,6 +3704,9 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
if f.opt.VersionAt.IsSet() {
return nil, errNotWithVersionAt
}
dstBucket, dstPath := f.split(remote)
err := f.makeBucket(ctx, dstBucket)
if err != nil {
@@ -4074,6 +4172,9 @@ func (f *Fs) CleanUp(ctx context.Context) (err error) {
//
// Implemented here so we can make sure we delete old versions.
func (f *Fs) purge(ctx context.Context, dir string, oldOnly bool) error {
if f.opt.VersionAt.IsSet() {
return errNotWithVersionAt
}
bucket, directory := f.split(dir)
if bucket == "" {
return errors.New("can't purge from root")
@@ -4871,6 +4972,9 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P
// Update the Object from in with modTime and size
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
if o.fs.opt.VersionAt.IsSet() {
return errNotWithVersionAt
}
bucket, bucketPath := o.split()
err := o.fs.makeBucket(ctx, bucket)
if err != nil {
@@ -5075,6 +5179,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// Remove an object
func (o *Object) Remove(ctx context.Context) error {
if o.fs.opt.VersionAt.IsSet() {
return errNotWithVersionAt
}
bucket, bucketPath := o.split()
req := s3.DeleteObjectInput{
Bucket: &bucket,