mirror of
https://github.com/rclone/rclone.git
synced 2025-12-06 00:03:32 +00:00
Compare commits
1 Commits
master
...
fix-max-me
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d97ba0a0f |
@@ -92,7 +92,7 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer
|
||||
rs = rc
|
||||
} else {
|
||||
// Read the chunk into buffered reader
|
||||
rw := multipart.NewRW()
|
||||
rw := multipart.NewRW().Reserve(size)
|
||||
defer fs.CheckClose(rw, &err)
|
||||
_, err = io.CopyN(rw, rc, size)
|
||||
if err != nil {
|
||||
|
||||
@@ -93,7 +93,7 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U
|
||||
for partNum := int64(0); !finished; partNum++ {
|
||||
// Get a block of memory from the pool and token which limits concurrency.
|
||||
tokens.Get()
|
||||
rw := NewRW()
|
||||
rw := NewRW().Reserve(chunkSize)
|
||||
if acc != nil {
|
||||
rw.SetAccounting(acc.AccountRead)
|
||||
}
|
||||
|
||||
@@ -182,8 +182,34 @@ func (bp *Pool) release(mem int64) {
|
||||
totalMemory.Release(mem)
|
||||
}
|
||||
|
||||
// Reserve buffers for use. Blocks until they are free.
|
||||
//
|
||||
// Doesn't allocate any memory.
|
||||
//
|
||||
// Must be released by calling GetReserved() which releases 1 buffer or
|
||||
// Release() to release any number of buffers.
|
||||
func (bp *Pool) Reserve(buffers int) {
|
||||
waitTime := time.Millisecond
|
||||
for {
|
||||
err := bp.acquire(int64(buffers) * int64(bp.bufferSize))
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
fs.Logf(nil, "Failed to get reservation for buffer, waiting for %v: %v", waitTime, err)
|
||||
time.Sleep(waitTime)
|
||||
waitTime *= 2
|
||||
}
|
||||
}
|
||||
|
||||
// Release previously Reserved buffers.
|
||||
//
|
||||
// Doesn't free any memory.
|
||||
func (bp *Pool) Release(buffers int) {
|
||||
bp.release(int64(buffers) * int64(bp.bufferSize))
|
||||
}
|
||||
|
||||
// Get a buffer from the pool or allocate one
|
||||
func (bp *Pool) Get() []byte {
|
||||
func (bp *Pool) getBlock(reserved bool) []byte {
|
||||
bp.mu.Lock()
|
||||
var buf []byte
|
||||
waitTime := time.Millisecond
|
||||
@@ -193,9 +219,11 @@ func (bp *Pool) Get() []byte {
|
||||
break
|
||||
} else {
|
||||
var err error
|
||||
bp.mu.Unlock()
|
||||
err = bp.acquire(int64(bp.bufferSize))
|
||||
bp.mu.Lock()
|
||||
if !reserved {
|
||||
bp.mu.Unlock()
|
||||
err = bp.acquire(int64(bp.bufferSize))
|
||||
bp.mu.Lock()
|
||||
}
|
||||
if err == nil {
|
||||
buf, err = bp.alloc(bp.bufferSize)
|
||||
if err == nil {
|
||||
@@ -217,6 +245,16 @@ func (bp *Pool) Get() []byte {
|
||||
return buf
|
||||
}
|
||||
|
||||
// Get a buffer from the pool or allocate one
|
||||
func (bp *Pool) Get() []byte {
|
||||
return bp.getBlock(false)
|
||||
}
|
||||
|
||||
// GetReserved gets a reserved buffer from the pool or allocates one.
|
||||
func (bp *Pool) GetReserved() []byte {
|
||||
return bp.getBlock(true)
|
||||
}
|
||||
|
||||
// freeBuffer returns mem to the os if required - call with lock held
|
||||
func (bp *Pool) freeBuffer(mem []byte) {
|
||||
err := bp.free(mem)
|
||||
|
||||
@@ -35,6 +35,8 @@ type RW struct {
|
||||
// Read side Variables
|
||||
out int // offset we are reading from
|
||||
reads int // count how many times the data has been read
|
||||
|
||||
reserved int // number of buffers reserved
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -59,6 +61,20 @@ func NewRW(pool *Pool) *RW {
|
||||
return rw
|
||||
}
|
||||
|
||||
// Reserve bytes of memory.
|
||||
//
|
||||
// Reserve, but don't allocation n bytes of memory.
|
||||
//
|
||||
// This is rounded up to the nearest buffer page size.
|
||||
func (rw *RW) Reserve(n int64) *RW {
|
||||
rw.mu.Lock()
|
||||
defer rw.mu.Unlock()
|
||||
buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize))
|
||||
rw.pool.Reserve(buffers)
|
||||
rw.reserved += buffers
|
||||
return rw
|
||||
}
|
||||
|
||||
// SetAccounting should be provided with a function which will be
|
||||
// called after every read from the RW.
|
||||
//
|
||||
@@ -200,7 +216,12 @@ func (rw *RW) writePage() (page []byte) {
|
||||
if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize {
|
||||
return rw.pages[len(rw.pages)-1][rw.lastOffset:]
|
||||
}
|
||||
page = rw.pool.Get()
|
||||
if rw.reserved > 0 {
|
||||
page = rw.pool.GetReserved()
|
||||
rw.reserved--
|
||||
} else {
|
||||
page = rw.pool.Get()
|
||||
}
|
||||
rw.pages = append(rw.pages, page)
|
||||
rw.lastOffset = 0
|
||||
return page
|
||||
@@ -321,6 +342,10 @@ func (rw *RW) Close() error {
|
||||
rw.pool.Put(page)
|
||||
}
|
||||
rw.pages = nil
|
||||
if rw.reserved > 0 {
|
||||
rw.pool.Release(rw.reserved)
|
||||
rw.reserved = 0
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user