diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index aa69890d336..49b0930dca4 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -73,10 +73,19 @@ struct GcsPath { class GcsInputStream : public arrow::io::InputStream { public: - explicit GcsInputStream(gcs::ObjectReadStream stream) : stream_(std::move(stream)) {} + explicit GcsInputStream(gcs::ObjectReadStream stream, std::string bucket_name, + std::string object_name, gcs::Generation generation, + gcs::Client client) + : stream_(std::move(stream)), + bucket_name_(std::move(bucket_name)), + object_name_(std::move(object_name)), + generation_(generation), + client_(std::move(client)) {} ~GcsInputStream() override = default; + //@{ + // @name FileInterface Status Close() override { stream_.Close(); return Status::OK(); @@ -90,7 +99,10 @@ class GcsInputStream : public arrow::io::InputStream { } bool closed() const override { return !stream_.IsOpen(); } + //@} + //@{ + // @name Readable Result Read(int64_t nbytes, void* out) override { stream_.read(static_cast(out), nbytes); ARROW_GCS_RETURN_NOT_OK(stream_.status()); @@ -104,9 +116,23 @@ class GcsInputStream : public arrow::io::InputStream { RETURN_NOT_OK(buffer->Resize(stream_.gcount(), true)); return buffer; } + //@} + + //@{ + // @name InputStream + Result> ReadMetadata() override { + auto metadata = client_.GetObjectMetadata(bucket_name_, object_name_, generation_); + ARROW_GCS_RETURN_NOT_OK(metadata.status()); + return internal::FromObjectMetadata(*metadata); + } + //@} private: mutable gcs::ObjectReadStream stream_; + std::string bucket_name_; + std::string object_name_; + gcs::Generation generation_; + gcs::Client client_; }; class GcsOutputStream : public arrow::io::OutputStream { @@ -185,7 +211,8 @@ class GcsFileSystem::Impl { Result> OpenInputStream(const GcsPath& path) { auto stream = client_.ReadObject(path.bucket, path.object); ARROW_GCS_RETURN_NOT_OK(stream.status()); - return std::make_shared(std::move(stream)); + return std::make_shared(std::move(stream), path.bucket, path.object, + gcs::Generation(), client_); } Result> OpenOutputStream( diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.cc b/cpp/src/arrow/filesystem/gcsfs_internal.cc index 759e95feb73..e4cb38ade92 100644 --- a/cpp/src/arrow/filesystem/gcsfs_internal.cc +++ b/cpp/src/arrow/filesystem/gcsfs_internal.cc @@ -22,6 +22,7 @@ #include #include +#include #include "arrow/util/key_value_metadata.h" @@ -193,6 +194,70 @@ Result ToObjectMetadata( return gcs::WithObjectMetadata(std::move(object_metadata)); } +Result> FromObjectMetadata( + gcs::ObjectMetadata const& m) { + auto format_time = [](std::chrono::system_clock::time_point tp) { + return absl::FormatTime(absl::RFC3339_full, absl::FromChrono(tp), + absl::UTCTimeZone()); + }; + auto result = std::make_shared(); + // The fields are in the same order as defined in: + // https://cloud.google.com/storage/docs/json_api/v1/objects + // Where previous practice in Arrow suggested using a different field name (Content-Type + // vs. contentType) we prefer the existing practice in Arrow. + result->Append("id", m.id()); + result->Append("selfLink", m.self_link()); + result->Append("name", m.name()); + result->Append("bucket", m.bucket()); + result->Append("generation", std::to_string(m.generation())); + result->Append("Content-Type", m.content_type()); + result->Append("timeCreated", format_time(m.time_created())); + result->Append("updated", format_time(m.updated())); + if (m.has_custom_time()) { + result->Append("customTime", format_time(m.custom_time())); + } + if (m.time_deleted() != std::chrono::system_clock::time_point()) { + result->Append("timeDeleted", format_time(m.time_deleted())); + } + result->Append("temporaryHold", m.temporary_hold() ? "true" : "false"); + result->Append("eventBasedHold", m.event_based_hold() ? "true" : "false"); + if (m.retention_expiration_time() != std::chrono::system_clock::time_point()) { + result->Append("retentionExpirationTime", format_time(m.retention_expiration_time())); + } + result->Append("storageClass", m.storage_class()); + if (m.time_storage_class_updated() != std::chrono::system_clock::time_point()) { + result->Append("timeStorageClassUpdated", + format_time(m.time_storage_class_updated())); + } + result->Append("size", std::to_string(m.size())); + result->Append("md5Hash", m.md5_hash()); + result->Append("mediaLink", m.media_link()); + result->Append("Content-Encoding", m.content_encoding()); + result->Append("Content-Disposition", m.content_disposition()); + result->Append("Content-Language", m.content_language()); + result->Append("Cache-Control", m.cache_control()); + for (auto const& kv : m.metadata()) { + result->Append("metadata." + kv.first, kv.second); + } + // Skip "acl" because it is overly complex + if (m.has_owner()) { + result->Append("owner.entity", m.owner().entity); + result->Append("owner.entityId", m.owner().entity_id); + } + result->Append("crc32c", m.crc32c()); + result->Append("componentCount", std::to_string(m.component_count())); + result->Append("etag", m.etag()); + if (m.has_customer_encryption()) { + result->Append("customerEncryption.encryptionAlgorithm", + m.customer_encryption().encryption_algorithm); + result->Append("customerEncryption.keySha256", m.customer_encryption().key_sha256); + } + if (!m.kms_key_name().empty()) { + result->Append("kmsKeyName", m.kms_key_name()); + } + return result; +} + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.h b/cpp/src/arrow/filesystem/gcsfs_internal.h index 938d08d6323..fc7789ff0f5 100644 --- a/cpp/src/arrow/filesystem/gcsfs_internal.h +++ b/cpp/src/arrow/filesystem/gcsfs_internal.h @@ -46,6 +46,9 @@ Result ToKmsKeyName( Result ToObjectMetadata( const std::shared_ptr& metadata); +Result> FromObjectMetadata( + google::cloud::storage::ObjectMetadata const& m); + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 3304d4bcc73..fffb2718c6a 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -17,6 +17,7 @@ #include "arrow/filesystem/gcsfs.h" +#include #include #include #include @@ -111,6 +112,13 @@ class GcsIntegrationTest : public ::testing::Test { return options; } + gcs::Client GcsClient() { + return gcs::Client( + google::cloud::Options{} + .set("http://127.0.0.1:" + port_) + .set(gc::MakeInsecureCredentials())); + } + private: std::string port_; bp::child server_process_; @@ -276,6 +284,35 @@ TEST(GcsFileSystem, ToObjectMetadataInvalidCustomTime) { EXPECT_THAT(metadata.status().message(), HasSubstr("Error parsing RFC-3339")); } +TEST(GcsFileSystem, ObjectMetadataRoundtrip) { + const auto source = KeyValueMetadataForTest(); + gcs::WithObjectMetadata tmp; + ASSERT_OK_AND_ASSIGN(tmp, arrow::fs::internal::ToObjectMetadata(source)); + std::shared_ptr destination; + ASSERT_OK_AND_ASSIGN(destination, arrow::fs::internal::FromObjectMetadata(tmp.value())); + // Only a subset of the keys are configurable in gcs::ObjectMetadata, for example, the + // size and CRC32C values of an object are only settable when the metadata is received + // from the services. For those keys that should be preserved, verify they are preserved + // in the round trip. + for (auto key : {"Cache-Control", "Content-Disposition", "Content-Encoding", + "Content-Language", "Content-Type", "storageClass"}) { + SCOPED_TRACE("Testing key " + std::string(key)); + ASSERT_OK_AND_ASSIGN(auto s, source->Get(key)); + ASSERT_OK_AND_ASSIGN(auto d, destination->Get(key)); + EXPECT_EQ(s, d); + } + // RFC-3339 formatted timestamps may differ in trivial things, e.g., the UTC timezone + // can be represented as 'Z' or '+00:00'. + ASSERT_OK_AND_ASSIGN(auto s, source->Get("customTime")); + ASSERT_OK_AND_ASSIGN(auto d, destination->Get("customTime")); + std::string err; + absl::Time ts; + absl::Time td; + ASSERT_TRUE(absl::ParseTime(absl::RFC3339_full, s, &ts, &err)) << "error=" << err; + ASSERT_TRUE(absl::ParseTime(absl::RFC3339_full, d, &td, &err)) << "error=" << err; + EXPECT_NEAR(absl::ToDoubleSeconds(ts - td), 0, 0.5); +} + TEST_F(GcsIntegrationTest, GetFileInfoBucket) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); arrow::fs::AssertFileInfo(fs.get(), kPreexistingBucket, FileType::Directory); @@ -352,6 +389,68 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { EXPECT_EQ(result.status().code(), StatusCode::IOError); } +TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) { + auto client = GcsClient(); + const auto custom_time = std::chrono::system_clock::now() + std::chrono::hours(1); + const std::string object_name = "ReadObjectMetadataTest/simple.txt"; + const gcs::ObjectMetadata expected = + client + .InsertObject(kPreexistingBucket, object_name, + "The quick brown fox jumps over the lazy dog", + gcs::WithObjectMetadata(gcs::ObjectMetadata() + .set_content_type("text/plain") + .set_custom_time(custom_time) + .set_cache_control("no-cache") + .upsert_metadata("key0", "value0"))) + .value(); + + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN( + stream, fs->OpenInputStream(std::string(kPreexistingBucket) + "/" + object_name)); + + auto format_time = [](std::chrono::system_clock::time_point tp) { + return absl::FormatTime(absl::RFC3339_full, absl::FromChrono(tp), + absl::UTCTimeZone()); + }; + + std::shared_ptr actual; + ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata()); + ASSERT_OK_AND_EQ(expected.self_link(), actual->Get("selfLink")); + ASSERT_OK_AND_EQ(expected.name(), actual->Get("name")); + ASSERT_OK_AND_EQ(expected.bucket(), actual->Get("bucket")); + ASSERT_OK_AND_EQ(std::to_string(expected.generation()), actual->Get("generation")); + ASSERT_OK_AND_EQ(expected.content_type(), actual->Get("Content-Type")); + ASSERT_OK_AND_EQ(format_time(expected.time_created()), actual->Get("timeCreated")); + ASSERT_OK_AND_EQ(format_time(expected.updated()), actual->Get("updated")); + ASSERT_FALSE(actual->Contains("timeDeleted")); + ASSERT_OK_AND_EQ(format_time(custom_time), actual->Get("customTime")); + ASSERT_OK_AND_EQ("false", actual->Get("temporaryHold")); + ASSERT_OK_AND_EQ("false", actual->Get("eventBasedHold")); + ASSERT_FALSE(actual->Contains("retentionExpirationTime")); + ASSERT_OK_AND_EQ(expected.storage_class(), actual->Get("storageClass")); + ASSERT_FALSE(actual->Contains("storageClassUpdated")); + ASSERT_OK_AND_EQ(std::to_string(expected.size()), actual->Get("size")); + ASSERT_OK_AND_EQ(expected.md5_hash(), actual->Get("md5Hash")); + ASSERT_OK_AND_EQ(expected.media_link(), actual->Get("mediaLink")); + ASSERT_OK_AND_EQ(expected.content_encoding(), actual->Get("Content-Encoding")); + ASSERT_OK_AND_EQ(expected.content_disposition(), actual->Get("Content-Disposition")); + ASSERT_OK_AND_EQ(expected.content_language(), actual->Get("Content-Language")); + ASSERT_OK_AND_EQ(expected.cache_control(), actual->Get("Cache-Control")); + auto p = expected.metadata().find("key0"); + ASSERT_TRUE(p != expected.metadata().end()); + ASSERT_OK_AND_EQ(p->second, actual->Get("metadata.key0")); + ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entity")); + ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entityId")); + ASSERT_OK_AND_EQ(expected.crc32c(), actual->Get("crc32c")); + ASSERT_OK_AND_EQ(std::to_string(expected.component_count()), + actual->Get("componentCount")); + ASSERT_OK_AND_EQ(expected.etag(), actual->Get("etag")); + ASSERT_FALSE(actual->Contains("customerEncryption.encryptionAlgorithm")); + ASSERT_FALSE(actual->Contains("customerEncryption.keySha256")); + ASSERT_FALSE(actual->Contains("kmsKeyName")); +} + TEST_F(GcsIntegrationTest, WriteObjectSmall) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());