Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions cts/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def cache_composes_if_state_changed(session, flush_context):


def start_to_publish_messages(session):
"""Publish messages after data is committed to database successfully"""
"""
Publish messages after data is committed to database successfully.

Messages are published asynchronously in a background thread.
"""
import cts.messaging as messaging

with _cache_lock:
Expand All @@ -99,8 +103,6 @@ def start_to_publish_messages(session):
msgs += compose_msgs
log.debug("Sending messages: %s", msgs)
if msgs:
try:
messaging.publish(msgs)
except Exception:
log.exception("Cannot publish message to bus.")
# Publish asynchronously - returns immediately
messaging.publish(msgs)
_cached_composes.clear()
93 changes: 76 additions & 17 deletions cts/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
# Written by Chenxiong Qi <cqi@redhat.com>

import json
import time
from concurrent.futures import ThreadPoolExecutor
from logging import getLogger

from cts import conf
Expand All @@ -30,34 +32,91 @@

__all__ = ("publish",)

# Thread pool for async message publishing (single worker to serialize message sending)
_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="cts-messaging")


def publish(msgs):
"""Start to send messages to message broker"""
"""
Publish messages to message broker asynchronously.

Messages are published in a background thread to avoid blocking HTTP requests.
Failures are logged but do not prevent the HTTP response from being sent.
"""
backend = _get_messaging_backend()
if backend is not None:
backend(msgs)

def _send():
try:
backend(msgs)
except Exception:
log.exception("Failed to publish messages to message broker.")

_executor.submit(_send)


def _retry_with_backoff(func, max_retries=3, initial_delay=1.0, backoff_multiplier=2.0):
"""
Retry a function with exponential backoff.

Args:
func: Callable to retry
max_retries: Maximum number of retry attempts (default: 3)
initial_delay: Initial delay in seconds (default: 1.0)
backoff_multiplier: Multiplier for exponential backoff (default: 2.0)

Returns:
Result of the function call

Raises:
The last exception if all retries fail
"""
delay = initial_delay
last_exception = None

for attempt in range(max_retries + 1):
try:
return func()
except Exception as e:
last_exception = e
if attempt < max_retries:
log.warning(
f"UMB messaging attempt {attempt + 1}/{max_retries + 1} failed: {e}. "
f"Retrying in {delay:.1f}s..."
)
time.sleep(delay)
delay *= backoff_multiplier
else:
log.error(f"UMB messaging failed after {max_retries + 1} attempts: {e}")

raise last_exception


def _umb_send_msg(msgs):
"""Send message to Unified Message Bus"""
"""Send message to Unified Message Bus with retry logic"""

import proton
from rhmsg.activemq.producer import AMQProducer

config = {
"urls": conf.messaging_broker_urls,
"certificate": conf.messaging_cert_file,
"private_key": conf.messaging_key_file,
"trusted_certificates": conf.messaging_ca_cert,
}
with AMQProducer(**config) as producer:
for msg in msgs:
event = msg.get("event", "event")
topic = "%s%s" % (conf.messaging_topic_prefix, event)
producer.through_topic(topic)
outgoing_msg = proton.Message()
outgoing_msg.body = json.dumps(msg)
producer.send(outgoing_msg)
def _send():
"""Inner function to send messages (will be retried on failure)"""
config = {
"urls": conf.messaging_broker_urls,
"certificate": conf.messaging_cert_file,
"private_key": conf.messaging_key_file,
"trusted_certificates": conf.messaging_ca_cert,
}
with AMQProducer(**config) as producer:
for msg in msgs:
event = msg.get("event", "event")
topic = "%s%s" % (conf.messaging_topic_prefix, event)
producer.through_topic(topic)
outgoing_msg = proton.Message()
outgoing_msg.body = json.dumps(msg)
producer.send(outgoing_msg)

# Retry the send operation with exponential backoff
_retry_with_backoff(_send)


def _fedora_messaging_send_msg(msgs):
Expand Down
144 changes: 144 additions & 0 deletions tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,147 @@ def test_retag_stale_composes(self, publish):
self.assertEqual(publish.mock_calls[7], expected_call)
# There should be 8 mock calls, since timeout is not occured for development-nightly-requested compose and nightly compose is not retagged
self.assertEqual(len(publish.mock_calls), 8)


