1
0
mirror of https://github.com/rclone/rclone.git synced 2025-12-06 00:03:32 +00:00
Files
rclone/lib/multipart/multipart.go
Nick Craig-Wood 117d8d9fdb pool: fix deadlock with --max-memory and multipart transfers
Because multipart transfers can need more than one buffer to complete,
if transfers was set very high, it was possible for lots of multipart
transfers to start, grab fewer buffers than chunk size, then deadlock
because no more memory was available.

This fixes the problem by introducing a reservation system which the
multipart transfer uses to ensure it can reserve all the memory for
one chunk before starting.
2025-08-07 12:45:44 +01:00

131 lines
3.4 KiB
Go

// Package multipart implements generic multipart uploading.
package multipart
import (
"context"
"fmt"
"io"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"golang.org/x/sync/errgroup"
)
// BufferSize is the default size of the pages used in the reader.
// It is the same value as pool.BufferSize.
const BufferSize = pool.BufferSize
// NewRW returns a new pool.RW whose buffers come from the global pool
// (pool.Global()).
func NewRW() *pool.RW {
	globalPool := pool.Global()
	return pool.NewRW(globalPool)
}
// UploadMultipartOptions holds the options for the generic multipart
// upload performed by UploadMultipart.
type UploadMultipartOptions struct {
	// Open is the value whose OpenChunkWriter method is called to
	// start the upload.
	Open fs.OpenChunkWriter
	// OpenOptions are passed through unchanged to OpenChunkWriter.
	OpenOptions []fs.OpenOption
}
// UploadMultipart does a generic multipart upload from src using opt.Open
// as the fs.OpenChunkWriter.
//
// in is read sequentially and chunks from it are uploaded in parallel, with
// at most max(info.Concurrency, 1) chunks in flight at once. Before each
// chunk is read, a buffer reserving chunkSize bytes is taken from the global
// pool (see pool.RW.Reserve).
//
// If reading the source fails, in-flight chunk uploads are cancelled and
// waited for before the read error is returned. If ctx is cancelled before
// the whole source has been read, an error is returned instead of
// finalising a truncated upload. When an error is returned, the deferred
// atexit.OnError handler aborts the upload via chunkWriter.Abort unless
// info.LeavePartsOnError is set.
//
// It returns the chunkWriter used in case the caller needs to extract any
// private info from it.
func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt UploadMultipartOptions) (chunkWriterOut fs.ChunkWriter, err error) {
	info, chunkWriter, err := opt.Open.OpenChunkWriter(ctx, src.Remote(), src, opt.OpenOptions...)
	if err != nil {
		return nil, fmt.Errorf("multipart upload failed to initialise: %w", err)
	}

	// make concurrency machinery
	concurrency := max(info.Concurrency, 1)
	tokens := pacer.NewTokenDispenser(concurrency)

	uploadCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer atexit.OnError(&err, func() {
		cancel()
		if info.LeavePartsOnError {
			return
		}
		fs.Debugf(src, "Cancelling multipart upload")
		errCancel := chunkWriter.Abort(ctx)
		if errCancel != nil {
			fs.Debugf(src, "Failed to cancel multipart upload: %v", errCancel)
		}
	})()

	var (
		g, gCtx   = errgroup.WithContext(uploadCtx)
		finished  = false // set once the whole of in has been consumed
		off       int64
		size      = src.Size()
		chunkSize = info.ChunkSize
	)

	// Do the accounting manually
	in, acc := accounting.UnWrapAccounting(in)

	for partNum := int64(0); !finished; partNum++ {
		// Get a token which limits concurrency, then a buffer which has
		// reserved a whole chunk of memory from the pool.
		tokens.Get()
		rw := NewRW().Reserve(chunkSize)
		if acc != nil {
			rw.SetAccounting(acc.AccountRead)
		}

		free := func() {
			// return the memory and token
			_ = rw.Close() // Can't return an error
			tokens.Put()
		}

		// Fail fast: gCtx is cancelled if an errgroup managed function
		// returns an error or if ctx is cancelled. There is no point in
		// uploading any more parts.
		if gCtx.Err() != nil {
			free()
			break
		}

		// Read the chunk
		var n int64
		n, err = io.CopyN(rw, in, chunkSize)
		if err == io.EOF {
			finished = true
			if n == 0 && partNum != 0 { // end if no data and if not first chunk
				free()
				break
			}
		} else if err != nil {
			free()
			// Stop the in-flight chunk uploads and wait for them so they
			// are not still using chunkWriter when the deferred handler
			// aborts it, and so no goroutines outlive this call.
			cancel()
			_ = g.Wait()
			return nil, fmt.Errorf("multipart upload: failed to read source: %w", err)
		}

		// Copy the loop variable for the closure (needed before Go 1.22).
		partNum := partNum
		partOff := off
		off += n
		g.Go(func() (err error) {
			defer free()
			fs.Debugf(src, "multipart upload: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(partOff), fs.SizeSuffix(size))
			_, err = chunkWriter.WriteChunk(gCtx, int(partNum), rw)
			return err
		})
	}

	err = g.Wait()
	if err != nil {
		return nil, err
	}
	if !finished {
		// The loop stopped early without any chunk failing, so gCtx was
		// cancelled via ctx. Never finalise an upload which is missing the
		// rest of the source.
		return nil, fmt.Errorf("multipart upload: cancelled before all data was read: %w", ctx.Err())
	}

	err = chunkWriter.Close(ctx)
	if err != nil {
		return nil, fmt.Errorf("multipart upload: failed to finalise: %w", err)
	}
	return chunkWriter, nil
}