diff --git a/Makefile b/Makefile index cead17d9..d08ced9e 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -# Generated by Medikit 0.7.2 on 2019-06-01. +# Generated by Medikit 0.7.3 on 2019-06-30. # All changes will be overriden. # Edit Projectfile and run “make update” (or “medikit update”) to regenerate. @@ -21,6 +21,10 @@ QUICK ?= PIP ?= $(PYTHON) -m pip PIP_INSTALL_OPTIONS ?= VERSION ?= $(shell git describe 2>/dev/null || git rev-parse --short HEAD) +BLACK ?= $(shell which black || echo black) +BLACK_OPTIONS ?= --line-length 120 +ISORT ?= $(PYTHON) -m isort +ISORT_OPTIONS ?= --recursive --apply PYTEST ?= $(PYTHON_DIRNAME)/pytest PYTEST_OPTIONS ?= --capture=no --cov=$(PACKAGE) --cov-report html SPHINX_BUILD ?= $(PYTHON_DIRNAME)/sphinx-build @@ -30,7 +34,7 @@ SPHINX_BUILDDIR ?= $(SPHINX_SOURCEDIR)/_build SPHINX_AUTOBUILD ?= $(PYTHON_DIRNAME)/sphinx-autobuild MEDIKIT ?= $(PYTHON) -m medikit MEDIKIT_UPDATE_OPTIONS ?= -MEDIKIT_VERSION ?= 0.7.2 +MEDIKIT_VERSION ?= 0.7.3 .PHONY: $(SPHINX_SOURCEDIR) clean format help install install-dev install-docker install-jupyter install-sqlalchemy medikit quick test update update-requirements watch-$(SPHINX_SOURCEDIR) @@ -111,6 +115,10 @@ else @mkdir -p .medikit; touch $@ endif +format: install-dev ## Reformats the codebase (with black, isort). + $(BLACK) $(BLACK_OPTIONS) . Projectfile + $(ISORT) $(ISORT_OPTIONS) . Projectfile + test: install-dev ## Runs the test suite. $(PYTEST) $(PYTEST_OPTIONS) tests @@ -120,10 +128,6 @@ $(SPHINX_SOURCEDIR): install-dev ## watch-$(SPHINX_SOURCEDIR): ## $(SPHINX_AUTOBUILD) $(SPHINX_SOURCEDIR) $(shell mktemp -d) -format: ## Reformats the whole codebase using our standards (requires black and isort). - black -l 120 . - isort -rc -o mondrian -o whistle -y . - medikit: # Checks installed medikit version and updates it if it is outdated. 
@$(PYTHON) -c 'import medikit, pip, sys; from packaging.version import Version; sys.exit(0 if (Version(medikit.__version__) >= Version("$(MEDIKIT_VERSION)")) and (Version(pip.__version__) < Version("10")) else 1)' || $(PYTHON) -m pip install -U "pip ~=19.0" "medikit>=$(MEDIKIT_VERSION)" diff --git a/Projectfile b/Projectfile index d6ab52b4..0331ac52 100644 --- a/Projectfile +++ b/Projectfile @@ -2,100 +2,78 @@ from medikit import require -make = require('make') -pytest = require('pytest') -python = require('python') -sphinx = require('sphinx') +make = require("make") +python = require("python") +sphinx = require("sphinx") + +require("format") + +# Tests +with require("pytest") as pytest: + pytest.set_version("~=5.0") + pytest.addons["pytest-timeout"] = ">=1,<2" python.setup( - name='bonobo', - python_requires='>=3.5', - description='Bonobo, a simple, modern and atomic extract-transform-load toolkit for python 3.5+.', - license='Apache License, Version 2.0', - url='https://www.bonobo-project.org/', - download_url='https://github.com/python-bonobo/bonobo/tarball/{version}', - author='Romain Dorgueil', - author_email='romain@dorgueil.net', + name="bonobo", + python_requires=">=3.5", + description="Bonobo, a simple, modern and atomic extract-transform-load toolkit for python 3.5+.", + license="Apache License, Version 2.0", + url="https://www.bonobo-project.org/", + download_url="https://github.com/python-bonobo/bonobo/tarball/{version}", + author="Romain Dorgueil", + author_email="romain@dorgueil.net", data_files=[ ( - 'share/jupyter/nbextensions/bonobo-jupyter', [ - 'bonobo/contrib/jupyter/static/extension.js', - 'bonobo/contrib/jupyter/static/index.js', - 'bonobo/contrib/jupyter/static/index.js.map', - ] - ), + "share/jupyter/nbextensions/bonobo-jupyter", + [ + "bonobo/contrib/jupyter/static/extension.js", + "bonobo/contrib/jupyter/static/index.js", + "bonobo/contrib/jupyter/static/index.js.map", + ], + ) ], entry_points={ - 'console_scripts': [ - 'bonobo = 
bonobo.commands:entrypoint', + "console_scripts": ["bonobo = bonobo.commands:entrypoint"], + "bonobo.commands": [ + "convert = bonobo.commands.convert:ConvertCommand", + "download = bonobo.commands.download:DownloadCommand", + "examples = bonobo.commands.examples:ExamplesCommand", + "init = bonobo.commands.init:InitCommand", + "inspect = bonobo.commands.inspect:InspectCommand", + "run = bonobo.commands.run:RunCommand", + "version = bonobo.commands.version:VersionCommand", ], - 'bonobo.commands': [ - 'convert = bonobo.commands.convert:ConvertCommand', - 'download = bonobo.commands.download:DownloadCommand', - 'examples = bonobo.commands.examples:ExamplesCommand', - 'init = bonobo.commands.init:InitCommand', - 'inspect = bonobo.commands.inspect:InspectCommand', - 'run = bonobo.commands.run:RunCommand', - 'version = bonobo.commands.version:VersionCommand', - ], - } + }, ) python.add_requirements( - 'cached-property ~=1.4', - 'fs ~=2.0', - 'graphviz >=0.8,<0.9', - 'jinja2 ~=2.9', - 'mondrian ~=0.8', - 'packaging ~=19.0', - 'psutil ~=5.4', - 'python-slugify ~=1.2.0', - 'requests ~=2.0', - 'stevedore ~=1.27', - 'whistle ~=1.0', - dev=[ - 'cookiecutter >=1.5,<1.6', - 'pytest-timeout >=1,<2', - 'sphinx-sitemap >=0.2,<0.3', - ], - docker=[ - 'bonobo-docker ~=0.6.0a1', - ], - jupyter=[ - 'ipywidgets ~=6.0', - 'jupyter ~=1.0', - ], - sqlalchemy=[ - 'bonobo-sqlalchemy ~=0.6.0a1', - ], + "cached-property ~=1.5", + "fs ~=2.4", + "graphviz >=0.11,<0.12", + "jinja2 ~=2.10", + "mondrian ~=0.8", + "packaging ~=19.0", + "psutil ~=5.6", + "python-slugify ~=1.2.0", + "requests ~=2.0", + "stevedore ~=1.30", + "whistle ~=1.0", + dev=["cookiecutter >=1.6,<1.7", "sphinx-sitemap ~=1.0"], + docker=["bonobo-docker ~=0.6"], + jupyter=["ipywidgets ~=6.0", "jupyter ~=1.0"], + sqlalchemy=["bonobo-sqlalchemy ~=0.6"], ) @listen(make.on_generate) def on_make_generate(event): makefile = event.makefile - - # Sphinx - makefile['SPHINX_AUTOBUILD'] = '$(PYTHON_DIRNAME)/sphinx-autobuild' - 
makefile.add_target( - 'watch-$(SPHINX_SOURCEDIR)', - '$(SPHINX_AUTOBUILD) $(SPHINX_SOURCEDIR) $(shell mktemp -d)', - phony=True - ) - # Formating + # Sphinx + makefile["SPHINX_AUTOBUILD"] = "$(PYTHON_DIRNAME)/sphinx-autobuild" makefile.add_target( - 'format', - ''' - black -l 120 . - isort -rc -o mondrian -o whistle -y . - ''', - phony=True, - doc='Reformats the whole codebase using our standards (requires black and isort).' + "watch-$(SPHINX_SOURCEDIR)", "$(SPHINX_AUTOBUILD) $(SPHINX_SOURCEDIR) $(shell mktemp -d)", phony=True ) - - - # vim: ft=python: diff --git a/bonobo/constants.py b/bonobo/constants.py index 83b47cde..2549bbda 100644 --- a/bonobo/constants.py +++ b/bonobo/constants.py @@ -24,6 +24,7 @@ from bonobo.util.envelopes import UnchangedEnvelope BEGIN = Token("Begin") +ERROR = Token("Error") END = Token("End") NOT_MODIFIED = UnchangedEnvelope() diff --git a/bonobo/execution/contexts/base.py b/bonobo/execution/contexts/base.py index 6f3fa71e..f83aaf14 100644 --- a/bonobo/execution/contexts/base.py +++ b/bonobo/execution/contexts/base.py @@ -26,44 +26,76 @@ def unrecoverable(error_handler): class Lifecycle: - def __init__(self): + def __init__(self, *, daemon=False): + # Daemonized? (lifecycle is equal to execution, not to its own) + self._daemon = bool(daemon) + self._started = False self._stopped = False self._killed = False self._defunct = False + @property + def daemon(self): + return self._daemon + @property def started(self): - return self._started + """ + Is this context started? + + """ + return self._daemon or self._started @property def stopped(self): - return self._stopped + """ + Is this context stopped? + + """ + return not self._daemon and self._stopped @property def killed(self): + """ + Is this context marked as killed? + + """ return self._killed @property def defunct(self): + """ + Is this context marked as defunct? This happens after an unrecoverable error was raised in a node. 
+ """ return self._defunct @property def alive(self): - return self._started and not self._stopped + """ + Is this context alive? It means it should be started, but not yet stopped. + + """ + return self.daemon or (self._started and not self._stopped) @property def should_loop(self): + """ + Should we run the execution context loop, or does the current state means that we should give up on execution? + + """ return self.alive and not any((self.defunct, self.killed)) @property def status(self): """ - One character status for this node. + One character status for this node, used for display. """ if self._defunct: return "!" + if self.daemon: + return "~" if not self.started: return " " if not self.stopped: @@ -71,13 +103,25 @@ def status(self): return "-" def __enter__(self): + """ + Allows to enter this context like a context manager (using `with ...` statement). + + """ self.start() return self def __exit__(self, exc_type=None, exc_val=None, exc_tb=None): # lgtm [py/special-method-wrong-signature] + """ + Allows to exit this context when used as a context manager. + + """ self.stop() def get_flags_as_string(self): + """ + Utility function used to display verbose and explicit status in the console. + + """ if self._defunct: return term.red("[defunct]") if self.killed: @@ -87,18 +131,38 @@ def get_flags_as_string(self): return "" def start(self): + """ + Starts this context. This can only be done once. + + """ + if self.daemon: + return + if self.started: raise RuntimeError("This context is already started ({}).".format(get_name(self))) self._started = True def stop(self): + """ + Stops this context. The context must be started first, but once it is, you can call this method as many time + as you want, the subsequent calls will have no effect. 
+ + """ if not self.started: raise RuntimeError("This context cannot be stopped as it never started ({}).".format(get_name(self))) self._stopped = True + def daemonize(self, daemon=True): + self._daemon = bool(daemon) + def kill(self): + """ + Kills a running context. This only sets a flag that will be used by the loop control structures to actually + stop the work in a clean manner. + + """ if not self.started: raise RuntimeError("Cannot kill an unstarted context.") @@ -112,13 +176,24 @@ def handle_error(self, exctype, exc, tb, *, level=logging.ERROR): return self.error((exctype, exc, tb), level=level) def error(self, exc_info, *, level=logging.ERROR): + """ + Called when a non-fatal error happens. + + """ logging.getLogger(__name__).log(level, repr(self), exc_info=exc_info) def fatal(self, exc_info, *, level=logging.CRITICAL): + """ + Called when a fatal/unrecoverable error happens. + + """ logging.getLogger(__name__).log(level, repr(self), exc_info=exc_info) self._defunct = True def as_dict(self): + """ + Returns a dict describing this context, that can be used for example for JSON serialization. + """ return { "status": self.status, "name": self.name, @@ -128,8 +203,8 @@ def as_dict(self): class BaseContext(Lifecycle, Wrapper): - def __init__(self, wrapped, *, parent=None): - Lifecycle.__init__(self) + def __init__(self, wrapped, *, daemon=False, parent=None): + Lifecycle.__init__(self, daemon=daemon) Wrapper.__init__(self, wrapped) self.parent = parent @@ -137,6 +212,7 @@ def __init__(self, wrapped, *, parent=None): def xstatus(self): """ UNIX-like exit status, only coherent if the context has stopped. 
+ """ if self._defunct: return 70 diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index 812c29ab..6c89ec1d 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -6,12 +6,13 @@ from whistle import EventDispatcher from bonobo.config import create_container -from bonobo.constants import BEGIN, EMPTY, END +from bonobo.constants import BEGIN, EMPTY, END, ERROR from bonobo.errors import InactiveReadableError from bonobo.execution import events from bonobo.execution.contexts.base import BaseContext -from bonobo.execution.contexts.node import AsyncNodeExecutionContext, NodeExecutionContext +from bonobo.execution.contexts.node import NodeExecutionContext from bonobo.execution.contexts.plugin import PluginExecutionContext +from bonobo.structs.inputs import Pipe logger = logging.getLogger(__name__) @@ -32,19 +33,19 @@ class BaseGraphExecutionContext(BaseContext): def started(self): if not len(self.nodes): return super(BaseGraphExecutionContext, self).started - return any(node.started for node in self.nodes) + return any(node.started for node in self.nodes if not node.daemon) @property def stopped(self): if not len(self.nodes): return super(BaseGraphExecutionContext, self).stopped - return all(node.started and node.stopped for node in self.nodes) + return all(node.started and node.stopped for node in self.nodes if not node.daemon) @property def alive(self): if not len(self.nodes): return super(BaseGraphExecutionContext, self).alive - return any(node.alive for node in self.nodes) + return any(node.alive for node in self.nodes if not node.daemon) @property def xstatus(self): @@ -58,20 +59,41 @@ def __init__(self, graph, *, plugins=None, services=None, dispatcher=None): super(BaseGraphExecutionContext, self).__init__(graph) self.dispatcher = dispatcher or EventDispatcher() self.graph = graph - self.nodes = [self.create_node_execution_context_for(node) for node in self.graph] + self.errors = Pipe() + errors 
= self.graph.outputs_of(ERROR) + self.nodes = [ + self.create_node_execution_context_for( + node, **({"daemon": True} if i in errors else {"_errors": self.errors}) + ) + for i, node in self.graph.items() + ] self.plugins = [self.create_plugin_execution_context_for(plugin) for plugin in plugins or ()] self.services = create_container(services) # Probably not a good idea to use it unless you really know what you're doing. But you can access the context. self.services["__graph_context"] = self + # Plug our error handlers if there are any + for i in self.graph.outputs_of(ERROR): + self[i].input.daemonize() + self.errors.targets.append(self[i].input) + for i, node_context in enumerate(self): outputs = self.graph.outputs_of(i) if len(outputs): node_context.outputs = [self[j].input for j in outputs] - node_context.input.on_begin = partial(node_context._put, BEGIN, _control=True) - node_context.input.on_end = partial(node_context._put, END, _control=True) - node_context.input.on_finalize = partial(node_context.stop) + + # We should propagate the "daemon" property to the next nodes in line + if node_context.daemon: + for j in outputs: + self[j].daemonize() + + # When a signal token is sent to an input, pass it to the node context outputs so it cascade through the + # graph. 
+ if not node_context.daemon: + node_context.input.on_begin.append(partial(node_context._put, BEGIN, _control=True)) + node_context.input.on_end.append(partial(node_context._put, END, _control=True)) + node_context.input.on_finalize.append(partial(node_context.stop)) def __getitem__(self, item): return self.nodes[item] @@ -82,8 +104,12 @@ def __len__(self): def __iter__(self): yield from self.nodes - def create_node_execution_context_for(self, node): - return self.NodeExecutionContextType(node, parent=self) + @classmethod + def create_queue(cls, *args, **kwargs): + return cls.NodeExecutionContextType.create_queue(*args, **kwargs) + + def create_node_execution_context_for(self, node, **kwargs): + return self.NodeExecutionContextType(node, parent=self, **kwargs) def create_plugin_execution_context_for(self, plugin): if isinstance(plugin, type): @@ -108,6 +134,7 @@ def start(self, starter=None): self.register_plugins() self.dispatch(events.START) + self.tick(pause=False) for node in self.nodes: @@ -133,6 +160,7 @@ def stop(self, stopper=None): else: stopper(node_context) self.tick(pause=False) + self.dispatch(events.STOPPED) self.unregister_plugins() @@ -148,8 +176,8 @@ def write(self, *messages): """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in our graph.""" - for i in self.graph.outputs_of(BEGIN): - for message in messages: + for message in messages: + for i in self.graph.outputs_of(BEGIN): self[i].write(message) def loop(self): @@ -162,19 +190,9 @@ def loop(self): except Empty: continue except InactiveReadableError: + logger.debug("Discarding node {!r}.".format(node)) nodes.discard(node) def run_until_complete(self): self.write(BEGIN, EMPTY, END) self.loop() - - -class AsyncGraphExecutionContext(GraphExecutionContext): - NodeExecutionContextType = AsyncNodeExecutionContext - - def __init__(self, *args, loop, **kwargs): - self._event_loop = loop - super().__init__(*args, **kwargs) - - def 
create_node_execution_context_for(self, node): - return self.NodeExecutionContextType(node, parent=self, loop=self._event_loop) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 814f8e6c..38920a38 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -1,7 +1,7 @@ import logging import sys from collections import namedtuple -from queue import Empty +from queue import Empty, Queue from time import sleep from types import GeneratorType @@ -10,7 +10,7 @@ from bonobo.constants import BEGIN, END, TICK_PERIOD from bonobo.errors import InactiveReadableError, UnrecoverableError, UnrecoverableTypeError from bonobo.execution.contexts.base import BaseContext -from bonobo.structs.inputs import AioInput, Input +from bonobo.structs.inputs import Input, Pipe from bonobo.structs.tokens import Flag, Token from bonobo.util import deprecated, ensure_tuple, get_name, isconfigurabletype from bonobo.util.bags import BagType @@ -20,6 +20,7 @@ logger = logging.getLogger(__name__) UnboundArguments = namedtuple("UnboundArguments", ["args", "kwargs"]) +ErrorBag = namedtuple("ErrorBag", ["context", "level", "type", "value", "traceback"]) class NodeExecutionContext(BaseContext, WithStatistics): @@ -35,7 +36,17 @@ class NodeExecutionContext(BaseContext, WithStatistics): QueueType = Input - def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None): + @classmethod + def create_queue(cls, *args, **kwargs): + return cls.QueueType(*args, **kwargs) + + @property + def should_loop(self): + if self.parent.should_loop: + return super(NodeExecutionContext, self).should_loop + return not self.input.empty() + + def __init__(self, wrapped, *, parent=None, services=None, daemon=False, _input=None, _outputs=None, _errors=None): """ Node execution context has the responsibility fo storing the state of a transformation during its execution. 
@@ -45,7 +56,7 @@ def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs :param _input: input queue (optional) :param _outputs: output queues (optional) """ - BaseContext.__init__(self, wrapped, parent=parent) + BaseContext.__init__(self, wrapped, parent=parent, daemon=daemon) WithStatistics.__init__(self, "in", "out", "err", "warn") # Services: how we'll access external dependencies @@ -59,8 +70,9 @@ def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs self.services = None # Input / Output: how the wrapped node will communicate - self.input = _input or self.QueueType() + self.input = _input or self.create_queue() self.outputs = _outputs or [] + self.errors = _errors or Queue(maxsize=0) # Types self._input_type, self._input_length = None, None @@ -69,6 +81,12 @@ def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs # Stack: context decorators for the execution self._stack = None + def get_statistics(self, *args, **kwargs): + return super(NodeExecutionContext, self).get_statistics(*args, **kwargs) + ( + ("rrl", self.input._runlevel), + ("wrl", self.input._writable_runlevel), + ) + def __str__(self): return self.__name__ + self.get_statistics_as_string(prefix=" ") @@ -195,6 +213,10 @@ def stop(self): super().stop() + def daemonize(self, daemon=True): + super(NodeExecutionContext, self).daemonize(daemon) + self.input.daemonize(daemon) + def send(self, *_output, _input=None): return self._put(self._cast(_input, _output)) @@ -273,10 +295,21 @@ def write_sync(self, *messages): def error(self, exc_info, *, level=logging.ERROR): self.increment("err") - super().error(exc_info, level=level) + try: + self.errors.put(ErrorBag(self, level, *exc_info)) + except Exception: + logger.exception("An exception occurred while trying to send an error in the error stream.") + + if not (self.errors and isinstance(self.errors, Pipe) and len(self.errors)): + super().error(exc_info, level=level) def fatal(self, 
exc_info, *, level=logging.CRITICAL): self.increment("err") + try: + self.errors.put(ErrorBag(self, level, *exc_info)) + except Exception: + logger.exception("An exception occurred while trying to send an unrecoverable error in the error stream.") + super().fatal(exc_info, level=level) self.input.shutdown() @@ -381,53 +414,6 @@ def _get_initial_context(self): return UnboundArguments((), {}) -class AsyncNodeExecutionContext(NodeExecutionContext): - QueueType = AioInput - - def __init__(self, *args, loop, **kwargs): - super().__init__(*args, **kwargs) - self._event_loop = loop - - async def _get(self): - """ - Read from the input queue. - - If Queue raises (like Timeout or Empty), stat won't be changed. - - """ - input_bag = await self.input.get() - - # Store or check input type - if self._input_type is None: - self._input_type = type(input_bag) - elif type(input_bag) != self._input_type: - try: - if self._input_type == tuple: - input_bag = self._input_type(input_bag) - else: - input_bag = self._input_type(*input_bag) - except Exception as exc: - raise UnrecoverableTypeError( - "Input type changed to incompatible type between calls to {!r}.\nGot {!r} which is not of type {!r}.".format( - self.wrapped, input_bag, self._input_type - ) - ) from exc - - # Store or check input length, which is a soft fallback in case we're just using tuples - if self._input_length is None: - self._input_length = len(input_bag) - elif len(input_bag) != self._input_length: - raise UnrecoverableTypeError( - "Input length changed between calls to {!r}.\nExpected {} but got {}: {!r}.".format( - self.wrapped, self._input_length, len(input_bag), input_bag - ) - ) - - self.increment("in") # XXX should that go before type check ? 
- - return input_bag - - def isflag(param): return isinstance(param, Flag) diff --git a/bonobo/execution/strategies/__init__.py b/bonobo/execution/strategies/__init__.py index 5995bf04..bbae0d66 100644 --- a/bonobo/execution/strategies/__init__.py +++ b/bonobo/execution/strategies/__init__.py @@ -6,9 +6,7 @@ at home if you want to give it a shot. """ -from bonobo.execution.strategies.executor import ( - AsyncThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy -) +from bonobo.execution.strategies.executor import ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy from bonobo.execution.strategies.naive import NaiveStrategy __all__ = ["create_strategy"] @@ -17,7 +15,6 @@ "naive": NaiveStrategy, "processpool": ProcessPoolExecutorStrategy, "threadpool": ThreadPoolExecutorStrategy, - "aio_threadpool": AsyncThreadPoolExecutorStrategy, } DEFAULT_STRATEGY = "threadpool" diff --git a/bonobo/execution/strategies/executor.py b/bonobo/execution/strategies/executor.py index 17c3ab3b..6cc90963 100644 --- a/bonobo/execution/strategies/executor.py +++ b/bonobo/execution/strategies/executor.py @@ -1,16 +1,10 @@ -import asyncio import functools import logging import sys from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor -from cached_property import cached_property - -from bonobo import settings from bonobo.constants import BEGIN, END -from bonobo.execution.contexts.graph import AsyncGraphExecutionContext from bonobo.execution.strategies.base import Strategy -from bonobo.util import get_name logger = logging.getLogger(__name__) @@ -79,35 +73,6 @@ def create_executor(self, graph): return self.executor_factory(max_workers=len(graph)) -class AsyncThreadPoolExecutorStrategy(ThreadPoolExecutorStrategy): - GraphExecutionContextType = AsyncGraphExecutionContext - - def __init__(self, GraphExecutionContextType=None): - if not settings.ALPHA.get(): - raise NotImplementedError( - "{} is experimental, you need to explicitely 
activate it using ALPHA=True in system env.".format( - get_name(self) - ) - ) - super().__init__(GraphExecutionContextType) - - @cached_property - def loop(self): - return asyncio.get_event_loop() - - def create_graph_execution_context(self, *args, **kwargs): - return super(AsyncThreadPoolExecutorStrategy, self).create_graph_execution_context( - *args, **kwargs, loop=self.loop - ) - - def get_starter(self, executor, futures): - return functools.partial( - self.loop.run_in_executor, - executor, - super(AsyncThreadPoolExecutorStrategy, self).get_starter(executor, futures), - ) - - class ProcessPoolExecutorStrategy(ExecutorStrategy): executor_factory = ProcessPoolExecutor diff --git a/bonobo/plugins/console.py b/bonobo/plugins/console.py index 7f619a58..525b4e59 100644 --- a/bonobo/plugins/console.py +++ b/bonobo/plugins/console.py @@ -89,12 +89,16 @@ def write(self, context, prefix="", rewind=True, append=None): alive_color = Style.BRIGHT dead_color = Style.BRIGHT + Fore.BLACK + daemon_color = alive_color if context.alive else dead_color for i in context.graph.topologically_sorted_indexes: node = context[i] name_suffix = "({})".format(i) if settings.DEBUG.get() else "" liveliness_color = alive_color if node.alive else dead_color + if node.daemon: + liveliness_color = daemon_color + liveliness_prefix = " {}{}{} ".format(liveliness_color, node.status, Style.RESET_ALL) _line = "".join( ( diff --git a/bonobo/structs/graphs.py b/bonobo/structs/graphs.py index 103ee06f..f1859e1e 100644 --- a/bonobo/structs/graphs.py +++ b/bonobo/structs/graphs.py @@ -1,12 +1,13 @@ import html import json +import types from collections import namedtuple from copy import copy from graphviz import ExecutableNotFound from graphviz.dot import Digraph -from bonobo.constants import BEGIN +from bonobo.constants import BEGIN, ERROR from bonobo.util import get_name from bonobo.util.collections import coalesce @@ -86,7 +87,7 @@ class Graph: name = "" def __init__(self, *chain): - self.edges = {BEGIN: 
set()} + self.edges = {BEGIN: set(), ERROR: set()} self.named = {} self.nodes = [] if len(chain): @@ -95,6 +96,10 @@ def __init__(self, *chain): def __iter__(self): yield from self.nodes + def items(self): + for i in range(len(self.nodes)): + yield i, self.nodes[i] + def __len__(self): """ The graph length is defined as its node count. @@ -128,6 +133,13 @@ def orphan(self): """ return self.get_cursor(None) + def errors(self): + """ + Create a `GraphCursor` that starts at the special `ERROR` pointer so one can add custom error handling chains. + + """ + return self.get_cursor(ERROR) + def index_of(self, mixed): """ Find the index based on various strategies for a node, probably an input or output of chain. Supported @@ -184,7 +196,7 @@ def add_node(self, new_node, *, _name=None): return idx def get_or_add_node(self, new_node, *, _name=None): - if new_node in self.nodes: + if not isinstance(new_node, types.BuiltinFunctionType) and new_node in self.nodes: if _name is not None: raise RuntimeError("Cannot name a node that is already present in the graph.") return self.index_of(new_node) diff --git a/bonobo/structs/inputs.py b/bonobo/structs/inputs.py index f0277b17..793282dc 100644 --- a/bonobo/structs/inputs.py +++ b/bonobo/structs/inputs.py @@ -15,12 +15,10 @@ # limitations under the License. 
from abc import ABCMeta, abstractmethod -from asyncio.queues import Queue as AioQueue from queue import Queue from bonobo.constants import BEGIN, END from bonobo.errors import AbstractError, InactiveReadableError, InactiveWritableError -from bonobo.nodes import noop BUFFER_SIZE = 8192 @@ -44,32 +42,40 @@ def put(self, data, block=True, timeout=None): class Input(Queue, Readable, Writable): - def __init__(self, maxsize=BUFFER_SIZE): + def __init__(self, maxsize=BUFFER_SIZE, *, daemon=False): Queue.__init__(self, maxsize) + self._daemon = bool(daemon) self._runlevel = 0 self._writable_runlevel = 0 - self.on_initialize = noop - self.on_begin = noop - self.on_end = noop - self.on_finalize = noop + self.on_initialize = [] + self.on_begin = [] + self.on_end = [] + self.on_finalize = [] + + def daemonize(self, daemon=True): + self._daemon = bool(daemon) + + def call_event_handlers(self, handlers): + for handler in handlers: + handler() def put(self, data, block=True, timeout=None): # Begin token is a metadata to raise the input runlevel. if data == BEGIN: if not self._runlevel: - self.on_initialize() + self.call_event_handlers(self.on_initialize) self._runlevel += 1 self._writable_runlevel += 1 # callback - self.on_begin() + self.call_event_handlers(self.on_begin) return # Check we are actually able to receive data. 
- if self._writable_runlevel < 1: + if self._writable_runlevel < 1 and not self._daemon: raise InactiveWritableError("Cannot put() on an inactive {}.".format(Writable.__name__)) if data == END: @@ -79,9 +85,9 @@ def put(self, data, block=True, timeout=None): def _decrement_runlevel(self): if self._runlevel == 1: - self.on_finalize() + self.call_event_handlers(self.on_finalize) self._runlevel -= 1 - self.on_end() + self.call_event_handlers(self.on_end) def get(self, block=True, timeout=None): if not self.alive: @@ -115,8 +121,23 @@ def empty(self): @property def alive(self): - return self._runlevel > 0 + return self._daemon or self._runlevel > 0 + + +class Pipe(Input): + def __init__(self, maxsize=0, **kwargs): + super().__init__(maxsize=maxsize, **kwargs) + self.targets = [] + + def put(self, item, block=True, timeout=None): + if len(self.targets): + for target in self.targets: + target.put(item, block=block, timeout=timeout) + else: + super().put(item, block=block, timeout=timeout) + def __len__(self): + return len(self.targets) -class AioInput(AioQueue): - pass + def __bool__(self): + return True diff --git a/bonobo/util/statistics.py b/bonobo/util/statistics.py index 23c7ccdd..03dad3ea 100644 --- a/bonobo/util/statistics.py +++ b/bonobo/util/statistics.py @@ -1,11 +1,14 @@ import time +from bonobo.util import tuplize + class WithStatistics: def __init__(self, *names): self.statistics_names = names self.statistics = {name: 0 for name in names} + @tuplize def get_statistics(self, *args, **kwargs): return ((name, self.statistics[name]) for name in self.statistics_names) diff --git a/bonobo/util/testing.py b/bonobo/util/testing.py index e233812a..ac5a38f5 100644 --- a/bonobo/util/testing.py +++ b/bonobo/util/testing.py @@ -103,8 +103,8 @@ def __init__(self, *args, buffer=None, **kwargs): BufferingContext.__init__(self, buffer) GraphExecutionContext.__init__(self, *args, **kwargs) - def create_node_execution_context_for(self, node): - return 
self.NodeExecutionContextType(node, parent=self, buffer=self.buffer) + def create_node_execution_context_for(self, node, *args, **kwargs): + return self.NodeExecutionContextType(node, *args, parent=self, buffer=self.buffer, **kwargs) def runner(f): diff --git a/requirements-dev.txt b/requirements-dev.txt index 0c566900..5874851b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,27 +1,27 @@ -e .[dev] -r requirements.txt alabaster==0.7.12 -arrow==0.14.0 +arrow==0.14.2 atomicwrites==1.3.0 attrs==19.1.0 babel==2.7.0 binaryornot==0.4.4 -certifi==2019.3.9 +certifi==2019.6.16 chardet==3.0.4 click==7.0 -cookiecutter==1.5.1 +cookiecutter==1.6.0 coverage==4.5.3 docutils==0.14 future==0.17.1 idna==2.8 imagesize==1.1.0 -importlib-metadata==0.17 +importlib-metadata==0.18 +isort==4.3.21 jinja2-time==0.2.0 jinja2==2.10.1 markupsafe==1.1.1 -more-itertools==7.0.0 +more-itertools==7.1.0 packaging==19.0 -pathlib2==2.3.3 pluggy==0.12.0 poyo==0.4.2 py==1.8.0 @@ -29,15 +29,16 @@ pygments==2.4.2 pyparsing==2.4.0 pytest-cov==2.7.1 pytest-timeout==1.3.3 -pytest==3.10.1 +pytest==5.0.0 python-dateutil==2.8.0 pytz==2019.1 requests==2.22.0 six==1.12.0 -snowballstemmer==1.2.1 -sphinx-sitemap==0.2 +snowballstemmer==1.9.0 +sphinx-sitemap==1.0.2 sphinx==1.8.5 sphinxcontrib-websupport==1.1.2 urllib3==1.25.3 +wcwidth==0.1.7 whichcraft==0.5.2 zipp==0.5.1 diff --git a/requirements-docker.txt b/requirements-docker.txt index 4757fe61..be875749 100644 --- a/requirements-docker.txt +++ b/requirements-docker.txt @@ -2,20 +2,20 @@ -r requirements.txt appdirs==1.4.3 bonobo-docker==0.6.0 -certifi==2019.3.9 +certifi==2019.6.16 chardet==3.0.4 colorama==0.3.9 docker-pycreds==0.4.0 docker==2.7.0 -fs==2.4.5 +fs==2.4.8 graphviz==0.8.4 idna==2.8 jinja2==2.10.1 markupsafe==1.1.1 mondrian==0.8.0 packaging==19.0 -pbr==5.2.1 -psutil==5.6.2 +pbr==5.3.1 +psutil==5.6.3 pyparsing==2.4.0 python-slugify==1.2.6 pytz==2019.1 @@ -23,8 +23,7 @@ requests==2.22.0 semantic-version==2.6.0 six==1.12.0 
stevedore==1.30.1 -typing==3.6.6 -unidecode==1.0.23 +unidecode==1.1.1 urllib3==1.25.3 websocket-client==0.56.0 whistle==1.0.1 diff --git a/requirements-jupyter.txt b/requirements-jupyter.txt index 77746b4a..1454dc3f 100644 --- a/requirements-jupyter.txt +++ b/requirements-jupyter.txt @@ -9,14 +9,14 @@ defusedxml==0.6.0 entrypoints==0.3 ipykernel==5.1.1 ipython-genutils==0.2.0 -ipython==7.5.0 +ipython==7.6.0 ipywidgets==6.0.1 -jedi==0.13.3 +jedi==0.14.0 jinja2==2.10.1 jsonschema==3.0.1 jupyter-client==5.2.4 jupyter-console==6.0.0 -jupyter-core==4.4.0 +jupyter-core==4.5.0 jupyter==1.0.0 markupsafe==1.1.1 mistune==0.8.4 @@ -24,22 +24,22 @@ nbconvert==5.5.0 nbformat==4.4.0 notebook==5.7.8 pandocfilters==1.4.2 -parso==0.4.0 +parso==0.5.0 pexpect==4.7.0 pickleshare==0.7.5 -prometheus-client==0.6.0 +prometheus-client==0.7.1 prompt-toolkit==2.0.9 ptyprocess==0.6.0 pygments==2.4.2 pyrsistent==0.15.2 python-dateutil==2.8.0 -pyzmq==18.0.1 +pyzmq==18.0.2 qtconsole==4.5.1 send2trash==1.5.0 six==1.12.0 terminado==0.8.2 testpath==0.4.2 -tornado==6.0.2 +tornado==6.0.3 traitlets==4.3.2 wcwidth==0.1.7 webencodings==0.5.1 diff --git a/requirements-sqlalchemy.txt b/requirements-sqlalchemy.txt index f101ca9c..98e65ee5 100644 --- a/requirements-sqlalchemy.txt +++ b/requirements-sqlalchemy.txt @@ -1,27 +1,26 @@ -e .[sqlalchemy] -r requirements.txt appdirs==1.4.3 -bonobo-sqlalchemy==0.6.0 -certifi==2019.3.9 +bonobo-sqlalchemy==0.6.1 +certifi==2019.6.16 chardet==3.0.4 colorama==0.3.9 -fs==2.4.5 +fs==2.4.8 graphviz==0.8.4 idna==2.8 jinja2==2.10.1 markupsafe==1.1.1 mondrian==0.8.0 packaging==19.0 -pbr==5.2.1 -psutil==5.6.2 +pbr==5.3.1 +psutil==5.6.3 pyparsing==2.4.0 python-slugify==1.2.6 pytz==2019.1 requests==2.22.0 six==1.12.0 -sqlalchemy==1.3.4 +sqlalchemy==1.3.5 stevedore==1.30.1 -typing==3.6.6 -unidecode==1.0.23 +unidecode==1.1.1 urllib3==1.25.3 whistle==1.0.1 diff --git a/requirements.txt b/requirements.txt index 80bc1e64..15bd3ddb 100644 --- a/requirements.txt +++ b/requirements.txt 
@@ -1,25 +1,24 @@ -e . appdirs==1.4.3 cached-property==1.5.1 -certifi==2019.3.9 +certifi==2019.6.16 chardet==3.0.4 colorama==0.3.9 -fs==2.4.5 -graphviz==0.8.4 +fs==2.4.8 +graphviz==0.11 idna==2.8 jinja2==2.10.1 markupsafe==1.1.1 mondrian==0.8.0 packaging==19.0 -pbr==5.2.1 -psutil==5.6.2 +pbr==5.3.1 +psutil==5.6.3 pyparsing==2.4.0 python-slugify==1.2.6 pytz==2019.1 requests==2.22.0 six==1.12.0 stevedore==1.30.1 -typing==3.6.6 -unidecode==1.0.23 +unidecode==1.1.1 urllib3==1.25.3 whistle==1.0.1 diff --git a/setup.cfg b/setup.cfg index 61b41b99..f8a9a39a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,3 +4,6 @@ description-file = README.rst [bdist_wheel] universal = 1 +[isort] +line_length = 120 + diff --git a/setup.py b/setup.py index d3d291bb..addbe048 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Generated by Medikit 0.7.2 on 2019-06-01. +# Generated by Medikit 0.7.3 on 2019-06-30. # All changes will be overriden. # Edit Projectfile and run “make update” (or “medikit update”) to regenerate. 
@@ -65,31 +65,32 @@ def execfile(fname, globs, locs=None): packages=find_packages(exclude=["ez_setup", "example", "test"]), include_package_data=True, install_requires=[ - "cached-property ~= 1.4", - "fs ~= 2.0", - "graphviz >= 0.8, < 0.9", - "jinja2 ~= 2.9", + "cached-property ~= 1.5", + "fs ~= 2.4", + "graphviz >= 0.11, < 0.12", + "jinja2 ~= 2.10", "mondrian ~= 0.8", "packaging ~= 19.0", - "psutil ~= 5.4", + "psutil ~= 5.6", "python-slugify ~= 1.2.0", "requests ~= 2.0", - "stevedore ~= 1.27", + "stevedore ~= 1.30", "whistle ~= 1.0", ], extras_require={ "dev": [ - "cookiecutter >= 1.5, < 1.6", - "coverage ~= 4.4", - "pytest ~= 3.4", - "pytest-cov ~= 2.5", + "cookiecutter >= 1.6, < 1.7", + "coverage ~= 4.5", + "isort", + "pytest ~= 5.0", + "pytest-cov ~= 2.7", "pytest-timeout >= 1, < 2", "sphinx ~= 1.7", - "sphinx-sitemap >= 0.2, < 0.3", + "sphinx-sitemap ~= 1.0", ], - "docker": ["bonobo-docker ~= 0.6.0a1"], + "docker": ["bonobo-docker ~= 0.6"], "jupyter": ["ipywidgets ~= 6.0", "jupyter ~= 1.0"], - "sqlalchemy": ["bonobo-sqlalchemy ~= 0.6.0a1"], + "sqlalchemy": ["bonobo-sqlalchemy ~= 0.6"], }, entry_points={ "bonobo.commands": [ diff --git a/tests/execution/contexts/test_execution_contexts_node.py b/tests/execution/contexts/test_execution_contexts_node.py index 6b694e4e..eeca8bd9 100644 --- a/tests/execution/contexts/test_execution_contexts_node.py +++ b/tests/execution/contexts/test_execution_contexts_node.py @@ -1,3 +1,4 @@ +import logging from unittest.mock import MagicMock import pytest @@ -261,3 +262,19 @@ def test_split_token_inherit(): assert split_token(F_INHERIT) == ({F_INHERIT}, ()) assert split_token((F_INHERIT,)) == ({F_INHERIT}, ()) assert split_token((F_INHERIT, "foo", "bar")) == ({F_INHERIT}, ("foo", "bar")) + + +def test_nonfatal_error(): + def f(): + raise RuntimeError("woops") + yield "not a chance" + + with BufferingNodeExecutionContext(f) as context: + context.write_sync(EMPTY) + + error = context.errors.get_nowait() + assert error.context is 
context + assert error.level == logging.ERROR + assert error.type is RuntimeError + assert error.value.args[0] == "woops" + assert error.traceback diff --git a/tests/features/test_not_modified.py b/tests/features/test_not_modified.py index 8d753dab..4bdd5b77 100644 --- a/tests/features/test_not_modified.py +++ b/tests/features/test_not_modified.py @@ -13,5 +13,4 @@ def test_not_modified(): context.write_sync(*input_messages) result = context.get_buffer() - print(result) assert result == input_messages diff --git a/tests/structs/test_inputs.py b/tests/structs/test_inputs.py index c398ca54..7cfcd42d 100644 --- a/tests/structs/test_inputs.py +++ b/tests/structs/test_inputs.py @@ -20,7 +20,7 @@ from bonobo.constants import BEGIN, END from bonobo.errors import InactiveReadableError, InactiveWritableError -from bonobo.structs.inputs import Input +from bonobo.structs.inputs import Input, Pipe def test_input_runlevels(): @@ -64,3 +64,15 @@ def test_input_runlevels(): assert q.get() == "baz" with pytest.raises(InactiveReadableError): q.get() + + +def test_input_runlevels_daemonized(*, queue=None): + queue = Input(daemon=True) if queue is None else queue + + assert queue.alive + queue.put("hi, daemon") + + +def test_pipes(): + queue = Pipe(daemon=True) + test_input_runlevels_daemonized(queue=queue)