Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 127 additions & 56 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,81 @@ class AzureFileSystem::Impl {
return stream;
}

private:
Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation& location,
bool missing_dir_ok) {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
Azure::Storage::Blobs::ListBlobsOptions options;
if (!location.path.empty()) {
options.Prefix = internal::EnsureTrailingSlash(location.path);
}
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
//
// Only supports up to 256 subrequests in a single batch. The
// size of the body for a batch request can't exceed 4 MB.
const int32_t kNumMaxRequestsInBatch = 256;
options.PageSizeHint = kNumMaxRequestsInBatch;
try {
auto list_response = container_client.ListBlobs(options);
if (!missing_dir_ok && list_response.Blobs.empty()) {
return PathNotFound(location);
}
for (; list_response.HasPage(); list_response.MoveToNextPage()) {
if (list_response.Blobs.empty()) {
continue;
}
auto batch = container_client.CreateBatch();
std::vector<Azure::Storage::DeferredResponse<
Azure::Storage::Blobs::Models::DeleteBlobResult>>
deferred_responses;
for (const auto& blob_item : list_response.Blobs) {
deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip when the blob_item.Name matches the options.Prefix because that could be the empty-dir marker blob (an empty blob ending with a /).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should do it in #38772 .
We have a test for this case. So we can detect the case when we work on #38772 .

}
try {
container_client.SubmitBatch(batch);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
}
std::vector<std::string> failed_blob_names;
for (size_t i = 0; i < deferred_responses.size(); ++i) {
const auto& deferred_response = deferred_responses[i];
bool success = true;
try {
auto delete_result = deferred_response.GetResponse();
success = delete_result.Value.Deleted;
} catch (const Azure::Storage::StorageException& exception) {
success = false;
}
if (!success) {
const auto& blob_item = list_response.Blobs[i];
failed_blob_names.push_back(blob_item.Name);
}
}
if (!failed_blob_names.empty()) {
if (failed_blob_names.size() == 1) {
return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
": " + container_client.GetUrl());
} else {
return Status::IOError("Failed to delete blobs: [",
arrow::internal::JoinStrings(failed_blob_names, ", "),
"]: " + container_client.GetUrl());
}
}
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to list blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
}
return Status::OK();
}

public:
Status DeleteDir(const AzureLocation& location) {
if (location.container.empty()) {
return Status::Invalid("Cannot delete an empty container");
Expand Down Expand Up @@ -1017,69 +1092,64 @@ class AzureFileSystem::Impl {
exception);
}
} else {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = internal::EnsureTrailingSlash(location.path);
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
//
// Only supports up to 256 subrequests in a single batch. The
// size of the body for a batch request can't exceed 4 MB.
const int32_t kNumMaxRequestsInBatch = 256;
options.PageSizeHint = kNumMaxRequestsInBatch;
return DeleteDirContentsWihtoutHierarchicalNamespace(location,
/*missing_dir_ok=*/true);
}
}

Status DeleteDirContents(const AzureLocation& location, bool missing_dir_ok) {
if (location.container.empty()) {
return internal::InvalidDeleteDirContents(location.all);
}

ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
hierarchical_namespace_.Enabled(location.container));
if (hierarchical_namespace_enabled) {
auto file_system_client =
datalake_service_client_->GetFileSystemClient(location.container);
auto directory_client = file_system_client.GetDirectoryClient(location.path);
try {
auto list_response = container_client.ListBlobs(options);
while (list_response.HasPage() && !list_response.Blobs.empty()) {
auto batch = container_client.CreateBatch();
std::vector<Azure::Storage::DeferredResponse<
Azure::Storage::Blobs::Models::DeleteBlobResult>>
deferred_responses;
for (const auto& blob_item : list_response.Blobs) {
deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
}
try {
container_client.SubmitBatch(batch);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
}
std::vector<std::string> failed_blob_names;
for (size_t i = 0; i < deferred_responses.size(); ++i) {
const auto& deferred_response = deferred_responses[i];
bool success = true;
try {
auto delete_result = deferred_response.GetResponse();
success = delete_result.Value.Deleted;
} catch (const Azure::Storage::StorageException& exception) {
success = false;
}
if (!success) {
const auto& blob_item = list_response.Blobs[i];
failed_blob_names.push_back(blob_item.Name);
}
}
if (!failed_blob_names.empty()) {
if (failed_blob_names.size() == 1) {
return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
": " + container_client.GetUrl());
auto list_response = directory_client.ListPaths(false);
for (; list_response.HasPage(); list_response.MoveToNextPage()) {
for (const auto& path : list_response.Paths) {
if (path.IsDirectory) {
auto sub_directory_client =
file_system_client.GetDirectoryClient(path.Name);
try {
sub_directory_client.DeleteRecursive();
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete a sub directory: " + location.container +
internal::kSep + path.Name + ": " + sub_directory_client.GetUrl(),
exception);
}
} else {
return Status::IOError(
"Failed to delete blobs: [",
arrow::internal::JoinStrings(failed_blob_names, ", "),
"]: " + container_client.GetUrl());
auto sub_file_client = file_system_client.GetFileClient(path.Name);
try {
sub_file_client.Delete();
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete a sub file: " + location.container +
internal::kSep + path.Name + ": " + sub_file_client.GetUrl(),
exception);
}
}
}
list_response.MoveToNextPage();
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to list blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
if (missing_dir_ok &&
exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
return Status::OK();
} else {
return internal::ExceptionToStatus(
"Failed to delete directory contents: " + location.path + ": " +
directory_client.GetUrl(),
exception);
}
}
return Status::OK();
} else {
return DeleteDirContentsWihtoutHierarchicalNamespace(location, missing_dir_ok);
}
}
};
Expand Down Expand Up @@ -1121,7 +1191,8 @@ Status AzureFileSystem::DeleteDir(const std::string& path) {
}

Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
return impl_->DeleteDirContents(location, missing_dir_ok);
}

