From 52e7920fd60c3a35a8e9e695cccf0eb664dec5af Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 24 Jan 2023 17:00:44 -0800 Subject: [PATCH 1/9] Changed s3 finalization to happen after arrow threads finished --- cpp/src/arrow/filesystem/s3fs.cc | 127 ++++++++++++++------------ cpp/src/arrow/filesystem/s3fs_test.cc | 7 +- python/pyarrow/fs.py | 2 + 3 files changed, 73 insertions(+), 63 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 3b5846f575c..1ed650b63b0 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -2573,98 +2573,105 @@ namespace { std::mutex aws_init_lock; Aws::SDKOptions aws_options; -std::atomic aws_initialized(false); +bool aws_initialized = false; -Status DoInitializeS3(const S3GlobalOptions& options) { - Aws::Utils::Logging::LogLevel aws_log_level; +struct AwsInstance : public ::arrow::internal::Executor::Resource { + AwsInstance(const S3GlobalOptions& options) { + Aws::Utils::Logging::LogLevel aws_log_level; #define LOG_LEVEL_CASE(level_name) \ case S3LogLevel::level_name: \ aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \ break; - switch (options.log_level) { - LOG_LEVEL_CASE(Fatal) - LOG_LEVEL_CASE(Error) - LOG_LEVEL_CASE(Warn) - LOG_LEVEL_CASE(Info) - LOG_LEVEL_CASE(Debug) - LOG_LEVEL_CASE(Trace) - default: - aws_log_level = Aws::Utils::Logging::LogLevel::Off; - } + switch (options.log_level) { + LOG_LEVEL_CASE(Fatal) + LOG_LEVEL_CASE(Error) + LOG_LEVEL_CASE(Warn) + LOG_LEVEL_CASE(Info) + LOG_LEVEL_CASE(Debug) + LOG_LEVEL_CASE(Trace) + default: + aws_log_level = Aws::Utils::Logging::LogLevel::Off; + } #undef LOG_LEVEL_CASE #ifdef ARROW_S3_HAS_CRT - aws_options.ioOptions.clientBootstrap_create_fn = - [ev_threads = options.num_event_loop_threads]() { - // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65 - Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads); - Aws::Crt::Io::DefaultHostResolver default_host_resolver( - event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30); - auto client_bootstrap = Aws::MakeShared( - "Aws_Init_Cleanup", event_loop_group, default_host_resolver); - client_bootstrap->EnableBlockingShutdown(); - return client_bootstrap; - }; + aws_options.ioOptions.clientBootstrap_create_fn = + [ev_threads = options.num_event_loop_threads]() { + // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65 + Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads); + Aws::Crt::Io::DefaultHostResolver default_host_resolver( + event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30); + auto client_bootstrap = Aws::MakeShared( + "Aws_Init_Cleanup", event_loop_group, default_host_resolver); + client_bootstrap->EnableBlockingShutdown(); + return client_bootstrap; + }; #endif - - aws_options.loggingOptions.logLevel = aws_log_level; - // By default the AWS SDK logs to files, log to console instead - aws_options.loggingOptions.logger_create_fn = [] { - return std::make_shared( - aws_options.loggingOptions.logLevel); - }; + aws_options.loggingOptions.logLevel = aws_log_level; + // By default the AWS SDK logs to files, log to console instead + aws_options.loggingOptions.logger_create_fn = [] { + return std::make_shared( + aws_options.loggingOptions.logLevel); + }; #if (defined(AWS_SDK_VERSION_MAJOR) && \ (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \ (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272))) - // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends. - // This configuration options is only available with AWS SDK 1.9.272 and later. - aws_options.httpOptions.compliantRfc3986Encoding = true; + // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends. + // This configuration options is only available with AWS SDK 1.9.272 and later. + aws_options.httpOptions.compliantRfc3986Encoding = true; #endif - Aws::InitAPI(aws_options); - aws_initialized.store(true); - return Status::OK(); + Aws::InitAPI(aws_options); + } + + ~AwsInstance() { + RegionResolver::ResetDefaultInstance(); + Aws::ShutdownAPI(aws_options); + } +}; + +std::shared_ptr CreateAwsInstance(const S3GlobalOptions& options) { + aws_initialized = true; + auto instance = std::make_shared(options); + // Don't let S3 be shutdown until all Arrow threads are done using it + arrow::internal::GetCpuThreadPool()->KeepAlive(instance); + io::internal::GetIOThreadPool()->KeepAlive(instance); + return instance; } -Status DoFinalizeS3() { - RegionResolver::ResetDefaultInstance(); - Aws::ShutdownAPI(aws_options); - aws_initialized.store(false); - return Status::OK(); +std::shared_ptr* GetAwsInstance(const S3GlobalOptions& options) { + static auto instance = CreateAwsInstance(options); + return &instance; } } // namespace Status InitializeS3(const S3GlobalOptions& options) { - std::lock_guard lock(aws_init_lock); - return DoInitializeS3(options); -} - -Status EnsureS3Initialized() { - std::lock_guard lock(aws_init_lock); - if (!aws_initialized.load()) { - S3GlobalOptions options{S3LogLevel::Fatal}; - return DoInitializeS3(options); + std::lock_guard lock(aws_init_lock); + if (!(*GetAwsInstance(options))) { + return Status::Invalid("S3 has already been initialized and finalized"); } return Status::OK(); } -Status FinalizeS3() { - std::lock_guard lock(aws_init_lock); - return DoFinalizeS3(); -} +Status EnsureS3Initialized() { return InitializeS3({S3LogLevel::Fatal}); } -Status EnsureS3Finalized() { - std::lock_guard lock(aws_init_lock); - if (aws_initialized.load()) { - return DoFinalizeS3(); +Status FinalizeS3() { + std::lock_guard lock(aws_init_lock); + if (aws_initialized) { + GetAwsInstance({})->reset(); } return Status::OK(); } -bool IsS3Initialized() { return aws_initialized.load(); } +Status EnsureS3Finalized() { return FinalizeS3(); } + +bool IsS3Initialized() { + std::lock_guard lock(aws_init_lock); + return aws_initialized; +} // ----------------------------------------------------------------------- // Top-level utility functions diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index 62957057855..38df84bdede 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -185,7 +185,7 @@ class S3TestMixin : public AwsTestMixin { Status connect_status; int retries = kNumServerRetries; do { - InitServerAndClient(); + ASSERT_OK(InitServerAndClient()); connect_status = OutcomeToStatus("ListBuckets", client_->ListBuckets()); } while (!connect_status.ok() && --retries > 0); ASSERT_OK(connect_status); @@ -198,8 +198,8 @@ class S3TestMixin : public AwsTestMixin { } protected: - void InitServerAndClient() { - ASSERT_OK_AND_ASSIGN(minio_, GetMinioEnv()->GetOneServer()); + Status InitServerAndClient() { + ARROW_ASSIGN_OR_RAISE(minio_, GetMinioEnv()->GetOneServer()); client_config_.reset(new Aws::Client::ClientConfiguration()); client_config_->endpointOverride = ToAwsString(minio_->connect_string()); client_config_->scheme = Aws::Http::Scheme::HTTP; @@ -211,6 +211,7 @@ class S3TestMixin : public AwsTestMixin { new Aws::S3::S3Client(credentials_, *client_config_, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing)); + return Status::OK(); } // How many times to try launching a server in a row before decreeing failure diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index e8e53225fb8..567bea8ac05 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -59,6 +59,8 @@ _not_imported.append("S3FileSystem") else: ensure_s3_initialized() + import atexit + atexit.register(finalize_s3) def __getattr__(name): From 139cfb32bc86dd336e2c21e17150d77fe18da904 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 25 Jan 2023 06:25:14 -0800 Subject: [PATCH 2/9] Change constructor to explicit per lint --- cpp/src/arrow/filesystem/s3fs.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 1ed650b63b0..aec86deb7e7 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -2576,7 +2576,7 @@ Aws::SDKOptions aws_options; bool aws_initialized = false; struct AwsInstance : public ::arrow::internal::Executor::Resource { - AwsInstance(const S3GlobalOptions& options) { + explicit AwsInstance(const S3GlobalOptions& options) { Aws::Utils::Logging::LogLevel aws_log_level; #define LOG_LEVEL_CASE(level_name) \ From 302958ff36ee0ed5f1e0179746f96bc9f1820606 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 11 Apr 2023 09:15:14 -0700 Subject: [PATCH 3/9] Get rid of aws_initiailized variable --- cpp/src/arrow/filesystem/s3fs.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index aec86deb7e7..865be244ace 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -2573,7 +2573,6 @@ namespace { std::mutex aws_init_lock; Aws::SDKOptions aws_options; -bool aws_initialized = false; struct AwsInstance : public ::arrow::internal::Executor::Resource { explicit AwsInstance(const S3GlobalOptions& options) { @@ -2633,7 +2632,6 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { }; std::shared_ptr CreateAwsInstance(const S3GlobalOptions& options) { - aws_initialized = true; auto instance = std::make_shared(options); // Don't let S3 be shutdown until all Arrow threads are done using it arrow::internal::GetCpuThreadPool()->KeepAlive(instance); @@ -2646,6 +2644,8 @@ std::shared_ptr* GetAwsInstance(const S3GlobalOptions& options) { return &instance; } +bool IsAwsInitialized() { return !!GetAwsInstance({}); } + } // namespace Status InitializeS3(const S3GlobalOptions& options) { @@ -2660,7 +2660,7 @@ Status EnsureS3Initialized() { return InitializeS3({S3LogLevel::Fatal}); } Status FinalizeS3() { std::lock_guard lock(aws_init_lock); - if (aws_initialized) { + if (IsAwsInitialized()) { GetAwsInstance({})->reset(); } return Status::OK(); @@ -2670,7 +2670,7 @@ Status EnsureS3Finalized() { return FinalizeS3(); } bool IsS3Initialized() { std::lock_guard lock(aws_init_lock); - return aws_initialized; + return IsAwsInitialized(); } // ----------------------------------------------------------------------- From 1b1b8fefe08251d1e2c16418c388cdeca46be918 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 13 Apr 2023 09:22:01 -0700 Subject: [PATCH 4/9] Fixed up the initialization logic so it is contained within the instance, detects repeat initialization attempts, and is safer. --- cpp/src/arrow/filesystem/s3fs.cc | 74 +++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 865be244ace..dde0cb551e0 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -2571,11 +2571,38 @@ Result> S3FileSystem::OpenAppendStream( namespace { -std::mutex aws_init_lock; Aws::SDKOptions aws_options; struct AwsInstance : public ::arrow::internal::Executor::Resource { - explicit AwsInstance(const S3GlobalOptions& options) { + AwsInstance() : is_initialized_(false), is_finalized_(false) {} + ~AwsInstance() { Finalize(); } + + // Returns true iff the instance was newly initialized with `options` + Result EnsureInitialized(const S3GlobalOptions& options) { + bool expected = false; + if (is_finalized_.load()) { + return Status::Invalid("Attempt to initialize S3 after it has been finalized"); + } + if (is_initialized_.compare_exchange_strong(expected, true)) { + DoInitialize(options); + return true; + } + return false; + } + + bool IsInitialized() { return !is_finalized_ && is_initialized_; } + + void Finalize() { + bool expected = true; + is_finalized_.store(true); + if (is_initialized_.compare_exchange_strong(expected, false)) { + RegionResolver::ResetDefaultInstance(); + Aws::ShutdownAPI(aws_options); + } + } + + private: + void DoInitialize(const S3GlobalOptions& options) { Aws::Utils::Logging::LogLevel aws_log_level; #define LOG_LEVEL_CASE(level_name) \ @@ -2625,53 +2652,52 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { Aws::InitAPI(aws_options); } - ~AwsInstance() { - RegionResolver::ResetDefaultInstance(); - Aws::ShutdownAPI(aws_options); - } + std::atomic is_initialized_; + std::atomic is_finalized_; }; -std::shared_ptr CreateAwsInstance(const S3GlobalOptions& options) { - auto instance = std::make_shared(options); +std::shared_ptr CreateAwsInstance() { + auto instance = std::make_shared(); // Don't let S3 be shutdown until all Arrow threads are done using it arrow::internal::GetCpuThreadPool()->KeepAlive(instance); io::internal::GetIOThreadPool()->KeepAlive(instance); return instance; } -std::shared_ptr* GetAwsInstance(const S3GlobalOptions& options) { - static auto instance = CreateAwsInstance(options); - return &instance; +AwsInstance& GetAwsInstance() { + static auto instance = CreateAwsInstance(); + return *instance; } -bool IsAwsInitialized() { return !!GetAwsInstance({}); } +Result EnsureAwsInstanceInitialized(const S3GlobalOptions& options) { + return GetAwsInstance().EnsureInitialized(options); +} } // namespace Status InitializeS3(const S3GlobalOptions& options) { - std::lock_guard lock(aws_init_lock); - if (!(*GetAwsInstance(options))) { - return Status::Invalid("S3 has already been initialized and finalized"); + ARROW_ASSIGN_OR_RAISE(bool successfully_initialized, + EnsureAwsInstanceInitialized(options)); + if (!successfully_initialized) { + return Status::Invalid( + "S3 was already initialized. It is safe to use but the options passed in this " + "call have been ignored."); } return Status::OK(); } -Status EnsureS3Initialized() { return InitializeS3({S3LogLevel::Fatal}); } +Status EnsureS3Initialized() { + return EnsureAwsInstanceInitialized({S3LogLevel::Fatal}).status(); +} Status FinalizeS3() { - std::lock_guard lock(aws_init_lock); - if (IsAwsInitialized()) { - GetAwsInstance({})->reset(); - } + GetAwsInstance().Finalize(); return Status::OK(); } Status EnsureS3Finalized() { return FinalizeS3(); } -bool IsS3Initialized() { - std::lock_guard lock(aws_init_lock); - return IsAwsInitialized(); -} +bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); } // ----------------------------------------------------------------------- // Top-level utility functions From 83ea3aacd12faf527c2b0120565722ad79c293ae Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Apr 2023 11:48:13 -0700 Subject: [PATCH 5/9] Move aws_options inside singleton. Clarify that FinalizeS3 MUST be called. Add shutdown hook for R --- cpp/src/arrow/filesystem/s3fs.cc | 17 ++++++++--------- cpp/src/arrow/filesystem/s3fs.h | 3 +++ r/R/arrow-package.R | 12 ++++++++++++ r/R/arrowExports.R | 4 ++++ r/src/arrowExports.cpp | 9 +++++++++ r/src/filesystem.cpp | 7 +++++++ 6 files changed, 43 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index dde0cb551e0..2483f5b0176 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -2571,8 +2571,6 @@ Result> S3FileSystem::OpenAppendStream( namespace { -Aws::SDKOptions aws_options; - struct AwsInstance : public ::arrow::internal::Executor::Resource { AwsInstance() : is_initialized_(false), is_finalized_(false) {} ~AwsInstance() { Finalize(); } @@ -2597,7 +2595,7 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { is_finalized_.store(true); if (is_initialized_.compare_exchange_strong(expected, false)) { RegionResolver::ResetDefaultInstance(); - Aws::ShutdownAPI(aws_options); + Aws::ShutdownAPI(aws_options_); } } @@ -2624,7 +2622,7 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { #undef LOG_LEVEL_CASE #ifdef ARROW_S3_HAS_CRT - aws_options.ioOptions.clientBootstrap_create_fn = + aws_options_.ioOptions.clientBootstrap_create_fn = [ev_threads = options.num_event_loop_threads]() { // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65 Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads); @@ -2636,22 +2634,23 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { return client_bootstrap; }; #endif - aws_options.loggingOptions.logLevel = aws_log_level; + aws_options_.loggingOptions.logLevel = aws_log_level; // By default the AWS SDK logs to files, log to console instead - aws_options.loggingOptions.logger_create_fn = [] { + aws_options_.loggingOptions.logger_create_fn = [this] { return std::make_shared( - aws_options.loggingOptions.logLevel); + aws_options_.loggingOptions.logLevel); }; #if (defined(AWS_SDK_VERSION_MAJOR) && \ (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \ (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272))) // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends. // This configuration options is only available with AWS SDK 1.9.272 and later. - aws_options.httpOptions.compliantRfc3986Encoding = true; + aws_options_.httpOptions.compliantRfc3986Encoding = true; #endif - Aws::InitAPI(aws_options); + Aws::InitAPI(aws_options_); } + Aws::SDKOptions aws_options_; std::atomic is_initialized_; std::atomic is_finalized_; }; diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 2be16f869d6..2bccecafe8e 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -335,6 +335,9 @@ struct ARROW_EXPORT S3GlobalOptions { /// Initialize the S3 APIs. It is required to call this function at least once /// before using S3FileSystem. +/// +/// Once this function is called you MUST call FinalizeS3 before the end of the +/// application in order to avoid a segmentation fault at shutdown. ARROW_EXPORT Status InitializeS3(const S3GlobalOptions& options); diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index a3c860a51c8..3d78a890b82 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -106,6 +106,12 @@ supported_dplyr_methods <- list( explain = NULL ) +# This should be run at session exit and must be called +# to avoid a segmentation fault at shutdown +finalize_s3 <- function(env) { + FinalizeS3() +} + #' @importFrom vctrs s3_register vec_size vec_cast vec_unique .onLoad <- function(...) { # Make sure C++ knows on which thread it is safe to call the R API @@ -147,6 +153,12 @@ supported_dplyr_methods <- list( # Register extension types that we use internally reregister_extension_type(vctrs_extension_type(vctrs::unspecified())) + # Registers a callback to run at session exit + .onLoad <- function(libname, pkgname) { + print(parent.env(environment())) + reg.finalizer(parent.env(environment()), finalize_s3, onexit=TRUE) + } + invisible() } diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index b3dd3a96018..a8e8f5b8af3 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1352,6 +1352,10 @@ fs___S3FileSystem__region <- function(fs) { .Call(`_arrow_fs___S3FileSystem__region`, fs) } +FinalizeS3 <- function() { + invisible(.Call(`_arrow_FinalizeS3`)) +} + fs___GcsFileSystem__Make <- function(anonymous, options) { .Call(`_arrow_fs___GcsFileSystem__Make`, anonymous, options) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index fb333e0c070..55c59f4b388 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -3489,6 +3489,14 @@ extern "C" SEXP _arrow_fs___S3FileSystem__region(SEXP fs_sexp){ } #endif +// filesystem.cpp +void FinalizeS3(); +extern "C" SEXP _arrow_FinalizeS3(){ +BEGIN_CPP11 + FinalizeS3(); + return R_NilValue; +END_CPP11 +} // filesystem.cpp #if defined(ARROW_R_WITH_GCS) std::shared_ptr fs___GcsFileSystem__Make(bool anonymous, cpp11::list options); @@ -5828,6 +5836,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_fs___CopyFiles", (DL_FUNC) &_arrow_fs___CopyFiles, 6}, { "_arrow_fs___S3FileSystem__create", (DL_FUNC) &_arrow_fs___S3FileSystem__create, 17}, { "_arrow_fs___S3FileSystem__region", (DL_FUNC) &_arrow_fs___S3FileSystem__region, 1}, + { "_arrow_FinalizeS3", (DL_FUNC) &_arrow_FinalizeS3, 0}, { "_arrow_fs___GcsFileSystem__Make", (DL_FUNC) &_arrow_fs___GcsFileSystem__Make, 2}, { "_arrow_fs___GcsFileSystem__options", (DL_FUNC) &_arrow_fs___GcsFileSystem__options, 1}, { "_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2}, diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp index cd795c0f80a..4388d111b4a 100644 --- a/r/src/filesystem.cpp +++ b/r/src/filesystem.cpp @@ -351,6 +351,13 @@ std::string fs___S3FileSystem__region(const std::shared_ptr& f #endif +// [[arrow::export]] +void FinalizeS3() { +#if defined(ARROW_R_WITH_S3) + StopIfNotOk(fs::FinalizeS3()); +#endif +} + #if defined(ARROW_R_WITH_GCS) #include From 234dbe85f1386e2de8d12836f68a238e23f85803 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Apr 2023 12:37:30 -0700 Subject: [PATCH 6/9] Removed a debug print. Removed an extra nested .onLoad function. Use a dedicated environment instead of the parent environment --- r/R/arrow-package.R | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 3d78a890b82..16cf9092d9d 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -112,6 +112,9 @@ finalize_s3 <- function(env) { FinalizeS3() } +# Helper environment to register the exit hook +s3_finalizer <- new.env(parent = emptyenv()) + #' @importFrom vctrs s3_register vec_size vec_cast vec_unique .onLoad <- function(...) { # Make sure C++ knows on which thread it is safe to call the R API @@ -154,10 +157,7 @@ finalize_s3 <- function(env) { reregister_extension_type(vctrs_extension_type(vctrs::unspecified())) # Registers a callback to run at session exit - .onLoad <- function(libname, pkgname) { - print(parent.env(environment())) - reg.finalizer(parent.env(environment()), finalize_s3, onexit=TRUE) - } + reg.finalizer(s3_finalizer, finalize_s3, onexit=TRUE) invisible() } From c152aaf38365f6a418a209a0ac38b99b66ad5548 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Apr 2023 13:11:14 -0700 Subject: [PATCH 7/9] Add explanatory comment per PR review --- r/R/arrow-package.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 16cf9092d9d..d45614a314e 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -157,6 +157,8 @@ s3_finalizer <- new.env(parent = emptyenv()) reregister_extension_type(vctrs_extension_type(vctrs::unspecified())) # Registers a callback to run at session exit + # This can't be done in .onUnload or .onDetach because those hooks are + # not guaranteed to run (e.g. they only run if the user unloads arrow) reg.finalizer(s3_finalizer, finalize_s3, onexit=TRUE) invisible() From 17809908e724f91326a2db2940819d8af8be9e43 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Apr 2023 15:03:02 -0700 Subject: [PATCH 8/9] R lint fix --- r/R/arrow-package.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index d45614a314e..76c420e21fa 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -159,7 +159,7 @@ s3_finalizer <- new.env(parent = emptyenv()) # Registers a callback to run at session exit # This can't be done in .onUnload or .onDetach because those hooks are # not guaranteed to run (e.g. they only run if the user unloads arrow) - reg.finalizer(s3_finalizer, finalize_s3, onexit=TRUE) + reg.finalizer(s3_finalizer, finalize_s3, onexit = TRUE) invisible() } From f817dfecbf8e69aca4c9a749acd03d695f92fedf Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Apr 2023 17:13:06 -0700 Subject: [PATCH 9/9] Added a warning that should help users remember to call Finalize --- cpp/src/arrow/filesystem/s3fs.cc | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 2483f5b0176..a22d9c10bec 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -2573,7 +2573,7 @@ namespace { struct AwsInstance : public ::arrow::internal::Executor::Resource { AwsInstance() : is_initialized_(false), is_finalized_(false) {} - ~AwsInstance() { Finalize(); } + ~AwsInstance() { Finalize(/*from_destructor=*/true); } // Returns true iff the instance was newly initialized with `options` Result EnsureInitialized(const S3GlobalOptions& options) { @@ -2590,12 +2590,17 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource { bool IsInitialized() { return !is_finalized_ && is_initialized_; } - void Finalize() { + void Finalize(bool from_destructor = false) { bool expected = true; is_finalized_.store(true); if (is_initialized_.compare_exchange_strong(expected, false)) { - RegionResolver::ResetDefaultInstance(); - Aws::ShutdownAPI(aws_options_); + if (from_destructor) { + ARROW_LOG(WARNING) + << " arrow::fs::FinalizeS3 was not called even though S3 was initialized. " + "This could lead to a segmentation fault at exit"; + RegionResolver::ResetDefaultInstance(); + Aws::ShutdownAPI(aws_options_); + } } }