Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 122 additions & 10 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ class GcsInputStream : public arrow::io::InputStream {
public:
explicit GcsInputStream(gcs::ObjectReadStream stream, std::string bucket_name,
std::string object_name, gcs::Generation generation,
gcs::Client client)
gcs::ReadFromOffset offset, gcs::Client client)
: stream_(std::move(stream)),
bucket_name_(std::move(bucket_name)),
object_name_(std::move(object_name)),
generation_(generation),
offset_(offset.value_or(0)),
client_(std::move(client)) {}

~GcsInputStream() override = default;
Expand All @@ -95,7 +96,7 @@ class GcsInputStream : public arrow::io::InputStream {
if (!stream_) {
return Status::IOError("invalid stream");
}
return stream_.tellg();
return stream_.tellg() + offset_;
}

bool closed() const override { return !stream_.IsOpen(); }
Expand Down Expand Up @@ -132,6 +133,7 @@ class GcsInputStream : public arrow::io::InputStream {
std::string bucket_name_;
std::string object_name_;
gcs::Generation generation_;
std::int64_t offset_;
gcs::Client client_;
};

Expand Down Expand Up @@ -172,6 +174,79 @@ class GcsOutputStream : public arrow::io::OutputStream {
int64_t tell_ = 0;
};

using InputStreamFactory = std::function<Result<std::shared_ptr<io::InputStream>>(
const std::string&, const std::string&, gcs::Generation, gcs::ReadFromOffset)>;

class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
public:
GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata,
std::shared_ptr<io::InputStream> stream)
: factory_(std::move(factory)),
metadata_(std::move(metadata)),
stream_(std::move(stream)) {}
~GcsRandomAccessFile() override = default;

//@{
// @name FileInterface
Status Close() override { return stream_->Close(); }
Status Abort() override { return stream_->Abort(); }
Result<int64_t> Tell() const override { return stream_->Tell(); }
bool closed() const override { return stream_->closed(); }
//@}

//@{
// @name Readable
Result<int64_t> Read(int64_t nbytes, void* out) override {
return stream_->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
return stream_->Read(nbytes);
}
const arrow::io::IOContext& io_context() const override {
return stream_->io_context();
}
//@}

//@{
// @name InputStream
Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
return internal::FromObjectMetadata(metadata_);
}
//@}

//@{
// @name RandomAccessFile
Result<int64_t> GetSize() override { return metadata_.size(); }
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(),
gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
return stream->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(),
gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
return stream->Read(nbytes);
}
//@}

// from Seekable
Status Seek(int64_t position) override {
ARROW_ASSIGN_OR_RAISE(stream_, factory_(metadata_.bucket(), metadata_.name(),
gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
return Status::OK();
}

private:
InputStreamFactory factory_;
gcs::ObjectMetadata metadata_;
std::shared_ptr<io::InputStream> stream_;
};

} // namespace

google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
Expand Down Expand Up @@ -214,11 +289,14 @@ class GcsFileSystem::Impl {
return internal::ToArrowStatus(metadata.status());
}

