1
0
mirror of https://github.com/rclone/rclone.git synced 2026-02-18 18:33:36 +00:00

internxt: implement re-login under refresh logic, improve retry logic - fixes #9174

This commit is contained in:
José Zúniga
2026-02-13 14:18:51 -05:00
committed by GitHub
parent 4b3aa5aea0
commit 33859568d6
4 changed files with 207 additions and 56 deletions

View File

@@ -13,8 +13,12 @@ import (
"github.com/golang-jwt/jwt/v5"
internxtauth "github.com/internxt/rclone-adapter/auth"
internxtconfig "github.com/internxt/rclone-adapter/config"
sdkerrors "github.com/internxt/rclone-adapter/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/lib/oauthutil"
"golang.org/x/oauth2"
)
@@ -101,7 +105,6 @@ func jwtToOAuth2Token(jwtString string) (*oauth2.Token, error) {
}
// computeBasicAuthHeader creates the BasicAuthHeader for bucket operations
// Following the pattern from SDK's auth/access.go:96-102
func computeBasicAuthHeader(bridgeUser, userID string) string {
sum := sha256.Sum256([]byte(userID))
hexPass := hex.EncodeToString(sum[:])
@@ -144,3 +147,100 @@ func refreshJWTToken(ctx context.Context, name string, m configmap.Mapper) error
fs.Debugf(name, "Token refreshed successfully, new expiry: %v", token.Expiry)
return nil
}
// reLogin performs a full re-login using stored email+password credentials.
// Returns the AccessResponse on success, or an error if 2FA is required or login fails.
func (f *Fs) reLogin(ctx context.Context) (*internxtauth.AccessResponse, error) {
password, err := obscure.Reveal(f.opt.Pass)
if err != nil {
return nil, fmt.Errorf("couldn't decrypt password: %w", err)
}
cfg := internxtconfig.NewDefaultToken("")
cfg.HTTPClient = fshttp.NewClient(ctx)
loginResp, err := internxtauth.Login(ctx, cfg, f.opt.Email)
if err != nil {
return nil, fmt.Errorf("re-login check failed: %w", err)
}
if loginResp.TFA {
return nil, errors.New("account requires 2FA - please run: rclone config reconnect " + f.name + ":")
}
resp, err := internxtauth.DoLogin(ctx, cfg, f.opt.Email, password, "")
if err != nil {
return nil, fmt.Errorf("re-login failed: %w", err)
}
return resp, nil
}
// refreshOrReLogin tries to refresh the JWT token first; if that fails with 401,
// it falls back to a full re-login using stored credentials.
func (f *Fs) refreshOrReLogin(ctx context.Context) error {
refreshErr := refreshJWTToken(ctx, f.name, f.m)
if refreshErr == nil {
newToken, err := oauthutil.GetToken(f.name, f.m)
if err != nil {
return fmt.Errorf("failed to get refreshed token: %w", err)
}
f.cfg.Token = newToken.AccessToken
f.cfg.BasicAuthHeader = computeBasicAuthHeader(f.bridgeUser, f.userID)
fs.Debugf(f, "Token refresh succeeded")
return nil
}
var httpErr *sdkerrors.HTTPError
if !errors.As(refreshErr, &httpErr) || httpErr.StatusCode() != 401 {
if fserrors.ShouldRetry(refreshErr) {
return refreshErr
}
return refreshErr
}
fs.Debugf(f, "Token refresh returned 401, attempting re-login with stored credentials")
resp, err := f.reLogin(ctx)
if err != nil {
return fmt.Errorf("re-login fallback failed: %w", err)
}
oauthToken, err := jwtToOAuth2Token(resp.NewToken)
if err != nil {
return fmt.Errorf("failed to parse re-login token: %w", err)
}
err = oauthutil.PutToken(f.name, f.m, oauthToken, true)
if err != nil {
return fmt.Errorf("failed to save re-login token: %w", err)
}
f.cfg.Token = oauthToken.AccessToken
f.bridgeUser = resp.User.BridgeUser
f.userID = resp.User.UserID
f.cfg.BasicAuthHeader = computeBasicAuthHeader(f.bridgeUser, f.userID)
f.cfg.Bucket = resp.User.Bucket
f.cfg.RootFolderID = resp.User.RootFolderID
fs.Debugf(f, "Re-login succeeded, new token expiry: %v", oauthToken.Expiry)
return nil
}
// reAuthorize is called after getting 401 from the server.
// It serializes re-auth attempts and uses a circuit-breaker to avoid infinite loops.
func (f *Fs) reAuthorize(ctx context.Context) error {
f.authMu.Lock()
defer f.authMu.Unlock()
if f.authFailed {
return errors.New("re-authorization permanently failed")
}
err := f.refreshOrReLogin(ctx)
if err != nil {
f.authFailed = true
return err
}
return nil
}