@unittest.skipUnless(rhmsg, "rhmsg is required to run this test case.")
class TestRhmsgRetries(unittest.TestCase):
"""Test certificate file validation for UMB messaging"""

def test_retry_with_backoff_success_on_first_attempt(self):
"""Test retry succeeds immediately if function works on first try"""
from cts.messaging import _retry_with_backoff

call_count = [0]

def successful_func():
call_count[0] += 1
return "success"

result = _retry_with_backoff(successful_func)
self.assertEqual(result, "success")
self.assertEqual(call_count[0], 1)

def test_retry_with_backoff_success_after_failures(self):
"""Test retry succeeds after some failures"""
from cts.messaging import _retry_with_backoff

call_count = [0]

def flaky_func():
call_count[0] += 1
if call_count[0] < 3:
raise ConnectionError("Temporary failure")
return "success"

with patch("time.sleep"): # Mock sleep to speed up test
result = _retry_with_backoff(flaky_func, max_retries=3)

self.assertEqual(result, "success")
self.assertEqual(call_count[0], 3)

def test_retry_with_backoff_exhausts_retries(self):
"""Test retry raises exception after all retries exhausted"""
from cts.messaging import _retry_with_backoff

call_count = [0]

def always_fails():
call_count[0] += 1
raise ConnectionError("Persistent failure")

with patch("time.sleep"): # Mock sleep to speed up test
with self.assertRaises(ConnectionError) as cm:
_retry_with_backoff(always_fails, max_retries=2)

self.assertIn("Persistent failure", str(cm.exception))
self.assertEqual(call_count[0], 3) # Initial attempt + 2 retries

@patch("rhmsg.activemq.producer.AMQProducer")
@patch("proton.Message")
def test_umb_send_msg_retries_on_transient_failure(
self, mock_message, mock_producer
):
"""Test that _umb_send_msg retries on transient failures"""
from cts.messaging import _umb_send_msg

# Simulate transient failure then success
attempt_count = [0]

def producer_side_effect(*args, **kwargs):
attempt_count[0] += 1
if attempt_count[0] == 1:
raise ConnectionError("Transient network error")
return mock_producer.return_value

with patch("time.sleep"): # Mock sleep to speed up test
with patch(
"rhmsg.activemq.producer.AMQProducer", side_effect=producer_side_effect
):
# Should succeed on second attempt
_umb_send_msg([{"event": "test", "data": "test"}])

self.assertEqual(attempt_count[0], 2)

def test_publish_is_async(self):
"""Test that publish() returns immediately without blocking"""
from cts.messaging import publish, _executor
import time

# Mock the backend to simulate slow operation
slow_backend = Mock()

def slow_send(msgs):
time.sleep(0.1) # Simulate slow message sending

slow_backend.side_effect = slow_send

with patch("cts.messaging._get_messaging_backend", return_value=slow_backend):
start = time.time()
publish([{"event": "test"}])
elapsed = time.time() - start

# publish() should return immediately (much less than 0.1s)
self.assertLess(elapsed, 0.05)

# Wait for background thread to complete
_executor.shutdown(wait=True, cancel_futures=False)

# Now the backend should have been called
slow_backend.assert_called_once()

# Recreate executor for other tests
import cts.messaging
from concurrent.futures import ThreadPoolExecutor

cts.messaging._executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="cts-messaging"
)

def test_publish_error_handling_in_background_thread(self):
"""Test that errors in background thread are logged properly"""
from cts.messaging import publish, _executor

# Mock backend that raises an error
failing_backend = Mock(side_effect=RuntimeError("Connection failed"))

with patch(
"cts.messaging._get_messaging_backend", return_value=failing_backend
):
with patch("cts.messaging.log") as mock_log:
publish([{"event": "test"}])

# Wait for background thread to complete
_executor.shutdown(wait=True, cancel_futures=False)

# Error should have been logged
mock_log.exception.assert_called_once_with(
"Failed to publish messages to message broker."
)

# Recreate executor for other tests
import cts.messaging
from concurrent.futures import ThreadPoolExecutor

cts.messaging._executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="cts-messaging"
)