From 2f8d4eb8325796ca1df0fd2621b1ccf10873649b Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Fri, 3 Dec 2021 13:52:45 +0000 Subject: [PATCH 1/6] ARROW-14916: [C++] GcsFileSystem can delete directories --- cpp/src/arrow/filesystem/gcsfs.cc | 79 +++++++++++++++++++++++++- cpp/src/arrow/filesystem/gcsfs.h | 3 + cpp/src/arrow/filesystem/gcsfs_test.cc | 28 +++++++++ 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 3e8d8468b32..c5b2c09dc01 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -18,6 +18,9 @@ #include "arrow/filesystem/gcsfs.h" #include +#include +#include +#include #include "arrow/buffer.h" #include "arrow/filesystem/gcsfs_internal.h" @@ -342,6 +345,76 @@ class GcsFileSystem::Impl { return internal::ToArrowStatus(CreateDirMarkerRecursive(p.bucket, p.object)); } + Status DeleteDir(const GcsPath& p) { + RETURN_NOT_OK(DeleteDirContents(p)); + if (!p.object.empty()) { + return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object)); + } + return internal::ToArrowStatus(client_.DeleteBucket(p.bucket)); + } + + Status DeleteDirContents(const GcsPath& p) { + // Deleting large directories can be fairly slow, we need to parallelize the + // operation. This uses `std::async()` to run multiple delete operations in parallel. + // A simple form of flow control limits the number of operations running in + // parallel. + + // Keep the first error. The iteration stops when one or more errors are detected, + // but because multiple operations may be running in parallel we may receive more + // than one failure. + google::cloud::Status status; + auto keep_errors = [](google::cloud::Status a, google::cloud::Status b) { + return a.ok() ? std::move(a) : std::move(b); + }; + + using Future = std::future; + std::vector pending; + // Limit the number of operations in parallel. If the limit is reached it blocks until + // at least 1/2 of the operations have completed. Preserve the error status. + auto flow_control = [&] { + const auto hwm = static_cast(options_.maximum_concurrent_operations); + const auto lwm = hwm / 2; + if (pending.size() < hwm) return; + while (pending.size() > lwm && status.ok()) { + // Move any futures that are ready to the end of the vector. + auto end = std::partition(pending.begin(), pending.end(), [](Future& f) { + return f.wait_for(std::chrono::milliseconds(1)) != std::future_status::ready; + }); + // From the ready futures, discover if any finished with an error status + status = std::accumulate(end, pending.end(), std::move(status), + [&](google::cloud::Status s, Future& f) { + return keep_errors(std::move(s), f.get()); + }); + pending.erase(end, pending.end()); + } + }; + + auto async_delete = [](gcs::Client& client, gcs::ObjectMetadata const& o) { + return client.DeleteObject(o.bucket(), o.name(), gcs::Generation(o.generation())); + }; + + // This iterates over all the objects, and schedules parallel deletes. + auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object); + for (auto& o : client_.ListObjects(p.bucket, prefix)) { + if (!status.ok()) break; + if (!o) { + status = std::move(o).status(); + break; + } + // The list includes the directory, skip it. DeleteDir() takes care of it. + if (o->bucket() == p.bucket && o->name() == p.object) continue; + pending.push_back( + std::async(std::launch::async, async_delete, std::ref(client_), *std::move(o))); + flow_control(); + } + // Wait for any pending operations and return the first error. + return internal::ToArrowStatus( + std::accumulate(pending.begin(), pending.end(), std::move(status), + [&](google::cloud::Status s, Future& f) { + return keep_errors(std::move(s), f.get()); + })); + } + Status DeleteFile(const GcsPath& p) { if (!p.object.empty() && p.object.back() == '/') { return Status::IOError("The given path (" + p.full_path + @@ -424,7 +497,8 @@ class GcsFileSystem::Impl { }; bool GcsOptions::Equals(const GcsOptions& other) const { - return endpoint_override == other.endpoint_override && scheme == other.scheme; + return endpoint_override == other.endpoint_override && scheme == other.scheme && + maximum_concurrent_operations == other.maximum_concurrent_operations; } std::string GcsFileSystem::type_name() const { return "gcs"; } @@ -456,7 +530,8 @@ Status GcsFileSystem::CreateDir(const std::string& path, bool recursive) { } Status GcsFileSystem::DeleteDir(const std::string& path) { - return Status::NotImplemented("The GCS FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path)); + return impl_->DeleteDir(p); } Status GcsFileSystem::DeleteDirContents(const std::string& path) { diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h index 4e5173e6e3a..abe9f0f6bd9 100644 --- a/cpp/src/arrow/filesystem/gcsfs.h +++ b/cpp/src/arrow/filesystem/gcsfs.h @@ -37,6 +37,9 @@ struct ARROW_EXPORT GcsOptions { std::string endpoint_override; std::string scheme; + /// Limits the number of concurrent operations, such as Object deletes. + int maximum_concurrent_operations = 64; + bool Equals(const GcsOptions& other) const; }; diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 04390307676..6dbf57921f2 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -414,6 +414,34 @@ TEST_F(GcsIntegrationTest, CreateDirRecursiveBucketAndFolder) { arrow::fs::AssertFileInfo(fs.get(), "new-bucket/", FileType::Directory); } +TEST_F(GcsIntegrationTest, DeleteDirSuccess) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + const char* const kTestFolders[] = { + "a/", "a/0/", "a/0/0/", "a/1/", "a/2/", + }; + for (auto const* f : kTestFolders) { + const auto folder = PreexistingBucketPath() + f; + ASSERT_OK(fs->CreateDir(folder, true)); + for (int i = 0; i != 64; ++i) { + const auto filename = folder + "test-file-" + std::to_string(i); + ASSERT_OK_AND_ASSIGN(auto w, fs->OpenOutputStream(filename, {})); + ASSERT_OK(w->Write(filename.data(), filename.size())); + ASSERT_OK(w->Close()); + } + } + + ASSERT_OK(fs->DeleteDir(PreexistingBucketPath() + kTestFolders[0])); + + for (auto const* f : kTestFolders) { + const auto folder = PreexistingBucketPath() + f; + arrow::fs::AssertFileInfo(fs.get(), folder, FileType::NotFound); + for (int i = 0; i != 64; ++i) { + const auto filename = folder + "test-file-" + std::to_string(i); + arrow::fs::AssertFileInfo(fs.get(), filename, FileType::NotFound); + } + } +} + TEST_F(GcsIntegrationTest, DeleteRootDirContents) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, HasSubstr("too dangerous"), From df10c1fbacc2ff0a869ea94249f2e6718d6bdd4e Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Thu, 9 Dec 2021 15:09:43 +0000 Subject: [PATCH 2/6] Address review comments --- cpp/src/arrow/filesystem/gcsfs.cc | 84 +++++++++++-------------------- 1 file changed, 30 insertions(+), 54 deletions(-) diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index c5b2c09dc01..982a29915f8 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -19,14 +19,13 @@ #include #include -#include -#include #include "arrow/buffer.h" #include "arrow/filesystem/gcsfs_internal.h" #include "arrow/filesystem/path_util.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/thread_pool.h" #define ARROW_GCS_RETURN_NOT_OK(expr) \ if (!expr.ok()) return internal::ToArrowStatus(expr) @@ -345,74 +344,51 @@ class GcsFileSystem::Impl { return internal::ToArrowStatus(CreateDirMarkerRecursive(p.bucket, p.object)); } - Status DeleteDir(const GcsPath& p) { - RETURN_NOT_OK(DeleteDirContents(p)); + Status DeleteDir(const GcsPath& p, const io::IOContext& io_context) { + RETURN_NOT_OK(DeleteDirContents(p, io_context)); if (!p.object.empty()) { return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object)); } return internal::ToArrowStatus(client_.DeleteBucket(p.bucket)); } - Status DeleteDirContents(const GcsPath& p) { + Status DeleteDirContents(const GcsPath& p, const io::IOContext& io_context) { // Deleting large directories can be fairly slow, we need to parallelize the // operation. This uses `std::async()` to run multiple delete operations in parallel. // A simple form of flow control limits the number of operations running in // parallel. - // Keep the first error. The iteration stops when one or more errors are detected, - // but because multiple operations may be running in parallel we may receive more - // than one failure. - google::cloud::Status status; - auto keep_errors = [](google::cloud::Status a, google::cloud::Status b) { - return a.ok() ? std::move(a) : std::move(b); - }; - - using Future = std::future; - std::vector pending; - // Limit the number of operations in parallel. If the limit is reached it blocks until - // at least 1/2 of the operations have completed. Preserve the error status. - auto flow_control = [&] { - const auto hwm = static_cast(options_.maximum_concurrent_operations); - const auto lwm = hwm / 2; - if (pending.size() < hwm) return; - while (pending.size() > lwm && status.ok()) { - // Move any futures that are ready to the end of the vector. - auto end = std::partition(pending.begin(), pending.end(), [](Future& f) { - return f.wait_for(std::chrono::milliseconds(1)) != std::future_status::ready; - }); - // From the ready futures, discover if any finished with an error status - status = std::accumulate(end, pending.end(), std::move(status), - [&](google::cloud::Status s, Future& f) { - return keep_errors(std::move(s), f.get()); - }); - pending.erase(end, pending.end()); - } - }; - - auto async_delete = [](gcs::Client& client, gcs::ObjectMetadata const& o) { - return client.DeleteObject(o.bucket(), o.name(), gcs::Generation(o.generation())); + auto async_delete = + [&p](gcs::Client& client, + google::cloud::StatusOr o) -> google::cloud::Status { + if (!o) return std::move(o).status(); + // The list includes the directory, skip it. DeleteDir() takes care of it. + if (o->bucket() == p.bucket && o->name() == p.object) return {}; + return client.DeleteObject(o->bucket(), o->name(), + gcs::Generation(o->generation())); }; + using Future = arrow::Future; + std::vector> submitted; // This iterates over all the objects, and schedules parallel deletes. auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object); for (auto& o : client_.ListObjects(p.bucket, prefix)) { - if (!status.ok()) break; - if (!o) { - status = std::move(o).status(); - break; - } - // The list includes the directory, skip it. DeleteDir() takes care of it. - if (o->bucket() == p.bucket && o->name() == p.object) continue; - pending.push_back( - std::async(std::launch::async, async_delete, std::ref(client_), *std::move(o))); - flow_control(); + submitted.push_back( + io_context.executor()->Submit(async_delete, std::ref(client_), std::move(o))); } - // Wait for any pending operations and return the first error. - return internal::ToArrowStatus( - std::accumulate(pending.begin(), pending.end(), std::move(status), - [&](google::cloud::Status s, Future& f) { - return keep_errors(std::move(s), f.get()); - })); + + std::vector results(submitted.size()); + std::transform(submitted.begin(), submitted.end(), results.begin(), + [](Result& r) { + if (!r.ok()) return r.status(); + auto f = r.MoveValueUnsafe().MoveResult(); + if (!f.ok()) return f.status(); + return internal::ToArrowStatus(f.MoveValueUnsafe()); + }); + for (auto& r : results) { + if (!r.ok()) return r; + } + return {}; } Status DeleteFile(const GcsPath& p) { @@ -531,7 +507,7 @@ Status GcsFileSystem::CreateDir(const std::string& path, bool recursive) { Status GcsFileSystem::DeleteDir(const std::string& path) { ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path)); - return impl_->DeleteDir(p); + return impl_->DeleteDir(p, io_context()); } Status GcsFileSystem::DeleteDirContents(const std::string& path) { From 674aefcd72cf5d1d4d1d19107cd729851f62e4eb Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Thu, 9 Dec 2021 15:53:18 +0000 Subject: [PATCH 3/6] Address other review comments --- cpp/src/arrow/filesystem/gcsfs.cc | 3 +-- cpp/src/arrow/filesystem/gcsfs.h | 3 --- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 982a29915f8..4760fcd212b 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -473,8 +473,7 @@ class GcsFileSystem::Impl { }; bool GcsOptions::Equals(const GcsOptions& other) const { - return endpoint_override == other.endpoint_override && scheme == other.scheme && - maximum_concurrent_operations == other.maximum_concurrent_operations; + return endpoint_override == other.endpoint_override && scheme == other.scheme; } std::string GcsFileSystem::type_name() const { return "gcs"; } diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h index abe9f0f6bd9..4e5173e6e3a 100644 --- a/cpp/src/arrow/filesystem/gcsfs.h +++ b/cpp/src/arrow/filesystem/gcsfs.h @@ -37,9 +37,6 @@ struct ARROW_EXPORT GcsOptions { std::string endpoint_override; std::string scheme; - /// Limits the number of concurrent operations, such as Object deletes. - int maximum_concurrent_operations = 64; - bool Equals(const GcsOptions& other) const; }; From 9985bc828d90fe870628e0eeefc11d845fe1e839 Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Thu, 9 Dec 2021 17:37:21 +0000 Subject: [PATCH 4/6] Address review comments --- cpp/src/arrow/filesystem/gcsfs.cc | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 4760fcd212b..e5cfba9c0f3 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -359,36 +359,22 @@ class GcsFileSystem::Impl { // parallel. auto async_delete = - [&p](gcs::Client& client, - google::cloud::StatusOr o) -> google::cloud::Status { - if (!o) return std::move(o).status(); + [&p, this](const google::cloud::StatusOr& o) -> Status { + if (!o) return internal::ToArrowStatus(o.status()); // The list includes the directory, skip it. DeleteDir() takes care of it. if (o->bucket() == p.bucket && o->name() == p.object) return {}; - return client.DeleteObject(o->bucket(), o->name(), - gcs::Generation(o->generation())); + return internal::ToArrowStatus( + client_.DeleteObject(o->bucket(), o->name(), gcs::Generation(o->generation()))); }; - using Future = arrow::Future; - std::vector> submitted; + std::vector> submitted; // This iterates over all the objects, and schedules parallel deletes. auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object); for (auto& o : client_.ListObjects(p.bucket, prefix)) { - submitted.push_back( - io_context.executor()->Submit(async_delete, std::ref(client_), std::move(o))); + submitted.push_back(DeferNotOk(io_context.executor()->Submit(async_delete, o))); } - std::vector results(submitted.size()); - std::transform(submitted.begin(), submitted.end(), results.begin(), - [](Result& r) { - if (!r.ok()) return r.status(); - auto f = r.MoveValueUnsafe().MoveResult(); - if (!f.ok()) return f.status(); - return internal::ToArrowStatus(f.MoveValueUnsafe()); - }); - for (auto& r : results) { - if (!r.ok()) return r; - } - return {}; + return AllFinished(submitted).status(); } Status DeleteFile(const GcsPath& p) { From 9734ced12acae128ee2781e7f997a7186b52553c Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Thu, 9 Dec 2021 17:41:45 +0000 Subject: [PATCH 5/6] Improve testing per review --- cpp/src/arrow/filesystem/gcsfs_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 6dbf57921f2..112a99bf567 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -431,6 +431,8 @@ TEST_F(GcsIntegrationTest, DeleteDirSuccess) { } ASSERT_OK(fs->DeleteDir(PreexistingBucketPath() + kTestFolders[0])); + arrow::fs::AssertFileInfo(fs.get(), PreexistingBucketPath(), FileType::Directory); + arrow::fs::AssertFileInfo(fs.get(), PreexistingObjectPath(), FileType::File); for (auto const* f : kTestFolders) { const auto folder = PreexistingBucketPath() + f; From 775a4a9b3b67cd7c4abe1f456d0878eafb95b2c2 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 9 Dec 2021 19:08:30 +0100 Subject: [PATCH 6/6] Update comment, and a nit --- cpp/src/arrow/filesystem/gcsfs.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index e5cfba9c0f3..9c3ce804b09 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -354,10 +354,7 @@ class GcsFileSystem::Impl { Status DeleteDirContents(const GcsPath& p, const io::IOContext& io_context) { // Deleting large directories can be fairly slow, we need to parallelize the - // operation. This uses `std::async()` to run multiple delete operations in parallel. - // A simple form of flow control limits the number of operations running in - // parallel. - + // operation. auto async_delete = [&p, this](const google::cloud::StatusOr& o) -> Status { if (!o) return internal::ToArrowStatus(o.status()); @@ -370,7 +367,7 @@ class GcsFileSystem::Impl { std::vector> submitted; // This iterates over all the objects, and schedules parallel deletes. auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object); - for (auto& o : client_.ListObjects(p.bucket, prefix)) { + for (const auto& o : client_.ListObjects(p.bucket, prefix)) { submitted.push_back(DeferNotOk(io_context.executor()->Submit(async_delete, o))); }