From 38ef4777aff855ace5482931f04d59b41851b6c7 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 00:32:13 -0400 Subject: [PATCH 01/24] Rename the common module to state It only had the State class, nothing else common --- rejected/{common.py => state.py} | 71 +++++++++++------------- tests/test_state.py | 94 ++++++++++++++++---------------- 2 files changed, 80 insertions(+), 85 deletions(-) rename rejected/{common.py => state.py} (61%) diff --git a/rejected/common.py b/rejected/state.py similarity index 61% rename from rejected/common.py rename to rejected/state.py index 3cbed5c..44f624b 100644 --- a/rejected/common.py +++ b/rejected/state.py @@ -1,5 +1,5 @@ """ -Common Mixin Classes +Base State Tracking Class """ import logging @@ -8,20 +8,6 @@ LOGGER = logging.getLogger(__name__) -try: - from logging import NullHandler -except ImportError: - # Python 2.6 does not have a NullHandler - class NullHandler(logging.Handler): - def emit(self, record): - pass - - -def add_null_handler(): - logger = logging.getLogger() - logger.addHandler(NullHandler()) - - class State(object): """Class that is to be extended by MCP and process for maintaining the internal state of the application. @@ -38,19 +24,19 @@ class State(object): STATE_STOPPED = 0x08 # For reverse lookup - _STATES = {0x01: 'Initializing', - 0x02: 'Connecting', - 0x03: 'Idle', - 0x04: 'Active', - 0x05: 'Sleeping', - 0x06: 'Stop Requested', - 0x07: 'Shutting down', - 0x08: 'Stopped'} + STATES = {0x01: 'Initializing', + 0x02: 'Connecting', + 0x03: 'Idle', + 0x04: 'Active', + 0x05: 'Sleeping', + 0x06: 'Stop Requested', + 0x07: 'Shutting down', + 0x08: 'Stopped'} def __init__(self): """Initialize the state of the object""" - self._state = self.STATE_INITIALIZING - self._state_start = time.time() + self.state = self.STATE_INITIALIZING + self.state_start = time.time() def set_state(self, new_state): """Assign the specified state to this consumer object. @@ -60,14 +46,14 @@ def set_state(self, new_state): """ # Make sure it's a valid state - if new_state not in self._STATES: + if new_state not in self.STATES: raise ValueError('Invalid state value: %r' % new_state) # Set the state LOGGER.debug('State changing from %s to %s', - self._STATES[self._state], self._STATES[new_state]) - self._state = new_state - self._state_start = time.time() + self.STATES[self.state], self.STATES[new_state]) + self.state = new_state + self.state_start = time.time() @property def is_connecting(self): @@ -76,7 +62,7 @@ def is_connecting(self): :rtype: bool """ - return self._state == self.STATE_CONNECTING + return self.state == self.STATE_CONNECTING @property def is_idle(self): @@ -85,7 +71,7 @@ def is_idle(self): :rtype: bool """ - return self._state == self.STATE_IDLE + return self.state == self.STATE_IDLE @property def is_running(self): @@ -95,8 +81,8 @@ def is_running(self): :rtype: bool """ - return self._state in [self.STATE_IDLE, self.STATE_ACTIVE, - self.STATE_SLEEPING] + return self.state in [self.STATE_IDLE, self.STATE_ACTIVE, + self.STATE_SLEEPING] @property def is_shutting_down(self): @@ -105,7 +91,7 @@ def is_shutting_down(self): :rtype: bool """ - return self._state == self.STATE_SHUTTING_DOWN + return self.state == self.STATE_SHUTTING_DOWN @property @@ -115,7 +101,7 @@ def is_sleeping(self): :rtype: bool """ - return self._state == self.STATE_SLEEPING + return self.state == self.STATE_SLEEPING @property def is_stopped(self): @@ -124,7 +110,7 @@ def is_stopped(self): :rtype: bool """ - return self._state == self.STATE_STOPPED + return self.state == self.STATE_STOPPED @property def is_waiting_to_shutdown(self): @@ -133,7 +119,7 @@ def is_waiting_to_shutdown(self): :rtype: bool """ - return self._state == self.STATE_STOP_REQUESTED + return self.state == self.STATE_STOP_REQUESTED @property def state_description(self): @@ -142,4 +128,13 @@ def state_description(self): :rtype: str """ - return self._STATES[self._state] + return self.STATES[self.state] + + @property + def time_in_state(self): + """Return the time that has been spent in the current state. + + :rtype: float + + """ + return time.time() - self.state_start diff --git a/tests/test_state.py b/tests/test_state.py index ee0b61f..0eed4f1 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -5,168 +5,168 @@ except ImportError: import unittest -from rejected import common +from rejected import state class TestState(unittest.TestCase): def setUp(self): - self._obj = common.State() + self._obj = state.State() def testset_state_invalid_value(self): self.assertRaises(ValueError, self._obj.set_state, 9999) def testset_state_expected_assignment(self): - self._state = self._obj.STATE_IDLE + self.state = self._obj.STATE_IDLE self._obj.set_state(self._obj.STATE_CONNECTING) - self.assertEqual(self._obj._state, self._obj.STATE_CONNECTING) + self.assertEqual(self._obj.state, self._obj.STATE_CONNECTING) def testset_state_state_start(self): - self._state = self._obj.STATE_IDLE + self.state = self._obj.STATE_IDLE value = 86400 with mock.patch('time.time', return_value=value): self._obj.set_state(self._obj.STATE_CONNECTING) - self.assertEqual(self._obj._state_start, value) + self.assertEqual(self._obj.state_start, value) def test_state_initializing_desc(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_INITIALIZING]) + self._obj.STATES[self._obj.STATE_INITIALIZING]) def test_state_connecting_desc(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_CONNECTING]) + self._obj.STATES[self._obj.STATE_CONNECTING]) def test_state_idle_desc(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_IDLE]) + self._obj.STATES[self._obj.STATE_IDLE]) def test_state_active_desc(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_ACTIVE]) + self._obj.STATES[self._obj.STATE_ACTIVE]) def test_state_stop_requested_desc(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_STOP_REQUESTED]) + self._obj.STATES[self._obj.STATE_STOP_REQUESTED]) def test_state_shutting_down_desc(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_SHUTTING_DOWN]) + self._obj.STATES[self._obj.STATE_SHUTTING_DOWN]) def test_state_stopped_desc(self): - self._obj._state = self._obj.STATE_STOPPED + self._obj.state = self._obj.STATE_STOPPED self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_STOPPED]) + self._obj.STATES[self._obj.STATE_STOPPED]) def test_is_idle_state_initializing(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertFalse(self._obj.is_idle) def test_is_idle_state_connecting(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertFalse(self._obj.is_idle) def test_is_idle_state_idle(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertTrue(self._obj.is_idle) def test_is_idle_state_processing(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertFalse(self._obj.is_idle) def test_is_idle_state_stop_requested(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertFalse(self._obj.is_idle) def test_is_idle_state_shutting_down(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertFalse(self._obj.is_idle) def test_is_idle_state_stopped(self): - self._obj._state = self._obj.STATE_STOPPED + self._obj.state = self._obj.STATE_STOPPED self.assertFalse(self._obj.is_idle) def test_is_running_state_initializing(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertFalse(self._obj.is_running) def test_is_running_state_connecting(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertFalse(self._obj.is_running) def test_is_running_state_idle(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertTrue(self._obj.is_running) def test_is_running_state_processing(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertTrue(self._obj.is_running) def test_is_running_state_stop_requested(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertFalse(self._obj.is_running) def test_is_running_state_shutting_down(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertFalse(self._obj.is_running) def test_is_running_state_stopped(self): - self._obj._state = self._obj.STATE_STOPPED + self._obj.state = self._obj.STATE_STOPPED self.assertFalse(self._obj.is_running) def test_is_shutting_down_state_initializing(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_connecting(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_idle(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_processing(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_stop_requested(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_shutting_down(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertTrue(self._obj.is_shutting_down) def test_is_shutting_down_state_stopped(self): - self._obj._state = self._obj.STATE_STOPPED + self._obj.state = self._obj.STATE_STOPPED self.assertFalse(self._obj.is_shutting_down) def test_is_stopped_state_initializing(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_connecting(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_idle(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_processing(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_stop_requested(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_shutting_down(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertFalse(self._obj.is_stopped) From 0a6ae1149ee73ca0de359030834225b03710433f Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 00:36:50 -0400 Subject: [PATCH 02/24] MCP Cleanup - Is silly to have private vars since this is not a library - Change the consumers dict to an object to make it easier to reference - No need to check for zombie processes, silly me - Clean up how the -o and -q cli switches filter data - Fix some overriding of top level import names --- rejected/mcp.py | 335 ++++++++++++++++++++++------------------------ tests/test_mcp.py | 15 ++- 2 files changed, 170 insertions(+), 180 deletions(-) diff --git a/rejected/mcp.py b/rejected/mcp.py index 173b895..d6da38b 100644 --- a/rejected/mcp.py +++ b/rejected/mcp.py @@ -11,20 +11,34 @@ import sys import time -from rejected import common +from rejected import state from rejected import process from rejected import __version__ LOGGER = logging.getLogger(__name__) -class MasterControlProgram(common.State): +class Consumer(object): + """Class used for keeping track of each consumer type being managed by + the MCP + + """ + def __init__(self, connections, last_proc_num, processes, qty, queue): + self.connections = connections + self.last_proc_num = last_proc_num + self.processes = processes + self.qty = qty + self.queue = queue + + +class MasterControlProgram(state.State): """Master Control Program keeps track of and manages consumer processes.""" - _DEFAULT_CONSUMER_QTY = 1 - _MAX_SHUTDOWN_WAIT = 10 - _POLL_INTERVAL = 60.0 - _POLL_RESULTS_INTERVAL = 3.0 - _SHUTDOWN_WAIT = 1 + + DEFAULT_CONSUMER_QTY = 1 + MAX_SHUTDOWN_WAIT = 10 + POLL_INTERVAL = 60.0 + POLL_RESULTS_INTERVAL = 3.0 + SHUTDOWN_WAIT = 1 def __init__(self, config, consumer=None, profile=None, quantity=None): """Initialize the Master Control Program @@ -41,27 +55,26 @@ def __init__(self, config, consumer=None, profile=None, quantity=None): super(MasterControlProgram, self).__init__() # Default values - self._consumer = consumer - self._consumers = dict() - self._config = config - self._last_poll_results = dict() - self._poll_data = {'time': 0, 'processes': list()} - self._poll_timer = None - self._profile = profile - self._quantity = quantity if consumer else None - self._results_timer = None - self._stats = dict() - self._stats_queue = multiprocessing.Queue() - self._polled = False + self.consumer_cfg = self.get_consumer_cfg(config, consumer, quantity) + self.consumers = dict() + self.config = config + self.last_poll_results = dict() + self.poll_data = {'time': 0, 'processes': list()} + self.poll_timer = None + self.profile = profile + self.results_timer = None + self.stats = dict() + self.stats_queue = multiprocessing.Queue() + self.polled = False # Carry for logging internal stats collection data - self._log_stats_enabled = config.application.get('log_stats', False) - LOGGER.debug('Stats logging enabled: %s', self._log_stats_enabled) + self.log_stats_enabled = config.application.get('log_stats', False) + LOGGER.debug('Stats logging enabled: %s', self.log_stats_enabled) # Setup the poller related threads - self._poll_interval = config.application.get('poll_interval', - self._POLL_INTERVAL) - LOGGER.debug('Set process poll interval to %.2f', self._poll_interval) + self.poll_interval = config.application.get('poll_interval', + self.POLL_INTERVAL) + LOGGER.debug('Set process poll interval to %.2f', self.poll_interval) @property def active_processes(self): @@ -71,8 +84,8 @@ def active_processes(self): """ active_processes, dead_processes = list(), list() - for consumer in self._consumers: - for name in self._consumers[consumer]['processes']: + for consumer in self.consumers: + for name in self.consumers[consumer].processes: child = self.get_consumer_process(consumer, name) if int(child.pid) == os.getpid(): continue @@ -82,7 +95,7 @@ def active_processes(self): dead_processes.append((consumer, name)) continue - if self.is_a_dead_or_zombie_process(proc): + if self.is_dead(proc): dead_processes.append((consumer, name)) else: active_processes.append(child) @@ -144,8 +157,8 @@ def check_process_counts(self): """ LOGGER.debug('Checking minimum consumer process levels') - for name in self._consumers: - for connection in self._consumers[name]['connections']: + for name in self.consumers: + for connection in self.consumers[name].connections: processes_needed = self.process_spawn_qty(name, connection) LOGGER.debug('Need to spawn %i processes for %s on %s', processes_needed, name, connection) @@ -159,7 +172,7 @@ def collect_results(self, data_values): :type data_values: dict """ - self._last_poll_results['timestamp'] = self._poll_data['timestamp'] + self.last_poll_results['timestamp'] = self.poll_data['timestamp'] # Get the name and consumer name and remove it from what is reported consumer_name = data_values['consumer_name'] @@ -168,29 +181,12 @@ def collect_results(self, data_values): del data_values['name'] # Add it to our last poll global data - if consumer_name not in self._last_poll_results: - self._last_poll_results[consumer_name] = dict() - self._last_poll_results[consumer_name][process_name] = data_values + if consumer_name not in self.last_poll_results: + self.last_poll_results[consumer_name] = dict() + self.last_poll_results[consumer_name][process_name] = data_values # Calculate the stats - self._stats = self.calculate_stats(self._last_poll_results) - - def consumer_dict(self, configuration): - """Return a consumer dict for the given name and configuration. - - :param dict configuration: The consumer configuration - :rtype: dict - - """ - # Keep a dict that has a list of processes by connection - connections = dict() - for connection in configuration['connections']: - connections[connection] = list() - return {'connections': connections, - 'qty': configuration.get('qty', self._DEFAULT_CONSUMER_QTY), - 'last_proc_num': 0, - 'queue': configuration['queue'], - 'processes': dict()} + self.stats = self.calculate_stats(self.last_poll_results) @staticmethod def consumer_keyword(counts): @@ -223,28 +219,48 @@ def get_consumer_process(self, consumer, name): :returns: multiprocessing.Process """ - return self._consumers[consumer]['processes'].get(name) + return self.consumers[consumer].processes.get(name) + + @staticmethod + def get_consumer_cfg(config, only, qty): + """Get the consumers config, possibly filtering the config if only + or qty is set. + + :param config: The consumers config section + :type config: helper.config.Config + :param str only: When set, filter to run only this consumer + :param int qty: When set, set the consumer qty to this value + :rtype: dict + + """ + consumers = dict(config.application.Consumers) + if only: + for key in consumers.keys(): + if key != only: + del consumers[key] + if qty: + consumers[only]['qty'] = qty + return consumers @staticmethod - def is_a_dead_or_zombie_process(process): - """Checks to see if the specified process is a zombie or dead. + def is_dead(proc): + """Checks to see if the specified process is dead. - :param psutil.Process: The process to check + :param psutil.Process proc: The process to check :rtype: bool """ try: - if process.status in [psutil.STATUS_DEAD, psutil.STATUS_ZOMBIE]: + if proc.status == psutil.STATUS_DEAD: try: - LOGGER.debug('Found dead or zombie process with ' - '%s fds', - process.get_num_fds()) + LOGGER.debug('Found dead process with %s fds', + proc.get_num_fds()) except psutil.AccessDenied as error: - LOGGER.debug('Found dead or zombie process, ' - 'could not get fd count: %s', error) + LOGGER.debug('Found dead process, could not get ' + 'fd count: %s', error) return True except psutil.NoSuchProcess: - LOGGER.debug('Found dead or zombie process') + LOGGER.debug('Process is dead and does not exist') return True return False @@ -258,13 +274,13 @@ def kill_processes(self): processes = True while processes: processes = self.active_processes - for process in processes: - if int(process.pid) != int(os.getpid()): - LOGGER.warning('Killing %s (%s)', process.name, process.pid) - os.kill(int(process.pid), signal.SIGKILL) + for proc in processes: + if int(proc.pid) != int(os.getpid()): + LOGGER.warning('Killing %s (%s)', proc.name, proc.pid) + os.kill(int(proc.pid), signal.SIGKILL) else: LOGGER.warning('Cowardly refusing kill self (%s, %s)', - process.pid, os.getpid()) + proc.pid, os.getpid()) time.sleep(0.5) LOGGER.info('Killed all children') @@ -272,7 +288,7 @@ def kill_processes(self): def log_stats(self): """Output the stats to the LOGGER.""" - if not self._stats.get('counts'): + if not self.stats.get('counts'): LOGGER.info('Did not receive any stats data from children') return @@ -280,31 +296,43 @@ def log_stats(self): 'errors, waiting %.2f seconds and have spent %.2f seconds ' 'processing messages with an overall velocity of %.2f ' 'messages per second.', - self._stats['counts']['processes'], - self.consumer_keyword(self._stats['counts']), - self._stats['counts']['processed'], - self._stats['counts']['failed'], - self._stats['counts']['idle_time'], - self._stats['counts']['processing_time'], - self.calculate_velocity(self._stats['counts'])) - for key in self._stats['consumers'].keys(): + self.stats['counts']['processes'], + self.consumer_keyword(self.stats['counts']), + self.stats['counts']['processed'], + self.stats['counts']['failed'], + self.stats['counts']['idle_time'], + self.stats['counts']['processing_time'], + self.calculate_velocity(self.stats['counts'])) + for key in self.stats['consumers'].keys(): LOGGER.info('%i %s for %s have processed %i messages with %i ' 'errors, waiting %.2f seconds and have spent %.2f ' 'seconds processing messages with an overall velocity ' 'of %.2f messages per second.', - self._stats['consumers'][key]['processes'], - self.consumer_keyword(self._stats['consumers'][key]), + self.stats['consumers'][key]['processes'], + self.consumer_keyword(self.stats['consumers'][key]), key, - self._stats['consumers'][key]['processed'], - self._stats['consumers'][key]['failed'], - self._stats['consumers'][key]['idle_time'], - self._stats['consumers'][key]['processing_time'], - self.calculate_velocity(self._stats['consumers'][key])) - if self._poll_data['processes']: + self.stats['consumers'][key]['processed'], + self.stats['consumers'][key]['failed'], + self.stats['consumers'][key]['idle_time'], + self.stats['consumers'][key]['processing_time'], + self.calculate_velocity(self.stats['consumers'][key])) + if self.poll_data['processes']: LOGGER.warning('%i process(es) did not respond with stats in ' 'time: %r', - len(self._poll_data['processes']), - self._poll_data['processes']) + len(self.poll_data['processes']), + self.poll_data['processes']) + + def new_consumer(self, config): + """Return a consumer dict for the given name and configuration. + + :param dict config: The consumer configuration + :rtype: dict + + """ + return Consumer(dict([(c, []) for c in config['connections']]), + 0, dict(), + config.get('qty', self.DEFAULT_CONSUMER_QTY), + config['queue']) def new_process(self, consumer_name, connection_name): """Create a new consumer instances @@ -318,13 +346,13 @@ def new_process(self, consumer_name, connection_name): self.new_process_number(consumer_name)) LOGGER.debug('Creating a new process for %s: %s', connection_name, process_name) - kwargs = {'config': self._config.application, + kwargs = {'config': self.config.application, 'connection_name': connection_name, 'consumer_name': consumer_name, - 'profile': self._profile, + 'profile': self.profile, 'daemon': False, - 'stats_queue': self._stats_queue, - 'logging_config': self._config.logging} + 'stats_queue': self.stats_queue, + 'logging_config': self.config.logging} return process_name, process.Process(name=process_name, kwargs=kwargs) def new_process_number(self, name): @@ -335,22 +363,22 @@ def new_process_number(self, name): :rtype: int """ - self._consumers[name]['last_proc_num'] += 1 - return self._consumers[name]['last_proc_num'] + self.consumers[name].last_proc_num += 1 + return self.consumers[name].last_proc_num - def on_timer(self, signum, unused_frame): + def on_timer(self, _signum, _unused_frame): LOGGER.debug('Timer fired') - if not self._polled: + if not self.polled: self.poll() - self._polled = True + self.polled = True self.set_timer(5) # Wait 5 seconds for results else: - self._polled = False + self.polled = False self.poll_results_check() - self.set_timer(self._poll_interval) # Wait poll interval duration + self.set_timer(self.poll_interval) # Wait poll interval duration # If stats logging is enabled, log the stats - if self._log_stats_enabled: + if self.log_stats_enabled: self.log_stats() def poll(self): @@ -367,8 +395,7 @@ def poll(self): return self.set_state(self.STATE_STOPPED) # Start our data collection dict - self._poll_data = {'timestamp': time.time(), - 'processes': list()} + self.poll_data = {'timestamp': time.time(), 'processes': list()} # Iterate through all of the consumers for proc in self.active_processes: @@ -379,7 +406,7 @@ def poll(self): # Send the profile signal os.kill(int(proc.pid), signal.SIGPROF) - self._poll_data['processes'].append(proc.name) + self.poll_data['processes'].append(proc.name) # Check if we need to start more processes self.check_process_counts() @@ -391,7 +418,7 @@ def poll_duration_exceeded(self): """ return (time.time() - - self._poll_data['timestamp']) >= self._poll_interval + self.poll_data['timestamp']) >= self.poll_interval def poll_results_check(self): """Check the polling results by checking to see if the stats queue is @@ -402,18 +429,18 @@ def poll_results_check(self): LOGGER.debug('Checking for poll results') while True: try: - stats = self._stats_queue.get(False) + stats = self.stats_queue.get(False) except Queue.Empty: break try: - self._poll_data['processes'].remove(stats['name']) + self.poll_data['processes'].remove(stats['name']) except ValueError: pass self.collect_results(stats) - if self._poll_data['processes']: + if self.poll_data['processes']: LOGGER.warning('Did not receive results from %r', - self._poll_data['processes']) + self.poll_data['processes']) def process(self, consumer_name, process_name): """Return the process handle for the given consumer name and process @@ -424,7 +451,7 @@ def process(self, consumer_name, process_name): :rtype: rejected.process.Process """ - return self._consumers[consumer_name]['processes'][process_name] + return self.consumers[consumer_name].processes[process_name] def process_count(self, name, connection): """Return the process count for the given consumer name and connection. @@ -434,7 +461,7 @@ def process_count(self, name, connection): :rtype: int """ - return len(self._consumers[name]['connections'][connection]) + return len(self.consumers[name].connections[connection]) def process_count_by_consumer(self, name): """Return the process count by consumer only. @@ -444,8 +471,8 @@ def process_count_by_consumer(self, name): """ count = 0 - for connection in self._consumers[name]['connections']: - count += len(self._consumers[name]['connections'][connection]) + for connection in self.consumers[name].connections: + count += len(self.consumers[name].connections[connection]) return count def process_spawn_qty(self, name, connection): @@ -457,8 +484,7 @@ def process_spawn_qty(self, name, connection): :rtype: int """ - return self._consumers[name]['qty'] - self.process_count(name, - connection) + return self.consumers[name].qty - self.process_count(name, connection) def remove_consumer_process(self, consumer, name): """Remove all details for the specified consumer and process name. @@ -467,51 +493,33 @@ def remove_consumer_process(self, consumer, name): :param str name: The process name """ - for conn in self._consumers[consumer]['connections']: - if name in self._consumers[consumer]['connections'][conn]: - self._consumers[consumer]['connections'][conn].remove(name) - if name in self._consumers[consumer]['processes']: + for conn in self.consumers[consumer].connections: + if name in self.consumers[consumer].connections[conn]: + self.consumers[consumer].connections[conn].remove(name) + if name in self.consumers[consumer].processes: try: - self._consumers[consumer]['processes'][name].terminate() + self.consumers[consumer].processes[name].terminate() except OSError: pass - del self._consumers[consumer]['processes'][name] + del self.consumers[consumer].processes[name] def run(self): """When the consumer is ready to start running, kick off all of our consumer consumers and then loop while we process messages. """ - # Set the state to active self.set_state(self.STATE_ACTIVE) - - # Get the consumer section from the config - if 'Consumers' not in self._config.application: - LOGGER.error('Missing Consumers section of configuration, ' - 'aborting: %r', self._config.application) - return self.set_state(self.STATE_STOPPED) - - # Strip consumers if a consumer is specified - if self._consumer: - consumers = self._config.application.Consumers.keys() - for consumer in consumers: - if consumer != self._consumer: - LOGGER.debug('Removing %s for %s only processing', - consumer, self._consumer) - del self._config.application.Consumers[consumer] - - # Setup consumers and start the processes self.setup_consumers() # Set the SIGALRM handler for poll interval signal.signal(signal.SIGALRM, self.on_timer) # Kick off the poll timer - signal.setitimer(signal.ITIMER_REAL, self._poll_interval, 0) + signal.setitimer(signal.ITIMER_REAL, self.poll_interval, 0) # Loop for the lifetime of the app, pausing for a signal to pop up while self.is_running and self.total_process_count: - if self._state != self.STATE_SLEEPING: + if not self.is_sleeping: self.set_state(self.STATE_SLEEPING) signal.pause() @@ -521,14 +529,14 @@ def run(self): @staticmethod def set_process_name(): """Set the process name for the top level process so that it shows up - in logs in a more trackable fasion. + in logs in a more trackable fashion. """ - process = multiprocessing.current_process() + proc = multiprocessing.current_process() for offset in xrange(0, len(sys.argv)): if sys.argv[offset] == '-c': name = sys.argv[offset + 1].split('/')[-1] - process.name = name.split('.')[0] + proc.name = name.split('.')[0] break def set_timer(self, duration): @@ -552,17 +560,16 @@ def start_process(self, name, connection): :param str connection: The connection name """ - process_name, process = self.new_process(name, connection) - + process_name, proc = self.new_process(name, connection) LOGGER.info('Spawning %s process for %s to %s', process_name, name, connection) # Append the process to the consumer process list - self._consumers[name]['processes'][process_name] = process - self._consumers[name]['connections'][connection].append(process_name) + self.consumers[name].processes[process_name] = proc + self.consumers[name].connections[connection].append(process_name) # Start the process - process.start() + proc.start() def start_processes(self, name, connection, quantity): """Start the specified quantity of consumer processes for the given @@ -573,35 +580,17 @@ def start_processes(self, name, connection, quantity): :param int quantity: The quantity of processes to start """ - for process in xrange(0, quantity): - self.start_process(name, connection) + [self.start_process(name, connection) for _i in range(0, quantity)] def setup_consumers(self): """Iterate through each consumer in the configuration and kick off the minimal amount of processes, setting up the runtime data as well. """ - for name in self._config.application.Consumers: - - # Hold the config as a shortcut - config = self._config.application.Consumers[name] - - # If queue is not configured, report the error but skip processes - if 'queue' not in config: - LOGGER.critical('Consumer %s is missing a queue, skipping', - name) - continue - - # Create the dictionary values for this process - self._consumers[name] = self.consumer_dict(config) - - if self._quantity: - self._consumers[name]['qty'] = self._quantity - - # Iterate through the connections to create new consumer processes - for connection in self._consumers[name]['connections']: - self.start_processes(name, connection, - self._consumers[name]['qty']) + for name in self.consumer_cfg.keys(): + self.consumers[name] = self.new_consumer(self.consumer_cfg[name]) + for connection in self.consumers[name].connections: + self.start_processes(name, connection, self.consumers[name].qty) def stop_processes(self): """Iterate through all of the consumer processes shutting them down.""" @@ -615,9 +604,9 @@ def stop_processes(self): # Send SIGABRT LOGGER.info('Sending SIGABRT to active children') - for process in multiprocessing.active_children(): - if int(process.pid) != os.getpid(): - os.kill(int(process.pid), signal.SIGABRT) + for proc in multiprocessing.active_children(): + if int(proc.pid) != os.getpid(): + os.kill(int(proc.pid), signal.SIGABRT) # Wait for them to finish up to MAX_SHUTDOWN_WAIT iterations = 0 @@ -634,7 +623,7 @@ def stop_processes(self): return iterations += 1 - if iterations == self._MAX_SHUTDOWN_WAIT: + if iterations == self.MAX_SHUTDOWN_WAIT: self.kill_processes() break processes = self.total_process_count diff --git a/tests/test_mcp.py b/tests/test_mcp.py index d6c6496..42738c0 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -1,28 +1,29 @@ """Tests for the MCP""" import mock import multiprocessing +from mock import patch from helper import config from rejected import mcp from . import test_state -class TestMCP(test_state.TestState): - CONFIG = {'poll_interval': 30.0, 'log_stats': True} +class TestMCP(test_state.TestState): - @mock.patch.object(multiprocessing, 'Queue') - def setUp(self, mock_queue_unused): + CONFIG = {'poll_interval': 30.0, 'log_stats': True, 'Consumers': {}} + @patch.object(multiprocessing, 'Queue') + def setUp(self, _mock_queue_unused): self.cfg = config.Config() self.cfg.application.update(self.CONFIG) self._obj = mcp.MasterControlProgram(self.cfg) def test_mcp_init_consumers_dict(self): - self.assertIsInstance(self._obj._consumers, dict) + self.assertIsInstance(self._obj.consumers, dict) def test_mcp_init_consumers_dict_empty(self): - self.assertTrue(not self._obj._consumers, dict) + self.assertTrue(not self._obj.consumers, dict) def test_mcp_init_queue_initialized(self): - self.assertIsInstance(self._obj._stats_queue, mock.MagicMock) + self.assertIsInstance(self._obj.stats_queue, mock.MagicMock) From 5ad585a1eb2485666daf4f4a8653a94c9b5d94a3 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 00:37:00 -0400 Subject: [PATCH 03/24] Whitespace only --- rejected/data.py | 1 - 1 file changed, 1 deletion(-) diff --git a/rejected/data.py b/rejected/data.py index 4b70658..0fa9309 100644 --- a/rejected/data.py +++ b/rejected/data.py @@ -30,7 +30,6 @@ def __repr__(self): return '<%s(%s)>' % (self.__class__.__name__, items) - class Message(Data): """Class for containing all the attributes about a message object creating a flatter, move convenient way to access the data while supporting the legacy From f9c169268d8cc1dc7acc24a4cf8d53b0c3793f2f Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 00:37:10 -0400 Subject: [PATCH 04/24] Add new 2.6 dependency --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index ae976fa..01003e2 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ extras_require = {'html': ['beautifulsoup4']} if sys.version_info < (2, 7, 0): + install_requires.append('backport_collections') install_requires.append('importlib') setup(name='rejected', From 51c73ebc8faecd5b54617fcced20e8d1b30d5e39 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 00:37:27 -0400 Subject: [PATCH 05/24] Minor cleanup of __init__ --- rejected/__init__.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rejected/__init__.py b/rejected/__init__.py index 31b3b38..467a846 100644 --- a/rejected/__init__.py +++ b/rejected/__init__.py @@ -4,7 +4,7 @@ """ __author__ = 'Gavin M. Roy ' __since__ = "2009-09-10" -__version__ = "3.4.6" +__version__ = "3.5.0" from consumer import Consumer from consumer import PublishingConsumer @@ -21,11 +21,8 @@ class NullHandler(logging.Handler): """Python 2.6 does not have a NullHandler""" def emit(self, record): """Emit a record - :param record record: The record to emit - """ pass logging.getLogger('rejected').addHandler(NullHandler()) -logging.getLogger('rejected.consumer').addHandler(NullHandler()) From 5c6cb8486c06907a704474e0af4fdee7bce142c5 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 00:49:48 -0400 Subject: [PATCH 06/24] yapf reformatting --- rejected/__init__.py | 3 ++ rejected/controller.py | 13 ++++----- rejected/mcp.py | 64 ++++++++++++++++++++++-------------------- rejected/state.py | 23 +++++++-------- 4 files changed, 54 insertions(+), 49 deletions(-) diff --git a/rejected/__init__.py b/rejected/__init__.py index 467a846..a0f800f 100644 --- a/rejected/__init__.py +++ b/rejected/__init__.py @@ -17,12 +17,15 @@ try: from logging import NullHandler except ImportError: + class NullHandler(logging.Handler): """Python 2.6 does not have a NullHandler""" + def emit(self, record): """Emit a record :param record record: The record to emit """ pass + logging.getLogger('rejected').addHandler(NullHandler()) diff --git a/rejected/controller.py b/rejected/controller.py index 66938f2..92ed4c1 100644 --- a/rejected/controller.py +++ b/rejected/controller.py @@ -8,9 +8,7 @@ import signal import sys -from rejected import common from rejected import mcp -from rejected import __version__ LOGGER = logging.getLogger(__name__) @@ -20,6 +18,7 @@ class Controller(helper.Controller): of the OS level concerns. """ + def _master_control_program(self): """Return an instance of the MasterControlProgram. @@ -43,7 +42,6 @@ def _prepend_python_path(self, path): #pragma: no cover def setup(self): """Continue the run process blocking on MasterControlProgram.run""" # If the app was invoked to specified to prepend the path, do so now - common.add_null_handler() if self.args.prepend_path: self._prepend_python_path(self.args.prepend_path) @@ -95,7 +93,7 @@ def add_parser_arguments(): default=None, dest='profile', help='Profile the consumer modules, specifying ' - 'the output directory.') + 'the output directory.') argparser.add_argument('-o', '--only', action='store', default=None, @@ -108,10 +106,11 @@ def add_parser_arguments(): help='Prepend the python path with the value.') argparser.add_argument('-q', '--qty', action='store', - default=1, + type=int, + default=None, dest='quantity', - help='Run the specified quanty of consumer processes' - ' when used in conjunction with -o') + help='Run the specified quantity of consumer ' + 'processes when used in conjunction with -o') def main(): diff --git a/rejected/mcp.py b/rejected/mcp.py index d6da38b..5684d9a 100644 --- a/rejected/mcp.py +++ b/rejected/mcp.py @@ -23,6 +23,7 @@ class Consumer(object): the MCP """ + def __init__(self, connections, last_proc_num, processes, qty, queue): self.connections = connections self.last_proc_num = last_proc_num @@ -132,10 +133,12 @@ def calculate_stats(self, data): # Return a data structure that can be used in reporting out the stats stats['processes'] = len(self.active_processes) - return {'last_poll': timestamp, - 'consumers': consumer_stats, - 'process_data': data, - 'counts': stats} + return { + 'last_poll': timestamp, + 'consumers': consumer_stats, + 'process_data': data, + 'counts': stats + } @staticmethod def calculate_velocity(counts): @@ -205,11 +208,13 @@ def consumer_stats_counter(): :rtype: dict """ - return {process.Process.ERROR: 0, - process.Process.PROCESSED: 0, - process.Process.REDELIVERED: 0, - process.Process.TIME_SPENT: 0, - process.Process.TIME_WAITED: 0} + return { + process.Process.ERROR: 0, + process.Process.PROCESSED: 0, + process.Process.REDELIVERED: 0, + process.Process.TIME_SPENT: 0, + process.Process.TIME_WAITED: 0 + } def get_consumer_process(self, consumer, name): """Get the process object for the specified consumer and process name. @@ -295,8 +300,7 @@ def log_stats(self): LOGGER.info('%i total %s have processed %i messages with %i ' 'errors, waiting %.2f seconds and have spent %.2f seconds ' 'processing messages with an overall velocity of %.2f ' - 'messages per second.', - self.stats['counts']['processes'], + 'messages per second.', self.stats['counts']['processes'], self.consumer_keyword(self.stats['counts']), self.stats['counts']['processed'], self.stats['counts']['failed'], @@ -310,16 +314,14 @@ def log_stats(self): 'of %.2f messages per second.', self.stats['consumers'][key]['processes'], self.consumer_keyword(self.stats['consumers'][key]), - key, - self.stats['consumers'][key]['processed'], + key, self.stats['consumers'][key]['processed'], self.stats['consumers'][key]['failed'], self.stats['consumers'][key]['idle_time'], self.stats['consumers'][key]['processing_time'], self.calculate_velocity(self.stats['consumers'][key])) if self.poll_data['processes']: LOGGER.warning('%i process(es) did not respond with stats in ' - 'time: %r', - len(self.poll_data['processes']), + 'time: %r', len(self.poll_data['processes']), self.poll_data['processes']) def new_consumer(self, config): @@ -329,9 +331,8 @@ def new_consumer(self, config): :rtype: dict """ - return Consumer(dict([(c, []) for c in config['connections']]), - 0, dict(), - config.get('qty', self.DEFAULT_CONSUMER_QTY), + return Consumer(dict([(c, []) for c in config['connections']]), 0, + dict(), config.get('qty', self.DEFAULT_CONSUMER_QTY), config['queue']) def new_process(self, consumer_name, connection_name): @@ -344,15 +345,17 @@ def new_process(self, consumer_name, connection_name): """ process_name = '%s-%s' % (consumer_name, self.new_process_number(consumer_name)) - LOGGER.debug('Creating a new process for %s: %s', - connection_name, process_name) - kwargs = {'config': self.config.application, - 'connection_name': connection_name, - 'consumer_name': consumer_name, - 'profile': self.profile, - 'daemon': False, - 'stats_queue': self.stats_queue, - 'logging_config': self.config.logging} + LOGGER.debug('Creating a new process for %s: %s', connection_name, + process_name) + kwargs = { + 'config': self.config.application, + 'connection_name': connection_name, + 'consumer_name': consumer_name, + 'profile': self.profile, + 'daemon': False, + 'stats_queue': self.stats_queue, + 'logging_config': self.config.logging + } return process_name, process.Process(name=process_name, kwargs=kwargs) def new_process_number(self, name): @@ -417,8 +420,7 @@ def poll_duration_exceeded(self): :rtype: bool """ - return (time.time() - - self.poll_data['timestamp']) >= self.poll_interval + return (time.time() - self.poll_data['timestamp']) >= self.poll_interval def poll_results_check(self): """Check the polling results by checking to see if the stats queue is @@ -561,8 +563,8 @@ def start_process(self, name, connection): """ process_name, proc = self.new_process(name, connection) - LOGGER.info('Spawning %s process for %s to %s', - process_name, name, connection) + LOGGER.info('Spawning %s process for %s to %s', process_name, name, + connection) # Append the process to the consumer process list self.consumers[name].processes[process_name] = proc diff --git a/rejected/state.py b/rejected/state.py index 44f624b..15bf05f 100644 --- a/rejected/state.py +++ b/rejected/state.py @@ -24,14 +24,16 @@ class State(object): STATE_STOPPED = 0x08 # For reverse lookup - STATES = {0x01: 'Initializing', - 0x02: 'Connecting', - 0x03: 'Idle', - 0x04: 'Active', - 0x05: 'Sleeping', - 0x06: 'Stop Requested', - 0x07: 'Shutting down', - 0x08: 'Stopped'} + STATES = { + 0x01: 'Initializing', + 0x02: 'Connecting', + 0x03: 'Idle', + 0x04: 'Active', + 0x05: 'Sleeping', + 0x06: 'Stop Requested', + 0x07: 'Shutting down', + 0x08: 'Stopped' + } def __init__(self): """Initialize the state of the object""" @@ -50,8 +52,8 @@ def set_state(self, new_state): raise ValueError('Invalid state value: %r' % new_state) # Set the state - LOGGER.debug('State changing from %s to %s', - self.STATES[self.state], self.STATES[new_state]) + LOGGER.debug('State changing from %s to %s', self.STATES[self.state], + self.STATES[new_state]) self.state = new_state self.state_start = time.time() @@ -93,7 +95,6 @@ def is_shutting_down(self): """ return self.state == self.STATE_SHUTTING_DOWN - @property def is_sleeping(self): """Returns a bool determining if the process is sleeping From 5d28ed47c77a8aabd7a4a8f23b72e320b1fa494f Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 00:55:36 -0400 Subject: [PATCH 07/24] WIP Consumer updates - Reformat with yapf - Structure so the processing can make use of tornado's concurrency features - Rename Consumer.configuration with Consumer.settings to more closely match Tornado's RequestHandler - Add Consumer.finish to explicitly end async processing - Add Consumer.require_setting to allow consumers to define the config/settings contract - Add Consumer.on_finish, Consumer.shutdown stubs - Add statsd incr/add_timing methods --- rejected/consumer.py | 212 +++++++++++++++++++++++++++++++++---------- 1 file changed, 162 insertions(+), 50 deletions(-) diff --git a/rejected/consumer.py b/rejected/consumer.py index 1064cf6..be8ed39 100644 --- a/rejected/consumer.py +++ b/rejected/consumer.py @@ -13,7 +13,7 @@ ``content_encoding`` types (``gzip`` or ``bzip2``) be specified in the message's property, it will automatically be decoded. -Supported MIME types are: +Supported `SmartConsumer` MIME types are: - application/json - application/pickle @@ -29,7 +29,9 @@ """ import bz2 +from tornado import concurrent import csv +from tornado import gen import json import logging import pickle @@ -39,14 +41,14 @@ import sys import time import uuid +import warnings import yaml import zlib LOGGER = logging.getLogger(__name__) BS4_MIME_TYPES = ('text/html', 'text/xml') -PICKLE_MIME_TYPES = ('application/pickle', - 'application/x-pickle', +PICKLE_MIME_TYPES = ('application/pickle', 'application/x-pickle', 'application/x-vnd.python.pickle', 'application/vnd.python.pickle') YAML_MIME_TYPES = ('text/yaml', 'text/x-yaml') @@ -75,23 +77,38 @@ class Consumer(object): DROP_INVALID_MESSAGES = False MESSAGE_TYPE = None - def __init__(self, configuration): + def __init__(self, settings): """Creates a new instance of a Consumer class. To perform initialization tasks, extend Consumer.initialize - :param dict configuration: The configuration from rejected + :param dict settings: The configuration from rejected """ - self._config = configuration self._channel = None + self._finished = False self._message = None self._message_body = None + self._settings = settings + self._statsd = None # Run any child object specified initialization self.initialize() def initialize(self): - """Extend this method for any initialization tasks""" + """Extend this method for any initialization tasks that occur only when + the `Consumer` class is created.""" + pass + + def prepare(self): + """Called when a message is received before `process`. + + Asynchronous support: Decorate this method with `.gen.coroutine` + or `.return_future` to make it asynchronous (the + `asynchronous` decorator cannot be used on `prepare`). + + If this method returns a `.Future` execution will not proceed + until the `.Future` is done. + """ pass def process(self): @@ -106,41 +123,52 @@ def process(self): """ raise NotImplementedError - def receive(self, message_in): - """Process the message from RabbitMQ. To implement logic for processing - a message, extend Consumer._process, not this method. + def on_finish(self): + """Called after the end of a request. + Override this method to perform cleanup, logging, etc. + This method is a counterpart to `prepare`. ``on_finish`` may + not produce any output, as it is called after the response + has been sent to the client. + """ + pass - :param rejected.Consumer.Message message_in: The message to process - :rtype: bool + def shutdown(self): + """Override to cleanly shutdown when rejected is stopping""" + pass - """ - LOGGER.debug('Received: %r', message_in) - self._message = message_in - self._message_body = None + def finish(self): + """Finishes message processing for the current message.""" + if self._finished: + raise RuntimeError("finish() called twice") + self._finished = True + self.on_finish() + self._clear() - # Validate the message type if the child sets _MESSAGE_TYPE - if self.MESSAGE_TYPE and self.MESSAGE_TYPE != self.message_type: - LOGGER.error('Received a non-supported message type: %s', - self.message_type) + def require_setting(self, name, feature="this feature"): + """Raises an exception if the given app setting is not defined.""" + if not self.settings.get(name): + raise Exception("You must define the '%s' setting in your " + "application to use %s" % (name, feature)) - # Should the message be dropped or returned to the broker? - if self.DROP_INVALID_MESSAGES: - LOGGER.debug('Dropping the invalid message') - return - else: - raise ConsumerException('Invalid message type') + def statsd_add_timing(self, key, duration): + """Add a timing to statsd - # Let the child object process the message - self.process() + :param str key: The key to add the timing to + :param int|float duration: The timing value - def set_channel(self, channel): - """Assign the _channel attribute to the channel that was passed in. - This should not be extended. + """ + if self._statsd: + self._statsd.add_timing(key, duration) - :param pika.channel.Channel channel: The channel to assign + def statsd_incr(self, key, value=1): + """Increment the specified key in statsd if statsd is enabled. + + :param str key: The key to increment + :param int value: The value to increment the key by """ - self._channel = channel + if self._statsd: + self._statsd.incr(key, value) @property def app_id(self): @@ -166,10 +194,16 @@ def configuration(self): """Access the configuration stanza for the consumer as specified by the ``config`` section for the consumer in the rejected configuration. + .. deprecated:: 3.1 + Use :property:`settings` instead. + :rtype: dict """ - return self._config + warnings.warn('Consumer.configuration is deprecated ' + 'in favor of Consumer.settings', + category=DeprecationWarning) + return self._settings @property def content_encoding(self): @@ -306,6 +340,16 @@ def message_type(self): """ return self._message.properties.type + @property + def settings(self): + """Access the consumer settings as specified by the ``config`` section + for the consumer in the rejected configuration. + + :rtype: dict + + """ + return self._settings + @property def timestamp(self): """Access the unix epoch timestamp value from the properties of the @@ -325,6 +369,72 @@ def user_id(self): """ return self._message.properties.user_id + def _clear(self): + """Resets all assigned data for the current message.""" + self._message = None + self._message_body = None + + @gen.coroutine + def _execute(self, message_in): + """Process the message from RabbitMQ. To implement logic for processing + a message, extend Consumer._process, not this method. + + :param rejected.Consumer.Message message_in: The message to process + :rtype: bool + + """ + LOGGER.debug('Received: %r', message_in) + self._message = message_in + self._message_body = None + + # Validate the message type if the child sets _MESSAGE_TYPE + if self.MESSAGE_TYPE and self.MESSAGE_TYPE != self.message_type: + LOGGER.error('Received a non-supported message type: %s', + self.message_type) + + # Should the message be dropped or returned to the broker? + if self.DROP_INVALID_MESSAGES: + LOGGER.debug('Dropping the invalid message') + return + else: + raise ConsumerException('Invalid message type') + + result = self.prepare() + if concurrent.is_future(result): + result = yield result + if result is not None: + raise TypeError("Expected None, got %r" % result) + if self._finished: + return + + # Let the child object process the message + result = self.process() + if concurrent.is_future(result): + result = yield result + if result is not None: + raise TypeError("Expected None, got %r" % result) + + result = self.finish() + if concurrent.is_future(result): + result = yield result + + def _set_channel(self, channel): + """Assign the _channel attribute to the channel that was passed in. + This should not be extended. + + :param pika.channel.Channel channel: The channel to assign + + """ + self._channel = channel + + def _set_statsd(self, statsd): + """Assign a `StatsdClient` instance to the class. + + :param pika.statsd.StatsdClient statsd: The StatsdClient instance + + """ + self._statsd = statsd + class PublishingConsumer(Consumer): """The PublishingConsumer extends the Consumer class, adding two methods, @@ -344,6 +454,7 @@ class PublishingConsumer(Consumer): a :py:class:`ConsumerException` is raised. """ + def publish_message(self, exchange, routing_key, properties, body): """Publish a message to RabbitMQ on the same channel the original message was received on. @@ -365,8 +476,10 @@ def publish_message(self, exchange, routing_key, properties, body): properties=msg_props, body=body) - def reply(self, response_body, properties, auto_id=True, - exchange=None, reply_to=None): + def reply(self, response_body, properties, + auto_id=True, + exchange=None, + reply_to=None): """Reply to the received message. If auto_id is True, a new uuid4 value will be generated for the @@ -409,10 +522,8 @@ def reply(self, response_body, properties, auto_id=True, if properties.reply_to: properties.reply_to = None - self.publish_message(exchange or self._message.exchange, - reply_to, - dict(properties), - response_body) + self.publish_message(exchange or self._message.exchange, reply_to, + dict(properties), response_body) @staticmethod def _get_pika_properties(properties_in): @@ -462,9 +573,9 @@ class SmartConsumer(Consumer): a :py:class:`ConsumerException` is raised. """ - def __init__(self, configuration): - self._message_body = None - super(SmartConsumer, self).__init__(configuration) + + def __init__(self, settings): + super(SmartConsumer, self).__init__(settings) @property def body(self): @@ -613,8 +724,10 @@ class SmartPublishingConsumer(SmartConsumer, PublishingConsumer): """ """ + def publish_message(self, exchange, routing_key, properties, body, - no_serialization=False, no_encoding=False): + no_serialization=False, + no_encoding=False): """Publish a message to RabbitMQ on the same channel the original message was received on. @@ -660,7 +773,6 @@ def publish_message(self, exchange, routing_key, properties, body, properties=properties_out, body=body) - def _auto_encode(self, content_encoding, value): """Based upon the value of the content_encoding, encode the value. @@ -694,17 +806,17 @@ def _auto_serialize(self, content_type, value): LOGGER.debug('Auto-serializing content as Pickle') return self._dump_pickle_value(value) - if content_type == 'application/x-plist': + if content_type == 'application/x-plist': LOGGER.debug('Auto-serializing content as plist') return self._dump_plist_value(value) - if content_type == 'text/csv': + if content_type == 'text/csv': LOGGER.debug('Auto-serializing content as csv') return self._dump_csv_value(value) # If it's XML or HTML auto - if (bs4 and isinstance(value, bs4.BeautifulSoup) and - content_type in ('text/html', 'text/xml')): + if (bs4 and isinstance(value, bs4.BeautifulSoup) and content_type in + ('text/html', 'text/xml')): LOGGER.debug('Dumping BS4 object into HTML or XML') return self._dump_bs4_value(value) @@ -735,7 +847,7 @@ def _dump_csv_value(value): """ buffer = stringio.StringIO() - writer = csv.writer(buffer,quotechar='"', quoting=csv.QUOTE_ALL) + writer = csv.writer(buffer, quotechar='"', quoting=csv.QUOTE_ALL) writer.writerows(value) buffer.seek(0) value = buffer.read() From edf71194658fd2db71ca788853a93073a328729a Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 00:55:48 -0400 Subject: [PATCH 08/24] Whitespace/formatting clenaup --- tests/test_mcp.py | 1 - tests/test_process.py | 279 +++++++++++++++++++----------------------- tests/test_state.py | 6 +- 3 files changed, 127 insertions(+), 159 deletions(-) diff --git a/tests/test_mcp.py b/tests/test_mcp.py index 42738c0..03033b3 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -26,4 +26,3 @@ def test_mcp_init_consumers_dict_empty(self): def test_mcp_init_queue_initialized(self): self.assertIsInstance(self._obj.stats_queue, mock.MagicMock) - diff --git a/tests/test_process.py b/tests/test_process.py index ed42995..7a43ad2 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -1,15 +1,17 @@ """Tests for rejected.process""" import copy import mock -from pika import channel -from pika import connection -from pika import credentials -import signal +from mock import patch try: import unittest2 as unittest except ImportError: import unittest +from pika import channel +from pika import connection +from pika import credentials +import signal + from helper import config as helper_config from rejected import consumer @@ -18,6 +20,7 @@ from . import test_state + class TestImportNamspacedClass(unittest.TestCase): def test_import_consumer(self): @@ -44,47 +47,55 @@ def test_import_consumer_failure(self): class TestProcess(test_state.TestState): - config = {'Connections': - {'MockConnection': - {'host': 'localhost', - 'port': 5672, - 'user': 'guest', - 'pass': 'guest', - 'vhost': '/'}, - 'MockRemoteConnection': - {'host': 'remotehost', - 'port': 5672, - 'user': 'guest', - 'pass': 'guest', - 'vhost': '/'} - }, - 'Consumers': - {'MockConsumer': - {'consumer': 'tests.mocks.MockConsumer', - 'connections': ['MockConnection'], - 'config': {'test_value': True, - 'num_value': 100}, - 'min': 2, - 'max': 5, - 'max_errors': 10, - 'qos_prefetch': 5, - 'ack': True, - 'queue': 'mock_queue'}, - 'MockConsumer2': - {'consumer': 'mock_consumer.MockConsumer', - 'connections': ['MockConnection', - 'MockRemoteConnection'], - 'config': {'num_value': 50}, - 'min': 1, - 'max': 2, - 'queue': 'mock_you'}}} + config = { + 'Connections': { + 'MockConnection': { + 'host': 'localhost', + 'port': 5672, + 'user': 'guest', + 'pass': 'guest', + 'vhost': '/' + }, + 'MockRemoteConnection': { + 'host': 'remotehost', + 'port': 5672, + 'user': 'guest', + 'pass': 'guest', + 'vhost': '/' + } + }, + 'Consumers': { + 'MockConsumer': { + 'consumer': 'tests.mocks.MockConsumer', + 'connections': ['MockConnection'], + 'config': {'test_value': True, + 'num_value': 100}, + 'min': 2, + 'max': 5, + 'max_errors': 10, + 'qos_prefetch': 5, + 'ack': True, + 'queue': 'mock_queue' + }, + 'MockConsumer2': { + 'consumer': 'mock_consumer.MockConsumer', + 'connections': ['MockConnection', 'MockRemoteConnection'], + 'config': {'num_value': 50}, + 'min': 1, + 'max': 2, + 'queue': 'mock_you' + } + } + } logging_config = helper_config.LoggingConfig(helper_config.Config.LOGGING) - mock_args = {'cfg': config, - 'connection_name': 'MockConnection', - 'consumer_name': 'MockConsumer', - 'stats_queue': 'StatsQueue', - 'logging_config': logging_config} + mock_args = { + 'cfg': config, + 'connection_name': 'MockConnection', + 'consumer_name': 'MockConsumer', + 'stats_queue': 'StatsQueue', + 'logging_config': logging_config + } def setUp(self): self._obj = self.new_process() @@ -95,26 +106,12 @@ def tearDown(self): def new_kwargs(self, kwargs): return copy.deepcopy(kwargs) - def new_counter(self): - return {process.Process.ACKED: 0, - process.Process.CLOSED_ON_COMPLETE: 0, - process.Process.ERROR: 0, - process.Process.FAILURES: 0, - process.Process.UNHANDLED_EXCEPTIONS: 0, - process.Process.PROCESSED: 0, - process.Process.RECONNECTED: 0, - process.Process.REDELIVERED: 0, - process.Process.REJECTED: 0, - process.Process.REQUEUED: 0, - process.Process.TIME_SPENT: 0, - process.Process.TIME_WAITED: 0} - def new_process(self, kwargs=None): - with mock.patch('multiprocessing.Process'): - return process.Process(group=None, - name='MockProcess', - kwargs=kwargs or - self.new_kwargs(self.mock_args)) + with patch('multiprocessing.Process'): + return process.Process( + group=None, + name='MockProcess', + kwargs=kwargs or self.new_kwargs(self.mock_args)) def new_mock_channel(self): return mock.Mock(spec=channel.Channel) @@ -131,21 +128,19 @@ def new_mock_parameters(self, host, port, vhost, user, password): return mock_parameters def get_connection_parameters(self, host, port, vhost, user, password): - with mock.patch('pika.credentials.PlainCredentials'): - with mock.patch('pika.connection.ConnectionParameters'): - return self._obj.get_connection_parameters(host, - port, - vhost, - user, - password, - 500) + with patch('pika.credentials.PlainCredentials'): + with patch('pika.connection.ConnectionParameters'): + return self._obj.get_connection_parameters(host, port, vhost, + user, password, 500) def default_connection_parameters(self): - return {'host': 'rabbitmq', - 'port': 5672, - 'user': 'nose', - 'password': 'mock', - 'vhost': 'unittest'} + return { + 'host': 'rabbitmq', + 'port': 5672, + 'user': 'nose', + 'password': 'mock', + 'vhost': 'unittest' + } def mock_parameters(self): kwargs = self.default_connection_parameters() @@ -153,32 +148,25 @@ def mock_parameters(self): def test_app_id(self): expectation = 'rejected/%s' % __version__ - self.assertEqual(self._obj._AMQP_APP_ID, expectation) - - def test_counter_data_structure(self): - self.assertDictEqual(self._obj.new_counter_dict(), self.new_counter()) + self.assertEqual(self._obj.AMQP_APP_ID, expectation) def test_startup_state(self): new_process = self.new_process() - self.assertEqual(new_process._state, process.Process.STATE_INITIALIZING) + self.assertEqual(new_process.state, process.Process.STATE_INITIALIZING) def test_startup_time(self): mock_time = 123456789.012345 - with mock.patch('time.time', return_value=mock_time): + with patch('time.time', return_value=mock_time): new_process = self.new_process() - self.assertEqual(new_process._state_start, mock_time) - - def test_startup_counts(self): - new_process = self.new_process() - self.assertDictEqual(new_process._counts, self.new_counter()) + self.assertEqual(new_process.state_start, mock_time) def test_startup_channel_is_none(self): new_process = self.new_process() - self.assertIsNone(new_process._channel) + self.assertIsNone(new_process.channel) def test_startup_consumer_is_none(self): new_process = self.new_process() - self.assertIsNone(new_process._consumer) + self.assertIsNone(new_process.consumer) def test_ack_message(self): delivery_tag = 'MockDeliveryTag-1' @@ -192,7 +180,7 @@ def test_ack_message(self): def test_count(self): expectation = 10 - self._obj._counts[process.Process.REDELIVERED] = expectation + self._obj.stats[process.Process.REDELIVERED] = expectation self.assertEqual(self._obj.count(process.Process.REDELIVERED), expectation) @@ -201,14 +189,15 @@ def test_get_config(self): name = 'MockConsumer' number = 5 pid = 1234 - expectation = {'connection': self.config['Connections'][connection], - 'connection_name': connection, - 'consumer_name': name, - 'process_name': '%s_%i_tag_%i' % (name, pid, number)} - with mock.patch('os.getpid', return_value=pid): - self.assertEqual(self._obj.get_config(self.config, number, - name, connection), - expectation) + expectation = { + 'connection': self.config['Connections'][connection], + 'connection_name': connection, + 'consumer_name': name, + 'process_name': '%s_%i_tag_%i' % (name, pid, number) + } + with patch('os.getpid', return_value=pid): + self.assertEqual(self._obj.get_config(self.config, number, name, + connection), expectation) def test_get_consumer_with_invalid_consumer(self): cfg = self.config['Consumers']['MockConsumer2'] @@ -216,28 +205,30 @@ def test_get_consumer_with_invalid_consumer(self): def test_get_consumer_version_output(self): config = {'consumer': 'optparse.OptionParser'} - with mock.patch('logging.Logger.info') as info: + with patch('logging.Logger.info') as info: import optparse self._obj.get_consumer(config) info.assert_called_with('Creating consumer %s v%s', - config['consumer'], - optparse.__version__) + config['consumer'], optparse.__version__) def test_get_consumer_no_version_output(self): config = {'consumer': 'StringIO.StringIO'} - with mock.patch('logging.Logger.info') as info: + with patch('logging.Logger.info') as info: self._obj.get_consumer(config) info.assert_called_with('Creating consumer %s', config['consumer']) - @mock.patch.object(consumer.Consumer, '__init__', side_effect=ImportError) + @patch.object(consumer.Consumer, '__init__', side_effect=ImportError) def test_get_consumer_with_config_is_none(self, mock_method): - config = {'consumer': 'rejected.consumer.Consumer', - 'config': {'field': 'value', 'true': True}} + config = { + 'consumer': 'rejected.consumer.Consumer', + 'config': {'field': 'value', + 'true': True} + } new_process = self.new_process() new_process.get_consumer(config) self.assertIsNone(new_process.get_consumer(config)) - @mock.patch.object(consumer.Consumer, '__init__', side_effect=ImportError) + @patch.object(consumer.Consumer, '__init__', side_effect=ImportError) def test_get_consumer_with_no_config_is_none(self, mock_method): config = {'consumer': 'rejected.consumer.Consumer'} new_process = self.new_process() @@ -246,10 +237,10 @@ def test_get_consumer_with_no_config_is_none(self, mock_method): def test_reject_message(self): delivery_tag = 'MockDeliveryTag-1' mock_reject = mock.Mock() - self._obj._channel = self.new_mock_channel() - self._obj._ack = True - self._obj._message_connection_id = self._obj._connection_id = 1 - self._obj._channel.basic_nack = mock_reject + self._obj.channel = self.new_mock_channel() + self._obj.ack = True + self._obj.message_connection_id = self._obj.connection_id = 1 + self._obj.channel.basic_nack = mock_reject self._obj.reject(delivery_tag) mock_reject.assert_called_once_with(requeue=True, delivery_tag=delivery_tag) @@ -269,43 +260,20 @@ def test_set_qos_prefetch_no_value(self): mock_channel = self.new_mock_channel() new_process._channel = mock_channel new_process.set_qos_prefetch() - value = new_process._qos_prefetch + value = new_process.qos_prefetch mock_channel.basic_qos.assert_called_once_with(prefetch_count=value) - def test_set_state_idle_to_processing_check_time_waited(self): - new_process = self.new_process() - new_process._state = new_process.STATE_IDLE - new_process._counts[new_process.TIME_WAITED] = 0 - prop_mock = mock.Mock() - value = 10 - prop_mock.__get__ = mock.Mock(return_value=value) - with mock.patch.object(process.Process, 'time_in_state', prop_mock): - new_process.set_state(new_process.STATE_PROCESSING) - self.assertEqual(new_process._counts[new_process.TIME_WAITED], - value) - - def test_set_state_processing_to_idle_check_time_spent(self): - new_process = self.new_process() - new_process._state = new_process.STATE_PROCESSING - new_process._counts[new_process.TIME_SPENT] = 0 - prop_mock = mock.Mock() - value = 10 - prop_mock.__get__ = mock.Mock(return_value=value) - with mock.patch.object(process.Process, 'time_in_state', prop_mock): - new_process.set_state(new_process.STATE_IDLE) - self.assertEqual(new_process._counts[new_process.TIME_SPENT], value) - def test_setup_signal_handlers(self): signals = [mock.call(signal.SIGPROF, self._obj.on_sigprof), mock.call(signal.SIGABRT, self._obj.stop)] - with mock.patch('signal.signal') as signal_signal: + with patch('signal.signal') as signal_signal: self._obj.setup_signal_handlers() signal_signal.assert_has_calls(signals, any_order=True) def mock_setup(self, new_process=None, side_effect=None): - with mock.patch('signal.signal', side_effect=side_effect): - with mock.patch('rejected.process.import_consumer', - return_value=(mock.Mock, None)): + with patch('signal.signal', side_effect=side_effect): + with patch('rejected.process.import_consumer', + return_value=(mock.Mock, None)): if not new_process: new_process = self.new_process(self.mock_args) new_process.setup(**self.mock_args) @@ -329,7 +297,7 @@ def test_setup_config(self): def test_setup_with_consumer_config(self): new_process = self.new_process() new_process.setup(**self.mock_args) - self.assertEqual(new_process._consumer._configuration, + self.assertEqual(new_process.consumer._settings, self.config['Consumers']['MockConsumer']['config']) def test_setup_config_queue_name(self): @@ -353,54 +321,55 @@ def test_setup_prefetch_count_no_config(self): mock_process = self.new_process() mock_process.setup(**args) self.assertEqual(mock_process.base_qos_prefetch, - process.Process._QOS_PREFETCH_COUNT) + process.Process.QOS_PREFETCH_COUNT) def test_setup_prefetch_count_with_config(self): mock_process = self.mock_setup() - self.assertEqual(mock_process.base_qos_prefetch, - self.config['Consumers']['MockConsumer']['qos_prefetch']) + self.assertEqual( + mock_process.base_qos_prefetch, + self.config['Consumers']['MockConsumer']['qos_prefetch']) def test_setup_connection_arguments(self): - with mock.patch.object(process.Process, - 'connect_to_rabbitmq') as mock_method: + with patch.object(process.Process, + 'connect_to_rabbitmq') as mock_method: self.mock_setup() mock_method.assert_called_once_with(self.config['Connections'], 'MockConnection') def test_setup_connection_value(self): mock_connection = mock.Mock() - with mock.patch.object(process.Process, 'connect_to_rabbitmq', - return_value=mock_connection): + with patch.object(process.Process, 'connect_to_rabbitmq', + return_value=mock_connection): mock_process = self.mock_setup() self.assertEqual(mock_process._connection, mock_connection) def test_is_idle_state_processing(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertFalse(self._obj.is_idle) def test_is_running_state_processing(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertTrue(self._obj.is_running) def test_is_shutting_down_state_processing(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertFalse(self._obj.is_shutting_down) def test_is_stopped_state_processing(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertFalse(self._obj.is_stopped) def test_too_many_errors_true(self): - self._obj._max_error_count = 1 - self._obj._counts[self._obj.ERROR] = 2 + self._obj.max_error_count = 1 + self._obj.stats[self._obj.ERROR] = 2 self.assertTrue(self._obj.too_many_errors) def test_too_many_errors_false(self): - self._obj._max_error_count = 5 - self._obj._counts[self._obj.ERROR] = 2 + self._obj.max_error_count = 5 + self._obj.stats[self._obj.ERROR] = 2 self.assertFalse(self._obj.too_many_errors) def test_state_processing_desc(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_PROCESSING]) + self._obj.STATES[self._obj.STATE_PROCESSING]) diff --git a/tests/test_state.py b/tests/test_state.py index 0eed4f1..d34a6c7 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -13,15 +13,15 @@ class TestState(unittest.TestCase): def setUp(self): self._obj = state.State() - def testset_state_invalid_value(self): + def test_set_state_invalid_value(self): self.assertRaises(ValueError, self._obj.set_state, 9999) - def testset_state_expected_assignment(self): + def test_set_state_expected_assignment(self): self.state = self._obj.STATE_IDLE self._obj.set_state(self._obj.STATE_CONNECTING) self.assertEqual(self._obj.state, self._obj.STATE_CONNECTING) - def testset_state_state_start(self): + def test_set_state_state_start(self): self.state = self._obj.STATE_IDLE value = 86400 with mock.patch('time.time', return_value=value): From f83eda032dc569bbd704e37fcf925272242a9479 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 01:00:14 -0400 Subject: [PATCH 09/24] Work in process cleanup of rejected.process.Process - Extract counters to rejected.stats - Moved statsd support to rejected.statsd - Clean up error/failure counting behavior - Remove redundant methods like is_idle - Move private _variables to variables --- rejected/process.py | 564 ++++++++++++++++++-------------------------- rejected/stats.py | 58 +++++ rejected/statsd.py | 88 +++++++ 3 files changed, 371 insertions(+), 339 deletions(-) create mode 100644 rejected/stats.py create mode 100644 rejected/statsd.py diff --git a/rejected/process.py b/rejected/process.py index c36936f..cb1f50a 100644 --- a/rejected/process.py +++ b/rejected/process.py @@ -1,4 +1,5 @@ -"""Consumer process management. Imports consumer code, manages RabbitMQ +""" +Consumer process management. Imports consumer code, manages RabbitMQ connection state and collects stats about the consuming process. """ @@ -8,24 +9,25 @@ import math import multiprocessing import os +from os import path import pika try: import cProfile as profile except ImportError: import profile +from pika import spec from pika.adapters import tornado_connection import signal -import socket import sys import time from tornado import ioloop import traceback - from rejected import __version__ -from rejected import common +from rejected import state from rejected import consumer from rejected import data +from rejected import stats LOGGER = logging.getLogger(__name__) @@ -51,12 +53,12 @@ def import_consumer(value): return getattr(import_handle, parts[-1]), version -class Process(multiprocessing.Process, common.State): +class Process(multiprocessing.Process, state.State): """Core process class that manages the consumer object and communicates with RabbitMQ. """ - _AMQP_APP_ID = 'rejected/%s' % __version__ + AMQP_APP_ID = 'rejected/%s' % __version__ # Additional State constants STATE_PROCESSING = 0x04 @@ -75,61 +77,53 @@ class Process(multiprocessing.Process, common.State): TIME_WAITED = 'idle_time' UNHANDLED_EXCEPTIONS = 'unhandled_exceptions' - _HBINTERVAL = 300 + HB_INTERVAL = 300 # Default message pre-allocation value - _QOS_PREFETCH_COUNT = 1 - _QOS_PREFETCH_MULTIPLIER = 1.25 - _QOS_MAX = 10000 - _MAX_ERROR_COUNT = 5 - _MAX_ERROR_WINDOW = 60 - _MAX_SHUTDOWN_WAIT = 5 - _RECONNECT_DELAY = 10 - - _STATSD_FORMAT = '{0}.consumer.{1}.{2}.{3}:{4}|c\n' - - def __init__(self, group=None, target=None, name=None, args=(), + QOS_PREFETCH_COUNT = 1 + QOS_PREFETCH_MULTIPLIER = 1.25 + QOS_MAX = 10000 + MAX_ERROR_COUNT = 5 + MAX_ERROR_WINDOW = 60 + MAX_SHUTDOWN_WAIT = 5 + RECONNECT_DELAY = 10 + + def __init__(self, + group=None, + target=None, + name=None, + args=(), kwargs=None): if kwargs is None: kwargs = {} super(Process, self).__init__(group, target, name, args, kwargs) - self._ack = True - self._application = None - self._channel = None - self._config = None - self._connection = None - self._connection_id = 0 - self._connection_name = None - self._connections = None - self._consumer = None - self._consumer_name = None - self._counts = self.new_counter_dict() - self._dynamic_qos = True - self._ioloop = None - self._last_counts = dict() - self._last_failure = 0 - self._last_stats_time = None - self._logging_config = dict() - self._message_connection_id = None - self._max_error_count = self._MAX_ERROR_COUNT - self._max_frame_size = pika.spec.FRAME_MAX_SIZE - self._qos_prefetch = None - self._queue_name = None - self._prepend_path = None - self._state = self.STATE_INITIALIZING - self._state_start = time.time() - self._stats_queue = None - self._statsd = False - self._statsd_host = 'localhost' - self._statsd_port = 8125 - self._statsd_prefix = 'rejected' - self._statsd_socket = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) - self._node_name = socket.gethostname().split('.')[0] + self.ack = True + self.channel = None + self.config = None + self.connection = None + self.connection_id = 0 + self.connection_name = None + self.connections = None + self.consumer = None + self.consumer_name = None + self.dynamic_qos = True + self.ioloop = None + self.last_failure = 0 + self.last_stats_time = None + self.logging_config = dict() + self.message_connection_id = None + self.max_error_count = self.MAX_ERROR_COUNT + self.max_frame_size = spec.FRAME_MAX_SIZE + self.qos_prefetch = None + self.queue_name = None + self.prepend_path = None + self.state = self.STATE_INITIALIZING + self.state_start = time.time() + + self.stats = None # Override ACTIVE with PROCESSING - self._STATES[0x04] = 'Processing' + self.STATES[0x04] = 'Processing' def ack_message(self, delivery_tag): """Acknowledge the message on the broker and log the ack @@ -139,11 +133,11 @@ def ack_message(self, delivery_tag): """ if not self.can_respond: LOGGER.warning('Can not ack message, disconnected from RabbitMQ') - self.increment_count(self.CLOSED_ON_COMPLETE) + self.stats.incr(self.CLOSED_ON_COMPLETE) return LOGGER.debug('Acking %s', delivery_tag) - self._channel.basic_ack(delivery_tag=delivery_tag) - self.increment_count(self.ACKED) + self.channel.basic_ack(delivery_tag=delivery_tag) + self.stats.incr(self.ACKED) def add_on_channel_close_callback(self): """This method tells pika to call the on_channel_closed method if @@ -151,7 +145,7 @@ def add_on_channel_close_callback(self): """ LOGGER.debug('Adding channel close callback') - self._channel.add_on_close_callback(self.on_channel_closed) + self.channel.add_on_close_callback(self.on_channel_closed) def add_on_connection_close_callback(self): """This method adds an on close callback that will be invoked by pika @@ -159,7 +153,7 @@ def add_on_connection_close_callback(self): """ LOGGER.debug('Adding connection close callback') - self._connection.add_on_close_callback(self.on_connection_closed) + self.connection.add_on_close_callback(self.on_connection_closed) @property def base_qos_prefetch(self): @@ -168,7 +162,7 @@ def base_qos_prefetch(self): :rtype: int """ - return self._config.get('qos_prefetch', self._QOS_PREFETCH_COUNT) + return self.config.get('qos_prefetch', self.QOS_PREFETCH_COUNT) def calculate_qos_prefetch(self): """Determine if the channel should use the dynamic QoS value, stay at @@ -177,27 +171,27 @@ def calculate_qos_prefetch(self): :rtype: bool or int """ - if not self._last_stats_time: + if not self.last_stats_time: return qos_prefetch = self.dynamic_qos_pretch # Don't change anything - if qos_prefetch == self._qos_prefetch: + if qos_prefetch == self.qos_prefetch: LOGGER.debug('No change in QoS prefetch calculation of %i', - self._qos_prefetch) + self.qos_prefetch) return False # Don't change anything - if self.count_processed_last_interval < qos_prefetch: + if self.stats.diff(self.PROCESSED) < qos_prefetch: LOGGER.error('Processed fewer messages last interval than the ' 'qos_prefetch value') return False # If calculated QoS exceeds max - if qos_prefetch > self._QOS_MAX: - LOGGER.debug('Hit QoS Max ceiling of %i', self._QOS_MAX) - return self.set_qos_prefetch(self._QOS_MAX) + if qos_prefetch > self.QOS_MAX: + LOGGER.debug('Hit QoS Max ceiling of %i', self.QOS_MAX) + return self.set_qos_prefetch(self.QOS_MAX) # Set to base value if QoS calc is < than the base if self.base_qos_prefetch > qos_prefetch: @@ -206,15 +200,15 @@ def calculate_qos_prefetch(self): return self.set_qos_prefetch() # Increase the QoS setting - if qos_prefetch > self._qos_prefetch: + if qos_prefetch > self.qos_prefetch: LOGGER.debug('QoS calculation is higher than previous: %i > %i', - qos_prefetch, self._qos_prefetch) + qos_prefetch, self.qos_prefetch) return self.set_qos_prefetch(qos_prefetch) # Lower the QoS value based upon the processed qty - if qos_prefetch < self._qos_prefetch: + if qos_prefetch < self.qos_prefetch: LOGGER.debug('QoS calculation is lower than previous: %i < %i', - qos_prefetch, self._qos_prefetch) + qos_prefetch, self.qos_prefetch) return self.set_qos_prefetch(qos_prefetch) @property @@ -225,20 +219,20 @@ def can_respond(self): :return: bool """ - if not self._channel: + if not self.channel: return False - return self._message_connection_id == self._connection_id + return self.message_connection_id == self.connection_id def cancel_consumer_with_rabbitmq(self): """Tell RabbitMQ the process no longer wants to consumer messages.""" LOGGER.debug('Sending a Basic.Cancel to RabbitMQ') - if self._channel and self._channel.is_open: - self._channel.basic_cancel(consumer_tag=self.name) + if self.channel and self.channel.is_open: + self.channel.basic_cancel(consumer_tag=self.name) def close_connection(self): """This method closes the connection to RabbitMQ.""" LOGGER.info('Closing connection') - self._connection.close() + self.connection.close() def connect_to_rabbitmq(self, cfg, name): """Connect to RabbitMQ returning the connection handle. @@ -248,43 +242,18 @@ def connect_to_rabbitmq(self, cfg, name): :rtype: pika.adapters.tornado_connection.TornadoConnection """ - LOGGER.debug('Connecting to %s:%i:%s as %s', - cfg[name]['host'], cfg[name]['port'], - cfg[name]['vhost'], cfg[name]['user']) + LOGGER.debug('Connecting to %s:%i:%s as %s', cfg[name]['host'], + cfg[name]['port'], cfg[name]['vhost'], cfg[name]['user']) self.set_state(self.STATE_CONNECTING) - self._connection_id += 1 - hb_interval = cfg[name].get('heartbeat_interval', self._HBINTERVAL) - parameters = self.get_connection_parameters(cfg[name]['host'], - cfg[name]['port'], - cfg[name]['vhost'], - cfg[name]['user'], - cfg[name]['pass'], - hb_interval) + self.connection_id += 1 + hb_interval = cfg[name].get('heartbeat_interval', self.HB_INTERVAL) + parameters = self.get_connection_parameters( + cfg[name]['host'], cfg[name]['port'], cfg[name]['vhost'], + cfg[name]['user'], cfg[name]['pass'], hb_interval) return tornado_connection.TornadoConnection(parameters, self.on_connection_open, stop_ioloop_on_close=False) - def count(self, stat): - """Return the current count quantity for a specific stat. - - :param str stat: Name of stat to get value for - :rtype: int or float - - """ - return self._counts.get(stat, 0) - - @property - def count_processed_last_interval(self): - """Return the number of messages counted in the last interval. If - there is no last interval counts, return 0. - - :rtype: int - - """ - if not self._last_counts: - return 0 - return self._counts[self.PROCESSED] - self._last_counts[self.PROCESSED] - @property def dynamic_qos_pretch(self): """Calculate the prefetch count based upon the message velocity * the @@ -295,7 +264,7 @@ def dynamic_qos_pretch(self): """ # Round up the velocity * the multiplier value = int(math.ceil(self.message_velocity * - float(self._QOS_PREFETCH_MULTIPLIER))) + float(self.QOS_PREFETCH_MULTIPLIER))) LOGGER.debug('Calculated prefetch value: %i', value) return value @@ -310,10 +279,12 @@ def get_config(cfg, number, name, connection): :rtype: dict """ - return {'connection': cfg['Connections'][connection], - 'connection_name': connection, - 'consumer_name': name, - 'process_name': '%s_%i_tag_%i' % (name, os.getpid(), number)} + return { + 'connection': cfg['Connections'][connection], + 'connection_name': connection, + 'consumer_name': name, + 'process_name': '%s_%i_tag_%i' % (name, os.getpid(), number) + } def get_connection_parameters(self, host, port, vhost, username, password, heartbeat_interval): @@ -330,7 +301,7 @@ def get_connection_parameters(self, host, port, vhost, username, password, """ credentials = pika.PlainCredentials(username, password) return pika.ConnectionParameters(host, port, vhost, credentials, - frame_max=self._max_frame_size, + frame_max=self.max_frame_size, socket_timeout=10, heartbeat_interval=heartbeat_interval) @@ -376,17 +347,6 @@ def get_consumer(cfg): LOGGER.error(line) return - def increment_count(self, counter, value=1): - """Increment the specified counter, checking to see if the counter is - the error counter. if it is, check to see if there have been too many - errors and if it needs to reconnect. - - :param str counter: The counter name passed in from the constant - :param int|float value: The amount to increment by - - """ - self._counts[counter] += value - def invoke_consumer(self, message): """Wrap the actual processor processing bits @@ -399,10 +359,8 @@ def invoke_consumer(self, message): # Try and process the message try: LOGGER.debug('Processing message') - self._consumer.receive(message) - + self.consumer._execute(message) except KeyboardInterrupt: - self.reject(message.delivery_tag, True) self.stop() return False @@ -420,7 +378,7 @@ def invoke_consumer(self, message): except consumer.ConsumerException as error: self.record_exception(error, True, sys.exc_info()) self.reject(message.delivery_tag, True) - self.processing_failure() + self.processing_error() return False except consumer.MessageException as error: @@ -431,20 +389,11 @@ def invoke_consumer(self, message): except Exception as error: self.record_exception(error, True, sys.exc_info()) self.reject(message.delivery_tag, True) - self.processing_failure() + self.processing_error() return False return True - @property - def is_idle(self): - """Is the system idle - - :rtype: bool - - """ - return self._state == self.STATE_IDLE - @property def is_processing(self): """Returns a bool specifying if the consumer is currently processing @@ -452,7 +401,7 @@ def is_processing(self): :rtype: bool """ - return self._state in [self.STATE_PROCESSING, self.STATE_STOP_REQUESTED] + return self.state in [self.STATE_PROCESSING, self.STATE_STOP_REQUESTED] @property def message_velocity(self): @@ -461,9 +410,8 @@ def message_velocity(self): :rtype: float """ - processed = self.count_processed_last_interval - duration = time.time() - self._last_stats_time - LOGGER.debug('Processed %i messages in %i seconds', processed, duration) + processed = self.stats.diff(self.PROCESSED) + duration = time.time() - self.last_stats_time # If there were no messages, do not calculate, use the base if not processed or not duration: @@ -474,25 +422,6 @@ def message_velocity(self): LOGGER.debug('Message processing velocity: %.2f', velocity) return velocity - def new_counter_dict(self): - """Return a dict object for our internal stats keeping. - - :rtype: dict - - """ - return {self.ACKED: 0, - self.CLOSED_ON_COMPLETE: 0, - self.ERROR: 0, - self.FAILURES: 0, - self.UNHANDLED_EXCEPTIONS: 0, - self.PROCESSED: 0, - self.RECONNECTED: 0, - self.REDELIVERED: 0, - self.REJECTED: 0, - self.REQUEUED: 0, - self.TIME_SPENT: 0, - self.TIME_WAITED: 0} - def on_channel_closed(self, method_frame): """Invoked by pika when RabbitMQ unexpectedly closes the channel. Channels are usually closed if you attempt to do something that @@ -506,7 +435,7 @@ def on_channel_closed(self, method_frame): LOGGER.critical('Channel was closed: (%s) %s', method_frame.method.reply_code, method_frame.method.reply_text) - del self._channel + del self.channel raise ReconnectConnection def on_channel_open(self, channel): @@ -518,30 +447,30 @@ def on_channel_open(self, channel): """ LOGGER.debug('Channel opened') - self._channel = channel + self.channel = channel self.add_on_channel_close_callback() self.setup_channel() - def on_connection_closed(self, unused, code, text): + def on_connection_closed(self, _unused, code, text): """This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects. - :param pika.connection.Connection unused: The closed connection + :param pika.connection.Connection _unused: The closed connection """ LOGGER.critical('Connection from RabbitMQ closed in state %s (%s, %s)', self.state_description, code, text) - self._channel = None + self.channel = None if not self.is_shutting_down and not self.is_waiting_to_shutdown: self.reconnect() - def on_connection_open(self, unused): + def on_connection_open(self, _unused): """This method is called by pika once the connection to RabbitMQ has been established. It passes the handle to the connection object in case we need it, but in this case, we'll just mark it unused. - :type unused: pika.adapters.tornado_connection.TornadoConnection + :type _unused: pika.adapters.tornado_connection.TornadoConnection """ LOGGER.debug('Connection opened') @@ -560,51 +489,40 @@ def on_ready_to_stop(self): signal.signal(signal.SIGTERM, signal.SIG_IGN) # If the connection is still around, close it - if self._connection.is_open: + if self.connection.is_open: LOGGER.debug('Closing connection to RabbitMQ') - self._connection.close() + self.connection.close() # Allow the consumer to gracefully stop and then stop the IOLoop self.stop_consumer() # Stop the IOLoop LOGGER.debug('Stopping IOLoop') - self._ioloop.stop() + self.ioloop.stop() # Note that shutdown is complete and set the state accordingly self.set_state(self.STATE_STOPPED) LOGGER.info('Shutdown complete') os.kill(os.getppid(), signal.SIGALRM) - def on_sigprof(self, unused_signum, unused_frame): + def on_sigprof(self, _unused_signum, _unused_frame): """Called when SIGPROF is sent to the process, will dump the stats, in future versions, queue them for the master process to get data. - :param int unused_signum: The signal number - :param frame unused_frame: The python frame the signal was received at + :param int _unused_signum: The signal number + :param frame _unused_frame: The python frame the signal was received at """ - values = dict() + self.stats.report() if self.is_processing or self.is_idle: self.calculate_qos_prefetch() - for key in self._counts.keys(): - values[key] = self._counts[key] - self._last_counts.get(key, 0) - self._last_counts[key] = self._counts[key] - if self._statsd: - self.send_counter_to_statsd(key, values[key]) - - self._stats_queue.put({'name': self.name, - 'consumer_name': self._consumer_name, - 'counts': values}, True) - - LOGGER.debug('Currently %s: %r', self.state_description, values) - self._last_stats_time = time.time() + self.last_stats_time = time.time() signal.siginterrupt(signal.SIGPROF, False) def open_channel(self): """Open a channel on the existing open connection to RabbitMQ""" - LOGGER.debug('Opening a channel on %r', self._connection) - self._connection.channel(self.on_channel_open) + LOGGER.debug('Opening a channel on %r', self.connection) + self.connection.channel(self.on_channel_open) def process(self, channel=None, method=None, properties=None, body=None): """Process a message from Rabbit @@ -619,75 +537,89 @@ def process(self, channel=None, method=None, properties=None, body=None): LOGGER.critical('Received a message while in state: %s', self.state_description) return self.reject(method.delivery_tag, True) + self.set_state(self.STATE_PROCESSING) + LOGGER.debug('Received message #%s', method.delivery_tag) message = data.Message(channel, method, properties, body) if method.redelivered: - self.increment_count(self.REDELIVERED) - self._state_start = time.time() - if not self.invoke_consumer(message): - LOGGER.debug('Bypassing ack due to False return from _process') + self.stats.incr(self.REDELIVERED) + + LOGGER.debug('Invoking consumer') + + start_time = time.time() + result = self.invoke_consumer(message) + self.stats.add_timing(self.TIME_SPENT, time.time() - start_time) + + if not result: + LOGGER.debug('Bypassing ack due to False return consumer') return - self.increment_count(self.PROCESSED) - if self._ack: + LOGGER.debug('Post invoke consumer') + + self.stats.incr(self.PROCESSED) + if self.ack: self.ack_message(method.delivery_tag) if self.is_waiting_to_shutdown: return self.on_ready_to_stop() self.reset_state() - @property - def profile_file(self): - if not self._kwargs['profile']: - return None - - if os.path.exists(self._kwargs['profile']) and \ - os.path.isdir(self._kwargs['profile']): - return '%s/%s-%s.prof' % (os.path.normpath(self._kwargs['profile']), - os.getpid(), - self._kwargs['consumer_name']) - return None - - def processing_failure(self): + def processing_error(self): """Called when message processing failure happens due to a ConsumerException or an unhandled exception. """ - duration = time.time() - self._last_failure - if duration > self._MAX_ERROR_WINDOW: + duration = time.time() - self.last_failure + if duration > self.MAX_ERROR_WINDOW: LOGGER.info('Resetting failure window, %i seconds since last', duration) - self.reset_failure_counter() - self.increment_count(self.FAILURES, -1) - self._last_failure = time.time() - if self._counts[self.FAILURES] == 0: + self.reset_error_counter() + self.stats.incr(self.ERROR) + self.last_failure = time.time() + if self.too_many_errors: LOGGER.critical('Error threshold exceeded (%i), reconnecting', - self._counts[self.ERROR]) + self.stats[self.ERROR]) self.cancel_consumer_with_rabbitmq() self.close_connection() self.reconnect() + @property + def profile_file(self): + """Return the full path to write the cProfile data + + :return: str + + """ + if not self._kwargs['profile']: + return None + if os.path.exists(self._kwargs['profile']) and \ + os.path.isdir(self._kwargs['profile']): + return '%s/%s-%s.prof' % (path.normpath(self._kwargs['profile']), + os.getpid(), + self._kwargs['consumer_name']) + return None + def reconnect(self): """Reconnect to RabbitMQ after sleeping for _RECONNECT_DELAY""" LOGGER.info('Reconnecting to RabbitMQ in %i seconds', - self._RECONNECT_DELAY) - self.increment_count(self.RECONNECTED) + self.RECONNECT_DELAY) + self.stats.incr(self.RECONNECTED) self.set_state(self.STATE_INITIALIZING) - if self._connection: - if self._connection.socket: - fd = self._connection.socket.fileno() - self._ioloop.remove_handler(fd) - self._connection = None + if self.connection: + if self.connection.socket: + fd = self.connection.socket.fileno() + self.ioloop.remove_handler(fd) + self.connection = None - self._ioloop.add_timeout(time.time() + self._RECONNECT_DELAY, - self._reconnect) + self.ioloop.add_timeout(time.time() + self.RECONNECT_DELAY, + self._reconnect) def _reconnect(self): """Create and set the RabbitMQ connection""" LOGGER.info('Connecting to RabbitMQ') - self.reset_failure_counter() - self._connection = self.connect_to_rabbitmq(self._connections, - self._connection_name) + self.reset_error_counter() + self.connection = self.connect_to_rabbitmq(self.connections, + self.connection_name) self.setup_signal_handlers() def record_exception(self, error, handled=False, exc_info=None): @@ -697,7 +629,7 @@ def record_exception(self, error, handled=False, exc_info=None): :param bool handled: Was the exception handled """ - self.increment_count(self.ERROR) + self.stats.incr(self.ERROR) if handled: if not isinstance(error, consumer.MessageException): LOGGER.exception('Processor handled %s: %s', @@ -705,12 +637,12 @@ def record_exception(self, error, handled=False, exc_info=None): else: LOGGER.exception('Processor threw an uncaught exception %s: %s', error.__class__.__name__, error) - self.increment_count(self.UNHANDLED_EXCEPTIONS) + self.stats.incr(self.UNHANDLED_EXCEPTIONS) if not isinstance(error, consumer.MessageException) and exc_info: formatted_lines = traceback.format_exception(*exc_info) for offset, line in enumerate(formatted_lines): - LOGGER.debug('(%s) %i: %s', error.__class__.__name__, - offset, line.strip()) + LOGGER.debug('(%s) %i: %s', error.__class__.__name__, offset, + line.strip()) def reject(self, delivery_tag, requeue=True): """Reject the message on the broker and log it. We should move this to @@ -720,26 +652,25 @@ def reject(self, delivery_tag, requeue=True): :param bool requeue: Specify if the message should be re-queued or not """ - if not self._ack: + if not self.ack: raise RuntimeError('Can not rejected messages when ack is False') if not self.can_respond: LOGGER.warning('Can not reject message, disconnected from RabbitMQ') - self.increment_count(self.CLOSED_ON_COMPLETE) + self.stats.incr(self.CLOSED_ON_COMPLETE) if self.is_processing: self.reset_state() return - LOGGER.warning('Rejecting message %s %s requeue', delivery_tag, - 'with' if requeue else 'without') - self._channel.basic_nack(delivery_tag=delivery_tag, requeue=requeue) - self.increment_count(self.REQUEUED if requeue else self.REJECTED) + LOGGER.warning('Rejecting message %s %s requeue', delivery_tag, 'with' + if requeue else 'without') + self.channel.basic_nack(delivery_tag=delivery_tag, requeue=requeue) + self.stats.incr(self.REQUEUED if requeue else self.REJECTED) if self.is_processing: self.reset_state() - def reset_failure_counter(self): - """Reset the failure counter to the max error count""" - LOGGER.debug('Resetting the failure counter to %i', - self._max_error_count) - self._counts[self.FAILURES] = self._max_error_count + def reset_error_counter(self): + """Reset the error counter to 0""" + LOGGER.debug('Resetting the error counter') + self.stats[self.ERROR] = 0 def reset_state(self): """Reset the runtime state after processing a message to either idle @@ -764,46 +695,30 @@ def run(self): self.profile_file) else: self._run() - LOGGER.debug('Exiting %s (%i, %i)', self.name, os.getpid(), os.getppid()) + LOGGER.debug('Exiting %s (%i, %i)', self.name, os.getpid(), + os.getppid()) def _run(self): """Run method that can be profiled""" - self._ioloop = ioloop.IOLoop.instance() - common.add_null_handler() + self.ioloop = ioloop.IOLoop.instance() try: - self.setup(self._kwargs['config'], - self._kwargs['connection_name'], + self.setup(self._kwargs['config'], self._kwargs['connection_name'], self._kwargs['consumer_name'], self._kwargs['stats_queue'], self._kwargs['logging_config']) except ImportError as error: name = self._kwargs['consumer_name'] - classname = self._kwargs['config']['Consumers'][name]['consumer'] + class_name = self._kwargs['config']['Consumers'][name]['consumer'] LOGGER.critical('Could not import %s, stopping process: %r', - classname, error) + class_name, error) return if not self.is_stopped: try: - self._ioloop.start() + self.ioloop.start() except KeyboardInterrupt: LOGGER.warning('CTRL-C while waiting for clean shutdown') - def send_counter_to_statsd(self, counter, value=1): - """Send a metric passed in to statsd. - - :param str counter: The counter name - :param int|float value: The count - - """ - payload = self._STATSD_FORMAT.format(self._statsd_prefix, - self._node_name, - self._consumer_name, - counter, - math.ceil(value)) - self._statsd_socket.sendto(payload, (self._statsd_host, - self._statsd_port)) - def set_qos_prefetch(self, value=None): """Set the QOS Prefetch count for the channel. @@ -811,27 +726,10 @@ def set_qos_prefetch(self, value=None): """ qos_prefetch = int(value or self.base_qos_prefetch) - if qos_prefetch != self._qos_prefetch: - self._qos_prefetch = qos_prefetch + if qos_prefetch != self.qos_prefetch: + self.qos_prefetch = qos_prefetch LOGGER.debug('Setting the QOS Prefetch to %i', qos_prefetch) - self._channel.basic_qos(prefetch_count=qos_prefetch) - - def set_state(self, new_state): - """Assign the specified state to this consumer object. - - :param int new_state: The new state of the object - :raises: ValueError - - """ - # Keep track of how much time we're spending waiting and processing - if new_state == self.STATE_PROCESSING and self.is_idle: - self.increment_count(self.TIME_WAITED, self.time_in_state) - - elif new_state == self.STATE_IDLE and self.is_processing: - self.increment_count(self.TIME_SPENT, self.time_in_state) - - # Use the parent object to set the state - super(Process, self).set_state(new_state) + self.channel.basic_qos(prefetch_count=qos_prefetch) def setup(self, cfg, connection_name, consumer_name, stats_queue, logging_config): @@ -845,47 +743,44 @@ def setup(self, cfg, connection_name, consumer_name, stats_queue, :param dict logging_config: Logging config from YAML file """ - LOGGER.info('Initializing for %s on %s connection', - self.name, connection_name) - self._logging_config = logging_config - self._stats_queue = stats_queue - self._connection_name = connection_name - self._consumer_name = consumer_name - self._config = cfg['Consumers'][consumer_name] - self._connections = cfg['Connections'] - self._consumer = self.get_consumer(self._config) + LOGGER.info('Initializing for %s on %s connection', self.name, + connection_name) + self.logging_config = logging_config + self.connection_name = connection_name + self.consumer_name = consumer_name + self.config = cfg['Consumers'][consumer_name] + self.connections = cfg['Connections'] + self.consumer = self.get_consumer(self.config) - if not self._consumer: + if not self.consumer: LOGGER.critical('Could not import and start processor') self.set_state(self.STATE_STOPPED) exit(1) - self._queue_name = self._config['queue'] - self._ack = self._config.get('ack', True) - self._dynamic_qos = self._config.get('dynamic_qos', False) - self._max_error_count = int(self._config.get('max_errors', - self._MAX_ERROR_COUNT)) - self._max_frame_size = self._config.get('max_frame_size', - pika.spec.FRAME_MAX_SIZE) - - self._statsd = False - if 'statsd' in cfg and cfg['statsd'].get('enabled', False): - self._statsd = True - self._statsd_prefix = cfg['statsd'].get('prefix', 'rejected') - self._statsd_host = \ - cfg['statsd'].get('host', os.environ.get('STATSD_HOST', - 'localhost')) - self._statsd_port = \ - cfg['statsd'].get('port', os.environ.get('STATSD_PORT', 8125)) - if self._statsd_host != os.environ.get('STATSD_HOST', None): - os.putenv('STATSD_HOST', self._statsd_host) - if self._statsd_port != os.environ.get('STATSD_PORT', None): - os.putenv('STATSD_PORT', str(self._statsd_port)) - - self.reset_failure_counter() + # Setup the stats counter instance + self.stats = stats.Stats(self.name, consumer_name, stats_queue, + cfg['statsd'] or {}) + + # Set statsd in the consumer + if self.stats.statsd: + try: + self.consumer._set_statsd(self.stats.statsd) + except AttributeError: + LOGGER.info('Consumer does not support statsd assignment') + + # Consumer settings + self.ack = self.config.get('ack', True) + self.dynamic_qos = self.config.get('dynamic_qos', False) + self.max_error_count = int(self.config.get('max_errors', + self.MAX_ERROR_COUNT)) + self.max_frame_size = self.config.get('max_frame_size', + spec.FRAME_MAX_SIZE) + self.queue_name = self.config['queue'] + + self.reset_error_counter() self.setup_signal_handlers() - self._connection = self.connect_to_rabbitmq(self._connections, - self._connection_name) + self.connection = self.connect_to_rabbitmq(self.connections, + self.connection_name) def setup_channel(self): """Setup the channel that will be used to communicate with RabbitMQ and @@ -897,17 +792,17 @@ def setup_channel(self): # Set the channel in the consumer try: - self._consumer.set_channel(self._channel) + self.consumer._set_channel(self.channel) except AttributeError: LOGGER.info('Consumer does not support channel assignment') # Setup QoS, Send a Basic.Recover and then Basic.Consume self.set_qos_prefetch() - self._channel.basic_recover(requeue=True) - self._channel.basic_consume(consumer_callback=self.process, - queue=self._queue_name, - no_ack=not self._ack, - consumer_tag=self.name) + self.channel.basic_recover(requeue=True) + self.channel.basic_consume(consumer_callback=self.process, + queue=self.queue_name, + no_ack=not self.ack, + consumer_tag=self.name) def setup_signal_handlers(self): """Setup the stats and stop signal handlers.""" @@ -926,9 +821,9 @@ def start_message_processing(self): message is processing. """ - self._message_connection_id = self._connection_id + self.message_connection_id = self.connection_id - def stop(self, signum=None, frame_unused=None): + def stop(self, signum=None, _frame_unused=None): """Stop the consumer from consuming by calling BasicCancel and setting our state. @@ -964,19 +859,10 @@ def stop_consumer(self): """ try: LOGGER.info('Shutting down the consumer') - self._consumer.shutdown() + self.consumer.shutdown() except AttributeError: LOGGER.debug('Consumer does not have a shutdown method') - @property - def time_in_state(self): - """Return the time that has been spent in the current state. - - :rtype: float - - """ - return time.time() - self._state_start - @property def too_many_errors(self): """Return a bool if too many errors have occurred. @@ -984,7 +870,7 @@ def too_many_errors(self): :rtype: bool """ - return self.count(self.ERROR) >= self._max_error_count + return self.stats[self.ERROR] >= self.max_error_count class ReconnectConnection(Exception): diff --git a/rejected/stats.py b/rejected/stats.py new file mode 100644 index 0000000..8222b13 --- /dev/null +++ b/rejected/stats.py @@ -0,0 +1,58 @@ +""" +Stats class that wraps the collections.Counter object and transparently +passes calls to increment and add_timing if statsd is enabled. + +""" +try: + import backport_collections as collections +except ImportError: + import collections +from rejected import statsd + + +class Stats(object): + + def __init__(self, name, consumer_name, queue, statsd_cfg): + self.name = name + self.consumer_name = name + self.statsd = None + if statsd_cfg.get('enabled', False): + self.statsd = statsd.StatsdClient(consumer_name, statsd_cfg) + self.counter = collections.Counter() + self.previous = collections.Counter() + self.reporting_queue = queue + + def __getitem__(self, item): + return self.counter.get(item) + + def __setitem__(self, item, value): + self.counter[item] = value + + def add_timing(self, item, duration): + if self.statsd: + self.statsd.add_timing(item, duration) + + def get(self, item): + return self.counter.get(item) + + def diff(self, item): + return self.counter[item] - self.previous[item] + + def incr(self, key, value=1): + self.counter[key] += value + if self.statsd: + self.statsd.incr(key, value) + + def report(self): + """Submit the stats data to both the MCP stats queue and statsd""" + values = dict() + for item in self.counter.keys(): + values[item] = self.diff(item) + if self.statsd: + self.statsd.incr(item, values[item]) + self.previous.update(self.counter) + self.reporting_queue.put({ + 'name': self.name, + 'consumer_name': self.consumer_name, + 'counts': values + }, True) diff --git a/rejected/statsd.py b/rejected/statsd.py new file mode 100644 index 0000000..b57c7f1 --- /dev/null +++ b/rejected/statsd.py @@ -0,0 +1,88 @@ +""" +Statsd Client that takes configuration first from the rejected configuration +file, falling back to environment variables, and finally default values. + +Environment Variables: + +- STATSD_HOST +- STATSD_PORT +- STATSD_PREFIX + +""" +import logging +import os +import socket + +LOGGER = logging.getLogger(__name__) + + +class StatsdClient(object): + """ + + """ + DEFAULT_HOST = 'localhost' + DEFAULT_PORT = 8125 + DEFAULT_PREFIX = 'rejected' + PAYLOAD_FORMAT = '{0}.{1}.{2}.{3}:{4}|{5}' + + def __init__(self, consumer_name, settings): + """ + + :param cfg: + + """ + self._consumer_name = consumer_name + self._hostname = socket.gethostname().split('.')[0] + self._settings = settings + + self._addr = (self._setting('host', self.DEFAULT_HOST), + int(self._setting('port', self.DEFAULT_PORT))) + self._prefix = self._setting('prefix', self.DEFAULT_PREFIX) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, + socket.IPPROTO_UDP) + + def _setting(self, key, default): + """Return the setting, checking config, then the appropriate + environment variable, falling back to the default. + + :param str key: The key to get + :param any default: The default value if not set + :return: str + + """ + env = 'STATSD_{}'.format(key).upper() + return self._settings.get(key, os.environ.get(env, default)) + + def add_timing(self, key, value=0): + """Add a timer value to statsd for the specified key + + :param str key: The key to add the timing to + :param int|float value: The value of the timing + + """ + self._send(key, value, 'ms') + + def incr(self, key, value=1): + """Increment the counter value in statsd + + :param str key: The key to increment + :param int value: The value to increment by, defaults to 1 + + """ + self._send(key, value, 'c') + + def _send(self, key, value, metric_type): + """Send the specified value to the statsd daemon via UDP without a + direct socket connection. + + :param str key: The key name to send + :param int|float value: The value for the key + + """ + try: + payload = self.PAYLOAD_FORMAT.format(self._prefix, self._hostname, + self._consumer_name, key, + value, metric_type).encode() + self._socket.sendto(payload, self._addr) + except socket.error: + LOGGER.exception('Error sending statsd metric') From 80ebe17c521338807991d49937565a535bddd26d Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 21:30:38 -0400 Subject: [PATCH 10/24] Implement the structure for async (and sync) Consumers can now work as a @gen.coroutine or normal sync process --- rejected/consumer.py | 11 +++---- rejected/process.py | 71 ++++++++++++++++++++++++-------------------- 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/rejected/consumer.py b/rejected/consumer.py index be8ed39..1bed418 100644 --- a/rejected/consumer.py +++ b/rejected/consumer.py @@ -371,6 +371,7 @@ def user_id(self): def _clear(self): """Resets all assigned data for the current message.""" + self._finished = False self._message = None self._message_body = None @@ -384,8 +385,8 @@ def _execute(self, message_in): """ LOGGER.debug('Received: %r', message_in) + self._clear() self._message = message_in - self._message_body = None # Validate the message type if the child sets _MESSAGE_TYPE if self.MESSAGE_TYPE and self.MESSAGE_TYPE != self.message_type: @@ -393,7 +394,7 @@ def _execute(self, message_in): self.message_type) # Should the message be dropped or returned to the broker? - if self.DROP_INVALID_MESSAGES: + if not self.DROP_INVALID_MESSAGES: LOGGER.debug('Dropping the invalid message') return else: @@ -407,16 +408,12 @@ def _execute(self, message_in): if self._finished: return - # Let the child object process the message result = self.process() if concurrent.is_future(result): result = yield result if result is not None: raise TypeError("Expected None, got %r" % result) - - result = self.finish() - if concurrent.is_future(result): - result = yield result + self.finish() def _set_channel(self, channel): """Assign the _channel attribute to the channel that was passed in. diff --git a/rejected/process.py b/rejected/process.py index cb1f50a..7f34f15 100644 --- a/rejected/process.py +++ b/rejected/process.py @@ -4,6 +4,7 @@ """ from pika import exceptions +from tornado import gen import importlib import logging import math @@ -347,7 +348,24 @@ def get_consumer(cfg): LOGGER.error(line) return - def invoke_consumer(self, message): + def on_processed(self, start_time, method, result): + LOGGER.debug('Post invoke consumer') + self.stats.add_timing(self.TIME_SPENT, time.time() - start_time) + + if not result: + LOGGER.debug('Bypassing ack due to False return consumer') + return + + self.stats.incr(self.PROCESSED) + if self.ack: + self.ack_message(method.delivery_tag) + if self.is_waiting_to_shutdown: + self.on_ready_to_stop() + else: + self.reset_state() + + @gen.engine + def invoke_consumer(self, method, message): """Wrap the actual processor processing bits :param Message message: Message to process @@ -355,44 +373,52 @@ def invoke_consumer(self, message): """ self.start_message_processing() + start_time = time.time() # Try and process the message try: - LOGGER.debug('Processing message') - self.consumer._execute(message) + result = self.consumer._execute(message) + yield result + possible_exception = result.exception() + if possible_exception: + raise possible_exception + self.on_processed(start_time, method, True) + except KeyboardInterrupt: + LOGGER.debug('CTRL-C') self.reject(message.delivery_tag, True) self.stop() - return False + self.on_processed(start_time, method, False) except exceptions.ChannelClosed as error: LOGGER.critical('RabbitMQ closed the channel: %r', error) self.reconnect() - return False + self.on_processed(start_time, method, False) except exceptions.ConnectionClosed as error: LOGGER.critical('RabbitMQ closed the connection: %r', error) self.reconnect() - return False + self.on_processed(start_time, method, False) except consumer.ConsumerException as error: + LOGGER.debug('Consumer Exception') self.record_exception(error, True, sys.exc_info()) self.reject(message.delivery_tag, True) self.processing_error() - return False + self.on_processed(start_time, method, False) except consumer.MessageException as error: + LOGGER.debug('Message Exception') self.record_exception(error, True, sys.exc_info()) self.reject(message.delivery_tag, False) - return False + self.on_processed(start_time, method, False) except Exception as error: + LOGGER.debug('Exception') self.record_exception(error, True, sys.exc_info()) self.reject(message.delivery_tag, True) self.processing_error() - return False - - return True + self.on_processed(start_time, method, False) @property def is_processing(self): @@ -537,32 +563,11 @@ def process(self, channel=None, method=None, properties=None, body=None): LOGGER.critical('Received a message while in state: %s', self.state_description) return self.reject(method.delivery_tag, True) - self.set_state(self.STATE_PROCESSING) - - LOGGER.debug('Received message #%s', method.delivery_tag) message = data.Message(channel, method, properties, body) if method.redelivered: self.stats.incr(self.REDELIVERED) - - LOGGER.debug('Invoking consumer') - - start_time = time.time() - result = self.invoke_consumer(message) - self.stats.add_timing(self.TIME_SPENT, time.time() - start_time) - - if not result: - LOGGER.debug('Bypassing ack due to False return consumer') - return - - LOGGER.debug('Post invoke consumer') - - self.stats.incr(self.PROCESSED) - if self.ack: - self.ack_message(method.delivery_tag) - if self.is_waiting_to_shutdown: - return self.on_ready_to_stop() - self.reset_state() + self.invoke_consumer(method, message) def processing_error(self): """Called when message processing failure happens due to a From 6059fc290f7fdee59658460a56e44565c32dee2e Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:26:51 -0400 Subject: [PATCH 11/24] Fix stats keys related bugs and changes --- rejected/mcp.py | 48 +++++++++++------------------------------------- 1 file changed, 11 insertions(+), 37 deletions(-) diff --git a/rejected/mcp.py b/rejected/mcp.py index 5684d9a..02df856 100644 --- a/rejected/mcp.py +++ b/rejected/mcp.py @@ -127,7 +127,7 @@ def calculate_stats(self, data): self.process_count_by_consumer(name) for proc in data[name].keys(): for key in stats: - value = data[name][proc]['counts'][key] + value = data[name][proc]['counts'].get(key, 0) stats[key] += value consumer_stats[name][key] += value @@ -140,20 +140,6 @@ def calculate_stats(self, data): 'counts': stats } - @staticmethod - def calculate_velocity(counts): - """Calculate the message velocity to determine how many messages are - processed per second. - - :param dict counts: The count dictionary to use for calculation - :rtype: float - - """ - total_time = counts['idle_time'] + counts['processing_time'] - if total_time and counts['processed']: - return float(counts['processed'] / float(total_time)) - return 0 - def check_process_counts(self): """Check for the minimum consumer process levels and start up new processes needed. @@ -211,9 +197,7 @@ def consumer_stats_counter(): return { process.Process.ERROR: 0, process.Process.PROCESSED: 0, - process.Process.REDELIVERED: 0, - process.Process.TIME_SPENT: 0, - process.Process.TIME_WAITED: 0 + process.Process.REDELIVERED: 0 } def get_consumer_process(self, consumer, name): @@ -297,28 +281,18 @@ def log_stats(self): LOGGER.info('Did not receive any stats data from children') return - LOGGER.info('%i total %s have processed %i messages with %i ' - 'errors, waiting %.2f seconds and have spent %.2f seconds ' - 'processing messages with an overall velocity of %.2f ' - 'messages per second.', self.stats['counts']['processes'], + LOGGER.info('%i total %s have processed %i messages with %i errors', + self.stats['counts']['processes'], self.consumer_keyword(self.stats['counts']), self.stats['counts']['processed'], - self.stats['counts']['failed'], - self.stats['counts']['idle_time'], - self.stats['counts']['processing_time'], - self.calculate_velocity(self.stats['counts'])) + self.stats['counts']['failed']) for key in self.stats['consumers'].keys(): LOGGER.info('%i %s for %s have processed %i messages with %i ' - 'errors, waiting %.2f seconds and have spent %.2f ' - 'seconds processing messages with an overall velocity ' - 'of %.2f messages per second.', + 'errors', self.stats['consumers'][key]['processes'], self.consumer_keyword(self.stats['consumers'][key]), key, self.stats['consumers'][key]['processed'], - self.stats['consumers'][key]['failed'], - self.stats['consumers'][key]['idle_time'], - self.stats['consumers'][key]['processing_time'], - self.calculate_velocity(self.stats['consumers'][key])) + self.stats['consumers'][key]['failed']) if self.poll_data['processes']: LOGGER.warning('%i process(es) did not respond with stats in ' 'time: %r', len(self.poll_data['processes']), @@ -444,16 +418,16 @@ def poll_results_check(self): LOGGER.warning('Did not receive results from %r', self.poll_data['processes']) - def process(self, consumer_name, process_name): + def process(self, name, process_name): """Return the process handle for the given consumer name and process name. - :param str consumer_name: The consumer name from config + :param str name: The consumer name from config :param str process_name: The automatically assigned process name :rtype: rejected.process.Process """ - return self.consumers[consumer_name].processes[process_name] + return self.consumers[name].processes[process_name] def process_count(self, name, connection): """Return the process count for the given consumer name and connection. @@ -474,7 +448,7 @@ def process_count_by_consumer(self, name): """ count = 0 for connection in self.consumers[name].connections: - count += len(self.consumers[name].connections[connection]) + count += len(self.consumers[name].connections.get(connection)) return count def process_spawn_qty(self, name, connection): From 8291f0afdfb62bdf2884c8458571d9b2fdea7de0 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:27:05 -0400 Subject: [PATCH 12/24] Fix some bugs and stuff --- rejected/stats.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/rejected/stats.py b/rejected/stats.py index 8222b13..140f7a2 100644 --- a/rejected/stats.py +++ b/rejected/stats.py @@ -12,15 +12,14 @@ class Stats(object): - def __init__(self, name, consumer_name, queue, statsd_cfg): + def __init__(self, name, consumer_name, statsd_cfg): self.name = name - self.consumer_name = name + self.consumer_name = consumer_name self.statsd = None if statsd_cfg.get('enabled', False): self.statsd = statsd.StatsdClient(consumer_name, statsd_cfg) self.counter = collections.Counter() - self.previous = collections.Counter() - self.reporting_queue = queue + self.previous = None def __getitem__(self, item): return self.counter.get(item) @@ -45,14 +44,18 @@ def incr(self, key, value=1): def report(self): """Submit the stats data to both the MCP stats queue and statsd""" - values = dict() - for item in self.counter.keys(): - values[item] = self.diff(item) - if self.statsd: - self.statsd.incr(item, values[item]) - self.previous.update(self.counter) - self.reporting_queue.put({ + if not self.previous: + self.previous = dict() + for key in self.counter: + self.previous[key] = 0 + if self.statsd: + for item in self.counter.keys(): + self.statsd.incr(item, self.diff(item)) + values = { 'name': self.name, 'consumer_name': self.consumer_name, - 'counts': values - }, True) + 'counts': dict(self.counter), + 'previous': self.previous + } + self.previous = dict(self.counter) + return values From 956e32421fbcb827f4066c471f4a2582f8b34fd2 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:33:01 -0400 Subject: [PATCH 13/24] Fixup stats and QoS prefetch calcs --- rejected/process.py | 89 ++++++++++++++++++++------------------------- rejected/stats.py | 2 +- 2 files changed, 40 insertions(+), 51 deletions(-) diff --git a/rejected/process.py b/rejected/process.py index 7f34f15..4609ea8 100644 --- a/rejected/process.py +++ b/rejected/process.py @@ -120,8 +120,8 @@ def __init__(self, self.prepend_path = None self.state = self.STATE_INITIALIZING self.state_start = time.time() - self.stats = None + self.stats_queue = None # Override ACTIVE with PROCESSING self.STATES[0x04] = 'Processing' @@ -165,7 +165,7 @@ def base_qos_prefetch(self): """ return self.config.get('qos_prefetch', self.QOS_PREFETCH_COUNT) - def calculate_qos_prefetch(self): + def calc_qos_prefetch(self, values): """Determine if the channel should use the dynamic QoS value, stay at the same QoS or use the default QoS. @@ -175,7 +175,9 @@ def calculate_qos_prefetch(self): if not self.last_stats_time: return - qos_prefetch = self.dynamic_qos_pretch + velocity = self.calc_velocity(values) + qos_prefetch = int(math.ceil(velocity * + float(self.QOS_PREFETCH_MULTIPLIER))) # Don't change anything if qos_prefetch == self.qos_prefetch: @@ -183,12 +185,6 @@ def calculate_qos_prefetch(self): self.qos_prefetch) return False - # Don't change anything - if self.stats.diff(self.PROCESSED) < qos_prefetch: - LOGGER.error('Processed fewer messages last interval than the ' - 'qos_prefetch value') - return False - # If calculated QoS exceeds max if qos_prefetch > self.QOS_MAX: LOGGER.debug('Hit QoS Max ceiling of %i', self.QOS_MAX) @@ -212,6 +208,26 @@ def calculate_qos_prefetch(self): qos_prefetch, self.qos_prefetch) return self.set_qos_prefetch(qos_prefetch) + def calc_velocity(self, values): + """Return the message consuming velocity for the process. + + :rtype: float + + """ + processed = (values['counts'].get(self.PROCESSED, 0) - + values['previous'].get(self.PROCESSED, 0)) + duration = time.time() - self.last_stats_time + LOGGER.debug("Processed: %s (%s)", processed, duration) + + # If there were no messages, do not calculate, use the base + if not processed or not duration: + return 0 + + # Calculate the velocity as the basis for the calculation + velocity = float(processed) / float(duration) + LOGGER.info('Message processing velocity: %.2f/s', velocity) + return velocity + @property def can_respond(self): """Indicates if the process can still respond to RabbitMQ when the @@ -255,20 +271,6 @@ def connect_to_rabbitmq(self, cfg, name): self.on_connection_open, stop_ioloop_on_close=False) - @property - def dynamic_qos_pretch(self): - """Calculate the prefetch count based upon the message velocity * the - _QOS_PREFETCH_MULTIPLIER. - - :rtype: int - - """ - # Round up the velocity * the multiplier - value = int(math.ceil(self.message_velocity * - float(self.QOS_PREFETCH_MULTIPLIER))) - LOGGER.debug('Calculated prefetch value: %i', value) - return value - @staticmethod def get_config(cfg, number, name, connection): """Initialize a new consumer thread, setting defaults and config values @@ -429,25 +431,6 @@ def is_processing(self): """ return self.state in [self.STATE_PROCESSING, self.STATE_STOP_REQUESTED] - @property - def message_velocity(self): - """Return the message consuming velocity for the process. - - :rtype: float - - """ - processed = self.stats.diff(self.PROCESSED) - duration = time.time() - self.last_stats_time - - # If there were no messages, do not calculate, use the base - if not processed or not duration: - return 0 - - # Calculate the velocity as the basis for the calculation - velocity = float(processed) / float(duration) - LOGGER.debug('Message processing velocity: %.2f', velocity) - return velocity - def on_channel_closed(self, method_frame): """Invoked by pika when RabbitMQ unexpectedly closes the channel. Channels are usually closed if you attempt to do something that @@ -539,9 +522,10 @@ def on_sigprof(self, _unused_signum, _unused_frame): :param frame _unused_frame: The python frame the signal was received at """ - self.stats.report() + values = self.stats.report() + self.stats_queue.put(values, True) if self.is_processing or self.is_idle: - self.calculate_qos_prefetch() + self.calc_qos_prefetch(values) self.last_stats_time = time.time() signal.siginterrupt(signal.SIGPROF, False) @@ -707,8 +691,9 @@ def _run(self): """Run method that can be profiled""" self.ioloop = ioloop.IOLoop.instance() try: - self.setup(self._kwargs['config'], self._kwargs['connection_name'], + self.setup(self._kwargs['config'], self._kwargs['consumer_name'], + self._kwargs['connection_name'], self._kwargs['stats_queue'], self._kwargs['logging_config']) except ImportError as error: @@ -734,16 +719,20 @@ def set_qos_prefetch(self, value=None): if qos_prefetch != self.qos_prefetch: self.qos_prefetch = qos_prefetch LOGGER.debug('Setting the QOS Prefetch to %i', qos_prefetch) - self.channel.basic_qos(prefetch_count=qos_prefetch) + self.channel.basic_qos(self.on_qos_set, 0, qos_prefetch, False) + + def on_qos_set(self, frame): + """Invoked by pika when the QoS is set""" + LOGGER.debug("QoS was set: %r", frame) - def setup(self, cfg, connection_name, consumer_name, stats_queue, + def setup(self, cfg, consumer_name, connection_name, stats_queue, logging_config): """Initialize the consumer, setting up needed attributes and connecting to RabbitMQ. :param dict cfg: Consumer config section - :param str connection_name: The name of the connection :param str consumer_name: Consumer name for config + :param str connection_name: The name of the connection :param multiprocessing.Queue stats_queue: Queue to MCP :param dict logging_config: Logging config from YAML file @@ -756,6 +745,7 @@ def setup(self, cfg, connection_name, consumer_name, stats_queue, self.config = cfg['Consumers'][consumer_name] self.connections = cfg['Connections'] self.consumer = self.get_consumer(self.config) + self.stats_queue = stats_queue if not self.consumer: LOGGER.critical('Could not import and start processor') @@ -763,8 +753,7 @@ def setup(self, cfg, connection_name, consumer_name, stats_queue, exit(1) # Setup the stats counter instance - self.stats = stats.Stats(self.name, consumer_name, stats_queue, - cfg['statsd'] or {}) + self.stats = stats.Stats(self.name, consumer_name, cfg['statsd'] or {}) # Set statsd in the consumer if self.stats.statsd: diff --git a/rejected/stats.py b/rejected/stats.py index 140f7a2..e5c9c99 100644 --- a/rejected/stats.py +++ b/rejected/stats.py @@ -35,7 +35,7 @@ def get(self, item): return self.counter.get(item) def diff(self, item): - return self.counter[item] - self.previous[item] + return self.counter.get(item, 0) - self.previous.get(item, 0) def incr(self, key, value=1): self.counter[key] += value From 067c3978eff7997909bc5e82f69ca998b2b0fb9a Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:33:14 -0400 Subject: [PATCH 14/24] Clean up how stats are logged --- rejected/mcp.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/rejected/mcp.py b/rejected/mcp.py index 02df856..99f06bf 100644 --- a/rejected/mcp.py +++ b/rejected/mcp.py @@ -281,22 +281,23 @@ def log_stats(self): LOGGER.info('Did not receive any stats data from children') return - LOGGER.info('%i total %s have processed %i messages with %i errors', - self.stats['counts']['processes'], - self.consumer_keyword(self.stats['counts']), - self.stats['counts']['processed'], - self.stats['counts']['failed']) + if self.poll_data['processes']: + LOGGER.warning('%i process(es) did not respond with stats: %r', + len(self.poll_data['processes']), + self.poll_data['processes']) + + if self.stats['counts']['processes'] > 1: + LOGGER.info('%i consumers processed %i messages with %i errors', + self.stats['counts']['processes'], + self.stats['counts']['processed'], + self.stats['counts']['failed']) + for key in self.stats['consumers'].keys(): - LOGGER.info('%i %s for %s have processed %i messages with %i ' - 'errors', - self.stats['consumers'][key]['processes'], + LOGGER.info('%i %s %s processed %i messages with %i errors', + self.stats['consumers'][key]['processes'], key, self.consumer_keyword(self.stats['consumers'][key]), - key, self.stats['consumers'][key]['processed'], + self.stats['consumers'][key]['processed'], self.stats['consumers'][key]['failed']) - if self.poll_data['processes']: - LOGGER.warning('%i process(es) did not respond with stats in ' - 'time: %r', len(self.poll_data['processes']), - self.poll_data['processes']) def new_consumer(self, config): """Return a consumer dict for the given name and configuration. From 10c7ea573400d463dd8892dc9ad048a6e2746d9d Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:34:30 -0400 Subject: [PATCH 15/24] Remove extra logging --- rejected/process.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rejected/process.py b/rejected/process.py index 4609ea8..7e0fe2c 100644 --- a/rejected/process.py +++ b/rejected/process.py @@ -217,7 +217,6 @@ def calc_velocity(self, values): processed = (values['counts'].get(self.PROCESSED, 0) - values['previous'].get(self.PROCESSED, 0)) duration = time.time() - self.last_stats_time - LOGGER.debug("Processed: %s (%s)", processed, duration) # If there were no messages, do not calculate, use the base if not processed or not duration: @@ -225,7 +224,7 @@ def calc_velocity(self, values): # Calculate the velocity as the basis for the calculation velocity = float(processed) / float(duration) - LOGGER.info('Message processing velocity: %.2f/s', velocity) + LOGGER.debug('Message processing velocity: %.2f/s', velocity) return velocity @property From f3df576f38480ec0e28432cd75ba8ceb6f21108a Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:34:47 -0400 Subject: [PATCH 16/24] Update example --- example.yaml | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/example.yaml b/example.yaml index ae45491..de236c0 100644 --- a/example.yaml +++ b/example.yaml @@ -7,6 +7,7 @@ Application: enabled: True host: localhost port: 8125 + prefix: application.rejected Connections: rabbitmq: host: localhost @@ -21,13 +22,26 @@ Application: consumer: rejected.example.ExampleConsumer connections: [rabbitmq] qty: 1 - queue: generated_messages + queue: test dynamic_qos: True ack: True max_errors: 100 config: foo: True bar: baz + example2: + consumer: rejected.example.ExampleConsumer + connections: [rabbitmq] + qty: 2 + queue: test + dynamic_qos: True + ack: True + max_errors: 100 + config: + foo: True + bar: baz + + Daemon: user: rejected @@ -38,7 +52,7 @@ Logging: version: 1 formatters: verbose: - format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -25s %(name) -30s %(funcName) -30s: %(message)s" + format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -15s %(name) -20s %(funcName) -20s: %(message)s" datefmt: "%Y-%m-%d %H:%M:%S" syslog: format: "%(levelname)s %(name)s.%(funcName)s(): %(message)s" @@ -63,10 +77,6 @@ Logging: level: INFO propagate: true handlers: [console] - rejected.example: - level: INFO - propagate: false - handlers: [console] ROOT: level: CRITICAL propagate: false From da229ef5c2f8ffa7ab9be6c83325d3c7cf2ee55e Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:38:43 -0400 Subject: [PATCH 17/24] Add async example and config --- example.yaml | 11 ++++------- rejected/example.py | 17 +++++++++++++++-- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/example.yaml b/example.yaml index de236c0..1cd12b7 100644 --- a/example.yaml +++ b/example.yaml @@ -18,21 +18,18 @@ Application: vhost: / heartbeat_interval: 300 Consumers: - example: - consumer: rejected.example.ExampleConsumer + async: + consumer: rejected.example.AsyncExampleConsumer connections: [rabbitmq] qty: 1 queue: test dynamic_qos: True ack: True max_errors: 100 - config: - foo: True - bar: baz - example2: + example: consumer: rejected.example.ExampleConsumer connections: [rabbitmq] - qty: 2 + qty: 1 queue: test dynamic_qos: True ack: True diff --git a/rejected/example.py b/rejected/example.py index a5458ab..445ebf8 100644 --- a/rejected/example.py +++ b/rejected/example.py @@ -3,6 +3,9 @@ import logging import random +from tornado import gen +from tornado import httpclient + __version__ = '1.0.0' LOGGER = logging.getLogger(__name__) @@ -11,8 +14,8 @@ class ExampleConsumer(consumer.Consumer): def process(self): - LOGGER.debug('Message: %r', self._message.body) - action = random.randint(0, 1000) + LOGGER.info('Message: %r', self.body) + action = random.randint(0, 100) if action == 0: raise ValueError('Unhandled exception') elif action < 2: @@ -20,3 +23,13 @@ def process(self): elif action < 5: raise consumer.MessageException('reject') + +class AsyncExampleConsumer(consumer.Consumer): + + @gen.coroutine + def process(self): + LOGGER.info('Message: %r', self.body) + http_client = httpclient.AsyncHTTPClient() + results = yield [http_client.fetch('http://www.github.com'), + http_client.fetch('http://www.reddit.com')] + [LOGGER.info('Length: %s' % len(r.body)) for r in results] From dc2bd88108cbf2c729eeff1904164a3823ab25af Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:49:38 -0400 Subject: [PATCH 18/24] Fix double logging --- rejected/process.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rejected/process.py b/rejected/process.py index 7e0fe2c..dc7b91f 100644 --- a/rejected/process.py +++ b/rejected/process.py @@ -25,9 +25,10 @@ import traceback from rejected import __version__ -from rejected import state from rejected import consumer from rejected import data +from rejected import NullHandler +from rejected import state from rejected import stats LOGGER = logging.getLogger(__name__) @@ -677,6 +678,8 @@ def reset_state(self): def run(self): """Start the consumer""" + logger = logging.getLogger() + logger.addHandler(NullHandler()) if self.profile_file: LOGGER.info('Profiling to %s', self.profile_file) profile.runctx('self._run()', globals(), locals(), @@ -738,7 +741,6 @@ def setup(self, cfg, consumer_name, connection_name, stats_queue, """ LOGGER.info('Initializing for %s on %s connection', self.name, connection_name) - self.logging_config = logging_config self.connection_name = connection_name self.consumer_name = consumer_name self.config = cfg['Consumers'][consumer_name] From fd4e7d53c1e065bb1205cbdf0fd8540f55a4bd92 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 22:49:46 -0400 Subject: [PATCH 19/24] Better examples --- example.yaml | 2 +- rejected/example.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/example.yaml b/example.yaml index 1cd12b7..e5b05f4 100644 --- a/example.yaml +++ b/example.yaml @@ -76,7 +76,7 @@ Logging: handlers: [console] ROOT: level: CRITICAL - propagate: false + propagate: true handlers: [console] disable_existing_loggers: true incremental: false diff --git a/rejected/example.py b/rejected/example.py index 445ebf8..e44b350 100644 --- a/rejected/example.py +++ b/rejected/example.py @@ -14,11 +14,9 @@ class ExampleConsumer(consumer.Consumer): def process(self): - LOGGER.info('Message: %r', self.body) - action = random.randint(0, 100) + LOGGER.debug('Message: %r', self.body) + action = random.randint(0, 10000) if action == 0: - raise ValueError('Unhandled exception') - elif action < 2: raise consumer.ConsumerException('zomg') elif action < 5: raise consumer.MessageException('reject') @@ -28,8 +26,8 @@ class AsyncExampleConsumer(consumer.Consumer): @gen.coroutine def process(self): - LOGGER.info('Message: %r', self.body) + LOGGER.debug('Message: %r', self.body) http_client = httpclient.AsyncHTTPClient() results = yield [http_client.fetch('http://www.github.com'), http_client.fetch('http://www.reddit.com')] - [LOGGER.info('Length: %s' % len(r.body)) for r in results] + LOGGER.info('Length: %r', [len(r.body) for r in results]) From 8e6af5cf3d1de23faa57f138873cfea458f80af0 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 23:00:57 -0400 Subject: [PATCH 20/24] Remove _clear on finish, fix tests --- rejected/consumer.py | 1 - tests/test_consumer.py | 26 ++++++++++++-------------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/rejected/consumer.py b/rejected/consumer.py index 1bed418..e65fbd3 100644 --- a/rejected/consumer.py +++ b/rejected/consumer.py @@ -142,7 +142,6 @@ def finish(self): raise RuntimeError("finish() called twice") self._finished = True self.on_finish() - self._clear() def require_setting(self, name, feature="this feature"): """Raises an exception if the given app setting is not defined.""" diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 050ccb3..53b81f7 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1,16 +1,12 @@ # coding=utf-8 """Tests for rejected.consumer""" -import bs4 -import bz2 -import csv +from tornado import gen import json import mock -import pickle try: import unittest2 as unittest except ImportError: import unittest -import zlib from . import mocks @@ -23,7 +19,7 @@ class ConsumerInitializationTests(unittest.TestCase): def test_configuration_is_assigned(self): cfg = {'foo': 'bar'} obj = consumer.Consumer(cfg) - self.assertDictEqual(obj._config, cfg) + self.assertDictEqual(obj._settings, cfg) def test_channel_is_none(self): obj = consumer.Consumer({}) @@ -51,7 +47,7 @@ class ConsumerSetChannelTests(unittest.TestCase): def test_set_channel_assigns_to_channel(self): obj = consumer.Consumer({}) channel = mock.Mock() - obj.set_channel(channel) + obj._set_channel(channel) self.assertEqual(obj._channel, channel) @@ -67,37 +63,39 @@ def setUp(self): self.message = data.Message(mocks.CHANNEL, mocks.METHOD, mocks.PROPERTIES, mocks.BODY) + @gen.coroutine def test_receive_assigns_message(self): - self.obj.receive(self.message) + yield self.obj._execute(self.message) self.assertEqual(self.obj._message, self.message) def test_receive_invokes_process(self): with mock.patch.object(self.obj, 'process') as process: - self.obj.receive(self.message) + self.obj._execute(self.message) process.assert_called_once_with() def test_receive_drops_invalid_message_type(self): self.obj.MESSAGE_TYPE = 'foo' self.obj.DROP_INVALID_MESSAGES = True with mock.patch.object(self.obj, 'process') as process: - self.obj.receive(self.message) + self.obj._execute(self.message) process.assert_not_called() def test_raises_with_invalid_message_type(self): self.obj.MESSAGE_TYPE = 'foo' self.obj.DROP_INVALID_MESSAGES = False - self.assertRaises(consumer.ConsumerException, self.obj.receive, - self.message) + result = yield self.obj._execute(self.message) + self.assertIsInstance(result.exception, consumer.ConsumerException) class ConsumerPropertyTests(unittest.TestCase): + @gen.coroutine def setUp(self): self.config = {'foo': 'bar', 'baz': 1, 'qux': True} self.message = data.Message(mocks.CHANNEL, mocks.METHOD, mocks.PROPERTIES, mocks.BODY) self.obj = TestConsumer(self.config) - self.obj.receive(self.message) + yield self.obj._execute(self.message) def test_app_id_property(self): self.assertEqual(self.obj.app_id, mocks.PROPERTIES.app_id) @@ -172,7 +170,7 @@ def setUp(self): self.message = data.Message(mocks.CHANNEL, mocks.METHOD, mocks.PROPERTIES, json.dumps(self.body)) self.obj = TestSmartConsumer({}) - self.obj.receive(self.message) + self.obj._execute(self.message) def test_message_body_property(self): self.assertDictEqual(self.obj.body, self.body) From dc5777fd023da6f44286950b2295aa6a356b6da3 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 23:10:43 -0400 Subject: [PATCH 21/24] Fix and prune tests --- tests/test_process.py | 86 +++++++------------------------------------ 1 file changed, 14 insertions(+), 72 deletions(-) diff --git a/tests/test_process.py b/tests/test_process.py index 7a43ad2..4a686d1 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -48,6 +48,9 @@ def test_import_consumer_failure(self): class TestProcess(test_state.TestState): config = { + 'statsd': { + 'enabled': False + }, 'Connections': { 'MockConnection': { 'host': 'localhost', @@ -168,36 +171,20 @@ def test_startup_consumer_is_none(self): new_process = self.new_process() self.assertIsNone(new_process.consumer) - def test_ack_message(self): - delivery_tag = 'MockDeliveryTag-1' - mock_ack = mock.Mock() - self._obj._ack = True - self._obj._message_connection_id = self._obj._connection_id = 1 - self._obj._channel = self.new_mock_channel() - self._obj._channel.basic_ack = mock_ack - self._obj.ack_message(delivery_tag) - mock_ack.assert_called_once_with(delivery_tag=delivery_tag) - - def test_count(self): - expectation = 10 - self._obj.stats[process.Process.REDELIVERED] = expectation - self.assertEqual(self._obj.count(process.Process.REDELIVERED), - expectation) - def test_get_config(self): - connection = 'MockConnection' + conn = 'MockConnection' name = 'MockConsumer' number = 5 pid = 1234 expectation = { - 'connection': self.config['Connections'][connection], - 'connection_name': connection, + 'connection': self.config['Connections'][conn], + 'connection_name': conn, 'consumer_name': name, 'process_name': '%s_%i_tag_%i' % (name, pid, number) } with patch('os.getpid', return_value=pid): self.assertEqual(self._obj.get_config(self.config, number, name, - connection), expectation) + conn), expectation) def test_get_consumer_with_invalid_consumer(self): cfg = self.config['Consumers']['MockConsumer2'] @@ -234,35 +221,6 @@ def test_get_consumer_with_no_config_is_none(self, mock_method): new_process = self.new_process() self.assertIsNone(new_process.get_consumer(config)) - def test_reject_message(self): - delivery_tag = 'MockDeliveryTag-1' - mock_reject = mock.Mock() - self._obj.channel = self.new_mock_channel() - self._obj.ack = True - self._obj.message_connection_id = self._obj.connection_id = 1 - self._obj.channel.basic_nack = mock_reject - self._obj.reject(delivery_tag) - mock_reject.assert_called_once_with(requeue=True, - delivery_tag=delivery_tag) - - def test_set_qos_prefetch_value(self): - new_process = self.new_process() - new_process.setup(**self.mock_args) - mock_channel = self.new_mock_channel() - new_process._channel = mock_channel - value = 12 - new_process.set_qos_prefetch(value) - mock_channel.basic_qos.assert_called_once_with(prefetch_count=value) - - def test_set_qos_prefetch_no_value(self): - new_process = self.new_process() - new_process.setup(**self.mock_args) - mock_channel = self.new_mock_channel() - new_process._channel = mock_channel - new_process.set_qos_prefetch() - value = new_process.qos_prefetch - mock_channel.basic_qos.assert_called_once_with(prefetch_count=value) - def test_setup_signal_handlers(self): signals = [mock.call(signal.SIGPROF, self._obj.on_sigprof), mock.call(signal.SIGABRT, self._obj.stop)] @@ -281,38 +239,32 @@ def mock_setup(self, new_process=None, side_effect=None): def test_setup_stats_queue(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._stats_queue, + self.assertEqual(mock_process.stats_queue, self.mock_args['stats_queue']) def test_setup_consumer_name(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._stats_queue, + self.assertEqual(mock_process.stats_queue, self.mock_args['stats_queue']) def test_setup_config(self): mock_process = self.mock_setup() config = self.config['Consumers']['MockConsumer'] - self.assertEqual(mock_process._config, config) - - def test_setup_with_consumer_config(self): - new_process = self.new_process() - new_process.setup(**self.mock_args) - self.assertEqual(new_process.consumer._settings, - self.config['Consumers']['MockConsumer']['config']) + self.assertEqual(mock_process.config, config) def test_setup_config_queue_name(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._queue_name, + self.assertEqual(mock_process.queue_name, self.config['Consumers']['MockConsumer']['queue']) def test_setup_config_ack(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._ack, + self.assertEqual(mock_process.ack, self.config['Consumers']['MockConsumer']['ack']) def test_setup_max_error_count(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._max_error_count, + self.assertEqual(mock_process.max_error_count, self.config['Consumers']['MockConsumer']['max_errors']) def test_setup_prefetch_count_no_config(self): @@ -341,7 +293,7 @@ def test_setup_connection_value(self): with patch.object(process.Process, 'connect_to_rabbitmq', return_value=mock_connection): mock_process = self.mock_setup() - self.assertEqual(mock_process._connection, mock_connection) + self.assertEqual(mock_process.connection, mock_connection) def test_is_idle_state_processing(self): self._obj.state = self._obj.STATE_PROCESSING @@ -359,16 +311,6 @@ def test_is_stopped_state_processing(self): self._obj.state = self._obj.STATE_PROCESSING self.assertFalse(self._obj.is_stopped) - def test_too_many_errors_true(self): - self._obj.max_error_count = 1 - self._obj.stats[self._obj.ERROR] = 2 - self.assertTrue(self._obj.too_many_errors) - - def test_too_many_errors_false(self): - self._obj.max_error_count = 5 - self._obj.stats[self._obj.ERROR] = 2 - self.assertFalse(self._obj.too_many_errors) - def test_state_processing_desc(self): self._obj.state = self._obj.STATE_PROCESSING self.assertEqual(self._obj.state_description, From f5a0b90053e94be28fcab2dc0de932e13fad4bac Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 23:11:57 -0400 Subject: [PATCH 22/24] Update badges --- README.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index 8b620aa..bd25a2b 100644 --- a/README.rst +++ b/README.rst @@ -74,12 +74,12 @@ Example Configuration config: foo: True bar: baz - + Daemon: user: rejected group: daemon pidfile: /var/run/rejected/example.%(pid)s.pid - + Logging: version: 1 formatters: @@ -126,11 +126,11 @@ Available at https://rejected.readthedocs.org/en/latest/history.html .. |Status| image:: https://travis-ci.org/gmr/rejected.svg?branch=master :target: https://travis-ci.org/gmr/rejected -.. |Coverage| image:: https://coveralls.io/repos/gmr/rejected/badge.png - :target: https://coveralls.io/r/gmr/rejected - +.. |Coverage| image:: https://codecov.io/github/gmr/rejected/coverage.svg?branch=master + :target: https://codecov.io/github/gmr/rejected?branch=master + .. |Downloads| image:: https://pypip.in/d/rejected/badge.svg? :target: https://pypi.python.org/pypi/rejected - + .. |License| image:: https://pypip.in/license/rejected/badge.svg? :target: https://rejected.readthedocs.org From b13368b017dc94b818d625d1de6e54c28f8a8ca7 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 23:14:10 -0400 Subject: [PATCH 23/24] Only use codecov, add cache --- .travis.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 1f9828d..6e8dbe7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ +sudo: false language: python python: - 2.6 @@ -11,7 +12,6 @@ install: script: nosetests --with-coverage --cover-package=rejected after_success: - codecov - - coveralls services: - rabbitmq deploy: @@ -24,3 +24,6 @@ deploy: all_branches: true password: secure: "QNndN99rD5boB/Sg3I0CzjkFUF1JmGrsQKZ7ONiA+obUWQDqOmggUoPEs1zN8xIExDcM4tPhlCQX0QiYJrKdLQwWiClvKo1wpYUxVm0s/W8SqvhV3IK9VxhMrbZUkmksO48TH4YKav06rEkVxke9g3U92XUJZ6cRAnYUKrjMYaQ=" +cache: + directories: + - $HOME/.pip-cache/ From eca12e694609bfa96f8afaea3b15d907e9c814cd Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Fri, 24 Apr 2015 23:15:49 -0400 Subject: [PATCH 24/24] Install 2.6 requirement when testing --- .travis.yml | 2 +- README.rst | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6e8dbe7..5f88aae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ python: before_install: - pip install codecov install: - - if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2 importlib; fi + - if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install backport_collections unittest2 importlib; fi - "pip install -r requirements.txt" - "pip install -r dev-requirements.txt" script: nosetests --with-coverage --cover-package=rejected diff --git a/README.rst b/README.rst index bd25a2b..60a2f90 100644 --- a/README.rst +++ b/README.rst @@ -10,7 +10,7 @@ Rejected runs as a master process with multiple consumer configurations that are each run it an isolated process. It has the ability to collect statistical data from the consumer processes and report on it. -|Version| |Downloads| |Status| |Coverage| |License| +|Version| |Downloads| |Status| |License| Features -------- @@ -126,9 +126,6 @@ Available at https://rejected.readthedocs.org/en/latest/history.html .. |Status| image:: https://travis-ci.org/gmr/rejected.svg?branch=master :target: https://travis-ci.org/gmr/rejected -.. |Coverage| image:: https://codecov.io/github/gmr/rejected/coverage.svg?branch=master - :target: https://codecov.io/github/gmr/rejected?branch=master - .. |Downloads| image:: https://pypip.in/d/rejected/badge.svg? :target: https://pypi.python.org/pypi/rejected