Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

### Added

- LocalPipelineLock for single-thread non-networked pipeline applications. (#80)
- Local lock interface is now exposed through the DataContext for use of arbitrary locks in the pipeline. (#80)

### Changed

- Bumped `redis` dependency to 4.3.1. (#94)
- Pipeline class now uses middleware architecture for customizable pre/post run actions. (#80)

### Deprecated

### Removed
Expand Down
40 changes: 40 additions & 0 deletions test/custom_step_lock_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import unittest

from watergrid.context import DataContext
from watergrid.pipelines.pipeline import Pipeline
from watergrid.steps import Step


class MockSetLockStep(Step):
def __init__(self):
super().__init__(self.__class__.__name__)

def run(self, context: DataContext):
if not context.lock.acquire():
raise Exception("Unable to obtain lock")


class MockVerifyStep(Step):
def __init__(self):
super().__init__(self.__class__.__name__)
self.mock_flag = False

def run(self, context: DataContext):
self.mock_flag = context.lock.has_lock()

def get_flag(self) -> bool:
return self.mock_flag


class CustomStepLockTestCase(unittest.TestCase):
def test_can_use_lock(self):
mock_step = MockVerifyStep()
pipeline = Pipeline("test_pipeline")
pipeline.add_step(MockSetLockStep())
pipeline.add_step(mock_step)
pipeline.run()
self.assertTrue(mock_step.get_flag())


if __name__ == "__main__":
unittest.main()
10 changes: 10 additions & 0 deletions watergrid/context/data_context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from watergrid.context import OutputMode
from watergrid.context.context_metadata import ContextMetadata
from watergrid.locks.PipelineLock import PipelineLock


class DataContext:
Expand All @@ -12,6 +13,7 @@ def __init__(self):
self.data = {}
self.output_mode = OutputMode.DIRECT
self.metadata = ContextMetadata()
self._lock = None

def set(self, key: str, value: object) -> None:
"""
Expand Down Expand Up @@ -91,6 +93,14 @@ def set_run_metadata(self, metadata: ContextMetadata) -> None:
"""
self.metadata = metadata

@property
def lock(self):
return self._lock

@lock.setter
def lock(self, lock: PipelineLock):
self._lock = lock

@staticmethod
def deep_copy_context(context):
"""
Expand Down
30 changes: 30 additions & 0 deletions watergrid/locks/LocalPipelineLock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from watergrid.locks.PipelineLock import PipelineLock


class LocalPipelineLock(PipelineLock):
def acquire(self) -> bool:
if not self._lock_obj:
self._lock_obj = True
return True
else:
return False

def has_lock(self) -> bool:
return self._lock_obj

def extend_lease(self):
pass

def release(self):
self._lock_obj = False

def read_key(self, key: str):
return self.__key_value[key]

def write_key(self, key: str, value):
self.__key_value[key] = value

def __init__(self, lock_timeout: int = 60):
super().__init__(lock_timeout)
self._lock_obj = False
self.__key_value = {}
22 changes: 22 additions & 0 deletions watergrid/middleware/ContextMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from abc import abstractmethod, ABC

from watergrid.context import DataContext
from watergrid.steps import Step


class ContextMiddleware(ABC):
"""
Abstract class representing an action that is run before and after
every context run within each step in a pipeline.
"""

def __init__(self):
pass

@abstractmethod
def pre_context(self, step: Step, context: DataContext):
pass

@abstractmethod
def post_context(self, step: Step, context: DataContext):
pass
19 changes: 19 additions & 0 deletions watergrid/middleware/PipelineMiddlware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod


class PipelineMiddleware(ABC):
"""
Abstract class representing an action that is run before and after
every run of the pipeline.
"""

def __init__(self):
pass

@abstractmethod
def pre_pipeline(self, pipeline):
pass

@abstractmethod
def post_pipeline(self, pipeline):
pass
21 changes: 21 additions & 0 deletions watergrid/middleware/StepMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from abc import ABC, abstractmethod

from watergrid.steps import Step


class StepMiddleware(ABC):
"""
Abstract class representing an action that runs before and after each
action.
"""

def __init__(self):
pass

@abstractmethod
def pre_step(self, step: Step, contexts: list) -> None:
pass

@abstractmethod
def post_step(self, step: Step, contexts: list) -> None:
pass
Empty file.
16 changes: 16 additions & 0 deletions watergrid/middleware/context/ContextMetricsMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from watergrid.context import DataContext
from watergrid.metrics.MetricsStore import MetricsStore
from watergrid.middleware.ContextMiddleware import ContextMiddleware
from watergrid.steps import Step


class ContextMetricsMiddleware(ContextMiddleware):
def pre_context(self, step: Step, context: DataContext):
self._store.start_step_monitoring(step.get_step_name())

def post_context(self, step: Step, context: DataContext):
self._store.stop_step_monitoring()

def __init__(self, store: MetricsStore):
super().__init__()
self._store = store
22 changes: 22 additions & 0 deletions watergrid/middleware/context/LockInjectorMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from watergrid.context import DataContext
from watergrid.locks.LocalPipelineLock import LocalPipelineLock
from watergrid.middleware.ContextMiddleware import ContextMiddleware
from watergrid.steps import Step


class LockInjectorMiddleware(ContextMiddleware):
"""
Modifies a context before an after a step to ensure that the lock context
is not deep copied as when using local locks this will break the
functionality of the lock.
"""

def pre_context(self, step: Step, context: DataContext):
context.lock = self._pipeline.lock

def post_context(self, step: Step, context: DataContext):
context.lock = None

def __init__(self, pipeline):
super().__init__()
self._pipeline = pipeline
Empty file.
19 changes: 19 additions & 0 deletions watergrid/middleware/pipeline/LastRunMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import time

from watergrid.middleware.PipelineMiddlware import PipelineMiddleware


class LastRunMiddleware(PipelineMiddleware):
@property
def last_run(self):
return self._last_run

def pre_pipeline(self, pipeline):
pass

def post_pipeline(self, pipeline):
self._last_run = time.time()

def __init__(self):
super().__init__()
self._last_run = None
14 changes: 14 additions & 0 deletions watergrid/middleware/pipeline/PipelineMetricsMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from watergrid.metrics.MetricsStore import MetricsStore
from watergrid.middleware.PipelineMiddlware import PipelineMiddleware


class PipelineMetricsMiddleware(PipelineMiddleware):
def __init__(self, store: MetricsStore):
super().__init__()
self._store = store

def pre_pipeline(self, pipeline):
self._store.start_pipeline_monitoring(pipeline.get_pipeline_name())

def post_pipeline(self, pipeline):
self._store.stop_pipeline_monitoring()
16 changes: 16 additions & 0 deletions watergrid/middleware/pipeline/StepOrderingMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from watergrid.middleware.PipelineMiddlware import PipelineMiddleware
from watergrid.pipelines.pipeline_verifier import PipelineVerifier


class StepOrderingMiddleware(PipelineMiddleware):
def pre_pipeline(self, pipeline):
PipelineVerifier.verify_pipeline_dependencies_fulfilled(
pipeline._steps
)
PipelineVerifier.verify_pipeline_step_ordering(pipeline._steps)

def post_pipeline(self, pipeline):
pass

def __init__(self):
super().__init__()
Empty file.
10 changes: 4 additions & 6 deletions watergrid/pipelines/ha_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ def __init__(self, pipeline_name: str, pipeline_lock: PipelineLock):
self.__lock_timings = []

def run(self):
self.verify_pipeline()
if self.__pipeline_lock.lock():
super().run()
self.__pipeline_lock.unlock()
else:
logging.debug(
"Pipeline {} is already running on another instance".format(
"pipeline {} is already running on another instance".format(
self.get_pipeline_name()
)
)
Expand All @@ -37,7 +36,6 @@ def run_interval(self, job_interval_s: int) -> None:
:param job_interval_s: Number of seconds to wait between pipeline runs.
:return: None
"""
self.verify_pipeline()
while True:
self._run_interval_loop(job_interval_s)
time.sleep(self._calculate_delay(job_interval_s) / 1000)
Expand Down Expand Up @@ -78,7 +76,7 @@ def _verify_lock_metadata(self, job_interval_s: int) -> None:
self._set_last_run(time.time() - job_interval_s)
if time.time() - last_run > job_interval_s * 3:
logging.warning(
"Pipeline {} has fallen more than three cycles behind. Consider increasing the job interval or "
"pipeline {} has fallen more than three cycles behind. Consider increasing the job interval or "
"provisioning more machines.".format(self.get_pipeline_name())
)
self._set_last_run(time.time() - job_interval_s)
Expand All @@ -90,7 +88,7 @@ def _handle_lock_acquire_failure(self) -> None:
:return: None
"""
logging.debug(
"Pipeline {} is already running on another instance".format(
"pipeline {} is already running on another instance".format(
self.get_pipeline_name()
)
)
Expand All @@ -117,7 +115,7 @@ def _get_last_run(self) -> float:
def _get_pipeline_lock_name(self) -> str:
"""
Builds the name of the lock assigned to this pipeline.
:return: Pipeline lock name.
:return: pipeline lock name.
"""
return "{}_last_run".format(self.get_pipeline_name())

Expand Down
Loading