mirror of
https://github.com/rclone/rclone.git
synced 2025-12-16 08:13:29 +00:00
Compare commits
1 Commits
v1.72-stab
...
fix-max-me
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d97ba0a0f |
@@ -92,7 +92,7 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer
|
|||||||
rs = rc
|
rs = rc
|
||||||
} else {
|
} else {
|
||||||
// Read the chunk into buffered reader
|
// Read the chunk into buffered reader
|
||||||
rw := multipart.NewRW()
|
rw := multipart.NewRW().Reserve(size)
|
||||||
defer fs.CheckClose(rw, &err)
|
defer fs.CheckClose(rw, &err)
|
||||||
_, err = io.CopyN(rw, rc, size)
|
_, err = io.CopyN(rw, rc, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -93,7 +93,7 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U
|
|||||||
for partNum := int64(0); !finished; partNum++ {
|
for partNum := int64(0); !finished; partNum++ {
|
||||||
// Get a block of memory from the pool and token which limits concurrency.
|
// Get a block of memory from the pool and token which limits concurrency.
|
||||||
tokens.Get()
|
tokens.Get()
|
||||||
rw := NewRW()
|
rw := NewRW().Reserve(chunkSize)
|
||||||
if acc != nil {
|
if acc != nil {
|
||||||
rw.SetAccounting(acc.AccountRead)
|
rw.SetAccounting(acc.AccountRead)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -182,8 +182,34 @@ func (bp *Pool) release(mem int64) {
|
|||||||
totalMemory.Release(mem)
|
totalMemory.Release(mem)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reserve buffers for use. Blocks until they are free.
|
||||||
|
//
|
||||||
|
// Doesn't allocate any memory.
|
||||||
|
//
|
||||||
|
// Must be released by calling GetReserved() which releases 1 buffer or
|
||||||
|
// Release() to release any number of buffers.
|
||||||
|
func (bp *Pool) Reserve(buffers int) {
|
||||||
|
waitTime := time.Millisecond
|
||||||
|
for {
|
||||||
|
err := bp.acquire(int64(buffers) * int64(bp.bufferSize))
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fs.Logf(nil, "Failed to get reservation for buffer, waiting for %v: %v", waitTime, err)
|
||||||
|
time.Sleep(waitTime)
|
||||||
|
waitTime *= 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release previously Reserved buffers.
|
||||||
|
//
|
||||||
|
// Doesn't free any memory.
|
||||||
|
func (bp *Pool) Release(buffers int) {
|
||||||
|
bp.release(int64(buffers) * int64(bp.bufferSize))
|
||||||
|
}
|
||||||
|
|
||||||
// Get a buffer from the pool or allocate one
|
// Get a buffer from the pool or allocate one
|
||||||
func (bp *Pool) Get() []byte {
|
func (bp *Pool) getBlock(reserved bool) []byte {
|
||||||
bp.mu.Lock()
|
bp.mu.Lock()
|
||||||
var buf []byte
|
var buf []byte
|
||||||
waitTime := time.Millisecond
|
waitTime := time.Millisecond
|
||||||
@@ -193,9 +219,11 @@ func (bp *Pool) Get() []byte {
|
|||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
var err error
|
var err error
|
||||||
|
if !reserved {
|
||||||
bp.mu.Unlock()
|
bp.mu.Unlock()
|
||||||
err = bp.acquire(int64(bp.bufferSize))
|
err = bp.acquire(int64(bp.bufferSize))
|
||||||
bp.mu.Lock()
|
bp.mu.Lock()
|
||||||
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
buf, err = bp.alloc(bp.bufferSize)
|
buf, err = bp.alloc(bp.bufferSize)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@@ -217,6 +245,16 @@ func (bp *Pool) Get() []byte {
|
|||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get a buffer from the pool or allocate one
|
||||||
|
func (bp *Pool) Get() []byte {
|
||||||
|
return bp.getBlock(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetReserved gets a reserved buffer from the pool or allocates one.
|
||||||
|
func (bp *Pool) GetReserved() []byte {
|
||||||
|
return bp.getBlock(true)
|
||||||
|
}
|
||||||
|
|
||||||
// freeBuffer returns mem to the os if required - call with lock held
|
// freeBuffer returns mem to the os if required - call with lock held
|
||||||
func (bp *Pool) freeBuffer(mem []byte) {
|
func (bp *Pool) freeBuffer(mem []byte) {
|
||||||
err := bp.free(mem)
|
err := bp.free(mem)
|
||||||
|
|||||||
@@ -35,6 +35,8 @@ type RW struct {
|
|||||||
// Read side Variables
|
// Read side Variables
|
||||||
out int // offset we are reading from
|
out int // offset we are reading from
|
||||||
reads int // count how many times the data has been read
|
reads int // count how many times the data has been read
|
||||||
|
|
||||||
|
reserved int // number of buffers reserved
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -59,6 +61,20 @@ func NewRW(pool *Pool) *RW {
|
|||||||
return rw
|
return rw
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reserve bytes of memory.
|
||||||
|
//
|
||||||
|
// Reserve, but don't allocation n bytes of memory.
|
||||||
|
//
|
||||||
|
// This is rounded up to the nearest buffer page size.
|
||||||
|
func (rw *RW) Reserve(n int64) *RW {
|
||||||
|
rw.mu.Lock()
|
||||||
|
defer rw.mu.Unlock()
|
||||||
|
buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize))
|
||||||
|
rw.pool.Reserve(buffers)
|
||||||
|
rw.reserved += buffers
|
||||||
|
return rw
|
||||||
|
}
|
||||||
|
|
||||||
// SetAccounting should be provided with a function which will be
|
// SetAccounting should be provided with a function which will be
|
||||||
// called after every read from the RW.
|
// called after every read from the RW.
|
||||||
//
|
//
|
||||||
@@ -200,7 +216,12 @@ func (rw *RW) writePage() (page []byte) {
|
|||||||
if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize {
|
if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize {
|
||||||
return rw.pages[len(rw.pages)-1][rw.lastOffset:]
|
return rw.pages[len(rw.pages)-1][rw.lastOffset:]
|
||||||
}
|
}
|
||||||
|
if rw.reserved > 0 {
|
||||||
|
page = rw.pool.GetReserved()
|
||||||
|
rw.reserved--
|
||||||
|
} else {
|
||||||
page = rw.pool.Get()
|
page = rw.pool.Get()
|
||||||
|
}
|
||||||
rw.pages = append(rw.pages, page)
|
rw.pages = append(rw.pages, page)
|
||||||
rw.lastOffset = 0
|
rw.lastOffset = 0
|
||||||
return page
|
return page
|
||||||
@@ -321,6 +342,10 @@ func (rw *RW) Close() error {
|
|||||||
rw.pool.Put(page)
|
rw.pool.Put(page)
|
||||||
}
|
}
|
||||||
rw.pages = nil
|
rw.pages = nil
|
||||||
|
if rw.reserved > 0 {
|
||||||
|
rw.pool.Release(rw.reserved)
|
||||||
|
rw.reserved = 0
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user