diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 9b3c0c0c1d7..0bad8563397 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -22,6 +22,7 @@ #include "arrow/filesystem/azurefs.h" #include "arrow/filesystem/azurefs_internal.h" +#include "arrow/io/memory.h" // idenfity.hpp triggers -Wattributes warnings cause -Werror builds to fail, // so disable it for this file with pragmas. @@ -144,6 +145,9 @@ Status AzureOptions::ExtractFromUriQuery(const Uri& uri) { blob_storage_scheme = "http"; dfs_storage_scheme = "http"; } + } else if (kv.first == "background_writes") { + ARROW_ASSIGN_OR_RAISE(background_writes, + ::arrow::internal::ParseBoolean(kv.second)); } else { return Status::Invalid( "Unexpected query parameter in Azure Blob File System URI: '", kv.first, "'"); @@ -937,8 +941,8 @@ Status CommitBlockList(std::shared_ptr block_bl const std::vector& block_ids, const Blobs::CommitBlockListOptions& options) { try { - // CommitBlockList puts all block_ids in the latest element. That means in the case of - // overlapping block_ids the newly staged block ids will always replace the + // CommitBlockList puts all block_ids in the latest element. That means in the case + // of overlapping block_ids the newly staged block ids will always replace the // previously committed blocks. 
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body block_blob_client->CommitBlockList(block_ids, options); @@ -950,7 +954,34 @@ Status CommitBlockList(std::shared_ptr block_bl return Status::OK(); } +Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& id, + Core::IO::MemoryBodyStream& content) { + try { + block_blob_client->StageBlock(id, content); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus( + exception, "StageBlock failed for '", block_blob_client->GetUrl(), + "' new_block_id: '", id, + "'. Staging new blocks is fundamental to streaming writes to blob storage."); + } + + return Status::OK(); +} + +/// Writes will be buffered up to this size (in bytes) before actually uploading them. +static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024; +/// The maximum size of a block in Azure Blob (as per docs). +static constexpr int64_t kMaxBlockSizeBytes = 4UL * 1024 * 1024 * 1024; + +/// This output stream, similar to other arrow OutputStreams, is not thread-safe. 
class ObjectAppendStream final : public io::OutputStream { + private: + struct UploadState; + + std::shared_ptr Self() { + return std::dynamic_pointer_cast(shared_from_this()); + } + public: ObjectAppendStream(std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzureLocation& location, @@ -958,7 +989,8 @@ class ObjectAppendStream final : public io::OutputStream { const AzureOptions& options) : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), - location_(location) { + location_(location), + background_writes_(options.background_writes) { if (metadata && metadata->size() != 0) { ArrowMetadataToCommitBlockListOptions(metadata, commit_block_list_options_); } else if (options.default_metadata && options.default_metadata->size() != 0) { @@ -1008,10 +1040,13 @@ class ObjectAppendStream final : public io::OutputStream { content_length_ = 0; } } + + upload_state_ = std::make_shared(); + if (content_length_ > 0) { ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); for (auto block : block_list.CommittedBlocks) { - block_ids_.push_back(block.Name); + upload_state_->block_ids.push_back(block.Name); } } initialised_ = true; @@ -1031,12 +1066,34 @@ class ObjectAppendStream final : public io::OutputStream { if (closed_) { return Status::OK(); } + + if (current_block_) { + // Upload remaining buffer + RETURN_NOT_OK(AppendCurrentBlock()); + } + RETURN_NOT_OK(Flush()); block_blob_client_ = nullptr; closed_ = true; return Status::OK(); } + Future<> CloseAsync() override { + if (closed_) { + return Status::OK(); + } + + if (current_block_) { + // Upload remaining buffer + RETURN_NOT_OK(AppendCurrentBlock()); + } + + return FlushAsync().Then([self = Self()]() { + self->block_blob_client_ = nullptr; + self->closed_ = true; + }); + } + bool closed() const override { return closed_; } Status CheckClosed(const char* action) const { @@ -1052,11 +1109,11 @@ class ObjectAppendStream final : public io::OutputStream { } 
Status Write(const std::shared_ptr& buffer) override { - return DoAppend(buffer->data(), buffer->size(), buffer); + return DoWrite(buffer->data(), buffer->size(), buffer); } Status Write(const void* data, int64_t nbytes) override { - return DoAppend(data, nbytes); + return DoWrite(data, nbytes); } Status Flush() override { @@ -1066,20 +1123,111 @@ class ObjectAppendStream final : public io::OutputStream { // flush. This also avoids some unhandled errors when flushing in the destructor. return Status::OK(); } - return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_); + + Future<> pending_blocks_completed; + { + std::unique_lock lock(upload_state_->mutex); + pending_blocks_completed = upload_state_->pending_blocks_completed; + } + + RETURN_NOT_OK(pending_blocks_completed.status()); + std::unique_lock lock(upload_state_->mutex); + return CommitBlockList(block_blob_client_, upload_state_->block_ids, + commit_block_list_options_); } - private: - Status DoAppend(const void* data, int64_t nbytes, - std::shared_ptr owned_buffer = nullptr) { - RETURN_NOT_OK(CheckClosed("append")); - auto append_data = reinterpret_cast(data); - Core::IO::MemoryBodyStream block_content(append_data, nbytes); - if (block_content.Length() == 0) { + Future<> FlushAsync() { + RETURN_NOT_OK(CheckClosed("flush async")); + if (!initialised_) { + // If the stream has not been successfully initialized then there is nothing to + // flush. This also avoids some unhandled errors when flushing in the destructor. 
return Status::OK(); } - const auto n_block_ids = block_ids_.size(); + Future<> pending_blocks_completed; + { + std::unique_lock lock(upload_state_->mutex); + pending_blocks_completed = upload_state_->pending_blocks_completed; + } + + return pending_blocks_completed.Then([self = Self()] { + std::unique_lock lock(self->upload_state_->mutex); + return CommitBlockList(self->block_blob_client_, self->upload_state_->block_ids, + self->commit_block_list_options_); + }); + } + + private: + Status AppendCurrentBlock() { + ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish()); + current_block_.reset(); + current_block_size_ = 0; + return AppendBlock(buf); + } + + Status DoWrite(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + + const auto* data_ptr = reinterpret_cast(data); + auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) { + data_ptr += offset; + nbytes -= offset; + pos_ += offset; + content_length_ += offset; + }; + + // Handle case where we have some bytes buffered from prior calls. 
+ if (current_block_size_ > 0) { + // Try to fill current buffer + const int64_t to_copy = + std::min(nbytes, kBlockUploadSizeBytes - current_block_size_); + RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy)); + current_block_size_ += to_copy; + advance_ptr(to_copy); + + // If the buffer isn't full yet, wait for more data before uploading + if (current_block_size_ < kBlockUploadSizeBytes) { + return Status::OK(); + } + + // Upload current buffer + RETURN_NOT_OK(AppendCurrentBlock()); + } + + // We can upload chunks without copying them into a buffer + while (nbytes >= kBlockUploadSizeBytes) { + const auto upload_size = std::min(nbytes, kMaxBlockSizeBytes); + RETURN_NOT_OK(AppendBlock(data_ptr, upload_size)); + advance_ptr(upload_size); + } + + // Buffer remaining bytes + if (nbytes > 0) { + current_block_size_ = nbytes; + + if (current_block_ == nullptr) { + ARROW_ASSIGN_OR_RAISE( + current_block_, + io::BufferOutputStream::Create(kBlockUploadSizeBytes, io_context_.pool())); + } else { + // Re-use the allocation from before. + RETURN_NOT_OK(current_block_->Reset(kBlockUploadSizeBytes, io_context_.pool())); + } + + RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_)); + pos_ += current_block_size_; + content_length_ += current_block_size_; + } + + return Status::OK(); + } + + std::string CreateBlock() { + std::unique_lock lock(upload_state_->mutex); + const auto n_block_ids = upload_state_->block_ids.size();
For example - // if the blob was previously created with one block, with id `00001-arrow` then the - // next block we append will conflict with that, and cause corruption. + // suffix significantly reduces the risk, but does not 100% eliminate it. For + // example if the blob was previously created with one block, with id `00001-arrow` + // then the next block we append will conflict with that, and cause corruption. new_block_id += "-arrow"; new_block_id = Core::Convert::Base64Encode( std::vector(new_block_id.begin(), new_block_id.end())); - try { - block_blob_client_->StageBlock(new_block_id, block_content); - } catch (const Storage::StorageException& exception) { - return ExceptionToStatus( - exception, "StageBlock failed for '", block_blob_client_->GetUrl(), - "' new_block_id: '", new_block_id, - "'. Staging new blocks is fundamental to streaming writes to blob storage."); + upload_state_->block_ids.push_back(new_block_id); + + // We only use the future if we have background writes enabled. Without background + // writes the future is initialized as finished and not mutated any more. 
+ if (background_writes_ && upload_state_->blocks_in_progress++ == 0) { + upload_state_->pending_blocks_completed = Future<>::Make(); } - block_ids_.push_back(new_block_id); - pos_ += nbytes; - content_length_ += nbytes; + + return new_block_id; + } + + Status AppendBlock(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + RETURN_NOT_OK(CheckClosed("append")); + + if (nbytes == 0) { + return Status::OK(); + } + + const auto block_id = CreateBlock(); + + if (background_writes_) { + if (owned_buffer == nullptr) { + ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool())); + memcpy(owned_buffer->mutable_data(), data, nbytes); + } else { + DCHECK_EQ(data, owned_buffer->data()); + DCHECK_EQ(nbytes, owned_buffer->size()); + } + + // The closure keeps the buffer and the upload state alive + auto deferred = [owned_buffer, block_id, block_blob_client = block_blob_client_, + state = upload_state_]() mutable -> Status { + Core::IO::MemoryBodyStream block_content(owned_buffer->data(), + owned_buffer->size()); + + auto status = StageBlock(block_blob_client.get(), block_id, block_content); + HandleUploadOutcome(state, status); + return Status::OK(); + }; + RETURN_NOT_OK(io::internal::SubmitIO(io_context_, std::move(deferred))); + } else { + auto append_data = reinterpret_cast(data); + Core::IO::MemoryBodyStream block_content(append_data, nbytes); + + RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, block_content)); + } + return Status::OK(); } + Status AppendBlock(std::shared_ptr buffer) { + return AppendBlock(buffer->data(), buffer->size(), buffer); + } + + static void HandleUploadOutcome(const std::shared_ptr& state, + const Status& status) { + std::unique_lock lock(state->mutex); + if (!status.ok()) { + state->status &= status; + } + // Notify completion + if (--state->blocks_in_progress == 0) { + auto fut = state->pending_blocks_completed; + lock.unlock(); + fut.MarkFinished(state->status); + } + } + 
std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzureLocation location_; + const bool background_writes_; int64_t content_length_ = kNoSize; + std::shared_ptr current_block_; + int64_t current_block_size_ = 0; + bool closed_ = false; bool initialised_ = false; int64_t pos_ = 0; - std::vector block_ids_; + + // This struct is kept alive through background writes to avoid problems + // in the completion handler. + struct UploadState { + std::mutex mutex; + std::vector block_ids; + int64_t blocks_in_progress = 0; + Status status; + Future<> pending_blocks_completed = Future<>::MakeFinished(Status::OK()); + }; + std::shared_ptr upload_state_; + Blobs::CommitBlockListOptions commit_block_list_options_; }; diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 072b061eeb2..ebbe00c4ee7 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions { /// This will be ignored if non-empty metadata is passed to OpenOutputStream. std::shared_ptr default_metadata; + /// Whether OutputStream writes will be issued in the background, without blocking. 
+ bool background_writes = true; + private: enum class CredentialKind { kDefault, diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 5ff241b17ff..9d437d1f83a 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -53,6 +54,7 @@ #include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" +#include "arrow/util/future.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" @@ -566,6 +568,7 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault); ASSERT_EQ(path, "container/dir/blob"); + ASSERT_EQ(options.background_writes, true); } void TestFromUriDfsStorage() { @@ -582,6 +585,7 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault); ASSERT_EQ(path, "file_system/dir/file"); + ASSERT_EQ(options.background_writes, true); } void TestFromUriAbfs() { @@ -597,6 +601,7 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, "https"); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); ASSERT_EQ(path, "container/dir/blob"); + ASSERT_EQ(options.background_writes, true); } void TestFromUriAbfss() { @@ -612,6 +617,7 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, "https"); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); ASSERT_EQ(path, "container/dir/blob"); + ASSERT_EQ(options.background_writes, true); } void TestFromUriEnableTls() { @@ -628,6 +634,17 @@ class TestAzureOptions : 
public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, "http"); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); ASSERT_EQ(path, "container/dir/blob"); + ASSERT_EQ(options.background_writes, true); + } + + void TestFromUriDisableBackgroundWrites() { + std::string path; + ASSERT_OK_AND_ASSIGN(auto options, + AzureOptions::FromUri( + "abfs://account:password@127.0.0.1:10000/container/dir/blob?" + "background_writes=false", + &path)); + ASSERT_EQ(options.background_writes, false); } void TestFromUriCredentialDefault() { @@ -773,6 +790,9 @@ TEST_F(TestAzureOptions, FromUriDfsStorage) { TestFromUriDfsStorage(); } TEST_F(TestAzureOptions, FromUriAbfs) { TestFromUriAbfs(); } TEST_F(TestAzureOptions, FromUriAbfss) { TestFromUriAbfss(); } TEST_F(TestAzureOptions, FromUriEnableTls) { TestFromUriEnableTls(); } +TEST_F(TestAzureOptions, FromUriDisableBackgroundWrites) { + TestFromUriDisableBackgroundWrites(); +} TEST_F(TestAzureOptions, FromUriCredentialDefault) { TestFromUriCredentialDefault(); } TEST_F(TestAzureOptions, FromUriCredentialAnonymous) { TestFromUriCredentialAnonymous(); } TEST_F(TestAzureOptions, FromUriCredentialStorageSharedKey) { @@ -929,8 +949,9 @@ class TestAzureFileSystem : public ::testing::Test { void UploadLines(const std::vector& lines, const std::string& path, int total_size) { ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {})); - const auto all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); - ASSERT_OK(output->Write(all_lines)); + for (auto const& line : lines) { + ASSERT_OK(output->Write(line.data(), line.size())); + } ASSERT_OK(output->Close()); } @@ -1474,6 +1495,162 @@ class TestAzureFileSystem : public ::testing::Test { arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File); } + void AssertObjectContents(AzureFileSystem* fs, std::string_view path, + std::string_view expected) { + ASSERT_OK_AND_ASSIGN(auto input, 
fs->OpenInputStream(std::string{path})); + std::string contents; + std::shared_ptr buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); + contents.append(buffer->ToString()); + } while (buffer->size() != 0); + + EXPECT_EQ(expected, contents); + } + + void TestOpenOutputStreamSmall() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + + auto data = SetUpPreexistingData(); + const auto path = data.ContainerPath("test-write-object"); + ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); + const std::string_view expected(PreexistingData::kLoremIpsum); + ASSERT_OK(output->Write(expected)); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. + AssertObjectContents(fs.get(), path, expected); + } + + void TestOpenOutputStreamLarge() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + + auto data = SetUpPreexistingData(); + const auto path = data.ContainerPath("test-write-object"); + ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); + + // Upload 5 MB, 4 MB and 2 MB buffers and a very small write to test varying sizes + std::vector sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 * 1024, + 2000}; + + std::vector buffers{}; + char current_char = 'A'; + for (const auto size : sizes) { + buffers.emplace_back(size, current_char++); + } + + auto expected_size = std::int64_t{0}; + for (size_t i = 0; i < buffers.size(); ++i) { + ASSERT_OK(output->Write(buffers[i])); + expected_size += sizes[i]; + ASSERT_EQ(expected_size, output->Tell()); + } + ASSERT_OK(output->Close()); + + AssertObjectContents(fs.get(), path, + buffers[0] + buffers[1] + buffers[2] + buffers[3]); + } + + void TestOpenOutputStreamLargeSingleWrite() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + + auto data = SetUpPreexistingData(); + const auto path = data.ContainerPath("test-write-object"); + ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); + + constexpr std::int64_t size{12
* 1024 * 1024}; + const std::string large_string(size, 'X'); + + ASSERT_OK(output->Write(large_string)); + ASSERT_EQ(size, output->Tell()); + ASSERT_OK(output->Close()); + + AssertObjectContents(fs.get(), path, large_string); + } + + void TestOpenOutputStreamCloseAsync() { +#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND) + // This false positive leak is similar to the one pinpointed in the + // have_false_positive_memory_leak_with_generator() comments above, + // though the stack trace is different. It happens when a block list + // is committed from a background thread. + // + // clang-format off + // Direct leak of 968 byte(s) in 1 object(s) allocated from: + // #0 calloc + // #1 (/lib/x86_64-linux-gnu/libxml2.so.2+0xe25a4) + // #2 __xmlDefaultBufferSize + // #3 xmlBufferCreate + // #4 Azure::Storage::_internal::XmlWriter::XmlWriter() + // #5 Azure::Storage::Blobs::_detail::BlockBlobClient::CommitBlockList + // #6 Azure::Storage::Blobs::BlockBlobClient::CommitBlockList + // #7 arrow::fs::(anonymous namespace)::CommitBlockList + // #8 arrow::fs::(anonymous namespace)::ObjectAppendStream::FlushAsync()::'lambda' + // clang-format on + // + // TODO perhaps remove this skip once we can rely on + // https://github.com/Azure/azure-sdk-for-cpp/pull/5767 + // + // Also note that ClickHouse has a workaround for a similar issue: + // https://github.com/ClickHouse/ClickHouse/pull/45796 + if (options_.background_writes) { + GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync"; + } +#endif + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + auto data = SetUpPreexistingData(); + const std::string path = data.ContainerPath("test-write-object"); + constexpr auto payload = PreexistingData::kLoremIpsum; + + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path)); + ASSERT_OK(stream->Write(payload)); + auto close_fut = stream->CloseAsync(); + + ASSERT_OK(close_fut.MoveResult()); + + AssertObjectContents(fs.get(), path, payload); + } 
+ + void TestOpenOutputStreamCloseAsyncDestructor() { +#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND) + // See above. + if (options_.background_writes) { + GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync"; + } +#endif + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + auto data = SetUpPreexistingData(); + const std::string path = data.ContainerPath("test-write-object"); + constexpr auto payload = PreexistingData::kLoremIpsum; + + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path)); + ASSERT_OK(stream->Write(payload)); + // Destructor implicitly closes stream and completes the upload. + // For this test it doesn't matter whether flush is triggered asynchronously + // after CloseAsync or synchronously after stream.reset() since we're just + // checking that the future keeps the stream alive until completion + // rather than segfaulting on a dangling stream. + auto close_fut = stream->CloseAsync(); + stream.reset(); + ASSERT_OK(close_fut.MoveResult()); + + AssertObjectContents(fs.get(), path, payload); + } + + void TestOpenOutputStreamDestructor() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + constexpr auto* payload = "new data"; + auto data = SetUpPreexistingData(); + const std::string path = data.ContainerPath("test-write-object"); + + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path)); + ASSERT_OK(stream->Write(payload)); + // Destructor implicitly closes stream and completes the multipart upload.
+ stream.reset(); + + AssertObjectContents(fs.get(), path, payload); + } + private: using StringMatcher = ::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher>; @@ -2704,53 +2881,27 @@ TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) { ASSERT_EQ("text/plain", content_type); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { - auto data = SetUpPreexistingData(); - const auto path = data.ContainerPath("test-write-object"); - ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {})); - const std::string_view expected(PreexistingData::kLoremIpsum); - ASSERT_OK(output->Write(expected)); - ASSERT_OK(output->Close()); - - // Verify we can read the object back. - ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path)); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamSmall(); +} - std::array inbuf{}; - ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { TestOpenOutputStreamSmall(); } - EXPECT_EQ(expected, std::string_view(inbuf.data(), size)); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamLarge(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) { - auto data = SetUpPreexistingData(); - const auto path = data.ContainerPath("test-write-object"); - ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {})); - std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; - std::array buffers{ - std::string(sizes[0], 'A'), - std::string(sizes[1], 'B'), - std::string(sizes[2], 'C'), - }; - auto expected = std::int64_t{0}; - for (auto i = 0; i != 3; ++i) { - ASSERT_OK(output->Write(buffers[i])); - expected += sizes[i]; - ASSERT_EQ(expected, output->Tell()); - } - ASSERT_OK(output->Close()); - - // Verify we can read the object back. 
- ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path)); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) { TestOpenOutputStreamLarge(); } - std::string contents; - std::shared_ptr buffer; - do { - ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); - ASSERT_TRUE(buffer); - contents.append(buffer->ToString()); - } while (buffer->size() != 0); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWriteNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamLargeSingleWrite(); +} - EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWrite) { + TestOpenOutputStreamLargeSingleWrite(); } TEST_F(TestAzuriteFileSystem, OpenOutputStreamTruncatesExistingFile) { @@ -2820,6 +2971,33 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamClosed) { ASSERT_RAISES(Invalid, output->Tell()); } +TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsync) { + TestOpenOutputStreamCloseAsync(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsyncNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamCloseAsync(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructor) { + TestOpenOutputStreamCloseAsyncDestructor(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructorNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamCloseAsyncDestructor(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructor) { + TestOpenOutputStreamDestructor(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamDestructor(); +} + TEST_F(TestAzuriteFileSystem, OpenOutputStreamUri) { auto data = SetUpPreexistingData(); const auto path = data.ContainerPath("open-output-stream-uri.txt");