From a6a37189030f3762382a7ce19948ec06a14f8894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lubom=C3=ADr=20Sedl=C3=A1=C5=99?= Date: Mon, 3 Nov 2025 15:06:23 +0100 Subject: [PATCH] Add retries to publishing events Sending of messages is moved to a background thread, so that waiting between retries does not block the user waiting for http response. The database transaction still succeeds even if message publishing fails after all retries. Failed messages are logged for monitoring. JIRA: RHELCMP-14891 Assisted-By: Claude --- cts/events.py | 12 ++-- cts/messaging.py | 93 +++++++++++++++++++++++----- tests/test_events.py | 144 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 22 deletions(-) diff --git a/cts/events.py b/cts/events.py index 8878e30..17aa33f 100644 --- a/cts/events.py +++ b/cts/events.py @@ -90,7 +90,11 @@ def cache_composes_if_state_changed(session, flush_context): def start_to_publish_messages(session): - """Publish messages after data is committed to database successfully""" + """ + Publish messages after data is committed to database successfully. + + Messages are published asynchronously in a background thread. + """ import cts.messaging as messaging with _cache_lock: @@ -99,8 +103,6 @@ def start_to_publish_messages(session): msgs += compose_msgs log.debug("Sending messages: %s", msgs) if msgs: - try: - messaging.publish(msgs) - except Exception: - log.exception("Cannot publish message to bus.") + # Publish asynchronously - returns immediately + messaging.publish(msgs) _cached_composes.clear() diff --git a/cts/messaging.py b/cts/messaging.py index e1ac7ba..1aaa4ca 100644 --- a/cts/messaging.py +++ b/cts/messaging.py @@ -22,6 +22,8 @@ # Written by Chenxiong Qi import json +import time +from concurrent.futures import ThreadPoolExecutor from logging import getLogger from cts import conf @@ -30,34 +32,91 @@ __all__ = ("publish",) +# Thread pool for async message publishing (single worker to serialize message sending) +_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="cts-messaging") + def publish(msgs): - """Start to send messages to message broker""" + """ + Publish messages to message broker asynchronously. + + Messages are published in a background thread to avoid blocking HTTP requests. + Failures are logged but do not prevent the HTTP response from being sent. + """ backend = _get_messaging_backend() if backend is not None: - backend(msgs) + + def _send(): + try: + backend(msgs) + except Exception: + log.exception("Failed to publish messages to message broker.") + + _executor.submit(_send) + + +def _retry_with_backoff(func, max_retries=3, initial_delay=1.0, backoff_multiplier=2.0): + """ + Retry a function with exponential backoff. + + Args: + func: Callable to retry + max_retries: Maximum number of retry attempts (default: 3) + initial_delay: Initial delay in seconds (default: 1.0) + backoff_multiplier: Multiplier for exponential backoff (default: 2.0) + + Returns: + Result of the function call + + Raises: + The last exception if all retries fail + """ + delay = initial_delay + last_exception = None + + for attempt in range(max_retries + 1): + try: + return func() + except Exception as e: + last_exception = e + if attempt < max_retries: + log.warning( + f"UMB messaging attempt {attempt + 1}/{max_retries + 1} failed: {e}. " + f"Retrying in {delay:.1f}s..." + ) + time.sleep(delay) + delay *= backoff_multiplier + else: + log.error(f"UMB messaging failed after {max_retries + 1} attempts: {e}") + + raise last_exception def _umb_send_msg(msgs): - """Send message to Unified Message Bus""" + """Send message to Unified Message Bus with retry logic""" import proton from rhmsg.activemq.producer import AMQProducer - config = { - "urls": conf.messaging_broker_urls, - "certificate": conf.messaging_cert_file, - "private_key": conf.messaging_key_file, - "trusted_certificates": conf.messaging_ca_cert, - } - with AMQProducer(**config) as producer: - for msg in msgs: - event = msg.get("event", "event") - topic = "%s%s" % (conf.messaging_topic_prefix, event) - producer.through_topic(topic) - outgoing_msg = proton.Message() - outgoing_msg.body = json.dumps(msg) - producer.send(outgoing_msg) + def _send(): + """Inner function to send messages (will be retried on failure)""" + config = { + "urls": conf.messaging_broker_urls, + "certificate": conf.messaging_cert_file, + "private_key": conf.messaging_key_file, + "trusted_certificates": conf.messaging_ca_cert, + } + with AMQProducer(**config) as producer: + for msg in msgs: + event = msg.get("event", "event") + topic = "%s%s" % (conf.messaging_topic_prefix, event) + producer.through_topic(topic) + outgoing_msg = proton.Message() + outgoing_msg.body = json.dumps(msg) + producer.send(outgoing_msg) + + # Retry the send operation with exponential backoff + _retry_with_backoff(_send) def _fedora_messaging_send_msg(msgs): diff --git a/tests/test_events.py b/tests/test_events.py index 913b188..a511c19 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -348,3 +348,147 @@ def test_retag_stale_composes(self, publish): self.assertEqual(publish.mock_calls[7], expected_call) # There should be 8 mock calls, since timeout is not occured for development-nightly-requested compose and nightly compose is not retagged self.assertEqual(len(publish.mock_calls), 8) + + +@unittest.skipUnless(rhmsg, "rhmsg is required to run this test case.") +class TestRhmsgRetries(unittest.TestCase): + """Test certificate file validation for UMB messaging""" + + def test_retry_with_backoff_success_on_first_attempt(self): + """Test retry succeeds immediately if function works on first try""" + from cts.messaging import _retry_with_backoff + + call_count = [0] + + def successful_func(): + call_count[0] += 1 + return "success" + + result = _retry_with_backoff(successful_func) + self.assertEqual(result, "success") + self.assertEqual(call_count[0], 1) + + def test_retry_with_backoff_success_after_failures(self): + """Test retry succeeds after some failures""" + from cts.messaging import _retry_with_backoff + + call_count = [0] + + def flaky_func(): + call_count[0] += 1 + if call_count[0] < 3: + raise ConnectionError("Temporary failure") + return "success" + + with patch("time.sleep"): # Mock sleep to speed up test + result = _retry_with_backoff(flaky_func, max_retries=3) + + self.assertEqual(result, "success") + self.assertEqual(call_count[0], 3) + + def test_retry_with_backoff_exhausts_retries(self): + """Test retry raises exception after all retries exhausted""" + from cts.messaging import _retry_with_backoff + + call_count = [0] + + def always_fails(): + call_count[0] += 1 + raise ConnectionError("Persistent failure") + + with patch("time.sleep"): # Mock sleep to speed up test + with self.assertRaises(ConnectionError) as cm: + _retry_with_backoff(always_fails, max_retries=2) + + self.assertIn("Persistent failure", str(cm.exception)) + self.assertEqual(call_count[0], 3) # Initial attempt + 2 retries + + @patch("rhmsg.activemq.producer.AMQProducer") + @patch("proton.Message") + def test_umb_send_msg_retries_on_transient_failure( + self, mock_message, mock_producer + ): + """Test that _umb_send_msg retries on transient failures""" + from cts.messaging import _umb_send_msg + + # Simulate transient failure then success + attempt_count = [0] + + def producer_side_effect(*args, **kwargs): + attempt_count[0] += 1 + if attempt_count[0] == 1: + raise ConnectionError("Transient network error") + return mock_producer.return_value + + with patch("time.sleep"): # Mock sleep to speed up test + with patch( + "rhmsg.activemq.producer.AMQProducer", side_effect=producer_side_effect + ): + # Should succeed on second attempt + _umb_send_msg([{"event": "test", "data": "test"}]) + + self.assertEqual(attempt_count[0], 2) + + def test_publish_is_async(self): + """Test that publish() returns immediately without blocking""" + from cts.messaging import publish, _executor + import time + + # Mock the backend to simulate slow operation + slow_backend = Mock() + + def slow_send(msgs): + time.sleep(0.1) # Simulate slow message sending + + slow_backend.side_effect = slow_send + + with patch("cts.messaging._get_messaging_backend", return_value=slow_backend): + start = time.time() + publish([{"event": "test"}]) + elapsed = time.time() - start + + # publish() should return immediately (much less than 0.1s) + self.assertLess(elapsed, 0.05) + + # Wait for background thread to complete + _executor.shutdown(wait=True, cancel_futures=False) + + # Now the backend should have been called + slow_backend.assert_called_once() + + # Recreate executor for other tests + import cts.messaging + from concurrent.futures import ThreadPoolExecutor + + cts.messaging._executor = ThreadPoolExecutor( + max_workers=1, thread_name_prefix="cts-messaging" + ) + + def test_publish_error_handling_in_background_thread(self): + """Test that errors in background thread are logged properly""" + from cts.messaging import publish, _executor + + # Mock backend that raises an error + failing_backend = Mock(side_effect=RuntimeError("Connection failed")) + + with patch( + "cts.messaging._get_messaging_backend", return_value=failing_backend + ): + with patch("cts.messaging.log") as mock_log: + publish([{"event": "test"}]) + + # Wait for background thread to complete + _executor.shutdown(wait=True, cancel_futures=False) + + # Error should have been logged + mock_log.exception.assert_called_once_with( + "Failed to publish messages to message broker." + ) + + # Recreate executor for other tests + import cts.messaging + from concurrent.futures import ThreadPoolExecutor + + cts.messaging._executor = ThreadPoolExecutor( + max_workers=1, thread_name_prefix="cts-messaging" + )