Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <google/cloud/storage/client.h>

#include "arrow/buffer.h"
#include "arrow/filesystem/gcsfs_internal.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/result.h"
Expand All @@ -28,6 +29,8 @@ namespace arrow {
namespace fs {
namespace {

namespace gcs = google::cloud::storage;

auto constexpr kSep = '/';

struct GcsPath {
Expand Down Expand Up @@ -58,9 +61,48 @@ struct GcsPath {
}
};

} // namespace
class GcsInputStream : public arrow::io::InputStream {
public:
explicit GcsInputStream(gcs::ObjectReadStream stream) : stream_(std::move(stream)) {}

namespace gcs = google::cloud::storage;
~GcsInputStream() override = default;

Status Close() override {
stream_.Close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to check if stream_.Close() is idempotent (not sure if this is a requirement but I believe all of the other interface implementations allow it). e.g. S3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting with v1.32.0 they are idempotent. Created ARROW-14385 to remind myself of updating any dependencies.

return Status::OK();
}

Result<int64_t> Tell() const override {
if (!stream_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason for this check here, and not in the other methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other methods an invalid stream would result in an error anyway. Happy to add the additional checks if you think they make the code easier to grok.

return Status::IOError("invalid stream");
}
return stream_.tellg();
}

bool closed() const override { return !stream_.IsOpen(); }

Result<int64_t> Read(int64_t nbytes, void* out) override {
stream_.read(static_cast<char*>(out), nbytes);
if (!stream_.status().ok()) {
return internal::ToArrowStatus(stream_.status());
}
return stream_.gcount();
}

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes));
stream_.read(reinterpret_cast<char*>(buffer->mutable_data()), nbytes);
if (!stream_.status().ok()) {
return internal::ToArrowStatus(stream_.status());
}
return arrow::SliceMutableBufferSafe(std::move(buffer), 0, stream_.gcount());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. This is merely slicing the output buffer, but not resizing it. If the actual size is much smaller than nbytes, then there is a significant memory wastage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. On the other hand, a short read from a large buffer could require a lot of copying. I am not sure which is more likely to be a problem. I am happy to make a change if you have some data or intuition as to which one is worse.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does GCS return short reads except for the EOF case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all practical purposes, only the EOF case. The exception would be some kind of transient error that is stubborn enough to exhaust the retry policy.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. In any case, other implementations (such as S3) do resize the buffer, and there doesn't seem to be a reason to act differently here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

private:
mutable gcs::ObjectReadStream stream_;
};

} // namespace

google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
auto options = google::cloud::Options{};
Expand Down Expand Up @@ -95,6 +137,14 @@ class GcsFileSystem::Impl {
return GetFileInfoImpl(path, std::move(meta).status(), FileType::Directory);
}

Result<std::shared_ptr<io::InputStream>> OpenInputStream(const GcsPath& path) {
auto stream = client_.ReadObject(path.bucket, path.object);
if (!stream.status().ok()) {
return internal::ToArrowStatus(stream.status());
}
return std::make_shared<GcsInputStream>(std::move(stream));
}

