diff --git a/CHANGELOG.md b/CHANGELOG.md index 6081b51..942d88f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ - Dependencies for `MetricsExporter` and `PipelineLock` modules are no longer included in the base package and must now be installed separately through `watergrid[...]` metapackages. (#54) +- Documentation from Readme. Docs now located at +[https://armmaster17.github.io/watergrid-python/](https://armmaster17.github.io/watergrid-python/). (#65) ### Fixed diff --git a/README.md b/README.md index 36a681f..eaac752 100644 --- a/README.md +++ b/README.md @@ -5,37 +5,32 @@ ![PyPI](https://img.shields.io/pypi/v/watergrid) ![License](https://img.shields.io/badge/license-MIT-blue.svg) -Lightweight distributed framework for data stream processing. +Watergrid is a lightweight, distributed framework for data stream processing. -# Getting Started -## Concepts -### Pipeline -A watergrid application is composed of a single pipeline, There are two types of pipelines: -- `StandalonePipeline` - For low impact use cases where simplicity is preferred. -- `HAPipeline` - For use cases where high availability is required along with only-once processing. +## Why Watergrid -### Steps -A pipeline is composed of one or more steps. Once you create a pipeline, you can add steps using the `add_step()` method. -Watergrid expects that you create a new class for each step that you want to perform in the pipeline. All steps must -inherit from the abstract `Step` class. Your steps should provide an override for the `run(context)` method. Inside this -method you can perform any actions you want. +- Watergrid lets developers write their ETL pipelines as applications, not +scripts or jobs. This lets you re-use your existing CI/CD infrastructure and +deployment practices. +- Watergrid encourages you to write your ETL operations as modular "steps", +making it easy to isolate and test atomic parts of your pipelines. +- Watergrid lets you scale up to multi-node clusters by changing only a few +lines of Python code. +- Watergrid is minimalistic, and easy to use. +- Watergrid does not depend on complicated software setups that execute jobs. +Everything is self-contained in the library itself. +- Watergrid lets you use your existing Redis infrastructure for distributed +jobs instead of a proprietary data storage/transmission solution. +- Watergrid includes an API for interfacing with an APM of your choice out of +the box. -### Context -The context is a key-value store that is passed to each step in the pipeline. You can use the context to store -data created in your step, and to access data created in previous steps. Changing the `OutputMode` of the context -in a set allows for splitting or filtering the context after the step completes. This can be used to have subsequent -steps run multiple times based on the output of the current step. +## Getting Started -### Locks -When using the `HAPipeline` class, you must provide a lock to prevent multiple instances from running the same pipeline -at the same time. The `RedisLock` class is provided by watergrid, but you are free to implement your own lock -using the `Lock` abstract class. +Creating an ETL pipeline wit Watergrid is as simple as: + +1. Run `pip install watergrid` +1. Paste the following code into a Python file: -## Standalone Mode -In standalone mode, only one instance of your application is required. This mode is the easiest to set up, -but does not provide any fault tolerance or host failover. -1. Install WaterGrid-Python `pip install git+https://github.com/ARMmaster17/watergrid-python.git@main` -2. Create a pipeline and run it. ```python from watergrid.pipelines import StandalonePipeline from watergrid.steps import Step @@ -44,124 +39,24 @@ from watergrid.context import DataContext class SampleStep(Step): def __init__(self): super().__init__(self.__class__.__name__) - - def run(self, context: DataContext): - print("Hello World!") - -def main(): - pipeline = StandalonePipeline('sample_pipeline') - pipeline.add_step(SampleStep()) - while True: - pipeline.run() - - -if __name__ == '__main__': - main() -``` - -## High Availability Mode -In HA mode, you can have several servers running on separate machines. Only -one server will be able to run the pipeline at a time. If a machine fails, another will take over. -1. Install WaterGrid-Python `pip install git+https://github.com/ARMmaster17/watergrid-python.git@main` -2. Install Redis (or use the `PipelineLock` to create your own global mutex). -3. Create a pipeline and run it. -```python -from watergrid.pipelines import HAPipeline -from watergrid.steps import Step -from watergrid.context import DataContext -from watergrid.locks import RedisPipelineLock - -class SampleStep(Step): - def __init__(self): - super().__init__(self.__class__.__name__) - def run(self, context: DataContext): print("Hello World!") def main(): - pipeline_name = "sample_pipeline" - redis_lock = RedisPipelineLock() - # Call redis_lock.set_XXXX to configure connection properties if needed. - pipeline = HAPipeline(pipeline_name, redis_lock) - pipeline.add_step(SampleStep()) - while True: - pipeline.run() + pipeline = StandalonePipeline('hello_world_pipeline') + pipeline.add_step(SampleStep()) + while True: + pipeline.run() - if __name__ == '__main__': - main() -``` - -If Redis is not running on localhost on port 6379, you can call `redis_lock.set_XXXX()` to set those values accordingly. - -# Step Operations - -## Creating Custom Steps -Every step of your pipeline should be its own class and inherit from the `Step` class. Here is an example: - -```python -from watergrid.steps import Step -from watergrid.context import DataContext - -class AddValueStep(Step): - def __init__(self): - # Use requires to denote which steps need to run before this one, and - # use provides to denote which steps can run after this one. - super().__init__(self.__class__.__name__, requires=['value'], provides=['added_value']) - - def run(self, context: DataContext): - # Use the context object to pass values between steps. - context.set('added_value', context.get('value') + 1) + main() ``` -Note that the `requires` and `provides` lists are optional. If you do not specify them, the step will run -in any order in the pipeline. The keys provided in the two lists can be arbitrary, and do not need to match -the name of your step or any of the context keys that it utilizes. - -## Split Steps -Sometimes you will have a pipeline step that will have a list of several values, -and you want to split the list so that each step will run once for each value. The split step -will perform the conversion of 1:X contexts after the step completes. -```python -from watergrid.steps import Step -from watergrid.context import DataContext -from watergrid.context import OutputMode +Check out the [documentation site](https://armmaster17.github.io/watergrid-python/getting_started.html) +to learn more. -class AddValuesStep(Step): - def __init__(self): - super().__init__(self.__class__.__name__, provides=['added_value']) - - def run(self, context: DataContext): - context.set('added_value', [0, 1, 2, 3, 4, 5]) - context.set_output_mode(OutputMode.SPLIT) - # The pipeline will automatically split the first key listed in provides[]. - # For example, in the next step context.get('added_value') will return 0. - # Then the next step will run again with the values 1, 2, 3, etc... -``` - -## Filter Steps -Filter steps have the option to pass back the value of `None`. If this is the case, this instance of the context will be deleted and not passed to the next step. Works great after split steps. - -Note that the pipeline will only filter the first field listed in the provides list. - -```python -from watergrid.steps import Step -from watergrid.context import DataContext -from watergrid.context import OutputMode -class FilterStep(Step): - def __init__(self): - super().__init__(self.__class__.__name__, requires=['value'], provides=['filtered_value']) - - def run(self, context: DataContext): - value = context.get('value') - context.set_output_mode(OutputMode.FILTER) - if value == 1: - context.set('filtered_value', value) # If the value is 1, pass it along to the next step. - else: - context.set('filtered_value', None) # If the value is zero, delete this context and don't pass it to the next step. -``` +## Example Projects -# Example Projects - [RSSMQ](https://github.com/ARMmaster17/rssmq/tree/126-refactor-to-use-watergrid) - Forwards RSS feed items to various HTTP APIs. - [atc-metrics-streamer](https://github.com/ARMmaster17/atc-metrics-streamer/tree/watergrid-transplant) - Streams metrics from Apache Traffic Control to Kafka. diff --git a/docs/concepts.rst b/docs/concepts.rst new file mode 100644 index 0000000..eb7f5ff --- /dev/null +++ b/docs/concepts.rst @@ -0,0 +1,32 @@ +Concepts +======== + +Pipeline +-------- + +A watergrid application is composed of a single pipeline, There are two types of pipelines: +- `StandalonePipeline` - For low impact use cases where simplicity is preferred. +- `HAPipeline` - For use cases where high availability is required along with only-once processing. + +Steps +----- + +A pipeline is composed of one or more steps. Once you create a pipeline, you can add steps using the `add_step()` method. +Watergrid expects that you create a new class for each step that you want to perform in the pipeline. All steps must +inherit from the abstract `Step` class. Your steps should provide an override for the `run(context)` method. Inside this +method you can perform any actions you want. + +Context +------- + +The context is a key-value store that is passed to each step in the pipeline. You can use the context to store +data created in your step, and to access data created in previous steps. Changing the `OutputMode` of the context +in a set allows for splitting or filtering the context after the step completes. This can be used to have subsequent +steps run multiple times based on the output of the current step. + +Locks +----- + +When using the `HAPipeline` class, you must provide a lock to prevent multiple instances from running the same pipeline +at the same time. The `RedisLock` class is provided by watergrid, but you are free to implement your own lock +using the `Lock` abstract class. \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index 5b9bdb0..3f22247 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -22,7 +22,7 @@ author = "Joshua Zenn (ARMmaster17)" # The full version, including alpha/beta/rc tags -release = "1.0.0-rc2" +release = "1.0.1" # -- General configuration --------------------------------------------------- @@ -55,4 +55,4 @@ # so a file named "default.css" will overwrite the builtin "default.css". html_static_path = ["_static"] -html_baseurl = '/watergrid-python/' +html_baseurl = "/watergrid-python/" diff --git a/docs/getting_started.rst b/docs/getting_started.rst index 8cb7921..6e81ed1 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -29,7 +29,7 @@ Create a file called ``main.py`` and paste the following code into it: print("Hello World!") def main(): - pipeline = StandalonePipeline('tutorial_pipeline) + pipeline = StandalonePipeline('tutorial_pipeline') pipeline.add_step(SampleStep()) pipeline.run() diff --git a/docs/index.rst b/docs/index.rst index 0fc0889..d270676 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -28,8 +28,10 @@ Lightweight framework for creating ETL and automation pipelines. :caption: Contents: getting_started + concepts pipelines/standalone_pipeline pipelines/ha_pipeline + working_with_steps Indices and tables ================== diff --git a/docs/pipelines/ha_pipeline.rst b/docs/pipelines/ha_pipeline.rst index 0ec9e1b..b819090 100644 --- a/docs/pipelines/ha_pipeline.rst +++ b/docs/pipelines/ha_pipeline.rst @@ -1,7 +1,42 @@ HAPipeline ================== -.. autoclass:: watergrid.pipelines.HAPipeline - :members: - :undoc-members: - :inherited-members: \ No newline at end of file +In HA mode, you can have several servers running on separate machines. Only +one server will be able to run the pipeline at a time. If a machine fails, another will take over. + +Steps +----- + +1. Install WaterGrid-Python `pip install watergrid==1.0.1` +2. Install Redis (or use the `PipelineLock` class to create your own global mutex). +3. Create a pipeline and run it. + +.. code-block:: python + + from watergrid.pipelines import HAPipeline + from watergrid.steps import Step + from watergrid.context import DataContext + from watergrid.locks import RedisPipelineLock + + class SampleStep(Step): + def __init__(self): + super().__init__(self.__class__.__name__) + + def run(self, context: DataContext): + print("Hello World!") + + def main(): + pipeline_name = "sample_pipeline" + redis_lock = RedisPipelineLock() + # Call redis_lock.set_XXXX to configure connection properties if needed. + redis_lock.connect() # Required by RedisPipelineLock, does not apply to all locks. + pipeline = HAPipeline(pipeline_name, redis_lock) + pipeline.add_step(SampleStep()) + while True: + pipeline.run() + + + if __name__ == '__main__': + main() + +If Redis is not running on localhost on port 6379, you can call `redis_lock.set_XXXX()` to set those values accordingly. diff --git a/docs/pipelines/standalone_pipeline.rst b/docs/pipelines/standalone_pipeline.rst index be7d284..8d8fb0d 100644 --- a/docs/pipelines/standalone_pipeline.rst +++ b/docs/pipelines/standalone_pipeline.rst @@ -1,7 +1,35 @@ StandalonePipeline ================== -.. autoclass:: watergrid.pipelines.StandalonePipeline - :members: - :undoc-members: - :inherited-members: \ No newline at end of file +In standalone mode, only one instance of your application is required. This mode is the easiest to set up, +but does not provide any fault tolerance or host failover. If you are just getting started, this is the pipeline +mode to start with. + +Steps +----- + +1. Install WaterGrid-Python `pip install watergrid--1.0.1` +2. Create a pipeline and run it. + +.. code-block:: python + + from watergrid.pipelines import StandalonePipeline + from watergrid.steps import Step + from watergrid.context import DataContext + + class SampleStep(Step): + def __init__(self): + super().__init__(self.__class__.__name__) + + def run(self, context: DataContext): + print("Hello World!") + + def main(): + pipeline = StandalonePipeline('sample_pipeline') + pipeline.add_step(SampleStep()) + while True: + pipeline.run() + + + if __name__ == '__main__': + main() \ No newline at end of file diff --git a/docs/working_with_steps.rst b/docs/working_with_steps.rst new file mode 100644 index 0000000..2098148 --- /dev/null +++ b/docs/working_with_steps.rst @@ -0,0 +1,72 @@ +Working With Steps +================== + +Custom Steps +------------ + +Every step of your pipeline should be its own class and inherit from the `Step` class. Here is an example: + +.. code-block:: python + + from watergrid.steps import Step + from watergrid.context import DataContext + + class AddValueStep(Step): + def __init__(self): + # Use requires to denote which steps need to run before this one, and + # use provides to denote which steps can run after this one. + super().__init__(self.__class__.__name__, requires=['value'], provides=['added_value']) + + def run(self, context: DataContext): + # Use the context object to pass values between steps. + context.set('added_value', context.get('value') + 1) + +Note that the `requires` and `provides` lists are optional. If you do not specify them, the step will run +in any order in the pipeline. The keys provided in the two lists can be arbitrary, and do not need to match +the name of your step or any of the context keys that it utilizes. + +Split Steps +----------- +Sometimes you will have a pipeline step that will have a list of several values, +and you want to split the list so that subsequent steps will run once for each value. The split step +will perform the conversion of 1:X contexts after the step completes. + +.. code-block:: python + + from watergrid.steps import Step + from watergrid.context import DataContext + from watergrid.context import OutputMode + + class AddValuesStep(Step): + def __init__(self): + super().__init__(self.__class__.__name__, provides=['added_value']) + + def run(self, context: DataContext): + context.set('added_value', [0, 1, 2, 3, 4, 5]) + context.set_output_mode(OutputMode.SPLIT) + # The pipeline will automatically split the first key listed in provides[]. + # For example, in the next step context.get('added_value') will return 0. + # Then the next step will run again with the values 1, 2, 3, etc... + +Filter Steps +------------ +Filter steps have the option to pass back the value of `None`. If this is the case, this instance of the context will be deleted and not passed to the next step. Works great after split steps. + +Note that the pipeline will only filter the first field listed in the provides list. + +.. code-block:: python + + from watergrid.steps import Step + from watergrid.context import DataContext + from watergrid.context import OutputMode + class FilterStep(Step): + def __init__(self): + super().__init__(self.__class__.__name__, requires=['value'], provides=['filtered_value']) + + def run(self, context: DataContext): + value = context.get('value') + context.set_output_mode(OutputMode.FILTER) + if value == 1: + context.set('filtered_value', value) # If the value is 1, pass it along to the next step. + else: + context.set('filtered_value', None) # If the value is zero, delete this context and don't pass it to the next step.