Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 70 additions & 10 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,23 @@
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"

#define ARROW_GCS_RETURN_NOT_OK(expr) \
if (!expr.ok()) return internal::ToArrowStatus(expr)

namespace arrow {
namespace fs {
namespace {

namespace gcs = google::cloud::storage;

auto constexpr kSep = '/';
// Change the default upload buffer size. In general, sending larger buffers is more
// efficient with GCS, as each buffer requires a roundtrip to the service. With formatted
// output (when using `operator<<`), keeping a larger buffer in memory before uploading
// makes sense. With unformatted output (the only choice given gcs::io::OutputStream's
// API) it is better to let the caller provide as large a buffer as they want. The GCS C++
// client library will upload this buffer with zero copies if possible.
auto constexpr kUploadBufferSize = 256 * 1024;

struct GcsPath {
std::string full_path;
Expand Down Expand Up @@ -83,18 +93,14 @@ class GcsInputStream : public arrow::io::InputStream {

Result<int64_t> Read(int64_t nbytes, void* out) override {
stream_.read(static_cast<char*>(out), nbytes);
if (!stream_.status().ok()) {
return internal::ToArrowStatus(stream_.status());
}
ARROW_GCS_RETURN_NOT_OK(stream_.status());
return stream_.gcount();
}

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes));
stream_.read(reinterpret_cast<char*>(buffer->mutable_data()), nbytes);
if (!stream_.status().ok()) {
return internal::ToArrowStatus(stream_.status());
}
ARROW_GCS_RETURN_NOT_OK(stream_.status());
RETURN_NOT_OK(buffer->Resize(stream_.gcount(), true));
return buffer;
}
Expand All @@ -103,6 +109,43 @@ class GcsInputStream : public arrow::io::InputStream {
mutable gcs::ObjectReadStream stream_;
};

class GcsOutputStream : public arrow::io::OutputStream {
public:
explicit GcsOutputStream(gcs::ObjectWriteStream stream) : stream_(std::move(stream)) {}
~GcsOutputStream() override = default;

Status Close() override {
stream_.Close();
return internal::ToArrowStatus(stream_.last_status());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does last_status also clear the error status or is it sticky? If it's sticky, then a failed Write would also return an error when calling Close?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is sticky. And yes, a failed Write() will make subsequent Close() fail. It is unadvisable to finalize a stream that failed, you don't know what is its state.

}

Result<int64_t> Tell() const override {
if (!stream_) {
return Status::IOError("invalid stream");
}
return tell_;
}

bool closed() const override { return !stream_.IsOpen(); }

Status Write(const void* data, int64_t nbytes) override {
if (stream_.write(reinterpret_cast<const char*>(data), nbytes)) {
tell_ += nbytes;
return Status::OK();
}
return internal::ToArrowStatus(stream_.last_status());
}

Status Flush() override {
stream_.flush();
return Status::OK();
}

private:
gcs::ObjectWriteStream stream_;
int64_t tell_ = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears tell_ is never updated anywhere. Should you do it in Write perhaps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks.

};

} // namespace

google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
Expand All @@ -116,6 +159,7 @@ google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
options.set<google::cloud::UnifiedCredentialsOption>(
google::cloud::MakeInsecureCredentials());
}
options.set<gcs::UploadBufferSizeOption>(kUploadBufferSize);
if (!o.endpoint_override.empty()) {
options.set<gcs::RestEndpointOption>(scheme + "://" + o.endpoint_override);
}
Expand All @@ -140,12 +184,27 @@ class GcsFileSystem::Impl {

Result<std::shared_ptr<io::InputStream>> OpenInputStream(const GcsPath& path) {
auto stream = client_.ReadObject(path.bucket, path.object);
if (!stream.status().ok()) {
return internal::ToArrowStatus(stream.status());
}
ARROW_GCS_RETURN_NOT_OK(stream.status());
return std::make_shared<GcsInputStream>(std::move(stream));
}

Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
const GcsPath& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
gcs::EncryptionKey encryption_key;
ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(metadata));
gcs::PredefinedAcl predefined_acl;
ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(metadata));
gcs::KmsKeyName kms_key_name;
ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(metadata));
gcs::WithObjectMetadata with_object_metadata;
ARROW_ASSIGN_OR_RAISE(with_object_metadata, internal::ToObjectMetadata(metadata));

auto stream = client_.WriteObject(path.bucket, path.object, encryption_key,
predefined_acl, kms_key_name, with_object_metadata);
ARROW_GCS_RETURN_NOT_OK(stream.last_status());
return std::make_shared<GcsOutputStream>(std::move(stream));
}

