From 933870d24923b51d653441aefc393247dc7c609c Mon Sep 17 00:00:00 2001 From: Joshua Zenn Date: Mon, 11 Apr 2022 09:25:40 -0400 Subject: [PATCH 1/2] #53 Sequence groupings --- CHANGELOG.md | 3 ++- test/sequence_tests.py | 38 +++++++++++++++++++++++++++++++++ watergrid/pipelines/pipeline.py | 10 +++++++++ watergrid/steps/__init__.py | 1 + watergrid/steps/sequence.py | 28 ++++++++++++++++++++++++ 5 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 test/sequence_tests.py create mode 100644 watergrid/steps/sequence.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 826875e..c850ae3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,15 +5,16 @@ ### Added - `ConsoleMetricsExporter` for locally debugging pipelines without an APM service. (#46) - Built-in Elastic APM metrics exporter. (#47) +- `Sequence` class for logical groupings of steps. (#53) ### Changed - Bumped `redis` dependency to 4.2.2. (#50) -- Dependencies for `MetricsExporter` and `PipelineLock` modules must now be installed separately through `watergrid[...]` metapackages. (#54) - Bumped `elastic-apm` to 6.9.1. (#56) ### Deprecated ### Removed +- Dependencies for `MetricsExporter` and `PipelineLock` modules are no longer included in the base package and must now be installed separately through `watergrid[...]` metapackages. (#54) ### Fixed diff --git a/test/sequence_tests.py b/test/sequence_tests.py new file mode 100644 index 0000000..e1ee361 --- /dev/null +++ b/test/sequence_tests.py @@ -0,0 +1,38 @@ +import unittest + +from watergrid.context import DataContext +from watergrid.pipelines.pipeline import Pipeline +from watergrid.steps import Sequence +from watergrid.steps import Step + + +class MockStep(Step): + def __init__(self): + super().__init__("mock_step") + self.flag = False + + def run(self, context: DataContext): + self.flag = True + + def get_flag(self) -> bool: + return self.flag + + +class TestSequence(Sequence): + def __init__(self): + super().__init__("test_sequence") + + +class SequenceTestCase(unittest.TestCase): + def test_sequence_adds_steps(self): + pipeline = Pipeline("test_pipeline") + mock_step = MockStep() + sequence = TestSequence() + sequence.add_step(mock_step) + pipeline.add_steps(sequence) + pipeline.run() + self.assertTrue(mock_step.get_flag()) + + +if __name__ == "__main__": + unittest.main() diff --git a/watergrid/pipelines/pipeline.py b/watergrid/pipelines/pipeline.py index 086cc96..6b7c958 100644 --- a/watergrid/pipelines/pipeline.py +++ b/watergrid/pipelines/pipeline.py @@ -6,6 +6,7 @@ from watergrid.context import DataContext, OutputMode, ContextMetadata from watergrid.metrics.MetricsStore import MetricsStore from watergrid.pipelines.pipeline_verifier import PipelineVerifier +from watergrid.steps import Sequence from watergrid.steps import Step @@ -32,6 +33,15 @@ def add_step(self, step: Step): """ self._steps.append(step) + def add_steps(self, steps: Sequence): + """ + Adds a sequence of steps to the pipeline. + :param steps: Sequence implementation that contains the steps to add. + :return: None + """ + for step in steps.export_steps(): + self.add_step(step) + def run(self): """ Blocking operation that runs all steps in the pipeline once. diff --git a/watergrid/steps/__init__.py b/watergrid/steps/__init__.py index ea99d01..1f10640 100644 --- a/watergrid/steps/__init__.py +++ b/watergrid/steps/__init__.py @@ -1 +1,2 @@ from watergrid.steps.step import Step +from watergrid.steps.sequence import Sequence diff --git a/watergrid/steps/sequence.py b/watergrid/steps/sequence.py new file mode 100644 index 0000000..eb47878 --- /dev/null +++ b/watergrid/steps/sequence.py @@ -0,0 +1,28 @@ +from abc import ABC + +from watergrid.steps import Step + + +class Sequence(ABC): + """ + A sequence object is used to form a logical grouping of steps. It can be used to simplify your pipeline + initialization code, or it can be used to rapidly re-use multiple steps at once. + """ + def __init__(self, name: str): + self.name = name + self.steps = [] + + def add_step(self, step: Step) -> None: + """ + Adds a step to the sequence. + :param step: Step to add. + :return: None + """ + self.steps.append(step) + + def export_steps(self) -> list: + """ + Returns a list of all steps in the sequence. Used internally by the pipeline class. + :return: List of all steps. + """ + return self.steps From 25ce1a220b9fd439054ea5a58831993ff6655146 Mon Sep 17 00:00:00 2001 From: Joshua Zenn Date: Mon, 11 Apr 2022 20:21:31 -0400 Subject: [PATCH 2/2] #53 Linter fixes --- CHANGELOG.md | 9 ++++++++- watergrid/steps/sequence.py | 9 ++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c850ae3..6081b51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,18 +3,22 @@ ## [Unreleased] ### Added + - `ConsoleMetricsExporter` for locally debugging pipelines without an APM service. (#46) - Built-in Elastic APM metrics exporter. (#47) - `Sequence` class for logical groupings of steps. (#53) ### Changed + - Bumped `redis` dependency to 4.2.2. (#50) - Bumped `elastic-apm` to 6.9.1. (#56) ### Deprecated ### Removed -- Dependencies for `MetricsExporter` and `PipelineLock` modules are no longer included in the base package and must now be installed separately through `watergrid[...]` metapackages. (#54) + +- Dependencies for `MetricsExporter` and `PipelineLock` modules are no longer included in the base package and must +now be installed separately through `watergrid[...]` metapackages. (#54) ### Fixed @@ -22,10 +26,12 @@ ## [1.0.1] - 2022-04-01 ### Fixed + - Broken release pipeline to PyPI. ## [1.0.0] - 2022-04-01 ### Added + - Core library functionality. - High availability pipeline using the `HAPipeline` class. - `RedisLock` for high availability mode using Redis. @@ -35,6 +41,7 @@ - Base class for building a metric exporter to an external APM solution. ### Fixed + - Staggered node startup no longer causes mid-interval pipeline runs on other nodes in HA mode. diff --git a/watergrid/steps/sequence.py b/watergrid/steps/sequence.py index eb47878..b9bc1fb 100644 --- a/watergrid/steps/sequence.py +++ b/watergrid/steps/sequence.py @@ -5,9 +5,11 @@ class Sequence(ABC): """ - A sequence object is used to form a logical grouping of steps. It can be used to simplify your pipeline - initialization code, or it can be used to rapidly re-use multiple steps at once. + A sequence object is used to form a logical grouping of steps. It can be + used to simplify your pipeline initialization code, or it can be used to + rapidly re-use multiple steps at once. """ + def __init__(self, name: str): self.name = name self.steps = [] @@ -22,7 +24,8 @@ def add_step(self, step: Step) -> None: def export_steps(self) -> list: """ - Returns a list of all steps in the sequence. Used internally by the pipeline class. + Returns a list of all steps in the sequence. Used internally by the + pipeline class. :return: List of all steps. """ return self.steps