Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#ifdef ARROW_HDFS
#include "arrow/filesystem/hdfs.h"
#endif
#ifdef ARROW_GCS
#include "arrow/filesystem/gcsfs.h"
#endif
#ifdef ARROW_S3
#include "arrow/filesystem/s3fs.h"
#endif
Expand Down Expand Up @@ -686,6 +689,16 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
}
return std::make_shared<LocalFileSystem>(options, io_context);
}
if (scheme == "gs" || scheme == "gcs") {
#ifdef ARROW_GCS
ARROW_ASSIGN_OR_RAISE(auto options, GcsOptions::FromUri(uri, out_path));
ARROW_ASSIGN_OR_RAISE(auto gcsfs, GcsFileSystem::Make(options, io_context));
return gcsfs;
#else
return Status::NotImplemented("Got GCS URI but Arrow compiled without GCS support");
#endif
}

if (scheme == "hdfs" || scheme == "viewfs") {
#ifdef ARROW_HDFS
ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri));
Expand Down
127 changes: 98 additions & 29 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ class GcsFileSystem::Impl {
google::cloud::StatusOr<gcs::BucketMetadata> b = client_.GetBucketMetadata(bucket);
if (!b) {
if (b.status().code() == GcsCode::kNotFound) {
b = client_.CreateBucket(bucket, gcs::BucketMetadata());
b = client_.CreateBucket(bucket, gcs::BucketMetadata().set_location(
options_.default_bucket_location));
}
if (!b) return internal::ToArrowStatus(b.status());
}
Expand All @@ -433,7 +434,10 @@ class GcsFileSystem::Impl {
Status CreateDir(const GcsPath& p) {
if (p.object.empty()) {
return internal::ToArrowStatus(
client_.CreateBucket(p.bucket, gcs::BucketMetadata()).status());
client_
.CreateBucket(p.bucket, gcs::BucketMetadata().set_location(
options_.default_bucket_location))
.status());
}
auto parent = p.parent();
if (!parent.object.empty()) {
Expand Down Expand Up @@ -542,14 +546,19 @@ class GcsFileSystem::Impl {

Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
const GcsPath& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
std::shared_ptr<const KeyValueMetadata> resolved_metadata = metadata;
if (resolved_metadata == nullptr && options_.default_metadata != nullptr) {
resolved_metadata = options_.default_metadata;
}
gcs::EncryptionKey encryption_key;
ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(metadata));
ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(resolved_metadata));
gcs::PredefinedAcl predefined_acl;
ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(metadata));
ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(resolved_metadata));
gcs::KmsKeyName kms_key_name;
ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(metadata));
ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(resolved_metadata));
gcs::WithObjectMetadata with_object_metadata;
ARROW_ASSIGN_OR_RAISE(with_object_metadata, internal::ToObjectMetadata(metadata));
ARROW_ASSIGN_OR_RAISE(with_object_metadata,
internal::ToObjectMetadata(resolved_metadata));

