Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,19 @@ struct GcsPath {

class GcsInputStream : public arrow::io::InputStream {
public:
explicit GcsInputStream(gcs::ObjectReadStream stream) : stream_(std::move(stream)) {}
explicit GcsInputStream(gcs::ObjectReadStream stream, std::string bucket_name,
std::string object_name, gcs::Generation generation,
gcs::Client client)
: stream_(std::move(stream)),
bucket_name_(std::move(bucket_name)),
object_name_(std::move(object_name)),
generation_(generation),
client_(std::move(client)) {}

~GcsInputStream() override = default;

//@{
// @name FileInterface
Status Close() override {
stream_.Close();
return Status::OK();
Expand All @@ -90,7 +99,10 @@ class GcsInputStream : public arrow::io::InputStream {
}

bool closed() const override { return !stream_.IsOpen(); }
//@}

//@{
// @name Readable
Result<int64_t> Read(int64_t nbytes, void* out) override {
stream_.read(static_cast<char*>(out), nbytes);
ARROW_GCS_RETURN_NOT_OK(stream_.status());
Expand All @@ -104,9 +116,23 @@ class GcsInputStream : public arrow::io::InputStream {
RETURN_NOT_OK(buffer->Resize(stream_.gcount(), true));
return buffer;
}
//@}

//@{
// @name InputStream
Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
auto metadata = client_.GetObjectMetadata(bucket_name_, object_name_, generation_);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
return internal::FromObjectMetadata(*metadata);
}
//@}

private:
mutable gcs::ObjectReadStream stream_;
std::string bucket_name_;
std::string object_name_;
gcs::Generation generation_;
gcs::Client client_;
};

class GcsOutputStream : public arrow::io::OutputStream {
Expand Down Expand Up @@ -185,7 +211,8 @@ class GcsFileSystem::Impl {
Result<std::shared_ptr<io::InputStream>> OpenInputStream(const GcsPath& path) {
auto stream = client_.ReadObject(path.bucket, path.object);
ARROW_GCS_RETURN_NOT_OK(stream.status());
return std::make_shared<GcsInputStream>(std::move(stream));
return std::make_shared<GcsInputStream>(std::move(stream), path.bucket, path.object,
gcs::Generation(), client_);
}

Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
Expand Down
65 changes: 65 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <sstream>
#include <unordered_map>
#include <vector>

#include "arrow/util/key_value_metadata.h"

Expand Down Expand Up @@ -193,6 +194,70 @@ Result<gcs::WithObjectMetadata> ToObjectMetadata(
return gcs::WithObjectMetadata(std::move(object_metadata));
}

Result<std::shared_ptr<const KeyValueMetadata>> FromObjectMetadata(
gcs::ObjectMetadata const& m) {
auto format_time = [](std::chrono::system_clock::time_point tp) {
return absl::FormatTime(absl::RFC3339_full, absl::FromChrono(tp),
absl::UTCTimeZone());
};
auto result = std::make_shared<KeyValueMetadata>();
// The fields are in the same order as defined in:
// https://cloud.google.com/storage/docs/json_api/v1/objects
// Where previous practice in Arrow suggested using a different field name (Content-Type
// vs. contentType) we prefer the existing practice in Arrow.
result->Append("id", m.id());
result->Append("selfLink", m.self_link());
result->Append("name", m.name());
result->Append("bucket", m.bucket());
result->Append("generation", std::to_string(m.generation()));
result->Append("Content-Type", m.content_type());
result->Append("timeCreated", format_time(m.time_created()));
result->Append("updated", format_time(m.updated()));
if (m.has_custom_time()) {
result->Append("customTime", format_time(m.custom_time()));
}
if (m.time_deleted() != std::chrono::system_clock::time_point()) {
result->Append("timeDeleted", format_time(m.time_deleted()));
}
result->Append("temporaryHold", m.temporary_hold() ? "true" : "false");
result->Append("eventBasedHold", m.event_based_hold() ? "true" : "false");
if (m.retention_expiration_time() != std::chrono::system_clock::time_point()) {
result->Append("retentionExpirationTime", format_time(m.retention_expiration_time()));
}
result->Append("storageClass", m.storage_class());
if (m.time_storage_class_updated() != std::chrono::system_clock::time_point()) {
result->Append("timeStorageClassUpdated",
format_time(m.time_storage_class_updated()));
}
result->Append("size", std::to_string(m.size()));
result->Append("md5Hash", m.md5_hash());
result->Append("mediaLink", m.media_link());
result->Append("Content-Encoding", m.content_encoding());
result->Append("Content-Disposition", m.content_disposition());
result->Append("Content-Language", m.content_language());
result->Append("Cache-Control", m.cache_control());
for (auto const& kv : m.metadata()) {
result->Append("metadata." + kv.first, kv.second);
}
// Skip "acl" because it is overly complex
if (m.has_owner()) {
result->Append("owner.entity", m.owner().entity);
result->Append("owner.entityId", m.owner().entity_id);
}
result->Append("crc32c", m.crc32c());
result->Append("componentCount", std::to_string(m.component_count()));
result->Append("etag", m.etag());
if (m.has_customer_encryption()) {
result->Append("customerEncryption.encryptionAlgorithm",
m.customer_encryption().encryption_algorithm);
result->Append("customerEncryption.keySha256", m.customer_encryption().key_sha256);
}
if (!m.kms_key_name().empty()) {
result->Append("kmsKeyName", m.kms_key_name());
}
return result;
}

} // namespace internal
} // namespace fs
} // namespace arrow
3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ Result<google::cloud::storage::KmsKeyName> ToKmsKeyName(
Result<google::cloud::storage::WithObjectMetadata> ToObjectMetadata(
const std::shared_ptr<const KeyValueMetadata>& metadata);

Result<std::shared_ptr<const KeyValueMetadata>> FromObjectMetadata(
google::cloud::storage::ObjectMetadata const& m);

} // namespace internal
} // namespace fs
} // namespace arrow
99 changes: 99 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "arrow/filesystem/gcsfs.h"