Status AzureFileSystem::DeleteRootDirContents() {
Expand Down
105 changes: 105 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,45 @@ class AzureFileSystemTest : public ::testing::Test {

void RunGetFileInfoObjectWithNestedStructureTest();
void RunGetFileInfoObjectTest();

struct HierarchicalPaths {
std::string container;
std::string directory;
std::vector<std::string> sub_paths;
};

// Need to use "void" as the return type to use ASSERT_* in this method.
void CreateHierarchicalData(HierarchicalPaths& paths) {
const auto container_path = RandomContainerName();
const auto directory_path =
internal::ConcatAbstractPath(container_path, RandomDirectoryName());
const auto sub_directory_path =
internal::ConcatAbstractPath(directory_path, "new-sub");
const auto sub_blob_path =
internal::ConcatAbstractPath(sub_directory_path, "sub.txt");
const auto top_blob_path = internal::ConcatAbstractPath(directory_path, "top.txt");
ASSERT_OK(fs_->CreateDir(sub_directory_path, true));
ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(sub_blob_path));
ASSERT_OK(output->Write(std::string_view("sub")));
ASSERT_OK(output->Close());
ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(top_blob_path));
ASSERT_OK(output->Write(std::string_view("top")));
ASSERT_OK(output->Close());

AssertFileInfo(fs_.get(), container_path, FileType::Directory);
AssertFileInfo(fs_.get(), directory_path, FileType::Directory);
AssertFileInfo(fs_.get(), sub_directory_path, FileType::Directory);
AssertFileInfo(fs_.get(), sub_blob_path, FileType::File);
AssertFileInfo(fs_.get(), top_blob_path, FileType::File);

paths.container = container_path;
paths.directory = directory_path;
paths.sub_paths = {
sub_directory_path,
sub_blob_path,
top_blob_path,
};
}
};

class AzuriteFileSystemTest : public AzureFileSystemTest {
Expand Down Expand Up @@ -666,6 +705,72 @@ TEST_F(AzuriteFileSystemTest, DeleteDirUri) {
ASSERT_RAISES(Invalid, fs_->DeleteDir("abfs://" + PreexistingContainerPath()));
}

TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessContainer) {
#ifdef __APPLE__
GTEST_SKIP() << "This test fails by an Azurite problem: "
"https://github.com/Azure/Azurite/pull/2302";
#endif
HierarchicalPaths paths;
CreateHierarchicalData(paths);
ASSERT_OK(fs_->DeleteDirContents(paths.container));
arrow::fs::AssertFileInfo(fs_.get(), paths.container, FileType::Directory);
arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::NotFound);
for (const auto& sub_path : paths.sub_paths) {
arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound);
}
}

TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessDirectory) {
#ifdef __APPLE__
GTEST_SKIP() << "This test fails by an Azurite problem: "
"https://github.com/Azure/Azurite/pull/2302";
#endif
HierarchicalPaths paths;
CreateHierarchicalData(paths);
ASSERT_OK(fs_->DeleteDirContents(paths.directory));
// GH-38772: We may change this to FileType::Directory.
arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::NotFound);
for (const auto& sub_path : paths.sub_paths) {
arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound);
}
}

TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_OK(fs_->DeleteDirContents(directory_path, true));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
}

TEST_F(AzuriteFileSystemTest, DeleteDirContentsFailureNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessExist) {
HierarchicalPaths paths;
CreateHierarchicalData(paths);
ASSERT_OK(fs_->DeleteDirContents(paths.directory));
arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::Directory);
for (const auto& sub_path : paths.sub_paths) {
arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound);
}
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_OK(fs_->DeleteDirContents(directory_path, true));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsFailureNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
}

TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
Expand Down