Mirror of https://github.com/rclone/rclone.git — synced 2025-12-10 13:23:21 +00:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Nick Craig-Wood
7b3caf8d11 yandex: support deduplicated uploads - fixes #6032 2022-03-13 15:57:28 +00:00
Nick Craig-Wood
83ea146ea8 hasher: fix crash on object not found
Before this fix `NewObject` could return a wrapped `fs.Object(nil)`
which caused a crash.

This changes the interface of `wrapObject` so it returns an error
which must be checked. This forces the callers to return a `nil`
object rather than an `fs.Object(nil)`.

See: https://forum.rclone.org/t/panic-in-hasher-when-mounting-with-vfs-cache-and-not-synced-data-in-the-cache/29697/11
2022-03-12 16:45:25 +00:00
3 changed files with 66 additions and 19 deletions

View File

@@ -202,7 +202,11 @@ func (f *Fs) wrapEntries(baseEntries fs.DirEntries) (hashEntries fs.DirEntries,
for _, entry := range baseEntries { for _, entry := range baseEntries {
switch x := entry.(type) { switch x := entry.(type) {
case fs.Object: case fs.Object:
hashEntries = append(hashEntries, f.wrapObject(x, nil)) obj, err := f.wrapObject(x, nil)
if err != nil {
return nil, err
}
hashEntries = append(hashEntries, obj)
default: default:
hashEntries = append(hashEntries, entry) // trash in - trash out hashEntries = append(hashEntries, entry) // trash in - trash out
} }
@@ -251,7 +255,7 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
if do := f.Fs.Features().PutStream; do != nil { if do := f.Fs.Features().PutStream; do != nil {
_ = f.pruneHash(src.Remote()) _ = f.pruneHash(src.Remote())
oResult, err := do(ctx, in, src, options...) oResult, err := do(ctx, in, src, options...)
return f.wrapObject(oResult, err), err return f.wrapObject(oResult, err)
} }
return nil, errors.New("PutStream not supported") return nil, errors.New("PutStream not supported")
} }
@@ -261,7 +265,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo,
if do := f.Fs.Features().PutUnchecked; do != nil { if do := f.Fs.Features().PutUnchecked; do != nil {
_ = f.pruneHash(src.Remote()) _ = f.pruneHash(src.Remote())
oResult, err := do(ctx, in, src, options...) oResult, err := do(ctx, in, src, options...)
return f.wrapObject(oResult, err), err return f.wrapObject(oResult, err)
} }
return nil, errors.New("PutUnchecked not supported") return nil, errors.New("PutUnchecked not supported")
} }
@@ -348,7 +352,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
return nil, fs.ErrorCantCopy return nil, fs.ErrorCantCopy
} }
oResult, err := do(ctx, o.Object, remote) oResult, err := do(ctx, o.Object, remote)
return f.wrapObject(oResult, err), err return f.wrapObject(oResult, err)
} }
// Move src to this remote using server-side move operations. // Move src to this remote using server-side move operations.
@@ -371,7 +375,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
dir: false, dir: false,
fs: f, fs: f,
}) })
return f.wrapObject(oResult, nil), nil return f.wrapObject(oResult, nil)
} }
// DirMove moves src, srcRemote to this remote at dstRemote using server-side move operations. // DirMove moves src, srcRemote to this remote at dstRemote using server-side move operations.
@@ -410,7 +414,7 @@ func (f *Fs) Shutdown(ctx context.Context) (err error) {
// NewObject finds the Object at remote. // NewObject finds the Object at remote.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
o, err := f.Fs.NewObject(ctx, remote) o, err := f.Fs.NewObject(ctx, remote)
return f.wrapObject(o, err), err return f.wrapObject(o, err)
} }
// //
@@ -424,11 +428,14 @@ type Object struct {
} }
// Wrap base object into hasher object // Wrap base object into hasher object
func (f *Fs) wrapObject(o fs.Object, err error) *Object { func (f *Fs) wrapObject(o fs.Object, err error) (*Object, error) {
if err != nil || o == nil { if err != nil {
return nil return nil, err
} }
return &Object{Object: o, f: f} if o == nil {
return nil, fs.ErrorObjectNotFound
}
return &Object{Object: o, f: f}, nil
} }
// Fs returns read only access to the Fs that this object is part of // Fs returns read only access to the Fs that this object is part of

View File

@@ -210,8 +210,8 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
_ = f.pruneHash(src.Remote()) _ = f.pruneHash(src.Remote())
oResult, err := f.Fs.Put(ctx, wrapIn, src, options...) oResult, err := f.Fs.Put(ctx, wrapIn, src, options...)
o = f.wrapObject(oResult, err) o, err = f.wrapObject(oResult, err)
if o == nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -28,6 +28,7 @@ import (
"github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/readers"
"github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/rest"
"golang.org/x/oauth2" "golang.org/x/oauth2"
"golang.org/x/sync/errgroup"
) )
//oAuth //oAuth
@@ -1073,7 +1074,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
return resp.Body, err return resp.Body, err
} }
func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeType string, options ...fs.OpenOption) (err error) { func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
// prepare upload // prepare upload
var resp *http.Response var resp *http.Response
var ur api.AsyncInfo var ur api.AsyncInfo
@@ -1087,6 +1088,29 @@ func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeT
opts.Parameters.Set("path", o.fs.opt.Enc.FromStandardPath(o.filePath())) opts.Parameters.Set("path", o.fs.opt.Enc.FromStandardPath(o.filePath()))
opts.Parameters.Set("overwrite", strconv.FormatBool(overwrite)) opts.Parameters.Set("overwrite", strconv.FormatBool(overwrite))
// Check to see if we can calculate an MD5 and SHA256 hash and
// if so start calculating them to de-dupe the uploads.
var (
hashes = src.Fs().Hashes()
size = src.Size()
dedupe = size >= 0 && hashes.Contains(hash.MD5) && hashes.Contains(hash.SHA256)
g *errgroup.Group
gCtx context.Context
md5sum string
sha256sum string
)
if dedupe {
g, gCtx = errgroup.WithContext(ctx)
g.Go(func() (err error) {
md5sum, err = src.Hash(gCtx, hash.MD5)
return err
})
g.Go(func() (err error) {
sha256sum, err = src.Hash(gCtx, hash.SHA256)
return err
})
}
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &ur) resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &ur)
return shouldRetry(ctx, resp, err) return shouldRetry(ctx, resp, err)
@@ -1098,11 +1122,27 @@ func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeT
// perform the actual upload // perform the actual upload
opts = rest.Opts{ opts = rest.Opts{
RootURL: ur.HRef, RootURL: ur.HRef,
Method: "PUT", Method: "PUT",
ContentType: mimeType, ContentType: fs.MimeType(ctx, src),
Body: in, Body: in,
NoResponse: true, ExtraHeaders: map[string]string{},
NoResponse: true,
}
if size >= 0 {
opts.ContentLength = &size
}
// Add the hashes to the PUT to dedupe the upload if possible
if dedupe {
err = g.Wait()
if err != nil {
fs.Debugf(o, "failed to calculate MD5 or SHA256: %v", err)
} else {
opts.ExtraHeaders["Expect"] = "100-continue"
opts.ExtraHeaders["Etag"] = md5sum
opts.ExtraHeaders["Sha256"] = sha256sum
}
} }
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
@@ -1130,7 +1170,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
} }
//upload file //upload file
err = o.upload(ctx, in1, true, fs.MimeType(ctx, src), options...) err = o.upload(ctx, in1, true, src, options...)
if err != nil { if err != nil {
return err return err
} }