diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 04774171503..8cfc2204162 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -881,6 +881,11 @@ class S3ClientFinalizer : public std::enable_shared_from_this auto LockShared() { return std::shared_lock(mutex_); } + void Initialize() { + std::unique_lock lock(mutex_); + finalized_ = false; + } + protected: friend class S3ClientHolder; @@ -3419,22 +3424,20 @@ struct AwsInstance { // Returns true iff the instance was newly initialized with `options` Result EnsureInitialized(const S3GlobalOptions& options) { - // NOTE: The individual accesses are atomic but the entire sequence below is not. - // The application should serialize calls to InitializeS3() and FinalizeS3() - // (see docstrings). - if (is_finalized_.load()) { - return Status::Invalid("Attempt to initialize S3 after it has been finalized"); - } - bool newly_initialized = false; // EnsureInitialized() can be called concurrently by FileSystemFromUri, // therefore we need to serialize initialization (GH-39897). - std::call_once(initialize_flag_, [&]() { - bool was_initialized = is_initialized_.exchange(true); - DCHECK(!was_initialized); + // We use a mutex instead of std::call_once to allow re-initialization after + // finalization (as supported by the AWS SDK). + std::lock_guard lock(init_mutex_); + + if (!is_initialized_.load()) { + // Not already initialized, allow re-initialization after finalization + is_finalized_.store(false); + is_initialized_.store(true); DoInitialize(options); - newly_initialized = true; - }); - return newly_initialized; + return true; // newly initialized + } + return false; } bool IsInitialized() { return !is_finalized_ && is_initialized_; } @@ -3442,6 +3445,12 @@ struct AwsInstance { bool IsFinalized() { return is_finalized_; } void Finalize(bool from_destructor = false) { + std::unique_lock lock(init_mutex_, std::defer_lock); + // Don't try to acquire the lock from destructor to avoid potential deadlocks + if (!from_destructor) { + lock.lock(); + } + if (is_finalized_.exchange(true)) { // Already finalized return; @@ -3508,12 +3517,13 @@ struct AwsInstance { aws_options_.httpOptions.compliantRfc3986Encoding = true; aws_options_.httpOptions.installSigPipeHandler = options.install_sigpipe_handler; Aws::InitAPI(aws_options_); + GetClientFinalizer()->Initialize(); } Aws::SDKOptions aws_options_; std::atomic is_initialized_; std::atomic is_finalized_; - std::once_flag initialize_flag_; + std::mutex init_mutex_; }; AwsInstance* GetAwsInstance() {