Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,32 @@ class GcsInputStream : public arrow::io::InputStream {
// @name FileInterface
// Closes the underlying GCS read stream and records the explicit close in `closed_`.
// Always returns Status::OK().
Status Close() override {
stream_.Close();
closed_ = true;
return Status::OK();
}

// Returns the current read position, computed as the stream's tellg() plus `offset_`.
// Returns IOError when the stream is in a failed state and Invalid when closed() is true.
// NOTE(review): in this diff view the `!stream_` check looks like the pre-change line and
// the closed() guard like its replacement -- confirm against the merged file.
Result<int64_t> Tell() const override {
if (!stream_) {
return Status::IOError("invalid stream");
}
if (closed()) return Status::Invalid("Cannot use Tell() on a closed stream");
return stream_.tellg() + offset_;
}

bool closed() const override { return !stream_.IsOpen(); }
// A gcs::ObjectReadStream can be "born closed". For small objects the stream returns
// `IsOpen() == false` as soon as it is created, but the application can still read from
// it.
// For that reason `!IsOpen()` alone does not mark this stream as closed: the explicit
// `closed_` flag (set by Close()) must be true as well, hence `&&` rather than `||`.
bool closed() const override { return closed_ && !stream_.IsOpen(); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this is && rather than ||?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Zero-length objects are "born closed" in google-cloud-cpp. This is probably a bug (or at least a very poorly thought-out API): you can read() from them even though they return IsOpen() == false.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, can you add a comment about that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. That got me thinking about the similar expression for output streams, and you were right in that case. Fixed that too.

//@}

//@{
// @name Readable
// Reads up to `nbytes` bytes from the GCS stream into `out`.
//
// Returns Invalid when the stream is closed, the stream's own error status when the
// read fails, and otherwise the number of bytes reported by gcount().
Result<int64_t> Read(int64_t nbytes, void* out) override {
  if (closed()) {
    return Status::Invalid("Cannot read from a closed stream");
  }
  auto* destination = static_cast<char*>(out);
  stream_.read(destination, nbytes);
  ARROW_GCS_RETURN_NOT_OK(stream_.status());
  return stream_.gcount();
}

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
if (closed()) return Status::Invalid("Cannot read from a closed stream");
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes));
stream_.read(reinterpret_cast<char*>(buffer->mutable_data()), nbytes);
ARROW_GCS_RETURN_NOT_OK(stream_.status());
Expand All @@ -142,6 +146,7 @@ class GcsInputStream : public arrow::io::InputStream {
gcs::Generation generation_;
std::int64_t offset_;
gcs::Client client_;
bool closed_ = false;
};

