From 63034603894f1a660f68c5396c53a46b3f975a9e Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Wed, 10 Nov 2021 15:07:15 +0000 Subject: [PATCH 1/9] add on-close callback --- src/workflows/transport/pika_transport.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/workflows/transport/pika_transport.py b/src/workflows/transport/pika_transport.py index b073205e..2cabf868 100644 --- a/src/workflows/transport/pika_transport.py +++ b/src/workflows/transport/pika_transport.py @@ -13,7 +13,6 @@ from enum import Enum, auto from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union -import pika import pika.exceptions from pika.adapters.blocking_connection import BlockingChannel @@ -1134,9 +1133,21 @@ def _get_shared_channel(self) -> BlockingChannel: if not self._pika_shared_channel: self._pika_shared_channel = self._connection.channel() + self._pika_shared_channel.add_on_close_callback( + self._callback_on_channel_close + ) + ##### self._pika_shared_channel.confirm_delivery() return self._pika_shared_channel + def _callback_on_channel_close( + self, channel: BlockingChannel, exception: pika.exceptions.ChannelClosed + ): + logger.error(f"Channel {channel} closed with {exception}") + self._please_stop.set() + if self._connection: + self._connection.add_callback_threadsafe(lambda: self._connection.close()) + def _recreate_subscriptions(self): """Resubscribe all existing subscriptions""" old_subscriptions = self._subscriptions @@ -1169,6 +1180,7 @@ def _add_subscription(self, subscription_id: int, subscription: _PikaSubscriptio # Open a dedicated channel for this subscription channel = self._connection.channel() + channel.add_on_close_callback(self._callback_on_channel_close) channel.basic_qos(prefetch_count=subscription.prefetch_count) if subscription.kind is _PikaSubscriptionKind.FANOUT: From 2de042f63b720c4cee6913ef3e6f9a4b66841630 Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Thu, 11 Nov 2021 13:35:32 +0000 Subject: [PATCH 2/9] Detect transport connection loss in service process --- src/workflows/services/common_service.py | 22 ++++++++++++---------- src/workflows/transport/pika_transport.py | 10 +++++----- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 348090f4..b8966a60 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -393,20 +393,22 @@ def start(self, **kwargs): while not self.__shutdown: # main loop self.__update_service_status(self.SERVICE_STATUS_IDLE) - if self._idle_time is None: - task = self.__queue.get() - else: - try: - task = self.__queue.get(True, self._idle_time) - run_idle_task = False - except queue.Empty: - run_idle_task = True - if run_idle_task: + try: + task = self.__queue.get(True, self._idle_time or 2) + run_idle_task = False + except queue.Empty: + run_idle_task = True + + if self.transport and not self.transport.is_connected(): + raise workflows.Disconnected("Connection lost") + + if run_idle_task: + if self._idle_time: # run this outside the 'except' to avoid exception chaining self.__update_service_status(self.SERVICE_STATUS_TIMER) if self._idle_callback: self._idle_callback() - continue + continue self.__update_service_status(self.SERVICE_STATUS_PROCESSING) diff --git a/src/workflows/transport/pika_transport.py b/src/workflows/transport/pika_transport.py index 2cabf868..64dddfeb 100644 --- a/src/workflows/transport/pika_transport.py +++ b/src/workflows/transport/pika_transport.py @@ -1133,11 +1133,11 @@ def _get_shared_channel(self) -> BlockingChannel: if not self._pika_shared_channel: self._pika_shared_channel = self._connection.channel() - self._pika_shared_channel.add_on_close_callback( - self._callback_on_channel_close - ) + # self._pika_shared_channel.add_on_close_callback( + # self._callback_on_channel_close + # ) - ##### self._pika_shared_channel.confirm_delivery() + ##### self._pika_shared_channel.confirm_delivery() return self._pika_shared_channel def _callback_on_channel_close( @@ -1180,7 +1180,7 @@ def _add_subscription(self, subscription_id: int, subscription: _PikaSubscriptio # Open a dedicated channel for this subscription channel = self._connection.channel() - channel.add_on_close_callback(self._callback_on_channel_close) + # channel.add_on_close_callback(self._callback_on_channel_close) channel.basic_qos(prefetch_count=subscription.prefetch_count) if subscription.kind is _PikaSubscriptionKind.FANOUT: From 546a98e9cb403b9e974945f53d8064a665b2f27a Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Thu, 11 Nov 2021 13:36:49 +0000 Subject: [PATCH 3/9] revert on_channel_close() callback changes this API doesn't actually exist?! --- src/workflows/transport/pika_transport.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/src/workflows/transport/pika_transport.py b/src/workflows/transport/pika_transport.py index 64dddfeb..5b551d78 100644 --- a/src/workflows/transport/pika_transport.py +++ b/src/workflows/transport/pika_transport.py @@ -1133,21 +1133,9 @@ def _get_shared_channel(self) -> BlockingChannel: if not self._pika_shared_channel: self._pika_shared_channel = self._connection.channel() - # self._pika_shared_channel.add_on_close_callback( - # self._callback_on_channel_close - # ) - - ##### self._pika_shared_channel.confirm_delivery() + ##### self._pika_shared_channel.confirm_delivery() return self._pika_shared_channel - def _callback_on_channel_close( - self, channel: BlockingChannel, exception: pika.exceptions.ChannelClosed - ): - logger.error(f"Channel {channel} closed with {exception}") - self._please_stop.set() - if self._connection: - self._connection.add_callback_threadsafe(lambda: self._connection.close()) - def _recreate_subscriptions(self): """Resubscribe all existing subscriptions""" old_subscriptions = self._subscriptions @@ -1180,7 +1168,6 @@ def _add_subscription(self, subscription_id: int, subscription: _PikaSubscriptio # Open a dedicated channel for this subscription channel = self._connection.channel() - # channel.add_on_close_callback(self._callback_on_channel_close) channel.basic_qos(prefetch_count=subscription.prefetch_count) if subscription.kind is _PikaSubscriptionKind.FANOUT: From 433b4c7a854fbd9970c43e7b6d2067dc2f6c0a64 Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Thu, 11 Nov 2021 13:39:31 +0000 Subject: [PATCH 4/9] Remove one page of not very insightful traceback --- src/workflows/transport/pika_transport.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workflows/transport/pika_transport.py b/src/workflows/transport/pika_transport.py index 5b551d78..0d9aebfa 100644 --- a/src/workflows/transport/pika_transport.py +++ b/src/workflows/transport/pika_transport.py @@ -394,7 +394,7 @@ def _subscribe( pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError, ) as e: - raise workflows.Disconnected(e) + raise workflows.Disconnected(repr(e)) from None def _subscribe_broadcast( self, From 5b7f425dfd1b22b15a88cb33afe0065dd96bea47 Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Thu, 11 Nov 2021 13:41:01 +0000 Subject: [PATCH 5/9] Update test for changed behaviour --- tests/services/test_common_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/services/test_common_service.py b/tests/services/test_common_service.py index 4bb2c807..de107bd4 100644 --- a/tests/services/test_common_service.py +++ b/tests/services/test_common_service.py @@ -220,7 +220,7 @@ def test_observe_shutdown_command(): # Check startup/shutdown sequence service.initializing.assert_called_once() service.in_shutdown.assert_called_once() - main_queue.get.assert_called_once_with() + main_queue.get.assert_called_once_with(True, 2) messages = [] while fe_pipe_out.poll(): message = fe_pipe_out.recv() From 66cab92178e1fa274144328d60d6440f049d7064 Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Thu, 11 Nov 2021 14:23:22 +0000 Subject: [PATCH 6/9] Improve handling of log forwarding issues between service and frontend --- src/workflows/logging/__init__.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/workflows/logging/__init__.py b/src/workflows/logging/__init__.py index 2d50e994..5ab56628 100644 --- a/src/workflows/logging/__init__.py +++ b/src/workflows/logging/__init__.py @@ -62,5 +62,16 @@ def emit(self, record): for serialization.""" try: self._callback(self.prepare(record)) - except Exception: - self.handleError(record) + except Exception as e: + try: + sys.stderr.write( + f"--- Logging error --- Exception: {e!r}\n" + "Could not forward log message from service to frontend process\n" + f"Message: {record.msg}\n" + f"Level: {record.levelno} - Thread: {record.threadName} - Arguments: {record.args}\n" + ) + except Exception as e: + sys.stderr.write( + "--- Logging error ---\n" + f"Encountered exception {e!r} during exception handling\n" + ) From fd40615246a0268420aa290d9c17daef3aa4ff40 Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Thu, 11 Nov 2021 14:26:57 +0000 Subject: [PATCH 7/9] move single file out of directory --- src/workflows/{logging/__init__.py => logging.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/workflows/{logging/__init__.py => logging.py} (100%) diff --git a/src/workflows/logging/__init__.py b/src/workflows/logging.py similarity index 100% rename from src/workflows/logging/__init__.py rename to src/workflows/logging.py From 850df296dc439b7120b472d85c2166ae468c857c Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Thu, 11 Nov 2021 14:33:33 +0000 Subject: [PATCH 8/9] Stick with logging module API --- src/workflows/logging.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/workflows/logging.py b/src/workflows/logging.py index 5ab56628..5b03cf7b 100644 --- a/src/workflows/logging.py +++ b/src/workflows/logging.py @@ -62,16 +62,20 @@ def emit(self, record): for serialization.""" try: self._callback(self.prepare(record)) + except Exception: + self.handleError(record) + + def handleError(self, record): + t, v, _ = sys.exc_info() + try: + sys.stderr.write( + f"--- Logging error --- {t.__name__}: {v}\n" + "Could not forward log message from service to frontend process\n" + f"Message: {record.msg}\n" + f"Level: {record.levelno} - Thread: {record.threadName} - Arguments: {record.args}\n" + ) except Exception as e: - try: - sys.stderr.write( - f"--- Logging error --- Exception: {e!r}\n" - "Could not forward log message from service to frontend process\n" - f"Message: {record.msg}\n" - f"Level: {record.levelno} - Thread: {record.threadName} - Arguments: {record.args}\n" - ) - except Exception as e: - sys.stderr.write( - "--- Logging error ---\n" - f"Encountered exception {e!r} during exception handling\n" - ) + sys.stderr.write( + "--- Logging error ---\n" + f"Encountered exception {e!r} during exception handling\n" + ) From 41a77eb528a13d58f710381808741e7321e433d6 Mon Sep 17 00:00:00 2001 From: Markus Gerstel Date: Thu, 11 Nov 2021 14:48:39 +0000 Subject: [PATCH 9/9] Fix resource warnings --- tests/transport/test_pika.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/transport/test_pika.py b/tests/transport/test_pika.py index c5cc8076..c6fb358f 100644 --- a/tests/transport/test_pika.py +++ b/tests/transport/test_pika.py @@ -797,9 +797,10 @@ def connection_params(): ] # Try a connection here to make sure this is valid try: - pika.BlockingConnection(params) + bc = pika.BlockingConnection(params) except BaseException: pytest.skip("Failed to create test RabbitMQ connection") + bc.close() return params