From 1d11897597fdfd2c6c225716472c37c5e5d8a486 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 22 Jan 2019 13:01:50 +0100 Subject: [PATCH 1/3] Fix RabbitMQ connection retry wrapper so it doesn't block for indefinetly if connection to RabbitMQ cannot be established. Also add errback which logs an actual error when trying to re-establish a connection. --- .../transport/connection_retry_wrapper.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/st2common/st2common/transport/connection_retry_wrapper.py b/st2common/st2common/transport/connection_retry_wrapper.py index 05eb667e1b..af46c3702b 100644 --- a/st2common/st2common/transport/connection_retry_wrapper.py +++ b/st2common/st2common/transport/connection_retry_wrapper.py @@ -136,6 +136,7 @@ def run(self, connection, wrapped_callback): raise # -1, 0 and 1+ are handled properly by eventlet.sleep + wait = 1 self._logger.debug('Received RabbitMQ server error, sleeping for %s seconds ' 'before retrying: %s' % (wait, str(e))) eventlet.sleep(wait) @@ -146,10 +147,21 @@ def run(self, connection, wrapped_callback): # entire ConnectionPool simultaneously but that would require writing our own # ConnectionPool. If a server recovers it could happen that the same process # ends up talking to separate nodes in a cluster. - connection.ensure_connection() + def log_error_on_conn_failure(exc, interval): + self._logger.debug('Failed to re-establish connection to RabbitMQ server: %s' % + (str(e))) + + try: + # NOTE: This function blocks and tries to restablish a connection for + # indefinetly if "max_retries" argument is not specified + connection.ensure_connection(max_retries=5, errback=log_error_on_conn_failure) + except Exception: + self._logger.exception('Connections to RabbitMQ cannot be re-established: %s', + str(e)) + raise except Exception as e: - self._logger.exception('Connections to rabbitmq cannot be re-established: %s', + self._logger.exception('Connections to RabbitMQ cannot be re-established: %s', str(e)) # Not being able to publish a message could be a significant issue for an app. raise From 2d8c88a3284d5f6ac88889a2439e7753eb06c1b9 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 22 Jan 2019 13:34:29 +0100 Subject: [PATCH 2/3] Make max retries configurable, also log sleep interval. --- .../st2common/transport/connection_retry_wrapper.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/st2common/st2common/transport/connection_retry_wrapper.py b/st2common/st2common/transport/connection_retry_wrapper.py index af46c3702b..f49ef7a438 100644 --- a/st2common/st2common/transport/connection_retry_wrapper.py +++ b/st2common/st2common/transport/connection_retry_wrapper.py @@ -99,9 +99,12 @@ def wrapped_callback(connection, channel): retry_wrapper.run(connection=connection, wrapped_callback=wrapped_callback) """ - def __init__(self, cluster_size, logger): + def __init__(self, cluster_size, logger, ensure_max_retries=3): self._retry_context = ClusterRetryContext(cluster_size=cluster_size) self._logger = logger + # How many times to try to retrying establishing a connection in a place where we are + # calling connection.ensure_connection + self._ensure_max_retries = ensure_max_retries def errback(self, exc, interval): self._logger.error('Rabbitmq connection error: %s', exc.message) @@ -149,13 +152,14 @@ def run(self, connection, wrapped_callback): # ends up talking to separate nodes in a cluster. def log_error_on_conn_failure(exc, interval): - self._logger.debug('Failed to re-establish connection to RabbitMQ server: %s' % - (str(e))) + self._logger.debug('Failed to re-establish connection to RabbitMQ server, ' + 'retrying in %s seconds: %s' % (interval, str(e))) try: # NOTE: This function blocks and tries to restablish a connection for # indefinetly if "max_retries" argument is not specified - connection.ensure_connection(max_retries=5, errback=log_error_on_conn_failure) + connection.ensure_connection(max_retries=self._ensure_max_retries, + errback=log_error_on_conn_failure) except Exception: self._logger.exception('Connections to RabbitMQ cannot be re-established: %s', str(e)) From ea41466b9c40acf3615eeb0500591fee88ca3467 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 22 Jan 2019 13:45:55 +0100 Subject: [PATCH 3/3] Remove debug code. --- st2common/st2common/transport/connection_retry_wrapper.py | 1 - 1 file changed, 1 deletion(-) diff --git a/st2common/st2common/transport/connection_retry_wrapper.py b/st2common/st2common/transport/connection_retry_wrapper.py index f49ef7a438..30c780e63c 100644 --- a/st2common/st2common/transport/connection_retry_wrapper.py +++ b/st2common/st2common/transport/connection_retry_wrapper.py @@ -139,7 +139,6 @@ def run(self, connection, wrapped_callback): raise # -1, 0 and 1+ are handled properly by eventlet.sleep - wait = 1 self._logger.debug('Received RabbitMQ server error, sleeping for %s seconds ' 'before retrying: %s' % (wait, str(e))) eventlet.sleep(wait)