diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 73d599fffa5..aa69890d336 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -25,6 +25,9 @@ #include "arrow/result.h" #include "arrow/util/checked_cast.h" +#define ARROW_GCS_RETURN_NOT_OK(expr) \ + do { auto&& _s = (expr); if (!_s.ok()) return internal::ToArrowStatus(_s); } while (0) + namespace arrow { namespace fs { namespace { @@ -32,6 +35,13 @@ namespace { namespace gcs = google::cloud::storage; auto constexpr kSep = '/'; +// Change the default upload buffer size. In general, sending larger buffers is more +// efficient with GCS, as each buffer requires a roundtrip to the service. With formatted +// output (when using `operator<<`), keeping a larger buffer in memory before uploading +// makes sense. With unformatted output (the only choice given arrow::io::OutputStream's +// API) it is better to let the caller provide as large a buffer as they want. The GCS C++ +// client library will upload this buffer with zero copies if possible. 
+auto constexpr kUploadBufferSize = 256 * 1024; struct GcsPath { std::string full_path; @@ -83,18 +93,14 @@ class GcsInputStream : public arrow::io::InputStream { Result Read(int64_t nbytes, void* out) override { stream_.read(static_cast(out), nbytes); - if (!stream_.status().ok()) { - return internal::ToArrowStatus(stream_.status()); - } + ARROW_GCS_RETURN_NOT_OK(stream_.status()); return stream_.gcount(); } Result> Read(int64_t nbytes) override { ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes)); stream_.read(reinterpret_cast(buffer->mutable_data()), nbytes); - if (!stream_.status().ok()) { - return internal::ToArrowStatus(stream_.status()); - } + ARROW_GCS_RETURN_NOT_OK(stream_.status()); RETURN_NOT_OK(buffer->Resize(stream_.gcount(), true)); return buffer; } @@ -103,6 +109,43 @@ class GcsInputStream : public arrow::io::InputStream { mutable gcs::ObjectReadStream stream_; }; +class GcsOutputStream : public arrow::io::OutputStream { + public: + explicit GcsOutputStream(gcs::ObjectWriteStream stream) : stream_(std::move(stream)) {} + ~GcsOutputStream() override = default; + + Status Close() override { + stream_.Close(); + return internal::ToArrowStatus(stream_.last_status()); + } + + Result Tell() const override { + if (!stream_) { + return Status::IOError("invalid stream"); + } + return tell_; + } + + bool closed() const override { return !stream_.IsOpen(); } + + Status Write(const void* data, int64_t nbytes) override { + if (stream_.write(reinterpret_cast(data), nbytes)) { + tell_ += nbytes; + return Status::OK(); + } + return internal::ToArrowStatus(stream_.last_status()); + } + + Status Flush() override { + stream_.flush(); + return internal::ToArrowStatus(stream_.last_status()); // surface flush errors + } + + private: + gcs::ObjectWriteStream stream_; + int64_t tell_ = 0; +}; + } // namespace google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { @@ -116,6 +159,7 @@ google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { options.set( 
google::cloud::MakeInsecureCredentials()); } + options.set(kUploadBufferSize); if (!o.endpoint_override.empty()) { options.set(scheme + "://" + o.endpoint_override); } @@ -140,12 +184,27 @@ class GcsFileSystem::Impl { Result> OpenInputStream(const GcsPath& path) { auto stream = client_.ReadObject(path.bucket, path.object); - if (!stream.status().ok()) { - return internal::ToArrowStatus(stream.status()); - } + ARROW_GCS_RETURN_NOT_OK(stream.status()); return std::make_shared(std::move(stream)); } + Result> OpenOutputStream( + const GcsPath& path, const std::shared_ptr& metadata) { + gcs::EncryptionKey encryption_key; + ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(metadata)); + gcs::PredefinedAcl predefined_acl; + ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(metadata)); + gcs::KmsKeyName kms_key_name; + ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(metadata)); + gcs::WithObjectMetadata with_object_metadata; + ARROW_ASSIGN_OR_RAISE(with_object_metadata, internal::ToObjectMetadata(metadata)); + + auto stream = client_.WriteObject(path.bucket, path.object, encryption_key, + predefined_acl, kms_key_name, with_object_metadata); + ARROW_GCS_RETURN_NOT_OK(stream.last_status()); + return std::make_shared(std::move(stream)); + } + private: static Result GetFileInfoImpl(const GcsPath& path, const google::cloud::Status& status, @@ -245,7 +304,8 @@ Result> GcsFileSystem::OpenInputFile( Result> GcsFileSystem::OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) { - return Status::NotImplemented("The GCS FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path)); + return impl_->OpenOutputStream(p, metadata); } Result> GcsFileSystem::OpenAppendStream( diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.cc b/cpp/src/arrow/filesystem/gcsfs_internal.cc index 898015859c2..759e95feb73 100644 --- a/cpp/src/arrow/filesystem/gcsfs_internal.cc +++ 
b/cpp/src/arrow/filesystem/gcsfs_internal.cc @@ -17,9 +17,13 @@ #include "arrow/filesystem/gcsfs_internal.h" +#include // NOLINT #include #include +#include + +#include "arrow/util/key_value_metadata.h" namespace arrow { namespace fs { @@ -62,6 +66,133 @@ Status ToArrowStatus(const google::cloud::Status& s) { return Status::OK(); } +namespace gcs = ::google::cloud::storage; + +Result ToEncryptionKey( + const std::shared_ptr& metadata) { + if (!metadata) { + return gcs::EncryptionKey{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "encryptionKeyBase64") { + return gcs::EncryptionKey::FromBase64Key(values[i]); + } + } + return gcs::EncryptionKey{}; +} + +Result ToKmsKeyName( + const std::shared_ptr& metadata) { + if (!metadata) { + return gcs::KmsKeyName{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "kmsKeyName") { + return gcs::KmsKeyName(values[i]); + } + } + return gcs::KmsKeyName{}; +} + +Result ToPredefinedAcl( + const std::shared_ptr& metadata) { + if (!metadata) { + return gcs::PredefinedAcl{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "predefinedAcl") { + return gcs::PredefinedAcl(values[i]); + } + } + return gcs::PredefinedAcl{}; +} + +Result ToObjectMetadata( + const std::shared_ptr& metadata) { + if (!metadata) { + return gcs::WithObjectMetadata{}; + } + + static auto const setters = [] { + using setter = std::function; + return std::unordered_map{ + {"Cache-Control", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_cache_control(v); + return Status::OK(); + }}, + {"Content-Disposition", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_disposition(v); + return Status::OK(); + }}, + 
{"Content-Encoding", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_encoding(v); + return Status::OK(); + }}, + {"Content-Language", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_language(v); + return Status::OK(); + }}, + {"Content-Type", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_type(v); + return Status::OK(); + }}, + {"customTime", + [](gcs::ObjectMetadata& m, const std::string& v) { + std::string err; + absl::Time t; + if (!absl::ParseTime(absl::RFC3339_full, v, &t, &err)) { + return Status::Invalid("Error parsing RFC-3339 timestamp: '", v, "': ", err); + } + m.set_custom_time(absl::ToChronoTime(t)); + return Status::OK(); + }}, + {"storageClass", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_storage_class(v); + return Status::OK(); + }}, + {"predefinedAcl", + [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }}, + {"encryptionKeyBase64", + [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }}, + {"kmsKeyName", + [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }}, + }; + }(); + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + gcs::ObjectMetadata object_metadata; + for (std::size_t i = 0; i < keys.size(); ++i) { + auto it = setters.find(keys[i]); + if (it != setters.end()) { + auto status = it->second(object_metadata, values[i]); + if (!status.ok()) return status; + } else { + object_metadata.upsert_metadata(keys[i], values[i]); + } + } + return gcs::WithObjectMetadata(std::move(object_metadata)); +} + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.h b/cpp/src/arrow/filesystem/gcsfs_internal.h index 8d568701ed5..938d08d6323 100644 --- a/cpp/src/arrow/filesystem/gcsfs_internal.h +++ b/cpp/src/arrow/filesystem/gcsfs_internal.h @@ -18,6 +18,9 @@ #pragma once #include +#include +#include +#include #include #include @@ 
-31,6 +34,18 @@ namespace internal { Status ToArrowStatus(const google::cloud::Status& s); +Result ToEncryptionKey( + const std::shared_ptr& metadata); + +Result ToPredefinedAcl( + const std::shared_ptr& metadata); + +Result ToKmsKeyName( + const std::shared_ptr& metadata); + +Result ToObjectMetadata( + const std::shared_ptr& metadata); + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 369317fbb34..3304d4bcc73 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -32,6 +32,7 @@ #include "arrow/filesystem/test_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" +#include "arrow/util/key_value_metadata.h" namespace arrow { namespace fs { @@ -45,6 +46,8 @@ using ::testing::HasSubstr; using ::testing::IsEmpty; using ::testing::Not; using ::testing::NotNull; +using ::testing::Pair; +using ::testing::UnorderedElementsAre; auto const* kPreexistingBucket = "test-bucket-name"; auto const* kPreexistingObject = "test-object-name"; @@ -183,6 +186,96 @@ TEST(GcsFileSystem, FileSystemCompare) { EXPECT_FALSE(a->Equals(*b)); } +std::shared_ptr KeyValueMetadataForTest() { + return arrow::key_value_metadata({ + {"Cache-Control", "test-only-cache-control"}, + {"Content-Disposition", "test-only-content-disposition"}, + {"Content-Encoding", "test-only-content-encoding"}, + {"Content-Language", "test-only-content-language"}, + {"Content-Type", "test-only-content-type"}, + {"customTime", "2021-10-26T01:02:03.456Z"}, + {"storageClass", "test-only-storage-class"}, + {"key", "test-only-value"}, + {"kmsKeyName", "test-only-kms-key-name"}, + {"predefinedAcl", "test-only-predefined-acl"}, + // computed with: /bin/echo -n "01234567" | openssl base64 + {"encryptionKeyBase64", "MDEyMzQ1Njc="}, + }); +} + +TEST(GcsFileSystem, ToEncryptionKey) { + gcs::EncryptionKey key; + ASSERT_OK_AND_ASSIGN(key, + 
arrow::fs::internal::ToEncryptionKey(KeyValueMetadataForTest())); + ASSERT_TRUE(key.has_value()); + EXPECT_EQ(key.value().algorithm, "AES256"); + EXPECT_EQ(key.value().key, "MDEyMzQ1Njc="); + // /bin/echo -n "01234567" | sha256sum | awk '{printf("%s", $1);}' | + // xxd -r -p | openssl base64 + // to get the SHA256 value of the key. + EXPECT_EQ(key.value().sha256, "kkWSubED8U+DP6r7Z/SAaR8BmIqkV8AGF2n1jNRzEbw="); +} + +TEST(GcsFileSystem, ToEncryptionKeyEmpty) { + gcs::EncryptionKey key; + ASSERT_OK_AND_ASSIGN(key, arrow::fs::internal::ToEncryptionKey({})); + ASSERT_FALSE(key.has_value()); +} + +TEST(GcsFileSystem, ToKmsKeyName) { + gcs::KmsKeyName key; + ASSERT_OK_AND_ASSIGN(key, arrow::fs::internal::ToKmsKeyName(KeyValueMetadataForTest())); + EXPECT_EQ(key.value_or(""), "test-only-kms-key-name"); +} + +TEST(GcsFileSystem, ToKmsKeyNameEmpty) { + gcs::KmsKeyName key; + ASSERT_OK_AND_ASSIGN(key, arrow::fs::internal::ToKmsKeyName({})); + ASSERT_FALSE(key.has_value()); +} + +TEST(GcsFileSystem, ToPredefinedAcl) { + gcs::PredefinedAcl predefined; + ASSERT_OK_AND_ASSIGN(predefined, + arrow::fs::internal::ToPredefinedAcl(KeyValueMetadataForTest())); + EXPECT_EQ(predefined.value_or(""), "test-only-predefined-acl"); +} + +TEST(GcsFileSystem, ToPredefinedAclEmpty) { + gcs::PredefinedAcl predefined; + ASSERT_OK_AND_ASSIGN(predefined, arrow::fs::internal::ToPredefinedAcl({})); + ASSERT_FALSE(predefined.has_value()); +} + +TEST(GcsFileSystem, ToObjectMetadata) { + gcs::WithObjectMetadata metadata; + ASSERT_OK_AND_ASSIGN(metadata, + arrow::fs::internal::ToObjectMetadata(KeyValueMetadataForTest())); + ASSERT_TRUE(metadata.has_value()); + EXPECT_EQ(metadata.value().cache_control(), "test-only-cache-control"); + EXPECT_EQ(metadata.value().content_disposition(), "test-only-content-disposition"); + EXPECT_EQ(metadata.value().content_encoding(), "test-only-content-encoding"); + EXPECT_EQ(metadata.value().content_language(), "test-only-content-language"); + 
EXPECT_EQ(metadata.value().content_type(), "test-only-content-type"); + ASSERT_TRUE(metadata.value().has_custom_time()); + EXPECT_THAT(metadata.value().metadata(), + UnorderedElementsAre(Pair("key", "test-only-value"))); +} + +TEST(GcsFileSystem, ToObjectMetadataEmpty) { + gcs::WithObjectMetadata metadata; + ASSERT_OK_AND_ASSIGN(metadata, arrow::fs::internal::ToObjectMetadata({})); + ASSERT_FALSE(metadata.has_value()); +} + +TEST(GcsFileSystem, ToObjectMetadataInvalidCustomTime) { + auto metadata = arrow::fs::internal::ToObjectMetadata(arrow::key_value_metadata({ + {"customTime", "invalid"}, + })); + EXPECT_EQ(metadata.status().code(), StatusCode::Invalid); + EXPECT_THAT(metadata.status().message(), HasSubstr("Error parsing RFC-3339")); +} + TEST_F(GcsIntegrationTest, GetFileInfoBucket) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); arrow::fs::AssertFileInfo(fs.get(), kPreexistingBucket, FileType::Directory); @@ -259,6 +352,63 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { EXPECT_EQ(result.status().code(), StatusCode::IOError); } +TEST_F(GcsIntegrationTest, WriteObjectSmall) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + const auto path = kPreexistingBucket + std::string("/test-write-object"); + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + const auto expected = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. 
+ std::shared_ptr input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::array inbuf{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + + EXPECT_EQ(std::string(inbuf.data(), size), expected); +} + +TEST_F(GcsIntegrationTest, WriteObjectLarge) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + const auto path = kPreexistingBucket + std::string("/test-write-object"); + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + // These buffer sizes are intentionally not multiples of the upload quantum (256 KiB). + std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; + std::array buffers{ + std::string(sizes[0], 'A'), + std::string(sizes[1], 'B'), + std::string(sizes[2], 'C'), + }; + auto expected = std::int64_t{0}; + for (auto i = 0; i != 3; ++i) { + ASSERT_OK(output->Write(buffers[i].data(), buffers[i].size())); + expected += sizes[i]; + ASSERT_EQ(output->Tell(), expected); + } + ASSERT_OK(output->Close()); + + // Verify we can read the object back. + std::shared_ptr input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::string contents; + std::shared_ptr buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); + ASSERT_TRUE(buffer); + contents.append(buffer->ToString()); + } while (buffer->size() != 0); + + EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); +} + } // namespace } // namespace fs } // namespace arrow