From 6c63e93036df9e0e7e94a774a291fa42844db56c Mon Sep 17 00:00:00 2001 From: Joshua Zenn Date: Wed, 25 May 2022 23:11:27 -0400 Subject: [PATCH 1/5] #80 Local pipeline lock implementation --- CHANGELOG.md | 2 ++ watergrid/locks/LocalPipelineLock.py | 30 ++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 watergrid/locks/LocalPipelineLock.py diff --git a/CHANGELOG.md b/CHANGELOG.md index a49b7e6..887d6ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### Added +- LocalPipelineLock for single-thread non-networked pipeline applications. (#80) + ### Changed ### Deprecated diff --git a/watergrid/locks/LocalPipelineLock.py b/watergrid/locks/LocalPipelineLock.py new file mode 100644 index 0000000..3b56da1 --- /dev/null +++ b/watergrid/locks/LocalPipelineLock.py @@ -0,0 +1,30 @@ +from watergrid.locks.PipelineLock import PipelineLock + + +class LocalPipelineLock(PipelineLock): + def acquire(self) -> bool: + if not self.__lock_obj: + self.__lock_obj = True + return True + else: + return False + + def has_lock(self) -> bool: + return self.__lock_obj + + def extend_lease(self): + pass + + def release(self): + self.__lock_obj = False + + def read_key(self, key: str): + return self.__key_value[key] + + def write_key(self, key: str, value): + self.__key_value[key] = value + + def __init__(self, lock_timeout: int = 60): + super().__init__(lock_timeout) + self.__lock_obj = False + self.__key_value = {} \ No newline at end of file From 417225ab05212a9618c8711273b1aa52f78b0264 Mon Sep 17 00:00:00 2001 From: Joshua Zenn Date: Sat, 28 May 2022 12:51:22 -0400 Subject: [PATCH 2/5] #80 Switched to middleware architecture --- watergrid/middleware/ContextMiddleware.py | 22 ++++ watergrid/middleware/PipelineMiddlware.py | 19 +++ watergrid/middleware/StepMiddleware.py | 21 +++ watergrid/middleware/__init__.py | 0 .../context/ContextMetricsMiddleware.py | 16 +++ watergrid/middleware/context/__init__.py | 0 .../middleware/pipeline/LastRunMiddleware.py | 20 +++ .../pipeline/PipelineMetricsMiddleware.py | 14 ++ .../pipeline/StepOrderingMiddleware.py | 14 ++ watergrid/middleware/pipeline/__init__.py | 0 watergrid/pipelines/ha_pipeline.py | 10 +- watergrid/pipelines/pipeline.py | 122 +++++++++++------- 12 files changed, 205 insertions(+), 53 deletions(-) create mode 100644 watergrid/middleware/ContextMiddleware.py create mode 100644 watergrid/middleware/PipelineMiddlware.py create mode 100644 watergrid/middleware/StepMiddleware.py create mode 100644 watergrid/middleware/__init__.py create mode 100644 watergrid/middleware/context/ContextMetricsMiddleware.py create mode 100644 watergrid/middleware/context/__init__.py create mode 100644 watergrid/middleware/pipeline/LastRunMiddleware.py create mode 100644 watergrid/middleware/pipeline/PipelineMetricsMiddleware.py create mode 100644 watergrid/middleware/pipeline/StepOrderingMiddleware.py create mode 100644 watergrid/middleware/pipeline/__init__.py diff --git a/watergrid/middleware/ContextMiddleware.py b/watergrid/middleware/ContextMiddleware.py new file mode 100644 index 0000000..15f1641 --- /dev/null +++ b/watergrid/middleware/ContextMiddleware.py @@ -0,0 +1,22 @@ +from abc import abstractmethod, ABC + +from watergrid.context import DataContext +from watergrid.steps import Step + + +class ContextMiddleware(ABC): + """ + Abstract class representing an action that is run before and after + every context run within each step in a pipeline. + """ + + def __init__(self): + pass + + @abstractmethod + def pre_context(self, step: Step, context: DataContext): + pass + + @abstractmethod + def post_context(self, step: Step, context: DataContext): + pass diff --git a/watergrid/middleware/PipelineMiddlware.py b/watergrid/middleware/PipelineMiddlware.py new file mode 100644 index 0000000..3f6f123 --- /dev/null +++ b/watergrid/middleware/PipelineMiddlware.py @@ -0,0 +1,19 @@ +from abc import ABC, abstractmethod + + +class PipelineMiddleware(ABC): + """ + Abstract class representing an action that is run before and after + every run of the pipeline. + """ + + def __init__(self): + pass + + @abstractmethod + def pre_pipeline(self, pipeline): + pass + + @abstractmethod + def post_pipeline(self, pipeline): + pass diff --git a/watergrid/middleware/StepMiddleware.py b/watergrid/middleware/StepMiddleware.py new file mode 100644 index 0000000..e19c6c8 --- /dev/null +++ b/watergrid/middleware/StepMiddleware.py @@ -0,0 +1,21 @@ +from abc import ABC, abstractmethod + +from watergrid.steps import Step + + +class StepMiddleware(ABC): + """ + Abstract class representing an action that runs before and after each + action. + """ + + def __init__(self): + pass + + @abstractmethod + def pre_step(self, step: Step, contexts: list) -> None: + pass + + @abstractmethod + def post_step(self, step: Step, contexts: list) -> None: + pass diff --git a/watergrid/middleware/__init__.py b/watergrid/middleware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/watergrid/middleware/context/ContextMetricsMiddleware.py b/watergrid/middleware/context/ContextMetricsMiddleware.py new file mode 100644 index 0000000..d7d4497 --- /dev/null +++ b/watergrid/middleware/context/ContextMetricsMiddleware.py @@ -0,0 +1,16 @@ +from watergrid.context import DataContext +from watergrid.metrics.MetricsStore import MetricsStore +from watergrid.middleware.ContextMiddleware import ContextMiddleware +from watergrid.steps import Step + + +class ContextMetricsMiddleware(ContextMiddleware): + def pre_context(self, step: Step, context: DataContext): + self._store.start_step_monitoring(step.get_step_name()) + + def post_context(self, step: Step, context: DataContext): + self._store.stop_step_monitoring() + + def __init__(self, store: MetricsStore): + super().__init__() + self._store = store diff --git a/watergrid/middleware/context/__init__.py b/watergrid/middleware/context/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/watergrid/middleware/pipeline/LastRunMiddleware.py b/watergrid/middleware/pipeline/LastRunMiddleware.py new file mode 100644 index 0000000..25f1d7a --- /dev/null +++ b/watergrid/middleware/pipeline/LastRunMiddleware.py @@ -0,0 +1,20 @@ +import time + +from watergrid.middleware.PipelineMiddlware import PipelineMiddleware + + +class LastRunMiddleware(PipelineMiddleware): + @property + def last_run(self): + return self._last_run + + def pre_pipeline(self, pipeline): + pass + + def post_pipeline(self, pipeline): + self._last_run = time.time() + + def __init__(self): + super().__init__() + self._last_run = None + diff --git a/watergrid/middleware/pipeline/PipelineMetricsMiddleware.py b/watergrid/middleware/pipeline/PipelineMetricsMiddleware.py new file mode 100644 index 0000000..4a64ec5 --- /dev/null +++ b/watergrid/middleware/pipeline/PipelineMetricsMiddleware.py @@ -0,0 +1,14 @@ +from watergrid.metrics.MetricsStore import MetricsStore +from watergrid.middleware.PipelineMiddlware import PipelineMiddleware + + +class PipelineMetricsMiddleware(PipelineMiddleware): + def __init__(self, store: MetricsStore): + super().__init__() + self._store = store + + def pre_pipeline(self, pipeline): + self._store.start_pipeline_monitoring(pipeline.get_pipeline_name()) + + def post_pipeline(self, pipeline): + self._store.stop_pipeline_monitoring() diff --git a/watergrid/middleware/pipeline/StepOrderingMiddleware.py b/watergrid/middleware/pipeline/StepOrderingMiddleware.py new file mode 100644 index 0000000..31b22ab --- /dev/null +++ b/watergrid/middleware/pipeline/StepOrderingMiddleware.py @@ -0,0 +1,14 @@ +from watergrid.middleware.PipelineMiddlware import PipelineMiddleware +from watergrid.pipelines.pipeline_verifier import PipelineVerifier + + +class StepOrderingMiddleware(PipelineMiddleware): + def pre_pipeline(self, pipeline): + PipelineVerifier.verify_pipeline_dependencies_fulfilled(pipeline._steps) + PipelineVerifier.verify_pipeline_step_ordering(pipeline._steps) + + def post_pipeline(self, pipeline): + pass + + def __init__(self): + super().__init__() \ No newline at end of file diff --git a/watergrid/middleware/pipeline/__init__.py b/watergrid/middleware/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/watergrid/pipelines/ha_pipeline.py b/watergrid/pipelines/ha_pipeline.py index 7e21d03..410be0d 100644 --- a/watergrid/pipelines/ha_pipeline.py +++ b/watergrid/pipelines/ha_pipeline.py @@ -20,13 +20,12 @@ def __init__(self, pipeline_name: str, pipeline_lock: PipelineLock): self.__lock_timings = [] def run(self): - self.verify_pipeline() if self.__pipeline_lock.lock(): super().run() self.__pipeline_lock.unlock() else: logging.debug( - "Pipeline {} is already running on another instance".format( + "pipeline {} is already running on another instance".format( self.get_pipeline_name() ) ) @@ -37,7 +36,6 @@ def run_interval(self, job_interval_s: int) -> None: :param job_interval_s: Number of seconds to wait between pipeline runs. :return: None """ - self.verify_pipeline() while True: self._run_interval_loop(job_interval_s) time.sleep(self._calculate_delay(job_interval_s) / 1000) @@ -78,7 +76,7 @@ def _verify_lock_metadata(self, job_interval_s: int) -> None: self._set_last_run(time.time() - job_interval_s) if time.time() - last_run > job_interval_s * 3: logging.warning( - "Pipeline {} has fallen more than three cycles behind. Consider increasing the job interval or " + "pipeline {} has fallen more than three cycles behind. Consider increasing the job interval or " "provisioning more machines.".format(self.get_pipeline_name()) ) self._set_last_run(time.time() - job_interval_s) @@ -90,7 +88,7 @@ def _handle_lock_acquire_failure(self) -> None: :return: None """ logging.debug( - "Pipeline {} is already running on another instance".format( + "pipeline {} is already running on another instance".format( self.get_pipeline_name() ) ) @@ -117,7 +115,7 @@ def _get_last_run(self) -> float: def _get_pipeline_lock_name(self) -> str: """ Builds the name of the lock assigned to this pipeline. - :return: Pipeline lock name. + :return: pipeline lock name. """ return "{}_last_run".format(self.get_pipeline_name()) diff --git a/watergrid/pipelines/pipeline.py b/watergrid/pipelines/pipeline.py index 8fdce8b..cdb93d5 100644 --- a/watergrid/pipelines/pipeline.py +++ b/watergrid/pipelines/pipeline.py @@ -7,6 +7,14 @@ from watergrid.context import DataContext, OutputMode, ContextMetadata from watergrid.metrics.MetricsStore import MetricsStore +from watergrid.middleware.context.ContextMetricsMiddleware import \ + ContextMetricsMiddleware +from watergrid.middleware.pipeline.LastRunMiddleware import LastRunMiddleware +from watergrid.middleware.StepMiddleware import StepMiddleware +from watergrid.middleware.pipeline.PipelineMetricsMiddleware import \ + PipelineMetricsMiddleware +from watergrid.middleware.pipeline.StepOrderingMiddleware import \ + StepOrderingMiddleware from watergrid.pipelines.pipeline_verifier import PipelineVerifier from watergrid.steps import Sequence from watergrid.steps import Step @@ -24,7 +32,18 @@ def __init__(self, pipeline_name: str): self._pipeline_name = pipeline_name self._steps = [] self._metrics_store = MetricsStore() - self._last_run = None + self._context_middleware = [] + self._step_middleware = [] + self._pipeline_middleware = [] + self._pipeline_middleware.append(StepOrderingMiddleware()) + self._lastrun_tracker = LastRunMiddleware() + self._pipeline_middleware.append(self._lastrun_tracker) + self._pipeline_middleware.append(PipelineMetricsMiddleware(self._metrics_store)) + self._context_middleware.append(ContextMetricsMiddleware(self._metrics_store)) + + @property + def _last_run(self): + return self._lastrun_tracker.last_run def add_step(self, step: Step): """ @@ -49,20 +68,65 @@ def run(self): Blocking operation that runs all steps in the pipeline once. :return: None """ - self.verify_pipeline() - self._metrics_store.start_pipeline_monitoring(self._pipeline_name) + self.__pipeline_middleware_wrapper() + + def __pipeline_middleware_wrapper(self): + self.__pipeline_middleware_pre() + contexts = [self.__generate_first_context()] try: - self.__run_pipeline_steps() - self._last_run = time.time() + for step in self._steps: + contexts = self.__step_middleware_wrapper(step, contexts) + if len(contexts) == 0: + break except Exception as e: self._metrics_store.report_exception(e) self._metrics_store.stop_step_monitoring() finally: - self._metrics_store.stop_pipeline_monitoring() + self.__pipeline_middleware_post() + + def __pipeline_middleware_pre(self): + for middleware in self._pipeline_middleware: + middleware.pre_pipeline(self) + + def __pipeline_middleware_post(self): + for middleware in reversed(self._pipeline_middleware): + middleware.post_pipeline(self) + + def __step_middleware_wrapper(self, step: Step, contexts: list) -> list: + self.__step_middleware_pre(step, contexts) + next_contexts = [] + for context in contexts: + self.__context_middleware_wrapper(step, context) + self.__process_step_output(context, step.get_step_provides(), next_contexts) + self.__step_middleware_post(step, contexts) + return next_contexts + + def __step_middleware_pre(self, step: Step, contexts: list): + for middleware in self._step_middleware: + middleware.pre_step(self, step, contexts) + + def __step_middleware_post(self, step: Step, contexts: list): + for middleware in reversed(self._step_middleware): + middleware.post_step(self, step, contexts) + + def __context_middleware_wrapper(self, step: Step, context: DataContext) -> DataContext: + self.__context_middleware_pre(step, context) + step.run_step(context) + self.__context_middleware_post(step, context) + return context + + def __context_middleware_pre(self, step: Step, context: DataContext): + for middleware in self._context_middleware: + middleware.pre_context(step, context) + + def __context_middleware_post(self, step: Step, context: DataContext): + for middleware in reversed(self._context_middleware): + middleware.post_context(step, context) def run_loop(self): """ - Runs the pipeline in a loop. Subsequent executions will run immediately after the previous execution. + Runs the pipeline in a loop. Subsequent executions will run immediately + after the previous execution. """ while True: self.run() @@ -97,14 +161,6 @@ def get_pipeline_name(self) -> str: """ return self._pipeline_name - def verify_pipeline(self): - """ - Verifies that the pipeline is valid and that all step dependencies are met. - :return: None - """ - PipelineVerifier.verify_pipeline_dependencies_fulfilled(self._steps) - PipelineVerifier.verify_pipeline_step_ordering(self._steps) - def add_metrics_exporter(self, exporter): """ Adds a metrics exporter to the pipeline metric store. @@ -125,16 +181,12 @@ def get_pipeline_guid(self): result += step.get_step_name() + type(step).__name__ return base64.urlsafe_b64encode(result.encode("utf-8")) - def __run_pipeline_steps(self): + def _add_step_middleware(self, middleware: StepMiddleware): """ - Performs setup and runs all steps in the pipeline. - :return: + Adds a middlware layer that will run before and after each step + in the pipeline. """ - contexts = [self.__generate_first_context()] - for step in self._steps: - contexts = self.__run_step_for_each_context(step, contexts) - if len(contexts) == 0: - break + self._middleware.append(middleware) def __generate_first_context(self) -> DataContext: """ @@ -146,19 +198,6 @@ def __generate_first_context(self) -> DataContext: context.set_run_metadata(context_meta) return context - def __run_step_for_each_context(self, step: Step, context_list: list) -> list: - """ - Runs the given step once for each context in the context list and performs post-processing. - :param step: Step to run. - :param context_list: List of contexts to provide to the step runtime. - :return: List of contexts to provide to the next step. - """ - next_contexts = [] - for context in context_list: - self.__run_step_with_performance_monitoring(step, context) - self.__process_step_output(context, step.get_step_provides(), next_contexts) - return next_contexts - def __process_step_output( self, context: DataContext, step_provides: list, next_contexts: list ): @@ -176,17 +215,6 @@ def __process_step_output( else: self.__forward_context(context, next_contexts) - def __run_step_with_performance_monitoring(self, step: Step, context: DataContext): - """ - Runs a single step in the pipeline and forwards performance data to any installed monitoring plugins. - :param step: Step to run. - :param context: Context to provide to the step runtime. - :return: None - """ - self._metrics_store.start_step_monitoring(step.get_step_name()) - step.run_step(context) - self._metrics_store.stop_step_monitoring() - def __split_context( self, step_provides: list, context: DataContext, next_contexts: list ): From 16e590989dfd6b99d4985b1643b3688eb31fb0e2 Mon Sep 17 00:00:00 2001 From: Joshua Zenn Date: Sat, 28 May 2022 21:35:34 -0400 Subject: [PATCH 3/5] #80 Arbitrary locks in DataContext --- CHANGELOG.md | 4 ++ test/custom_step_lock_tests.py | 40 +++++++++++++++++++ watergrid/context/data_context.py | 10 +++++ watergrid/locks/LocalPipelineLock.py | 10 ++--- .../context/LockInjectorMiddleware.py | 21 ++++++++++ watergrid/pipelines/pipeline.py | 16 ++++---- 6 files changed, 87 insertions(+), 14 deletions(-) create mode 100644 test/custom_step_lock_tests.py create mode 100644 watergrid/middleware/context/LockInjectorMiddleware.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 887d6ab..35c240b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,13 @@ ### Added - LocalPipelineLock for single-thread non-networked pipeline applications. (#80) +- Local lock interface is now exposed through the DataContext for use of arbitrary locks in the pipeline. (#80) ### Changed +- Bumped `redis` dependency to 4.3.1. (#94) +- Pipeline class now uses middleware architecture for customizable pre/post run actions. (#80) + ### Deprecated ### Removed diff --git a/test/custom_step_lock_tests.py b/test/custom_step_lock_tests.py new file mode 100644 index 0000000..4f80c38 --- /dev/null +++ b/test/custom_step_lock_tests.py @@ -0,0 +1,40 @@ +import unittest + +from watergrid.context import DataContext +from watergrid.pipelines.pipeline import Pipeline +from watergrid.steps import Step + + +class MockSetLockStep(Step): + def __init__(self): + super().__init__(self.__class__.__name__) + + def run(self, context: DataContext): + if not context.lock.acquire(): + raise Exception("Unable to obtain lock") + + +class MockVerifyStep(Step): + def __init__(self): + super().__init__(self.__class__.__name__) + self.mock_flag = False + + def run(self, context: DataContext): + self.mock_flag = context.lock.has_lock() + + def get_flag(self) -> bool: + return self.mock_flag + + +class CustomStepLockTestCase(unittest.TestCase): + def test_can_use_lock(self): + mock_step = MockVerifyStep() + pipeline = Pipeline("test_pipeline") + pipeline.add_step(MockSetLockStep()) + pipeline.add_step(mock_step) + pipeline.run() + self.assertTrue(mock_step.get_flag()) + + +if __name__ == '__main__': + unittest.main() diff --git a/watergrid/context/data_context.py b/watergrid/context/data_context.py index b0bea8a..a09ddcd 100644 --- a/watergrid/context/data_context.py +++ b/watergrid/context/data_context.py @@ -1,5 +1,6 @@ from watergrid.context import OutputMode from watergrid.context.context_metadata import ContextMetadata +from watergrid.locks.PipelineLock import PipelineLock class DataContext: @@ -12,6 +13,7 @@ def __init__(self): self.data = {} self.output_mode = OutputMode.DIRECT self.metadata = ContextMetadata() + self._lock = None def set(self, key: str, value: object) -> None: """ @@ -91,6 +93,14 @@ def set_run_metadata(self, metadata: ContextMetadata) -> None: """ self.metadata = metadata + @property + def lock(self): + return self._lock + + @lock.setter + def lock(self, lock: PipelineLock): + self._lock = lock + @staticmethod def deep_copy_context(context): """ diff --git a/watergrid/locks/LocalPipelineLock.py b/watergrid/locks/LocalPipelineLock.py index 3b56da1..749f651 100644 --- a/watergrid/locks/LocalPipelineLock.py +++ b/watergrid/locks/LocalPipelineLock.py @@ -3,20 +3,20 @@ class LocalPipelineLock(PipelineLock): def acquire(self) -> bool: - if not self.__lock_obj: - self.__lock_obj = True + if not self._lock_obj: + self._lock_obj = True return True else: return False def has_lock(self) -> bool: - return self.__lock_obj + return self._lock_obj def extend_lease(self): pass def release(self): - self.__lock_obj = False + self._lock_obj = False def read_key(self, key: str): return self.__key_value[key] @@ -26,5 +26,5 @@ def write_key(self, key: str, value): def __init__(self, lock_timeout: int = 60): super().__init__(lock_timeout) - self.__lock_obj = False + self._lock_obj = False self.__key_value = {} \ No newline at end of file diff --git a/watergrid/middleware/context/LockInjectorMiddleware.py b/watergrid/middleware/context/LockInjectorMiddleware.py new file mode 100644 index 0000000..2330bf6 --- /dev/null +++ b/watergrid/middleware/context/LockInjectorMiddleware.py @@ -0,0 +1,21 @@ +from watergrid.context import DataContext +from watergrid.locks.LocalPipelineLock import LocalPipelineLock +from watergrid.middleware.ContextMiddleware import ContextMiddleware +from watergrid.steps import Step + + +class LockInjectorMiddleware(ContextMiddleware): + """ + Modifies a context before an after a step to ensure that the lock context + is not deep copied as when using local locks this will break the functionality + of the lock. + """ + def pre_context(self, step: Step, context: DataContext): + context.lock = self._pipeline.lock + + def post_context(self, step: Step, context: DataContext): + context.lock = None + + def __init__(self, pipeline): + super().__init__() + self._pipeline = pipeline \ No newline at end of file diff --git a/watergrid/pipelines/pipeline.py b/watergrid/pipelines/pipeline.py index cdb93d5..00c33b0 100644 --- a/watergrid/pipelines/pipeline.py +++ b/watergrid/pipelines/pipeline.py @@ -6,16 +6,19 @@ import pycron from watergrid.context import DataContext, OutputMode, ContextMetadata +from watergrid.locks.LocalPipelineLock import LocalPipelineLock +from watergrid.locks.PipelineLock import PipelineLock from watergrid.metrics.MetricsStore import MetricsStore from watergrid.middleware.context.ContextMetricsMiddleware import \ ContextMetricsMiddleware +from watergrid.middleware.context.LockInjectorMiddleware import \ + LockInjectorMiddleware from watergrid.middleware.pipeline.LastRunMiddleware import LastRunMiddleware from watergrid.middleware.StepMiddleware import StepMiddleware from watergrid.middleware.pipeline.PipelineMetricsMiddleware import \ PipelineMetricsMiddleware from watergrid.middleware.pipeline.StepOrderingMiddleware import \ StepOrderingMiddleware -from watergrid.pipelines.pipeline_verifier import PipelineVerifier from watergrid.steps import Sequence from watergrid.steps import Step @@ -28,7 +31,7 @@ class Pipeline(ABC): :type pipeline_name: str """ - def __init__(self, pipeline_name: str): + def __init__(self, pipeline_name: str, lock: PipelineLock = LocalPipelineLock()): self._pipeline_name = pipeline_name self._steps = [] self._metrics_store = MetricsStore() @@ -40,6 +43,8 @@ def __init__(self, pipeline_name: str): self._pipeline_middleware.append(self._lastrun_tracker) self._pipeline_middleware.append(PipelineMetricsMiddleware(self._metrics_store)) self._context_middleware.append(ContextMetricsMiddleware(self._metrics_store)) + self._context_middleware.append(LockInjectorMiddleware(self)) + self.lock = lock @property def _last_run(self): @@ -181,13 +186,6 @@ def get_pipeline_guid(self): result += step.get_step_name() + type(step).__name__ return base64.urlsafe_b64encode(result.encode("utf-8")) - def _add_step_middleware(self, middleware: StepMiddleware): - """ - Adds a middlware layer that will run before and after each step - in the pipeline. - """ - self._middleware.append(middleware) - def __generate_first_context(self) -> DataContext: """ Generates the first context for the pipeline. From a1b8ade8bc0bf5a7bc42d092f22370480b45d48c Mon Sep 17 00:00:00 2001 From: Joshua Zenn Date: Sat, 28 May 2022 21:52:46 -0400 Subject: [PATCH 4/5] #80 Linter fixes --- watergrid/locks/LocalPipelineLock.py | 2 +- .../context/LockInjectorMiddleware.py | 6 ++--- .../pipeline/StepOrderingMiddleware.py | 2 +- watergrid/pipelines/pipeline.py | 25 +++++++++++-------- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/watergrid/locks/LocalPipelineLock.py b/watergrid/locks/LocalPipelineLock.py index 749f651..7e13326 100644 --- a/watergrid/locks/LocalPipelineLock.py +++ b/watergrid/locks/LocalPipelineLock.py @@ -27,4 +27,4 @@ def write_key(self, key: str, value): def __init__(self, lock_timeout: int = 60): super().__init__(lock_timeout) self._lock_obj = False - self.__key_value = {} \ No newline at end of file + self.__key_value = {} diff --git a/watergrid/middleware/context/LockInjectorMiddleware.py b/watergrid/middleware/context/LockInjectorMiddleware.py index 2330bf6..26b5417 100644 --- a/watergrid/middleware/context/LockInjectorMiddleware.py +++ b/watergrid/middleware/context/LockInjectorMiddleware.py @@ -7,8 +7,8 @@ class LockInjectorMiddleware(ContextMiddleware): """ Modifies a context before an after a step to ensure that the lock context - is not deep copied as when using local locks this will break the functionality - of the lock. + is not deep copied as when using local locks this will break the + functionality of the lock. """ def pre_context(self, step: Step, context: DataContext): context.lock = self._pipeline.lock @@ -18,4 +18,4 @@ def post_context(self, step: Step, context: DataContext): def __init__(self, pipeline): super().__init__() - self._pipeline = pipeline \ No newline at end of file + self._pipeline = pipeline diff --git a/watergrid/middleware/pipeline/StepOrderingMiddleware.py b/watergrid/middleware/pipeline/StepOrderingMiddleware.py index 31b22ab..32a2bbd 100644 --- a/watergrid/middleware/pipeline/StepOrderingMiddleware.py +++ b/watergrid/middleware/pipeline/StepOrderingMiddleware.py @@ -11,4 +11,4 @@ def post_pipeline(self, pipeline): pass def __init__(self): - super().__init__() \ No newline at end of file + super().__init__() diff --git a/watergrid/pipelines/pipeline.py b/watergrid/pipelines/pipeline.py index 00c33b0..dac8190 100644 --- a/watergrid/pipelines/pipeline.py +++ b/watergrid/pipelines/pipeline.py @@ -9,16 +9,19 @@ from watergrid.locks.LocalPipelineLock import LocalPipelineLock from watergrid.locks.PipelineLock import PipelineLock from watergrid.metrics.MetricsStore import MetricsStore -from watergrid.middleware.context.ContextMetricsMiddleware import \ - ContextMetricsMiddleware -from watergrid.middleware.context.LockInjectorMiddleware import \ - LockInjectorMiddleware +from watergrid.middleware.context.ContextMetricsMiddleware import ( + ContextMetricsMiddleware, +) +from watergrid.middleware.context.LockInjectorMiddleware import ( + LockInjectorMiddleware, +) from watergrid.middleware.pipeline.LastRunMiddleware import LastRunMiddleware -from watergrid.middleware.StepMiddleware import StepMiddleware -from watergrid.middleware.pipeline.PipelineMetricsMiddleware import \ - PipelineMetricsMiddleware -from watergrid.middleware.pipeline.StepOrderingMiddleware import \ - StepOrderingMiddleware +from watergrid.middleware.pipeline.PipelineMetricsMiddleware import ( + PipelineMetricsMiddleware, +) +from watergrid.middleware.pipeline.StepOrderingMiddleware import ( + StepOrderingMiddleware, +) from watergrid.steps import Sequence from watergrid.steps import Step @@ -114,7 +117,9 @@ def __step_middleware_post(self, step: Step, contexts: list): for middleware in reversed(self._step_middleware): middleware.post_step(self, step, contexts) - def __context_middleware_wrapper(self, step: Step, context: DataContext) -> DataContext: + def __context_middleware_wrapper( + self, step: Step, context: DataContext + ) -> DataContext: self.__context_middleware_pre(step, context) step.run_step(context) self.__context_middleware_post(step, context) From 026e94fa09dd05fb8702f768a1a79739a9df3d77 Mon Sep 17 00:00:00 2001 From: Joshua Zenn Date: Sat, 28 May 2022 21:59:31 -0400 Subject: [PATCH 5/5] #80 Even more linter fixes --- test/custom_step_lock_tests.py | 2 +- watergrid/middleware/context/LockInjectorMiddleware.py | 1 + watergrid/middleware/pipeline/LastRunMiddleware.py | 1 - watergrid/middleware/pipeline/StepOrderingMiddleware.py | 4 +++- watergrid/pipelines/pipeline.py | 2 +- 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/test/custom_step_lock_tests.py b/test/custom_step_lock_tests.py index 4f80c38..4708262 100644 --- a/test/custom_step_lock_tests.py +++ b/test/custom_step_lock_tests.py @@ -36,5 +36,5 @@ def test_can_use_lock(self): self.assertTrue(mock_step.get_flag()) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/watergrid/middleware/context/LockInjectorMiddleware.py b/watergrid/middleware/context/LockInjectorMiddleware.py index 26b5417..e981a06 100644 --- a/watergrid/middleware/context/LockInjectorMiddleware.py +++ b/watergrid/middleware/context/LockInjectorMiddleware.py @@ -10,6 +10,7 @@ class LockInjectorMiddleware(ContextMiddleware): is not deep copied as when using local locks this will break the functionality of the lock. """ + def pre_context(self, step: Step, context: DataContext): context.lock = self._pipeline.lock diff --git a/watergrid/middleware/pipeline/LastRunMiddleware.py b/watergrid/middleware/pipeline/LastRunMiddleware.py index 25f1d7a..3e09da8 100644 --- a/watergrid/middleware/pipeline/LastRunMiddleware.py +++ b/watergrid/middleware/pipeline/LastRunMiddleware.py @@ -17,4 +17,3 @@ def post_pipeline(self, pipeline): def __init__(self): super().__init__() self._last_run = None - diff --git a/watergrid/middleware/pipeline/StepOrderingMiddleware.py b/watergrid/middleware/pipeline/StepOrderingMiddleware.py index 32a2bbd..a224715 100644 --- a/watergrid/middleware/pipeline/StepOrderingMiddleware.py +++ b/watergrid/middleware/pipeline/StepOrderingMiddleware.py @@ -4,7 +4,9 @@ class StepOrderingMiddleware(PipelineMiddleware): def pre_pipeline(self, pipeline): - PipelineVerifier.verify_pipeline_dependencies_fulfilled(pipeline._steps) + PipelineVerifier.verify_pipeline_dependencies_fulfilled( + pipeline._steps + ) PipelineVerifier.verify_pipeline_step_ordering(pipeline._steps) def post_pipeline(self, pipeline): diff --git a/watergrid/pipelines/pipeline.py b/watergrid/pipelines/pipeline.py index dac8190..a038f64 100644 --- a/watergrid/pipelines/pipeline.py +++ b/watergrid/pipelines/pipeline.py @@ -118,7 +118,7 @@ def __step_middleware_post(self, step: Step, contexts: list): middleware.post_step(self, step, contexts) def __context_middleware_wrapper( - self, step: Step, context: DataContext + self, step: Step, context: DataContext ) -> DataContext: self.__context_middleware_pre(step, context) step.run_step(context)