diff --git a/CHANGELOG.md b/CHANGELOG.md index a49b7e6..35c240b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,14 @@ ### Added +- LocalPipelineLock for single-thread non-networked pipeline applications. (#80) +- Local lock interface is now exposed through the DataContext for use of arbitrary locks in the pipeline. (#80) + ### Changed +- Bumped `redis` dependency to 4.3.1. (#94) +- Pipeline class now uses middleware architecture for customizable pre/post run actions. (#80) + ### Deprecated ### Removed diff --git a/test/custom_step_lock_tests.py b/test/custom_step_lock_tests.py new file mode 100644 index 0000000..4708262 --- /dev/null +++ b/test/custom_step_lock_tests.py @@ -0,0 +1,40 @@ +import unittest + +from watergrid.context import DataContext +from watergrid.pipelines.pipeline import Pipeline +from watergrid.steps import Step + + +class MockSetLockStep(Step): + def __init__(self): + super().__init__(self.__class__.__name__) + + def run(self, context: DataContext): + if not context.lock.acquire(): + raise Exception("Unable to obtain lock") + + +class MockVerifyStep(Step): + def __init__(self): + super().__init__(self.__class__.__name__) + self.mock_flag = False + + def run(self, context: DataContext): + self.mock_flag = context.lock.has_lock() + + def get_flag(self) -> bool: + return self.mock_flag + + +class CustomStepLockTestCase(unittest.TestCase): + def test_can_use_lock(self): + mock_step = MockVerifyStep() + pipeline = Pipeline("test_pipeline") + pipeline.add_step(MockSetLockStep()) + pipeline.add_step(mock_step) + pipeline.run() + self.assertTrue(mock_step.get_flag()) + + +if __name__ == "__main__": + unittest.main() diff --git a/watergrid/context/data_context.py b/watergrid/context/data_context.py index b0bea8a..a09ddcd 100644 --- a/watergrid/context/data_context.py +++ b/watergrid/context/data_context.py @@ -1,5 +1,6 @@ from watergrid.context import OutputMode from watergrid.context.context_metadata import ContextMetadata +from 
watergrid.locks.PipelineLock import PipelineLock class DataContext: @@ -12,6 +13,7 @@ def __init__(self): self.data = {} self.output_mode = OutputMode.DIRECT self.metadata = ContextMetadata() + self._lock = None def set(self, key: str, value: object) -> None: """ @@ -91,6 +93,14 @@ def set_run_metadata(self, metadata: ContextMetadata) -> None: """ self.metadata = metadata + @property + def lock(self): + return self._lock + + @lock.setter + def lock(self, lock: PipelineLock): + self._lock = lock + @staticmethod def deep_copy_context(context): """ diff --git a/watergrid/locks/LocalPipelineLock.py b/watergrid/locks/LocalPipelineLock.py new file mode 100644 index 0000000..7e13326 --- /dev/null +++ b/watergrid/locks/LocalPipelineLock.py @@ -0,0 +1,30 @@ +from watergrid.locks.PipelineLock import PipelineLock + + +class LocalPipelineLock(PipelineLock): + def acquire(self) -> bool: + if not self._lock_obj: + self._lock_obj = True + return True + else: + return False + + def has_lock(self) -> bool: + return self._lock_obj + + def extend_lease(self): + pass + + def release(self): + self._lock_obj = False + + def read_key(self, key: str): + return self.__key_value[key] + + def write_key(self, key: str, value): + self.__key_value[key] = value + + def __init__(self, lock_timeout: int = 60): + super().__init__(lock_timeout) + self._lock_obj = False + self.__key_value = {} diff --git a/watergrid/middleware/ContextMiddleware.py b/watergrid/middleware/ContextMiddleware.py new file mode 100644 index 0000000..15f1641 --- /dev/null +++ b/watergrid/middleware/ContextMiddleware.py @@ -0,0 +1,22 @@ +from abc import abstractmethod, ABC + +from watergrid.context import DataContext +from watergrid.steps import Step + + +class ContextMiddleware(ABC): + """ + Abstract class representing an action that is run before and after + every context run within each step in a pipeline. 
+ """ + + def __init__(self): + pass + + @abstractmethod + def pre_context(self, step: Step, context: DataContext): + pass + + @abstractmethod + def post_context(self, step: Step, context: DataContext): + pass diff --git a/watergrid/middleware/PipelineMiddlware.py b/watergrid/middleware/PipelineMiddlware.py new file mode 100644 index 0000000..3f6f123 --- /dev/null +++ b/watergrid/middleware/PipelineMiddlware.py @@ -0,0 +1,19 @@ +from abc import ABC, abstractmethod + + +class PipelineMiddleware(ABC): + """ + Abstract class representing an action that is run before and after + every run of the pipeline. + """ + + def __init__(self): + pass + + @abstractmethod + def pre_pipeline(self, pipeline): + pass + + @abstractmethod + def post_pipeline(self, pipeline): + pass diff --git a/watergrid/middleware/StepMiddleware.py b/watergrid/middleware/StepMiddleware.py new file mode 100644 index 0000000..e19c6c8 --- /dev/null +++ b/watergrid/middleware/StepMiddleware.py @@ -0,0 +1,21 @@ +from abc import ABC, abstractmethod + +from watergrid.steps import Step + + +class StepMiddleware(ABC): + """ + Abstract class representing an action that runs before and after each + step. 
+ """ + + def __init__(self): + pass + + @abstractmethod + def pre_step(self, step: Step, contexts: list) -> None: + pass + + @abstractmethod + def post_step(self, step: Step, contexts: list) -> None: + pass diff --git a/watergrid/middleware/__init__.py b/watergrid/middleware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/watergrid/middleware/context/ContextMetricsMiddleware.py b/watergrid/middleware/context/ContextMetricsMiddleware.py new file mode 100644 index 0000000..d7d4497 --- /dev/null +++ b/watergrid/middleware/context/ContextMetricsMiddleware.py @@ -0,0 +1,16 @@ +from watergrid.context import DataContext +from watergrid.metrics.MetricsStore import MetricsStore +from watergrid.middleware.ContextMiddleware import ContextMiddleware +from watergrid.steps import Step + + +class ContextMetricsMiddleware(ContextMiddleware): + def pre_context(self, step: Step, context: DataContext): + self._store.start_step_monitoring(step.get_step_name()) + + def post_context(self, step: Step, context: DataContext): + self._store.stop_step_monitoring() + + def __init__(self, store: MetricsStore): + super().__init__() + self._store = store diff --git a/watergrid/middleware/context/LockInjectorMiddleware.py b/watergrid/middleware/context/LockInjectorMiddleware.py new file mode 100644 index 0000000..e981a06 --- /dev/null +++ b/watergrid/middleware/context/LockInjectorMiddleware.py @@ -0,0 +1,22 @@ +from watergrid.context import DataContext +from watergrid.locks.LocalPipelineLock import LocalPipelineLock +from watergrid.middleware.ContextMiddleware import ContextMiddleware +from watergrid.steps import Step + + +class LockInjectorMiddleware(ContextMiddleware): + """ + Modifies a context before and after a step to ensure that the lock context + is not deep copied as when using local locks this will break the + functionality of the lock. 
+ """ + + def pre_context(self, step: Step, context: DataContext): + context.lock = self._pipeline.lock + + def post_context(self, step: Step, context: DataContext): + context.lock = None + + def __init__(self, pipeline): + super().__init__() + self._pipeline = pipeline diff --git a/watergrid/middleware/context/__init__.py b/watergrid/middleware/context/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/watergrid/middleware/pipeline/LastRunMiddleware.py b/watergrid/middleware/pipeline/LastRunMiddleware.py new file mode 100644 index 0000000..3e09da8 --- /dev/null +++ b/watergrid/middleware/pipeline/LastRunMiddleware.py @@ -0,0 +1,19 @@ +import time + +from watergrid.middleware.PipelineMiddlware import PipelineMiddleware + + +class LastRunMiddleware(PipelineMiddleware): + @property + def last_run(self): + return self._last_run + + def pre_pipeline(self, pipeline): + pass + + def post_pipeline(self, pipeline): + self._last_run = time.time() + + def __init__(self): + super().__init__() + self._last_run = None diff --git a/watergrid/middleware/pipeline/PipelineMetricsMiddleware.py b/watergrid/middleware/pipeline/PipelineMetricsMiddleware.py new file mode 100644 index 0000000..4a64ec5 --- /dev/null +++ b/watergrid/middleware/pipeline/PipelineMetricsMiddleware.py @@ -0,0 +1,14 @@ +from watergrid.metrics.MetricsStore import MetricsStore +from watergrid.middleware.PipelineMiddlware import PipelineMiddleware + + +class PipelineMetricsMiddleware(PipelineMiddleware): + def __init__(self, store: MetricsStore): + super().__init__() + self._store = store + + def pre_pipeline(self, pipeline): + self._store.start_pipeline_monitoring(pipeline.get_pipeline_name()) + + def post_pipeline(self, pipeline): + self._store.stop_pipeline_monitoring() diff --git a/watergrid/middleware/pipeline/StepOrderingMiddleware.py b/watergrid/middleware/pipeline/StepOrderingMiddleware.py new file mode 100644 index 0000000..a224715 --- /dev/null +++ 
b/watergrid/middleware/pipeline/StepOrderingMiddleware.py @@ -0,0 +1,16 @@ +from watergrid.middleware.PipelineMiddlware import PipelineMiddleware +from watergrid.pipelines.pipeline_verifier import PipelineVerifier + + +class StepOrderingMiddleware(PipelineMiddleware): + def pre_pipeline(self, pipeline): + PipelineVerifier.verify_pipeline_dependencies_fulfilled( + pipeline._steps + ) + PipelineVerifier.verify_pipeline_step_ordering(pipeline._steps) + + def post_pipeline(self, pipeline): + pass + + def __init__(self): + super().__init__() diff --git a/watergrid/middleware/pipeline/__init__.py b/watergrid/middleware/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/watergrid/pipelines/ha_pipeline.py b/watergrid/pipelines/ha_pipeline.py index 7e21d03..410be0d 100644 --- a/watergrid/pipelines/ha_pipeline.py +++ b/watergrid/pipelines/ha_pipeline.py @@ -20,13 +20,12 @@ def __init__(self, pipeline_name: str, pipeline_lock: PipelineLock): self.__lock_timings = [] def run(self): - self.verify_pipeline() if self.__pipeline_lock.lock(): super().run() self.__pipeline_lock.unlock() else: logging.debug( - "Pipeline {} is already running on another instance".format( + "pipeline {} is already running on another instance".format( self.get_pipeline_name() ) ) @@ -37,7 +36,6 @@ def run_interval(self, job_interval_s: int) -> None: :param job_interval_s: Number of seconds to wait between pipeline runs. :return: None """ - self.verify_pipeline() while True: self._run_interval_loop(job_interval_s) time.sleep(self._calculate_delay(job_interval_s) / 1000) @@ -78,7 +76,7 @@ def _verify_lock_metadata(self, job_interval_s: int) -> None: self._set_last_run(time.time() - job_interval_s) if time.time() - last_run > job_interval_s * 3: logging.warning( - "Pipeline {} has fallen more than three cycles behind. Consider increasing the job interval or " + "pipeline {} has fallen more than three cycles behind. 
Consider increasing the job interval or " "provisioning more machines.".format(self.get_pipeline_name()) ) self._set_last_run(time.time() - job_interval_s) @@ -90,7 +88,7 @@ def _handle_lock_acquire_failure(self) -> None: :return: None """ logging.debug( - "Pipeline {} is already running on another instance".format( + "pipeline {} is already running on another instance".format( self.get_pipeline_name() ) ) @@ -117,7 +115,7 @@ def _get_last_run(self) -> float: def _get_pipeline_lock_name(self) -> str: """ Builds the name of the lock assigned to this pipeline. - :return: Pipeline lock name. + :return: pipeline lock name. """ return "{}_last_run".format(self.get_pipeline_name()) diff --git a/watergrid/pipelines/pipeline.py b/watergrid/pipelines/pipeline.py index 8fdce8b..a038f64 100644 --- a/watergrid/pipelines/pipeline.py +++ b/watergrid/pipelines/pipeline.py @@ -6,8 +6,22 @@ import pycron from watergrid.context import DataContext, OutputMode, ContextMetadata +from watergrid.locks.LocalPipelineLock import LocalPipelineLock +from watergrid.locks.PipelineLock import PipelineLock from watergrid.metrics.MetricsStore import MetricsStore -from watergrid.pipelines.pipeline_verifier import PipelineVerifier +from watergrid.middleware.context.ContextMetricsMiddleware import ( + ContextMetricsMiddleware, +) +from watergrid.middleware.context.LockInjectorMiddleware import ( + LockInjectorMiddleware, +) +from watergrid.middleware.pipeline.LastRunMiddleware import LastRunMiddleware +from watergrid.middleware.pipeline.PipelineMetricsMiddleware import ( + PipelineMetricsMiddleware, +) +from watergrid.middleware.pipeline.StepOrderingMiddleware import ( + StepOrderingMiddleware, +) from watergrid.steps import Sequence from watergrid.steps import Step @@ -20,11 +34,24 @@ class Pipeline(ABC): :type pipeline_name: str """ - def __init__(self, pipeline_name: str): + def __init__(self, pipeline_name: str, lock: PipelineLock = LocalPipelineLock()): self._pipeline_name = pipeline_name 
self._steps = [] self._metrics_store = MetricsStore() - self._last_run = None + self._context_middleware = [] + self._step_middleware = [] + self._pipeline_middleware = [] + self._pipeline_middleware.append(StepOrderingMiddleware()) + self._lastrun_tracker = LastRunMiddleware() + self._pipeline_middleware.append(self._lastrun_tracker) + self._pipeline_middleware.append(PipelineMetricsMiddleware(self._metrics_store)) + self._context_middleware.append(ContextMetricsMiddleware(self._metrics_store)) + self._context_middleware.append(LockInjectorMiddleware(self)) + self.lock = lock + + @property + def _last_run(self): + return self._lastrun_tracker.last_run def add_step(self, step: Step): """ @@ -49,20 +76,67 @@ def run(self): Blocking operation that runs all steps in the pipeline once. :return: None """ - self.verify_pipeline() - self._metrics_store.start_pipeline_monitoring(self._pipeline_name) + self.__pipeline_middleware_wrapper() + + def __pipeline_middleware_wrapper(self): + self.__pipeline_middleware_pre() + contexts = [self.__generate_first_context()] try: - self.__run_pipeline_steps() - self._last_run = time.time() + for step in self._steps: + contexts = self.__step_middleware_wrapper(step, contexts) + if len(contexts) == 0: + break except Exception as e: self._metrics_store.report_exception(e) self._metrics_store.stop_step_monitoring() finally: - self._metrics_store.stop_pipeline_monitoring() + self.__pipeline_middleware_post() + + def __pipeline_middleware_pre(self): + for middleware in self._pipeline_middleware: + middleware.pre_pipeline(self) + + def __pipeline_middleware_post(self): + for middleware in reversed(self._pipeline_middleware): + middleware.post_pipeline(self) + + def __step_middleware_wrapper(self, step: Step, contexts: list) -> list: + self.__step_middleware_pre(step, contexts) + next_contexts = [] + for context in contexts: + self.__context_middleware_wrapper(step, context) + self.__process_step_output(context, step.get_step_provides(), 
next_contexts) + self.__step_middleware_post(step, contexts) + return next_contexts + + def __step_middleware_pre(self, step: Step, contexts: list): + for middleware in self._step_middleware: + middleware.pre_step(self, step, contexts) + + def __step_middleware_post(self, step: Step, contexts: list): + for middleware in reversed(self._step_middleware): + middleware.post_step(self, step, contexts) + + def __context_middleware_wrapper( + self, step: Step, context: DataContext + ) -> DataContext: + self.__context_middleware_pre(step, context) + step.run_step(context) + self.__context_middleware_post(step, context) + return context + + def __context_middleware_pre(self, step: Step, context: DataContext): + for middleware in self._context_middleware: + middleware.pre_context(step, context) + + def __context_middleware_post(self, step: Step, context: DataContext): + for middleware in reversed(self._context_middleware): + middleware.post_context(step, context) def run_loop(self): """ - Runs the pipeline in a loop. Subsequent executions will run immediately after the previous execution. + Runs the pipeline in a loop. Subsequent executions will run immediately + after the previous execution. """ while True: self.run() @@ -97,14 +171,6 @@ def get_pipeline_name(self) -> str: """ return self._pipeline_name - def verify_pipeline(self): - """ - Verifies that the pipeline is valid and that all step dependencies are met. - :return: None - """ - PipelineVerifier.verify_pipeline_dependencies_fulfilled(self._steps) - PipelineVerifier.verify_pipeline_step_ordering(self._steps) - def add_metrics_exporter(self, exporter): """ Adds a metrics exporter to the pipeline metric store. @@ -125,17 +191,6 @@ def get_pipeline_guid(self): result += step.get_step_name() + type(step).__name__ return base64.urlsafe_b64encode(result.encode("utf-8")) - def __run_pipeline_steps(self): - """ - Performs setup and runs all steps in the pipeline. 
- :return: - """ - contexts = [self.__generate_first_context()] - for step in self._steps: - contexts = self.__run_step_for_each_context(step, contexts) - if len(contexts) == 0: - break - def __generate_first_context(self) -> DataContext: """ Generates the first context for the pipeline. @@ -146,19 +201,6 @@ def __generate_first_context(self) -> DataContext: context.set_run_metadata(context_meta) return context - def __run_step_for_each_context(self, step: Step, context_list: list) -> list: - """ - Runs the given step once for each context in the context list and performs post-processing. - :param step: Step to run. - :param context_list: List of contexts to provide to the step runtime. - :return: List of contexts to provide to the next step. - """ - next_contexts = [] - for context in context_list: - self.__run_step_with_performance_monitoring(step, context) - self.__process_step_output(context, step.get_step_provides(), next_contexts) - return next_contexts - def __process_step_output( self, context: DataContext, step_provides: list, next_contexts: list ): @@ -176,17 +218,6 @@ def __process_step_output( else: self.__forward_context(context, next_contexts) - def __run_step_with_performance_monitoring(self, step: Step, context: DataContext): - """ - Runs a single step in the pipeline and forwards performance data to any installed monitoring plugins. - :param step: Step to run. - :param context: Context to provide to the step runtime. - :return: None - """ - self._metrics_store.start_step_monitoring(step.get_step_name()) - step.run_step(context) - self._metrics_store.stop_step_monitoring() - def __split_context( self, step_provides: list, context: DataContext, next_contexts: list ):