From 0c7da5380957369328ee728ce1cd7c097d59706c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Serdar=20=C3=96zer?=
Date: Wed, 28 Jan 2026 16:04:42 +0100
Subject: [PATCH 1/2] feat: add upload stream functionality

- UploadStream uploads in 4MB chunks using 5 goroutines
- It is used when the file size is larger than 32MB
- The chunk size and concurrency constants are taken from the fog-azure-rm
  library, see
  https://github.com/fog/fog-azure-rm/blob/68ea3f7dd14c0a524146f1e426fa2ad1f6192cc8/lib/fog/azurerm/constants.rb
---
 azurebs/client/client.go                      | 43 +++++++--
 azurebs/client/client_test.go                 | 25 +++++
 .../client/clientfakes/fake_storage_client.go | 94 ++++++++++++++----
 azurebs/client/storage_client.go              | 96 +++++++++++++++----
 azurebs/integration/assertions.go             | 65 ++++++++++++-
 azurebs/integration/general_azure_test.go     |  5 +
 6 files changed, 279 insertions(+), 49 deletions(-)

diff --git a/azurebs/client/client.go b/azurebs/client/client.go
index caaa6f6..28fd2c9 100644
--- a/azurebs/client/client.go
+++ b/azurebs/client/client.go
@@ -15,6 +15,17 @@ type AzBlobstore struct {
 	storageClient StorageClient
 }
 
+// singleBlobPutThreshold is the 32MB cutoff; larger files are uploaded with UploadStream
+const singleBlobPutThreshold = int64(32 * 1024 * 1024)
+
+func getFileSize(source *os.File) (int64, error) {
+	fileInfo, err := source.Stat()
+	if err != nil {
+		return 0, fmt.Errorf("failed to get file stat: %w", err)
+	}
+	return fileInfo.Size(), nil
+}
+
 func New(storageClient StorageClient) (AzBlobstore, error) {
 	return AzBlobstore{storageClient: storageClient}, nil
 }
@@ -30,24 +41,36 @@ func (client *AzBlobstore) Put(sourceFilePath string, dest string) error {
 		return err
 	}
 	defer source.Close() //nolint:errcheck
-
-	md5, err := client.storageClient.Upload(source, dest)
+	fileSize, err := getFileSize(source)
 	if err != nil {
-		return fmt.Errorf("upload failure: %w", err)
+		return err
 	}
+	if fileSize <= singleBlobPutThreshold {
+		md5, err := client.storageClient.Upload(source, dest)
+		if err != nil {
+			return fmt.Errorf("upload failure: %w", err)
+		}
 
-	if !bytes.Equal(sourceMD5, md5) {
-		slog.Error("Upload failed due to MD5 mismatch, deleting blob", "blob", dest, "expected_md5", fmt.Sprintf("%x", sourceMD5), "received_md5", fmt.Sprintf("%x", md5))
+		if !bytes.Equal(sourceMD5, md5) {
+			slog.Error("Upload failed due to MD5 mismatch, deleting blob", "blob", dest, "expected_md5", fmt.Sprintf("%x", sourceMD5), "received_md5", fmt.Sprintf("%x", md5))
 
-		err := client.storageClient.Delete(dest)
-		if err != nil {
-			slog.Error("Failed to delete blob after MD5 mismatch", "blob", dest, "error", err)
+			err := client.storageClient.Delete(dest)
+			if err != nil {
+				slog.Error("Failed to delete blob after MD5 mismatch", "blob", dest, "error", err)
+			}
+			return fmt.Errorf("MD5 mismatch: expected %x, got %x", sourceMD5, md5)
+		}
+
+		slog.Debug("MD5 verification passed", "blob", dest, "md5", fmt.Sprintf("%x", md5))
+
+	} else {
+		err := client.storageClient.UploadStream(source, dest)
+		if err != nil {
+			return fmt.Errorf("upload failure: %w", err)
 		}
-		return fmt.Errorf("MD5 mismatch: expected %x, got %x", sourceMD5, md5)
 	}
 
-	slog.Debug("MD5 verification passed", "blob", dest, "md5", fmt.Sprintf("%x", md5))
 	return nil
 }
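Note: in the streamed branch above, Put performs no MD5 comparison — UploadStream
returns only an error, while Upload returns the service-computed MD5. A caller
that wants an end-to-end integrity check for files above the threshold can hash
the source locally, as the integration test later in this patch does. A minimal,
self-contained sketch (the fileMD5 helper and the file name are illustrative,
not part of this patch):

    package main

    import (
        "crypto/md5"
        "fmt"
        "io"
        "os"
    )

    // fileMD5 streams a file through an MD5 hash without loading it all into memory.
    func fileMD5(path string) ([]byte, error) {
        f, err := os.Open(path)
        if err != nil {
            return nil, err
        }
        defer f.Close() //nolint:errcheck

        h := md5.New()
        if _, err := io.Copy(h, f); err != nil {
            return nil, err
        }
        return h.Sum(nil), nil
    }

    func main() {
        sum, err := fileMD5("some-large-file.bin")
        if err != nil {
            panic(err)
        }
        fmt.Printf("md5: %x\n", sum)
    }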
diff --git a/azurebs/client/client_test.go b/azurebs/client/client_test.go
index 0c86998..86953f6 100644
--- a/azurebs/client/client_test.go
+++ b/azurebs/client/client_test.go
@@ -32,6 +32,31 @@ var _ = Describe("Client", func() {
 		Expect(dest).To(Equal("target/blob"))
 	})
 
+	It("uploads a file with UploadStream", func() {
+		storageClient := clientfakes.FakeStorageClient{}
+
+		azBlobstore, err := client.New(&storageClient)
+		Expect(err).ToNot(HaveOccurred())
+
+		file, _ := os.CreateTemp("", "tmpfile-test-upload") //nolint:errcheck
+		defer os.Remove(file.Name())
+
+		contentSize := 1024 * 1024 * 64 // 64MB
+		content := make([]byte, contentSize)
+		for i := range contentSize {
+			content[i] = '0'
+		}
+		_, err = file.Write(content)
+
+		azBlobstore.Put(file.Name(), "target/blob") //nolint:errcheck
+
+		Expect(storageClient.UploadStreamCallCount()).To(Equal(1))
+		source, dest := storageClient.UploadStreamArgsForCall(0)
+
+		Expect(source).To(BeAssignableToTypeOf((*os.File)(nil)))
+		Expect(dest).To(Equal("target/blob"))
+	})
+
 	It("skips the upload if the md5 cannot be calculated from the file", func() {
 		storageClient := clientfakes.FakeStorageClient{}
 
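Note: FakeStorageClient below is generated with counterfeiter, so the new
UploadStream methods follow the generated stub/recorder pattern. A sketch of
driving the fake directly, assuming the module path used elsewhere in the
repository (github.com/cloudfoundry/storage-cli):

    package client_test

    import (
        "errors"
        "testing"

        "github.com/cloudfoundry/storage-cli/azurebs/client/clientfakes"
    )

    func TestUploadStreamStub(t *testing.T) {
        fake := clientfakes.FakeStorageClient{}
        fake.UploadStreamReturns(errors.New("boom")) // stub the return value

        // A nil reader is fine here; the fake only records its arguments.
        if err := fake.UploadStream(nil, "target/blob"); err == nil {
            t.Fatal("expected the stubbed error")
        }
        if got := fake.UploadStreamCallCount(); got != 1 {
            t.Fatalf("expected 1 recorded call, got %d", got)
        }
    }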
diff --git a/azurebs/client/clientfakes/fake_storage_client.go b/azurebs/client/clientfakes/fake_storage_client.go
index 11b8530..fb1f03b 100644
--- a/azurebs/client/clientfakes/fake_storage_client.go
+++ b/azurebs/client/clientfakes/fake_storage_client.go
@@ -133,6 +133,18 @@ type FakeStorageClient struct {
 		result1 []byte
 		result2 error
 	}
+	UploadStreamStub        func(io.ReadSeekCloser, string) error
+	uploadStreamMutex       sync.RWMutex
+	uploadStreamArgsForCall []struct {
+		arg1 io.ReadSeekCloser
+		arg2 string
+	}
+	uploadStreamReturns struct {
+		result1 error
+	}
+	uploadStreamReturnsOnCall map[int]struct {
+		result1 error
+	}
 	invocations      map[string][][]interface{}
 	invocationsMutex sync.RWMutex
 }
@@ -756,29 +768,71 @@ func (fake *FakeStorageClient) UploadReturnsOnCall(i int, result1 []byte, result
 	}{result1, result2}
 }
 
+func (fake *FakeStorageClient) UploadStream(arg1 io.ReadSeekCloser, arg2 string) error {
+	fake.uploadStreamMutex.Lock()
+	ret, specificReturn := fake.uploadStreamReturnsOnCall[len(fake.uploadStreamArgsForCall)]
+	fake.uploadStreamArgsForCall = append(fake.uploadStreamArgsForCall, struct {
+		arg1 io.ReadSeekCloser
+		arg2 string
+	}{arg1, arg2})
+	stub := fake.UploadStreamStub
+	fakeReturns := fake.uploadStreamReturns
+	fake.recordInvocation("UploadStream", []interface{}{arg1, arg2})
+	fake.uploadStreamMutex.Unlock()
+	if stub != nil {
+		return stub(arg1, arg2)
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	return fakeReturns.result1
+}
+
+func (fake *FakeStorageClient) UploadStreamCallCount() int {
+	fake.uploadStreamMutex.RLock()
+	defer fake.uploadStreamMutex.RUnlock()
+	return len(fake.uploadStreamArgsForCall)
+}
+
+func (fake *FakeStorageClient) UploadStreamCalls(stub func(io.ReadSeekCloser, string) error) {
+	fake.uploadStreamMutex.Lock()
+	defer fake.uploadStreamMutex.Unlock()
+	fake.UploadStreamStub = stub
+}
+
+func (fake *FakeStorageClient) UploadStreamArgsForCall(i int) (io.ReadSeekCloser, string) {
+	fake.uploadStreamMutex.RLock()
+	defer fake.uploadStreamMutex.RUnlock()
+	argsForCall := fake.uploadStreamArgsForCall[i]
+	return argsForCall.arg1, argsForCall.arg2
+}
+
+func (fake *FakeStorageClient) UploadStreamReturns(result1 error) {
+	fake.uploadStreamMutex.Lock()
+	defer fake.uploadStreamMutex.Unlock()
+	fake.UploadStreamStub = nil
+	fake.uploadStreamReturns = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *FakeStorageClient) UploadStreamReturnsOnCall(i int, result1 error) {
+	fake.uploadStreamMutex.Lock()
+	defer fake.uploadStreamMutex.Unlock()
+	fake.UploadStreamStub = nil
+	if fake.uploadStreamReturnsOnCall == nil {
+		fake.uploadStreamReturnsOnCall = make(map[int]struct {
+			result1 error
+		})
+	}
+	fake.uploadStreamReturnsOnCall[i] = struct {
+		result1 error
+	}{result1}
+}
+
 func (fake *FakeStorageClient) Invocations() map[string][][]interface{} {
 	fake.invocationsMutex.RLock()
 	defer fake.invocationsMutex.RUnlock()
 	fake.copyMutex.RLock()
 	defer fake.copyMutex.RUnlock()
 	fake.deleteMutex.RLock()
 	defer fake.deleteMutex.RUnlock()
 	fake.deleteRecursiveMutex.RLock()
 	defer fake.deleteRecursiveMutex.RUnlock()
 	fake.downloadMutex.RLock()
 	defer fake.downloadMutex.RUnlock()
 	fake.ensureContainerExistsMutex.RLock()
 	defer fake.ensureContainerExistsMutex.RUnlock()
 	fake.existsMutex.RLock()
 	defer fake.existsMutex.RUnlock()
 	fake.listMutex.RLock()
 	defer fake.listMutex.RUnlock()
 	fake.propertiesMutex.RLock()
 	defer fake.propertiesMutex.RUnlock()
 	fake.signedUrlMutex.RLock()
 	defer fake.signedUrlMutex.RUnlock()
 	fake.uploadMutex.RLock()
 	defer fake.uploadMutex.RUnlock()
 	copiedInvocations := map[string][][]interface{}{}
 	for key, value := range fake.invocations {
 		copiedInvocations[key] = value
diff --git a/azurebs/client/storage_client.go b/azurebs/client/storage_client.go
index 7ad72d6..a03e4eb 100644
--- a/azurebs/client/storage_client.go
+++ b/azurebs/client/storage_client.go
@@ -31,6 +31,11 @@ type StorageClient interface {
 		dest string,
 	) ([]byte, error)
 
+	UploadStream(
+		source io.ReadSeekCloser,
+		dest string,
+	) error
+
 	Download(
 		source string,
 		dest *os.File,
@@ -68,6 +73,36 @@ type StorageClient interface {
 	EnsureContainerExists() error
 }
 
+// blockSize is the 4MB block size used for streamed uploads
+const blockSize = int64(4 * 1024 * 1024)
+
+// maxConcurrency is the number of goroutines used for streamed uploads
+const maxConcurrency = 5
+
+func createContext(dsc DefaultStorageClient) (context.Context, context.CancelFunc, error) {
+	var ctx context.Context
+	var cancel context.CancelFunc
+
+	if dsc.storageConfig.Timeout != "" {
+		timeoutInt, err := strconv.Atoi(dsc.storageConfig.Timeout)
+		timeout := time.Duration(timeoutInt) * time.Second
+		if timeout < 1 && err == nil {
+			slog.Info("Invalid timeout, need at least 1 second", "timeout", dsc.storageConfig.Timeout)
+			return nil, nil, fmt.Errorf("invalid timeout, need at least 1 second: %s", dsc.storageConfig.Timeout)
+		}
+		if err != nil {
+			slog.Info("Invalid timeout format, need seconds as a number, e.g. 30", "timeout", dsc.storageConfig.Timeout)
+			return nil, nil, fmt.Errorf("invalid timeout format: %w", err)
+		}
+		ctx, cancel = context.WithTimeout(context.Background(), timeout)
+	} else {
+		ctx, cancel = context.WithCancel(context.Background())
+	}
+
+	return ctx, cancel, nil
+
+}
+
 type DefaultStorageClient struct {
 	credential *azblob.SharedKeyCredential
 	serviceURL string
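Note: createContext treats Timeout as a whole number of seconds in string form
("30", not "30s"), and the upload log now reports that raw configured value
rather than a parsed duration — which is why the integration assertion further
down changes from "timeout":"3s" to "timeout":"3". A standalone sketch of the
same parsing rule:

    package main

    import (
        "context"
        "fmt"
        "strconv"
        "time"
    )

    func main() {
        raw := "30" // the format storageConfig.Timeout is expected to hold
        secs, err := strconv.Atoi(raw)
        if err != nil || secs < 1 {
            fmt.Println("invalid timeout:", raw)
            return
        }

        ctx, cancel := context.WithTimeout(context.Background(), time.Duration(secs)*time.Second)
        defer cancel()
        fmt.Println(ctx.Err()) // <nil> until the 30s deadline passes
    }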
30s", "timeout", dsc.storageConfig.Timeout) - return nil, fmt.Errorf("invalid timeout format: %w", err) - } - slog.Info("Uploading blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL, "timeout", timeout.String()) - - ctx, cancel = context.WithTimeout(context.Background(), timeout) + slog.Info("Uploading blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL, "timeout", dsc.storageConfig.Timeout) } else { slog.Info("Uploading blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL) - ctx, cancel = context.WithCancel(context.Background()) + } + + ctx, cancel, err := createContext(dsc) + if err != nil { + return nil, err } defer cancel() @@ -118,6 +142,7 @@ func (dsc DefaultStorageClient) Upload( if err != nil { return nil, err } + uploadResponse, err := client.Upload(ctx, source, nil) if err != nil { if dsc.storageConfig.Timeout != "" && errors.Is(err, context.DeadlineExceeded) { @@ -127,7 +152,42 @@ func (dsc DefaultStorageClient) Upload( } slog.Info("Successfully uploaded blob", "container", dsc.storageConfig.ContainerName, "blob", dest) - return uploadResponse.ContentMD5, err + return uploadResponse.ContentMD5, nil +} + +func (dsc DefaultStorageClient) UploadStream( + source io.ReadSeekCloser, + dest string, +) error { + blobURL := fmt.Sprintf("%s/%s", dsc.serviceURL, dest) + + if dsc.storageConfig.Timeout != "" { + slog.Info("UploadStreaming blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL, "timeout", dsc.storageConfig.Timeout) + } else { + slog.Info("UploadStreaming blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL) + } + + ctx, cancel, err := createContext(dsc) + if err != nil { + return err + } + defer cancel() + + client, err := blockblob.NewClientWithSharedKeyCredential(blobURL, dsc.credential, nil) + if err != nil { + return err + } + + _, err = client.UploadStream(ctx, source, &azblob.UploadStreamOptions{BlockSize: blockSize, Concurrency: maxConcurrency}) + if err != nil { + if dsc.storageConfig.Timeout != "" && errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("upload failed: timeout of %s reached while uploading %s", dsc.storageConfig.Timeout, dest) + } + return fmt.Errorf("upload failure: %w", err) + } + + slog.Info("Successfully uploaded blob", "container", dsc.storageConfig.ContainerName, "blob", dest) + return nil } func (dsc DefaultStorageClient) Download( diff --git a/azurebs/integration/assertions.go b/azurebs/integration/assertions.go index 4b98d77..624d928 100644 --- a/azurebs/integration/assertions.go +++ b/azurebs/integration/assertions.go @@ -2,6 +2,8 @@ package integration import ( "bytes" + "crypto/md5" + "io" "os" "github.com/cloudfoundry/storage-cli/azurebs/config" @@ -46,7 +48,7 @@ func AssertPutHonorsCustomTimeout(cliPath string, cfg *config.AZStorageConfig) { Expect(err).ToNot(HaveOccurred()) Expect(sess.ExitCode()).To(BeZero()) Expect(sess.Err).Should(gbytes.Say(`"msg":"Uploading blob to container`)) - Expect(sess.Err).Should(gbytes.Say(`"timeout":"3s"`)) + Expect(sess.Err).Should(gbytes.Say(`"timeout":"3"`)) sess, err = RunCli(cliPath, configPath, storageType, "delete", blob) Expect(err).ToNot(HaveOccurred()) @@ -397,6 +399,67 @@ func AssertOnCopy(cliPath string, cfg *config.AZStorageConfig) { Expect(cliSession.ExitCode()).To(BeZero()) } +func AssertOnUploadStream(cliPath string, cfg *config.AZStorageConfig) { + configPath := 
diff --git a/azurebs/integration/assertions.go b/azurebs/integration/assertions.go
index 4b98d77..624d928 100644
--- a/azurebs/integration/assertions.go
+++ b/azurebs/integration/assertions.go
@@ -2,6 +2,8 @@ package integration
 
 import (
 	"bytes"
+	"crypto/md5"
+	"io"
 	"os"
 
 	"github.com/cloudfoundry/storage-cli/azurebs/config"
@@ -46,7 +48,7 @@ func AssertPutHonorsCustomTimeout(cliPath string, cfg *config.AZStorageConfig) {
 	Expect(err).ToNot(HaveOccurred())
 	Expect(sess.ExitCode()).To(BeZero())
 	Expect(sess.Err).Should(gbytes.Say(`"msg":"Uploading blob to container`))
-	Expect(sess.Err).Should(gbytes.Say(`"timeout":"3s"`))
+	Expect(sess.Err).Should(gbytes.Say(`"timeout":"3"`))
 
 	sess, err = RunCli(cliPath, configPath, storageType, "delete", blob)
 	Expect(err).ToNot(HaveOccurred())
@@ -397,6 +399,67 @@ func AssertOnCopy(cliPath string, cfg *config.AZStorageConfig) {
 	Expect(cliSession.ExitCode()).To(BeZero())
 }
 
+func AssertOnUploadStream(cliPath string, cfg *config.AZStorageConfig) {
+	configPath := MakeConfigFile(cfg)
+	defer os.Remove(configPath) //nolint:errcheck
+	contentSize := 1024 * 1024 * 64 // 64MB
+
+	// Create a blob to upload
+	blobName := GenerateRandomString()
+	blobContent := GenerateRandomString(contentSize)
+	contentFile := MakeContentFile(blobContent)
+	defer os.Remove(contentFile) //nolint:errcheck
+
+	// Calculate MD5 of original file
+	originalFile, err := os.Open(contentFile)
+	Expect(err).ToNot(HaveOccurred())
+	originalHash := md5.New()
+	_, err = io.Copy(originalHash, originalFile)
+	Expect(err).ToNot(HaveOccurred())
+	originalFile.Close() //nolint:errcheck
+	originalMD5 := originalHash.Sum(nil)
+
+	cliSession, err := RunCli(cliPath, configPath, storageType, "put", contentFile, blobName)
+	Expect(err).ToNot(HaveOccurred())
+	Expect(cliSession.ExitCode()).To(BeZero())
+	Expect(cliSession.Err).Should(gbytes.Say(`"msg":"Streaming blob to container"`))
+
+	// Assert that the uploaded blob exists
+	cliSession, err = RunCli(cliPath, configPath, storageType, "exists", blobName)
+	Expect(err).ToNot(HaveOccurred())
+	Expect(cliSession.ExitCode()).To(BeZero())
+
+	// Compare the content of the original and downloaded blobs
+	tmpLocalFile, err := os.CreateTemp("", "download-big-file")
+	Expect(err).ToNot(HaveOccurred())
+	err = tmpLocalFile.Close()
+	Expect(err).ToNot(HaveOccurred())
+	defer os.Remove(tmpLocalFile.Name()) //nolint:errcheck
+	cliSession, err = RunCli(cliPath, configPath, storageType, "get", blobName, tmpLocalFile.Name())
+	Expect(err).ToNot(HaveOccurred())
+	Expect(cliSession.ExitCode()).To(BeZero())
+
+	// Verify file size matches
+	downloadedInfo, err := os.Stat(tmpLocalFile.Name())
+	Expect(err).ToNot(HaveOccurred())
+	Expect(downloadedInfo.Size()).To(Equal(int64(contentSize)))
+
+	// Verify MD5 matches
+	downloadedFile, err := os.Open(tmpLocalFile.Name())
+	Expect(err).ToNot(HaveOccurred())
+	downloadedHash := md5.New()
+	_, err = io.Copy(downloadedHash, downloadedFile)
+	Expect(err).ToNot(HaveOccurred())
+	downloadedFile.Close() //nolint:errcheck
+	downloadedMD5 := downloadedHash.Sum(nil)
+	Expect(downloadedMD5).To(Equal(originalMD5))
+
+	// Clean up
+	cliSession, err = RunCli(cliPath, configPath, storageType, "delete", blobName)
+	Expect(err).ToNot(HaveOccurred())
+	Expect(cliSession.ExitCode()).To(BeZero())
+}
+
 func CreateRandomBlobs(cliPath string, cfg *config.AZStorageConfig, count int, prefix string) {
 	configPath := MakeConfigFile(cfg)
 	defer os.Remove(configPath) //nolint:errcheck
diff --git a/azurebs/integration/general_azure_test.go b/azurebs/integration/general_azure_test.go
index b8f0d84..bedc8a5 100644
--- a/azurebs/integration/general_azure_test.go
+++ b/azurebs/integration/general_azure_test.go
@@ -97,6 +97,11 @@ var _ = Describe("General testing for all Azure regions", func() {
 		configurations,
 	)
 
+	DescribeTable("Invoking `put` with a file size larger than singleBlobPutThreshold (32MB)",
+		func(cfg *config.AZStorageConfig) { integration.AssertOnUploadStream(cliPath, cfg) },
+		configurations,
+	)
+
 	Describe("Invoking `put`", func() {
 		var blobName string
 		var configPath string
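Note: the follow-up patch below replaces the manual fill loop in the unit test
with bytes.Repeat, which builds the 64MB payload in one call, and marks the
intentionally unchecked errors with //nolint:errcheck. The stdlib call in
isolation:

    package main

    import (
        "bytes"
        "fmt"
    )

    func main() {
        contentSize := 1024 * 1024 * 64 // 64MB, matching the test
        content := bytes.Repeat([]byte("x"), contentSize)
        fmt.Println(len(content)) // 67108864
    }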
From b56050948f3e7a48446bda62532e5d0cd1ddf216 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Serdar=20=C3=96zer?=
Date: Wed, 28 Jan 2026 16:22:08 +0100
Subject: [PATCH 2/2] fix: resolve linting errors

---
 azurebs/client/client_test.go | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/azurebs/client/client_test.go b/azurebs/client/client_test.go
index 86953f6..caf0ce0 100644
--- a/azurebs/client/client_test.go
+++ b/azurebs/client/client_test.go
@@ -1,6 +1,7 @@
 package client_test
 
 import (
+	"bytes"
 	"errors"
 	"os"
 	"runtime"
@@ -39,14 +40,12 @@ var _ = Describe("Client", func() {
 		Expect(err).ToNot(HaveOccurred())
 
 		file, _ := os.CreateTemp("", "tmpfile-test-upload") //nolint:errcheck
-		defer os.Remove(file.Name())
+		defer os.Remove(file.Name()) //nolint:errcheck
 
 		contentSize := 1024 * 1024 * 64 // 64MB
-		content := make([]byte, contentSize)
-		for i := range contentSize {
-			content[i] = '0'
-		}
-		_, err = file.Write(content)
+
+		content := bytes.Repeat([]byte("x"), contentSize)
+		_, _ = file.Write(content) //nolint:errcheck
 
 		azBlobstore.Put(file.Name(), "target/blob") //nolint:errcheck