mirror of
https://github.com/rclone/rclone.git
synced 2025-12-10 13:23:21 +00:00
Compare commits
2 Commits
feat/cache
...
fix-6032-y
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b3caf8d11 | ||
|
|
83ea146ea8 |
@@ -202,7 +202,11 @@ func (f *Fs) wrapEntries(baseEntries fs.DirEntries) (hashEntries fs.DirEntries,
|
|||||||
for _, entry := range baseEntries {
|
for _, entry := range baseEntries {
|
||||||
switch x := entry.(type) {
|
switch x := entry.(type) {
|
||||||
case fs.Object:
|
case fs.Object:
|
||||||
hashEntries = append(hashEntries, f.wrapObject(x, nil))
|
obj, err := f.wrapObject(x, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
hashEntries = append(hashEntries, obj)
|
||||||
default:
|
default:
|
||||||
hashEntries = append(hashEntries, entry) // trash in - trash out
|
hashEntries = append(hashEntries, entry) // trash in - trash out
|
||||||
}
|
}
|
||||||
@@ -251,7 +255,7 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
|
|||||||
if do := f.Fs.Features().PutStream; do != nil {
|
if do := f.Fs.Features().PutStream; do != nil {
|
||||||
_ = f.pruneHash(src.Remote())
|
_ = f.pruneHash(src.Remote())
|
||||||
oResult, err := do(ctx, in, src, options...)
|
oResult, err := do(ctx, in, src, options...)
|
||||||
return f.wrapObject(oResult, err), err
|
return f.wrapObject(oResult, err)
|
||||||
}
|
}
|
||||||
return nil, errors.New("PutStream not supported")
|
return nil, errors.New("PutStream not supported")
|
||||||
}
|
}
|
||||||
@@ -261,7 +265,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo,
|
|||||||
if do := f.Fs.Features().PutUnchecked; do != nil {
|
if do := f.Fs.Features().PutUnchecked; do != nil {
|
||||||
_ = f.pruneHash(src.Remote())
|
_ = f.pruneHash(src.Remote())
|
||||||
oResult, err := do(ctx, in, src, options...)
|
oResult, err := do(ctx, in, src, options...)
|
||||||
return f.wrapObject(oResult, err), err
|
return f.wrapObject(oResult, err)
|
||||||
}
|
}
|
||||||
return nil, errors.New("PutUnchecked not supported")
|
return nil, errors.New("PutUnchecked not supported")
|
||||||
}
|
}
|
||||||
@@ -348,7 +352,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||||||
return nil, fs.ErrorCantCopy
|
return nil, fs.ErrorCantCopy
|
||||||
}
|
}
|
||||||
oResult, err := do(ctx, o.Object, remote)
|
oResult, err := do(ctx, o.Object, remote)
|
||||||
return f.wrapObject(oResult, err), err
|
return f.wrapObject(oResult, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move src to this remote using server-side move operations.
|
// Move src to this remote using server-side move operations.
|
||||||
@@ -371,7 +375,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||||||
dir: false,
|
dir: false,
|
||||||
fs: f,
|
fs: f,
|
||||||
})
|
})
|
||||||
return f.wrapObject(oResult, nil), nil
|
return f.wrapObject(oResult, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DirMove moves src, srcRemote to this remote at dstRemote using server-side move operations.
|
// DirMove moves src, srcRemote to this remote at dstRemote using server-side move operations.
|
||||||
@@ -410,7 +414,7 @@ func (f *Fs) Shutdown(ctx context.Context) (err error) {
|
|||||||
// NewObject finds the Object at remote.
|
// NewObject finds the Object at remote.
|
||||||
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||||
o, err := f.Fs.NewObject(ctx, remote)
|
o, err := f.Fs.NewObject(ctx, remote)
|
||||||
return f.wrapObject(o, err), err
|
return f.wrapObject(o, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
@@ -424,11 +428,14 @@ type Object struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wrap base object into hasher object
|
// Wrap base object into hasher object
|
||||||
func (f *Fs) wrapObject(o fs.Object, err error) *Object {
|
func (f *Fs) wrapObject(o fs.Object, err error) (*Object, error) {
|
||||||
if err != nil || o == nil {
|
if err != nil {
|
||||||
return nil
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Object{Object: o, f: f}
|
if o == nil {
|
||||||
|
return nil, fs.ErrorObjectNotFound
|
||||||
|
}
|
||||||
|
return &Object{Object: o, f: f}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fs returns read only access to the Fs that this object is part of
|
// Fs returns read only access to the Fs that this object is part of
|
||||||
|
|||||||
@@ -210,8 +210,8 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
|
|||||||
|
|
||||||
_ = f.pruneHash(src.Remote())
|
_ = f.pruneHash(src.Remote())
|
||||||
oResult, err := f.Fs.Put(ctx, wrapIn, src, options...)
|
oResult, err := f.Fs.Put(ctx, wrapIn, src, options...)
|
||||||
o = f.wrapObject(oResult, err)
|
o, err = f.wrapObject(oResult, err)
|
||||||
if o == nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/rclone/rclone/lib/readers"
|
"github.com/rclone/rclone/lib/readers"
|
||||||
"github.com/rclone/rclone/lib/rest"
|
"github.com/rclone/rclone/lib/rest"
|
||||||
"golang.org/x/oauth2"
|
"golang.org/x/oauth2"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
//oAuth
|
//oAuth
|
||||||
@@ -1073,7 +1074,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|||||||
return resp.Body, err
|
return resp.Body, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeType string, options ...fs.OpenOption) (err error) {
|
func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
||||||
// prepare upload
|
// prepare upload
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
var ur api.AsyncInfo
|
var ur api.AsyncInfo
|
||||||
@@ -1087,6 +1088,29 @@ func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeT
|
|||||||
opts.Parameters.Set("path", o.fs.opt.Enc.FromStandardPath(o.filePath()))
|
opts.Parameters.Set("path", o.fs.opt.Enc.FromStandardPath(o.filePath()))
|
||||||
opts.Parameters.Set("overwrite", strconv.FormatBool(overwrite))
|
opts.Parameters.Set("overwrite", strconv.FormatBool(overwrite))
|
||||||
|
|
||||||
|
// Check to see if we can calculate a MD5 and SHA256 hash and
|
||||||
|
// if so start calculating them to do de-dupe the uploads.
|
||||||
|
var (
|
||||||
|
hashes = src.Fs().Hashes()
|
||||||
|
size = src.Size()
|
||||||
|
dedupe = size >= 0 && hashes.Contains(hash.MD5) && hashes.Contains(hash.SHA256)
|
||||||
|
g *errgroup.Group
|
||||||
|
gCtx context.Context
|
||||||
|
md5sum string
|
||||||
|
sha256sum string
|
||||||
|
)
|
||||||
|
if dedupe {
|
||||||
|
g, gCtx = errgroup.WithContext(ctx)
|
||||||
|
g.Go(func() (err error) {
|
||||||
|
md5sum, err = src.Hash(gCtx, hash.MD5)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
g.Go(func() (err error) {
|
||||||
|
sha256sum, err = src.Hash(gCtx, hash.SHA256)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &ur)
|
resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &ur)
|
||||||
return shouldRetry(ctx, resp, err)
|
return shouldRetry(ctx, resp, err)
|
||||||
@@ -1098,11 +1122,27 @@ func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeT
|
|||||||
|
|
||||||
// perform the actual upload
|
// perform the actual upload
|
||||||
opts = rest.Opts{
|
opts = rest.Opts{
|
||||||
RootURL: ur.HRef,
|
RootURL: ur.HRef,
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
ContentType: mimeType,
|
ContentType: fs.MimeType(ctx, src),
|
||||||
Body: in,
|
Body: in,
|
||||||
NoResponse: true,
|
ExtraHeaders: map[string]string{},
|
||||||
|
NoResponse: true,
|
||||||
|
}
|
||||||
|
if size >= 0 {
|
||||||
|
opts.ContentLength = &size
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the hashes to the PUT to dedupe the upload if possible
|
||||||
|
if dedupe {
|
||||||
|
err = g.Wait()
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf(o, "failed to calculate MD5 or SHA256: %v", err)
|
||||||
|
} else {
|
||||||
|
opts.ExtraHeaders["Expect"] = "100-continue"
|
||||||
|
opts.ExtraHeaders["Etag"] = md5sum
|
||||||
|
opts.ExtraHeaders["Sha256"] = sha256sum
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
@@ -1130,7 +1170,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||||||
}
|
}
|
||||||
|
|
||||||
//upload file
|
//upload file
|
||||||
err = o.upload(ctx, in1, true, fs.MimeType(ctx, src), options...)
|
err = o.upload(ctx, in1, true, src, options...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user