auto stream = client_.WriteObject(path.bucket, path.object, encryption_key,
predefined_acl, kms_key_name, with_object_metadata);
Expand Down Expand Up @@ -610,46 +619,106 @@ class GcsFileSystem::Impl {

bool GcsOptions::Equals(const GcsOptions& other) const {
return credentials == other.credentials &&
endpoint_override == other.endpoint_override && scheme == other.scheme;
endpoint_override == other.endpoint_override && scheme == other.scheme &&
default_bucket_location == other.default_bucket_location;
}

GcsOptions GcsOptions::Defaults() {
return GcsOptions{
std::make_shared<GcsCredentials>(google::cloud::MakeGoogleDefaultCredentials()),
{},
"https"};
GcsOptions options{};
options.credentials =
std::make_shared<GcsCredentials>(google::cloud::MakeGoogleDefaultCredentials());
options.scheme = "https";
return options;
}

GcsOptions GcsOptions::Anonymous() {
return GcsOptions{
std::make_shared<GcsCredentials>(google::cloud::MakeInsecureCredentials()),
{},
"http"};
GcsOptions options{};
options.credentials =
std::make_shared<GcsCredentials>(google::cloud::MakeInsecureCredentials());
options.scheme = "http";
return options;
}

GcsOptions GcsOptions::FromAccessToken(const std::string& access_token,
std::chrono::system_clock::time_point expiration) {
return GcsOptions{
std::make_shared<GcsCredentials>(
google::cloud::MakeAccessTokenCredentials(access_token, expiration)),
{},
"https"};
GcsOptions options{};
options.credentials = std::make_shared<GcsCredentials>(
google::cloud::MakeAccessTokenCredentials(access_token, expiration));
options.scheme = "https";
return options;
}

GcsOptions GcsOptions::FromImpersonatedServiceAccount(
const GcsCredentials& base_credentials, const std::string& target_service_account) {
return GcsOptions{std::make_shared<GcsCredentials>(
google::cloud::MakeImpersonateServiceAccountCredentials(
base_credentials.credentials, target_service_account)),
{},
"https"};
GcsOptions options{};
options.credentials = std::make_shared<GcsCredentials>(
google::cloud::MakeImpersonateServiceAccountCredentials(
base_credentials.credentials, target_service_account));
options.scheme = "https";
return options;
}

GcsOptions GcsOptions::FromServiceAccountCredentials(const std::string& json_object) {
return GcsOptions{std::make_shared<GcsCredentials>(
google::cloud::MakeServiceAccountCredentials(json_object)),
{},
"https"};
GcsOptions options{};
options.credentials = std::make_shared<GcsCredentials>(
google::cloud::MakeServiceAccountCredentials(json_object));
options.scheme = "https";
return options;
}

/// \brief Build GcsOptions from an already-parsed GCS URI.
///
/// The URI host is used as the bucket name and is joined with the URI path.
/// When `out_path` is not null it receives that "bucket/path" string with any
/// trailing slash removed. The query parameters `location`, `scheme` and
/// `endpoint_override` overwrite the matching fields of GcsOptions::Defaults().
///
/// \param[in] uri the parsed URI
/// \param[out] out_path optional destination for the bucket-qualified path;
/// note it is written before the query string and credentials are validated,
/// so it may be set even when an error is returned afterwards
/// \return the options, or Status::Invalid when the URI has a path but no
/// bucket, has a non-absolute path, carries a username or password, or
/// contains an unrecognized query parameter; errors from
/// `uri.query_items()` are propagated as-is
Result<GcsOptions> GcsOptions::FromUri(const arrow::internal::Uri& uri,
                                       std::string* out_path) {
  const auto bucket_name = uri.host();
  auto full_path = uri.path();

  if (bucket_name.empty() && !full_path.empty()) {
    return Status::Invalid("Missing bucket name in GCS URI");
  }
  if (!bucket_name.empty()) {
    if (!full_path.empty() && full_path[0] != '/') {
      return Status::Invalid("GCS URI should be absolute, not relative");
    }
    // With an empty path this leaves just the bucket name.
    full_path = bucket_name + full_path;
  }

  if (out_path != nullptr) {
    *out_path = std::string(internal::RemoveTrailingSlash(full_path));
  }

  ARROW_ASSIGN_OR_RAISE(const auto query_items, uri.query_items());
  // emplace() does not overwrite, so the first value of a repeated key wins.
  std::unordered_map<std::string, std::string> query_params;
  for (const auto& item : query_items) {
    query_params.emplace(item.first, item.second);
  }

  if (!(uri.password().empty() && uri.username().empty())) {
    return Status::Invalid("GCS does not accept username or password.");
  }

  auto options = GcsOptions::Defaults();
  for (const auto& param : query_params) {
    const std::string& key = param.first;
    const std::string& value = param.second;
    if (key == "location") {
      options.default_bucket_location = value;
      continue;
    }
    if (key == "scheme") {
      options.scheme = value;
      continue;
    }
    if (key == "endpoint_override") {
      options.endpoint_override = value;
      continue;
    }
    return Status::Invalid("Unexpected query parameter in GCS URI: '", key, "'");
  }

  return options;
}

/// \brief Build GcsOptions from a URI given as a string.
///
/// Parses `uri_string` and forwards to the overload taking a parsed Uri.
///
/// \param[in] uri_string the textual URI, e.g. "gs://bucket/object"
/// \param[out] out_path passed through unchanged to the Uri overload
/// \return the options, or the parse error if `uri_string` cannot be parsed
Result<GcsOptions> GcsOptions::FromUri(const std::string& uri_string,
                                       std::string* out_path) {
  arrow::internal::Uri parsed_uri;
  RETURN_NOT_OK(parsed_uri.Parse(uri_string));
  return FromUri(parsed_uri, out_path);
}

std::string GcsFileSystem::type_name() const { return "gcs"; }
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vector>

#include "arrow/filesystem/filesystem.h"
#include "arrow/util/uri.h"

namespace arrow {
namespace fs {
Expand All @@ -34,6 +35,13 @@ struct ARROW_EXPORT GcsOptions {

std::string endpoint_override;
std::string scheme;
/// \brief Location to use for creating buckets.
std::string default_bucket_location;

/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-null metadata is passed to OpenOutputStream.
std::shared_ptr<const KeyValueMetadata> default_metadata;

bool Equals(const GcsOptions& other) const;

Expand Down Expand Up @@ -89,6 +97,11 @@ struct ARROW_EXPORT GcsOptions {
///
/// [aip/4112]: https://google.aip.dev/auth/4112
static GcsOptions FromServiceAccountCredentials(const std::string& json_object);

/// Initialize from URIs such as "gs://bucket/object".
static Result<GcsOptions> FromUri(const arrow::internal::Uri& uri,
std::string* out_path);
static Result<GcsOptions> FromUri(const std::string& uri, std::string* out_path);
};

/// \brief GCS-backed FileSystem implementation.
Expand Down
90 changes: 90 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,52 @@ TEST(GcsFileSystem, OptionsAnonymous) {
EXPECT_EQ(a.scheme, "http");
}

// Exercises GcsOptions::FromUri: path extraction, query-parameter overrides,
// and the rejected URI forms.
TEST(GcsFileSystem, OptionsFromUri) {
  // Checks that the three URI-configurable fields were left untouched.
  // Uses EXPECT_* only, so a mismatch is reported without aborting the test.
  const auto expect_unmodified_fields = [](const GcsOptions& opts) {
    EXPECT_EQ(opts.default_bucket_location, "");
    EXPECT_EQ(opts.scheme, "https");
    EXPECT_EQ(opts.endpoint_override, "");
  };

  std::string path;
  GcsOptions options;

  // Empty authority and path
  ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs://", &path));
  expect_unmodified_fields(options);
  EXPECT_EQ(path, "");

  // Scheme only
  ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs:", &path));
  EXPECT_EQ(path, "");

  // Username/password is unsupported
  ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs://access:secret@mybucket", &path));

  // Bucket only; the trailing slash is dropped
  ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs://mybucket/", &path));
  expect_unmodified_fields(options);
  EXPECT_EQ(path, "mybucket");

  // Bucket plus object path
  ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs://mybucket/foo/bar/", &path));
  expect_unmodified_fields(options);
  EXPECT_EQ(path, "mybucket/foo/bar");

  // Explicit default_bucket_location override
  ASSERT_OK_AND_ASSIGN(
      options,
      GcsOptions::FromUri("gs://mybucket/foo/bar/"
                          "?endpoint_override=localhost&scheme=http&location=us-west2",
                          &path));
  EXPECT_EQ(options.default_bucket_location, "us-west2");
  EXPECT_EQ(options.scheme, "http");
  EXPECT_EQ(options.endpoint_override, "localhost");
  EXPECT_EQ(path, "mybucket/foo/bar");

  // Missing bucket name
  ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs:///foo/bar/", &path));

  // Invalid option
  ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs://mybucket/?xxx=zzz", &path));
}

TEST(GcsFileSystem, OptionsAccessToken) {
auto a = GcsOptions::FromAccessToken(
"invalid-access-token-test-only",
Expand Down Expand Up @@ -956,6 +1002,50 @@ TEST_F(GcsIntegrationTest, OpenInputStreamClosed) {
ASSERT_RAISES(Invalid, stream->Tell());
}

// Verifies that GcsOptions::default_bucket_location is applied when a bucket is
// created through CreateDir, and that GcsOptions::default_metadata is applied
// by OpenOutputStream only when the caller passes no metadata of its own.
TEST_F(GcsIntegrationTest, TestWriteWithDefaults) {
  auto options = TestGcsOptions();
  options.default_bucket_location = "utopia";
  options.default_metadata = arrow::key_value_metadata({{"foo", "bar"}});
  auto fs = GcsFileSystem::Make(options);
  std::string bucket = "new_bucket_with_default_location";
  auto file_name = "object_with_defaults";
  ASSERT_OK(fs->CreateDir(bucket, /*recursive=*/false));
  const auto path = bucket + "/" + file_name;
  std::shared_ptr<io::OutputStream> output;
  // No metadata supplied here, so the defaults from `options` should be used.
  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, /*metadata=*/{}));
  const auto expected = std::string(kLoremIpsum);
  ASSERT_OK(output->Write(expected.data(), expected.size()));
  ASSERT_OK(output->Close());

  // Verify we can read the object back.
  std::shared_ptr<io::InputStream> input;
  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path));

  std::array<char, 1024> inbuf{};
  std::int64_t size;
  ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data()));

  EXPECT_EQ(std::string(inbuf.data(), size), expected);
  auto object = GcsClient().GetObjectMetadata(bucket, file_name);
  ASSERT_TRUE(object.ok()) << "status=" << object.status();
  EXPECT_EQ(object->mutable_metadata()["foo"], "bar");
  auto bucket_info = GcsClient().GetBucketMetadata(bucket);
  // Report the status of the bucket lookup itself, not of the earlier object
  // lookup (which has already been asserted OK at this point).
  ASSERT_TRUE(bucket_info.ok()) << "status=" << bucket_info.status();
  EXPECT_EQ(bucket_info->location(), "utopia");

  // Check that explicit metadata overrides the defaults.
  ASSERT_OK_AND_ASSIGN(
      output, fs->OpenOutputStream(
                  path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}})));
  ASSERT_OK(output->Write(expected.data(), expected.size()));
  ASSERT_OK(output->Close());
  object = GcsClient().GetObjectMetadata(bucket, file_name);
  ASSERT_TRUE(object.ok()) << "status=" << object.status();
  EXPECT_EQ(object->mutable_metadata()["bar"], "foo");
  // Defaults are overwritten and not merged.
  EXPECT_FALSE(object->has_metadata("foo"));
}

TEST_F(GcsIntegrationTest, OpenOutputStreamSmall) {
auto fs = GcsFileSystem::Make(TestGcsOptions());

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
path = bucket;
} else {
if (path[0] != '/') {
return Status::Invalid("S3 URI should absolute, not relative");
return Status::Invalid("S3 URI should be absolute, not relative");
}
path = bucket + path;
}
Expand Down