Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions test/pipeline_versioning_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import unittest

from watergrid.context import DataContext
from watergrid.pipelines.pipeline import Pipeline
from watergrid.steps import Step


class MockStep(Step):
def __init__(self, name):
super().__init__(name)

def run(self, context: DataContext):
pass


class MockStep2(Step):
def __init__(self, name):
super().__init__(name)

def run(self, context: DataContext):
pass


class PipelineVersioningTestCase(unittest.TestCase):
def test_pipeline_has_guid_function(self):
pipeline = Pipeline("test_pipeline")
self.assertIsNotNone(pipeline.get_pipeline_guid())

def test_pipeline_guid_is_unique_by_name(self):
pipeline1 = Pipeline("test_pipeline1")
pipeline2 = Pipeline("test_pipeline2")
self.assertNotEqual(
pipeline1.get_pipeline_guid(), pipeline2.get_pipeline_guid()
)

def test_pipeline_guid_is_unique_by_steps(self):
pipeline1 = Pipeline("test_pipeline")
pipeline1.add_step(MockStep("step1"))
pipeline2 = Pipeline("test_pipeline")
pipeline2.add_step(MockStep("step2"))
self.assertNotEqual(
pipeline1.get_pipeline_guid(), pipeline2.get_pipeline_guid()
)

def test_pipeline_guid_is_unique_by_step_class_names(self):
pipeline1 = Pipeline("test_pipeline")
pipeline1.add_step(MockStep("step1"))
pipeline2 = Pipeline("test_pipeline")
pipeline2.add_step(MockStep2("step1"))
self.assertNotEqual(
pipeline1.get_pipeline_guid(), pipeline2.get_pipeline_guid()
)

def test_pipeline_has_clean_guid(self):
pipeline = Pipeline("test_pipeline")
self.assertNotIn(str(pipeline.get_pipeline_guid()), " ")


if __name__ == "__main__":
unittest.main()
13 changes: 13 additions & 0 deletions watergrid/pipelines/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import copy
import time
from abc import ABC
Expand Down Expand Up @@ -112,6 +113,18 @@ def add_metrics_exporter(self, exporter):
"""
self._metrics_store.add_metrics_exporter(exporter)

def get_pipeline_guid(self):
"""
Generates a unique identifier for the pipeline that can be used to
identify all pipelines that have the same steps in the same order with
the same name.
:return: GUID of the pipeline.
"""
result = self._pipeline_name
for step in self._steps:
result += step.get_step_name() + type(step).__name__
return base64.urlsafe_b64encode(result.encode("utf-8"))

def __run_pipeline_steps(self):
"""
Performs setup and runs all steps in the pipeline.
Expand Down