Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## [Unreleased]

### Added
- `ConsoleMetricsExporter` for locally debugging pipelines without an APM service.

### Changed
- Bumped redis dependency to 4.2.2.
Expand Down
60 changes: 60 additions & 0 deletions test/console_me_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import unittest

from watergrid.context import DataContext
from watergrid.metrics.ConsoleMetricsExporter import ConsoleMetricsExporter
from watergrid.pipelines.pipeline import Pipeline
from watergrid.steps import Step


class MockStep(Step):
def __init__(self, throw_exception=False):
self.throw_exception = throw_exception
super().__init__(self.__class__.__name__)

def run(self, context: DataContext):
if self.throw_exception:
raise Exception("MockStep failed")


class ConsoleMetricsExporterTestCase(unittest.TestCase):
def test_outputs_pipeline_start(self):
pipeline = Pipeline("test_pipeline")
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
with self.assertLogs() as captured:
pipeline.run()
self.assertIn("INFO:root:Starting pipeline: test_pipeline", captured.output)

def test_outputs_pipeline_end(self):
pipeline = Pipeline("test_pipeline")
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
with self.assertLogs() as captured:
pipeline.run()
self.assertIn("INFO:root:Ending pipeline", captured.output)

def test_outputs_pipeline_end_with_error(self):
pipeline = Pipeline("test_pipeline")
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
pipeline.add_step(MockStep(throw_exception=True))
with self.assertLogs() as captured:
pipeline.run()
self.assertIn("ERROR:root:Exception: MockStep failed", captured.output)

def test_outputs_step_start(self):
pipeline = Pipeline("test_pipeline")
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
pipeline.add_step(MockStep())
with self.assertLogs() as captured:
pipeline.run()
self.assertIn("INFO:root:Starting step: MockStep", captured.output)

def test_outputs_step_end(self):
pipeline = Pipeline("test_pipeline")
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
pipeline.add_step(MockStep())
with self.assertLogs() as captured:
pipeline.run()
self.assertIn("INFO:root:Ending step", captured.output)


if __name__ == "__main__":
unittest.main()
25 changes: 25 additions & 0 deletions watergrid/metrics/ConsoleMetricsExporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging

from watergrid.metrics.MetricsExporter import MetricsExporter


class ConsoleMetricsExporter(MetricsExporter):
def __init__(self):
super().__init__()
logging.basicConfig(level=logging.INFO)
self._logger = logging.getLogger(__name__)

def start_pipeline(self, pipeline_name):
logging.info("Starting pipeline: " + pipeline_name)

def end_pipeline(self):
logging.info("Ending pipeline")

def start_step(self, step_name):
logging.info("Starting step: " + step_name)

def end_step(self):
logging.info("Ending step")

def capture_exception(self, exception: Exception):
logging.error("Exception: " + str(exception))