private:
static Result<FileInfo> GetFileInfoImpl(const GcsPath& path,
const google::cloud::Status& status,
Expand Down Expand Up @@ -169,12 +219,17 @@ Status GcsFileSystem::CopyFile(const std::string& src, const std::string& dest)

Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
const std::string& path) {
return Status::NotImplemented("The GCS FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
return impl_->OpenInputStream(p);
}

Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
const FileInfo& info) {
return Status::NotImplemented("The GCS FileSystem is not fully implemented");
if (!info.IsFile()) {
return Status::IOError("Only files can be opened as input streams");
}
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
return impl_->OpenInputStream(p);
}

Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
Expand Down
4 changes: 1 addition & 3 deletions cpp/src/arrow/filesystem/gcsfs_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ Status ToArrowStatus(const google::cloud::Status& s) {
case google::cloud::StatusCode::kInvalidArgument:
return Status::Invalid(os.str());
case google::cloud::StatusCode::kDeadlineExceeded:
return Status::IOError(os.str());
case google::cloud::StatusCode::kNotFound:
// TODO: it is unclear if a better mapping would be possible.
return Status::UnknownError(os.str());
return Status::IOError(os.str());
case google::cloud::StatusCode::kAlreadyExists:
return Status::AlreadyExists(os.str());
case google::cloud::StatusCode::kPermissionDenied:
Expand Down
109 changes: 102 additions & 7 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
#include <google/cloud/storage/options.h>
#include <gtest/gtest.h>

#include <array>
#include <boost/process.hpp>
#include <string>

#include "arrow/filesystem/gcsfs_internal.h"
#include "arrow/filesystem/test_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"

namespace arrow {
Expand All @@ -45,6 +47,15 @@ using ::testing::Not;
using ::testing::NotNull;

auto const* kPreexistingBucket = "test-bucket-name";
auto const* kPreexistingObject = "test-object-name";
auto const* kLoremIpsum = R"""(
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
culpa qui officia deserunt mollit anim id est laborum.
)""";

class GcsIntegrationTest : public ::testing::Test {
public:
Expand All @@ -65,16 +76,29 @@ class GcsIntegrationTest : public ::testing::Test {
server_process_ = bp::child(boost::this_process::environment(), exe_path, "-m",
"testbench", "--port", port_);

// Create a bucket in the testbench. This makes it easier to bootstrap GcsFileSystem
// and its tests.
// Create a bucket and a small file in the testbench. This makes it easier to
// bootstrap GcsFileSystem and its tests.
auto client = gcs::Client(
google::cloud::Options{}
.set<gcs::RestEndpointOption>("http://127.0.0.1:" + port_)
.set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));
google::cloud::StatusOr<gcs::BucketMetadata> metadata = client.CreateBucketForProject(
google::cloud::StatusOr<gcs::BucketMetadata> bucket = client.CreateBucketForProject(
kPreexistingBucket, "ignored-by-testbench", gcs::BucketMetadata{});
ASSERT_TRUE(metadata.ok()) << "Failed to create bucket <" << kPreexistingBucket
<< ">, status=" << metadata.status();
ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << kPreexistingBucket
<< ">, status=" << bucket.status();

google::cloud::StatusOr<gcs::ObjectMetadata> object =
client.InsertObject(kPreexistingBucket, kPreexistingObject, kLoremIpsum);
ASSERT_TRUE(object.ok()) << "Failed to create object <" << kPreexistingObject
<< ">, status=" << object.status();
}

static std::string PreexistingObjectPath() {
return std::string(kPreexistingBucket) + "/" + kPreexistingObject;
}

static std::string NotFoundObjectPath() {
return std::string(kPreexistingBucket) + "/not-found";
}

GcsOptions TestGcsOptions() {
Expand Down Expand Up @@ -114,7 +138,7 @@ TEST(GcsFileSystem, ToArrowStatus) {
{google::cloud::StatusCode::kUnknown, StatusCode::UnknownError},
{google::cloud::StatusCode::kInvalidArgument, StatusCode::Invalid},
{google::cloud::StatusCode::kDeadlineExceeded, StatusCode::IOError},
{google::cloud::StatusCode::kNotFound, StatusCode::UnknownError},
{google::cloud::StatusCode::kNotFound, StatusCode::IOError},
{google::cloud::StatusCode::kAlreadyExists, StatusCode::AlreadyExists},
{google::cloud::StatusCode::kPermissionDenied, StatusCode::IOError},
{google::cloud::StatusCode::kUnauthenticated, StatusCode::IOError},
Expand Down Expand Up @@ -159,11 +183,82 @@ TEST(GcsFileSystem, FileSystemCompare) {
EXPECT_FALSE(a->Equals(*b));
}

TEST_F(GcsIntegrationTest, MakeBucket) {
TEST_F(GcsIntegrationTest, GetFileInfoBucket) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
arrow::fs::AssertFileInfo(fs.get(), kPreexistingBucket, FileType::Directory);
}

TEST_F(GcsIntegrationTest, GetFileInfoObject) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
arrow::fs::AssertFileInfo(fs.get(), PreexistingObjectPath(), FileType::File);
}

TEST_F(GcsIntegrationTest, ReadObjectString) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath()));

std::array<char, 1024> buffer{};
std::int64_t size;
ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));

EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
}

TEST_F(GcsIntegrationTest, ReadObjectStringBuffers) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath()));

std::string contents;
std::shared_ptr<Buffer> buffer;
do {
ASSERT_OK_AND_ASSIGN(buffer, stream->Read(16));
contents.append(buffer->ToString());
} while (buffer && buffer->size() != 0);

EXPECT_EQ(contents, kLoremIpsum);
}

TEST_F(GcsIntegrationTest, ReadObjectInfo) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

arrow::fs::FileInfo info;
ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath()));

std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(info));

std::array<char, 1024> buffer{};
std::int64_t size;
ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));

EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
}

TEST_F(GcsIntegrationTest, ReadObjectNotFound) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

auto result = fs->OpenInputStream(NotFoundObjectPath());
EXPECT_EQ(result.status().code(), StatusCode::IOError);
}

TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

arrow::fs::FileInfo info;
ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket));

auto result = fs->OpenInputStream(NotFoundObjectPath());
EXPECT_EQ(result.status().code(), StatusCode::IOError);

ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));
result = fs->OpenInputStream(NotFoundObjectPath());
EXPECT_EQ(result.status().code(), StatusCode::IOError);
}

} // namespace
} // namespace fs
} // namespace arrow