diff --git a/src/workflows/logging/__init__.py b/src/workflows/logging.py similarity index 79% rename from src/workflows/logging/__init__.py rename to src/workflows/logging.py index 2d50e994..5b03cf7b 100644 --- a/src/workflows/logging/__init__.py +++ b/src/workflows/logging.py @@ -64,3 +64,18 @@ def emit(self, record): self._callback(self.prepare(record)) except Exception: self.handleError(record) + + def handleError(self, record): + t, v, _ = sys.exc_info() + try: + sys.stderr.write( + f"--- Logging error --- {t.__name__}: {v}\n" + "Could not forward log message from service to frontend process\n" + f"Message: {record.msg}\n" + f"Level: {record.levelno} - Thread: {record.threadName} - Arguments: {record.args}\n" + ) + except Exception as e: + sys.stderr.write( + "--- Logging error ---\n" + f"Encountered exception {e!r} during exception handling\n" + ) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 348090f4..b8966a60 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -393,20 +393,22 @@ def start(self, **kwargs): while not self.__shutdown: # main loop self.__update_service_status(self.SERVICE_STATUS_IDLE) - if self._idle_time is None: - task = self.__queue.get() - else: - try: - task = self.__queue.get(True, self._idle_time) - run_idle_task = False - except queue.Empty: - run_idle_task = True - if run_idle_task: + try: + task = self.__queue.get(True, self._idle_time or 2) + run_idle_task = False + except queue.Empty: + run_idle_task = True + + if self.transport and not self.transport.is_connected(): + raise workflows.Disconnected("Connection lost") + + if run_idle_task: + if self._idle_time: # run this outside the 'except' to avoid exception chaining self.__update_service_status(self.SERVICE_STATUS_TIMER) if self._idle_callback: self._idle_callback() - continue + continue self.__update_service_status(self.SERVICE_STATUS_PROCESSING) diff --git a/src/workflows/transport/pika_transport.py b/src/workflows/transport/pika_transport.py index b073205e..0d9aebfa 100644 --- a/src/workflows/transport/pika_transport.py +++ b/src/workflows/transport/pika_transport.py @@ -13,7 +13,6 @@ from enum import Enum, auto from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union -import pika import pika.exceptions from pika.adapters.blocking_connection import BlockingChannel @@ -395,7 +394,7 @@ def _subscribe( pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError, ) as e: - raise workflows.Disconnected(e) + raise workflows.Disconnected(repr(e)) from None def _subscribe_broadcast( self, diff --git a/tests/services/test_common_service.py b/tests/services/test_common_service.py index 4bb2c807..de107bd4 100644 --- a/tests/services/test_common_service.py +++ b/tests/services/test_common_service.py @@ -220,7 +220,7 @@ def test_observe_shutdown_command(): # Check startup/shutdown sequence service.initializing.assert_called_once() service.in_shutdown.assert_called_once() - main_queue.get.assert_called_once_with() + main_queue.get.assert_called_once_with(True, 2) messages = [] while fe_pipe_out.poll(): message = fe_pipe_out.recv() diff --git a/tests/transport/test_pika.py b/tests/transport/test_pika.py index c5cc8076..c6fb358f 100644 --- a/tests/transport/test_pika.py +++ b/tests/transport/test_pika.py @@ -797,9 +797,10 @@ def connection_params(): ] # Try a connection here to make sure this is valid try: - pika.BlockingConnection(params) + bc = pika.BlockingConnection(params) except BaseException: pytest.skip("Failed to create test RabbitMQ connection") + bc.close() return params