private:
static Result<FileInfo> GetFileInfoImpl(const GcsPath& path,
const google::cloud::Status& status,
Expand Down Expand Up @@ -245,7 +304,8 @@ Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(

Result<std::shared_ptr<io::OutputStream>> GcsFileSystem::OpenOutputStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
return Status::NotImplemented("The GCS FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
return impl_->OpenOutputStream(p, metadata);
}

Result<std::shared_ptr<io::OutputStream>> GcsFileSystem::OpenAppendStream(
Expand Down
131 changes: 131 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

#include "arrow/filesystem/gcsfs_internal.h"

#include <absl/time/time.h> // NOLINT
#include <google/cloud/storage/client.h>

#include <sstream>
#include <unordered_map>

#include "arrow/util/key_value_metadata.h"

namespace arrow {
namespace fs {
Expand Down Expand Up @@ -62,6 +66,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
return Status::OK();
}

namespace gcs = ::google::cloud::storage;

Result<gcs::EncryptionKey> ToEncryptionKey(
const std::shared_ptr<const KeyValueMetadata>& metadata) {
if (!metadata) {
return gcs::EncryptionKey{};
}

const auto& keys = metadata->keys();
const auto& values = metadata->values();

for (std::size_t i = 0; i < keys.size(); ++i) {
if (keys[i] == "encryptionKeyBase64") {
return gcs::EncryptionKey::FromBase64Key(values[i]);
}
}
return gcs::EncryptionKey{};
}

Result<gcs::KmsKeyName> ToKmsKeyName(
const std::shared_ptr<const KeyValueMetadata>& metadata) {
if (!metadata) {
return gcs::KmsKeyName{};
}

const auto& keys = metadata->keys();
const auto& values = metadata->values();

for (std::size_t i = 0; i < keys.size(); ++i) {
if (keys[i] == "kmsKeyName") {
return gcs::KmsKeyName(values[i]);
}
}
return gcs::KmsKeyName{};
}

Result<gcs::PredefinedAcl> ToPredefinedAcl(
const std::shared_ptr<const KeyValueMetadata>& metadata) {
if (!metadata) {
return gcs::PredefinedAcl{};
}

const auto& keys = metadata->keys();
const auto& values = metadata->values();

for (std::size_t i = 0; i < keys.size(); ++i) {
if (keys[i] == "predefinedAcl") {
return gcs::PredefinedAcl(values[i]);
}
}
return gcs::PredefinedAcl{};
}

Result<gcs::WithObjectMetadata> ToObjectMetadata(
const std::shared_ptr<const KeyValueMetadata>& metadata) {
if (!metadata) {
return gcs::WithObjectMetadata{};
}

static auto const setters = [] {
using setter = std::function<Status(gcs::ObjectMetadata&, const std::string&)>;
return std::unordered_map<std::string, setter>{
{"Cache-Control",
[](gcs::ObjectMetadata& m, const std::string& v) {
m.set_cache_control(v);
return Status::OK();
}},
{"Content-Disposition",
[](gcs::ObjectMetadata& m, const std::string& v) {
m.set_content_disposition(v);
return Status::OK();
}},
{"Content-Encoding",
[](gcs::ObjectMetadata& m, const std::string& v) {
m.set_content_encoding(v);
return Status::OK();
}},
{"Content-Language",
[](gcs::ObjectMetadata& m, const std::string& v) {
m.set_content_language(v);
return Status::OK();
}},
{"Content-Type",
[](gcs::ObjectMetadata& m, const std::string& v) {
m.set_content_type(v);
return Status::OK();
}},
{"customTime",
[](gcs::ObjectMetadata& m, const std::string& v) {
std::string err;
absl::Time t;
if (!absl::ParseTime(absl::RFC3339_full, v, &t, &err)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is absl already an include-time dependency of GCS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Some Abseil types are exposed in the public API for google-cloud-cpp.

return Status::Invalid("Error parsing RFC-3339 timestamp: '", v, "': ", err);
}
m.set_custom_time(absl::ToChronoTime(t));
return Status::OK();
}},
{"storageClass",
[](gcs::ObjectMetadata& m, const std::string& v) {
m.set_storage_class(v);
return Status::OK();
}},
{"predefinedAcl",
[](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }},
{"encryptionKeyBase64",
[](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }},
{"kmsKeyName",
[](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }},
};
}();

const auto& keys = metadata->keys();
const auto& values = metadata->values();

gcs::ObjectMetadata object_metadata;
for (std::size_t i = 0; i < keys.size(); ++i) {
auto it = setters.find(keys[i]);
if (it != setters.end()) {
auto status = it->second(object_metadata, values[i]);
if (!status.ok()) return status;
} else {
object_metadata.upsert_metadata(keys[i], values[i]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is meant to allow inserting arbitrary metadata strings? Will GCS balk if the user throws some unrecognized metadata keys here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GCS accepts (mostly) arbitrary metadata keys:

https://cloud.google.com/storage/docs/metadata#custom-metadata

}
}
return gcs::WithObjectMetadata(std::move(object_metadata));
}

} // namespace internal
} // namespace fs
} // namespace arrow
15 changes: 15 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#pragma once

#include <google/cloud/status.h>
#include <google/cloud/storage/object_metadata.h>
#include <google/cloud/storage/well_known_headers.h>
#include <google/cloud/storage/well_known_parameters.h>

#include <memory>
#include <string>
Expand All @@ -31,6 +34,18 @@ namespace internal {

Status ToArrowStatus(const google::cloud::Status& s);

Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
const std::shared_ptr<const KeyValueMetadata>& metadata);

Result<google::cloud::storage::PredefinedAcl> ToPredefinedAcl(
const std::shared_ptr<const KeyValueMetadata>& metadata);

Result<google::cloud::storage::KmsKeyName> ToKmsKeyName(
const std::shared_ptr<const KeyValueMetadata>& metadata);

Result<google::cloud::storage::WithObjectMetadata> ToObjectMetadata(
const std::shared_ptr<const KeyValueMetadata>& metadata);

} // namespace internal
} // namespace fs
} // namespace arrow
Loading