diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 826e5d285df..19d122b571e 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -24,6 +24,9 @@ #ifdef ARROW_HDFS #include "arrow/filesystem/hdfs.h" #endif +#ifdef ARROW_GCS +#include "arrow/filesystem/gcsfs.h" +#endif #ifdef ARROW_S3 #include "arrow/filesystem/s3fs.h" #endif @@ -686,6 +689,16 @@ Result> FileSystemFromUriReal(const Uri& uri, } return std::make_shared(options, io_context); } + if (scheme == "gs" || scheme == "gcs") { +#ifdef ARROW_GCS + ARROW_ASSIGN_OR_RAISE(auto options, GcsOptions::FromUri(uri, out_path)); + ARROW_ASSIGN_OR_RAISE(auto gcsfs, GcsFileSystem::Make(options, io_context)); + return gcsfs; +#else + return Status::NotImplemented("Got GCS URI but Arrow compiled without GCS support"); +#endif + } + if (scheme == "hdfs" || scheme == "viewfs") { #ifdef ARROW_HDFS ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri)); diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index a61fef2246e..f1b5a10e829 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -410,7 +410,8 @@ class GcsFileSystem::Impl { google::cloud::StatusOr b = client_.GetBucketMetadata(bucket); if (!b) { if (b.status().code() == GcsCode::kNotFound) { - b = client_.CreateBucket(bucket, gcs::BucketMetadata()); + b = client_.CreateBucket(bucket, gcs::BucketMetadata().set_location( + options_.default_bucket_location)); } if (!b) return internal::ToArrowStatus(b.status()); } @@ -433,7 +434,10 @@ class GcsFileSystem::Impl { Status CreateDir(const GcsPath& p) { if (p.object.empty()) { return internal::ToArrowStatus( - client_.CreateBucket(p.bucket, gcs::BucketMetadata()).status()); + client_ + .CreateBucket(p.bucket, gcs::BucketMetadata().set_location( + options_.default_bucket_location)) + .status()); } auto parent = p.parent(); if (!parent.object.empty()) { @@ -542,14 +546,19 @@ class GcsFileSystem::Impl { Result> OpenOutputStream( const GcsPath& path, const std::shared_ptr& metadata) { + std::shared_ptr resolved_metadata = metadata; + if (resolved_metadata == nullptr && options_.default_metadata != nullptr) { + resolved_metadata = options_.default_metadata; + } gcs::EncryptionKey encryption_key; - ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(metadata)); + ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(resolved_metadata)); gcs::PredefinedAcl predefined_acl; - ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(metadata)); + ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(resolved_metadata)); gcs::KmsKeyName kms_key_name; - ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(metadata)); + ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(resolved_metadata)); gcs::WithObjectMetadata with_object_metadata; - ARROW_ASSIGN_OR_RAISE(with_object_metadata, internal::ToObjectMetadata(metadata)); + ARROW_ASSIGN_OR_RAISE(with_object_metadata, + internal::ToObjectMetadata(resolved_metadata)); auto stream = client_.WriteObject(path.bucket, path.object, encryption_key, predefined_acl, kms_key_name, with_object_metadata); @@ -610,46 +619,106 @@ class GcsFileSystem::Impl { bool GcsOptions::Equals(const GcsOptions& other) const { return credentials == other.credentials && - endpoint_override == other.endpoint_override && scheme == other.scheme; + endpoint_override == other.endpoint_override && scheme == other.scheme && + default_bucket_location == other.default_bucket_location; } GcsOptions GcsOptions::Defaults() { - return GcsOptions{ - std::make_shared(google::cloud::MakeGoogleDefaultCredentials()), - {}, - "https"}; + GcsOptions options{}; + options.credentials = + std::make_shared(google::cloud::MakeGoogleDefaultCredentials()); + options.scheme = "https"; + return options; } GcsOptions GcsOptions::Anonymous() { - return GcsOptions{ - std::make_shared(google::cloud::MakeInsecureCredentials()), - {}, - "http"}; + GcsOptions options{}; + options.credentials = + std::make_shared(google::cloud::MakeInsecureCredentials()); + options.scheme = "http"; + return options; } GcsOptions GcsOptions::FromAccessToken(const std::string& access_token, std::chrono::system_clock::time_point expiration) { - return GcsOptions{ - std::make_shared( - google::cloud::MakeAccessTokenCredentials(access_token, expiration)), - {}, - "https"}; + GcsOptions options{}; + options.credentials = std::make_shared( + google::cloud::MakeAccessTokenCredentials(access_token, expiration)); + options.scheme = "https"; + return options; } GcsOptions GcsOptions::FromImpersonatedServiceAccount( const GcsCredentials& base_credentials, const std::string& target_service_account) { - return GcsOptions{std::make_shared( - google::cloud::MakeImpersonateServiceAccountCredentials( - base_credentials.credentials, target_service_account)), - {}, - "https"}; + GcsOptions options{}; + options.credentials = std::make_shared( + google::cloud::MakeImpersonateServiceAccountCredentials( + base_credentials.credentials, target_service_account)); + options.scheme = "https"; + return options; } GcsOptions GcsOptions::FromServiceAccountCredentials(const std::string& json_object) { - return GcsOptions{std::make_shared( - google::cloud::MakeServiceAccountCredentials(json_object)), - {}, - "https"}; + GcsOptions options{}; + options.credentials = std::make_shared( + google::cloud::MakeServiceAccountCredentials(json_object)); + options.scheme = "https"; + return options; +} + +Result GcsOptions::FromUri(const arrow::internal::Uri& uri, + std::string* out_path) { + const auto bucket = uri.host(); + auto path = uri.path(); + if (bucket.empty()) { + if (!path.empty()) { + return Status::Invalid("Missing bucket name in GCS URI"); + } + } else { + if (path.empty()) { + path = bucket; + } else { + if (path[0] != '/') { + return Status::Invalid("GCS URI should be absolute, not relative"); + } + path = bucket + path; + } + } + if (out_path != nullptr) { + *out_path = std::string(internal::RemoveTrailingSlash(path)); + } + + std::unordered_map options_map; + ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items()); + for (const auto& kv : options_items) { + options_map.emplace(kv.first, kv.second); + } + + if (!uri.password().empty() || !uri.username().empty()) { + return Status::Invalid("GCS does not accept username or password."); + } + + auto options = GcsOptions::Defaults(); + for (const auto& kv : options_map) { + if (kv.first == "location") { + options.default_bucket_location = kv.second; + } else if (kv.first == "scheme") { + options.scheme = kv.second; + } else if (kv.first == "endpoint_override") { + options.endpoint_override = kv.second; + } else { + return Status::Invalid("Unexpected query parameter in GCS URI: '", kv.first, "'"); + } + } + + return options; +} + +Result GcsOptions::FromUri(const std::string& uri_string, + std::string* out_path) { + arrow::internal::Uri uri; + RETURN_NOT_OK(uri.Parse(uri_string)); + return FromUri(uri, out_path); } std::string GcsFileSystem::type_name() const { return "gcs"; } diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h index 2bf103736cc..d1e5e8ec99d 100644 --- a/cpp/src/arrow/filesystem/gcsfs.h +++ b/cpp/src/arrow/filesystem/gcsfs.h @@ -22,6 +22,7 @@ #include #include "arrow/filesystem/filesystem.h" +#include "arrow/util/uri.h" namespace arrow { namespace fs { @@ -34,6 +35,13 @@ struct ARROW_EXPORT GcsOptions { std::string endpoint_override; std::string scheme; + /// \brief Location to use for creating buckets. + std::string default_bucket_location; + + /// \brief Default metadata for OpenOutputStream. + /// + /// This will be ignored if non-empty metadata is passed to OpenOutputStream. + std::shared_ptr default_metadata; bool Equals(const GcsOptions& other) const; @@ -89,6 +97,11 @@ struct ARROW_EXPORT GcsOptions { /// /// [aip/4112]: https://google.aip.dev/auth/4112 static GcsOptions FromServiceAccountCredentials(const std::string& json_object); + + /// Initialize from URIs such as "gs://bucket/object". + static Result FromUri(const arrow::internal::Uri& uri, + std::string* out_path); + static Result FromUri(const std::string& uri, std::string* out_path); }; /// \brief GCS-backed FileSystem implementation. diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 5ebb0168e5b..9eaacb0dc15 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -295,6 +295,52 @@ TEST(GcsFileSystem, OptionsAnonymous) { EXPECT_EQ(a.scheme, "http"); } +TEST(GcsFileSystem, OptionsFromUri) { + std::string path; + GcsOptions options; + + ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs://", &path)); + EXPECT_EQ(options.default_bucket_location, ""); + EXPECT_EQ(options.scheme, "https"); + EXPECT_EQ(options.endpoint_override, ""); + EXPECT_EQ(path, ""); + + ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs:", &path)); + EXPECT_EQ(path, ""); + + // Username/password is unsupported + ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs://access:secret@mybucket", &path)); + + ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs://mybucket/", &path)); + EXPECT_EQ(options.default_bucket_location, ""); + EXPECT_EQ(options.scheme, "https"); + EXPECT_EQ(options.endpoint_override, ""); + EXPECT_EQ(path, "mybucket"); + + ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs://mybucket/foo/bar/", &path)); + EXPECT_EQ(options.default_bucket_location, ""); + EXPECT_EQ(options.scheme, "https"); + EXPECT_EQ(options.endpoint_override, ""); + EXPECT_EQ(path, "mybucket/foo/bar"); + + // Explicit default_bucket_location override + ASSERT_OK_AND_ASSIGN( + options, + GcsOptions::FromUri("gs://mybucket/foo/bar/" + "?endpoint_override=localhost&scheme=http&location=us-west2", + &path)); + EXPECT_EQ(options.default_bucket_location, "us-west2"); + EXPECT_EQ(options.scheme, "http"); + EXPECT_EQ(options.endpoint_override, "localhost"); + EXPECT_EQ(path, "mybucket/foo/bar"); + + // Missing bucket name + ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs:///foo/bar/", &path)); + + // Invalid option + ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs://mybucket/?xxx=zzz", &path)); +} + TEST(GcsFileSystem, OptionsAccessToken) { auto a = GcsOptions::FromAccessToken( "invalid-access-token-test-only", @@ -956,6 +1002,50 @@ TEST_F(GcsIntegrationTest, OpenInputStreamClosed) { ASSERT_RAISES(Invalid, stream->Tell()); } +TEST_F(GcsIntegrationTest, TestWriteWithDefaults) { + auto options = TestGcsOptions(); + options.default_bucket_location = "utopia"; + options.default_metadata = arrow::key_value_metadata({{"foo", "bar"}}); + auto fs = GcsFileSystem::Make(options); + std::string bucket = "new_bucket_with_default_location"; + auto file_name = "object_with_defaults"; + ASSERT_OK(fs->CreateDir(bucket, /*recursive=*/false)); + const auto path = bucket + "/" + file_name; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, /*metadata=*/{})); + const auto expected = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. + std::shared_ptr input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::array inbuf{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + + EXPECT_EQ(std::string(inbuf.data(), size), expected); + auto object = GcsClient().GetObjectMetadata(bucket, file_name); + ASSERT_TRUE(object.ok()) << "status=" << object.status(); + EXPECT_EQ(object->mutable_metadata()["foo"], "bar"); + auto bucket_info = GcsClient().GetBucketMetadata(bucket); + ASSERT_TRUE(bucket_info.ok()) << "status=" << object.status(); + EXPECT_EQ(bucket_info->location(), "utopia"); + + // Check that explicit metadata overrides the defaults. + ASSERT_OK_AND_ASSIGN( + output, fs->OpenOutputStream( + path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + object = GcsClient().GetObjectMetadata(bucket, file_name); + ASSERT_TRUE(object.ok()) << "status=" << object.status(); + EXPECT_EQ(object->mutable_metadata()["bar"], "foo"); + // Defaults are overwritten and not merged. + EXPECT_FALSE(object->has_metadata("foo")); +} + TEST_F(GcsIntegrationTest, OpenOutputStreamSmall) { auto fs = GcsFileSystem::Make(TestGcsOptions()); diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 89fe889b2c3..28017cad7e5 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -321,7 +321,7 @@ Result S3Options::FromUri(const Uri& uri, std::string* out_path) { path = bucket; } else { if (path[0] != '/') { - return Status::Invalid("S3 URI should absolute, not relative"); + return Status::Invalid("S3 URI should be absolute, not relative"); } path = bucket + path; }