#include <absl/time/time.h>
#include <gmock/gmock-matchers.h>
#include <gmock/gmock-more-matchers.h>
#include <google/cloud/credentials.h>
Expand Down Expand Up @@ -111,6 +112,13 @@ class GcsIntegrationTest : public ::testing::Test {
return options;
}

gcs::Client GcsClient() {
return gcs::Client(
google::cloud::Options{}
.set<gcs::RestEndpointOption>("http://127.0.0.1:" + port_)
.set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));
}

private:
std::string port_;
bp::child server_process_;
Expand Down Expand Up @@ -276,6 +284,35 @@ TEST(GcsFileSystem, ToObjectMetadataInvalidCustomTime) {
EXPECT_THAT(metadata.status().message(), HasSubstr("Error parsing RFC-3339"));
}

TEST(GcsFileSystem, ObjectMetadataRoundtrip) {
const auto source = KeyValueMetadataForTest();
gcs::WithObjectMetadata tmp;
ASSERT_OK_AND_ASSIGN(tmp, arrow::fs::internal::ToObjectMetadata(source));
std::shared_ptr<const KeyValueMetadata> destination;
ASSERT_OK_AND_ASSIGN(destination, arrow::fs::internal::FromObjectMetadata(tmp.value()));
// Only a subset of the keys are configurable in gcs::ObjectMetadata, for example, the
// size and CRC32C values of an object are only settable when the metadata is received
// from the services. For those keys that should be preserved, verify they are preserved
// in the round trip.
for (auto key : {"Cache-Control", "Content-Disposition", "Content-Encoding",
"Content-Language", "Content-Type", "storageClass"}) {
SCOPED_TRACE("Testing key " + std::string(key));
ASSERT_OK_AND_ASSIGN(auto s, source->Get(key));
ASSERT_OK_AND_ASSIGN(auto d, destination->Get(key));
EXPECT_EQ(s, d);
}
// RFC-3339 formatted timestamps may differ in trivial things, e.g., the UTC timezone
// can be represented as 'Z' or '+00:00'.
ASSERT_OK_AND_ASSIGN(auto s, source->Get("customTime"));
ASSERT_OK_AND_ASSIGN(auto d, destination->Get("customTime"));
std::string err;
absl::Time ts;
absl::Time td;
ASSERT_TRUE(absl::ParseTime(absl::RFC3339_full, s, &ts, &err)) << "error=" << err;
ASSERT_TRUE(absl::ParseTime(absl::RFC3339_full, d, &td, &err)) << "error=" << err;
EXPECT_NEAR(absl::ToDoubleSeconds(ts - td), 0, 0.5);
}

