From b305c143abac1ee2baf6728970bbd33a67847ae6 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 11 Feb 2024 14:48:56 +0000 Subject: [PATCH 01/20] Add MetadataIndicatesIsDirectory function --- cpp/src/arrow/filesystem/azurefs.cc | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 23af67a33d6..bde11c94673 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -347,6 +347,22 @@ bool IsContainerNotFound(const Storage::StorageException& e) { return false; } +const auto kHierarchicalNamespaceIsDirectoryMetadataKey = "hdi_isFolder"; +const auto kFlatNamespaceIsDirectoryMetadataKey = "is_directory"; + +bool MetadataIndicatesIsDirectory(const Storage::Metadata& metadata) { + // Inspired by + // https://github.com/Azure/azure-sdk-for-cpp/blob/12407e8bfcb9bc1aa43b253c1d0ec93bf795ae3b/sdk/storage/azure-storage-files-datalake/src/datalake_utilities.cpp#L86-L91 + auto hierarchical_directory_metadata = + metadata.find(kHierarchicalNamespaceIsDirectoryMetadataKey); + if (hierarchical_directory_metadata != metadata.end()) { + return hierarchical_directory_metadata->second == "true"; + } + auto flat_directory_metadata = metadata.find(kFlatNamespaceIsDirectoryMetadataKey); + return flat_directory_metadata != metadata.end() && + flat_directory_metadata->second == "true"; +} + template std::string FormatValue(typename TypeTraits::CType value) { struct StringAppender { @@ -1690,7 +1706,7 @@ class AzureFileSystem::Impl { // on directory marker blobs. // https://github.com/fsspec/adlfs/blob/32132c4094350fca2680155a5c236f2e9f991ba5/adlfs/spec.py#L855-L870 Blobs::UploadBlockBlobFromOptions blob_options; - blob_options.Metadata.emplace("is_directory", "true"); + blob_options.Metadata.emplace(kFlatNamespaceIsDirectoryMetadataKey, "true"); block_blob_client.UploadFrom(nullptr, 0, blob_options); } From af47546cf1c118ba532121ea7ac8750ca7f5d4b9 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 11 Feb 2024 14:53:29 +0000 Subject: [PATCH 02/20] Return not a file if trying to read or write a directory --- cpp/src/arrow/filesystem/azurefs.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index bde11c94673..3d2f96ee93d 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -533,6 +533,9 @@ class ObjectInputFile final : public io::RandomAccessFile { } try { auto properties = blob_client_->GetProperties(); + if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) { + return NotAFile(location_); + } content_length_ = properties.Value.BlobSize; metadata_ = PropertiesToMetadata(properties.Value); return Status::OK(); @@ -739,6 +742,9 @@ class ObjectAppendStream final : public io::OutputStream { } else { try { auto properties = block_blob_client_->GetProperties(); + if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) { + return NotAFile(location_); + } content_length_ = properties.Value.BlobSize; pos_ = content_length_; } catch (const Storage::StorageException& exception) { From e0d5d2bd770b984004b75527d57bc38d20ba8b7e Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 11 Feb 2024 16:42:48 +0000 Subject: [PATCH 03/20] Add tests --- cpp/src/arrow/filesystem/azurefs_test.cc | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index e6bd80d1d25..41dc710efbf 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -826,6 +826,14 @@ class TestAzureFileSystem : public ::testing::Test { AssertFileInfo(fs(), subdir3, FileType::Directory); } + void TestDisallowReadingOrWritingDirectoryMarkers() { + auto data = SetUpPreexistingData(); + auto directory_path = data.Path("directory"); + ASSERT_OK(fs()->CreateDir(directory_path)); + ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path)); + ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path)); + } + void TestDeleteDirSuccessEmpty() { if (HasSubmitBatchBug()) { GTEST_SKIP() << kSubmitBatchBugMessage; @@ -1559,6 +1567,10 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirOnMissingContainer) { this->TestCreateDirOnMissingContainer(); } +TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowReadingOrWritingDirectoryMarkers) { + this->TestDisallowReadingOrWritingDirectoryMarkers(); +} + TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) { this->TestDeleteDirSuccessEmpty(); } From 67a5e2052baec1573af2a38b327aa9972d4a9376 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 11 Feb 2024 17:22:16 +0000 Subject: [PATCH 04/20] Avoid some unhandled exceptions in destructor --- cpp/src/arrow/filesystem/azurefs.cc | 14 +++++++++++--- cpp/src/arrow/io/interfaces.cc | 13 +++++++++++-- cpp/src/arrow/io/util_internal.h | 1 + 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 3d2f96ee93d..83ab2851bdc 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -730,9 +730,15 @@ class ObjectAppendStream final : public io::OutputStream { } ~ObjectAppendStream() override { - // For compliance with the rest of the IO stack, Close rather than Abort, - // even though it may be more expensive. - io::internal::CloseFromDestructor(this); + if (at_least_one_successful_append_) { + // For compliance with the rest of the IO stack, Close rather than Abort, + // even though it may be more expensive. + io::internal::CloseFromDestructor(this); + } else { + // Avoid Flushing if we don't need to. If Flush throws an exception in this + // destructor it can't be handled by the caller. + io::internal::AbortFromDestructor(this); + } } Status Init() { @@ -856,6 +862,7 @@ class ObjectAppendStream final : public io::OutputStream { block_ids_.push_back(new_block_id); pos_ += nbytes; content_length_ += nbytes; + at_least_one_successful_append_ = true; return Status::OK(); } @@ -864,6 +871,7 @@ class ObjectAppendStream final : public io::OutputStream { const AzureLocation location_; bool closed_ = false; + bool at_least_one_successful_append_ = false; int64_t pos_ = 0; int64_t content_length_ = kNoSize; std::vector block_ids_; diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index 1d35549cc43..1fe28fc6226 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -280,8 +280,7 @@ Result> RandomAccessFile::GetStream( namespace internal { -void CloseFromDestructor(FileInterface* file) { - Status st = file->Close(); +void HandleFileStatusFromDestructor(Status st, FileInterface* file) { if (!st.ok()) { auto file_type = typeid(*file).name(); #ifdef NDEBUG @@ -295,6 +294,16 @@ void CloseFromDestructor(FileInterface* file) { } } +void CloseFromDestructor(FileInterface* file) { + Status st = file->Close(); + HandleFileStatusFromDestructor(st, file); +} + +void AbortFromDestructor(FileInterface* file) { + Status st = file->Abort(); + HandleFileStatusFromDestructor(st, file); +} + Result ValidateReadRange(int64_t offset, int64_t size, int64_t file_size) { if (offset < 0 || size < 0) { return Status::Invalid("Invalid read (offset = ", offset, ", size = ", size, ")"); diff --git a/cpp/src/arrow/io/util_internal.h b/cpp/src/arrow/io/util_internal.h index 2015f6a2112..cc42556582b 100644 --- a/cpp/src/arrow/io/util_internal.h +++ b/cpp/src/arrow/io/util_internal.h @@ -31,6 +31,7 @@ namespace io { namespace internal { ARROW_EXPORT void CloseFromDestructor(FileInterface* file); +ARROW_EXPORT void AbortFromDestructor(FileInterface* file); // Validate a (offset, size) region (as given to ReadAt) against // the file size. Return the actual read size. From 5388f67e0380525f9afd6cdddfd5254e4be5cfdc Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 11 Feb 2024 17:36:57 +0000 Subject: [PATCH 05/20] Add test DisallowCreatingFileAndDirectoryWithTheSameName --- cpp/src/arrow/filesystem/azurefs_test.cc | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 41dc710efbf..a466e7c29ba 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -834,6 +834,18 @@ class TestAzureFileSystem : public ::testing::Test { ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path)); } + void TestDisallowCreatingFileAndDirectoryWithTheSameName() { + auto data = SetUpPreexistingData(); + auto path1 = data.Path("directory1"); + ASSERT_OK(fs()->CreateDir(path1)); + ASSERT_RAISES(IOError, fs()->OpenOutputStream(path1)); + ASSERT_RAISES(IOError, fs()->OpenAppendStream(path1)); + + auto path2 = data.Path("directory2"); + ASSERT_OK(fs()->OpenOutputStream(path2)); + ASSERT_RAISES(IOError, fs()->CreateDir(path2)); + } + void TestDeleteDirSuccessEmpty() { if (HasSubmitBatchBug()) { GTEST_SKIP() << kSubmitBatchBugMessage; @@ -1571,6 +1583,10 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowReadingOrWritingDirectoryM this->TestDisallowReadingOrWritingDirectoryMarkers(); } +TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowCreatingFileAndDirectoryWithTheSameName) { + this->TestDisallowCreatingFileAndDirectoryWithTheSameName(); +} + TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) { this->TestDeleteDirSuccessEmpty(); } From 25dff2fc3bf0c100b03670a3ddabeb4c728d4c07 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 13:31:56 +0000 Subject: [PATCH 06/20] Add test cases for DisallowReadingOrWritingDirectoryMarkers with trailing slash --- cpp/src/arrow/filesystem/azurefs_test.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index a466e7c29ba..08a74e9b2be 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -829,9 +829,14 @@ class TestAzureFileSystem : public ::testing::Test { void TestDisallowReadingOrWritingDirectoryMarkers() { auto data = SetUpPreexistingData(); auto directory_path = data.Path("directory"); + + auto directory_path_with_slash = directory_path + "/"; ASSERT_OK(fs()->CreateDir(directory_path)); - ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path)); ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path)); + ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path)); + + ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path_with_slash)); + ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path_with_slash)); } void TestDisallowCreatingFileAndDirectoryWithTheSameName() { From bfdd719db6fa7b6a57b3218c07c5bff129827b89 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 15:31:07 +0000 Subject: [PATCH 07/20] Add comments explaining the strategy with ObjectInputFile --- cpp/src/arrow/filesystem/azurefs.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 83ab2851bdc..38edadde824 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -528,10 +528,15 @@ class ObjectInputFile final : public io::RandomAccessFile { Status Init() { if (content_length_ != kNoSize) { + // When the user provides the file size we don't validate that its a file. We assume + // the user knows what they are doing. This is only a read so its not a big deal if + // they make a mistake. DCHECK_GE(content_length_, 0); return Status::OK(); } try { + // To open an ObjectInputFile the Blob must exist and it must not represent + // a directory. Additionally we need to know the file size. auto properties = blob_client_->GetProperties(); if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) { return NotAFile(location_); From ff72cdf21f0bcdd7a16866687e4143bf79a1f27c Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 15:32:34 +0000 Subject: [PATCH 08/20] Ensure not a directory on OpenOutputStream and OpenAppendStream --- cpp/src/arrow/filesystem/azurefs.cc | 36 +++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 38edadde824..87f8a41fc53 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -722,10 +722,13 @@ class ObjectAppendStream final : public io::OutputStream { ObjectAppendStream(std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzureLocation& location, const std::shared_ptr& metadata, - const AzureOptions& options, int64_t size = kNoSize) + const AzureOptions& options, + std::function ensure_not_flat_namespace_directory, + int64_t size = kNoSize) : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), location_(location), + ensure_not_flat_namespace_directory_(std::move(ensure_not_flat_namespace_directory)), content_length_(size) { if (metadata && metadata->size() != 0) { metadata_ = ArrowMetadataToAzureMetadata(metadata); @@ -760,6 +763,7 @@ class ObjectAppendStream final : public io::OutputStream { pos_ = content_length_; } catch (const Storage::StorageException& exception) { if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + RETURN_NOT_OK(ensure_not_flat_namespace_directory_()); RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_)); } else { return ExceptionToStatus( @@ -874,11 +878,12 @@ class ObjectAppendStream final : public io::OutputStream { std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzureLocation location_; + std::function ensure_not_flat_namespace_directory_; + int64_t content_length_ = kNoSize; bool closed_ = false; bool at_least_one_successful_append_ = false; int64_t pos_ = 0; - int64_t content_length_ = kNoSize; std::vector block_ids_; Storage::Metadata metadata_; }; @@ -1697,18 +1702,35 @@ class AzureFileSystem::Impl { AzureFileSystem* fs) { RETURN_NOT_OK(ValidateFileLocation(location)); + const auto blob_container_client = GetBlobContainerClient(location.container); auto block_blob_client = std::make_shared( - blob_service_client_->GetBlobContainerClient(location.container) - .GetBlockBlobClient(location.path)); + blob_container_client.GetBlockBlobClient(location.path)); + + auto ensure_not_flat_namespace_directory = [this, location, + blob_container_client]() -> Status { + bool hierarchical_namespace_enabled = + HierarchicalNamespaceSupport(GetFileSystemClient(location.container)) == + HNSSupport::kEnabled; + if (!hierarchical_namespace_enabled) { + ARROW_ASSIGN_OR_RAISE(auto status, GetFileInfo(blob_container_client, location)) + if (status.type() == FileType::Directory) { + return NotAFile(location); + } + } + return Status::OK(); + }; std::shared_ptr stream; if (truncate) { + RETURN_NOT_OK(ensure_not_flat_namespace_directory()); RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client)); - stream = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, options_, 0); + stream = std::make_shared( + block_blob_client, fs->io_context(), location, metadata, options_, + ensure_not_flat_namespace_directory, 0); } else { stream = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, options_); + location, metadata, options_, + ensure_not_flat_namespace_directory); } RETURN_NOT_OK(stream->Init()); return stream; From 8f573cb415cecf10b409313e90c1f9565bde1b36 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 15:36:34 +0000 Subject: [PATCH 09/20] More complete tests --- cpp/src/arrow/filesystem/azurefs_test.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 08a74e9b2be..76fce445cab 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -830,12 +830,14 @@ class TestAzureFileSystem : public ::testing::Test { auto data = SetUpPreexistingData(); auto directory_path = data.Path("directory"); - auto directory_path_with_slash = directory_path + "/"; ASSERT_OK(fs()->CreateDir(directory_path)); ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path)); + ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path)); ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path)); + auto directory_path_with_slash = directory_path + "/"; ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path_with_slash)); + ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path_with_slash)); ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path_with_slash)); } @@ -845,10 +847,12 @@ class TestAzureFileSystem : public ::testing::Test { ASSERT_OK(fs()->CreateDir(path1)); ASSERT_RAISES(IOError, fs()->OpenOutputStream(path1)); ASSERT_RAISES(IOError, fs()->OpenAppendStream(path1)); + AssertFileInfo(fs(), path1, FileType::Directory); auto path2 = data.Path("directory2"); ASSERT_OK(fs()->OpenOutputStream(path2)); ASSERT_RAISES(IOError, fs()->CreateDir(path2)); + AssertFileInfo(fs(), path2, FileType::File); } void TestDeleteDirSuccessEmpty() { From f71cc500bfb88c64e731146943d384cd08151475 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 16:42:11 +0000 Subject: [PATCH 10/20] Tidy --- cpp/src/arrow/filesystem/azurefs.cc | 44 +++++++++++++----------- cpp/src/arrow/filesystem/azurefs_test.cc | 7 ++-- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 87f8a41fc53..c117ca37f33 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -528,9 +528,8 @@ class ObjectInputFile final : public io::RandomAccessFile { Status Init() { if (content_length_ != kNoSize) { - // When the user provides the file size we don't validate that its a file. We assume - // the user knows what they are doing. This is only a read so its not a big deal if - // they make a mistake. + // When the user provides the file size we don't validate that its a file. This is + // only a read so its not a big deal if the user make a mistake. DCHECK_GE(content_length_, 0); return Status::OK(); } @@ -722,14 +721,15 @@ class ObjectAppendStream final : public io::OutputStream { ObjectAppendStream(std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzureLocation& location, const std::shared_ptr& metadata, - const AzureOptions& options, + const AzureOptions& options, const bool truncate, std::function ensure_not_flat_namespace_directory, int64_t size = kNoSize) : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), location_(location), - ensure_not_flat_namespace_directory_(std::move(ensure_not_flat_namespace_directory)), - content_length_(size) { + truncate_(truncate), + ensure_not_flat_namespace_directory_( + std::move(ensure_not_flat_namespace_directory)) { if (metadata && metadata->size() != 0) { metadata_ = ArrowMetadataToAzureMetadata(metadata); } else if (options.default_metadata && options.default_metadata->size() != 0) { @@ -743,16 +743,22 @@ class ObjectAppendStream final : public io::OutputStream { // even though it may be more expensive. io::internal::CloseFromDestructor(this); } else { - // Avoid Flushing if we don't need to. If Flush throws an exception in this + // Avoid Flushing if we don't need to. If Flush throws an exception in this // destructor it can't be handled by the caller. io::internal::AbortFromDestructor(this); } } Status Init() { - if (content_length_ != kNoSize) { - DCHECK_GE(content_length_, 0); - pos_ = content_length_; + if (truncate_) { + content_length_ = 0; + pos_ = 0; + // Create an empty file overwriting any existing file, but fail if there is an + // existing directory. + RETURN_NOT_OK(ensure_not_flat_namespace_directory_()); + // On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing + // directory so we don't need to check like we do on flat namespace. + RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_)); } else { try { auto properties = block_blob_client_->GetProperties(); @@ -763,6 +769,9 @@ class ObjectAppendStream final : public io::OutputStream { pos_ = content_length_; } catch (const Storage::StorageException& exception) { if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + // No file exists but on flat namespace its possible there is a directory + // marker or an implied directory. Ensure there is no directory before starting + // a new empty file. RETURN_NOT_OK(ensure_not_flat_namespace_directory_()); RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_)); } else { @@ -878,6 +887,7 @@ class ObjectAppendStream final : public io::OutputStream { std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzureLocation location_; + const bool truncate_; std::function ensure_not_flat_namespace_directory_; int64_t content_length_ = kNoSize; @@ -1721,17 +1731,9 @@ class AzureFileSystem::Impl { }; std::shared_ptr stream; - if (truncate) { - RETURN_NOT_OK(ensure_not_flat_namespace_directory()); - RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client)); - stream = std::make_shared( - block_blob_client, fs->io_context(), location, metadata, options_, - ensure_not_flat_namespace_directory, 0); - } else { - stream = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, options_, - ensure_not_flat_namespace_directory); - } + stream = std::make_shared(block_blob_client, fs->io_context(), + location, metadata, options_, truncate, + ensure_not_flat_namespace_directory); RETURN_NOT_OK(stream->Init()); return stream; } diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 76fce445cab..143f47f4c65 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -851,7 +851,9 @@ class TestAzureFileSystem : public ::testing::Test { auto path2 = data.Path("directory2"); ASSERT_OK(fs()->OpenOutputStream(path2)); - ASSERT_RAISES(IOError, fs()->CreateDir(path2)); + // CreateDir returns OK even if there is already a file or directory at this + // location. Whether or not this is the desired behaviour is debatable. + ASSERT_OK(fs()->CreateDir(path2)); AssertFileInfo(fs(), path2, FileType::File); } @@ -1592,7 +1594,8 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowReadingOrWritingDirectoryM this->TestDisallowReadingOrWritingDirectoryMarkers(); } -TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowCreatingFileAndDirectoryWithTheSameName) { +TYPED_TEST(TestAzureFileSystemOnAllScenarios, + DisallowCreatingFileAndDirectoryWithTheSameName) { this->TestDisallowCreatingFileAndDirectoryWithTheSameName(); } From 37862c235887648c7422d5a5efb299ea9b2aca3c Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 16:53:06 +0000 Subject: [PATCH 11/20] Add an assertion that metadata can be written without modifying the file --- cpp/src/arrow/filesystem/azurefs_test.cc | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 143f47f4c65..5d19c905410 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -2202,6 +2202,18 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) { .Value.Metadata; // Defaults are overwritten and not merged. EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata); + + // Metadata can be written without writing any data. + ASSERT_OK_AND_ASSIGN( + output, fs_with_defaults->OpenOutputStream( + full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}}))); + ASSERT_OK(output->Close()); + blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name) + .GetBlockBlobClient(blob_path) + .GetProperties() + .Value.Metadata; + // Defaults are overwritten and not merged. + EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata); } TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { From 28e9bdd1c9ccac331f4673e5fb0fed2eb344cbd8 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 16:58:28 +0000 Subject: [PATCH 12/20] Avoid errors in destructor more neatly --- cpp/src/arrow/filesystem/azurefs.cc | 21 ++++++++++----------- cpp/src/arrow/io/interfaces.cc | 13 ++----------- cpp/src/arrow/io/util_internal.h | 1 - 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index c117ca37f33..b18530f0ef1 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -738,15 +738,9 @@ class ObjectAppendStream final : public io::OutputStream { } ~ObjectAppendStream() override { - if (at_least_one_successful_append_) { - // For compliance with the rest of the IO stack, Close rather than Abort, - // even though it may be more expensive. - io::internal::CloseFromDestructor(this); - } else { - // Avoid Flushing if we don't need to. If Flush throws an exception in this - // destructor it can't be handled by the caller. - io::internal::AbortFromDestructor(this); - } + // For compliance with the rest of the IO stack, Close rather than Abort, + // even though it may be more expensive. + io::internal::CloseFromDestructor(this); } Status Init() { @@ -789,6 +783,7 @@ class ObjectAppendStream final : public io::OutputStream { block_ids_.push_back(block.Name); } } + initialised_ = true; return Status::OK(); } @@ -835,6 +830,11 @@ class ObjectAppendStream final : public io::OutputStream { Status Flush() override { RETURN_NOT_OK(CheckClosed("flush")); + if (!initialised_) { + // If the stream has not been successfully initialized then there is nothing to + // flush. This also avoids some unhandled errors when flushing in the destructor. + return Status::OK(); + } return CommitBlockList(block_blob_client_, block_ids_, metadata_); } @@ -880,7 +880,6 @@ class ObjectAppendStream final : public io::OutputStream { block_ids_.push_back(new_block_id); pos_ += nbytes; content_length_ += nbytes; - at_least_one_successful_append_ = true; return Status::OK(); } @@ -892,7 +891,7 @@ class ObjectAppendStream final : public io::OutputStream { int64_t content_length_ = kNoSize; bool closed_ = false; - bool at_least_one_successful_append_ = false; + bool initialised_ = false; int64_t pos_ = 0; std::vector block_ids_; Storage::Metadata metadata_; diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index 1fe28fc6226..1d35549cc43 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -280,7 +280,8 @@ Result> RandomAccessFile::GetStream( namespace internal { -void HandleFileStatusFromDestructor(Status st, FileInterface* file) { +void CloseFromDestructor(FileInterface* file) { + Status st = file->Close(); if (!st.ok()) { auto file_type = typeid(*file).name(); #ifdef NDEBUG @@ -294,16 +295,6 @@ void HandleFileStatusFromDestructor(Status st, FileInterface* file) { } } -void CloseFromDestructor(FileInterface* file) { - Status st = file->Close(); - HandleFileStatusFromDestructor(st, file); -} - -void AbortFromDestructor(FileInterface* file) { - Status st = file->Abort(); - HandleFileStatusFromDestructor(st, file); -} - Result ValidateReadRange(int64_t offset, int64_t size, int64_t file_size) { if (offset < 0 || size < 0) { return Status::Invalid("Invalid read (offset = ", offset, ", size = ", size, ")"); diff --git a/cpp/src/arrow/io/util_internal.h b/cpp/src/arrow/io/util_internal.h index cc42556582b..2015f6a2112 100644 --- a/cpp/src/arrow/io/util_internal.h +++ b/cpp/src/arrow/io/util_internal.h @@ -31,7 +31,6 @@ namespace io { namespace internal { ARROW_EXPORT void CloseFromDestructor(FileInterface* file); -ARROW_EXPORT void AbortFromDestructor(FileInterface* file); // Validate a (offset, size) region (as given to ReadAt) against // the file size. Return the actual read size. From 26d6e96da729f3abb4ce9e406d0b5f42105361fa Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 17:04:27 +0000 Subject: [PATCH 13/20] Small update to WriteMetadata test --- cpp/src/arrow/filesystem/azurefs_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 5d19c905410..7dabb3304ff 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -2205,7 +2205,7 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) { // Metadata can be written without writing any data. ASSERT_OK_AND_ASSIGN( - output, fs_with_defaults->OpenOutputStream( + output, fs_with_defaults->OpenAppendStream( full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}}))); ASSERT_OK(output->Close()); blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name) @@ -2214,6 +2214,7 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) { .Value.Metadata; // Defaults are overwritten and not merged. EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata); + } TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { From ab021131bb0cb85e422f745d8270e4ddb71944e1 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 18 Feb 2024 20:33:42 +0000 Subject: [PATCH 14/20] Auto-format --- cpp/src/arrow/filesystem/azurefs.cc | 12 ++++++------ cpp/src/arrow/filesystem/azurefs_test.cc | 5 ++--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index b18530f0ef1..b370abf8bdb 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -528,7 +528,7 @@ class ObjectInputFile final : public io::RandomAccessFile { Status Init() { if (content_length_ != kNoSize) { - // When the user provides the file size we don't validate that its a file. This is + // When the user provides the file size we don't validate that its a file. This is // only a read so its not a big deal if the user make a mistake. DCHECK_GE(content_length_, 0); return Status::OK(); @@ -747,11 +747,11 @@ class ObjectAppendStream final : public io::OutputStream { if (truncate_) { content_length_ = 0; pos_ = 0; - // Create an empty file overwriting any existing file, but fail if there is an + // Create an empty file overwriting any existing file, but fail if there is an // existing directory. RETURN_NOT_OK(ensure_not_flat_namespace_directory_()); - // On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing - // directory so we don't need to check like we do on flat namespace. + // On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing + // directory so we don't need to check like we do on flat namespace. RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_)); } else { try { @@ -763,7 +763,7 @@ class ObjectAppendStream final : public io::OutputStream { pos_ = content_length_; } catch (const Storage::StorageException& exception) { if (exception.StatusCode == Http::HttpStatusCode::NotFound) { - // No file exists but on flat namespace its possible there is a directory + // No file exists but on flat namespace its possible there is a directory // marker or an implied directory. Ensure there is no directory before starting // a new empty file. RETURN_NOT_OK(ensure_not_flat_namespace_directory_()); @@ -831,7 +831,7 @@ class ObjectAppendStream final : public io::OutputStream { Status Flush() override { RETURN_NOT_OK(CheckClosed("flush")); if (!initialised_) { - // If the stream has not been successfully initialized then there is nothing to + // If the stream has not been successfully initialized then there is nothing to // flush. This also avoids some unhandled errors when flushing in the destructor. return Status::OK(); } diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 7dabb3304ff..30762ae23c3 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -851,8 +851,8 @@ class TestAzureFileSystem : public ::testing::Test { auto path2 = data.Path("directory2"); ASSERT_OK(fs()->OpenOutputStream(path2)); - // CreateDir returns OK even if there is already a file or directory at this - // location. Whether or not this is the desired behaviour is debatable. + // CreateDir returns OK even if there is already a file or directory at this + // location. Whether or not this is the desired behaviour is debatable. ASSERT_OK(fs()->CreateDir(path2)); AssertFileInfo(fs(), path2, FileType::File); } @@ -2214,7 +2214,6 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) { .Value.Metadata; // Defaults are overwritten and not merged. EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata); - } TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { From b905f5d088a487a0517f7c052b4ff047019da80f Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Tue, 20 Feb 2024 09:52:38 +0000 Subject: [PATCH 15/20] Move arguments to ObjectAppendStream Init instead of constructor --- cpp/src/arrow/filesystem/azurefs.cc | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index b370abf8bdb..05fab9943f4 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -721,15 +721,10 @@ class ObjectAppendStream final : public io::OutputStream { ObjectAppendStream(std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzureLocation& location, const std::shared_ptr& metadata, - const AzureOptions& options, const bool truncate, - std::function ensure_not_flat_namespace_directory, - int64_t size = kNoSize) + const AzureOptions& options) : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), - location_(location), - truncate_(truncate), - ensure_not_flat_namespace_directory_( - std::move(ensure_not_flat_namespace_directory)) { + location_(location) { if (metadata && metadata->size() != 0) { metadata_ = ArrowMetadataToAzureMetadata(metadata); } else if (options.default_metadata && options.default_metadata->size() != 0) { @@ -743,13 +738,14 @@ class ObjectAppendStream final : public io::OutputStream { io::internal::CloseFromDestructor(this); } - Status Init() { - if (truncate_) { + Status Init(const bool truncate, + std::function ensure_not_flat_namespace_directory) { + if (truncate) { content_length_ = 0; pos_ = 0; // Create an empty file overwriting any existing file, but fail if there is an // existing directory. - RETURN_NOT_OK(ensure_not_flat_namespace_directory_()); + RETURN_NOT_OK(ensure_not_flat_namespace_directory()); // On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing // directory so we don't need to check like we do on flat namespace. RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_)); @@ -766,7 +762,7 @@ class ObjectAppendStream final : public io::OutputStream { // No file exists but on flat namespace its possible there is a directory // marker or an implied directory. Ensure there is no directory before starting // a new empty file. - RETURN_NOT_OK(ensure_not_flat_namespace_directory_()); + RETURN_NOT_OK(ensure_not_flat_namespace_directory()); RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_)); } else { return ExceptionToStatus( @@ -886,8 +882,6 @@ class ObjectAppendStream final : public io::OutputStream { std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzureLocation location_; - const bool truncate_; - std::function ensure_not_flat_namespace_directory_; int64_t content_length_ = kNoSize; bool closed_ = false; @@ -1731,9 +1725,8 @@ class AzureFileSystem::Impl { std::shared_ptr stream; stream = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, options_, truncate, - ensure_not_flat_namespace_directory); - RETURN_NOT_OK(stream->Init()); + location, metadata, options_); + RETURN_NOT_OK(stream->Init(truncate, ensure_not_flat_namespace_directory)); return stream; } From aec7b34a7d02477cda7cbeb787e9763cad23919c Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 21 Feb 2024 00:23:55 +0000 Subject: [PATCH 16/20] Add a test case for outputstream when container does not exist --- cpp/src/arrow/filesystem/azurefs_test.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 30762ae23c3..ad0932dd1a6 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -857,6 +857,10 @@ class TestAzureFileSystem : public ::testing::Test { AssertFileInfo(fs(), path2, FileType::File); } + void TestOpenOutputStreamWithMissingContainer() { + ASSERT_RAISES(IOError, fs()->OpenOutputStream("not-a-container/file", {})); + } + void TestDeleteDirSuccessEmpty() { if (HasSubmitBatchBug()) { GTEST_SKIP() << kSubmitBatchBugMessage; @@ -1599,6 +1603,11 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, this->TestDisallowCreatingFileAndDirectoryWithTheSameName(); } +TYPED_TEST(TestAzureFileSystemOnAllScenarios, + OpenOutputStreamWithMissingContainer) { + this->TestOpenOutputStreamWithMissingContainer(); +} + TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) { this->TestDeleteDirSuccessEmpty(); } From be3395360df6b10f1daba050933470612cd6e46c Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 21 Feb 2024 00:30:05 +0000 Subject: [PATCH 17/20] Avoid unnecessary API call if kContainerNotFound --- cpp/src/arrow/filesystem/azurefs.cc | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 05fab9943f4..151613131a4 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -1711,15 +1712,19 @@ class AzureFileSystem::Impl { auto ensure_not_flat_namespace_directory = [this, location, blob_container_client]() -> Status { - bool hierarchical_namespace_enabled = - HierarchicalNamespaceSupport(GetFileSystemClient(location.container)) == - HNSSupport::kEnabled; - if (!hierarchical_namespace_enabled) { + ARROW_ASSIGN_OR_RAISE( + auto hns_support, + HierarchicalNamespaceSupport(GetFileSystemClient(location.container))); + if (hns_support == HNSSupport::kDisabled) { + // Flat namespace so we need to GetFileInfo in-case its a directory. ARROW_ASSIGN_OR_RAISE(auto status, GetFileInfo(blob_container_client, location)) if (status.type() == FileType::Directory) { return NotAFile(location); } } + // kContainerNotFound - it doesn't exist, so no need to check if its a directory. + // kEnabled - hierarchical namespace so Azure APIs will fail if its a directory. We + // don't need to explicitly check. return Status::OK(); }; From 5ddd70c4a8b4de06388ebca1857c2bac0ab3fbe8 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 21 Feb 2024 08:42:42 +0000 Subject: [PATCH 18/20] Fix lint --- cpp/src/arrow/filesystem/azurefs_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index ad0932dd1a6..649536bb348 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -1603,8 +1603,7 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, this->TestDisallowCreatingFileAndDirectoryWithTheSameName(); } -TYPED_TEST(TestAzureFileSystemOnAllScenarios, - OpenOutputStreamWithMissingContainer) { +TYPED_TEST(TestAzureFileSystemOnAllScenarios, OpenOutputStreamWithMissingContainer) { this->TestOpenOutputStreamWithMissingContainer(); } From 8ac3c950e57410f38ae5ccecf9ba6f838cddff61 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 21 Feb 2024 08:54:47 +0000 Subject: [PATCH 19/20] Tidy --- cpp/src/arrow/filesystem/azurefs.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 151613131a4..91062ac18a4 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -17,7 +17,6 @@ #include #include -#include #include #include @@ -530,7 +529,7 @@ class ObjectInputFile final : public io::RandomAccessFile { Status Init() { if (content_length_ != kNoSize) { // When the user provides the file size we don't validate that its a file. This is - // only a read so its not a big deal if the user make a mistake. + // only a read so its not a big deal if the user makes a mistake. DCHECK_GE(content_length_, 0); return Status::OK(); } @@ -744,8 +743,8 @@ class ObjectAppendStream final : public io::OutputStream { if (truncate) { content_length_ = 0; pos_ = 0; - // Create an empty file overwriting any existing file, but fail if there is an - // existing directory. + // We need to create an empty file overwriting any existing file, but + // fail if there is an existing directory. RETURN_NOT_OK(ensure_not_flat_namespace_directory()); // On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing // directory so we don't need to check like we do on flat namespace. From 111abbbce0717685af93fdb9327c65a28bff3b4e Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 21 Feb 2024 10:10:24 +0000 Subject: [PATCH 20/20] Lint again --- cpp/src/arrow/filesystem/azurefs.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 91062ac18a4..5f7737dd44e 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -743,7 +743,7 @@ class ObjectAppendStream final : public io::OutputStream { if (truncate) { content_length_ = 0; pos_ = 0; - // We need to create an empty file overwriting any existing file, but + // We need to create an empty file overwriting any existing file, but // fail if there is an existing directory. RETURN_NOT_OK(ensure_not_flat_namespace_directory()); // On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing