diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go index 8c5c182c7..788c91b83 100644 --- a/backend/azureblob/azureblob.go +++ b/backend/azureblob/azureblob.go @@ -1751,6 +1751,7 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st numParts = (srcSize-1)/partSize + 1 blockIDs = make([]string, numParts) // list of blocks for finalize g, gCtx = errgroup.WithContext(ctx) + checker = newCheckForInvalidBlockOrBlob("copy", o) ) g.SetLimit(f.opt.CopyConcurrency) @@ -1780,8 +1781,13 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st } fs.Debugf(o, "multipart copy: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(options.Range.Count), fs.SizeSuffix(options.Range.Offset), fs.SizeSuffix(srcSize)) err := f.pacer.Call(func() (bool, error) { + checker.start() _, err := dstBlockBlobSVC.StageBlockFromURL(ctx, blockID, srcURL, &options) + checker.stop() if err != nil { + if checker.checkErr(ctx, err) { + return true, err + } return f.shouldRetry(ctx, err) } return false, nil @@ -2393,6 +2399,7 @@ type azChunkWriter struct { blocks []azBlock // list of blocks for finalize o *Object bic *blockIDCreator + checker *checkForInvalidBlockOrBlob } // OpenChunkWriter returns the chunk size and a ChunkWriter @@ -2449,6 +2456,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn f: f, ui: ui, o: o, + checker: newCheckForInvalidBlockOrBlob("upload", o), } info = fs.ChunkWriterInfo{ ChunkSize: int64(partSize), @@ -2463,6 +2471,81 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn return info, chunkWriter, nil } +// isInvalidBlockOrBlob looks for the InvalidBlockOrBlob error in err +// returning true if it is found +func isInvalidBlockOrBlob(err error) bool { + var storageErr *azcore.ResponseError + if errors.As(err, &storageErr) { + return storageErr.ErrorCode == string(bloberror.InvalidBlobOrBlock) + } + return false +} + +// Struct to hold state for checking for InvalidBlockOrBlob +type checkForInvalidBlockOrBlob struct { + startMu sync.Mutex // hold when starting transactions + inFlight sync.WaitGroup // transactions in flight + what string // "copy" or "upload" + o *Object // object we are working on + cleared bool // set if we have cleared the uncommitted blocks - we only do this once +} + +// Make InvalidBlockOrBlob checker +func newCheckForInvalidBlockOrBlob(what string, o *Object) *checkForInvalidBlockOrBlob { + return &checkForInvalidBlockOrBlob{ + what: what, + o: o, + } +} + +// start marks that there is a transaction in progress +func (c *checkForInvalidBlockOrBlob) start() { + c.startMu.Lock() + defer c.startMu.Unlock() + c.inFlight.Add(1) +} + +// stop marks that this transaction has finished +func (c *checkForInvalidBlockOrBlob) stop() { + c.inFlight.Done() +} + +// checkErr looks for the InvalidBlockOrBlob error in err, and if it +// is found, it clears uncommitted blocks in o to clear the error. +// +// It returns a bool indicating whether the error was found or not. +// +// See https://gauravmantri.com/2013/05/18/windows-azure-blob-storage-dealing-with-the-specified-blob-or-block-content-is-invalid-error/ +func (c *checkForInvalidBlockOrBlob) checkErr(ctx context.Context, err error) (result bool) { + // defer log.Trace(c.o, "err=%#v, what=%q", err, c.what)("result=%v", &result) + if !isInvalidBlockOrBlob(err) { + return false + } + + // Prevent more transactions starting + c.startMu.Lock() + defer c.startMu.Unlock() + + if c.cleared { + fs.Debugf(c.o, "multipart %s: received %s error: already cleared", c.what, bloberror.InvalidBlobOrBlock) + return true + } + + // Wait for any other outstanding transactions to finish + c.inFlight.Wait() + + // Clear uncommitted blocks + fs.Debugf(c.o, "multipart %s: received %s error: clearing uncommitted blocks and retrying", c.what, bloberror.InvalidBlobOrBlock) + clearErr := c.o.clearUncommittedBlocks(ctx) + if clearErr != nil { + fs.Debugf(c.o, "multipart %s: error fixing %s: %v", c.what, bloberror.InvalidBlobOrBlock, clearErr) + } + fs.Debugf(c.o, "multipart %s: fixed %s", c.what, bloberror.InvalidBlobOrBlock) + c.cleared = true + + return true +} + // WriteChunk will write chunk number with reader bytes, where chunk number >= 0 func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { if chunkNumber < 0 { @@ -2503,8 +2586,13 @@ func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader // Specify the transactional md5 for the body, to be validated by the service. TransactionalValidation: blob.TransferValidationTypeMD5(md5sum), } + w.checker.start() _, err = w.ui.blb.StageBlock(ctx, blockID, &readSeekCloser{Reader: reader, Seeker: reader}, &options) + w.checker.stop() if err != nil { + if w.checker.checkErr(ctx, err) { + return true, err + } if chunkNumber <= 8 { return w.f.shouldRetry(ctx, err) } diff --git a/backend/azureblob/azureblob_internal_test.go b/backend/azureblob/azureblob_internal_test.go index 6c96dbb97..67669775d 100644 --- a/backend/azureblob/azureblob_internal_test.go +++ b/backend/azureblob/azureblob_internal_test.go @@ -3,22 +3,20 @@ package azureblob import ( + "context" "encoding/base64" + "strings" "testing" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/fstest/fstests" + "github.com/rclone/rclone/lib/random" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func (f *Fs) InternalTest(t *testing.T) { - // Check first feature flags are set on this - // remote - enabled := f.Features().SetTier - assert.True(t, enabled) - enabled = f.Features().GetTier - assert.True(t, enabled) -} - func TestBlockIDCreator(t *testing.T) { // Check creation and random number bic, err := newBlockIDCreator() @@ -46,3 +44,108 @@ func TestBlockIDCreator(t *testing.T) { assert.ErrorContains(t, bic.checkID(chunkNumber+1, got), "expecting decoded") assert.ErrorContains(t, bic2.checkID(chunkNumber, got), "random bytes") } + +func (f *Fs) testFeatures(t *testing.T) { + // Check first feature flags are set on this remote + enabled := f.Features().SetTier + assert.True(t, enabled) + enabled = f.Features().GetTier + assert.True(t, enabled) +} + +type ReadSeekCloser struct { + *strings.Reader +} + +func (r *ReadSeekCloser) Close() error { + return nil +} + +// Stage a block at remote but don't commit it +func (f *Fs) stageBlockWithoutCommit(ctx context.Context, t *testing.T, remote string) { + var ( + containerName, blobPath = f.split(remote) + containerClient = f.cntSVC(containerName) + blobClient = containerClient.NewBlockBlobClient(blobPath) + data = "uncommitted data" + blockID = "1" + blockIDBase64 = base64.StdEncoding.EncodeToString([]byte(blockID)) + ) + r := &ReadSeekCloser{strings.NewReader(data)} + _, err := blobClient.StageBlock(ctx, blockIDBase64, r, nil) + require.NoError(t, err) + + // Verify the block is staged but not committed + blockList, err := blobClient.GetBlockList(ctx, blockblob.BlockListTypeAll, nil) + require.NoError(t, err) + found := false + for _, block := range blockList.UncommittedBlocks { + if *block.Name == blockIDBase64 { + found = true + break + } + } + require.True(t, found, "Block ID not found in uncommitted blocks") +} + +// This tests uploading a blob where it has uncommitted blocks with a different ID size. +// +// https://gauravmantri.com/2013/05/18/windows-azure-blob-storage-dealing-with-the-specified-blob-or-block-content-is-invalid-error/ +// +// TestIntegration/FsMkdir/FsPutFiles/Internal/WriteUncommittedBlocks +func (f *Fs) testWriteUncommittedBlocks(t *testing.T) { + var ( + ctx = context.Background() + remote = "testBlob" + ) + + // Multipart copy the blob please + oldUseCopyBlob, oldCopyCutoff := f.opt.UseCopyBlob, f.opt.CopyCutoff + f.opt.UseCopyBlob = false + f.opt.CopyCutoff = f.opt.ChunkSize + defer func() { + f.opt.UseCopyBlob, f.opt.CopyCutoff = oldUseCopyBlob, oldCopyCutoff + }() + + // Create a blob with uncommitted blocks + f.stageBlockWithoutCommit(ctx, t, remote) + + // Now attempt to overwrite the block with a different sized block ID to provoke this error + + // Check the object does not exist + _, err := f.NewObject(ctx, remote) + require.Equal(t, fs.ErrorObjectNotFound, err) + + // Upload a multipart file over the block with uncommitted chunks of a different ID size + size := 4*int(f.opt.ChunkSize) - 1 + contents := random.String(size) + item := fstest.NewItem(remote, contents, fstest.Time("2001-05-06T04:05:06.499Z")) + o := fstests.PutTestContents(ctx, t, f, &item, contents, true) + + // Check size + assert.Equal(t, int64(size), o.Size()) + + // Create a new blob with uncommitted blocks + newRemote := "testBlob2" + f.stageBlockWithoutCommit(ctx, t, newRemote) + + // Copy over that block + dst, err := f.Copy(ctx, o, newRemote) + require.NoError(t, err) + + // Check basics + assert.Equal(t, int64(size), dst.Size()) + assert.Equal(t, newRemote, dst.Remote()) + + // Check contents + gotContents := fstests.ReadObject(ctx, t, dst, -1) + assert.Equal(t, contents, gotContents) + + // Remove the object + require.NoError(t, dst.Remove(ctx)) +} + +func (f *Fs) InternalTest(t *testing.T) { + t.Run("Features", f.testFeatures) + t.Run("WriteUncommittedBlocks", f.testWriteUncommittedBlocks) +}