View File

@@ -11,6 +11,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/internxt/rclone-adapter/auth"
@@ -41,16 +42,34 @@ const (
decayConstant = 2 // bigger for slower decay, exponential
)
// shouldRetry determines if an error should be retried
func shouldRetry(ctx context.Context, err error) (bool, error) {
// shouldRetry determines if an error should be retried.
// On 401, it attempts to re-authorize before retrying.
// On 429, it honours the server's rate limit retry delay.
func (f *Fs) shouldRetry(ctx context.Context, err error) (bool, error) {
if fserrors.ContextError(ctx, &err) {
return false, err
}
var httpErr *sdkerrors.HTTPError
if errors.As(err, &httpErr) && httpErr.StatusCode() == 401 {
return true, err
if errors.As(err, &httpErr) {
switch httpErr.StatusCode() {
case 401:
if !f.authFailed {
authErr := f.reAuthorize(ctx)
if authErr != nil {
fs.Debugf(f, "Re-authorization failed: %v", authErr)
return false, err
}
return true, err
}
return false, err
case 429:
delay := httpErr.RetryAfter()
if delay <= 0 {
delay = time.Second
}
return true, pacer.RetryAfterError(err, delay)
}
}
return fserrors.ShouldRetry(err), err
}
@@ -184,6 +203,7 @@ type Fs struct {
name string
root string
opt Options
m configmap.Mapper
dirCache *dircache.DirCache
cfg *config.Config
features *fs.Features
@@ -191,6 +211,8 @@ type Fs struct {
tokenRenewer *oauthutil.Renew
bridgeUser string
userID string
authMu sync.Mutex
authFailed bool
}
// Object holds the data for a remote file object
@@ -263,45 +285,62 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
cfg.SkipHashValidation = opt.SkipHashValidation
cfg.HTTPClient = fshttp.NewClient(ctx)
userInfo, err := getUserInfo(ctx, &userInfoConfig{Token: cfg.Token})
if err != nil {
return nil, fmt.Errorf("failed to fetch user info: %w", err)
}
cfg.RootFolderID = userInfo.RootFolderID
cfg.Bucket = userInfo.Bucket
cfg.BasicAuthHeader = computeBasicAuthHeader(userInfo.BridgeUser, userInfo.UserID)
f := &Fs{
name: name,
root: strings.Trim(root, "/"),
opt: *opt,
cfg: cfg,
bridgeUser: userInfo.BridgeUser,
userID: userInfo.UserID,
name: name,
root: strings.Trim(root, "/"),
opt: *opt,
m: m,
cfg: cfg,
}
f.pacer = fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant)))
var userInfo *userInfo
const maxRetries = 3
for attempt := 1; attempt <= maxRetries; attempt++ {
userInfo, err = getUserInfo(ctx, &userInfoConfig{Token: f.cfg.Token})
if err == nil {
break
}
var httpErr *sdkerrors.HTTPError
if errors.As(err, &httpErr) && httpErr.StatusCode() == 401 {
fs.Debugf(f, "getUserInfo returned 401, attempting re-auth")
authErr := f.refreshOrReLogin(ctx)
if authErr != nil {
return nil, fmt.Errorf("failed to fetch user info (re-auth failed): %w", err)
}
userInfo, err = getUserInfo(ctx, &userInfoConfig{Token: f.cfg.Token})
if err == nil {
break
}
return nil, fmt.Errorf("failed to fetch user info after re-auth: %w", err)
}
if fserrors.ShouldRetry(err) && attempt < maxRetries {
fs.Debugf(f, "getUserInfo transient error (attempt %d/%d): %v", attempt, maxRetries, err)
time.Sleep(time.Duration(attempt) * time.Second)
continue
}
return nil, fmt.Errorf("failed to fetch user info: %w", err)
}
f.cfg.RootFolderID = userInfo.RootFolderID
f.cfg.Bucket = userInfo.Bucket
f.cfg.BasicAuthHeader = computeBasicAuthHeader(userInfo.BridgeUser, userInfo.UserID)
f.bridgeUser = userInfo.BridgeUser
f.userID = userInfo.UserID
f.features = (&fs.Features{
CanHaveEmptyDirectories: true,
}).Fill(ctx, f)
if ts != nil {
f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
err := refreshJWTToken(ctx, name, m)
if err != nil {
return err
}
newToken, err := oauthutil.GetToken(name, m)
if err != nil {
return fmt.Errorf("failed to get refreshed token: %w", err)
}
f.cfg.Token = newToken.AccessToken
f.cfg.BasicAuthHeader = computeBasicAuthHeader(f.bridgeUser, f.userID)
return nil
f.authMu.Lock()
defer f.authMu.Unlock()
return f.refreshOrReLogin(ctx)
})
f.tokenRenewer.Start()
}
@@ -312,9 +351,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
if err != nil {
// Assume it might be a file
newRoot, remote := dircache.SplitPath(f.root)
tempF := *f
tempF.dirCache = dircache.New(newRoot, f.cfg.RootFolderID, &tempF)
tempF.root = newRoot
tempF := &Fs{
name: f.name,
root: newRoot,
opt: f.opt,
m: f.m,
cfg: f.cfg,
features: f.features,
pacer: f.pacer,
tokenRenewer: f.tokenRenewer,
bridgeUser: f.bridgeUser,
userID: f.userID,
}
tempF.dirCache = dircache.New(newRoot, f.cfg.RootFolderID, tempF)
err = tempF.dirCache.FindRoot(ctx, false)
if err != nil {
@@ -367,7 +416,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
err = f.pacer.Call(func() (bool, error) {
var err error
childFolders, err = folders.ListAllFolders(ctx, f.cfg, id)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return err
@@ -380,7 +429,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
err = f.pacer.Call(func() (bool, error) {
var err error
childFiles, err = folders.ListAllFiles(ctx, f.cfg, id)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return err
@@ -395,7 +444,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
if err != nil && strings.Contains(err.Error(), "404") {
return false, fs.ErrorDirNotFound
}
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return err
@@ -412,7 +461,7 @@ func (f *Fs) FindLeaf(ctx context.Context, pathID, leaf string) (string, bool, e
err := f.pacer.Call(func() (bool, error) {
var err error
entries, err = folders.ListAllFolders(ctx, f.cfg, pathID)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return "", false, err
@@ -437,7 +486,7 @@ func (f *Fs) CreateDir(ctx context.Context, pathID, leaf string) (string, error)
err := f.pacer.CallNoRetry(func() (bool, error) {
var err error
resp, err = folders.CreateFolder(ctx, f.cfg, request)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
// If folder already exists (409 conflict), try to find it
@@ -525,7 +574,7 @@ func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
err = f.pacer.Call(func() (bool, error) {
var err error
foldersList, err = folders.ListAllFolders(ctx, f.cfg, dirID)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, err
@@ -538,7 +587,7 @@ func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
err = f.pacer.Call(func() (bool, error) {
var err error
filesList, err = folders.ListAllFiles(ctx, f.cfg, dirID)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, err
@@ -616,7 +665,7 @@ func (f *Fs) Remove(ctx context.Context, remote string) error {
}
err = f.pacer.Call(func() (bool, error) {
err := folders.DeleteFolder(ctx, f.cfg, dirID)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return err
@@ -642,7 +691,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
err = f.pacer.Call(func() (bool, error) {
var err error
files, err = folders.ListAllFiles(ctx, f.cfg, dirID)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, err
@@ -720,7 +769,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
err := f.pacer.Call(func() (bool, error) {
var err error
internxtLimit, err = users.GetLimit(ctx, f.cfg)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, err
@@ -730,7 +779,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
err = f.pacer.Call(func() (bool, error) {
var err error
internxtUsage, err = users.GetUsage(ctx, f.cfg)
return shouldRetry(ctx, err)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, err
@@ -776,7 +825,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo
err := o.f.pacer.Call(func() (bool, error) {
var err error
stream, err = buckets.DownloadFileStream(ctx, o.f.cfg, o.id, rangeValue)
return shouldRetry(ctx, err)
return o.f.shouldRetry(ctx, err)
})
if err != nil {
return nil, err
@@ -826,7 +875,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return false, nil
}
}
return shouldRetry(ctx, err)
return o.f.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to rename existing file to backup: %w", err)
@@ -847,7 +896,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
src.Size(),
src.ModTime(ctx),
)
return shouldRetry(ctx, err)
return o.f.shouldRetry(ctx, err)
})
if err != nil && isEmptyFileLimitError(err) {
@@ -885,7 +934,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
}
}
}
return shouldRetry(ctx, err)
return o.f.shouldRetry(ctx, err)
})
if err != nil {
fs.Errorf(o.f, "Failed to delete backup file %s.%s (UUID: %s): %v. This may leave an orphaned backup file.",
@@ -939,7 +988,7 @@ func (o *Object) recoverFromTimeoutConflict(ctx context.Context, uploadErr error
checkErr := o.f.pacer.Call(func() (bool, error) {
existingFile, err := o.f.preUploadCheck(ctx, encodedName, dirID)
if err != nil {
return shouldRetry(ctx, err)
return o.f.shouldRetry(ctx, err)
}
if existingFile != nil {
name := strings.TrimSuffix(baseName, filepath.Ext(baseName))
@@ -978,7 +1027,7 @@ func (o *Object) restoreBackupFile(ctx context.Context, backupUUID, origName, or
_ = o.f.pacer.Call(func() (bool, error) {
err := files.RenameFile(ctx, o.f.cfg, backupUUID,
o.f.opt.Encoding.FromStandardName(origName), origType)
return shouldRetry(ctx, err)
return o.f.shouldRetry(ctx, err)
})
}
@@ -986,6 +1035,6 @@ func (o *Object) restoreBackupFile(ctx context.Context, backupUUID, origName, or
func (o *Object) Remove(ctx context.Context) error {
return o.f.pacer.Call(func() (bool, error) {
err := files.DeleteFile(ctx, o.f.cfg, o.uuid)
return shouldRetry(ctx, err)
return o.f.shouldRetry(ctx, err)
})
}

2
go.mod
View File

@@ -45,7 +45,7 @@ require (
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/google/uuid v1.6.0
github.com/hanwen/go-fuse/v2 v2.9.0
github.com/internxt/rclone-adapter v0.0.0-20260130171252-c3c6ebb49276
github.com/internxt/rclone-adapter v0.0.0-20260213125353-6f59c89fcb7c
github.com/jcmturner/gokrb5/v8 v8.4.4
github.com/jlaffaye/ftp v0.2.1-0.20240918233326-1b970516f5d3
github.com/josephspurrier/goversioninfo v1.5.0

2
go.sum
View File

@@ -425,6 +425,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/internxt/rclone-adapter v0.0.0-20260130171252-c3c6ebb49276 h1:PTJPYovznNqc9t/9MjvtqhrgEVC9OiK75ZPL6hqm6gM=
github.com/internxt/rclone-adapter v0.0.0-20260130171252-c3c6ebb49276/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0=
github.com/internxt/rclone-adapter v0.0.0-20260213125353-6f59c89fcb7c h1:r+KtxPyrhsYeNbsfeqTfEM8xRdwgV6LuNhLZxpXecb4=
github.com/internxt/rclone-adapter v0.0.0-20260213125353-6f59c89fcb7c/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=