diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 6b71a2c5cb1..79314059efc 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -34,7 +34,6 @@ namespace { namespace gcs = google::cloud::storage; -auto constexpr kSep = '/'; // Change the default upload buffer size. In general, sending larger buffers is more // efficient with GCS, as each buffer requires a roundtrip to the service. With formatted // output (when using `operator<<`), keeping a larger buffer in memory before uploading @@ -49,18 +48,17 @@ struct GcsPath { std::string object; static Result FromString(const std::string& s) { - const auto src = internal::RemoveTrailingSlash(s); - auto const first_sep = src.find_first_of(kSep); + auto const first_sep = s.find_first_of(internal::kSep); if (first_sep == 0) { return Status::Invalid("Path cannot start with a separator ('", s, "')"); } if (first_sep == std::string::npos) { - return GcsPath{std::string(src), std::string(src), ""}; + return GcsPath{s, internal::RemoveTrailingSlash(s).to_string(), ""}; } GcsPath path; - path.full_path = std::string(src); - path.bucket = std::string(src.substr(0, first_sep)); - path.object = std::string(src.substr(first_sep + 1)); + path.full_path = s; + path.bucket = s.substr(0, first_sep); + path.object = s.substr(first_sep + 1); return path; } @@ -275,12 +273,73 @@ class GcsFileSystem::Impl { const GcsOptions& options() const { return options_; } Result GetFileInfo(const GcsPath& path) { - if (!path.object.empty()) { - auto meta = client_.GetObjectMetadata(path.bucket, path.object); - return GetFileInfoImpl(path, std::move(meta).status(), FileType::File); + if (path.object.empty()) { + auto meta = client_.GetBucketMetadata(path.bucket); + return GetFileInfoImpl(path, std::move(meta).status(), FileType::Directory); } - auto meta = client_.GetBucketMetadata(path.bucket); - return GetFileInfoImpl(path, std::move(meta).status(), FileType::Directory); + auto meta = client_.GetObjectMetadata(path.bucket, path.object); + return GetFileInfoImpl( + path, std::move(meta).status(), + path.object.back() == '/' ? FileType::Directory : FileType::File); + } + + // GCS does not have directories or folders. But folders can be emulated (with some + // limitations) using marker objects. That and listing with prefixes creates the + // illusion of folders. + google::cloud::Status CreateDirMarker(const std::string& bucket, + util::string_view name) { + // Make the name canonical. + const auto canonical = internal::EnsureTrailingSlash(name); + return client_ + .InsertObject(bucket, canonical, std::string(), + gcs::WithObjectMetadata(gcs::ObjectMetadata().upsert_metadata( + "arrow/gcsfs", "directory"))) + .status(); + } + + google::cloud::Status CreateDirMarkerRecursive(const std::string& bucket, + const std::string& object) { + using GcsCode = google::cloud::StatusCode; + auto get_parent = [](std::string const& path) { + return std::move(internal::GetAbstractPathParent(path).first); + }; + // Maybe counterintuitively we create the markers from the most nested and up. Because + // GCS does not have directories creating `a/b/c` will succeed, even if `a/` or `a/b/` + // does not exist. In the common case, where `a/b/` may already exist, it is more + // efficient to just create `a/b/c/` and then find out that `a/b/` was already there. + // In the case where none exists, it does not matter which order we follow. + auto status = CreateDirMarker(bucket, object); + if (status.code() == GcsCode::kAlreadyExists) return {}; + if (status.code() == GcsCode::kNotFound) { + // Missing bucket, create it first ... + status = client_.CreateBucket(bucket, gcs::BucketMetadata()).status(); + if (status.code() != GcsCode::kOk && status.code() != GcsCode::kAlreadyExists) { + return status; + } + } + + for (auto parent = get_parent(object); !parent.empty(); parent = get_parent(parent)) { + status = CreateDirMarker(bucket, parent); + if (status.code() == GcsCode::kAlreadyExists) { + break; + } + if (!status.ok()) { + return status; + } + } + return {}; + } + + Status CreateDir(const GcsPath& p) { + if (p.object.empty()) { + return internal::ToArrowStatus( + client_.CreateBucket(p.bucket, gcs::BucketMetadata()).status()); + } + return internal::ToArrowStatus(CreateDirMarker(p.bucket, p.object)); + } + + Status CreateDirRecursive(const GcsPath& p) { + return internal::ToArrowStatus(CreateDirMarkerRecursive(p.bucket, p.object)); } Status DeleteFile(const GcsPath& p) { @@ -332,12 +391,15 @@ class GcsFileSystem::Impl { static Result GetFileInfoImpl(const GcsPath& path, const google::cloud::Status& status, FileType type) { + const auto& canonical = type == FileType::Directory + ? internal::EnsureTrailingSlash(path.full_path) + : path.full_path; if (status.ok()) { - return FileInfo(path.full_path, type); + return FileInfo(canonical, type); } using ::google::cloud::StatusCode; if (status.code() == StatusCode::kNotFound) { - return FileInfo(path.full_path, FileType::NotFound); + return FileInfo(canonical, FileType::NotFound); } return internal::ToArrowStatus(status); } @@ -373,7 +435,9 @@ Result GcsFileSystem::GetFileInfo(const FileSelector& select) { } Status GcsFileSystem::CreateDir(const std::string& path, bool recursive) { - return Status::NotImplemented("The GCS FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path)); + if (!recursive) return impl_->CreateDir(p); + return impl_->CreateDirRecursive(p); } Status GcsFileSystem::DeleteDir(const std::string& path) { diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h index 2583bdee820..4e5173e6e3a 100644 --- a/cpp/src/arrow/filesystem/gcsfs.h +++ b/cpp/src/arrow/filesystem/gcsfs.h @@ -40,24 +40,43 @@ struct ARROW_EXPORT GcsOptions { bool Equals(const GcsOptions& other) const; }; +// - TODO(ARROW-1231) - review this documentation before closing the bug. /// \brief GCS-backed FileSystem implementation. /// -/// Some implementation notes: -/// - TODO(ARROW-1231) - review all the notes once completed. -/// - buckets are treated as top-level directories on a "root". -/// - GCS buckets are in a global namespace, only one bucket -/// named `foo` exists in Google Cloud. -/// - Creating new top-level directories is implemented by creating -/// a bucket, this may be a slower operation than usual. -/// - A principal (service account, user, etc) can only list the -/// buckets for a single project, but can access the buckets -/// for many projects. It is possible that listing "all" -/// the buckets returns fewer buckets than you have access to. -/// - GCS does not have directories, they are emulated in this -/// library by listing objects with a common prefix. -/// - In general, GCS has much higher latency than local filesystems. -/// The throughput of GCS is comparable to the throughput of -/// a local file system. +/// GCS (Google Cloud Storage - https://cloud.google.com/storage) is a scalable object +/// storage system for any amount of data. The main abstractions in GCS are buckets and +/// objects. A bucket is a namespace for objects, buckets can store any number of objects, +/// tens of millions and even billions is not uncommon. Each object contains a single +/// blob of data, up to 5TiB in size. Buckets are typically configured to keep a single +/// version of each object, but versioning can be enabled. Versioning is important because +/// objects are immutable, once created one cannot append data to the object or modify the +/// object data in any way. +/// +/// GCS buckets are in a global namespace, if a Google Cloud customer creates a bucket +/// named `foo` no other customer can create a bucket with the same name. Note that a +/// principal (a user or service account) may only list the buckets they are entitled to, +/// and then only within a project. It is not possible to list "all" the buckets. +/// +/// Within each bucket objects are in flat namespace. GCS does not have folders or +/// directories. However, following some conventions it is possible to emulate +/// directories. To this end, this class: +/// +/// - All buckets are treated as directories at the "root" +/// - Creating a root directory results in a new bucket being created, this may be slower +/// than most GCS operations. +/// - Any object with a name ending with a slash (`/`) character is treated as a +/// directory. +/// - The class creates marker objects for a directory, using a trailing slash in the +/// marker names. For debugging purposes, the metadata of these marker objects indicate +/// that they are markers created by this class. The class does not rely on this +/// annotation. +/// - GCS can list all the objects with a given prefix, this is used to emulate listing +/// of directories. +/// - In object lists GCS can summarize all the objects with a common prefix as a single +/// entry, this is used to emulate non-recursive lists. Note that GCS list time is +/// proportional to the number of objects in the prefix. Listing recursively takes +/// almost the same time as non-recursive lists. +/// class ARROW_EXPORT GcsFileSystem : public FileSystem { public: ~GcsFileSystem() override = default; @@ -75,6 +94,7 @@ class ARROW_EXPORT GcsFileSystem : public FileSystem { Status DeleteDirContents(const std::string& path) override; + /// This is not implemented in GcsFileSystem, as it would be too dangerous. Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 30d37c18bf1..0cf5c847083 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -31,6 +31,7 @@ #include #include "arrow/filesystem/gcsfs_internal.h" +#include "arrow/filesystem/path_util.h" #include "arrow/filesystem/test_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" @@ -51,8 +52,6 @@ using ::testing::NotNull; using ::testing::Pair; using ::testing::UnorderedElementsAre; -auto const* kPreexistingBucket = "test-bucket-name"; -auto const* kPreexistingObject = "test-object-name"; auto const* kLoremIpsum = R"""( Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis @@ -90,22 +89,28 @@ class GcsIntegrationTest : public ::testing::Test { .set("http://127.0.0.1:" + port_) .set(gc::MakeInsecureCredentials())); google::cloud::StatusOr bucket = client.CreateBucketForProject( - kPreexistingBucket, "ignored-by-testbench", gcs::BucketMetadata{}); - ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << kPreexistingBucket + PreexistingBucketName(), "ignored-by-testbench", gcs::BucketMetadata{}); + ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << PreexistingBucketName() << ">, status=" << bucket.status(); - google::cloud::StatusOr object = - client.InsertObject(kPreexistingBucket, kPreexistingObject, kLoremIpsum); - ASSERT_TRUE(object.ok()) << "Failed to create object <" << kPreexistingObject + google::cloud::StatusOr object = client.InsertObject( + PreexistingBucketName(), PreexistingObjectName(), kLoremIpsum); + ASSERT_TRUE(object.ok()) << "Failed to create object <" << PreexistingObjectName() << ">, status=" << object.status(); } + static std::string PreexistingBucketName() { return "test-bucket-name"; } + + static std::string PreexistingBucketPath() { return PreexistingBucketName() + '/'; } + + static std::string PreexistingObjectName() { return "test-object-name"; } + static std::string PreexistingObjectPath() { - return std::string(kPreexistingBucket) + "/" + kPreexistingObject; + return PreexistingBucketPath() + PreexistingObjectName(); } static std::string NotFoundObjectPath() { - return std::string(kPreexistingBucket) + "/not-found"; + return PreexistingBucketPath() + "not-found"; } GcsOptions TestGcsOptions() { @@ -133,7 +138,7 @@ class GcsIntegrationTest : public ::testing::Test { return line; } - int RandomIndex(std::size_t end) { + std::size_t RandomIndex(std::size_t end) { return std::uniform_int_distribution(0, end - 1)(generator_); } @@ -334,7 +339,7 @@ TEST(GcsFileSystem, ObjectMetadataRoundtrip) { TEST_F(GcsIntegrationTest, GetFileInfoBucket) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); - arrow::fs::AssertFileInfo(fs.get(), kPreexistingBucket, FileType::Directory); + arrow::fs::AssertFileInfo(fs.get(), PreexistingBucketPath(), FileType::Directory); } TEST_F(GcsIntegrationTest, GetFileInfoObject) { @@ -342,6 +347,50 @@ TEST_F(GcsIntegrationTest, GetFileInfoObject) { arrow::fs::AssertFileInfo(fs.get(), PreexistingObjectPath(), FileType::File); } +TEST_F(GcsIntegrationTest, CreateDirSuccessBucketOnly) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + ASSERT_OK(fs->CreateDir("new-bucket", false)); + arrow::fs::AssertFileInfo(fs.get(), "new-bucket/", FileType::Directory); +} + +TEST_F(GcsIntegrationTest, CreateDirSuccessBucketAndFolder) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + const auto path = PreexistingBucketPath() + "new-folder/"; + ASSERT_OK(fs->CreateDir(path, false)); + arrow::fs::AssertFileInfo(fs.get(), path, FileType::Directory); +} + +TEST_F(GcsIntegrationTest, CreateDirFailureFolderWithMissingBucket) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + const auto path = std::string("not-a-bucket/new-folder/"); + ASSERT_RAISES(IOError, fs->CreateDir(path, false)); +} + +TEST_F(GcsIntegrationTest, CreateDirRecursiveBucketOnly) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + ASSERT_OK(fs->CreateDir("new-bucket", true)); + arrow::fs::AssertFileInfo(fs.get(), "new-bucket/", FileType::Directory); +} + +TEST_F(GcsIntegrationTest, CreateDirRecursiveFolderOnly) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + const auto parent = PreexistingBucketPath() + "new-folder/"; + const auto path = parent + "new-sub/"; + ASSERT_OK(fs->CreateDir(path, true)); + arrow::fs::AssertFileInfo(fs.get(), path, FileType::Directory); + arrow::fs::AssertFileInfo(fs.get(), parent, FileType::Directory); +} + +TEST_F(GcsIntegrationTest, CreateDirRecursiveBucketAndFolder) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + const auto parent = std::string("new-bucket/new-folder/"); + const auto path = parent + "new-sub/"; + ASSERT_OK(fs->CreateDir(path, true)); + arrow::fs::AssertFileInfo(fs.get(), path, FileType::Directory); + arrow::fs::AssertFileInfo(fs.get(), parent, FileType::Directory); + arrow::fs::AssertFileInfo(fs.get(), "new-bucket/", FileType::Directory); +} + TEST_F(GcsIntegrationTest, DeleteRootDirContents) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, HasSubstr("too dangerous"), @@ -361,20 +410,20 @@ TEST_F(GcsIntegrationTest, DeleteFileFailure) { TEST_F(GcsIntegrationTest, DeleteFileDirectoryFails) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); - const auto path = std::string(kPreexistingBucket) + "/DeleteFileDirectoryFails/"; + const auto path = PreexistingBucketPath() + "DeleteFileDirectoryFails/"; ASSERT_RAISES(IOError, fs->DeleteFile(path)); } TEST_F(GcsIntegrationTest, CopyFileSuccess) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); - const auto destination_path = kPreexistingBucket + std::string("/copy-destination"); + const auto destination_path = PreexistingBucketPath() + "copy-destination"; ASSERT_OK(fs->CopyFile(PreexistingObjectPath(), destination_path)); arrow::fs::AssertFileInfo(fs.get(), destination_path, FileType::File); } TEST_F(GcsIntegrationTest, CopyFileNotFound) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); - const auto destination_path = kPreexistingBucket + std::string("/copy-destination"); + const auto destination_path = PreexistingBucketPath() + "copy-destination"; ASSERT_RAISES(IOError, fs->CopyFile(NotFoundObjectPath(), destination_path)); } @@ -433,7 +482,7 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); arrow::fs::FileInfo info; - ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket)); + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingBucketPath())); ASSERT_RAISES(IOError, fs->OpenInputStream(info)); ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath())); @@ -446,7 +495,7 @@ TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) { const std::string object_name = "ReadObjectMetadataTest/simple.txt"; const gcs::ObjectMetadata expected = client - .InsertObject(kPreexistingBucket, object_name, + .InsertObject(PreexistingBucketName(), object_name, "The quick brown fox jumps over the lazy dog", gcs::WithObjectMetadata(gcs::ObjectMetadata() .set_content_type("text/plain") @@ -457,8 +506,8 @@ TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); std::shared_ptr stream; - ASSERT_OK_AND_ASSIGN( - stream, fs->OpenInputStream(std::string(kPreexistingBucket) + "/" + object_name)); + ASSERT_OK_AND_ASSIGN(stream, + fs->OpenInputStream(PreexistingBucketPath() + object_name)); auto format_time = [](std::chrono::system_clock::time_point tp) { return absl::FormatTime(absl::RFC3339_full, absl::FromChrono(tp), @@ -505,7 +554,7 @@ TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) { TEST_F(GcsIntegrationTest, WriteObjectSmall) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); - const auto path = kPreexistingBucket + std::string("/test-write-object"); + const auto path = PreexistingBucketPath() + "test-write-object"; std::shared_ptr output; ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); const auto expected = std::string(kLoremIpsum); @@ -526,7 +575,7 @@ TEST_F(GcsIntegrationTest, WriteObjectSmall) { TEST_F(GcsIntegrationTest, WriteObjectLarge) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); - const auto path = kPreexistingBucket + std::string("/test-write-object"); + const auto path = PreexistingBucketPath() + "test-write-object"; std::shared_ptr output; ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); // These buffer sizes are intentionally not multiples of the upload quantum (256 KiB). @@ -571,7 +620,7 @@ TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) { [&] { return RandomLine(++lineno, kLineWidth); }); const auto path = - kPreexistingBucket + std::string("/OpenInputFileMixedReadVsReadAt/object-name"); + PreexistingBucketPath() + "OpenInputFileMixedReadVsReadAt/object-name"; std::shared_ptr output; ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); for (auto const& line : lines) { @@ -622,8 +671,7 @@ TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) { std::generate_n(lines.begin(), lines.size(), [&] { return RandomLine(++lineno, kLineWidth); }); - const auto path = - kPreexistingBucket + std::string("/OpenInputFileRandomSeek/object-name"); + const auto path = PreexistingBucketPath() + "OpenInputFileRandomSeek/object-name"; std::shared_ptr output; ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); for (auto const& line : lines) { @@ -672,7 +720,7 @@ TEST_F(GcsIntegrationTest, OpenInputFileInfoInvalid) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); arrow::fs::FileInfo info; - ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket)); + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingBucketPath())); ASSERT_RAISES(IOError, fs->OpenInputFile(info)); ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));