Result<std::shared_ptr<io::InputStream>> OpenInputStream(const GcsPath& path) {
auto stream = client_.ReadObject(path.bucket, path.object);
Result<std::shared_ptr<io::InputStream>> OpenInputStream(const std::string& bucket_name,
const std::string& object_name,
gcs::Generation generation,
gcs::ReadFromOffset offset) {
auto stream = client_.ReadObject(bucket_name, object_name, generation, offset);
ARROW_GCS_RETURN_NOT_OK(stream.status());
return std::make_shared<GcsInputStream>(std::move(stream), path.bucket, path.object,
gcs::Generation(), client_);
return std::make_shared<GcsInputStream>(std::move(stream), bucket_name, object_name,
gcs::Generation(), offset, client_);
}

Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
Expand All @@ -238,6 +316,10 @@ class GcsFileSystem::Impl {
return std::make_shared<GcsOutputStream>(std::move(stream));
}

google::cloud::StatusOr<gcs::ObjectMetadata> GetObjectMetadata(const GcsPath& path) {
return client_.GetObjectMetadata(path.bucket, path.object);
}

private:
static Result<FileInfo> GetFileInfoImpl(const GcsPath& path,
const google::cloud::Status& status,
Expand Down Expand Up @@ -315,7 +397,8 @@ Status GcsFileSystem::CopyFile(const std::string& src, const std::string& dest)
Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
const std::string& path) {
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
return impl_->OpenInputStream(p);
return impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(),
gcs::ReadFromOffset());
}

Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
Expand All @@ -324,17 +407,46 @@ Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
return Status::IOError("Only files can be opened as input streams");
}
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
return impl_->OpenInputStream(p);
return impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(),
gcs::ReadFromOffset());
}

Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
const std::string& path) {
return Status::NotImplemented("The GCS FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
auto metadata = impl_->GetObjectMetadata(p);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, does this add a roundtrip to the server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it does. I am trying to ensure that Read() and ReadAt() and Seek() when going back are using the same generation of an object [*]. We could try to use undocumented (and likely to break) APIs to extract the generation without this roundtrip. If it turns out the roundtrip (and I should add, the additional API charges) are really important, then I would rather add a documented API to the C++ client library and then use that here.

[*]: you probably know this, but objects in GCS are versioned. You can have more than one version of the same object, and/or have the "latest" version replaced while you are reading from it. I would think we want all operations in one io::RandomAccessFile to refer to the same generation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think we want all operations in one io::RandomAccessFile to refer to the same generation.

Oh, definitely. I just wonder if there's a way to extract the metadata from the first stream creation? Perhaps this is something that the GCS C++ library can provide (assuming the information exists at all at the HTTP level)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And thus my (maybe too obscure) note about "undocumented (and likely to break) APIs". The C++ client library currently returns the HTTP headers, which include thinks like x-goog-generation (not the full metadata). This is only intended for debugging, and I know of future changes that will make these headers unavailable (sometimes, when the user selects a non-HTTP transport).

I can change the C++ client to return the generation in a future-proof API. I would rather do that in a separate PR, after I fix the C++ client library (googleapis/google-cloud-cpp#7677).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change the C++ client to return the generation in a future-proof API. I would rather do that in a separate PR, after I fix the C++ client library

No problem from me.

ARROW_GCS_RETURN_NOT_OK(metadata.status());
auto impl = impl_;
auto open_stream = [impl](const std::string& b, const std::string& o, gcs::Generation g,
gcs::ReadFromOffset offset) {
return impl->OpenInputStream(b, o, g, offset);
};
ARROW_ASSIGN_OR_RAISE(
auto stream,
impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(metadata->generation()),
gcs::ReadFromOffset()));

return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
*std::move(metadata), std::move(stream));
}

Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
const FileInfo& info) {
return Status::NotImplemented("The GCS FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
auto impl = impl_;
auto open_stream = [impl](const std::string& b, const std::string& o, gcs::Generation g,
gcs::ReadFromOffset offset) {
return impl->OpenInputStream(b, o, g, offset);
};
ARROW_ASSIGN_OR_RAISE(
auto stream,
impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(metadata->generation()),
gcs::ReadFromOffset()));

return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
*std::move(metadata), std::move(stream));
}

Result<std::shared_ptr<io::OutputStream>> GcsFileSystem::OpenOutputStream(
Expand Down
149 changes: 142 additions & 7 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <array>
#include <boost/process.hpp>
#include <random>
#include <string>

#include "arrow/filesystem/gcsfs_internal.h"
Expand Down Expand Up @@ -73,6 +74,8 @@ class GcsIntegrationTest : public ::testing::Test {

protected:
void SetUp() override {
// Initialize a PRNG with a small amount of entropy.
generator_ = std::mt19937_64(std::random_device()());
port_ = std::to_string(GetListenPort());
auto exe_path = bp::search_path("python3");
ASSERT_THAT(exe_path, Not(IsEmpty()));
Expand Down Expand Up @@ -119,7 +122,23 @@ class GcsIntegrationTest : public ::testing::Test {
.set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));
}

std::string RandomLine(int lineno, std::size_t width) {
auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably reuse random_string from arrow/testing/util.h instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

random_string() produces non-printable characters, which are not the easiest thing to troubleshoot. random_ascii() could do it, but has a weird API using a uint8_t* output parameter. I would rather leave this as-is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough :-)


auto line = std::to_string(lineno) + ": ";
std::generate_n(std::back_inserter(line), width - line.size() - 1,
[&] { return fillers[d(generator_)]; });
line += '\n';
return line;
}

int RandomIndex(std::size_t end) {
return std::uniform_int_distribution<std::size_t>(0, end - 1)(generator_);
}

private:
std::mt19937_64 generator_;
std::string port_;
bp::child server_process_;
};
Expand Down Expand Up @@ -384,22 +403,18 @@ TEST_F(GcsIntegrationTest, ReadObjectInfo) {
TEST_F(GcsIntegrationTest, ReadObjectNotFound) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

auto result = fs->OpenInputStream(NotFoundObjectPath());
EXPECT_EQ(result.status().code(), StatusCode::IOError);
ASSERT_RAISES(IOError, fs->OpenInputStream(NotFoundObjectPath()));
}

TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

arrow::fs::FileInfo info;
ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket));

auto result = fs->OpenInputStream(NotFoundObjectPath());
EXPECT_EQ(result.status().code(), StatusCode::IOError);
ASSERT_RAISES(IOError, fs->OpenInputStream(info));

ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));
result = fs->OpenInputStream(NotFoundObjectPath());
EXPECT_EQ(result.status().code(), StatusCode::IOError);
ASSERT_RAISES(IOError, fs->OpenInputStream(info));
}

TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) {
Expand Down Expand Up @@ -521,6 +536,126 @@ TEST_F(GcsIntegrationTest, WriteObjectLarge) {
EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
}

TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

// Create a file large enough to make the random access tests non-trivial.
auto constexpr kLineWidth = 100;
auto constexpr kLineCount = 4096;
std::vector<std::string> lines(kLineCount);
int lineno = 0;
std::generate_n(lines.begin(), lines.size(),
[&] { return RandomLine(++lineno, kLineWidth); });

const auto path =
kPreexistingBucket + std::string("/OpenInputFileMixedReadVsReadAt/object-name");
std::shared_ptr<io::OutputStream> output;
ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
for (auto const& line : lines) {
ASSERT_OK(output->Write(line.data(), line.size()));
}
ASSERT_OK(output->Close());

std::shared_ptr<io::RandomAccessFile> file;
ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
for (int i = 0; i != 32; ++i) {
SCOPED_TRACE("Iteration " + std::to_string(i));
// Verify sequential reads work as expected.
std::array<char, kLineWidth> buffer{};
std::int64_t size;
{
ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
EXPECT_EQ(lines[2 * i], actual->ToString());
}
{
ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data()));
EXPECT_EQ(size, kLineWidth);
auto actual = std::string{buffer.begin(), buffer.end()};
EXPECT_EQ(lines[2 * i + 1], actual);
}

// Verify random reads interleave too.
auto const index = RandomIndex(kLineCount);
auto const position = index * kLineWidth;
ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data()));
EXPECT_EQ(size, kLineWidth);
auto actual = std::string{buffer.begin(), buffer.end()};
EXPECT_EQ(lines[index], actual);

// Verify random reads using buffers work.
ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth));
EXPECT_EQ(lines[index], b->ToString());
}
}

TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

// Create a file large enough to make the random access tests non-trivial.
auto constexpr kLineWidth = 100;
auto constexpr kLineCount = 4096;
std::vector<std::string> lines(kLineCount);
int lineno = 0;
std::generate_n(lines.begin(), lines.size(),
[&] { return RandomLine(++lineno, kLineWidth); });

const auto path =
kPreexistingBucket + std::string("/OpenInputFileRandomSeek/object-name");
std::shared_ptr<io::OutputStream> output;
ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
for (auto const& line : lines) {
ASSERT_OK(output->Write(line.data(), line.size()));
}
ASSERT_OK(output->Close());

std::shared_ptr<io::RandomAccessFile> file;
ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
for (int i = 0; i != 32; ++i) {
SCOPED_TRACE("Iteration " + std::to_string(i));
// Verify sequential reads work as expected.
auto const index = RandomIndex(kLineCount);
auto const position = index * kLineWidth;
ASSERT_OK(file->Seek(position));
ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
EXPECT_EQ(lines[index], actual->ToString());
}
}

TEST_F(GcsIntegrationTest, OpenInputFileInfo) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

arrow::fs::FileInfo info;
ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath()));

std::shared_ptr<io::RandomAccessFile> file;
ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(info));

std::array<char, 1024> buffer{};
std::int64_t size;
auto constexpr kStart = 16;
ASSERT_OK_AND_ASSIGN(size, file->ReadAt(kStart, buffer.size(), buffer.data()));

auto const expected = std::string(kLoremIpsum).substr(kStart);
EXPECT_EQ(std::string(buffer.data(), size), expected);
}

TEST_F(GcsIntegrationTest, OpenInputFileNotFound) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

ASSERT_RAISES(IOError, fs->OpenInputFile(NotFoundObjectPath()));
}

TEST_F(GcsIntegrationTest, OpenInputFileInfoInvalid) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

arrow::fs::FileInfo info;
ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket));
ASSERT_RAISES(IOError, fs->OpenInputFile(info));

ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));
ASSERT_RAISES(IOError, fs->OpenInputFile(info));
}

} // namespace
} // namespace fs
} // namespace arrow