diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 47515ef8a61..1f6cbac180c 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -96,28 +96,32 @@ class GcsInputStream : public arrow::io::InputStream { // @name FileInterface Status Close() override { stream_.Close(); + closed_ = true; return Status::OK(); } Result Tell() const override { - if (!stream_) { - return Status::IOError("invalid stream"); - } + if (closed()) return Status::Invalid("Cannot use Tell() on a closed stream"); return stream_.tellg() + offset_; } - bool closed() const override { return !stream_.IsOpen(); } + // A gcs::ObjectReadStream can be "born closed". For small objects the stream returns + // `IsOpen() == false` as soon as it is created, but the application can still read from + // it. + bool closed() const override { return closed_ && !stream_.IsOpen(); } //@} //@{ // @name Readable Result Read(int64_t nbytes, void* out) override { + if (closed()) return Status::Invalid("Cannot read from a closed stream"); stream_.read(static_cast(out), nbytes); ARROW_GCS_RETURN_NOT_OK(stream_.status()); return stream_.gcount(); } Result> Read(int64_t nbytes) override { + if (closed()) return Status::Invalid("Cannot read from a closed stream"); ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes)); stream_.read(reinterpret_cast(buffer->mutable_data()), nbytes); ARROW_GCS_RETURN_NOT_OK(stream_.status()); @@ -142,6 +146,7 @@ class GcsInputStream : public arrow::io::InputStream { gcs::Generation generation_; std::int64_t offset_; gcs::Client client_; + bool closed_ = false; }; class GcsOutputStream : public arrow::io::OutputStream { @@ -151,19 +156,28 @@ class GcsOutputStream : public arrow::io::OutputStream { Status Close() override { stream_.Close(); + closed_ = true; return internal::ToArrowStatus(stream_.last_status()); } Result Tell() const override { - if (!stream_) { - return Status::IOError("invalid stream"); - } + if (closed()) return Status::Invalid("Cannot use Tell() on a closed stream"); return tell_; } - bool closed() const override { return !stream_.IsOpen(); } + // gcs::ObjectWriteStream can be "closed" without an explicit Close() call. At this time + // this class does not use any of the mechanisms [*] that trigger such behavior. + // Nevertheless, we defensively prepare for them by checking either condition. + // + // [*]: These mechanisms include: + // - resumable uploads that are "resumed" after the upload completed are born + // "closed", + // - uploads that prescribe their total size using the `x-upload-content-length` header + // are completed and "closed" as soon as the upload reaches that size. + bool closed() const override { return closed_ || !stream_.IsOpen(); } Status Write(const void* data, int64_t nbytes) override { + if (closed()) return Status::Invalid("Cannot write to a closed stream"); if (stream_.write(reinterpret_cast(data), nbytes)) { tell_ += nbytes; return Status::OK(); @@ -172,6 +186,7 @@ class GcsOutputStream : public arrow::io::OutputStream { } Status Flush() override { + if (closed()) return Status::Invalid("Cannot flush a closed stream"); stream_.flush(); return Status::OK(); } @@ -179,6 +194,7 @@ class GcsOutputStream : public arrow::io::OutputStream { private: gcs::ObjectWriteStream stream_; int64_t tell_ = 0; + bool closed_ = false; }; using InputStreamFactory = std::function>( @@ -225,6 +241,7 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile { // @name RandomAccessFile Result GetSize() override { return metadata_.size(); } Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + if (closed()) return Status::Invalid("Cannot read from closed file"); std::shared_ptr stream; ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(), gcs::Generation(metadata_.generation()), @@ -232,6 +249,7 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile { return stream->Read(nbytes, out); } Result> ReadAt(int64_t position, int64_t nbytes) override { + if (closed()) return Status::Invalid("Cannot read from closed file"); std::shared_ptr stream; ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(), gcs::Generation(metadata_.generation()), @@ -242,6 +260,7 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile { // from Seekable Status Seek(int64_t position) override { + if (closed()) return Status::Invalid("Cannot seek in a closed file"); ARROW_ASSIGN_OR_RAISE(stream_, factory_(metadata_.bucket(), metadata_.name(), gcs::Generation(metadata_.generation()), gcs::ReadFromOffset(position))); diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index a2985ca1d17..220aca1f746 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -671,7 +671,7 @@ TEST_F(GcsIntegrationTest, CopyFileNotFound) { ASSERT_RAISES(IOError, fs->CopyFile(NotFoundObjectPath(), destination_path)); } -TEST_F(GcsIntegrationTest, ReadObjectString) { +TEST_F(GcsIntegrationTest, OpenInputStreamString) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); std::shared_ptr stream; @@ -684,7 +684,7 @@ TEST_F(GcsIntegrationTest, ReadObjectString) { EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum); } -TEST_F(GcsIntegrationTest, ReadObjectStringBuffers) { +TEST_F(GcsIntegrationTest, OpenInputStreamStringBuffers) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); std::shared_ptr stream; @@ -700,7 +700,7 @@ TEST_F(GcsIntegrationTest, ReadObjectStringBuffers) { EXPECT_EQ(contents, kLoremIpsum); } -TEST_F(GcsIntegrationTest, ReadObjectInfo) { +TEST_F(GcsIntegrationTest, OpenInputStreamInfo) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); arrow::fs::FileInfo info; @@ -716,13 +716,27 @@ TEST_F(GcsIntegrationTest, ReadObjectInfo) { EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum); } -TEST_F(GcsIntegrationTest, ReadObjectNotFound) { +TEST_F(GcsIntegrationTest, OpenInputStreamEmpty) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + const auto object_path = + internal::ConcatAbstractPath(PreexistingBucketName(), "empty-object.txt"); + CreateFile(fs.get(), object_path, std::string()); + + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenInputStream(object_path)); + std::array buffer{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data())); + EXPECT_EQ(size, 0); +} + +TEST_F(GcsIntegrationTest, OpenInputStreamNotFound) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); ASSERT_RAISES(IOError, fs->OpenInputStream(NotFoundObjectPath())); } -TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { +TEST_F(GcsIntegrationTest, OpenInputStreamInfoInvalid) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); arrow::fs::FileInfo info; @@ -733,10 +747,10 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { ASSERT_RAISES(IOError, fs->OpenInputStream(info)); } -TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) { +TEST_F(GcsIntegrationTest, OpenInputStreamReadMetadata) { auto client = GcsClient(); const auto custom_time = std::chrono::system_clock::now() + std::chrono::hours(1); - const std::string object_name = "ReadObjectMetadataTest/simple.txt"; + const std::string object_name = "OpenInputStreamMetadataTest/simple.txt"; const gcs::ObjectMetadata expected = client .InsertObject(PreexistingBucketName(), object_name, @@ -795,7 +809,18 @@ TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) { ASSERT_FALSE(actual->Contains("kmsKeyName")); } -TEST_F(GcsIntegrationTest, WriteObjectSmall) { +TEST_F(GcsIntegrationTest, OpenInputStreamClosed) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenInputStream(PreexistingObjectPath())); + ASSERT_OK(stream->Close()); + std::array buffer{}; + ASSERT_RAISES(Invalid, stream->Read(buffer.size(), buffer.data())); + ASSERT_RAISES(Invalid, stream->Read(buffer.size())); + ASSERT_RAISES(Invalid, stream->Tell()); +} + +TEST_F(GcsIntegrationTest, OpenOutputStreamSmall) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); const auto path = PreexistingBucketPath() + "test-write-object"; @@ -816,7 +841,7 @@ TEST_F(GcsIntegrationTest, WriteObjectSmall) { EXPECT_EQ(std::string(inbuf.data(), size), expected); } -TEST_F(GcsIntegrationTest, WriteObjectLarge) { +TEST_F(GcsIntegrationTest, OpenOutputStreamLarge) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); const auto path = PreexistingBucketPath() + "test-write-object"; @@ -852,6 +877,19 @@ TEST_F(GcsIntegrationTest, WriteObjectLarge) { EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); } +TEST_F(GcsIntegrationTest, OpenOutputStreamClosed) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + const auto path = internal::ConcatAbstractPath(PreexistingBucketName(), + "open-output-stream-closed.txt"); + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + ASSERT_OK(output->Close()); + ASSERT_RAISES(Invalid, output->Write(kLoremIpsum, std::strlen(kLoremIpsum))); + ASSERT_RAISES(Invalid, output->Flush()); + ASSERT_RAISES(Invalid, output->Tell()); +} + TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); @@ -971,6 +1009,20 @@ TEST_F(GcsIntegrationTest, OpenInputFileInfoInvalid) { ASSERT_RAISES(IOError, fs->OpenInputFile(info)); } +TEST_F(GcsIntegrationTest, OpenInputFileClosed) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenInputFile(PreexistingObjectPath())); + ASSERT_OK(stream->Close()); + std::array buffer{}; + ASSERT_RAISES(Invalid, stream->Tell()); + ASSERT_RAISES(Invalid, stream->Read(buffer.size(), buffer.data())); + ASSERT_RAISES(Invalid, stream->Read(buffer.size())); + ASSERT_RAISES(Invalid, stream->ReadAt(1, buffer.size(), buffer.data())); + ASSERT_RAISES(Invalid, stream->ReadAt(1, 1)); + ASSERT_RAISES(Invalid, stream->Seek(2)); +} + } // namespace } // namespace fs } // namespace arrow