diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 3e8d8468b32..9c3ce804b09 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -18,12 +18,14 @@ #include "arrow/filesystem/gcsfs.h" #include +#include #include "arrow/buffer.h" #include "arrow/filesystem/gcsfs_internal.h" #include "arrow/filesystem/path_util.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/thread_pool.h" #define ARROW_GCS_RETURN_NOT_OK(expr) \ if (!expr.ok()) return internal::ToArrowStatus(expr) @@ -342,6 +344,36 @@ class GcsFileSystem::Impl { return internal::ToArrowStatus(CreateDirMarkerRecursive(p.bucket, p.object)); } + Status DeleteDir(const GcsPath& p, const io::IOContext& io_context) { + RETURN_NOT_OK(DeleteDirContents(p, io_context)); + if (!p.object.empty()) { + return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object)); + } + return internal::ToArrowStatus(client_.DeleteBucket(p.bucket)); + } + + Status DeleteDirContents(const GcsPath& p, const io::IOContext& io_context) { + // Deleting large directories can be fairly slow, we need to parallelize the + // operation. + auto async_delete = + [&p, this](const google::cloud::StatusOr& o) -> Status { + if (!o) return internal::ToArrowStatus(o.status()); + // The list includes the directory, skip it. DeleteDir() takes care of it. + if (o->bucket() == p.bucket && o->name() == p.object) return {}; + return internal::ToArrowStatus( + client_.DeleteObject(o->bucket(), o->name(), gcs::Generation(o->generation()))); + }; + + std::vector> submitted; + // This iterates over all the objects, and schedules parallel deletes. + auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object); + for (const auto& o : client_.ListObjects(p.bucket, prefix)) { + submitted.push_back(DeferNotOk(io_context.executor()->Submit(async_delete, o))); + } + + return AllFinished(submitted).status(); + } + Status DeleteFile(const GcsPath& p) { if (!p.object.empty() && p.object.back() == '/') { return Status::IOError("The given path (" + p.full_path + @@ -456,7 +488,8 @@ Status GcsFileSystem::CreateDir(const std::string& path, bool recursive) { } Status GcsFileSystem::DeleteDir(const std::string& path) { - return Status::NotImplemented("The GCS FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path)); + return impl_->DeleteDir(p, io_context()); } Status GcsFileSystem::DeleteDirContents(const std::string& path) { diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 04390307676..112a99bf567 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -414,6 +414,36 @@ TEST_F(GcsIntegrationTest, CreateDirRecursiveBucketAndFolder) { arrow::fs::AssertFileInfo(fs.get(), "new-bucket/", FileType::Directory); } +TEST_F(GcsIntegrationTest, DeleteDirSuccess) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + const char* const kTestFolders[] = { + "a/", "a/0/", "a/0/0/", "a/1/", "a/2/", + }; + for (auto const* f : kTestFolders) { + const auto folder = PreexistingBucketPath() + f; + ASSERT_OK(fs->CreateDir(folder, true)); + for (int i = 0; i != 64; ++i) { + const auto filename = folder + "test-file-" + std::to_string(i); + ASSERT_OK_AND_ASSIGN(auto w, fs->OpenOutputStream(filename, {})); + ASSERT_OK(w->Write(filename.data(), filename.size())); + ASSERT_OK(w->Close()); + } + } + + ASSERT_OK(fs->DeleteDir(PreexistingBucketPath() + kTestFolders[0])); + arrow::fs::AssertFileInfo(fs.get(), PreexistingBucketPath(), FileType::Directory); + arrow::fs::AssertFileInfo(fs.get(), PreexistingObjectPath(), FileType::File); + + for (auto const* f : kTestFolders) { + const auto folder = PreexistingBucketPath() + f; + arrow::fs::AssertFileInfo(fs.get(), folder, FileType::NotFound); + for (int i = 0; i != 64; ++i) { + const auto filename = folder + "test-file-" + std::to_string(i); + arrow::fs::AssertFileInfo(fs.get(), filename, FileType::NotFound); + } + } +} + TEST_F(GcsIntegrationTest, DeleteRootDirContents) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, HasSubstr("too dangerous"),