Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
#include "arrow/filesystem/gcsfs.h"

#include <google/cloud/storage/client.h>
#include <algorithm>

#include "arrow/buffer.h"
#include "arrow/filesystem/gcsfs_internal.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/thread_pool.h"

#define ARROW_GCS_RETURN_NOT_OK(expr) \
if (!expr.ok()) return internal::ToArrowStatus(expr)
Expand Down Expand Up @@ -342,6 +344,36 @@ class GcsFileSystem::Impl {
return internal::ToArrowStatus(CreateDirMarkerRecursive(p.bucket, p.object));
}

Status DeleteDir(const GcsPath& p, const io::IOContext& io_context) {
RETURN_NOT_OK(DeleteDirContents(p, io_context));
if (!p.object.empty()) {
return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object));
}
return internal::ToArrowStatus(client_.DeleteBucket(p.bucket));
}

Status DeleteDirContents(const GcsPath& p, const io::IOContext& io_context) {
// Deleting large directories can be fairly slow, we need to parallelize the
// operation.
auto async_delete =
[&p, this](const google::cloud::StatusOr<gcs::ObjectMetadata>& o) -> Status {
if (!o) return internal::ToArrowStatus(o.status());
// The list includes the directory, skip it. DeleteDir() takes care of it.
if (o->bucket() == p.bucket && o->name() == p.object) return {};
return internal::ToArrowStatus(
client_.DeleteObject(o->bucket(), o->name(), gcs::Generation(o->generation())));
};

std::vector<Future<>> submitted;
// This iterates over all the objects, and schedules parallel deletes.
auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object);
for (const auto& o : client_.ListObjects(p.bucket, prefix)) {
submitted.push_back(DeferNotOk(io_context.executor()->Submit(async_delete, o)));
}

return AllFinished(submitted).status();
}

Status DeleteFile(const GcsPath& p) {
if (!p.object.empty() && p.object.back() == '/') {
return Status::IOError("The given path (" + p.full_path +
Expand Down Expand Up @@ -456,7 +488,8 @@ Status GcsFileSystem::CreateDir(const std::string& path, bool recursive) {
}

Status GcsFileSystem::DeleteDir(const std::string& path) {
return Status::NotImplemented("The GCS FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
return impl_->DeleteDir(p, io_context());
}

Status GcsFileSystem::DeleteDirContents(const std::string& path) {
Expand Down
30 changes: 30 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,36 @@ TEST_F(GcsIntegrationTest, CreateDirRecursiveBucketAndFolder) {
arrow::fs::AssertFileInfo(fs.get(), "new-bucket/", FileType::Directory);
}

TEST_F(GcsIntegrationTest, DeleteDirSuccess) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
const char* const kTestFolders[] = {
"a/", "a/0/", "a/0/0/", "a/1/", "a/2/",
};
for (auto const* f : kTestFolders) {
const auto folder = PreexistingBucketPath() + f;
ASSERT_OK(fs->CreateDir(folder, true));
for (int i = 0; i != 64; ++i) {
const auto filename = folder + "test-file-" + std::to_string(i);
ASSERT_OK_AND_ASSIGN(auto w, fs->OpenOutputStream(filename, {}));
ASSERT_OK(w->Write(filename.data(), filename.size()));
ASSERT_OK(w->Close());
}
}

ASSERT_OK(fs->DeleteDir(PreexistingBucketPath() + kTestFolders[0]));
arrow::fs::AssertFileInfo(fs.get(), PreexistingBucketPath(), FileType::Directory);
arrow::fs::AssertFileInfo(fs.get(), PreexistingObjectPath(), FileType::File);

for (auto const* f : kTestFolders) {
const auto folder = PreexistingBucketPath() + f;
arrow::fs::AssertFileInfo(fs.get(), folder, FileType::NotFound);
for (int i = 0; i != 64; ++i) {
const auto filename = folder + "test-file-" + std::to_string(i);
arrow::fs::AssertFileInfo(fs.get(), filename, FileType::NotFound);
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you test that other paths in the bucket still exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


TEST_F(GcsIntegrationTest, DeleteRootDirContents) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, HasSubstr("too dangerous"),
Expand Down