From 214a5618c25b049561db17015f16bf3eb04a70ca Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Mon, 3 Jul 2023 14:08:05 +0900 Subject: [PATCH 1/2] GH-36346: [C++][S3] Shutdown aws-sdk-cpp related resources on finalize All S3 related operations are failed after we call arrow::fs::FinalizeS3(). --- cpp/src/arrow/filesystem/s3fs.cc | 79 +++++++++++++++++++++++++++++--- cpp/src/arrow/filesystem/s3fs.h | 4 ++ 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index c3a6eb0eace..2971c431448 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -398,12 +398,19 @@ namespace { Status CheckS3Initialized() { if (!IsS3Initialized()) { return Status::Invalid( - "S3 subsystem not initialized; please call InitializeS3() " + "S3 subsystem is not initialized; please call InitializeS3() " "before carrying out any S3-related operation"); } return Status::OK(); } +Status CheckS3Finalized() { + if (IsS3Finalized()) { + return Status::Invalid("S3 subsystem is finalized"); + } + return Status::OK(); +} + // XXX Sanitize paths by removing leading slash? struct S3Path { @@ -1008,6 +1015,8 @@ class ObjectInputFile final : public io::RandomAccessFile { content_length_(size) {} Status Init() { + RETURN_NOT_OK(CheckS3Finalized()); + // Issue a HEAD Object to get the content-length and ensure any // errors (e.g. file not found) don't wait until the first Read() call. if (content_length_ != kNoSize) { @@ -1099,6 +1108,8 @@ class ObjectInputFile final : public io::RandomAccessFile { return 0; } + RETURN_NOT_OK(CheckS3Finalized()); + // Read the desired range of bytes ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result, GetObjectRange(client_.get(), path_, position, nbytes, out)); @@ -1182,6 +1193,8 @@ class ObjectOutputStream final : public io::OutputStream { } Status Init() { + RETURN_NOT_OK(CheckS3Finalized()); + // Initiate the multi-part upload S3Model::CreateMultipartUploadRequest req; req.SetBucket(ToAwsString(path_.bucket)); @@ -1217,6 +1230,8 @@ class ObjectOutputStream final : public io::OutputStream { return Status::OK(); } + RETURN_NOT_OK(CheckS3Finalized()); + S3Model::AbortMultipartUploadRequest req; req.SetBucket(ToAwsString(path_.bucket)); req.SetKey(ToAwsString(path_.key)); @@ -1245,6 +1260,8 @@ class ObjectOutputStream final : public io::OutputStream { Future<> CloseAsync() override { if (closed_) return Status::OK(); + RETURN_NOT_OK(CheckS3Finalized()); + if (current_part_) { // Upload last part RETURN_NOT_OK(CommitCurrentPart()); @@ -1307,6 +1324,8 @@ class ObjectOutputStream final : public io::OutputStream { return Status::Invalid("Operation on closed stream"); } + RETURN_NOT_OK(CheckS3Finalized()); + const int8_t* data_ptr = reinterpret_cast(data); auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) { data_ptr += offset; @@ -1359,6 +1378,7 @@ class ObjectOutputStream final : public io::OutputStream { if (closed_) { return Status::Invalid("Operation on closed stream"); } + RETURN_NOT_OK(CheckS3Finalized()); // Wait for background writes to finish std::unique_lock lock(upload_state_->mutex); return upload_state_->pending_parts_completed; @@ -1367,6 +1387,7 @@ class ObjectOutputStream final : public io::OutputStream { // Upload-related helpers Status CommitCurrentPart() { + RETURN_NOT_OK(CheckS3Finalized()); ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish()); current_part_.reset(); current_part_size_ = 0; @@ -1379,6 +1400,8 @@ class ObjectOutputStream final : public io::OutputStream { Status UploadPart(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { + RETURN_NOT_OK(CheckS3Finalized()); + S3Model::UploadPartRequest req; req.SetBucket(ToAwsString(path_.bucket)); req.SetKey(ToAwsString(path_.key)); @@ -1574,6 +1597,8 @@ struct TreeWalker : public std::enable_shared_from_this { S3Model::ListObjectsV2Request req; Status operator()(const Result& result) { + RETURN_NOT_OK(CheckS3Finalized()); + // Serialize calls to operation-specific handlers if (!walker->ok()) { // Early exit: avoid executing handlers if DoWalk() returned @@ -1692,6 +1717,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this BucketExists(const std::string& bucket) { + RETURN_NOT_OK(CheckS3Finalized()); + S3Model::HeadBucketRequest req; req.SetBucket(ToAwsString(bucket)); @@ -1709,6 +1736,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this IsEmptyDirectory( const std::string& bucket, const std::string& key, const S3Model::HeadObjectOutcome* previous_outcome = nullptr) { + RETURN_NOT_OK(CheckS3Finalized()); + if (previous_outcome) { // Fetch the backend from the previous error DCHECK(!previous_outcome->IsSuccess()); @@ -1850,6 +1885,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this IsNonEmptyDirectory(const S3Path& path) { + RETURN_NOT_OK(CheckS3Finalized()); S3Model::ListObjectsV2Request req; req.SetBucket(ToAwsString(path.bucket)); req.SetPrefix(ToAwsString(path.key) + kSep); @@ -1939,6 +1975,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this* out) { + RETURN_NOT_OK(CheckS3Finalized()); + FileInfoCollector collector(bucket, key, select); auto handle_error = [&](const AWSError& error) -> Status { @@ -2027,6 +2065,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this> WalkForDeleteDirAsync(const std::string& bucket, const std::string& key) { + RETURN_NOT_OK(CheckS3Finalized()); + auto state = std::make_shared(); auto handle_results = [state](const std::string& prefix, @@ -2064,6 +2104,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this DeleteObjectsAsync(const std::string& bucket, const std::vector& keys) { + RETURN_NOT_OK(CheckS3Finalized()); + struct DeleteCallback { const std::string bucket; @@ -2156,6 +2198,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this> ProcessListBuckets( const Aws::S3::Model::ListBucketsOutcome& outcome) { + RETURN_NOT_OK(CheckS3Finalized()); if (!outcome.IsSuccess()) { return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets", outcome.GetError()); @@ -2169,11 +2212,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this> ListBuckets() { + RETURN_NOT_OK(CheckS3Finalized()); auto outcome = client_->ListBuckets(); return ProcessListBuckets(outcome); } Future> ListBucketsAsync(io::IOContext ctx) { + RETURN_NOT_OK(CheckS3Finalized()); auto self = shared_from_this(); return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); })) // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets) @@ -2187,6 +2232,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this(client_, fs->io_context(), path); RETURN_NOT_OK(ptr->Init()); @@ -2205,6 +2251,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this(client_, fs->io_context(), path, info.size()); @@ -2223,6 +2270,7 @@ S3FileSystem::~S3FileSystem() {} Result> S3FileSystem::Make( const S3Options& options, const io::IOContext& io_context) { RETURN_NOT_OK(CheckS3Initialized()); + RETURN_NOT_OK(CheckS3Finalized()); std::shared_ptr ptr(new S3FileSystem(options, io_context)); RETURN_NOT_OK(ptr->impl_->Init()); @@ -2250,6 +2298,8 @@ S3Options S3FileSystem::options() const { return impl_->options(); } std::string S3FileSystem::region() const { return impl_->region(); } Result S3FileSystem::GetFileInfo(const std::string& s) { + RETURN_NOT_OK(CheckS3Finalized()); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); FileInfo info; info.set_path(s); @@ -2313,6 +2363,8 @@ Result S3FileSystem::GetFileInfo(const std::string& s) { } Result S3FileSystem::GetFileInfo(const FileSelector& select) { + RETURN_NOT_OK(CheckS3Finalized()); + ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir)); FileInfoVector results; @@ -2383,6 +2435,8 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) } Status S3FileSystem::CreateDir(const std::string& s, bool recursive) { + RETURN_NOT_OK(CheckS3Finalized()); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); if (path.key.empty()) { @@ -2426,6 +2480,8 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) { } Status S3FileSystem::DeleteDir(const std::string& s) { + RETURN_NOT_OK(CheckS3Finalized()); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); if (path.empty()) { @@ -2455,6 +2511,8 @@ Status S3FileSystem::DeleteDirContents(const std::string& s, bool missing_dir_ok } Future<> S3FileSystem::DeleteDirContentsAsync(const std::string& s, bool missing_dir_ok) { + RETURN_NOT_OK(CheckS3Finalized()); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); if (path.empty()) { @@ -2480,6 +2538,8 @@ Status S3FileSystem::DeleteRootDirContents() { } Status S3FileSystem::DeleteFile(const std::string& s) { + RETURN_NOT_OK(CheckS3Finalized()); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); @@ -2506,6 +2566,8 @@ Status S3FileSystem::DeleteFile(const std::string& s) { } Status S3FileSystem::Move(const std::string& src, const std::string& dest) { + RETURN_NOT_OK(CheckS3Finalized()); + // XXX We don't implement moving directories as it would be too expensive: // one must copy all directory contents one by one (including object data), // then delete the original contents. @@ -2525,6 +2587,8 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) { } Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) { + RETURN_NOT_OK(CheckS3Finalized()); + ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src)); RETURN_NOT_OK(ValidateFilePath(src_path)); ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest)); @@ -2562,6 +2626,8 @@ Result> S3FileSystem::OpenOutputStream( ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); + RETURN_NOT_OK(CheckS3Finalized()); + auto ptr = std::make_shared(impl_->client_, io_context(), path, impl_->options(), metadata); RETURN_NOT_OK(ptr->Init()); @@ -2600,6 +2666,8 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { bool IsInitialized() { return !is_finalized_ && is_initialized_; } + bool IsFinalized() { return is_finalized_; } + void Finalize(bool from_destructor = false) { bool expected = true; is_finalized_.store(true); @@ -2608,9 +2676,9 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { ARROW_LOG(WARNING) << " arrow::fs::FinalizeS3 was not called even though S3 was initialized. " "This could lead to a segmentation fault at exit"; - RegionResolver::ResetDefaultInstance(); - Aws::ShutdownAPI(aws_options_); } + RegionResolver::ResetDefaultInstance(); + Aws::ShutdownAPI(aws_options_); } } @@ -2672,9 +2740,6 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { std::shared_ptr CreateAwsInstance() { auto instance = std::make_shared(); - // Don't let S3 be shutdown until all Arrow threads are done using it - arrow::internal::GetCpuThreadPool()->KeepAlive(instance); - io::internal::GetIOThreadPool()->KeepAlive(instance); return instance; } @@ -2713,6 +2778,8 @@ Status EnsureS3Finalized() { return FinalizeS3(); } bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); } +bool IsS3Finalized() { return GetAwsInstance().IsFinalized(); } + // ----------------------------------------------------------------------- // Top-level utility functions diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 0a7ca73ccb2..34d88c632e5 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -351,6 +351,10 @@ Status EnsureS3Initialized(); ARROW_EXPORT bool IsS3Initialized(); +/// Whether S3 was finalized. +ARROW_EXPORT +bool IsS3Finalized(); + /// Shutdown the S3 APIs. ARROW_EXPORT Status FinalizeS3(); From 01fad302747ddd8971ef4db3140222f0bbaf32b5 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 3 Jul 2023 16:13:20 +0200 Subject: [PATCH 2/2] Add Python-based test --- python/pyarrow/tests/test_fs.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 59bbb5a3921..e35cf43bd2e 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -20,6 +20,8 @@ import os import pathlib import pickle +import subprocess +import sys import pytest import weakref @@ -1818,3 +1820,22 @@ def check_copied_files(destination_dir): destination_dir5.mkdir() copy_files(source_dir, destination_dir5, chunk_size=1, use_threads=False) check_copied_files(destination_dir5) + + +@pytest.mark.s3 +def test_s3_finalize(): + code = """if 1: + import pytest + from pyarrow.fs import FileSystem, ensure_s3_initialized, finalize_s3 + + fs, path = FileSystem.from_uri('s3://mf-nwp-models/README.txt') + assert fs.region == 'eu-west-1' + with fs.open_input_stream(path) as f: + f.read(50) + + finalize_s3() + + with pytest.raises(ValueError, match="S3 subsystem is finalized"): + fs.open_input_stream(path) + """ + subprocess.check_call([sys.executable, "-c", code])