Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,35 @@
## [Unreleased]

### Added

- `ConsoleMetricsExporter` for locally debugging pipelines without an APM service. (#46)
- Built-in Elastic APM metrics exporter. (#47)
- `Sequence` class for logical groupings of steps. (#53)

### Changed

- Bumped `redis` dependency to 4.2.2. (#50)
- Dependencies for `MetricsExporter` and `PipelineLock` modules must now be installed separately through `watergrid[...]` metapackages. (#54)
- Bumped `elastic-apm` to 6.9.1. (#56)

### Deprecated

### Removed

- Dependencies for `MetricsExporter` and `PipelineLock` modules are no longer included in the base package and must
now be installed separately through `watergrid[...]` metapackages. (#54)

### Fixed

### Security

## [1.0.1] - 2022-04-01
### Fixed

- Broken release pipeline to PyPI.

## [1.0.0] - 2022-04-01
### Added

- Core library functionality.
- High availability pipeline using the `HAPipeline` class.
- `RedisLock` for high availability mode using Redis.
Expand All @@ -34,6 +41,7 @@
- Base class for building a metric exporter to an external APM solution.

### Fixed

- Staggered node startup no longer causes mid-interval pipeline runs on other nodes in HA mode.


Expand Down
38 changes: 38 additions & 0 deletions test/sequence_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import unittest

from watergrid.context import DataContext
from watergrid.pipelines.pipeline import Pipeline
from watergrid.steps import Sequence
from watergrid.steps import Step


class MockStep(Step):
def __init__(self):
super().__init__("mock_step")
self.flag = False

def run(self, context: DataContext):
self.flag = True

def get_flag(self) -> bool:
return self.flag


class TestSequence(Sequence):
def __init__(self):
super().__init__("test_sequence")


class SequenceTestCase(unittest.TestCase):
def test_sequence_adds_steps(self):
pipeline = Pipeline("test_pipeline")
mock_step = MockStep()
sequence = TestSequence()
sequence.add_step(mock_step)
pipeline.add_steps(sequence)
pipeline.run()
self.assertTrue(mock_step.get_flag())


if __name__ == "__main__":
unittest.main()
10 changes: 10 additions & 0 deletions watergrid/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from watergrid.context import DataContext, OutputMode, ContextMetadata
from watergrid.metrics.MetricsStore import MetricsStore
from watergrid.pipelines.pipeline_verifier import PipelineVerifier
from watergrid.steps import Sequence
from watergrid.steps import Step


Expand All @@ -32,6 +33,15 @@ def add_step(self, step: Step):
"""
self._steps.append(step)

def add_steps(self, steps: Sequence):
"""
Adds a sequence of steps to the pipeline.
:param steps: Sequence implementation that contains the steps to add.
:return: None
"""
for step in steps.export_steps():
self.add_step(step)

def run(self):
"""
Blocking operation that runs all steps in the pipeline once.
Expand Down
1 change: 1 addition & 0 deletions watergrid/steps/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from watergrid.steps.step import Step
from watergrid.steps.sequence import Sequence
31 changes: 31 additions & 0 deletions watergrid/steps/sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from abc import ABC

from watergrid.steps import Step


class Sequence(ABC):
"""
A sequence object is used to form a logical grouping of steps. It can be
used to simplify your pipeline initialization code, or it can be used to
rapidly re-use multiple steps at once.
"""

def __init__(self, name: str):
self.name = name
self.steps = []

def add_step(self, step: Step) -> None:
"""
Adds a step to the sequence.
:param step: Step to add.
:return: None
"""
self.steps.append(step)

def export_steps(self) -> list:
"""
Returns a list of all steps in the sequence. Used internally by the
pipeline class.
:return: List of all steps.
"""
return self.steps