From 220745ffbd2c5fe8a9769434eaa465a725e25280 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 20 Oct 2023 01:51:48 -0700 Subject: [PATCH 1/4] Initialize S3 on first use Initializing S3 on import may cause undesired consequences for users who do not use S3: - Longer import times; - Consumption of unnecessary resources, e.g., AWS event loop thread(s); - Potential exposure to bugs in S3 package dependencies. Therefore, a more appropriate way to handle S3 initialization seems to be to move it to its first use. --- python/pyarrow/_s3fs.pyx | 33 ++++++++++++++++++++++++++++++++- python/pyarrow/fs.py | 4 ---- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx index ab451713699..c3a291d2aed 100644 --- a/python/pyarrow/_s3fs.pyx +++ b/python/pyarrow/_s3fs.pyx @@ -37,6 +37,9 @@ cpdef enum S3LogLevel: Debug = CS3LogLevel_Debug Trace = CS3LogLevel_Trace +# Prevent registration of multiple `atexit` handlers +_initialized = False + def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal, int num_event_loop_threads=1): """ @@ -63,7 +66,13 @@ def ensure_s3_initialized(): """ Initialize S3 (with default options) if not already initialized """ - check_status(CEnsureS3Initialized()) + global _initialized + + if not _initialized: + check_status(CEnsureS3Initialized()) + import atexit + atexit.register(finalize_s3) + _initialized = True def finalize_s3(): @@ -93,6 +102,8 @@ def resolve_s3_region(bucket): c_string c_bucket c_string c_region + ensure_s3_initialized() + c_bucket = tobytes(bucket) with nogil: c_region = GetResultValue(ResolveS3BucketRegion(c_bucket)) @@ -260,6 +271,26 @@ cdef class S3FileSystem(FileSystem): load_frequency=900, proxy_options=None, allow_bucket_creation=False, allow_bucket_deletion=False, retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)): + ensure_s3_initialized() + + self._initialize_s3(access_key=access_key, secret_key=secret_key, session_token=session_token, + anonymous=anonymous, region=region, request_timeout=request_timeout, + connect_timeout=connect_timeout, scheme=scheme, endpoint_override=endpoint_override, + background_writes=background_writes, default_metadata=default_metadata, + role_arn=role_arn, session_name=session_name, external_id=external_id, + load_frequency=load_frequency, proxy_options=proxy_options, + allow_bucket_creation=allow_bucket_creation, allow_bucket_deletion=allow_bucket_deletion, + retry_strategy=retry_strategy) + + def _initialize_s3(self, *, access_key=None, secret_key=None, session_token=None, + bint anonymous=False, region=None, request_timeout=None, + connect_timeout=None, scheme=None, endpoint_override=None, + bint background_writes=True, default_metadata=None, + role_arn=None, session_name=None, external_id=None, + load_frequency=900, proxy_options=None, + allow_bucket_creation=False, allow_bucket_deletion=False, + retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)): + cdef: optional[CS3Options] options shared_ptr[CS3FileSystem] wrapped diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 36655c7d128..1abf8a43c69 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -57,10 +57,6 @@ finalize_s3, initialize_s3, resolve_s3_region) except ImportError: _not_imported.append("S3FileSystem") -else: - ensure_s3_initialized() - import atexit - atexit.register(finalize_s3) def __getattr__(name): From 50aa5ee775246026c934ba29c189967d8c083af6 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 24 Oct 2023 00:09:47 -0700 Subject: [PATCH 2/4] Use `arrow::fs::EnsureS3Finalized` as Python finalizer --- python/pyarrow/_s3fs.pyx | 18 ++++++++---------- python/pyarrow/fs.py | 5 ++++- python/pyarrow/includes/libarrow_fs.pxd | 1 + 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx index c3a291d2aed..7e44b9fc353 100644 --- a/python/pyarrow/_s3fs.pyx +++ b/python/pyarrow/_s3fs.pyx @@ -37,9 +37,6 @@ cpdef enum S3LogLevel: Debug = CS3LogLevel_Debug Trace = CS3LogLevel_Trace -# Prevent registration of multiple `atexit` handlers -_initialized = False - def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal, int num_event_loop_threads=1): """ @@ -66,19 +63,20 @@ def ensure_s3_initialized(): """ Initialize S3 (with default options) if not already initialized """ - global _initialized - - if not _initialized: - check_status(CEnsureS3Initialized()) - import atexit - atexit.register(finalize_s3) - _initialized = True + check_status(CEnsureS3Initialized()) def finalize_s3(): check_status(CFinalizeS3()) +def ensure_s3_finalized(): + """ + Finalize S3 if already initialized + """ + check_status(CEnsureS3Finalized()) + + def resolve_s3_region(bucket): """ Resolve the S3 region of a bucket. diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 1abf8a43c69..4cb723f53eb 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -54,9 +54,12 @@ from pyarrow._s3fs import ( # noqa AwsDefaultS3RetryStrategy, AwsStandardS3RetryStrategy, S3FileSystem, S3LogLevel, S3RetryStrategy, ensure_s3_initialized, - finalize_s3, initialize_s3, resolve_s3_region) + finalize_s3, ensure_s3_finalized, initialize_s3, resolve_s3_region) except ImportError: _not_imported.append("S3FileSystem") +else: + import atexit + atexit.register(ensure_s3_finalized) def __getattr__(name): diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd index 2727fc20119..cb30f4e750e 100644 --- a/python/pyarrow/includes/libarrow_fs.pxd +++ b/python/pyarrow/includes/libarrow_fs.pxd @@ -211,6 +211,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: const CS3GlobalOptions& options) cdef CStatus CEnsureS3Initialized "arrow::fs::EnsureS3Initialized"() cdef CStatus CFinalizeS3 "arrow::fs::FinalizeS3"() + cdef CStatus CEnsureS3Finalized "arrow::fs::EnsureS3Finalized"() cdef CResult[c_string] ResolveS3BucketRegion(const c_string& bucket) From 58097e91a305ea05aa7efc24db097a5289b514e4 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 24 Oct 2023 03:21:48 -0700 Subject: [PATCH 3/4] Remove unnecessary `_initialize_s3` method --- python/pyarrow/_s3fs.pyx | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx index 7e44b9fc353..f7f2a5f8088 100644 --- a/python/pyarrow/_s3fs.pyx +++ b/python/pyarrow/_s3fs.pyx @@ -271,24 +271,6 @@ cdef class S3FileSystem(FileSystem): retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)): ensure_s3_initialized() - self._initialize_s3(access_key=access_key, secret_key=secret_key, session_token=session_token, - anonymous=anonymous, region=region, request_timeout=request_timeout, - connect_timeout=connect_timeout, scheme=scheme, endpoint_override=endpoint_override, - background_writes=background_writes, default_metadata=default_metadata, - role_arn=role_arn, session_name=session_name, external_id=external_id, - load_frequency=load_frequency, proxy_options=proxy_options, - allow_bucket_creation=allow_bucket_creation, allow_bucket_deletion=allow_bucket_deletion, - retry_strategy=retry_strategy) - - def _initialize_s3(self, *, access_key=None, secret_key=None, session_token=None, - bint anonymous=False, region=None, request_timeout=None, - connect_timeout=None, scheme=None, endpoint_override=None, - bint background_writes=True, default_metadata=None, - role_arn=None, session_name=None, external_id=None, - load_frequency=900, proxy_options=None, - allow_bucket_creation=False, allow_bucket_deletion=False, - retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)): - cdef: optional[CS3Options] options shared_ptr[CS3FileSystem] wrapped From a164f250aae579a487304cbcd3d974af662e39e4 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 24 Oct 2023 17:20:00 +0200 Subject: [PATCH 4/4] Add comment --- python/pyarrow/fs.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 4cb723f53eb..ead750ca44e 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -58,6 +58,10 @@ except ImportError: _not_imported.append("S3FileSystem") else: + # GH-38364: we don't initialize S3 eagerly as that could lead + # to crashes at shutdown even when S3 isn't used. + # Instead, S3 is initialized lazily using `ensure_s3_initialized` + # in assorted places. import atexit atexit.register(ensure_s3_finalized)