diff --git a/CHANGELOG.md b/CHANGELOG.md index 1369e20..51b4d26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## [Unreleased] ### Added +- `ConsoleMetricsExporter` for locally debugging pipelines without an APM service. ### Changed - Bumped redis dependency to 4.2.2. diff --git a/test/console_me_tests.py b/test/console_me_tests.py new file mode 100644 index 0000000..af827d2 --- /dev/null +++ b/test/console_me_tests.py @@ -0,0 +1,60 @@ +import unittest + +from watergrid.context import DataContext +from watergrid.metrics.ConsoleMetricsExporter import ConsoleMetricsExporter +from watergrid.pipelines.pipeline import Pipeline +from watergrid.steps import Step + + +class MockStep(Step): + def __init__(self, throw_exception=False): + self.throw_exception = throw_exception + super().__init__(self.__class__.__name__) + + def run(self, context: DataContext): + if self.throw_exception: + raise Exception("MockStep failed") + + +class ConsoleMetricsExporterTestCase(unittest.TestCase): + def test_outputs_pipeline_start(self): + pipeline = Pipeline("test_pipeline") + pipeline.add_metrics_exporter(ConsoleMetricsExporter()) + with self.assertLogs() as captured: + pipeline.run() + self.assertIn("INFO:root:Starting pipeline: test_pipeline", captured.output) + + def test_outputs_pipeline_end(self): + pipeline = Pipeline("test_pipeline") + pipeline.add_metrics_exporter(ConsoleMetricsExporter()) + with self.assertLogs() as captured: + pipeline.run() + self.assertIn("INFO:root:Ending pipeline", captured.output) + + def test_outputs_pipeline_end_with_error(self): + pipeline = Pipeline("test_pipeline") + pipeline.add_metrics_exporter(ConsoleMetricsExporter()) + pipeline.add_step(MockStep(throw_exception=True)) + with self.assertLogs() as captured: + pipeline.run() + self.assertIn("ERROR:root:Exception: MockStep failed", captured.output) + + def test_outputs_step_start(self): + pipeline = Pipeline("test_pipeline") + pipeline.add_metrics_exporter(ConsoleMetricsExporter()) + pipeline.add_step(MockStep()) + with self.assertLogs() as captured: + pipeline.run() + self.assertIn("INFO:root:Starting step: MockStep", captured.output) + + def test_outputs_step_end(self): + pipeline = Pipeline("test_pipeline") + pipeline.add_metrics_exporter(ConsoleMetricsExporter()) + pipeline.add_step(MockStep()) + with self.assertLogs() as captured: + pipeline.run() + self.assertIn("INFO:root:Ending step", captured.output) + + +if __name__ == "__main__": + unittest.main() diff --git a/watergrid/metrics/ConsoleMetricsExporter.py b/watergrid/metrics/ConsoleMetricsExporter.py new file mode 100644 index 0000000..50060ab --- /dev/null +++ b/watergrid/metrics/ConsoleMetricsExporter.py @@ -0,0 +1,25 @@ +import logging + +from watergrid.metrics.MetricsExporter import MetricsExporter + + +class ConsoleMetricsExporter(MetricsExporter): + def __init__(self): + super().__init__() + logging.basicConfig(level=logging.INFO) + self._logger = logging.getLogger(__name__) + + def start_pipeline(self, pipeline_name): + logging.info("Starting pipeline: " + pipeline_name) + + def end_pipeline(self): + logging.info("Ending pipeline") + + def start_step(self, step_name): + logging.info("Starting step: " + step_name) + + def end_step(self): + logging.info("Ending step") + + def capture_exception(self, exception: Exception): + logging.error("Exception: " + str(exception))