Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,11 @@ class S3ClientFinalizer : public std::enable_shared_from_this<S3ClientFinalizer>

auto LockShared() { return std::shared_lock(mutex_); }

void Initialize() {
std::unique_lock lock(mutex_);
finalized_ = false;
}

protected:
friend class S3ClientHolder;

Expand Down Expand Up @@ -3419,29 +3424,33 @@ struct AwsInstance {

// Returns true iff the instance was newly initialized with `options`
Result<bool> EnsureInitialized(const S3GlobalOptions& options) {
// NOTE: The individual accesses are atomic but the entire sequence below is not.
// The application should serialize calls to InitializeS3() and FinalizeS3()
// (see docstrings).
if (is_finalized_.load()) {
return Status::Invalid("Attempt to initialize S3 after it has been finalized");
}
bool newly_initialized = false;
// EnsureInitialized() can be called concurrently by FileSystemFromUri,
// therefore we need to serialize initialization (GH-39897).
std::call_once(initialize_flag_, [&]() {
bool was_initialized = is_initialized_.exchange(true);
DCHECK(!was_initialized);
// We use a mutex instead of std::call_once to allow re-initialization after
// finalization (as supported by the AWS SDK).
std::lock_guard<std::mutex> lock(init_mutex_);

if (!is_initialized_.load()) {
// Not already initialized, allow re-initialization after finalization
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we perhaps only allow it if the AWS SDK version is recent enough?

is_finalized_.store(false);
is_initialized_.store(true);
DoInitialize(options);
newly_initialized = true;
});
return newly_initialized;
return true; // newly initialized
}
return false;
}

bool IsInitialized() { return !is_finalized_ && is_initialized_; }

bool IsFinalized() { return is_finalized_; }

void Finalize(bool from_destructor = false) {
std::unique_lock<std::mutex> lock(init_mutex_, std::defer_lock);
// Don't try to acquire the lock from destructor to avoid potential deadlocks
if (!from_destructor) {
lock.lock();
}

if (is_finalized_.exchange(true)) {
// Already finalized
return;
Expand Down Expand Up @@ -3508,12 +3517,13 @@ struct AwsInstance {
aws_options_.httpOptions.compliantRfc3986Encoding = true;
aws_options_.httpOptions.installSigPipeHandler = options.install_sigpipe_handler;
Aws::InitAPI(aws_options_);
GetClientFinalizer()->Initialize();
}

Aws::SDKOptions aws_options_;
std::atomic<bool> is_initialized_;
std::atomic<bool> is_finalized_;
std::once_flag initialize_flag_;
std::mutex init_mutex_;
};

AwsInstance* GetAwsInstance() {
Expand Down
Loading