diff --git a/CHANGELOG.md b/CHANGELOG.md index 826875e..6081b51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,28 +3,35 @@ ## [Unreleased] ### Added + - `ConsoleMetricsExporter` for locally debugging pipelines without an APM service. (#46) - Built-in Elastic APM metrics exporter. (#47) +- `Sequence` class for logical groupings of steps. (#53) ### Changed + - Bumped `redis` dependency to 4.2.2. (#50) -- Dependencies for `MetricsExporter` and `PipelineLock` modules must now be installed separately through `watergrid[...]` metapackages. (#54) - Bumped `elastic-apm` to 6.9.1. (#56) ### Deprecated ### Removed +- Dependencies for `MetricsExporter` and `PipelineLock` modules are no longer included in the base package and must +now be installed separately through `watergrid[...]` metapackages. (#54) + ### Fixed ### Security ## [1.0.1] - 2022-04-01 ### Fixed + - Broken release pipeline to PyPI. ## [1.0.0] - 2022-04-01 ### Added + - Core library functionality. - High availability pipeline using the `HAPipeline` class. - `RedisLock` for high availability mode using Redis. @@ -34,6 +41,7 @@ - Base class for building a metric exporter to an external APM solution. ### Fixed + - Staggered node startup no longer causes mid-interval pipeline runs on other nodes in HA mode. 
import unittest

from watergrid.context import DataContext
from watergrid.pipelines.pipeline import Pipeline
from watergrid.steps import Sequence, Step


class MockStep(Step):
    """Step double that remembers whether ``run`` has been invoked."""

    def __init__(self):
        super().__init__("mock_step")
        self.flag = False

    def run(self, context: DataContext):
        # The context is not inspected; raising the flag is the only effect.
        self.flag = True

    def get_flag(self) -> bool:
        """Return the current value of the flag set by ``run``."""
        return self.flag


class TestSequence(Sequence):
    """Sequence subclass with a fixed name, used as the test fixture."""

    def __init__(self):
        super().__init__("test_sequence")


class SequenceTestCase(unittest.TestCase):
    """Checks that steps placed in a Sequence are run by a Pipeline."""

    def test_sequence_adds_steps(self):
        """A step added through a sequence has its flag set after a run."""
        runner = Pipeline("test_pipeline")
        probe = MockStep()
        group = TestSequence()

        group.add_step(probe)
        runner.add_steps(group)
        runner.run()

        self.assertTrue(probe.get_flag())


if __name__ == "__main__":
    unittest.main()
from abc import ABC
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Step is referenced only in annotations. watergrid/steps/__init__.py
    # imports this module, so importing Step back through the package at
    # runtime would make the two import each other; pull it straight from
    # its defining module, and only for type checkers.
    from watergrid.steps.step import Step


class Sequence(ABC):
    """
    A sequence object is used to form a logical grouping of steps. It can be
    used to simplify your pipeline initialization code, or it can be used to
    rapidly re-use multiple steps at once.
    """

    def __init__(self, name: str):
        """
        Creates an empty sequence.
        :param name: Name of the sequence.
        """
        self.name = name
        self.steps = []

    def add_step(self, step: "Step") -> None:
        """
        Adds a step to the end of the sequence.
        :param step: Step to add.
        :return: None
        """
        self.steps.append(step)

    def export_steps(self) -> list:
        """
        Returns a list of all steps in the sequence, in the order they were
        added. Used internally by the pipeline class.
        :return: New list holding the steps; changing it does not alter the
        sequence.
        """
        # Hand out a copy so callers cannot modify the sequence's own list.
        return list(self.steps)