|
|
|
|
@@ -48,6 +48,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
|
|
|
|
realpath := o.fs.realpath(o.Remote())
|
|
|
|
|
err := o.fs.client.Chtimes(realpath, modTime, modTime)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, "SetModTime: ChTimes(%q, %v, %v) returned error: %v", realpath, modTime, modTime, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
o.modTime = modTime
|
|
|
|
|
@@ -113,31 +114,49 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|
|
|
|
|
|
|
|
|
err := o.fs.client.MkdirAll(dirname, 0755)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, "update: MkdirAll(%q, 0755) returned error: %v", dirname, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, err = o.fs.client.Stat(realpath)
|
|
|
|
|
if err == nil {
|
|
|
|
|
fs.Errorf(o, "update: Stat(%q) returned error: %v", realpath, err)
|
|
|
|
|
err = o.fs.client.Remove(realpath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, "update: Remove(%q) returned error: %v", realpath, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
out, err := o.fs.client.Create(realpath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cleanup := func() {
|
|
|
|
|
rerr := o.fs.client.Remove(realpath)
|
|
|
|
|
if rerr != nil {
|
|
|
|
|
fs.Errorf(o, "update: cleanup: Remove(%q) returned error: %v", realpath, err)
|
|
|
|
|
fs.Errorf(o.fs, "failed to remove [%v]: %v", realpath, rerr)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, err = io.Copy(out, in)
|
|
|
|
|
var out *hdfs.FileWriter
|
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
|
if out != nil {
|
|
|
|
|
_ = out.Close()
|
|
|
|
|
out = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
out, err = o.fs.client.Create(realpath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, "update: Create(%q) returned error: %v", realpath, err)
|
|
|
|
|
return false, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, err = io.Copy(out, in)
|
|
|
|
|
if err == nil {
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
return errors.Is(err, io.ErrUnexpectedEOF), err
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, "update: io.Copy returned error: %v", err)
|
|
|
|
|
cleanup()
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
@@ -160,21 +179,25 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|
|
|
|
return errors.Is(err, hdfs.ErrReplicating), err
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, "update: Close(%#v) returned error: %v", out, err)
|
|
|
|
|
cleanup()
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
info, err := o.fs.client.Stat(realpath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, "update: Stat#2(%q) returned error: %v", realpath, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = o.SetModTime(ctx, src.ModTime(ctx))
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, "update: SetModTime(%v) returned error: %v", src.ModTime(ctx), err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
o.size = info.Size()
|
|
|
|
|
|
|
|
|
|
fs.Errorf(o, "update: returned no error")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|