From f5a2092a1bfc685ba28b9ac76fa5974f6ef7d57a Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 25 Jun 2024 11:39:05 -0700 Subject: [PATCH 1/3] PYTHON-4499 Log pymongo.connection even without CMAP --- pymongo/asynchronous/pool.py | 302 ++++++++++++++++--------------- pymongo/asynchronous/topology.py | 1 + pymongo/synchronous/pool.py | 302 ++++++++++++++++--------------- pymongo/synchronous/topology.py | 1 + 4 files changed, 318 insertions(+), 288 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 3cd7485a92..eda2a16428 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -518,6 +518,7 @@ class PoolOptions: "__server_api", "__load_balanced", "__credentials", + "__is_monitor", ) def __init__( @@ -539,6 +540,7 @@ def __init__( server_api: Optional[ServerApi] = None, load_balanced: Optional[bool] = None, credentials: Optional[MongoCredential] = None, + is_monitor: Optional[bool] = False, ): self.__max_pool_size = max_pool_size self.__min_pool_size = min_pool_size @@ -558,6 +560,7 @@ def __init__( self.__load_balanced = load_balanced self.__credentials = credentials self.__metadata = copy.deepcopy(_METADATA) + self.__is_monitor = is_monitor if appname: self.__metadata["application"] = {"name": appname} @@ -717,6 +720,11 @@ def load_balanced(self) -> Optional[bool]: """True if this Pool is configured in load balanced mode.""" return self.__load_balanced + @property + def is_monitor(self) -> Optional[bool]: + """True if this Pool is owned by a Monitor.""" + return self.__is_monitor + class _CancellationContext: def __init__(self) -> None: @@ -762,6 +770,7 @@ def __init__( self.op_msg_enabled = False self.listeners = pool.opts._event_listeners self.enabled_for_cmap = pool.enabled_for_cmap + self.enabled_for_logging = pool.enabled_for_cmap self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -1125,20 +1134,20 @@ async def authenticate(self, reauthenticate: bool = False) -> None: await auth.authenticate(creds, self, reauthenticate=reauthenticate) self.ready = True + duration = time.monotonic() - self.creation_time if self.enabled_for_cmap: assert self.listeners is not None - duration = time.monotonic() - self.creation_time self.listeners.publish_connection_ready(self.address, self.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_READY, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_READY, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=self.id, + durationMS=duration, + ) def validate_session( self, client: Optional[AsyncMongoClient], session: Optional[ClientSession] @@ -1158,10 +1167,11 @@ def close_conn(self, reason: Optional[str]) -> None: if self.closed: return self._close_conn() - if reason and self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_closed(self.address, self.id, reason) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if reason: + if self.enabled_for_cmap: + assert self.listeners is not None + self.listeners.publish_connection_closed(self.address, self.id, reason) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1473,12 +1483,13 @@ def __init__( self.address = address self.opts = options self.handshake = handshake - # Don't publish events in Monitor pools. + # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( self.handshake and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) + self.enabled_for_logging = self.handshake and not self.opts.is_monitor # The first portion of the wait queue. # Enforces: maxPoolSize @@ -1500,15 +1511,15 @@ def __init__( self.opts._event_listeners.publish_pool_created( self.address, self.opts.non_default_options ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - **self.opts.non_default_options, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + **self.opts.non_default_options, + ) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 # Retain references to pinned connections to prevent the CPython GC @@ -1526,14 +1537,14 @@ async def ready(self) -> None: if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_ready(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_READY, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_READY, + serverHost=self.address[0], + serverPort=self.address[1], + ) @property def closed(self) -> bool: @@ -1591,23 +1602,24 @@ async def _reset( if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - ) - else: - if old_state != PoolState.PAUSED and self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + else: + if old_state != PoolState.PAUSED: + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_pool_cleared( + self.address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1717,15 +1729,15 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> C if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_created(self.address, conn_id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + ) try: sock = await _configured_socket(self.address, self.opts) @@ -1735,17 +1747,17 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> C listeners.publish_connection_closed( self.address, conn_id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) if isinstance(error, (IOError, OSError, SSLError)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1793,31 +1805,31 @@ async def checkout( if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_check_out_started(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_STARTED, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_STARTED, + serverHost=self.address[0], + serverPort=self.address[1], + ) conn = await self._get_conn(checkout_started_time, handler=handler) + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_checked_out(self.address, conn.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) try: async with self.lock: self.active_contexts.add(conn.cancel_context) @@ -1849,13 +1861,14 @@ async def checkout( def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: if self.state != PoolState.READY: - if self.enabled_for_cmap and emit_event: - assert self.opts._event_listeners is not None + if emit_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1883,23 +1896,23 @@ async def _get_conn( await self.reset_without_pause() if self.closed: + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert self.opts._event_listeners is not None - duration = time.monotonic() - checkout_started_time self.opts._event_listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Connection pool was closed", - error=ConnectionCheckOutFailedReason.POOL_CLOSED, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Connection pool was closed", + error=ConnectionCheckOutFailedReason.POOL_CLOSED, + durationMS=duration, + ) raise _PoolClosedError( "Attempted to check out a connection from closed connection pool" ) @@ -1975,13 +1988,14 @@ async def _get_conn( self.active_sockets -= 1 self.size_cond.notify() - if self.enabled_for_cmap and not emitted_event: - assert self.opts._event_listeners is not None + if not emitted_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -2014,15 +2028,15 @@ async def checkin(self, conn: Connection) -> None: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_checked_in(self.address, conn.id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKEDIN, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKEDIN, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + ) if self.pid != os.getpid(): await self.reset_without_pause() else: @@ -2035,17 +2049,17 @@ async def checkin(self, conn: Connection) -> None: listeners.publish_connection_closed( self.address, conn.id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) else: async with self.lock: # Hold the lock to ensure this section does not race with @@ -2107,23 +2121,23 @@ def _perished(self, conn: Connection) -> bool: def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn: listeners = self.opts._event_listeners + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Wait queue timeout elapsed without a connection becoming available", - error=ConnectionCheckOutFailedReason.TIMEOUT, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Wait queue timeout elapsed without a connection becoming available", + error=ConnectionCheckOutFailedReason.TIMEOUT, + durationMS=duration, + ) timeout = _csot.get_timeout() or self.opts.wait_queue_timeout if self.opts.load_balanced: other_ops = self.active_sockets - self.ncursors - self.ntxns diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index ac578113b2..9cc609830b 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -885,6 +885,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool: driver=options.driver, pause_enabled=False, server_api=options.server_api, + is_monitor=True, ) return self._settings.pool_class( diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index b71cec2203..1940de61d0 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -518,6 +518,7 @@ class PoolOptions: "__server_api", "__load_balanced", "__credentials", + "__is_monitor", ) def __init__( @@ -539,6 +540,7 @@ def __init__( server_api: Optional[ServerApi] = None, load_balanced: Optional[bool] = None, credentials: Optional[MongoCredential] = None, + is_monitor: Optional[bool] = False, ): self.__max_pool_size = max_pool_size self.__min_pool_size = min_pool_size @@ -558,6 +560,7 @@ def __init__( self.__load_balanced = load_balanced self.__credentials = credentials self.__metadata = copy.deepcopy(_METADATA) + self.__is_monitor = is_monitor if appname: self.__metadata["application"] = {"name": appname} @@ -717,6 +720,11 @@ def load_balanced(self) -> Optional[bool]: """True if this Pool is configured in load balanced mode.""" return self.__load_balanced + @property + def is_monitor(self) -> Optional[bool]: + """True if this Pool is owned by a Monitor.""" + return self.__is_monitor + class _CancellationContext: def __init__(self) -> None: @@ -762,6 +770,7 @@ def __init__( self.op_msg_enabled = False self.listeners = pool.opts._event_listeners self.enabled_for_cmap = pool.enabled_for_cmap + self.enabled_for_logging = pool.enabled_for_cmap self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -1125,20 +1134,20 @@ def authenticate(self, reauthenticate: bool = False) -> None: auth.authenticate(creds, self, reauthenticate=reauthenticate) self.ready = True + duration = time.monotonic() - self.creation_time if self.enabled_for_cmap: assert self.listeners is not None - duration = time.monotonic() - self.creation_time self.listeners.publish_connection_ready(self.address, self.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_READY, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_READY, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=self.id, + durationMS=duration, + ) def validate_session( self, client: Optional[MongoClient], session: Optional[ClientSession] @@ -1156,10 +1165,11 @@ def close_conn(self, reason: Optional[str]) -> None: if self.closed: return self._close_conn() - if reason and self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_closed(self.address, self.id, reason) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if reason: + if self.enabled_for_cmap: + assert self.listeners is not None + self.listeners.publish_connection_closed(self.address, self.id, reason) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1469,12 +1479,13 @@ def __init__( self.address = address self.opts = options self.handshake = handshake - # Don't publish events in Monitor pools. + # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( self.handshake and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) + self.enabled_for_logging = self.handshake and not self.opts.is_monitor # The first portion of the wait queue. # Enforces: maxPoolSize @@ -1496,15 +1507,15 @@ def __init__( self.opts._event_listeners.publish_pool_created( self.address, self.opts.non_default_options ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - **self.opts.non_default_options, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + **self.opts.non_default_options, + ) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 # Retain references to pinned connections to prevent the CPython GC @@ -1522,14 +1533,14 @@ def ready(self) -> None: if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_ready(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_READY, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_READY, + serverHost=self.address[0], + serverPort=self.address[1], + ) @property def closed(self) -> bool: @@ -1587,23 +1598,24 @@ def _reset( if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - ) - else: - if old_state != PoolState.PAUSED and self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + else: + if old_state != PoolState.PAUSED: + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_pool_cleared( + self.address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1711,15 +1723,15 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_created(self.address, conn_id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + ) try: sock = _configured_socket(self.address, self.opts) @@ -1729,17 +1741,17 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect listeners.publish_connection_closed( self.address, conn_id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) if isinstance(error, (IOError, OSError, SSLError)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1787,31 +1799,31 @@ def checkout( if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_check_out_started(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_STARTED, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_STARTED, + serverHost=self.address[0], + serverPort=self.address[1], + ) conn = self._get_conn(checkout_started_time, handler=handler) + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_checked_out(self.address, conn.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) try: with self.lock: self.active_contexts.add(conn.cancel_context) @@ -1843,13 +1855,14 @@ def checkout( def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: if self.state != PoolState.READY: - if self.enabled_for_cmap and emit_event: - assert self.opts._event_listeners is not None + if emit_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1877,23 +1890,23 @@ def _get_conn( self.reset_without_pause() if self.closed: + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert self.opts._event_listeners is not None - duration = time.monotonic() - checkout_started_time self.opts._event_listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Connection pool was closed", - error=ConnectionCheckOutFailedReason.POOL_CLOSED, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Connection pool was closed", + error=ConnectionCheckOutFailedReason.POOL_CLOSED, + durationMS=duration, + ) raise _PoolClosedError( "Attempted to check out a connection from closed connection pool" ) @@ -1969,13 +1982,14 @@ def _get_conn( self.active_sockets -= 1 self.size_cond.notify() - if self.enabled_for_cmap and not emitted_event: - assert self.opts._event_listeners is not None + if not emitted_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -2008,15 +2022,15 @@ def checkin(self, conn: Connection) -> None: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_checked_in(self.address, conn.id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKEDIN, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKEDIN, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + ) if self.pid != os.getpid(): self.reset_without_pause() else: @@ -2029,17 +2043,17 @@ def checkin(self, conn: Connection) -> None: listeners.publish_connection_closed( self.address, conn.id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) else: with self.lock: # Hold the lock to ensure this section does not race with @@ -2101,23 +2115,23 @@ def _perished(self, conn: Connection) -> bool: def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn: listeners = self.opts._event_listeners + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Wait queue timeout elapsed without a connection becoming available", - error=ConnectionCheckOutFailedReason.TIMEOUT, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Wait queue timeout elapsed without a connection becoming available", + error=ConnectionCheckOutFailedReason.TIMEOUT, + durationMS=duration, + ) timeout = _csot.get_timeout() or self.opts.wait_queue_timeout if self.opts.load_balanced: other_ops = self.active_sockets - self.ncursors - self.ntxns diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 6c8cd88702..35c6c1cab4 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -883,6 +883,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool: driver=options.driver, pause_enabled=False, server_api=options.server_api, + is_monitor=True, ) return self._settings.pool_class( From 49c5f010fc566c1c536f02d273ac35cbd836abd8 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 25 Jun 2024 12:29:04 -0700 Subject: [PATCH 2/3] Add test --- test/test_logger.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/test/test_logger.py b/test/test_logger.py index d1f84a8441..60abcadc4c 100644 --- a/test/test_logger.py +++ b/test/test_logger.py @@ -16,6 +16,7 @@ import os from test import unittest from test.test_client import IntegrationTest +from test.utils import single_client from unittest.mock import patch from bson import json_util @@ -82,6 +83,19 @@ def test_truncation_multi_byte_codepoints(self): self.assertEqual(last_3_bytes, str_to_repeat) + def test_logging_without_listeners(self): + c = single_client() + self.assertEqual(len(c._event_listeners.event_listeners()), 0) + with self.assertLogs("pymongo.connection", level="DEBUG") as cm: + c.db.test.insert_one({"x": "1"}) + self.assertGreater(len(cm.records), 0) + with self.assertLogs("pymongo.command", level="DEBUG") as cm: + c.db.test.insert_one({"x": "1"}) + self.assertGreater(len(cm.records), 0) + with self.assertLogs("pymongo.serverSelection", level="DEBUG") as cm: + c.db.test.insert_one({"x": "1"}) + self.assertGreater(len(cm.records), 0) + if __name__ == "__main__": unittest.main() From 110970f5ed6b456a4eb672b5e5abf4baebf31693 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 25 Jun 2024 12:41:50 -0700 Subject: [PATCH 3/3] Remove is_monitor --- pymongo/asynchronous/pool.py | 12 ++---------- pymongo/asynchronous/topology.py | 1 - pymongo/synchronous/pool.py | 12 ++---------- pymongo/synchronous/topology.py | 1 - 4 files changed, 4 insertions(+), 22 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index eda2a16428..5a8d9ecdfc 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -518,7 +518,6 @@ class PoolOptions: "__server_api", "__load_balanced", "__credentials", - "__is_monitor", ) def __init__( @@ -540,7 +539,6 @@ def __init__( server_api: Optional[ServerApi] = None, load_balanced: Optional[bool] = None, credentials: Optional[MongoCredential] = None, - is_monitor: Optional[bool] = False, ): self.__max_pool_size = max_pool_size self.__min_pool_size = min_pool_size @@ -560,7 +558,6 @@ def __init__( self.__load_balanced = load_balanced self.__credentials = credentials self.__metadata = copy.deepcopy(_METADATA) - self.__is_monitor = is_monitor if appname: self.__metadata["application"] = {"name": appname} @@ -720,11 +717,6 @@ def load_balanced(self) -> Optional[bool]: """True if this Pool is configured in load balanced mode.""" return self.__load_balanced - @property - def is_monitor(self) -> Optional[bool]: - """True if this Pool is owned by a Monitor.""" - return self.__is_monitor - class _CancellationContext: def __init__(self) -> None: @@ -770,7 +762,7 @@ def __init__( self.op_msg_enabled = False self.listeners = pool.opts._event_listeners self.enabled_for_cmap = pool.enabled_for_cmap - self.enabled_for_logging = pool.enabled_for_cmap + self.enabled_for_logging = pool.enabled_for_logging self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -1489,7 +1481,7 @@ def __init__( and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) - self.enabled_for_logging = self.handshake and not self.opts.is_monitor + self.enabled_for_logging = self.handshake # The first portion of the wait queue. # Enforces: maxPoolSize diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 9cc609830b..ac578113b2 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -885,7 +885,6 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool: driver=options.driver, pause_enabled=False, server_api=options.server_api, - is_monitor=True, ) return self._settings.pool_class( diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 1940de61d0..076a38ece1 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -518,7 +518,6 @@ class PoolOptions: "__server_api", "__load_balanced", "__credentials", - "__is_monitor", ) def __init__( @@ -540,7 +539,6 @@ def __init__( server_api: Optional[ServerApi] = None, load_balanced: Optional[bool] = None, credentials: Optional[MongoCredential] = None, - is_monitor: Optional[bool] = False, ): self.__max_pool_size = max_pool_size self.__min_pool_size = min_pool_size @@ -560,7 +558,6 @@ def __init__( self.__load_balanced = load_balanced self.__credentials = credentials self.__metadata = copy.deepcopy(_METADATA) - self.__is_monitor = is_monitor if appname: self.__metadata["application"] = {"name": appname} @@ -720,11 +717,6 @@ def load_balanced(self) -> Optional[bool]: """True if this Pool is configured in load balanced mode.""" return self.__load_balanced - @property - def is_monitor(self) -> Optional[bool]: - """True if this Pool is owned by a Monitor.""" - return self.__is_monitor - class _CancellationContext: def __init__(self) -> None: @@ -770,7 +762,7 @@ def __init__( self.op_msg_enabled = False self.listeners = pool.opts._event_listeners self.enabled_for_cmap = pool.enabled_for_cmap - self.enabled_for_logging = pool.enabled_for_cmap + self.enabled_for_logging = pool.enabled_for_logging self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -1485,7 +1477,7 @@ def __init__( and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) - self.enabled_for_logging = self.handshake and not self.opts.is_monitor + self.enabled_for_logging = self.handshake # The first portion of the wait queue. # Enforces: maxPoolSize diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 35c6c1cab4..6c8cd88702 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -883,7 +883,6 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool: driver=options.driver, pause_enabled=False, server_api=options.server_api, - is_monitor=True, ) return self._settings.pool_class(