class GcsOutputStream : public arrow::io::OutputStream {
Expand All @@ -151,19 +156,28 @@ class GcsOutputStream : public arrow::io::OutputStream {

// Closes the upload stream, records the explicit close in `closed_`, and returns the
// stream's last status passed through internal::ToArrowStatus().
Status Close() override {
stream_.Close();
closed_ = true;
return internal::ToArrowStatus(stream_.last_status());
}

// Returns `tell_`, the running byte count that Write() advances on success.
// Returns IOError when the stream is in a failed state and Invalid when closed() is true.
// NOTE(review): in this diff view the `!stream_` check looks like the pre-change line and
// the closed() guard like its replacement -- confirm against the merged file.
Result<int64_t> Tell() const override {
if (!stream_) {
return Status::IOError("invalid stream");
}
if (closed()) return Status::Invalid("Cannot use Tell() on a closed stream");
return tell_;
}

bool closed() const override { return !stream_.IsOpen(); }
// Reports the stream as closed when either Close() was called (`closed_`) or the
// underlying stream says it is no longer open.
//
// gcs::ObjectWriteStream can be "closed" without an explicit Close() call. At this time
// this class does not use any of the mechanisms [*] that trigger such behavior.
// Nevertheless, we defensively prepare for them by checking either condition.
//
// [*]: These mechanisms include:
// - resumable uploads that are "resumed" after the upload completed are born
// "closed",
// - uploads that prescribe their total size using the `x-upload-content-length` header
// are completed and "closed" as soon as the upload reaches that size.
bool closed() const override { return closed_ || !stream_.IsOpen(); }

Status Write(const void* data, int64_t nbytes) override {
if (closed()) return Status::Invalid("Cannot write to a closed stream");
if (stream_.write(reinterpret_cast<const char*>(data), nbytes)) {
tell_ += nbytes;
return Status::OK();
Expand All @@ -172,13 +186,15 @@ class GcsOutputStream : public arrow::io::OutputStream {
}

// Flushes buffered data to the underlying upload stream.
//
// Returns Invalid when the stream is closed. If the stream is in a failed state after
// the flush, the stream's last status is returned (via internal::ToArrowStatus) instead
// of unconditionally reporting success, so upload errors are not silently swallowed.
Status Flush() override {
  if (closed()) return Status::Invalid("Cannot flush a closed stream");
  stream_.flush();
  if (!stream_) return internal::ToArrowStatus(stream_.last_status());
  return Status::OK();
}

private:
gcs::ObjectWriteStream stream_;
int64_t tell_ = 0;
bool closed_ = false;
};

using InputStreamFactory = std::function<Result<std::shared_ptr<io::InputStream>>(
Expand Down Expand Up @@ -225,13 +241,15 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
// @name RandomAccessFile
// Returns the object size as recorded in `metadata_`.
Result<int64_t> GetSize() override { return metadata_.size(); }
// Reads up to `nbytes` bytes starting at `position` into `out`.
//
// A separate input stream is requested from `factory_` at the given offset (pinned to
// the generation recorded in `metadata_`) and the read is delegated to it. Returns
// Invalid when this file is closed.
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
  if (closed()) {
    return Status::Invalid("Cannot read from closed file");
  }
  std::shared_ptr<io::InputStream> reader;
  ARROW_ASSIGN_OR_RAISE(
      reader, factory_(metadata_.bucket(), metadata_.name(),
                       gcs::Generation(metadata_.generation()),
                       gcs::ReadFromOffset(position)));
  return reader->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
if (closed()) return Status::Invalid("Cannot read from closed file");
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(),
gcs::Generation(metadata_.generation()),
Expand All @@ -242,6 +260,7 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {

// from Seekable
Status Seek(int64_t position) override {
if (closed()) return Status::Invalid("Cannot seek in a closed file");
ARROW_ASSIGN_OR_RAISE(stream_, factory_(metadata_.bucket(), metadata_.name(),
gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
Expand Down
70 changes: 61 additions & 9 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ TEST_F(GcsIntegrationTest, CopyFileNotFound) {
ASSERT_RAISES(IOError, fs->CopyFile(NotFoundObjectPath(), destination_path));
}

TEST_F(GcsIntegrationTest, ReadObjectString) {
TEST_F(GcsIntegrationTest, OpenInputStreamString) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

std::shared_ptr<io::InputStream> stream;
Expand All @@ -684,7 +684,7 @@ TEST_F(GcsIntegrationTest, ReadObjectString) {
EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
}

TEST_F(GcsIntegrationTest, ReadObjectStringBuffers) {
TEST_F(GcsIntegrationTest, OpenInputStreamStringBuffers) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

std::shared_ptr<io::InputStream> stream;
Expand All @@ -700,7 +700,7 @@ TEST_F(GcsIntegrationTest, ReadObjectStringBuffers) {
EXPECT_EQ(contents, kLoremIpsum);
}

TEST_F(GcsIntegrationTest, ReadObjectInfo) {
TEST_F(GcsIntegrationTest, OpenInputStreamInfo) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

arrow::fs::FileInfo info;
Expand All @@ -716,13 +716,27 @@ TEST_F(GcsIntegrationTest, ReadObjectInfo) {
EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
}

TEST_F(GcsIntegrationTest, ReadObjectNotFound) {
// Verifies that an empty object can be opened as an input stream and that reading
// from it succeeds and yields zero bytes.
TEST_F(GcsIntegrationTest, OpenInputStreamEmpty) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  const auto empty_path =
      internal::ConcatAbstractPath(PreexistingBucketName(), "empty-object.txt");
  CreateFile(fs.get(), empty_path, std::string());

  std::shared_ptr<io::InputStream> input;
  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(empty_path));

  std::array<char, 1024> scratch{};
  ASSERT_OK_AND_ASSIGN(std::int64_t bytes_read,
                       input->Read(scratch.size(), scratch.data()));
  EXPECT_EQ(bytes_read, 0);
}

// Verifies that opening an input stream on NotFoundObjectPath() raises IOError.
TEST_F(GcsIntegrationTest, OpenInputStreamNotFound) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

ASSERT_RAISES(IOError, fs->OpenInputStream(NotFoundObjectPath()));
}

TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
TEST_F(GcsIntegrationTest, OpenInputStreamInfoInvalid) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

arrow::fs::FileInfo info;
Expand All @@ -733,10 +747,10 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
ASSERT_RAISES(IOError, fs->OpenInputStream(info));
}

TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) {
TEST_F(GcsIntegrationTest, OpenInputStreamReadMetadata) {
auto client = GcsClient();
const auto custom_time = std::chrono::system_clock::now() + std::chrono::hours(1);
const std::string object_name = "ReadObjectMetadataTest/simple.txt";
const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";
const gcs::ObjectMetadata expected =
client
.InsertObject(PreexistingBucketName(), object_name,
Expand Down Expand Up @@ -795,7 +809,18 @@ TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) {
ASSERT_FALSE(actual->Contains("kmsKeyName"));
}

TEST_F(GcsIntegrationTest, WriteObjectSmall) {
// Verifies that, once an input stream has been closed, both Read() overloads and
// Tell() fail with Invalid.
TEST_F(GcsIntegrationTest, OpenInputStreamClosed) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  std::shared_ptr<io::InputStream> input;
  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(PreexistingObjectPath()));
  ASSERT_OK(input->Close());

  std::array<char, 16> scratch{};
  ASSERT_RAISES(Invalid, input->Read(scratch.size(), scratch.data()));
  ASSERT_RAISES(Invalid, input->Read(scratch.size()));
  ASSERT_RAISES(Invalid, input->Tell());
}

TEST_F(GcsIntegrationTest, OpenOutputStreamSmall) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

const auto path = PreexistingBucketPath() + "test-write-object";
Expand All @@ -816,7 +841,7 @@ TEST_F(GcsIntegrationTest, WriteObjectSmall) {
EXPECT_EQ(std::string(inbuf.data(), size), expected);
}

TEST_F(GcsIntegrationTest, WriteObjectLarge) {
TEST_F(GcsIntegrationTest, OpenOutputStreamLarge) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

const auto path = PreexistingBucketPath() + "test-write-object";
Expand Down Expand Up @@ -852,6 +877,19 @@ TEST_F(GcsIntegrationTest, WriteObjectLarge) {
EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
}

// Verifies that, once an output stream has been closed, Write(), Flush() and Tell()
// all fail with Invalid.
TEST_F(GcsIntegrationTest, OpenOutputStreamClosed) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  const auto destination = internal::ConcatAbstractPath(
      PreexistingBucketName(), "open-output-stream-closed.txt");
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::OutputStream> sink,
                       fs->OpenOutputStream(destination, {}));
  ASSERT_OK(sink->Close());

  ASSERT_RAISES(Invalid, sink->Write(kLoremIpsum, std::strlen(kLoremIpsum)));
  ASSERT_RAISES(Invalid, sink->Flush());
  ASSERT_RAISES(Invalid, sink->Tell());
}

TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

Expand Down Expand Up @@ -971,6 +1009,20 @@ TEST_F(GcsIntegrationTest, OpenInputFileInfoInvalid) {
ASSERT_RAISES(IOError, fs->OpenInputFile(info));
}

// Verifies that, once a random-access file has been closed, Tell(), both Read()
// overloads, both ReadAt() overloads and Seek() all fail with Invalid.
TEST_F(GcsIntegrationTest, OpenInputFileClosed) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  ASSERT_OK_AND_ASSIGN(auto file, fs->OpenInputFile(PreexistingObjectPath()));
  ASSERT_OK(file->Close());

  std::array<char, 16> scratch{};
  ASSERT_RAISES(Invalid, file->Tell());
  ASSERT_RAISES(Invalid, file->Read(scratch.size(), scratch.data()));
  ASSERT_RAISES(Invalid, file->Read(scratch.size()));
  ASSERT_RAISES(Invalid, file->ReadAt(1, scratch.size(), scratch.data()));
  ASSERT_RAISES(Invalid, file->ReadAt(1, 1));
  ASSERT_RAISES(Invalid, file->Seek(2));
}

} // namespace
} // namespace fs
} // namespace arrow