TEST_F(GcsIntegrationTest, GetFileInfoBucket) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
arrow::fs::AssertFileInfo(fs.get(), kPreexistingBucket, FileType::Directory);
Expand Down Expand Up @@ -352,6 +389,68 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
EXPECT_EQ(result.status().code(), StatusCode::IOError);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it also possible to add a simple roundtrip test between ToObjectMetadata and FromObjectMetadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (with some limitations, not all fields have public modifiers in gcs::ObjectMetadata)

TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) {
auto client = GcsClient();
const auto custom_time = std::chrono::system_clock::now() + std::chrono::hours(1);
const std::string object_name = "ReadObjectMetadataTest/simple.txt";
const gcs::ObjectMetadata expected =
client
.InsertObject(kPreexistingBucket, object_name,
"The quick brown fox jumps over the lazy dog",
gcs::WithObjectMetadata(gcs::ObjectMetadata()
.set_content_type("text/plain")
.set_custom_time(custom_time)
.set_cache_control("no-cache")
.upsert_metadata("key0", "value0")))
.value();

auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(
stream, fs->OpenInputStream(std::string(kPreexistingBucket) + "/" + object_name));

auto format_time = [](std::chrono::system_clock::time_point tp) {
return absl::FormatTime(absl::RFC3339_full, absl::FromChrono(tp),
absl::UTCTimeZone());
};

std::shared_ptr<const KeyValueMetadata> actual;
ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());
ASSERT_OK_AND_EQ(expected.self_link(), actual->Get("selfLink"));
ASSERT_OK_AND_EQ(expected.name(), actual->Get("name"));
ASSERT_OK_AND_EQ(expected.bucket(), actual->Get("bucket"));
ASSERT_OK_AND_EQ(std::to_string(expected.generation()), actual->Get("generation"));
ASSERT_OK_AND_EQ(expected.content_type(), actual->Get("Content-Type"));
ASSERT_OK_AND_EQ(format_time(expected.time_created()), actual->Get("timeCreated"));
ASSERT_OK_AND_EQ(format_time(expected.updated()), actual->Get("updated"));
ASSERT_FALSE(actual->Contains("timeDeleted"));
ASSERT_OK_AND_EQ(format_time(custom_time), actual->Get("customTime"));
ASSERT_OK_AND_EQ("false", actual->Get("temporaryHold"));
ASSERT_OK_AND_EQ("false", actual->Get("eventBasedHold"));
ASSERT_FALSE(actual->Contains("retentionExpirationTime"));
ASSERT_OK_AND_EQ(expected.storage_class(), actual->Get("storageClass"));
ASSERT_FALSE(actual->Contains("storageClassUpdated"));
ASSERT_OK_AND_EQ(std::to_string(expected.size()), actual->Get("size"));
ASSERT_OK_AND_EQ(expected.md5_hash(), actual->Get("md5Hash"));
ASSERT_OK_AND_EQ(expected.media_link(), actual->Get("mediaLink"));
ASSERT_OK_AND_EQ(expected.content_encoding(), actual->Get("Content-Encoding"));
ASSERT_OK_AND_EQ(expected.content_disposition(), actual->Get("Content-Disposition"));
ASSERT_OK_AND_EQ(expected.content_language(), actual->Get("Content-Language"));
ASSERT_OK_AND_EQ(expected.cache_control(), actual->Get("Cache-Control"));
auto p = expected.metadata().find("key0");
ASSERT_TRUE(p != expected.metadata().end());
ASSERT_OK_AND_EQ(p->second, actual->Get("metadata.key0"));
ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entity"));
ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entityId"));
ASSERT_OK_AND_EQ(expected.crc32c(), actual->Get("crc32c"));
ASSERT_OK_AND_EQ(std::to_string(expected.component_count()),
actual->Get("componentCount"));
ASSERT_OK_AND_EQ(expected.etag(), actual->Get("etag"));
ASSERT_FALSE(actual->Contains("customerEncryption.encryptionAlgorithm"));
ASSERT_FALSE(actual->Contains("customerEncryption.keySha256"));
ASSERT_FALSE(actual->Contains("kmsKeyName"));
}

TEST_F(GcsIntegrationTest, WriteObjectSmall) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

Expand Down