diff --git a/.travis.yml b/.travis.yml index 1f9828d..5f88aae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ +sudo: false language: python python: - 2.6 @@ -5,13 +6,12 @@ python: before_install: - pip install codecov install: - - if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2 importlib; fi + - if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install backport_collections unittest2 importlib; fi - "pip install -r requirements.txt" - "pip install -r dev-requirements.txt" script: nosetests --with-coverage --cover-package=rejected after_success: - codecov - - coveralls services: - rabbitmq deploy: @@ -24,3 +24,6 @@ deploy: all_branches: true password: secure: "QNndN99rD5boB/Sg3I0CzjkFUF1JmGrsQKZ7ONiA+obUWQDqOmggUoPEs1zN8xIExDcM4tPhlCQX0QiYJrKdLQwWiClvKo1wpYUxVm0s/W8SqvhV3IK9VxhMrbZUkmksO48TH4YKav06rEkVxke9g3U92XUJZ6cRAnYUKrjMYaQ=" +cache: + directories: + - $HOME/.pip-cache/ diff --git a/README.rst b/README.rst index 8b620aa..60a2f90 100644 --- a/README.rst +++ b/README.rst @@ -10,7 +10,7 @@ Rejected runs as a master process with multiple consumer configurations that are each run it an isolated process. It has the ability to collect statistical data from the consumer processes and report on it. -|Version| |Downloads| |Status| |Coverage| |License| +|Version| |Downloads| |Status| |License| Features -------- @@ -74,12 +74,12 @@ Example Configuration config: foo: True bar: baz - + Daemon: user: rejected group: daemon pidfile: /var/run/rejected/example.%(pid)s.pid - + Logging: version: 1 formatters: @@ -126,11 +126,8 @@ Available at https://rejected.readthedocs.org/en/latest/history.html .. |Status| image:: https://travis-ci.org/gmr/rejected.svg?branch=master :target: https://travis-ci.org/gmr/rejected -.. |Coverage| image:: https://coveralls.io/repos/gmr/rejected/badge.png - :target: https://coveralls.io/r/gmr/rejected - .. |Downloads| image:: https://pypip.in/d/rejected/badge.svg? :target: https://pypi.python.org/pypi/rejected - + .. |License| image:: https://pypip.in/license/rejected/badge.svg? :target: https://rejected.readthedocs.org diff --git a/example.yaml b/example.yaml index ae45491..e5b05f4 100644 --- a/example.yaml +++ b/example.yaml @@ -7,6 +7,7 @@ Application: enabled: True host: localhost port: 8125 + prefix: application.rejected Connections: rabbitmq: host: localhost @@ -17,11 +18,19 @@ Application: vhost: / heartbeat_interval: 300 Consumers: + async: + consumer: rejected.example.AsyncExampleConsumer + connections: [rabbitmq] + qty: 1 + queue: test + dynamic_qos: True + ack: True + max_errors: 100 example: consumer: rejected.example.ExampleConsumer connections: [rabbitmq] qty: 1 - queue: generated_messages + queue: test dynamic_qos: True ack: True max_errors: 100 @@ -29,6 +38,8 @@ Application: foo: True bar: baz + + Daemon: user: rejected group: daemon @@ -38,7 +49,7 @@ Logging: version: 1 formatters: verbose: - format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -25s %(name) -30s %(funcName) -30s: %(message)s" + format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -15s %(name) -20s %(funcName) -20s: %(message)s" datefmt: "%Y-%m-%d %H:%M:%S" syslog: format: "%(levelname)s %(name)s.%(funcName)s(): %(message)s" @@ -63,13 +74,9 @@ Logging: level: INFO propagate: true handlers: [console] - rejected.example: - level: INFO - propagate: false - handlers: [console] ROOT: level: CRITICAL - propagate: false + propagate: true handlers: [console] disable_existing_loggers: true incremental: false diff --git a/rejected/__init__.py b/rejected/__init__.py index 31b3b38..a0f800f 100644 --- a/rejected/__init__.py +++ b/rejected/__init__.py @@ -4,7 +4,7 @@ """ __author__ = 'Gavin M. Roy ' __since__ = "2009-09-10" -__version__ = "3.4.6" +__version__ = "3.5.0" from consumer import Consumer from consumer import PublishingConsumer @@ -17,15 +17,15 @@ try: from logging import NullHandler except ImportError: + class NullHandler(logging.Handler): """Python 2.6 does not have a NullHandler""" + def emit(self, record): """Emit a record - :param record record: The record to emit - """ pass + logging.getLogger('rejected').addHandler(NullHandler()) -logging.getLogger('rejected.consumer').addHandler(NullHandler()) diff --git a/rejected/consumer.py b/rejected/consumer.py index 1064cf6..e65fbd3 100644 --- a/rejected/consumer.py +++ b/rejected/consumer.py @@ -13,7 +13,7 @@ ``content_encoding`` types (``gzip`` or ``bzip2``) be specified in the message's property, it will automatically be decoded. -Supported MIME types are: +Supported `SmartConsumer` MIME types are: - application/json - application/pickle @@ -29,7 +29,9 @@ """ import bz2 +from tornado import concurrent import csv +from tornado import gen import json import logging import pickle @@ -39,14 +41,14 @@ import sys import time import uuid +import warnings import yaml import zlib LOGGER = logging.getLogger(__name__) BS4_MIME_TYPES = ('text/html', 'text/xml') -PICKLE_MIME_TYPES = ('application/pickle', - 'application/x-pickle', +PICKLE_MIME_TYPES = ('application/pickle', 'application/x-pickle', 'application/x-vnd.python.pickle', 'application/vnd.python.pickle') YAML_MIME_TYPES = ('text/yaml', 'text/x-yaml') @@ -75,23 +77,38 @@ class Consumer(object): DROP_INVALID_MESSAGES = False MESSAGE_TYPE = None - def __init__(self, configuration): + def __init__(self, settings): """Creates a new instance of a Consumer class. To perform initialization tasks, extend Consumer.initialize - :param dict configuration: The configuration from rejected + :param dict settings: The configuration from rejected """ - self._config = configuration self._channel = None + self._finished = False self._message = None self._message_body = None + self._settings = settings + self._statsd = None # Run any child object specified initialization self.initialize() def initialize(self): - """Extend this method for any initialization tasks""" + """Extend this method for any initialization tasks that occur only when + the `Consumer` class is created.""" + pass + + def prepare(self): + """Called when a message is received before `process`. + + Asynchronous support: Decorate this method with `.gen.coroutine` + or `.return_future` to make it asynchronous (the + `asynchronous` decorator cannot be used on `prepare`). + + If this method returns a `.Future` execution will not proceed + until the `.Future` is done. + """ pass def process(self): @@ -106,41 +123,51 @@ def process(self): """ raise NotImplementedError - def receive(self, message_in): - """Process the message from RabbitMQ. To implement logic for processing - a message, extend Consumer._process, not this method. + def on_finish(self): + """Called after the end of a request. + Override this method to perform cleanup, logging, etc. + This method is a counterpart to `prepare`. ``on_finish`` may + not produce any output, as it is called after the response + has been sent to the client. + """ + pass - :param rejected.Consumer.Message message_in: The message to process - :rtype: bool + def shutdown(self): + """Override to cleanly shutdown when rejected is stopping""" + pass - """ - LOGGER.debug('Received: %r', message_in) - self._message = message_in - self._message_body = None + def finish(self): + """Finishes message processing for the current message.""" + if self._finished: + raise RuntimeError("finish() called twice") + self._finished = True + self.on_finish() - # Validate the message type if the child sets _MESSAGE_TYPE - if self.MESSAGE_TYPE and self.MESSAGE_TYPE != self.message_type: - LOGGER.error('Received a non-supported message type: %s', - self.message_type) + def require_setting(self, name, feature="this feature"): + """Raises an exception if the given app setting is not defined.""" + if not self.settings.get(name): + raise Exception("You must define the '%s' setting in your " + "application to use %s" % (name, feature)) - # Should the message be dropped or returned to the broker? - if self.DROP_INVALID_MESSAGES: - LOGGER.debug('Dropping the invalid message') - return - else: - raise ConsumerException('Invalid message type') + def statsd_add_timing(self, key, duration): + """Add a timing to statsd - # Let the child object process the message - self.process() + :param str key: The key to add the timing to + :param int|float duration: The timing value - def set_channel(self, channel): - """Assign the _channel attribute to the channel that was passed in. - This should not be extended. + """ + if self._statsd: + self._statsd.add_timing(key, duration) - :param pika.channel.Channel channel: The channel to assign + def statsd_incr(self, key, value=1): + """Increment the specified key in statsd if statsd is enabled. + + :param str key: The key to increment + :param int value: The value to increment the key by """ - self._channel = channel + if self._statsd: + self._statsd.incr(key, value) @property def app_id(self): @@ -166,10 +193,16 @@ def configuration(self): """Access the configuration stanza for the consumer as specified by the ``config`` section for the consumer in the rejected configuration. + .. deprecated:: 3.1 + Use :property:`settings` instead. + :rtype: dict """ - return self._config + warnings.warn('Consumer.configuration is deprecated ' + 'in favor of Consumer.settings', + category=DeprecationWarning) + return self._settings @property def content_encoding(self): @@ -306,6 +339,16 @@ def message_type(self): """ return self._message.properties.type + @property + def settings(self): + """Access the consumer settings as specified by the ``config`` section + for the consumer in the rejected configuration. + + :rtype: dict + + """ + return self._settings + @property def timestamp(self): """Access the unix epoch timestamp value from the properties of the @@ -325,6 +368,69 @@ def user_id(self): """ return self._message.properties.user_id + def _clear(self): + """Resets all assigned data for the current message.""" + self._finished = False + self._message = None + self._message_body = None + + @gen.coroutine + def _execute(self, message_in): + """Process the message from RabbitMQ. To implement logic for processing + a message, extend Consumer._process, not this method. + + :param rejected.Consumer.Message message_in: The message to process + :rtype: bool + + """ + LOGGER.debug('Received: %r', message_in) + self._clear() + self._message = message_in + + # Validate the message type if the child sets _MESSAGE_TYPE + if self.MESSAGE_TYPE and self.MESSAGE_TYPE != self.message_type: + LOGGER.error('Received a non-supported message type: %s', + self.message_type) + + # Should the message be dropped or returned to the broker? + if not self.DROP_INVALID_MESSAGES: + LOGGER.debug('Dropping the invalid message') + return + else: + raise ConsumerException('Invalid message type') + + result = self.prepare() + if concurrent.is_future(result): + result = yield result + if result is not None: + raise TypeError("Expected None, got %r" % result) + if self._finished: + return + + result = self.process() + if concurrent.is_future(result): + result = yield result + if result is not None: + raise TypeError("Expected None, got %r" % result) + self.finish() + + def _set_channel(self, channel): + """Assign the _channel attribute to the channel that was passed in. + This should not be extended. + + :param pika.channel.Channel channel: The channel to assign + + """ + self._channel = channel + + def _set_statsd(self, statsd): + """Assign a `StatsdClient` instance to the class. + + :param pika.statsd.StatsdClient statsd: The StatsdClient instance + + """ + self._statsd = statsd + class PublishingConsumer(Consumer): """The PublishingConsumer extends the Consumer class, adding two methods, @@ -344,6 +450,7 @@ class PublishingConsumer(Consumer): a :py:class:`ConsumerException` is raised. """ + def publish_message(self, exchange, routing_key, properties, body): """Publish a message to RabbitMQ on the same channel the original message was received on. @@ -365,8 +472,10 @@ def publish_message(self, exchange, routing_key, properties, body): properties=msg_props, body=body) - def reply(self, response_body, properties, auto_id=True, - exchange=None, reply_to=None): + def reply(self, response_body, properties, + auto_id=True, + exchange=None, + reply_to=None): """Reply to the received message. If auto_id is True, a new uuid4 value will be generated for the @@ -409,10 +518,8 @@ def reply(self, response_body, properties, auto_id=True, if properties.reply_to: properties.reply_to = None - self.publish_message(exchange or self._message.exchange, - reply_to, - dict(properties), - response_body) + self.publish_message(exchange or self._message.exchange, reply_to, + dict(properties), response_body) @staticmethod def _get_pika_properties(properties_in): @@ -462,9 +569,9 @@ class SmartConsumer(Consumer): a :py:class:`ConsumerException` is raised. """ - def __init__(self, configuration): - self._message_body = None - super(SmartConsumer, self).__init__(configuration) + + def __init__(self, settings): + super(SmartConsumer, self).__init__(settings) @property def body(self): @@ -613,8 +720,10 @@ class SmartPublishingConsumer(SmartConsumer, PublishingConsumer): """ """ + def publish_message(self, exchange, routing_key, properties, body, - no_serialization=False, no_encoding=False): + no_serialization=False, + no_encoding=False): """Publish a message to RabbitMQ on the same channel the original message was received on. @@ -660,7 +769,6 @@ def publish_message(self, exchange, routing_key, properties, body, properties=properties_out, body=body) - def _auto_encode(self, content_encoding, value): """Based upon the value of the content_encoding, encode the value. @@ -694,17 +802,17 @@ def _auto_serialize(self, content_type, value): LOGGER.debug('Auto-serializing content as Pickle') return self._dump_pickle_value(value) - if content_type == 'application/x-plist': + if content_type == 'application/x-plist': LOGGER.debug('Auto-serializing content as plist') return self._dump_plist_value(value) - if content_type == 'text/csv': + if content_type == 'text/csv': LOGGER.debug('Auto-serializing content as csv') return self._dump_csv_value(value) # If it's XML or HTML auto - if (bs4 and isinstance(value, bs4.BeautifulSoup) and - content_type in ('text/html', 'text/xml')): + if (bs4 and isinstance(value, bs4.BeautifulSoup) and content_type in + ('text/html', 'text/xml')): LOGGER.debug('Dumping BS4 object into HTML or XML') return self._dump_bs4_value(value) @@ -735,7 +843,7 @@ def _dump_csv_value(value): """ buffer = stringio.StringIO() - writer = csv.writer(buffer,quotechar='"', quoting=csv.QUOTE_ALL) + writer = csv.writer(buffer, quotechar='"', quoting=csv.QUOTE_ALL) writer.writerows(value) buffer.seek(0) value = buffer.read() diff --git a/rejected/controller.py b/rejected/controller.py index 66938f2..92ed4c1 100644 --- a/rejected/controller.py +++ b/rejected/controller.py @@ -8,9 +8,7 @@ import signal import sys -from rejected import common from rejected import mcp -from rejected import __version__ LOGGER = logging.getLogger(__name__) @@ -20,6 +18,7 @@ class Controller(helper.Controller): of the OS level concerns. """ + def _master_control_program(self): """Return an instance of the MasterControlProgram. @@ -43,7 +42,6 @@ def _prepend_python_path(self, path): #pragma: no cover def setup(self): """Continue the run process blocking on MasterControlProgram.run""" # If the app was invoked to specified to prepend the path, do so now - common.add_null_handler() if self.args.prepend_path: self._prepend_python_path(self.args.prepend_path) @@ -95,7 +93,7 @@ def add_parser_arguments(): default=None, dest='profile', help='Profile the consumer modules, specifying ' - 'the output directory.') + 'the output directory.') argparser.add_argument('-o', '--only', action='store', default=None, @@ -108,10 +106,11 @@ def add_parser_arguments(): help='Prepend the python path with the value.') argparser.add_argument('-q', '--qty', action='store', - default=1, + type=int, + default=None, dest='quantity', - help='Run the specified quanty of consumer processes' - ' when used in conjunction with -o') + help='Run the specified quantity of consumer ' + 'processes when used in conjunction with -o') def main(): diff --git a/rejected/data.py b/rejected/data.py index 4b70658..0fa9309 100644 --- a/rejected/data.py +++ b/rejected/data.py @@ -30,7 +30,6 @@ def __repr__(self): return '<%s(%s)>' % (self.__class__.__name__, items) - class Message(Data): """Class for containing all the attributes about a message object creating a flatter, move convenient way to access the data while supporting the legacy diff --git a/rejected/example.py b/rejected/example.py index a5458ab..e44b350 100644 --- a/rejected/example.py +++ b/rejected/example.py @@ -3,6 +3,9 @@ import logging import random +from tornado import gen +from tornado import httpclient + __version__ = '1.0.0' LOGGER = logging.getLogger(__name__) @@ -11,12 +14,20 @@ class ExampleConsumer(consumer.Consumer): def process(self): - LOGGER.debug('Message: %r', self._message.body) - action = random.randint(0, 1000) + LOGGER.debug('Message: %r', self.body) + action = random.randint(0, 10000) if action == 0: - raise ValueError('Unhandled exception') - elif action < 2: raise consumer.ConsumerException('zomg') elif action < 5: raise consumer.MessageException('reject') + +class AsyncExampleConsumer(consumer.Consumer): + + @gen.coroutine + def process(self): + LOGGER.debug('Message: %r', self.body) + http_client = httpclient.AsyncHTTPClient() + results = yield [http_client.fetch('http://www.github.com'), + http_client.fetch('http://www.reddit.com')] + LOGGER.info('Length: %r', [len(r.body) for r in results]) diff --git a/rejected/mcp.py b/rejected/mcp.py index 173b895..99f06bf 100644 --- a/rejected/mcp.py +++ b/rejected/mcp.py @@ -11,20 +11,35 @@ import sys import time -from rejected import common +from rejected import state from rejected import process from rejected import __version__ LOGGER = logging.getLogger(__name__) -class MasterControlProgram(common.State): +class Consumer(object): + """Class used for keeping track of each consumer type being managed by + the MCP + + """ + + def __init__(self, connections, last_proc_num, processes, qty, queue): + self.connections = connections + self.last_proc_num = last_proc_num + self.processes = processes + self.qty = qty + self.queue = queue + + +class MasterControlProgram(state.State): """Master Control Program keeps track of and manages consumer processes.""" - _DEFAULT_CONSUMER_QTY = 1 - _MAX_SHUTDOWN_WAIT = 10 - _POLL_INTERVAL = 60.0 - _POLL_RESULTS_INTERVAL = 3.0 - _SHUTDOWN_WAIT = 1 + + DEFAULT_CONSUMER_QTY = 1 + MAX_SHUTDOWN_WAIT = 10 + POLL_INTERVAL = 60.0 + POLL_RESULTS_INTERVAL = 3.0 + SHUTDOWN_WAIT = 1 def __init__(self, config, consumer=None, profile=None, quantity=None): """Initialize the Master Control Program @@ -41,27 +56,26 @@ def __init__(self, config, consumer=None, profile=None, quantity=None): super(MasterControlProgram, self).__init__() # Default values - self._consumer = consumer - self._consumers = dict() - self._config = config - self._last_poll_results = dict() - self._poll_data = {'time': 0, 'processes': list()} - self._poll_timer = None - self._profile = profile - self._quantity = quantity if consumer else None - self._results_timer = None - self._stats = dict() - self._stats_queue = multiprocessing.Queue() - self._polled = False + self.consumer_cfg = self.get_consumer_cfg(config, consumer, quantity) + self.consumers = dict() + self.config = config + self.last_poll_results = dict() + self.poll_data = {'time': 0, 'processes': list()} + self.poll_timer = None + self.profile = profile + self.results_timer = None + self.stats = dict() + self.stats_queue = multiprocessing.Queue() + self.polled = False # Carry for logging internal stats collection data - self._log_stats_enabled = config.application.get('log_stats', False) - LOGGER.debug('Stats logging enabled: %s', self._log_stats_enabled) + self.log_stats_enabled = config.application.get('log_stats', False) + LOGGER.debug('Stats logging enabled: %s', self.log_stats_enabled) # Setup the poller related threads - self._poll_interval = config.application.get('poll_interval', - self._POLL_INTERVAL) - LOGGER.debug('Set process poll interval to %.2f', self._poll_interval) + self.poll_interval = config.application.get('poll_interval', + self.POLL_INTERVAL) + LOGGER.debug('Set process poll interval to %.2f', self.poll_interval) @property def active_processes(self): @@ -71,8 +85,8 @@ def active_processes(self): """ active_processes, dead_processes = list(), list() - for consumer in self._consumers: - for name in self._consumers[consumer]['processes']: + for consumer in self.consumers: + for name in self.consumers[consumer].processes: child = self.get_consumer_process(consumer, name) if int(child.pid) == os.getpid(): continue @@ -82,7 +96,7 @@ def active_processes(self): dead_processes.append((consumer, name)) continue - if self.is_a_dead_or_zombie_process(proc): + if self.is_dead(proc): dead_processes.append((consumer, name)) else: active_processes.append(child) @@ -113,30 +127,18 @@ def calculate_stats(self, data): self.process_count_by_consumer(name) for proc in data[name].keys(): for key in stats: - value = data[name][proc]['counts'][key] + value = data[name][proc]['counts'].get(key, 0) stats[key] += value consumer_stats[name][key] += value # Return a data structure that can be used in reporting out the stats stats['processes'] = len(self.active_processes) - return {'last_poll': timestamp, - 'consumers': consumer_stats, - 'process_data': data, - 'counts': stats} - - @staticmethod - def calculate_velocity(counts): - """Calculate the message velocity to determine how many messages are - processed per second. - - :param dict counts: The count dictionary to use for calculation - :rtype: float - - """ - total_time = counts['idle_time'] + counts['processing_time'] - if total_time and counts['processed']: - return float(counts['processed'] / float(total_time)) - return 0 + return { + 'last_poll': timestamp, + 'consumers': consumer_stats, + 'process_data': data, + 'counts': stats + } def check_process_counts(self): """Check for the minimum consumer process levels and start up new @@ -144,8 +146,8 @@ def check_process_counts(self): """ LOGGER.debug('Checking minimum consumer process levels') - for name in self._consumers: - for connection in self._consumers[name]['connections']: + for name in self.consumers: + for connection in self.consumers[name].connections: processes_needed = self.process_spawn_qty(name, connection) LOGGER.debug('Need to spawn %i processes for %s on %s', processes_needed, name, connection) @@ -159,7 +161,7 @@ def collect_results(self, data_values): :type data_values: dict """ - self._last_poll_results['timestamp'] = self._poll_data['timestamp'] + self.last_poll_results['timestamp'] = self.poll_data['timestamp'] # Get the name and consumer name and remove it from what is reported consumer_name = data_values['consumer_name'] @@ -168,29 +170,12 @@ def collect_results(self, data_values): del data_values['name'] # Add it to our last poll global data - if consumer_name not in self._last_poll_results: - self._last_poll_results[consumer_name] = dict() - self._last_poll_results[consumer_name][process_name] = data_values + if consumer_name not in self.last_poll_results: + self.last_poll_results[consumer_name] = dict() + self.last_poll_results[consumer_name][process_name] = data_values # Calculate the stats - self._stats = self.calculate_stats(self._last_poll_results) - - def consumer_dict(self, configuration): - """Return a consumer dict for the given name and configuration. - - :param dict configuration: The consumer configuration - :rtype: dict - - """ - # Keep a dict that has a list of processes by connection - connections = dict() - for connection in configuration['connections']: - connections[connection] = list() - return {'connections': connections, - 'qty': configuration.get('qty', self._DEFAULT_CONSUMER_QTY), - 'last_proc_num': 0, - 'queue': configuration['queue'], - 'processes': dict()} + self.stats = self.calculate_stats(self.last_poll_results) @staticmethod def consumer_keyword(counts): @@ -209,11 +194,11 @@ def consumer_stats_counter(): :rtype: dict """ - return {process.Process.ERROR: 0, - process.Process.PROCESSED: 0, - process.Process.REDELIVERED: 0, - process.Process.TIME_SPENT: 0, - process.Process.TIME_WAITED: 0} + return { + process.Process.ERROR: 0, + process.Process.PROCESSED: 0, + process.Process.REDELIVERED: 0 + } def get_consumer_process(self, consumer, name): """Get the process object for the specified consumer and process name. @@ -223,28 +208,48 @@ def get_consumer_process(self, consumer, name): :returns: multiprocessing.Process """ - return self._consumers[consumer]['processes'].get(name) + return self.consumers[consumer].processes.get(name) @staticmethod - def is_a_dead_or_zombie_process(process): - """Checks to see if the specified process is a zombie or dead. + def get_consumer_cfg(config, only, qty): + """Get the consumers config, possibly filtering the config if only + or qty is set. - :param psutil.Process: The process to check + :param config: The consumers config section + :type config: helper.config.Config + :param str only: When set, filter to run only this consumer + :param int qty: When set, set the consumer qty to this value + :rtype: dict + + """ + consumers = dict(config.application.Consumers) + if only: + for key in consumers.keys(): + if key != only: + del consumers[key] + if qty: + consumers[only]['qty'] = qty + return consumers + + @staticmethod + def is_dead(proc): + """Checks to see if the specified process is dead. + + :param psutil.Process proc: The process to check :rtype: bool """ try: - if process.status in [psutil.STATUS_DEAD, psutil.STATUS_ZOMBIE]: + if proc.status == psutil.STATUS_DEAD: try: - LOGGER.debug('Found dead or zombie process with ' - '%s fds', - process.get_num_fds()) + LOGGER.debug('Found dead process with %s fds', + proc.get_num_fds()) except psutil.AccessDenied as error: - LOGGER.debug('Found dead or zombie process, ' - 'could not get fd count: %s', error) + LOGGER.debug('Found dead process, could not get ' + 'fd count: %s', error) return True except psutil.NoSuchProcess: - LOGGER.debug('Found dead or zombie process') + LOGGER.debug('Process is dead and does not exist') return True return False @@ -258,13 +263,13 @@ def kill_processes(self): processes = True while processes: processes = self.active_processes - for process in processes: - if int(process.pid) != int(os.getpid()): - LOGGER.warning('Killing %s (%s)', process.name, process.pid) - os.kill(int(process.pid), signal.SIGKILL) + for proc in processes: + if int(proc.pid) != int(os.getpid()): + LOGGER.warning('Killing %s (%s)', proc.name, proc.pid) + os.kill(int(proc.pid), signal.SIGKILL) else: LOGGER.warning('Cowardly refusing kill self (%s, %s)', - process.pid, os.getpid()) + proc.pid, os.getpid()) time.sleep(0.5) LOGGER.info('Killed all children') @@ -272,39 +277,38 @@ def kill_processes(self): def log_stats(self): """Output the stats to the LOGGER.""" - if not self._stats.get('counts'): + if not self.stats.get('counts'): LOGGER.info('Did not receive any stats data from children') return - LOGGER.info('%i total %s have processed %i messages with %i ' - 'errors, waiting %.2f seconds and have spent %.2f seconds ' - 'processing messages with an overall velocity of %.2f ' - 'messages per second.', - self._stats['counts']['processes'], - self.consumer_keyword(self._stats['counts']), - self._stats['counts']['processed'], - self._stats['counts']['failed'], - self._stats['counts']['idle_time'], - self._stats['counts']['processing_time'], - self.calculate_velocity(self._stats['counts'])) - for key in self._stats['consumers'].keys(): - LOGGER.info('%i %s for %s have processed %i messages with %i ' - 'errors, waiting %.2f seconds and have spent %.2f ' - 'seconds processing messages with an overall velocity ' - 'of %.2f messages per second.', - self._stats['consumers'][key]['processes'], - self.consumer_keyword(self._stats['consumers'][key]), - key, - self._stats['consumers'][key]['processed'], - self._stats['consumers'][key]['failed'], - self._stats['consumers'][key]['idle_time'], - self._stats['consumers'][key]['processing_time'], - self.calculate_velocity(self._stats['consumers'][key])) - if self._poll_data['processes']: - LOGGER.warning('%i process(es) did not respond with stats in ' - 'time: %r', - len(self._poll_data['processes']), - self._poll_data['processes']) + if self.poll_data['processes']: + LOGGER.warning('%i process(es) did not respond with stats: %r', + len(self.poll_data['processes']), + self.poll_data['processes']) + + if self.stats['counts']['processes'] > 1: + LOGGER.info('%i consumers processed %i messages with %i errors', + self.stats['counts']['processes'], + self.stats['counts']['processed'], + self.stats['counts']['failed']) + + for key in self.stats['consumers'].keys(): + LOGGER.info('%i %s %s processed %i messages with %i errors', + self.stats['consumers'][key]['processes'], key, + self.consumer_keyword(self.stats['consumers'][key]), + self.stats['consumers'][key]['processed'], + self.stats['consumers'][key]['failed']) + + def new_consumer(self, config): + """Return a consumer dict for the given name and configuration. + + :param dict config: The consumer configuration + :rtype: dict + + """ + return Consumer(dict([(c, []) for c in config['connections']]), 0, + dict(), config.get('qty', self.DEFAULT_CONSUMER_QTY), + config['queue']) def new_process(self, consumer_name, connection_name): """Create a new consumer instances @@ -316,15 +320,17 @@ def new_process(self, consumer_name, connection_name): """ process_name = '%s-%s' % (consumer_name, self.new_process_number(consumer_name)) - LOGGER.debug('Creating a new process for %s: %s', - connection_name, process_name) - kwargs = {'config': self._config.application, - 'connection_name': connection_name, - 'consumer_name': consumer_name, - 'profile': self._profile, - 'daemon': False, - 'stats_queue': self._stats_queue, - 'logging_config': self._config.logging} + LOGGER.debug('Creating a new process for %s: %s', connection_name, + process_name) + kwargs = { + 'config': self.config.application, + 'connection_name': connection_name, + 'consumer_name': consumer_name, + 'profile': self.profile, + 'daemon': False, + 'stats_queue': self.stats_queue, + 'logging_config': self.config.logging + } return process_name, process.Process(name=process_name, kwargs=kwargs) def new_process_number(self, name): @@ -335,22 +341,22 @@ def new_process_number(self, name): :rtype: int """ - self._consumers[name]['last_proc_num'] += 1 - return self._consumers[name]['last_proc_num'] + self.consumers[name].last_proc_num += 1 + return self.consumers[name].last_proc_num - def on_timer(self, signum, unused_frame): + def on_timer(self, _signum, _unused_frame): LOGGER.debug('Timer fired') - if not self._polled: + if not self.polled: self.poll() - self._polled = True + self.polled = True self.set_timer(5) # Wait 5 seconds for results else: - self._polled = False + self.polled = False self.poll_results_check() - self.set_timer(self._poll_interval) # Wait poll interval duration + self.set_timer(self.poll_interval) # Wait poll interval duration # If stats logging is enabled, log the stats - if self._log_stats_enabled: + if self.log_stats_enabled: self.log_stats() def poll(self): @@ -367,8 +373,7 @@ def poll(self): return self.set_state(self.STATE_STOPPED) # Start our data collection dict - self._poll_data = {'timestamp': time.time(), - 'processes': list()} + self.poll_data = {'timestamp': time.time(), 'processes': list()} # Iterate through all of the consumers for proc in self.active_processes: @@ -379,7 +384,7 @@ def poll(self): # Send the profile signal os.kill(int(proc.pid), signal.SIGPROF) - self._poll_data['processes'].append(proc.name) + self.poll_data['processes'].append(proc.name) # Check if we need to start more processes self.check_process_counts() @@ -390,8 +395,7 @@ def poll_duration_exceeded(self): :rtype: bool """ - return (time.time() - - self._poll_data['timestamp']) >= self._poll_interval + return (time.time() - self.poll_data['timestamp']) >= self.poll_interval def poll_results_check(self): """Check the polling results by checking to see if the stats queue is @@ -402,29 +406,29 @@ def poll_results_check(self): LOGGER.debug('Checking for poll results') while True: try: - stats = self._stats_queue.get(False) + stats = self.stats_queue.get(False) except Queue.Empty: break try: - self._poll_data['processes'].remove(stats['name']) + self.poll_data['processes'].remove(stats['name']) except ValueError: pass self.collect_results(stats) - if self._poll_data['processes']: + if self.poll_data['processes']: LOGGER.warning('Did not receive results from %r', - self._poll_data['processes']) + self.poll_data['processes']) - def process(self, consumer_name, process_name): + def process(self, name, process_name): """Return the process handle for the given consumer name and process name. - :param str consumer_name: The consumer name from config + :param str name: The consumer name from config :param str process_name: The automatically assigned process name :rtype: rejected.process.Process """ - return self._consumers[consumer_name]['processes'][process_name] + return self.consumers[name].processes[process_name] def process_count(self, name, connection): """Return the process count for the given consumer name and connection. @@ -434,7 +438,7 @@ def process_count(self, name, connection): :rtype: int """ - return len(self._consumers[name]['connections'][connection]) + return len(self.consumers[name].connections[connection]) def process_count_by_consumer(self, name): """Return the process count by consumer only. @@ -444,8 +448,8 @@ def process_count_by_consumer(self, name): """ count = 0 - for connection in self._consumers[name]['connections']: - count += len(self._consumers[name]['connections'][connection]) + for connection in self.consumers[name].connections: + count += len(self.consumers[name].connections.get(connection)) return count def process_spawn_qty(self, name, connection): @@ -457,8 +461,7 @@ def process_spawn_qty(self, name, connection): :rtype: int """ - return self._consumers[name]['qty'] - self.process_count(name, - connection) + return self.consumers[name].qty - self.process_count(name, connection) def remove_consumer_process(self, consumer, name): """Remove all details for the specified consumer and process name. @@ -467,51 +470,33 @@ def remove_consumer_process(self, consumer, name): :param str name: The process name """ - for conn in self._consumers[consumer]['connections']: - if name in self._consumers[consumer]['connections'][conn]: - self._consumers[consumer]['connections'][conn].remove(name) - if name in self._consumers[consumer]['processes']: + for conn in self.consumers[consumer].connections: + if name in self.consumers[consumer].connections[conn]: + self.consumers[consumer].connections[conn].remove(name) + if name in self.consumers[consumer].processes: try: - self._consumers[consumer]['processes'][name].terminate() + self.consumers[consumer].processes[name].terminate() except OSError: pass - del self._consumers[consumer]['processes'][name] + del self.consumers[consumer].processes[name] def run(self): """When the consumer is ready to start running, kick off all of our consumer consumers and then loop while we process messages. """ - # Set the state to active self.set_state(self.STATE_ACTIVE) - - # Get the consumer section from the config - if 'Consumers' not in self._config.application: - LOGGER.error('Missing Consumers section of configuration, ' - 'aborting: %r', self._config.application) - return self.set_state(self.STATE_STOPPED) - - # Strip consumers if a consumer is specified - if self._consumer: - consumers = self._config.application.Consumers.keys() - for consumer in consumers: - if consumer != self._consumer: - LOGGER.debug('Removing %s for %s only processing', - consumer, self._consumer) - del self._config.application.Consumers[consumer] - - # Setup consumers and start the processes self.setup_consumers() # Set the SIGALRM handler for poll interval signal.signal(signal.SIGALRM, self.on_timer) # Kick off the poll timer - signal.setitimer(signal.ITIMER_REAL, self._poll_interval, 0) + signal.setitimer(signal.ITIMER_REAL, self.poll_interval, 0) # Loop for the lifetime of the app, pausing for a signal to pop up while self.is_running and self.total_process_count: - if self._state != self.STATE_SLEEPING: + if not self.is_sleeping: self.set_state(self.STATE_SLEEPING) signal.pause() @@ -521,14 +506,14 @@ def run(self): @staticmethod def set_process_name(): """Set the process name for the top level process so that it shows up - in logs in a more trackable fasion. + in logs in a more trackable fashion. """ - process = multiprocessing.current_process() + proc = multiprocessing.current_process() for offset in xrange(0, len(sys.argv)): if sys.argv[offset] == '-c': name = sys.argv[offset + 1].split('/')[-1] - process.name = name.split('.')[0] + proc.name = name.split('.')[0] break def set_timer(self, duration): @@ -552,17 +537,16 @@ def start_process(self, name, connection): :param str connection: The connection name """ - process_name, process = self.new_process(name, connection) - - LOGGER.info('Spawning %s process for %s to %s', - process_name, name, connection) + process_name, proc = self.new_process(name, connection) + LOGGER.info('Spawning %s process for %s to %s', process_name, name, + connection) # Append the process to the consumer process list - self._consumers[name]['processes'][process_name] = process - self._consumers[name]['connections'][connection].append(process_name) + self.consumers[name].processes[process_name] = proc + self.consumers[name].connections[connection].append(process_name) # Start the process - process.start() + proc.start() def start_processes(self, name, connection, quantity): """Start the specified quantity of consumer processes for the given @@ -573,35 +557,17 @@ def start_processes(self, name, connection, quantity): :param int quantity: The quantity of processes to start """ - for process in xrange(0, quantity): - self.start_process(name, connection) + [self.start_process(name, connection) for _i in range(0, quantity)] def setup_consumers(self): """Iterate through each consumer in the configuration and kick off the minimal amount of processes, setting up the runtime data as well. """ - for name in self._config.application.Consumers: - - # Hold the config as a shortcut - config = self._config.application.Consumers[name] - - # If queue is not configured, report the error but skip processes - if 'queue' not in config: - LOGGER.critical('Consumer %s is missing a queue, skipping', - name) - continue - - # Create the dictionary values for this process - self._consumers[name] = self.consumer_dict(config) - - if self._quantity: - self._consumers[name]['qty'] = self._quantity - - # Iterate through the connections to create new consumer processes - for connection in self._consumers[name]['connections']: - self.start_processes(name, connection, - self._consumers[name]['qty']) + for name in self.consumer_cfg.keys(): + self.consumers[name] = self.new_consumer(self.consumer_cfg[name]) + for connection in self.consumers[name].connections: + self.start_processes(name, connection, self.consumers[name].qty) def stop_processes(self): """Iterate through all of the consumer processes shutting them down.""" @@ -615,9 +581,9 @@ def stop_processes(self): # Send SIGABRT LOGGER.info('Sending SIGABRT to active children') - for process in multiprocessing.active_children(): - if int(process.pid) != os.getpid(): - os.kill(int(process.pid), signal.SIGABRT) + for proc in multiprocessing.active_children(): + if int(proc.pid) != os.getpid(): + os.kill(int(proc.pid), signal.SIGABRT) # Wait for them to finish up to MAX_SHUTDOWN_WAIT iterations = 0 @@ -634,7 +600,7 @@ def stop_processes(self): return iterations += 1 - if iterations == self._MAX_SHUTDOWN_WAIT: + if iterations == self.MAX_SHUTDOWN_WAIT: self.kill_processes() break processes = self.total_process_count diff --git a/rejected/process.py b/rejected/process.py index c36936f..dc7b91f 100644 --- a/rejected/process.py +++ b/rejected/process.py @@ -1,31 +1,35 @@ -"""Consumer process management. Imports consumer code, manages RabbitMQ +""" +Consumer process management. Imports consumer code, manages RabbitMQ connection state and collects stats about the consuming process. """ from pika import exceptions +from tornado import gen import importlib import logging import math import multiprocessing import os +from os import path import pika try: import cProfile as profile except ImportError: import profile +from pika import spec from pika.adapters import tornado_connection import signal -import socket import sys import time from tornado import ioloop import traceback - from rejected import __version__ -from rejected import common from rejected import consumer from rejected import data +from rejected import NullHandler +from rejected import state +from rejected import stats LOGGER = logging.getLogger(__name__) @@ -51,12 +55,12 @@ def import_consumer(value): return getattr(import_handle, parts[-1]), version -class Process(multiprocessing.Process, common.State): +class Process(multiprocessing.Process, state.State): """Core process class that manages the consumer object and communicates with RabbitMQ. """ - _AMQP_APP_ID = 'rejected/%s' % __version__ + AMQP_APP_ID = 'rejected/%s' % __version__ # Additional State constants STATE_PROCESSING = 0x04 @@ -75,61 +79,53 @@ class Process(multiprocessing.Process, common.State): TIME_WAITED = 'idle_time' UNHANDLED_EXCEPTIONS = 'unhandled_exceptions' - _HBINTERVAL = 300 + HB_INTERVAL = 300 # Default message pre-allocation value - _QOS_PREFETCH_COUNT = 1 - _QOS_PREFETCH_MULTIPLIER = 1.25 - _QOS_MAX = 10000 - _MAX_ERROR_COUNT = 5 - _MAX_ERROR_WINDOW = 60 - _MAX_SHUTDOWN_WAIT = 5 - _RECONNECT_DELAY = 10 - - _STATSD_FORMAT = '{0}.consumer.{1}.{2}.{3}:{4}|c\n' - - def __init__(self, group=None, target=None, name=None, args=(), + QOS_PREFETCH_COUNT = 1 + QOS_PREFETCH_MULTIPLIER = 1.25 + QOS_MAX = 10000 + MAX_ERROR_COUNT = 5 + MAX_ERROR_WINDOW = 60 + MAX_SHUTDOWN_WAIT = 5 + RECONNECT_DELAY = 10 + + def __init__(self, + group=None, + target=None, + name=None, + args=(), kwargs=None): if kwargs is None: kwargs = {} super(Process, self).__init__(group, target, name, args, kwargs) - self._ack = True - self._application = None - self._channel = None - self._config = None - self._connection = None - self._connection_id = 0 - self._connection_name = None - self._connections = None - self._consumer = None - self._consumer_name = None - self._counts = self.new_counter_dict() - self._dynamic_qos = True - self._ioloop = None - self._last_counts = dict() - self._last_failure = 0 - self._last_stats_time = None - self._logging_config = dict() - self._message_connection_id = None - self._max_error_count = self._MAX_ERROR_COUNT - self._max_frame_size = pika.spec.FRAME_MAX_SIZE - self._qos_prefetch = None - self._queue_name = None - self._prepend_path = None - self._state = self.STATE_INITIALIZING - self._state_start = time.time() - self._stats_queue = None - self._statsd = False - self._statsd_host = 'localhost' - self._statsd_port = 8125 - self._statsd_prefix = 'rejected' - self._statsd_socket = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) - self._node_name = socket.gethostname().split('.')[0] + self.ack = True + self.channel = None + self.config = None + self.connection = None + self.connection_id = 0 + self.connection_name = None + self.connections = None + self.consumer = None + self.consumer_name = None + self.dynamic_qos = True + self.ioloop = None + self.last_failure = 0 + self.last_stats_time = None + self.logging_config = dict() + self.message_connection_id = None + self.max_error_count = self.MAX_ERROR_COUNT + self.max_frame_size = spec.FRAME_MAX_SIZE + self.qos_prefetch = None + self.queue_name = None + self.prepend_path = None + self.state = self.STATE_INITIALIZING + self.state_start = time.time() + self.stats = None + self.stats_queue = None # Override ACTIVE with PROCESSING - self._STATES[0x04] = 'Processing' + self.STATES[0x04] = 'Processing' def ack_message(self, delivery_tag): """Acknowledge the message on the broker and log the ack @@ -139,11 +135,11 @@ def ack_message(self, delivery_tag): """ if not self.can_respond: LOGGER.warning('Can not ack message, disconnected from RabbitMQ') - self.increment_count(self.CLOSED_ON_COMPLETE) + self.stats.incr(self.CLOSED_ON_COMPLETE) return LOGGER.debug('Acking %s', delivery_tag) - self._channel.basic_ack(delivery_tag=delivery_tag) - self.increment_count(self.ACKED) + self.channel.basic_ack(delivery_tag=delivery_tag) + self.stats.incr(self.ACKED) def add_on_channel_close_callback(self): """This method tells pika to call the on_channel_closed method if @@ -151,7 +147,7 @@ def add_on_channel_close_callback(self): """ LOGGER.debug('Adding channel close callback') - self._channel.add_on_close_callback(self.on_channel_closed) + self.channel.add_on_close_callback(self.on_channel_closed) def add_on_connection_close_callback(self): """This method adds an on close callback that will be invoked by pika @@ -159,7 +155,7 @@ def add_on_connection_close_callback(self): """ LOGGER.debug('Adding connection close callback') - self._connection.add_on_close_callback(self.on_connection_closed) + self.connection.add_on_close_callback(self.on_connection_closed) @property def base_qos_prefetch(self): @@ -168,36 +164,32 @@ def base_qos_prefetch(self): :rtype: int """ - return self._config.get('qos_prefetch', self._QOS_PREFETCH_COUNT) + return self.config.get('qos_prefetch', self.QOS_PREFETCH_COUNT) - def calculate_qos_prefetch(self): + def calc_qos_prefetch(self, values): """Determine if the channel should use the dynamic QoS value, stay at the same QoS or use the default QoS. :rtype: bool or int """ - if not self._last_stats_time: + if not self.last_stats_time: return - qos_prefetch = self.dynamic_qos_pretch + velocity = self.calc_velocity(values) + qos_prefetch = int(math.ceil(velocity * + float(self.QOS_PREFETCH_MULTIPLIER))) # Don't change anything - if qos_prefetch == self._qos_prefetch: + if qos_prefetch == self.qos_prefetch: LOGGER.debug('No change in QoS prefetch calculation of %i', - self._qos_prefetch) - return False - - # Don't change anything - if self.count_processed_last_interval < qos_prefetch: - LOGGER.error('Processed fewer messages last interval than the ' - 'qos_prefetch value') + self.qos_prefetch) return False # If calculated QoS exceeds max - if qos_prefetch > self._QOS_MAX: - LOGGER.debug('Hit QoS Max ceiling of %i', self._QOS_MAX) - return self.set_qos_prefetch(self._QOS_MAX) + if qos_prefetch > self.QOS_MAX: + LOGGER.debug('Hit QoS Max ceiling of %i', self.QOS_MAX) + return self.set_qos_prefetch(self.QOS_MAX) # Set to base value if QoS calc is < than the base if self.base_qos_prefetch > qos_prefetch: @@ -206,17 +198,36 @@ def calculate_qos_prefetch(self): return self.set_qos_prefetch() # Increase the QoS setting - if qos_prefetch > self._qos_prefetch: + if qos_prefetch > self.qos_prefetch: LOGGER.debug('QoS calculation is higher than previous: %i > %i', - qos_prefetch, self._qos_prefetch) + qos_prefetch, self.qos_prefetch) return self.set_qos_prefetch(qos_prefetch) # Lower the QoS value based upon the processed qty - if qos_prefetch < self._qos_prefetch: + if qos_prefetch < self.qos_prefetch: LOGGER.debug('QoS calculation is lower than previous: %i < %i', - qos_prefetch, self._qos_prefetch) + qos_prefetch, self.qos_prefetch) return self.set_qos_prefetch(qos_prefetch) + def calc_velocity(self, values): + """Return the message consuming velocity for the process. + + :rtype: float + + """ + processed = (values['counts'].get(self.PROCESSED, 0) - + values['previous'].get(self.PROCESSED, 0)) + duration = time.time() - self.last_stats_time + + # If there were no messages, do not calculate, use the base + if not processed or not duration: + return 0 + + # Calculate the velocity as the basis for the calculation + velocity = float(processed) / float(duration) + LOGGER.debug('Message processing velocity: %.2f/s', velocity) + return velocity + @property def can_respond(self): """Indicates if the process can still respond to RabbitMQ when the @@ -225,20 +236,20 @@ def can_respond(self): :return: bool """ - if not self._channel: + if not self.channel: return False - return self._message_connection_id == self._connection_id + return self.message_connection_id == self.connection_id def cancel_consumer_with_rabbitmq(self): """Tell RabbitMQ the process no longer wants to consumer messages.""" LOGGER.debug('Sending a Basic.Cancel to RabbitMQ') - if self._channel and self._channel.is_open: - self._channel.basic_cancel(consumer_tag=self.name) + if self.channel and self.channel.is_open: + self.channel.basic_cancel(consumer_tag=self.name) def close_connection(self): """This method closes the connection to RabbitMQ.""" LOGGER.info('Closing connection') - self._connection.close() + self.connection.close() def connect_to_rabbitmq(self, cfg, name): """Connect to RabbitMQ returning the connection handle. @@ -248,57 +259,18 @@ def connect_to_rabbitmq(self, cfg, name): :rtype: pika.adapters.tornado_connection.TornadoConnection """ - LOGGER.debug('Connecting to %s:%i:%s as %s', - cfg[name]['host'], cfg[name]['port'], - cfg[name]['vhost'], cfg[name]['user']) + LOGGER.debug('Connecting to %s:%i:%s as %s', cfg[name]['host'], + cfg[name]['port'], cfg[name]['vhost'], cfg[name]['user']) self.set_state(self.STATE_CONNECTING) - self._connection_id += 1 - hb_interval = cfg[name].get('heartbeat_interval', self._HBINTERVAL) - parameters = self.get_connection_parameters(cfg[name]['host'], - cfg[name]['port'], - cfg[name]['vhost'], - cfg[name]['user'], - cfg[name]['pass'], - hb_interval) + self.connection_id += 1 + hb_interval = cfg[name].get('heartbeat_interval', self.HB_INTERVAL) + parameters = self.get_connection_parameters( + cfg[name]['host'], cfg[name]['port'], cfg[name]['vhost'], + cfg[name]['user'], cfg[name]['pass'], hb_interval) return tornado_connection.TornadoConnection(parameters, self.on_connection_open, stop_ioloop_on_close=False) - def count(self, stat): - """Return the current count quantity for a specific stat. - - :param str stat: Name of stat to get value for - :rtype: int or float - - """ - return self._counts.get(stat, 0) - - @property - def count_processed_last_interval(self): - """Return the number of messages counted in the last interval. If - there is no last interval counts, return 0. - - :rtype: int - - """ - if not self._last_counts: - return 0 - return self._counts[self.PROCESSED] - self._last_counts[self.PROCESSED] - - @property - def dynamic_qos_pretch(self): - """Calculate the prefetch count based upon the message velocity * the - _QOS_PREFETCH_MULTIPLIER. - - :rtype: int - - """ - # Round up the velocity * the multiplier - value = int(math.ceil(self.message_velocity * - float(self._QOS_PREFETCH_MULTIPLIER))) - LOGGER.debug('Calculated prefetch value: %i', value) - return value - @staticmethod def get_config(cfg, number, name, connection): """Initialize a new consumer thread, setting defaults and config values @@ -310,10 +282,12 @@ def get_config(cfg, number, name, connection): :rtype: dict """ - return {'connection': cfg['Connections'][connection], - 'connection_name': connection, - 'consumer_name': name, - 'process_name': '%s_%i_tag_%i' % (name, os.getpid(), number)} + return { + 'connection': cfg['Connections'][connection], + 'connection_name': connection, + 'consumer_name': name, + 'process_name': '%s_%i_tag_%i' % (name, os.getpid(), number) + } def get_connection_parameters(self, host, port, vhost, username, password, heartbeat_interval): @@ -330,7 +304,7 @@ def get_connection_parameters(self, host, port, vhost, username, password, """ credentials = pika.PlainCredentials(username, password) return pika.ConnectionParameters(host, port, vhost, credentials, - frame_max=self._max_frame_size, + frame_max=self.max_frame_size, socket_timeout=10, heartbeat_interval=heartbeat_interval) @@ -376,18 +350,24 @@ def get_consumer(cfg): LOGGER.error(line) return - def increment_count(self, counter, value=1): - """Increment the specified counter, checking to see if the counter is - the error counter. if it is, check to see if there have been too many - errors and if it needs to reconnect. + def on_processed(self, start_time, method, result): + LOGGER.debug('Post invoke consumer') + self.stats.add_timing(self.TIME_SPENT, time.time() - start_time) - :param str counter: The counter name passed in from the constant - :param int|float value: The amount to increment by + if not result: + LOGGER.debug('Bypassing ack due to False return consumer') + return - """ - self._counts[counter] += value + self.stats.incr(self.PROCESSED) + if self.ack: + self.ack_message(method.delivery_tag) + if self.is_waiting_to_shutdown: + self.on_ready_to_stop() + else: + self.reset_state() - def invoke_consumer(self, message): + @gen.engine + def invoke_consumer(self, method, message): """Wrap the actual processor processing bits :param Message message: Message to process @@ -395,55 +375,52 @@ def invoke_consumer(self, message): """ self.start_message_processing() + start_time = time.time() # Try and process the message try: - LOGGER.debug('Processing message') - self._consumer.receive(message) + result = self.consumer._execute(message) + yield result + possible_exception = result.exception() + if possible_exception: + raise possible_exception + self.on_processed(start_time, method, True) except KeyboardInterrupt: - + LOGGER.debug('CTRL-C') self.reject(message.delivery_tag, True) self.stop() - return False + self.on_processed(start_time, method, False) except exceptions.ChannelClosed as error: LOGGER.critical('RabbitMQ closed the channel: %r', error) self.reconnect() - return False + self.on_processed(start_time, method, False) except exceptions.ConnectionClosed as error: LOGGER.critical('RabbitMQ closed the connection: %r', error) self.reconnect() - return False + self.on_processed(start_time, method, False) except consumer.ConsumerException as error: + LOGGER.debug('Consumer Exception') self.record_exception(error, True, sys.exc_info()) self.reject(message.delivery_tag, True) - self.processing_failure() - return False + self.processing_error() + self.on_processed(start_time, method, False) except consumer.MessageException as error: + LOGGER.debug('Message Exception') self.record_exception(error, True, sys.exc_info()) self.reject(message.delivery_tag, False) - return False + self.on_processed(start_time, method, False) except Exception as error: + LOGGER.debug('Exception') self.record_exception(error, True, sys.exc_info()) self.reject(message.delivery_tag, True) - self.processing_failure() - return False - - return True - - @property - def is_idle(self): - """Is the system idle - - :rtype: bool - - """ - return self._state == self.STATE_IDLE + self.processing_error() + self.on_processed(start_time, method, False) @property def is_processing(self): @@ -452,46 +429,7 @@ def is_processing(self): :rtype: bool """ - return self._state in [self.STATE_PROCESSING, self.STATE_STOP_REQUESTED] - - @property - def message_velocity(self): - """Return the message consuming velocity for the process. - - :rtype: float - - """ - processed = self.count_processed_last_interval - duration = time.time() - self._last_stats_time - LOGGER.debug('Processed %i messages in %i seconds', processed, duration) - - # If there were no messages, do not calculate, use the base - if not processed or not duration: - return 0 - - # Calculate the velocity as the basis for the calculation - velocity = float(processed) / float(duration) - LOGGER.debug('Message processing velocity: %.2f', velocity) - return velocity - - def new_counter_dict(self): - """Return a dict object for our internal stats keeping. - - :rtype: dict - - """ - return {self.ACKED: 0, - self.CLOSED_ON_COMPLETE: 0, - self.ERROR: 0, - self.FAILURES: 0, - self.UNHANDLED_EXCEPTIONS: 0, - self.PROCESSED: 0, - self.RECONNECTED: 0, - self.REDELIVERED: 0, - self.REJECTED: 0, - self.REQUEUED: 0, - self.TIME_SPENT: 0, - self.TIME_WAITED: 0} + return self.state in [self.STATE_PROCESSING, self.STATE_STOP_REQUESTED] def on_channel_closed(self, method_frame): """Invoked by pika when RabbitMQ unexpectedly closes the channel. @@ -506,7 +444,7 @@ def on_channel_closed(self, method_frame): LOGGER.critical('Channel was closed: (%s) %s', method_frame.method.reply_code, method_frame.method.reply_text) - del self._channel + del self.channel raise ReconnectConnection def on_channel_open(self, channel): @@ -518,30 +456,30 @@ def on_channel_open(self, channel): """ LOGGER.debug('Channel opened') - self._channel = channel + self.channel = channel self.add_on_channel_close_callback() self.setup_channel() - def on_connection_closed(self, unused, code, text): + def on_connection_closed(self, _unused, code, text): """This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects. - :param pika.connection.Connection unused: The closed connection + :param pika.connection.Connection _unused: The closed connection """ LOGGER.critical('Connection from RabbitMQ closed in state %s (%s, %s)', self.state_description, code, text) - self._channel = None + self.channel = None if not self.is_shutting_down and not self.is_waiting_to_shutdown: self.reconnect() - def on_connection_open(self, unused): + def on_connection_open(self, _unused): """This method is called by pika once the connection to RabbitMQ has been established. It passes the handle to the connection object in case we need it, but in this case, we'll just mark it unused. - :type unused: pika.adapters.tornado_connection.TornadoConnection + :type _unused: pika.adapters.tornado_connection.TornadoConnection """ LOGGER.debug('Connection opened') @@ -560,51 +498,41 @@ def on_ready_to_stop(self): signal.signal(signal.SIGTERM, signal.SIG_IGN) # If the connection is still around, close it - if self._connection.is_open: + if self.connection.is_open: LOGGER.debug('Closing connection to RabbitMQ') - self._connection.close() + self.connection.close() # Allow the consumer to gracefully stop and then stop the IOLoop self.stop_consumer() # Stop the IOLoop LOGGER.debug('Stopping IOLoop') - self._ioloop.stop() + self.ioloop.stop() # Note that shutdown is complete and set the state accordingly self.set_state(self.STATE_STOPPED) LOGGER.info('Shutdown complete') os.kill(os.getppid(), signal.SIGALRM) - def on_sigprof(self, unused_signum, unused_frame): + def on_sigprof(self, _unused_signum, _unused_frame): """Called when SIGPROF is sent to the process, will dump the stats, in future versions, queue them for the master process to get data. - :param int unused_signum: The signal number - :param frame unused_frame: The python frame the signal was received at + :param int _unused_signum: The signal number + :param frame _unused_frame: The python frame the signal was received at """ - values = dict() + values = self.stats.report() + self.stats_queue.put(values, True) if self.is_processing or self.is_idle: - self.calculate_qos_prefetch() - for key in self._counts.keys(): - values[key] = self._counts[key] - self._last_counts.get(key, 0) - self._last_counts[key] = self._counts[key] - if self._statsd: - self.send_counter_to_statsd(key, values[key]) - - self._stats_queue.put({'name': self.name, - 'consumer_name': self._consumer_name, - 'counts': values}, True) - - LOGGER.debug('Currently %s: %r', self.state_description, values) - self._last_stats_time = time.time() + self.calc_qos_prefetch(values) + self.last_stats_time = time.time() signal.siginterrupt(signal.SIGPROF, False) def open_channel(self): """Open a channel on the existing open connection to RabbitMQ""" - LOGGER.debug('Opening a channel on %r', self._connection) - self._connection.channel(self.on_channel_open) + LOGGER.debug('Opening a channel on %r', self.connection) + self.connection.channel(self.on_channel_open) def process(self, channel=None, method=None, properties=None, body=None): """Process a message from Rabbit @@ -620,74 +548,67 @@ def process(self, channel=None, method=None, properties=None, body=None): self.state_description) return self.reject(method.delivery_tag, True) self.set_state(self.STATE_PROCESSING) - LOGGER.debug('Received message #%s', method.delivery_tag) message = data.Message(channel, method, properties, body) if method.redelivered: - self.increment_count(self.REDELIVERED) - self._state_start = time.time() - if not self.invoke_consumer(message): - LOGGER.debug('Bypassing ack due to False return from _process') - return - - self.increment_count(self.PROCESSED) - if self._ack: - self.ack_message(method.delivery_tag) - if self.is_waiting_to_shutdown: - return self.on_ready_to_stop() - self.reset_state() - - @property - def profile_file(self): - if not self._kwargs['profile']: - return None - - if os.path.exists(self._kwargs['profile']) and \ - os.path.isdir(self._kwargs['profile']): - return '%s/%s-%s.prof' % (os.path.normpath(self._kwargs['profile']), - os.getpid(), - self._kwargs['consumer_name']) - return None + self.stats.incr(self.REDELIVERED) + self.invoke_consumer(method, message) - def processing_failure(self): + def processing_error(self): """Called when message processing failure happens due to a ConsumerException or an unhandled exception. """ - duration = time.time() - self._last_failure - if duration > self._MAX_ERROR_WINDOW: + duration = time.time() - self.last_failure + if duration > self.MAX_ERROR_WINDOW: LOGGER.info('Resetting failure window, %i seconds since last', duration) - self.reset_failure_counter() - self.increment_count(self.FAILURES, -1) - self._last_failure = time.time() - if self._counts[self.FAILURES] == 0: + self.reset_error_counter() + self.stats.incr(self.ERROR) + self.last_failure = time.time() + if self.too_many_errors: LOGGER.critical('Error threshold exceeded (%i), reconnecting', - self._counts[self.ERROR]) + self.stats[self.ERROR]) self.cancel_consumer_with_rabbitmq() self.close_connection() self.reconnect() + @property + def profile_file(self): + """Return the full path to write the cProfile data + + :return: str + + """ + if not self._kwargs['profile']: + return None + if os.path.exists(self._kwargs['profile']) and \ + os.path.isdir(self._kwargs['profile']): + return '%s/%s-%s.prof' % (path.normpath(self._kwargs['profile']), + os.getpid(), + self._kwargs['consumer_name']) + return None + def reconnect(self): """Reconnect to RabbitMQ after sleeping for _RECONNECT_DELAY""" LOGGER.info('Reconnecting to RabbitMQ in %i seconds', - self._RECONNECT_DELAY) - self.increment_count(self.RECONNECTED) + self.RECONNECT_DELAY) + self.stats.incr(self.RECONNECTED) self.set_state(self.STATE_INITIALIZING) - if self._connection: - if self._connection.socket: - fd = self._connection.socket.fileno() - self._ioloop.remove_handler(fd) - self._connection = None + if self.connection: + if self.connection.socket: + fd = self.connection.socket.fileno() + self.ioloop.remove_handler(fd) + self.connection = None - self._ioloop.add_timeout(time.time() + self._RECONNECT_DELAY, - self._reconnect) + self.ioloop.add_timeout(time.time() + self.RECONNECT_DELAY, + self._reconnect) def _reconnect(self): """Create and set the RabbitMQ connection""" LOGGER.info('Connecting to RabbitMQ') - self.reset_failure_counter() - self._connection = self.connect_to_rabbitmq(self._connections, - self._connection_name) + self.reset_error_counter() + self.connection = self.connect_to_rabbitmq(self.connections, + self.connection_name) self.setup_signal_handlers() def record_exception(self, error, handled=False, exc_info=None): @@ -697,7 +618,7 @@ def record_exception(self, error, handled=False, exc_info=None): :param bool handled: Was the exception handled """ - self.increment_count(self.ERROR) + self.stats.incr(self.ERROR) if handled: if not isinstance(error, consumer.MessageException): LOGGER.exception('Processor handled %s: %s', @@ -705,12 +626,12 @@ def record_exception(self, error, handled=False, exc_info=None): else: LOGGER.exception('Processor threw an uncaught exception %s: %s', error.__class__.__name__, error) - self.increment_count(self.UNHANDLED_EXCEPTIONS) + self.stats.incr(self.UNHANDLED_EXCEPTIONS) if not isinstance(error, consumer.MessageException) and exc_info: formatted_lines = traceback.format_exception(*exc_info) for offset, line in enumerate(formatted_lines): - LOGGER.debug('(%s) %i: %s', error.__class__.__name__, - offset, line.strip()) + LOGGER.debug('(%s) %i: %s', error.__class__.__name__, offset, + line.strip()) def reject(self, delivery_tag, requeue=True): """Reject the message on the broker and log it. We should move this to @@ -720,26 +641,25 @@ def reject(self, delivery_tag, requeue=True): :param bool requeue: Specify if the message should be re-queued or not """ - if not self._ack: + if not self.ack: raise RuntimeError('Can not rejected messages when ack is False') if not self.can_respond: LOGGER.warning('Can not reject message, disconnected from RabbitMQ') - self.increment_count(self.CLOSED_ON_COMPLETE) + self.stats.incr(self.CLOSED_ON_COMPLETE) if self.is_processing: self.reset_state() return - LOGGER.warning('Rejecting message %s %s requeue', delivery_tag, - 'with' if requeue else 'without') - self._channel.basic_nack(delivery_tag=delivery_tag, requeue=requeue) - self.increment_count(self.REQUEUED if requeue else self.REJECTED) + LOGGER.warning('Rejecting message %s %s requeue', delivery_tag, 'with' + if requeue else 'without') + self.channel.basic_nack(delivery_tag=delivery_tag, requeue=requeue) + self.stats.incr(self.REQUEUED if requeue else self.REJECTED) if self.is_processing: self.reset_state() - def reset_failure_counter(self): - """Reset the failure counter to the max error count""" - LOGGER.debug('Resetting the failure counter to %i', - self._max_error_count) - self._counts[self.FAILURES] = self._max_error_count + def reset_error_counter(self): + """Reset the error counter to 0""" + LOGGER.debug('Resetting the error counter') + self.stats[self.ERROR] = 0 def reset_state(self): """Reset the runtime state after processing a message to either idle @@ -758,52 +678,39 @@ def reset_state(self): def run(self): """Start the consumer""" + logger = logging.getLogger() + logger.addHandler(NullHandler()) if self.profile_file: LOGGER.info('Profiling to %s', self.profile_file) profile.runctx('self._run()', globals(), locals(), self.profile_file) else: self._run() - LOGGER.debug('Exiting %s (%i, %i)', self.name, os.getpid(), os.getppid()) + LOGGER.debug('Exiting %s (%i, %i)', self.name, os.getpid(), + os.getppid()) def _run(self): """Run method that can be profiled""" - self._ioloop = ioloop.IOLoop.instance() - common.add_null_handler() + self.ioloop = ioloop.IOLoop.instance() try: self.setup(self._kwargs['config'], - self._kwargs['connection_name'], self._kwargs['consumer_name'], + self._kwargs['connection_name'], self._kwargs['stats_queue'], self._kwargs['logging_config']) except ImportError as error: name = self._kwargs['consumer_name'] - classname = self._kwargs['config']['Consumers'][name]['consumer'] + class_name = self._kwargs['config']['Consumers'][name]['consumer'] LOGGER.critical('Could not import %s, stopping process: %r', - classname, error) + class_name, error) return if not self.is_stopped: try: - self._ioloop.start() + self.ioloop.start() except KeyboardInterrupt: LOGGER.warning('CTRL-C while waiting for clean shutdown') - def send_counter_to_statsd(self, counter, value=1): - """Send a metric passed in to statsd. - - :param str counter: The counter name - :param int|float value: The count - - """ - payload = self._STATSD_FORMAT.format(self._statsd_prefix, - self._node_name, - self._consumer_name, - counter, - math.ceil(value)) - self._statsd_socket.sendto(payload, (self._statsd_host, - self._statsd_port)) - def set_qos_prefetch(self, value=None): """Set the QOS Prefetch count for the channel. @@ -811,81 +718,64 @@ def set_qos_prefetch(self, value=None): """ qos_prefetch = int(value or self.base_qos_prefetch) - if qos_prefetch != self._qos_prefetch: - self._qos_prefetch = qos_prefetch + if qos_prefetch != self.qos_prefetch: + self.qos_prefetch = qos_prefetch LOGGER.debug('Setting the QOS Prefetch to %i', qos_prefetch) - self._channel.basic_qos(prefetch_count=qos_prefetch) + self.channel.basic_qos(self.on_qos_set, 0, qos_prefetch, False) - def set_state(self, new_state): - """Assign the specified state to this consumer object. + def on_qos_set(self, frame): + """Invoked by pika when the QoS is set""" + LOGGER.debug("QoS was set: %r", frame) - :param int new_state: The new state of the object - :raises: ValueError - - """ - # Keep track of how much time we're spending waiting and processing - if new_state == self.STATE_PROCESSING and self.is_idle: - self.increment_count(self.TIME_WAITED, self.time_in_state) - - elif new_state == self.STATE_IDLE and self.is_processing: - self.increment_count(self.TIME_SPENT, self.time_in_state) - - # Use the parent object to set the state - super(Process, self).set_state(new_state) - - def setup(self, cfg, connection_name, consumer_name, stats_queue, + def setup(self, cfg, consumer_name, connection_name, stats_queue, logging_config): """Initialize the consumer, setting up needed attributes and connecting to RabbitMQ. :param dict cfg: Consumer config section - :param str connection_name: The name of the connection :param str consumer_name: Consumer name for config + :param str connection_name: The name of the connection :param multiprocessing.Queue stats_queue: Queue to MCP :param dict logging_config: Logging config from YAML file """ - LOGGER.info('Initializing for %s on %s connection', - self.name, connection_name) - self._logging_config = logging_config - self._stats_queue = stats_queue - self._connection_name = connection_name - self._consumer_name = consumer_name - self._config = cfg['Consumers'][consumer_name] - self._connections = cfg['Connections'] - self._consumer = self.get_consumer(self._config) - - if not self._consumer: + LOGGER.info('Initializing for %s on %s connection', self.name, + connection_name) + self.connection_name = connection_name + self.consumer_name = consumer_name + self.config = cfg['Consumers'][consumer_name] + self.connections = cfg['Connections'] + self.consumer = self.get_consumer(self.config) + self.stats_queue = stats_queue + + if not self.consumer: LOGGER.critical('Could not import and start processor') self.set_state(self.STATE_STOPPED) exit(1) - self._queue_name = self._config['queue'] - self._ack = self._config.get('ack', True) - self._dynamic_qos = self._config.get('dynamic_qos', False) - self._max_error_count = int(self._config.get('max_errors', - self._MAX_ERROR_COUNT)) - self._max_frame_size = self._config.get('max_frame_size', - pika.spec.FRAME_MAX_SIZE) - - self._statsd = False - if 'statsd' in cfg and cfg['statsd'].get('enabled', False): - self._statsd = True - self._statsd_prefix = cfg['statsd'].get('prefix', 'rejected') - self._statsd_host = \ - cfg['statsd'].get('host', os.environ.get('STATSD_HOST', - 'localhost')) - self._statsd_port = \ - cfg['statsd'].get('port', os.environ.get('STATSD_PORT', 8125)) - if self._statsd_host != os.environ.get('STATSD_HOST', None): - os.putenv('STATSD_HOST', self._statsd_host) - if self._statsd_port != os.environ.get('STATSD_PORT', None): - os.putenv('STATSD_PORT', str(self._statsd_port)) - - self.reset_failure_counter() + # Setup the stats counter instance + self.stats = stats.Stats(self.name, consumer_name, cfg['statsd'] or {}) + + # Set statsd in the consumer + if self.stats.statsd: + try: + self.consumer._set_statsd(self.stats.statsd) + except AttributeError: + LOGGER.info('Consumer does not support statsd assignment') + + # Consumer settings + self.ack = self.config.get('ack', True) + self.dynamic_qos = self.config.get('dynamic_qos', False) + self.max_error_count = int(self.config.get('max_errors', + self.MAX_ERROR_COUNT)) + self.max_frame_size = self.config.get('max_frame_size', + spec.FRAME_MAX_SIZE) + self.queue_name = self.config['queue'] + + self.reset_error_counter() self.setup_signal_handlers() - self._connection = self.connect_to_rabbitmq(self._connections, - self._connection_name) + self.connection = self.connect_to_rabbitmq(self.connections, + self.connection_name) def setup_channel(self): """Setup the channel that will be used to communicate with RabbitMQ and @@ -897,17 +787,17 @@ def setup_channel(self): # Set the channel in the consumer try: - self._consumer.set_channel(self._channel) + self.consumer._set_channel(self.channel) except AttributeError: LOGGER.info('Consumer does not support channel assignment') # Setup QoS, Send a Basic.Recover and then Basic.Consume self.set_qos_prefetch() - self._channel.basic_recover(requeue=True) - self._channel.basic_consume(consumer_callback=self.process, - queue=self._queue_name, - no_ack=not self._ack, - consumer_tag=self.name) + self.channel.basic_recover(requeue=True) + self.channel.basic_consume(consumer_callback=self.process, + queue=self.queue_name, + no_ack=not self.ack, + consumer_tag=self.name) def setup_signal_handlers(self): """Setup the stats and stop signal handlers.""" @@ -926,9 +816,9 @@ def start_message_processing(self): message is processing. """ - self._message_connection_id = self._connection_id + self.message_connection_id = self.connection_id - def stop(self, signum=None, frame_unused=None): + def stop(self, signum=None, _frame_unused=None): """Stop the consumer from consuming by calling BasicCancel and setting our state. @@ -964,19 +854,10 @@ def stop_consumer(self): """ try: LOGGER.info('Shutting down the consumer') - self._consumer.shutdown() + self.consumer.shutdown() except AttributeError: LOGGER.debug('Consumer does not have a shutdown method') - @property - def time_in_state(self): - """Return the time that has been spent in the current state. - - :rtype: float - - """ - return time.time() - self._state_start - @property def too_many_errors(self): """Return a bool if too many errors have occurred. @@ -984,7 +865,7 @@ def too_many_errors(self): :rtype: bool """ - return self.count(self.ERROR) >= self._max_error_count + return self.stats[self.ERROR] >= self.max_error_count class ReconnectConnection(Exception): diff --git a/rejected/common.py b/rejected/state.py similarity index 60% rename from rejected/common.py rename to rejected/state.py index 3cbed5c..15bf05f 100644 --- a/rejected/common.py +++ b/rejected/state.py @@ -1,5 +1,5 @@ """ -Common Mixin Classes +Base State Tracking Class """ import logging @@ -8,20 +8,6 @@ LOGGER = logging.getLogger(__name__) -try: - from logging import NullHandler -except ImportError: - # Python 2.6 does not have a NullHandler - class NullHandler(logging.Handler): - def emit(self, record): - pass - - -def add_null_handler(): - logger = logging.getLogger() - logger.addHandler(NullHandler()) - - class State(object): """Class that is to be extended by MCP and process for maintaining the internal state of the application. @@ -38,19 +24,21 @@ class State(object): STATE_STOPPED = 0x08 # For reverse lookup - _STATES = {0x01: 'Initializing', - 0x02: 'Connecting', - 0x03: 'Idle', - 0x04: 'Active', - 0x05: 'Sleeping', - 0x06: 'Stop Requested', - 0x07: 'Shutting down', - 0x08: 'Stopped'} + STATES = { + 0x01: 'Initializing', + 0x02: 'Connecting', + 0x03: 'Idle', + 0x04: 'Active', + 0x05: 'Sleeping', + 0x06: 'Stop Requested', + 0x07: 'Shutting down', + 0x08: 'Stopped' + } def __init__(self): """Initialize the state of the object""" - self._state = self.STATE_INITIALIZING - self._state_start = time.time() + self.state = self.STATE_INITIALIZING + self.state_start = time.time() def set_state(self, new_state): """Assign the specified state to this consumer object. @@ -60,14 +48,14 @@ def set_state(self, new_state): """ # Make sure it's a valid state - if new_state not in self._STATES: + if new_state not in self.STATES: raise ValueError('Invalid state value: %r' % new_state) # Set the state - LOGGER.debug('State changing from %s to %s', - self._STATES[self._state], self._STATES[new_state]) - self._state = new_state - self._state_start = time.time() + LOGGER.debug('State changing from %s to %s', self.STATES[self.state], + self.STATES[new_state]) + self.state = new_state + self.state_start = time.time() @property def is_connecting(self): @@ -76,7 +64,7 @@ def is_connecting(self): :rtype: bool """ - return self._state == self.STATE_CONNECTING + return self.state == self.STATE_CONNECTING @property def is_idle(self): @@ -85,7 +73,7 @@ def is_idle(self): :rtype: bool """ - return self._state == self.STATE_IDLE + return self.state == self.STATE_IDLE @property def is_running(self): @@ -95,8 +83,8 @@ def is_running(self): :rtype: bool """ - return self._state in [self.STATE_IDLE, self.STATE_ACTIVE, - self.STATE_SLEEPING] + return self.state in [self.STATE_IDLE, self.STATE_ACTIVE, + self.STATE_SLEEPING] @property def is_shutting_down(self): @@ -105,8 +93,7 @@ def is_shutting_down(self): :rtype: bool """ - return self._state == self.STATE_SHUTTING_DOWN - + return self.state == self.STATE_SHUTTING_DOWN @property def is_sleeping(self): @@ -115,7 +102,7 @@ def is_sleeping(self): :rtype: bool """ - return self._state == self.STATE_SLEEPING + return self.state == self.STATE_SLEEPING @property def is_stopped(self): @@ -124,7 +111,7 @@ def is_stopped(self): :rtype: bool """ - return self._state == self.STATE_STOPPED + return self.state == self.STATE_STOPPED @property def is_waiting_to_shutdown(self): @@ -133,7 +120,7 @@ def is_waiting_to_shutdown(self): :rtype: bool """ - return self._state == self.STATE_STOP_REQUESTED + return self.state == self.STATE_STOP_REQUESTED @property def state_description(self): @@ -142,4 +129,13 @@ def state_description(self): :rtype: str """ - return self._STATES[self._state] + return self.STATES[self.state] + + @property + def time_in_state(self): + """Return the time that has been spent in the current state. + + :rtype: float + + """ + return time.time() - self.state_start diff --git a/rejected/stats.py b/rejected/stats.py new file mode 100644 index 0000000..e5c9c99 --- /dev/null +++ b/rejected/stats.py @@ -0,0 +1,61 @@ +""" +Stats class that wraps the collections.Counter object and transparently +passes calls to increment and add_timing if statsd is enabled. + +""" +try: + import backport_collections as collections +except ImportError: + import collections +from rejected import statsd + + +class Stats(object): + + def __init__(self, name, consumer_name, statsd_cfg): + self.name = name + self.consumer_name = consumer_name + self.statsd = None + if statsd_cfg.get('enabled', False): + self.statsd = statsd.StatsdClient(consumer_name, statsd_cfg) + self.counter = collections.Counter() + self.previous = None + + def __getitem__(self, item): + return self.counter.get(item) + + def __setitem__(self, item, value): + self.counter[item] = value + + def add_timing(self, item, duration): + if self.statsd: + self.statsd.add_timing(item, duration) + + def get(self, item): + return self.counter.get(item) + + def diff(self, item): + return self.counter.get(item, 0) - self.previous.get(item, 0) + + def incr(self, key, value=1): + self.counter[key] += value + if self.statsd: + self.statsd.incr(key, value) + + def report(self): + """Submit the stats data to both the MCP stats queue and statsd""" + if not self.previous: + self.previous = dict() + for key in self.counter: + self.previous[key] = 0 + if self.statsd: + for item in self.counter.keys(): + self.statsd.incr(item, self.diff(item)) + values = { + 'name': self.name, + 'consumer_name': self.consumer_name, + 'counts': dict(self.counter), + 'previous': self.previous + } + self.previous = dict(self.counter) + return values diff --git a/rejected/statsd.py b/rejected/statsd.py new file mode 100644 index 0000000..b57c7f1 --- /dev/null +++ b/rejected/statsd.py @@ -0,0 +1,88 @@ +""" +Statsd Client that takes configuration first from the rejected configuration +file, falling back to environment variables, and finally default values. + +Environment Variables: + +- STATSD_HOST +- STATSD_PORT +- STATSD_PREFIX + +""" +import logging +import os +import socket + +LOGGER = logging.getLogger(__name__) + + +class StatsdClient(object): + """ + + """ + DEFAULT_HOST = 'localhost' + DEFAULT_PORT = 8125 + DEFAULT_PREFIX = 'rejected' + PAYLOAD_FORMAT = '{0}.{1}.{2}.{3}:{4}|{5}' + + def __init__(self, consumer_name, settings): + """ + + :param cfg: + + """ + self._consumer_name = consumer_name + self._hostname = socket.gethostname().split('.')[0] + self._settings = settings + + self._addr = (self._setting('host', self.DEFAULT_HOST), + int(self._setting('port', self.DEFAULT_PORT))) + self._prefix = self._setting('prefix', self.DEFAULT_PREFIX) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, + socket.IPPROTO_UDP) + + def _setting(self, key, default): + """Return the setting, checking config, then the appropriate + environment variable, falling back to the default. + + :param str key: The key to get + :param any default: The default value if not set + :return: str + + """ + env = 'STATSD_{}'.format(key).upper() + return self._settings.get(key, os.environ.get(env, default)) + + def add_timing(self, key, value=0): + """Add a timer value to statsd for the specified key + + :param str key: The key to add the timing to + :param int|float value: The value of the timing + + """ + self._send(key, value, 'ms') + + def incr(self, key, value=1): + """Increment the counter value in statsd + + :param str key: The key to increment + :param int value: The value to increment by, defaults to 1 + + """ + self._send(key, value, 'c') + + def _send(self, key, value, metric_type): + """Send the specified value to the statsd daemon via UDP without a + direct socket connection. + + :param str key: The key name to send + :param int|float value: The value for the key + + """ + try: + payload = self.PAYLOAD_FORMAT.format(self._prefix, self._hostname, + self._consumer_name, key, + value, metric_type).encode() + self._socket.sendto(payload, self._addr) + except socket.error: + LOGGER.exception('Error sending statsd metric') diff --git a/setup.py b/setup.py index ae976fa..01003e2 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ extras_require = {'html': ['beautifulsoup4']} if sys.version_info < (2, 7, 0): + install_requires.append('backport_collections') install_requires.append('importlib') setup(name='rejected', diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 050ccb3..53b81f7 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1,16 +1,12 @@ # coding=utf-8 """Tests for rejected.consumer""" -import bs4 -import bz2 -import csv +from tornado import gen import json import mock -import pickle try: import unittest2 as unittest except ImportError: import unittest -import zlib from . import mocks @@ -23,7 +19,7 @@ class ConsumerInitializationTests(unittest.TestCase): def test_configuration_is_assigned(self): cfg = {'foo': 'bar'} obj = consumer.Consumer(cfg) - self.assertDictEqual(obj._config, cfg) + self.assertDictEqual(obj._settings, cfg) def test_channel_is_none(self): obj = consumer.Consumer({}) @@ -51,7 +47,7 @@ class ConsumerSetChannelTests(unittest.TestCase): def test_set_channel_assigns_to_channel(self): obj = consumer.Consumer({}) channel = mock.Mock() - obj.set_channel(channel) + obj._set_channel(channel) self.assertEqual(obj._channel, channel) @@ -67,37 +63,39 @@ def setUp(self): self.message = data.Message(mocks.CHANNEL, mocks.METHOD, mocks.PROPERTIES, mocks.BODY) + @gen.coroutine def test_receive_assigns_message(self): - self.obj.receive(self.message) + yield self.obj._execute(self.message) self.assertEqual(self.obj._message, self.message) def test_receive_invokes_process(self): with mock.patch.object(self.obj, 'process') as process: - self.obj.receive(self.message) + self.obj._execute(self.message) process.assert_called_once_with() def test_receive_drops_invalid_message_type(self): self.obj.MESSAGE_TYPE = 'foo' self.obj.DROP_INVALID_MESSAGES = True with mock.patch.object(self.obj, 'process') as process: - self.obj.receive(self.message) + self.obj._execute(self.message) process.assert_not_called() def test_raises_with_invalid_message_type(self): self.obj.MESSAGE_TYPE = 'foo' self.obj.DROP_INVALID_MESSAGES = False - self.assertRaises(consumer.ConsumerException, self.obj.receive, - self.message) + result = yield self.obj._execute(self.message) + self.assertIsInstance(result.exception, consumer.ConsumerException) class ConsumerPropertyTests(unittest.TestCase): + @gen.coroutine def setUp(self): self.config = {'foo': 'bar', 'baz': 1, 'qux': True} self.message = data.Message(mocks.CHANNEL, mocks.METHOD, mocks.PROPERTIES, mocks.BODY) self.obj = TestConsumer(self.config) - self.obj.receive(self.message) + yield self.obj._execute(self.message) def test_app_id_property(self): self.assertEqual(self.obj.app_id, mocks.PROPERTIES.app_id) @@ -172,7 +170,7 @@ def setUp(self): self.message = data.Message(mocks.CHANNEL, mocks.METHOD, mocks.PROPERTIES, json.dumps(self.body)) self.obj = TestSmartConsumer({}) - self.obj.receive(self.message) + self.obj._execute(self.message) def test_message_body_property(self): self.assertDictEqual(self.obj.body, self.body) diff --git a/tests/test_mcp.py b/tests/test_mcp.py index d6c6496..03033b3 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -1,28 +1,28 @@ """Tests for the MCP""" import mock import multiprocessing +from mock import patch from helper import config from rejected import mcp from . import test_state -class TestMCP(test_state.TestState): - CONFIG = {'poll_interval': 30.0, 'log_stats': True} +class TestMCP(test_state.TestState): - @mock.patch.object(multiprocessing, 'Queue') - def setUp(self, mock_queue_unused): + CONFIG = {'poll_interval': 30.0, 'log_stats': True, 'Consumers': {}} + @patch.object(multiprocessing, 'Queue') + def setUp(self, _mock_queue_unused): self.cfg = config.Config() self.cfg.application.update(self.CONFIG) self._obj = mcp.MasterControlProgram(self.cfg) def test_mcp_init_consumers_dict(self): - self.assertIsInstance(self._obj._consumers, dict) + self.assertIsInstance(self._obj.consumers, dict) def test_mcp_init_consumers_dict_empty(self): - self.assertTrue(not self._obj._consumers, dict) + self.assertTrue(not self._obj.consumers, dict) def test_mcp_init_queue_initialized(self): - self.assertIsInstance(self._obj._stats_queue, mock.MagicMock) - + self.assertIsInstance(self._obj.stats_queue, mock.MagicMock) diff --git a/tests/test_process.py b/tests/test_process.py index ed42995..4a686d1 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -1,15 +1,17 @@ """Tests for rejected.process""" import copy import mock -from pika import channel -from pika import connection -from pika import credentials -import signal +from mock import patch try: import unittest2 as unittest except ImportError: import unittest +from pika import channel +from pika import connection +from pika import credentials +import signal + from helper import config as helper_config from rejected import consumer @@ -18,6 +20,7 @@ from . import test_state + class TestImportNamspacedClass(unittest.TestCase): def test_import_consumer(self): @@ -44,47 +47,58 @@ def test_import_consumer_failure(self): class TestProcess(test_state.TestState): - config = {'Connections': - {'MockConnection': - {'host': 'localhost', - 'port': 5672, - 'user': 'guest', - 'pass': 'guest', - 'vhost': '/'}, - 'MockRemoteConnection': - {'host': 'remotehost', - 'port': 5672, - 'user': 'guest', - 'pass': 'guest', - 'vhost': '/'} - }, - 'Consumers': - {'MockConsumer': - {'consumer': 'tests.mocks.MockConsumer', - 'connections': ['MockConnection'], - 'config': {'test_value': True, - 'num_value': 100}, - 'min': 2, - 'max': 5, - 'max_errors': 10, - 'qos_prefetch': 5, - 'ack': True, - 'queue': 'mock_queue'}, - 'MockConsumer2': - {'consumer': 'mock_consumer.MockConsumer', - 'connections': ['MockConnection', - 'MockRemoteConnection'], - 'config': {'num_value': 50}, - 'min': 1, - 'max': 2, - 'queue': 'mock_you'}}} + config = { + 'statsd': { + 'enabled': False + }, + 'Connections': { + 'MockConnection': { + 'host': 'localhost', + 'port': 5672, + 'user': 'guest', + 'pass': 'guest', + 'vhost': '/' + }, + 'MockRemoteConnection': { + 'host': 'remotehost', + 'port': 5672, + 'user': 'guest', + 'pass': 'guest', + 'vhost': '/' + } + }, + 'Consumers': { + 'MockConsumer': { + 'consumer': 'tests.mocks.MockConsumer', + 'connections': ['MockConnection'], + 'config': {'test_value': True, + 'num_value': 100}, + 'min': 2, + 'max': 5, + 'max_errors': 10, + 'qos_prefetch': 5, + 'ack': True, + 'queue': 'mock_queue' + }, + 'MockConsumer2': { + 'consumer': 'mock_consumer.MockConsumer', + 'connections': ['MockConnection', 'MockRemoteConnection'], + 'config': {'num_value': 50}, + 'min': 1, + 'max': 2, + 'queue': 'mock_you' + } + } + } logging_config = helper_config.LoggingConfig(helper_config.Config.LOGGING) - mock_args = {'cfg': config, - 'connection_name': 'MockConnection', - 'consumer_name': 'MockConsumer', - 'stats_queue': 'StatsQueue', - 'logging_config': logging_config} + mock_args = { + 'cfg': config, + 'connection_name': 'MockConnection', + 'consumer_name': 'MockConsumer', + 'stats_queue': 'StatsQueue', + 'logging_config': logging_config + } def setUp(self): self._obj = self.new_process() @@ -95,26 +109,12 @@ def tearDown(self): def new_kwargs(self, kwargs): return copy.deepcopy(kwargs) - def new_counter(self): - return {process.Process.ACKED: 0, - process.Process.CLOSED_ON_COMPLETE: 0, - process.Process.ERROR: 0, - process.Process.FAILURES: 0, - process.Process.UNHANDLED_EXCEPTIONS: 0, - process.Process.PROCESSED: 0, - process.Process.RECONNECTED: 0, - process.Process.REDELIVERED: 0, - process.Process.REJECTED: 0, - process.Process.REQUEUED: 0, - process.Process.TIME_SPENT: 0, - process.Process.TIME_WAITED: 0} - def new_process(self, kwargs=None): - with mock.patch('multiprocessing.Process'): - return process.Process(group=None, - name='MockProcess', - kwargs=kwargs or - self.new_kwargs(self.mock_args)) + with patch('multiprocessing.Process'): + return process.Process( + group=None, + name='MockProcess', + kwargs=kwargs or self.new_kwargs(self.mock_args)) def new_mock_channel(self): return mock.Mock(spec=channel.Channel) @@ -131,21 +131,19 @@ def new_mock_parameters(self, host, port, vhost, user, password): return mock_parameters def get_connection_parameters(self, host, port, vhost, user, password): - with mock.patch('pika.credentials.PlainCredentials'): - with mock.patch('pika.connection.ConnectionParameters'): - return self._obj.get_connection_parameters(host, - port, - vhost, - user, - password, - 500) + with patch('pika.credentials.PlainCredentials'): + with patch('pika.connection.ConnectionParameters'): + return self._obj.get_connection_parameters(host, port, vhost, + user, password, 500) def default_connection_parameters(self): - return {'host': 'rabbitmq', - 'port': 5672, - 'user': 'nose', - 'password': 'mock', - 'vhost': 'unittest'} + return { + 'host': 'rabbitmq', + 'port': 5672, + 'user': 'nose', + 'password': 'mock', + 'vhost': 'unittest' + } def mock_parameters(self): kwargs = self.default_connection_parameters() @@ -153,62 +151,40 @@ def mock_parameters(self): def test_app_id(self): expectation = 'rejected/%s' % __version__ - self.assertEqual(self._obj._AMQP_APP_ID, expectation) - - def test_counter_data_structure(self): - self.assertDictEqual(self._obj.new_counter_dict(), self.new_counter()) + self.assertEqual(self._obj.AMQP_APP_ID, expectation) def test_startup_state(self): new_process = self.new_process() - self.assertEqual(new_process._state, process.Process.STATE_INITIALIZING) + self.assertEqual(new_process.state, process.Process.STATE_INITIALIZING) def test_startup_time(self): mock_time = 123456789.012345 - with mock.patch('time.time', return_value=mock_time): + with patch('time.time', return_value=mock_time): new_process = self.new_process() - self.assertEqual(new_process._state_start, mock_time) - - def test_startup_counts(self): - new_process = self.new_process() - self.assertDictEqual(new_process._counts, self.new_counter()) + self.assertEqual(new_process.state_start, mock_time) def test_startup_channel_is_none(self): new_process = self.new_process() - self.assertIsNone(new_process._channel) + self.assertIsNone(new_process.channel) def test_startup_consumer_is_none(self): new_process = self.new_process() - self.assertIsNone(new_process._consumer) - - def test_ack_message(self): - delivery_tag = 'MockDeliveryTag-1' - mock_ack = mock.Mock() - self._obj._ack = True - self._obj._message_connection_id = self._obj._connection_id = 1 - self._obj._channel = self.new_mock_channel() - self._obj._channel.basic_ack = mock_ack - self._obj.ack_message(delivery_tag) - mock_ack.assert_called_once_with(delivery_tag=delivery_tag) - - def test_count(self): - expectation = 10 - self._obj._counts[process.Process.REDELIVERED] = expectation - self.assertEqual(self._obj.count(process.Process.REDELIVERED), - expectation) + self.assertIsNone(new_process.consumer) def test_get_config(self): - connection = 'MockConnection' + conn = 'MockConnection' name = 'MockConsumer' number = 5 pid = 1234 - expectation = {'connection': self.config['Connections'][connection], - 'connection_name': connection, - 'consumer_name': name, - 'process_name': '%s_%i_tag_%i' % (name, pid, number)} - with mock.patch('os.getpid', return_value=pid): - self.assertEqual(self._obj.get_config(self.config, number, - name, connection), - expectation) + expectation = { + 'connection': self.config['Connections'][conn], + 'connection_name': conn, + 'consumer_name': name, + 'process_name': '%s_%i_tag_%i' % (name, pid, number) + } + with patch('os.getpid', return_value=pid): + self.assertEqual(self._obj.get_config(self.config, number, name, + conn), expectation) def test_get_consumer_with_invalid_consumer(self): cfg = self.config['Consumers']['MockConsumer2'] @@ -216,96 +192,46 @@ def test_get_consumer_with_invalid_consumer(self): def test_get_consumer_version_output(self): config = {'consumer': 'optparse.OptionParser'} - with mock.patch('logging.Logger.info') as info: + with patch('logging.Logger.info') as info: import optparse self._obj.get_consumer(config) info.assert_called_with('Creating consumer %s v%s', - config['consumer'], - optparse.__version__) + config['consumer'], optparse.__version__) def test_get_consumer_no_version_output(self): config = {'consumer': 'StringIO.StringIO'} - with mock.patch('logging.Logger.info') as info: + with patch('logging.Logger.info') as info: self._obj.get_consumer(config) info.assert_called_with('Creating consumer %s', config['consumer']) - @mock.patch.object(consumer.Consumer, '__init__', side_effect=ImportError) + @patch.object(consumer.Consumer, '__init__', side_effect=ImportError) def test_get_consumer_with_config_is_none(self, mock_method): - config = {'consumer': 'rejected.consumer.Consumer', - 'config': {'field': 'value', 'true': True}} + config = { + 'consumer': 'rejected.consumer.Consumer', + 'config': {'field': 'value', + 'true': True} + } new_process = self.new_process() new_process.get_consumer(config) self.assertIsNone(new_process.get_consumer(config)) - @mock.patch.object(consumer.Consumer, '__init__', side_effect=ImportError) + @patch.object(consumer.Consumer, '__init__', side_effect=ImportError) def test_get_consumer_with_no_config_is_none(self, mock_method): config = {'consumer': 'rejected.consumer.Consumer'} new_process = self.new_process() self.assertIsNone(new_process.get_consumer(config)) - def test_reject_message(self): - delivery_tag = 'MockDeliveryTag-1' - mock_reject = mock.Mock() - self._obj._channel = self.new_mock_channel() - self._obj._ack = True - self._obj._message_connection_id = self._obj._connection_id = 1 - self._obj._channel.basic_nack = mock_reject - self._obj.reject(delivery_tag) - mock_reject.assert_called_once_with(requeue=True, - delivery_tag=delivery_tag) - - def test_set_qos_prefetch_value(self): - new_process = self.new_process() - new_process.setup(**self.mock_args) - mock_channel = self.new_mock_channel() - new_process._channel = mock_channel - value = 12 - new_process.set_qos_prefetch(value) - mock_channel.basic_qos.assert_called_once_with(prefetch_count=value) - - def test_set_qos_prefetch_no_value(self): - new_process = self.new_process() - new_process.setup(**self.mock_args) - mock_channel = self.new_mock_channel() - new_process._channel = mock_channel - new_process.set_qos_prefetch() - value = new_process._qos_prefetch - mock_channel.basic_qos.assert_called_once_with(prefetch_count=value) - - def test_set_state_idle_to_processing_check_time_waited(self): - new_process = self.new_process() - new_process._state = new_process.STATE_IDLE - new_process._counts[new_process.TIME_WAITED] = 0 - prop_mock = mock.Mock() - value = 10 - prop_mock.__get__ = mock.Mock(return_value=value) - with mock.patch.object(process.Process, 'time_in_state', prop_mock): - new_process.set_state(new_process.STATE_PROCESSING) - self.assertEqual(new_process._counts[new_process.TIME_WAITED], - value) - - def test_set_state_processing_to_idle_check_time_spent(self): - new_process = self.new_process() - new_process._state = new_process.STATE_PROCESSING - new_process._counts[new_process.TIME_SPENT] = 0 - prop_mock = mock.Mock() - value = 10 - prop_mock.__get__ = mock.Mock(return_value=value) - with mock.patch.object(process.Process, 'time_in_state', prop_mock): - new_process.set_state(new_process.STATE_IDLE) - self.assertEqual(new_process._counts[new_process.TIME_SPENT], value) - def test_setup_signal_handlers(self): signals = [mock.call(signal.SIGPROF, self._obj.on_sigprof), mock.call(signal.SIGABRT, self._obj.stop)] - with mock.patch('signal.signal') as signal_signal: + with patch('signal.signal') as signal_signal: self._obj.setup_signal_handlers() signal_signal.assert_has_calls(signals, any_order=True) def mock_setup(self, new_process=None, side_effect=None): - with mock.patch('signal.signal', side_effect=side_effect): - with mock.patch('rejected.process.import_consumer', - return_value=(mock.Mock, None)): + with patch('signal.signal', side_effect=side_effect): + with patch('rejected.process.import_consumer', + return_value=(mock.Mock, None)): if not new_process: new_process = self.new_process(self.mock_args) new_process.setup(**self.mock_args) @@ -313,38 +239,32 @@ def mock_setup(self, new_process=None, side_effect=None): def test_setup_stats_queue(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._stats_queue, + self.assertEqual(mock_process.stats_queue, self.mock_args['stats_queue']) def test_setup_consumer_name(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._stats_queue, + self.assertEqual(mock_process.stats_queue, self.mock_args['stats_queue']) def test_setup_config(self): mock_process = self.mock_setup() config = self.config['Consumers']['MockConsumer'] - self.assertEqual(mock_process._config, config) - - def test_setup_with_consumer_config(self): - new_process = self.new_process() - new_process.setup(**self.mock_args) - self.assertEqual(new_process._consumer._configuration, - self.config['Consumers']['MockConsumer']['config']) + self.assertEqual(mock_process.config, config) def test_setup_config_queue_name(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._queue_name, + self.assertEqual(mock_process.queue_name, self.config['Consumers']['MockConsumer']['queue']) def test_setup_config_ack(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._ack, + self.assertEqual(mock_process.ack, self.config['Consumers']['MockConsumer']['ack']) def test_setup_max_error_count(self): mock_process = self.mock_setup() - self.assertEqual(mock_process._max_error_count, + self.assertEqual(mock_process.max_error_count, self.config['Consumers']['MockConsumer']['max_errors']) def test_setup_prefetch_count_no_config(self): @@ -353,54 +273,45 @@ def test_setup_prefetch_count_no_config(self): mock_process = self.new_process() mock_process.setup(**args) self.assertEqual(mock_process.base_qos_prefetch, - process.Process._QOS_PREFETCH_COUNT) + process.Process.QOS_PREFETCH_COUNT) def test_setup_prefetch_count_with_config(self): mock_process = self.mock_setup() - self.assertEqual(mock_process.base_qos_prefetch, - self.config['Consumers']['MockConsumer']['qos_prefetch']) + self.assertEqual( + mock_process.base_qos_prefetch, + self.config['Consumers']['MockConsumer']['qos_prefetch']) def test_setup_connection_arguments(self): - with mock.patch.object(process.Process, - 'connect_to_rabbitmq') as mock_method: + with patch.object(process.Process, + 'connect_to_rabbitmq') as mock_method: self.mock_setup() mock_method.assert_called_once_with(self.config['Connections'], 'MockConnection') def test_setup_connection_value(self): mock_connection = mock.Mock() - with mock.patch.object(process.Process, 'connect_to_rabbitmq', - return_value=mock_connection): + with patch.object(process.Process, 'connect_to_rabbitmq', + return_value=mock_connection): mock_process = self.mock_setup() - self.assertEqual(mock_process._connection, mock_connection) + self.assertEqual(mock_process.connection, mock_connection) def test_is_idle_state_processing(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertFalse(self._obj.is_idle) def test_is_running_state_processing(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertTrue(self._obj.is_running) def test_is_shutting_down_state_processing(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertFalse(self._obj.is_shutting_down) def test_is_stopped_state_processing(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertFalse(self._obj.is_stopped) - def test_too_many_errors_true(self): - self._obj._max_error_count = 1 - self._obj._counts[self._obj.ERROR] = 2 - self.assertTrue(self._obj.too_many_errors) - - def test_too_many_errors_false(self): - self._obj._max_error_count = 5 - self._obj._counts[self._obj.ERROR] = 2 - self.assertFalse(self._obj.too_many_errors) - def test_state_processing_desc(self): - self._obj._state = self._obj.STATE_PROCESSING + self._obj.state = self._obj.STATE_PROCESSING self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_PROCESSING]) + self._obj.STATES[self._obj.STATE_PROCESSING]) diff --git a/tests/test_state.py b/tests/test_state.py index ee0b61f..d34a6c7 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -5,168 +5,168 @@ except ImportError: import unittest -from rejected import common +from rejected import state class TestState(unittest.TestCase): def setUp(self): - self._obj = common.State() + self._obj = state.State() - def testset_state_invalid_value(self): + def test_set_state_invalid_value(self): self.assertRaises(ValueError, self._obj.set_state, 9999) - def testset_state_expected_assignment(self): - self._state = self._obj.STATE_IDLE + def test_set_state_expected_assignment(self): + self.state = self._obj.STATE_IDLE self._obj.set_state(self._obj.STATE_CONNECTING) - self.assertEqual(self._obj._state, self._obj.STATE_CONNECTING) + self.assertEqual(self._obj.state, self._obj.STATE_CONNECTING) - def testset_state_state_start(self): - self._state = self._obj.STATE_IDLE + def test_set_state_state_start(self): + self.state = self._obj.STATE_IDLE value = 86400 with mock.patch('time.time', return_value=value): self._obj.set_state(self._obj.STATE_CONNECTING) - self.assertEqual(self._obj._state_start, value) + self.assertEqual(self._obj.state_start, value) def test_state_initializing_desc(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_INITIALIZING]) + self._obj.STATES[self._obj.STATE_INITIALIZING]) def test_state_connecting_desc(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_CONNECTING]) + self._obj.STATES[self._obj.STATE_CONNECTING]) def test_state_idle_desc(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_IDLE]) + self._obj.STATES[self._obj.STATE_IDLE]) def test_state_active_desc(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_ACTIVE]) + self._obj.STATES[self._obj.STATE_ACTIVE]) def test_state_stop_requested_desc(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_STOP_REQUESTED]) + self._obj.STATES[self._obj.STATE_STOP_REQUESTED]) def test_state_shutting_down_desc(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_SHUTTING_DOWN]) + self._obj.STATES[self._obj.STATE_SHUTTING_DOWN]) def test_state_stopped_desc(self): - self._obj._state = self._obj.STATE_STOPPED + self._obj.state = self._obj.STATE_STOPPED self.assertEqual(self._obj.state_description, - self._obj._STATES[self._obj.STATE_STOPPED]) + self._obj.STATES[self._obj.STATE_STOPPED]) def test_is_idle_state_initializing(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertFalse(self._obj.is_idle) def test_is_idle_state_connecting(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertFalse(self._obj.is_idle) def test_is_idle_state_idle(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertTrue(self._obj.is_idle) def test_is_idle_state_processing(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertFalse(self._obj.is_idle) def test_is_idle_state_stop_requested(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertFalse(self._obj.is_idle) def test_is_idle_state_shutting_down(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertFalse(self._obj.is_idle) def test_is_idle_state_stopped(self): - self._obj._state = self._obj.STATE_STOPPED + self._obj.state = self._obj.STATE_STOPPED self.assertFalse(self._obj.is_idle) def test_is_running_state_initializing(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertFalse(self._obj.is_running) def test_is_running_state_connecting(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertFalse(self._obj.is_running) def test_is_running_state_idle(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertTrue(self._obj.is_running) def test_is_running_state_processing(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertTrue(self._obj.is_running) def test_is_running_state_stop_requested(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertFalse(self._obj.is_running) def test_is_running_state_shutting_down(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertFalse(self._obj.is_running) def test_is_running_state_stopped(self): - self._obj._state = self._obj.STATE_STOPPED + self._obj.state = self._obj.STATE_STOPPED self.assertFalse(self._obj.is_running) def test_is_shutting_down_state_initializing(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_connecting(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_idle(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_processing(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_stop_requested(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertFalse(self._obj.is_shutting_down) def test_is_shutting_down_state_shutting_down(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertTrue(self._obj.is_shutting_down) def test_is_shutting_down_state_stopped(self): - self._obj._state = self._obj.STATE_STOPPED + self._obj.state = self._obj.STATE_STOPPED self.assertFalse(self._obj.is_shutting_down) def test_is_stopped_state_initializing(self): - self._obj._state = self._obj.STATE_INITIALIZING + self._obj.state = self._obj.STATE_INITIALIZING self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_connecting(self): - self._obj._state = self._obj.STATE_CONNECTING + self._obj.state = self._obj.STATE_CONNECTING self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_idle(self): - self._obj._state = self._obj.STATE_IDLE + self._obj.state = self._obj.STATE_IDLE self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_processing(self): - self._obj._state = self._obj.STATE_ACTIVE + self._obj.state = self._obj.STATE_ACTIVE self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_stop_requested(self): - self._obj._state = self._obj.STATE_STOP_REQUESTED + self._obj.state = self._obj.STATE_STOP_REQUESTED self.assertFalse(self._obj.is_stopped) def test_is_stopped_state_shutting_down(self): - self._obj._state = self._obj.STATE_SHUTTING_DOWN + self._obj.state = self._obj.STATE_SHUTTING_DOWN self.assertFalse(self._obj.is_stopped)