From 2830d1235b31071e8f1a48aab256949acdc49029 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Tue, 8 Sep 2020 19:21:43 +0200
Subject: [PATCH] ARROW-9906: [C++] Keep S3 filesystem alive through open file objects

---
 cpp/src/arrow/filesystem/s3fs.cc             | 101 +++++++++---------
 .../arrow/filesystem/s3fs_narrative_test.cc  |   2 +-
 cpp/src/arrow/filesystem/s3fs_test.cc        |  22 ++++
 python/pyarrow/tests/test_parquet.py         |  21 ++--
 4 files changed, 84 insertions(+), 62 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index cc58890aeb0..9616bac71bf 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -436,13 +436,11 @@ Result GetObjectRange(Aws::S3::S3Client* client,
 }
 
 // A RandomAccessFile that reads from a S3 object
-class ObjectInputFile : public io::RandomAccessFile {
+class ObjectInputFile final : public io::RandomAccessFile {
  public:
-  ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path)
-      : client_(client), path_(path) {}
-
-  ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path, int64_t size)
-      : client_(client), path_(path), content_length_(size) {}
+  ObjectInputFile(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
+                  const S3Path& path, int64_t size = kNoSize)
+      : fs_(std::move(fs)), client_(client), path_(path), content_length_(size) {}
 
   Status Init() {
     // Issue a HEAD Object to get the content-length and ensure any
@@ -492,6 +490,8 @@ class ObjectInputFile : public io::RandomAccessFile {
   // RandomAccessFile APIs
 
   Status Close() override {
+    fs_.reset();
+    client_ = nullptr;
     closed_ = true;
     return Status::OK();
   }
@@ -566,6 +566,7 @@ class ObjectInputFile : public io::RandomAccessFile {
   }
 
  protected:
+  std::shared_ptr<FileSystem> fs_;  // Owner of S3Client
   Aws::S3::S3Client* client_;
   S3Path path_;
   bool closed_ = false;
@@ -580,14 +581,14 @@ class ObjectInputFile : public io::RandomAccessFile {
 static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
 
 // An OutputStream that writes to a S3 object
-class ObjectOutputStream : public io::OutputStream {
+class ObjectOutputStream final : public io::OutputStream {
  protected:
   struct UploadState;
 
  public:
-  ObjectOutputStream(Aws::S3::S3Client* client, const S3Path& path,
-                     const S3Options& options)
-      : client_(client), path_(path), options_(options) {}
+  ObjectOutputStream(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
+                     const S3Path& path, const S3Options& options)
+      : fs_(std::move(fs)), client_(client), path_(path), options_(options) {}
 
   ~ObjectOutputStream() override {
     // For compliance with the rest of the IO stack, Close rather than Abort,
@@ -632,6 +633,8 @@ class ObjectOutputStream : public io::OutputStream {
           outcome.GetError());
     }
     current_part_.reset();
+    fs_.reset();
+    client_ = nullptr;
     closed_ = true;
     return Status::OK();
   }
@@ -677,6 +680,8 @@ class ObjectOutputStream : public io::OutputStream {
           outcome.GetError());
     }
 
+    fs_.reset();
+    client_ = nullptr;
     closed_ = true;
     return Status::OK();
   }
@@ -849,6 +854,7 @@ class ObjectOutputStream : public io::OutputStream {
   }
 
  protected:
+  std::shared_ptr<FileSystem> fs_;  // Owner of S3Client
   Aws::S3::S3Client* client_;
   S3Path path_;
   const S3Options& options_;
@@ -1273,6 +1279,35 @@ class S3FileSystem::Impl {
     }
     return Status::OK();
   }
+
+  Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
+                                                         S3FileSystem* fs) {
+    ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
+    RETURN_NOT_OK(ValidateFilePath(path));
+
+    auto ptr =
+        std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(), path);
+    RETURN_NOT_OK(ptr->Init());
+    return ptr;
+  }
+
+  Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
+                                                         S3FileSystem* fs) {
+    if (info.type() == FileType::NotFound) {
+      return ::arrow::fs::internal::PathNotFound(info.path());
+    }
+    if (info.type() != FileType::File && info.type() != FileType::Unknown) {
+      return ::arrow::fs::internal::NotAFile(info.path());
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
+    RETURN_NOT_OK(ValidateFilePath(path));
+
+    auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(),
+                                                 path, info.size());
+    RETURN_NOT_OK(ptr->Init());
+    return ptr;
+  }
 };
 
 S3FileSystem::S3FileSystem(const S3Options& options) : impl_(new Impl{options}) {}
@@ -1528,56 +1563,22 @@ Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
 
 Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
     const std::string& s) {
-  ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
-  RETURN_NOT_OK(ValidateFilePath(path));
-
-  auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path);
-  RETURN_NOT_OK(ptr->Init());
-  return ptr;
+  return impl_->OpenInputFile(s, this);
 }
 
 Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
     const FileInfo& info) {
-  if (info.type() == FileType::NotFound) {
-    return ::arrow::fs::internal::PathNotFound(info.path());
-  }
-  if (info.type() != FileType::File && info.type() != FileType::Unknown) {
-    return ::arrow::fs::internal::NotAFile(info.path());
-  }
-
-  ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
-  RETURN_NOT_OK(ValidateFilePath(path));
-
-  auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path, info.size());
-  RETURN_NOT_OK(ptr->Init());
-  return ptr;
+  return impl_->OpenInputFile(info, this);
 }
 
 Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
     const std::string& s) {
-  ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
-  RETURN_NOT_OK(ValidateFilePath(path));
-
-  auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path);
-  RETURN_NOT_OK(ptr->Init());
-  return ptr;
+  return impl_->OpenInputFile(s, this);
 }
 
 Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
     const FileInfo& info) {
-  if (info.type() == FileType::NotFound) {
-    return ::arrow::fs::internal::PathNotFound(info.path());
-  }
-  if (info.type() != FileType::File && info.type() != FileType::Unknown) {
-    return ::arrow::fs::internal::NotAFile(info.path());
-  }
-
-  ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
-  RETURN_NOT_OK(ValidateFilePath(path));
-
-  auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path, info.size());
-  RETURN_NOT_OK(ptr->Init());
-  return ptr;
+  return impl_->OpenInputFile(info, this);
 }
 
 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
@@ -1585,8 +1586,8 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   RETURN_NOT_OK(ValidateFilePath(path));
 
-  auto ptr =
-      std::make_shared<ObjectOutputStream>(impl_->client_.get(), path, impl_->options_);
+  auto ptr = std::make_shared<ObjectOutputStream>(
+      shared_from_this(), impl_->client_.get(), path, impl_->options_);
   RETURN_NOT_OK(ptr->Init());
   return ptr;
 }
diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
index b47df2f4966..548b046b245 100644
--- a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc
@@ -87,7 +87,7 @@ void PrintError(const std::string& context_msg, const Result& result) {
 
 void ClearBucket(int argc, char** argv) {
   auto fs = MakeFileSystem();
-  ASSERT_OK(fs->DeleteDirContents(""));
+  ASSERT_OK(fs->DeleteRootDirContents());
 }
 
 void TestBucket(int argc, char** argv) {
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index 673c549df18..9fd1b57cdc1 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -360,6 +360,16 @@ class TestS3FS : public S3TestMixin {
     ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile1"));
     ASSERT_OK(stream->Close());
     AssertObjectContents(client_.get(), "bucket", "newfile1", "");
+
+    // Open file and then lose filesystem reference
+    ASSERT_EQ(fs_.use_count(), 1);  // needed for test to work
+    std::weak_ptr<S3FileSystem> weak_fs(fs_);
+    ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile5"));
+    fs_.reset();
+    ASSERT_FALSE(weak_fs.expired());
+    ASSERT_OK(stream->Write("some data"));
+    ASSERT_OK(stream->Close());
+    ASSERT_TRUE(weak_fs.expired());
   }
 
   void TestOpenOutputStreamAbort() {
@@ -682,11 +692,23 @@ TEST_F(TestS3FS, OpenInputStream) {
   AssertBufferEqual(*buf, "sub data");
   ASSERT_OK_AND_ASSIGN(buf, stream->Read(100));
   AssertBufferEqual(*buf, "");
+  ASSERT_OK(stream->Close());
 
   // "Directories"
   ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/emptydir"));
   ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/somedir"));
   ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket"));
+
+  // Open file and then lose filesystem reference
+  ASSERT_EQ(fs_.use_count(), 1);  // needed for test to work
+  std::weak_ptr<S3FileSystem> weak_fs(fs_);
+  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream("bucket/somefile"));
+  fs_.reset();
+  ASSERT_FALSE(weak_fs.expired());
+  ASSERT_OK_AND_ASSIGN(buf, stream->Read(10));
+  AssertBufferEqual(*buf, "some data");
+  ASSERT_OK(stream->Close());
+  ASSERT_TRUE(weak_fs.expired());
 }
 
 TEST_F(TestS3FS, OpenInputFile) {
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 0ade957ee48..1152cc62a32 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -3593,20 +3593,19 @@ def test_parquet_writer_filesystem_s3(s3_example_fs):
     tm.assert_frame_equal(result, df)
 
 
-# TODO segfaulting (ARROW-9814?)
-# @pytest.mark.pandas
-# @pytest.mark.s3
-# def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
-#     df = _test_dataframe(100)
-#     table = pa.Table.from_pandas(df, preserve_index=False)
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
 
-#     fs, uri, path = s3_example_fs
+    fs, uri, path = s3_example_fs
 
-#     with pq.ParquetWriter(uri, table.schema, version='2.0') as writer:
-#         writer.write_table(table)
+    with pq.ParquetWriter(uri, table.schema, version='2.0') as writer:
+        writer.write_table(table)
 
-#     result = _read_table(path, filesystem=fs).to_pandas()
-#     tm.assert_frame_equal(result, df)
+    result = _read_table(path, filesystem=fs).to_pandas()
+    tm.assert_frame_equal(result, df)
 
 
 @pytest.mark.pandas
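The idea behind the patch: each open ObjectInputFile / ObjectOutputStream now holds a std::shared_ptr back to the filesystem that created it (obtained via shared_from_this()) and drops that reference in Close(), so the S3Client owned by the filesystem cannot be destroyed while a file object is still in use. Below is a minimal standalone sketch of that keep-alive pattern, using hypothetical FileSystem/FileHandle names rather than the real Arrow classes; it mirrors the weak_ptr check used in the new tests.

// Minimal sketch (not Arrow code): the file handle co-owns its filesystem.
#include <cassert>
#include <iostream>
#include <memory>
#include <utility>

class FileSystem : public std::enable_shared_from_this<FileSystem> {
 public:
  virtual ~FileSystem() = default;
  // The returned handle keeps the filesystem (and whatever client it owns)
  // alive even if the caller drops its own filesystem reference.
  std::shared_ptr<class FileHandle> OpenFile();
};

class FileHandle {
 public:
  explicit FileHandle(std::shared_ptr<FileSystem> fs) : fs_(std::move(fs)) {}

  void Close() {
    // Releasing the reference here lets the filesystem be destroyed as soon
    // as the last open handle is closed.
    fs_.reset();
    closed_ = true;
  }

  bool closed() const { return closed_; }

 private:
  std::shared_ptr<FileSystem> fs_;  // keeps the owner alive while open
  bool closed_ = false;
};

std::shared_ptr<FileHandle> FileSystem::OpenFile() {
  return std::make_shared<FileHandle>(shared_from_this());
}

int main() {
  auto fs = std::make_shared<FileSystem>();
  std::weak_ptr<FileSystem> weak_fs(fs);

  auto file = fs->OpenFile();
  fs.reset();                   // caller lets go of the filesystem...
  assert(!weak_fs.expired());   // ...but the open file still keeps it alive

  file->Close();
  assert(weak_fs.expired());    // last reference gone once the file is closed
  std::cout << "filesystem released after Close()\n";
  return 0;
}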