From 7009f5ec4666d52c9097a2fa86587830167490ab Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Fri, 15 Oct 2021 12:58:50 +0000 Subject: [PATCH 1/2] ARROW-14345: [C++] Implement streaming reads --- cpp/src/arrow/filesystem/gcsfs.cc | 63 +++++++++++- cpp/src/arrow/filesystem/gcsfs_internal.cc | 4 +- cpp/src/arrow/filesystem/gcsfs_test.cc | 109 +++++++++++++++++++-- 3 files changed, 162 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index ff911d02ab3..898e54cf593 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -19,6 +19,7 @@ #include +#include "arrow/buffer.h" #include "arrow/filesystem/gcsfs_internal.h" #include "arrow/filesystem/path_util.h" #include "arrow/result.h" @@ -28,6 +29,8 @@ namespace arrow { namespace fs { namespace { +namespace gcs = google::cloud::storage; + auto constexpr kSep = '/'; struct GcsPath { @@ -58,9 +61,48 @@ struct GcsPath { } }; -} // namespace +class GcsInputStream : public arrow::io::InputStream { + public: + explicit GcsInputStream(gcs::ObjectReadStream stream) : stream_(std::move(stream)) {} -namespace gcs = google::cloud::storage; + ~GcsInputStream() override = default; + + Status Close() override { + stream_.Close(); + return Status::OK(); + } + + Result Tell() const override { + if (!stream_) { + return Status::IOError("invalid stream"); + } + return stream_.tellg(); + } + + bool closed() const override { return !stream_.IsOpen(); } + + Result Read(int64_t nbytes, void* out) override { + stream_.read(static_cast(out), nbytes); + if (!stream_.status().ok()) { + return internal::ToArrowStatus(stream_.status()); + } + return stream_.gcount(); + } + + Result> Read(int64_t nbytes) override { + ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes)); + stream_.read(reinterpret_cast(buffer->mutable_data()), nbytes); + if (!stream_.status().ok()) { + return internal::ToArrowStatus(stream_.status()); + } + return arrow::SliceMutableBufferSafe(std::move(buffer), 0, stream_.gcount()); + } + + private: + mutable gcs::ObjectReadStream stream_; +}; + +} // namespace google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { auto options = google::cloud::Options{}; @@ -95,6 +137,14 @@ class GcsFileSystem::Impl { return GetFileInfoImpl(path, std::move(meta).status(), FileType::Directory); } + Result> OpenInputStream(const GcsPath& path) { + auto stream = client_.ReadObject(path.bucket, path.object); + if (!stream.status().ok()) { + return internal::ToArrowStatus(stream.status()); + } + return std::make_shared(std::move(stream)); + } + private: static Result GetFileInfoImpl(const GcsPath& path, const google::cloud::Status& status, @@ -169,12 +219,17 @@ Status GcsFileSystem::CopyFile(const std::string& src, const std::string& dest) Result> GcsFileSystem::OpenInputStream( const std::string& path) { - return Status::NotImplemented("The GCS FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path)); + return impl_->OpenInputStream(p); } Result> GcsFileSystem::OpenInputStream( const FileInfo& info) { - return Status::NotImplemented("The GCS FileSystem is not fully implemented"); + if (!info.IsFile()) { + return Status::IOError("Only files can be opened as input streams"); + } + ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path())); + return impl_->OpenInputStream(p); } Result> GcsFileSystem::OpenInputFile( diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.cc b/cpp/src/arrow/filesystem/gcsfs_internal.cc index 22df5cebf67..898015859c2 100644 --- a/cpp/src/arrow/filesystem/gcsfs_internal.cc +++ b/cpp/src/arrow/filesystem/gcsfs_internal.cc @@ -38,10 +38,8 @@ Status ToArrowStatus(const google::cloud::Status& s) { case google::cloud::StatusCode::kInvalidArgument: return Status::Invalid(os.str()); case google::cloud::StatusCode::kDeadlineExceeded: - return Status::IOError(os.str()); case google::cloud::StatusCode::kNotFound: - // TODO: it is unclear if a better mapping would be possible. - return Status::UnknownError(os.str()); + return Status::IOError(os.str()); case google::cloud::StatusCode::kAlreadyExists: return Status::AlreadyExists(os.str()); case google::cloud::StatusCode::kPermissionDenied: diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 0776872e3ac..f228c9ce3ec 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -24,11 +24,13 @@ #include #include +#include #include #include #include "arrow/filesystem/gcsfs_internal.h" #include "arrow/filesystem/test_util.h" +#include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" namespace arrow { @@ -45,6 +47,15 @@ using ::testing::Not; using ::testing::NotNull; auto const* kPreexistingBucket = "test-bucket-name"; +auto const* kPreexistingObject = "test-object-name"; +auto const* kLoremIpsum = R"""( +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor +incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis +nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. +Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu +fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in +culpa qui officia deserunt mollit anim id est laborum. +)"""; class GcsIntegrationTest : public ::testing::Test { public: @@ -65,16 +76,29 @@ class GcsIntegrationTest : public ::testing::Test { server_process_ = bp::child(boost::this_process::environment(), exe_path, "-m", "testbench", "--port", port_); - // Create a bucket in the testbench. This makes it easier to bootstrap GcsFileSystem - // and its tests. + // Create a bucket and a small file in the testbench. This makes it easier to + // bootstrap GcsFileSystem and its tests. auto client = gcs::Client( google::cloud::Options{} .set("http://127.0.0.1:" + port_) .set(gc::MakeInsecureCredentials())); - google::cloud::StatusOr metadata = client.CreateBucketForProject( + google::cloud::StatusOr bucket = client.CreateBucketForProject( kPreexistingBucket, "ignored-by-testbench", gcs::BucketMetadata{}); - ASSERT_TRUE(metadata.ok()) << "Failed to create bucket <" << kPreexistingBucket - << ">, status=" << metadata.status(); + ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << kPreexistingBucket + << ">, status=" << bucket.status(); + + google::cloud::StatusOr object = + client.InsertObject(kPreexistingBucket, kPreexistingObject, kLoremIpsum); + ASSERT_TRUE(object.ok()) << "Failed to create object <" << kPreexistingObject + << ">, status=" << object.status(); + } + + static std::string PreexistingObjectPath() { + return std::string(kPreexistingBucket) + "/" + kPreexistingObject; + } + + static std::string NotFoundObjectPath() { + return std::string(kPreexistingBucket) + "/not-found"; } GcsOptions TestGcsOptions() { @@ -114,7 +138,7 @@ TEST(GcsFileSystem, ToArrowStatus) { {google::cloud::StatusCode::kUnknown, StatusCode::UnknownError}, {google::cloud::StatusCode::kInvalidArgument, StatusCode::Invalid}, {google::cloud::StatusCode::kDeadlineExceeded, StatusCode::IOError}, - {google::cloud::StatusCode::kNotFound, StatusCode::UnknownError}, + {google::cloud::StatusCode::kNotFound, StatusCode::IOError}, {google::cloud::StatusCode::kAlreadyExists, StatusCode::AlreadyExists}, {google::cloud::StatusCode::kPermissionDenied, StatusCode::IOError}, {google::cloud::StatusCode::kUnauthenticated, StatusCode::IOError}, @@ -159,11 +183,82 @@ TEST(GcsFileSystem, FileSystemCompare) { EXPECT_FALSE(a->Equals(*b)); } -TEST_F(GcsIntegrationTest, MakeBucket) { +TEST_F(GcsIntegrationTest, GetFileInfoBucket) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); arrow::fs::AssertFileInfo(fs.get(), kPreexistingBucket, FileType::Directory); } +TEST_F(GcsIntegrationTest, GetFileInfoObject) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + arrow::fs::AssertFileInfo(fs.get(), PreexistingObjectPath(), FileType::File); +} + +TEST_F(GcsIntegrationTest, ReadObjectString) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath())); + + std::array buffer{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data())); + + EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum); +} + +TEST_F(GcsIntegrationTest, ReadObjectStringBuffers) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath())); + + std::string contents; + for (Result> r = stream->Read(16); r.ok() && (*r)->size() != 0; + r = stream->Read(16)) { + auto buffer = *r; + contents.append(buffer->ToString()); + } + + EXPECT_EQ(contents, kLoremIpsum); +} + +TEST_F(GcsIntegrationTest, ReadObjectInfo) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + arrow::fs::FileInfo info; + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath())); + + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(info)); + + std::array buffer{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data())); + + EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum); +} + +TEST_F(GcsIntegrationTest, ReadObjectNotFound) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + auto result = fs->OpenInputStream(NotFoundObjectPath()); + EXPECT_EQ(result.status().code(), StatusCode::IOError); +} + +TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + arrow::fs::FileInfo info; + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket)); + + auto result = fs->OpenInputStream(NotFoundObjectPath()); + EXPECT_EQ(result.status().code(), StatusCode::IOError); + + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath())); + result = fs->OpenInputStream(NotFoundObjectPath()); + EXPECT_EQ(result.status().code(), StatusCode::IOError); +} + } // namespace } // namespace fs } // namespace arrow From 200fd2a350eabc559977a42b7d41b4c445afb55c Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Tue, 19 Oct 2021 20:13:41 +0000 Subject: [PATCH 2/2] Address review comments --- cpp/src/arrow/filesystem/gcsfs_test.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index f228c9ce3ec..369317fbb34 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -213,11 +213,11 @@ TEST_F(GcsIntegrationTest, ReadObjectStringBuffers) { ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath())); std::string contents; - for (Result> r = stream->Read(16); r.ok() && (*r)->size() != 0; - r = stream->Read(16)) { - auto buffer = *r; + std::shared_ptr buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, stream->Read(16)); contents.append(buffer->ToString()); - } + } while (buffer && buffer->size() != 0); EXPECT_EQ(contents, kLoremIpsum); }