Dotflow is a lightweight Python library for building execution pipelines. Define tasks with decorators, chain them together, and run workflows in sequential, parallel, or background mode, with built-in retry, timeout, storage, notifications, and more.
We use GitHub issues for tracking bugs and feature requests.
```bash
pip install dotflow
```

Optional extras:

```bash
pip install dotflow[aws]        # AWS S3 storage
pip install dotflow[gcp]        # Google Cloud Storage
pip install dotflow[scheduler]  # Cron-based scheduler
```

```python
from dotflow import DotFlow, action

@action
def extract():
    return {"users": 150}

@action
def transform(previous_context):
    total = previous_context.storage["users"]
    return {"users": total, "active": int(total * 0.8)}

@action
def load(previous_context):
    print(f"Loaded {previous_context.storage['active']} active users")

workflow = DotFlow()
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)
workflow.start()
```
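Running this should print `Loaded 120 active users`: `transform` computes `int(150 * 0.8) = 120` and hands the result to `load` through the context chain.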
Dotflow supports four execution strategies out of the box:

Tasks run one after another. The context from each task flows to the next.

```python
workflow.task.add(step=task_a)
workflow.task.add(step=task_b)
workflow.start()  # or mode="sequential"
```

```mermaid
flowchart LR
    A[task_a] --> B[task_b] --> C[Finish]
```
Same as sequential, but runs in a background thread and does not block.

```python
workflow.start(mode="background")
```
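Since the call is non-blocking in this mode, the main thread is free to continue. A minimal sketch, assuming `start()` returns as soon as the background thread is spawned (as described above):

```python
workflow.start(mode="background")  # returns immediately; tasks run in a background thread
print("Main thread continues while the workflow executes")
```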
Every task runs simultaneously in its own process.

```python
workflow.task.add(step=task_a)
workflow.task.add(step=task_b)
workflow.task.add(step=task_c)
workflow.start(mode="parallel")
```

```mermaid
flowchart TD
    S[Start] --> A[task_a] & B[task_b] & C[task_c]
    A & B & C --> F[Finish]
```
Assign tasks to named groups. Groups run in parallel, but tasks within each group run sequentially.
```python
workflow.task.add(step=fetch_users, group_name="users")
workflow.task.add(step=save_users, group_name="users")
workflow.task.add(step=fetch_orders, group_name="orders")
workflow.task.add(step=save_orders, group_name="orders")
workflow.start()
```

```mermaid
flowchart TD
    S[Start] --> G1[Group: users] & G2[Group: orders]
    G1 --> A[fetch_users] --> B[save_users]
    G2 --> C[fetch_orders] --> D[save_orders]
    B & D --> F[Finish]
```
The @action decorator supports built-in resilience options:
```python
import requests

from dotflow import action

@action(retry=3, timeout=10, retry_delay=2, backoff=True)
def unreliable_api_call():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `retry` | `int` | `1` | Number of attempts before failing |
| `timeout` | `int` | `0` | Max seconds per attempt (0 = no limit) |
| `retry_delay` | `int` | `1` | Seconds to wait between retries |
| `backoff` | `bool` | `False` | Exponential backoff (delay doubles each retry) |
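To make the backoff arithmetic concrete, here is a small sketch, assuming the delay doubles on each retry exactly as the table describes:

```python
from dotflow import action

# With retry=4 there are four attempts, so three waits between them.
@action(retry=4, retry_delay=2, backoff=True)
def flaky_step():
    raise RuntimeError("always fails")

# Expected waits between attempts: roughly 2s, 4s, then 8s.
```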
Tasks communicate through a context chain. Each task receives the previous task's output and can access its own initial context.
```python
from dotflow import DotFlow, action

@action
def step_one():
    return "Hello"

@action
def step_two(previous_context, initial_context):
    greeting = previous_context.storage  # "Hello"
    name = initial_context.storage       # "World"
    return f"{greeting}, {name}!"

workflow = DotFlow()
workflow.task.add(step=step_one)
workflow.task.add(step=step_two, initial_context="World")
workflow.start()
```

Each Context object contains:

- `storage`: the return value from the task
- `task_id`: the task identifier
- `workflow_id`: the workflow identifier
- `time`: timestamp of execution
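As an illustrative sketch, a task can read these fields on the context it receives; the attribute names are taken from the list above, and anything beyond them should be treated as an assumption:

```python
from dotflow import action

@action
def audit(previous_context):
    # Attribute names come from the Context description above.
    print(previous_context.task_id, previous_context.workflow_id, previous_context.time)
    return previous_context.storage  # pass the previous result through unchanged
```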
Resume a workflow from where it left off. Requires a persistent storage provider and a fixed workflow_id.
```python
from dotflow import DotFlow, Config, action
from dotflow.providers import StorageFile

config = Config(storage=StorageFile())
workflow = DotFlow(config=config, workflow_id="my-pipeline-v1")

workflow.task.add(step=step_a)
workflow.task.add(step=step_b)
workflow.task.add(step=step_c)

# First run: executes all tasks and saves checkpoints
workflow.start()

# If step_c failed, fix it and re-run: step_a and step_b are skipped
workflow.start(resume=True)
```

Choose where task results are persisted:
```python
from dotflow import DotFlow

workflow = DotFlow()  # uses StorageDefault (in-memory)
```

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageFile

config = Config(storage=StorageFile(path=".output"))
workflow = DotFlow(config=config)
```

```bash
pip install dotflow[aws]
```

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageS3

config = Config(storage=StorageS3(bucket="my-bucket", prefix="pipelines/", region="us-east-1"))
workflow = DotFlow(config=config)
```

```bash
pip install dotflow[gcp]
```

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageGCS

config = Config(storage=StorageGCS(bucket="my-bucket", prefix="pipelines/", project="my-project"))
workflow = DotFlow(config=config)
```

Get notified about task status changes via Telegram.
```python
from dotflow import DotFlow, Config
from dotflow.providers import NotifyTelegram
from dotflow.core.types.status import TypeStatus

notify = NotifyTelegram(
    token="YOUR_BOT_TOKEN",
    chat_id=123456789,
    notification_type=TypeStatus.FAILED,  # only notify on failures (optional)
)

config = Config(notify=notify)
workflow = DotFlow(config=config)
```

Status types: `NOT_STARTED`, `IN_PROGRESS`, `COMPLETED`, `PAUSED`, `RETRY`, `FAILED`
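Since `notification_type` is marked optional in the snippet above, omitting it should widen notifications beyond failures; pass a specific `TypeStatus` member to restrict which status changes are reported.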
Return a class instance from a task, and Dotflow will automatically discover and execute all @action-decorated methods in source order.
```python
from dotflow import DotFlow, action

class ETLPipeline:
    @action
    def extract(self):
        return {"raw": [1, 2, 3]}

    @action
    def transform(self, previous_context):
        data = previous_context.storage["raw"]
        return {"processed": [x * 2 for x in data]}

    @action
    def load(self, previous_context):
        print(f"Loaded: {previous_context.storage['processed']}")

@action
def run_pipeline():
    return ETLPipeline()

workflow = DotFlow()
workflow.task.add(step=run_pipeline)
workflow.start()
```

Organize tasks into named groups for parallel group execution.
```python
workflow.task.add(step=scrape_site_a, group_name="scraping")
workflow.task.add(step=scrape_site_b, group_name="scraping")
workflow.task.add(step=process_data, group_name="processing")
workflow.task.add(step=save_results, group_name="processing")
workflow.start()  # groups run in parallel; tasks within each group run sequentially
```

Execute a function after each task completes, useful for logging, alerting, or side effects.
```python
def on_task_done(task):
    print(f"Task {task.task_id} finished with status: {task.status}")

workflow.task.add(step=my_step, callback=on_task_done)
```

Workflow-level callbacks for success and failure:
```python
def on_success(*args, **kwargs):
    print("All tasks completed!")

def on_failure(*args, **kwargs):
    print("Something went wrong.")

workflow.start(on_success=on_success, on_failure=on_failure)
```

Control whether the workflow stops or continues when a task fails:
```python
# Stop on first failure (default)
workflow.start(keep_going=False)

# Continue executing remaining tasks even if one fails
workflow.start(keep_going=True)
```

Each task tracks its errors in full detail:
- Attempt number
- Exception type and message
- Traceback
Access results after execution:
```python
for task in workflow.result_task():
    print(f"Task {task.task_id}: {task.status}")
    if task.errors:
        print(f"  Errors: {task.errors}")
```

@action automatically detects and handles async functions:
```python
import httpx

from dotflow import DotFlow, action

@action(timeout=30)
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

workflow = DotFlow()
workflow.task.add(step=fetch_data)
workflow.start()
```

Schedule workflows to run automatically using cron expressions.
```bash
pip install dotflow[scheduler]
```

```python
from dotflow import DotFlow, Config, action
from dotflow.providers import SchedulerCron

@action
def sync_data():
    return {"synced": True}

config = Config(scheduler=SchedulerCron(cron="*/5 * * * *"))
workflow = DotFlow(config=config)
workflow.task.add(step=sync_data)
workflow.schedule()
```

Control what happens when a new execution triggers while the previous one is still running:
| Strategy | Description |
|---|---|
| `skip` | Drops the new run if the previous one is still active (default) |
| `queue` | Buffers one pending run and executes it when the current run finishes |
| `parallel` | Runs up to 10 concurrent executions via a semaphore |
```python
from dotflow.providers import SchedulerCron

# Queue overlapping executions
scheduler = SchedulerCron(cron="*/5 * * * *", overlap="queue")

# Allow parallel executions
scheduler = SchedulerCron(cron="*/5 * * * *", overlap="parallel")
```

The scheduler handles graceful shutdown via SIGINT/SIGTERM signals automatically.
Run workflows directly from the command line:
```bash
# Simple execution
dotflow start --step my_module.my_task

# With initial context
dotflow start --step my_module.my_task --initial-context '{"key": "value"}'

# With callback
dotflow start --step my_module.my_task --callback my_module.on_done

# With execution mode
dotflow start --step my_module.my_task --mode parallel

# With file storage
dotflow start --step my_module.my_task --storage file --path .output

# With S3 storage
dotflow start --step my_module.my_task --storage s3

# With GCS storage
dotflow start --step my_module.my_task --storage gcs

# Schedule with cron
dotflow schedule --step my_module.my_task --cron "*/5 * * * *"

# Schedule with overlap strategy
dotflow schedule --step my_module.my_task --cron "0 * * * *" --overlap queue

# Schedule with resume
dotflow schedule --step my_module.my_task --cron "0 */6 * * *" --storage file --resume
```

Available CLI commands:
| Command | Description |
|---|---|
| `dotflow init` | Initialize a new Dotflow project |
| `dotflow start` | Run a workflow |
| `dotflow schedule` | Run a workflow on a cron schedule |
| `dotflow log` | View execution logs |
The Config class lets you swap providers for storage, notifications, logging, and scheduling:
```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageFile, NotifyTelegram, LogDefault, SchedulerCron

config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyTelegram(token="...", chat_id=123),
    log=LogDefault(),
    scheduler=SchedulerCron(cron="0 * * * *"),
)

workflow = DotFlow(config=config)
```

Extend Dotflow by implementing the abstract base classes:
| ABC | Methods | Purpose |
|---|---|---|
| `Storage` | `post`, `get`, `key` | Custom storage backends |
| `Notify` | `send` | Custom notification channels |
| `Log` | `info`, `error` | Custom logging |
| `Scheduler` | `start`, `stop` | Custom scheduling strategies |
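As a hedged sketch of what a custom provider might look like: the import path for the `Notify` base class and the exact `send()` signature below are assumptions, not confirmed API; only the method name comes from the table above.

```python
from dotflow import DotFlow, Config
from dotflow.providers import Notify  # assumed import path; the ABC may live elsewhere in the package

class NotifyStdout(Notify):
    """Hypothetical notification channel that just prints to stdout."""

    def send(self, *args, **kwargs):  # signature assumed; the table only names the method
        print("notification:", args, kwargs)

config = Config(notify=NotifyStdout())
workflow = DotFlow(config=config)
```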
After execution, inspect results directly from the workflow object:
```python
workflow.start()

# List of Task objects
tasks = workflow.result_task()

# List of Context objects (one per task)
contexts = workflow.result_context()

# List of storage values (raw return values)
storages = workflow.result_storage()

# Serialized result (Pydantic model)
result = workflow.result()
```

Task builder utilities:
```python
workflow.task.count()    # Number of tasks
workflow.task.clear()    # Remove all tasks
workflow.task.reverse()  # Reverse execution order
workflow.task.schema()   # Pydantic schema of the workflow
```

Reference tasks and callbacks by their module path string instead of importing them directly:
workflow.task.add(step="my_package.tasks.process_data")
workflow.task.add(step="my_package.tasks.save_results", callback="my_package.callbacks.notify")All examples are available in the docs_src/ directory.
| Example | Description | Command |
|---|---|---|
| first_steps | Minimal workflow with callback | python docs_src/first_steps/first_steps.py |
| simple_function_workflow | Simple function-based workflow | python docs_src/basic/simple_function_workflow.py |
| simple_class_workflow | Class-based step with retry | python docs_src/basic/simple_class_workflow.py |
| simple_function_workflow_with_error | Error inspection after failure | python docs_src/basic/simple_function_workflow_with_error.py |
| Example | Description | Command |
|---|---|---|
| async_action | Async task functions | python docs_src/async/async_action.py |
| Example | Description | Command |
|---|---|---|
| context | Creating and inspecting a Context | python docs_src/context/context.py |
| initial_context | Passing initial context per task | python docs_src/initial_context/initial_context.py |
| previous_context | Chaining context between tasks | python docs_src/previous_context/previous_context.py |
| many_contexts | Using both initial and previous context | python docs_src/context/many_contexts.py |
| Example | Description | Command |
|---|---|---|
| sequential | Sequential execution | python docs_src/process_mode/sequential.py |
| background | Background (non-blocking) execution | python docs_src/process_mode/background.py |
| parallel | Parallel execution | python docs_src/process_mode/parallel.py |
| parallel_group | Parallel groups execution | python docs_src/process_mode/parallel_group.py |
| sequential_group_mode | Sequential with named groups | python docs_src/workflow/sequential_group_mode.py |
| Example | Description | Command |
|---|---|---|
| retry | Retry on function and class steps | python docs_src/retry/retry.py |
| retry_delay | Retry with delay between attempts | python docs_src/retry/retry_delay.py |
| backoff | Exponential backoff on retries | python docs_src/backoff/backoff.py |
| timeout | Timeout per task execution | python docs_src/timeout/timeout.py |
| Example | Description | Command |
|---|---|---|
| task_callback | Per-task callback on completion | python docs_src/callback/task_callback.py |
| workflow_callback_success | Workflow-level success callback | python docs_src/callback/workflow_callback_success.py |
| workflow_callback_failure | Workflow-level failure callback | python docs_src/callback/workflow_callback_failure.py |
| Example | Description | Command |
|---|---|---|
| errors | Inspecting task errors and retry count | python docs_src/errors/errors.py |
| keep_going_true | Continue workflow after task failure | python docs_src/workflow/keep_going_true.py |
| Example | Description | Command |
|---|---|---|
| step_with_groups | Tasks in named parallel groups | python docs_src/group/step_with_groups.py |
| Example | Description | Command |
|---|---|---|
| storage_file | File-based JSON storage | python docs_src/storage/storage_file.py |
| storage_s3 | AWS S3 storage | python docs_src/storage/storage_s3.py |
| storage_gcs | Google Cloud Storage | python docs_src/storage/storage_gcs.py |
| Example | Description | Command |
|---|---|---|
| checkpoint | Resume workflow from last checkpoint | python docs_src/checkpoint/checkpoint.py |
| Example | Description | Command |
|---|---|---|
| notify_telegram | Telegram notifications on failure | python docs_src/notify/notify_telegram.py |
| Example | Description | Command |
|---|---|---|
| config | Full Config with storage, notify, log | python docs_src/config/config.py |
| storage_provider | Swapping storage providers | python docs_src/config/storage_provider.py |
| notify_provider | Swapping notification providers | python docs_src/config/notify_provider.py |
| log_provider | Custom log provider | python docs_src/config/log_provider.py |
| Example | Description | Command |
|---|---|---|
| step_function_result_task | Inspect task results (function) | python docs_src/output/step_function_result_task.py |
| step_function_result_context | Inspect context results (function) | python docs_src/output/step_function_result_context.py |
| step_function_result_storage | Inspect storage results (function) | python docs_src/output/step_function_result_storage.py |
| step_class_result_task | Inspect task results (class) | python docs_src/output/step_class_result_task.py |
| step_class_result_context | Inspect context results (class) | python docs_src/output/step_class_result_context.py |
| step_class_result_storage | Inspect storage results (class) | python docs_src/output/step_class_result_storage.py |
| Example | Description | Command |
|---|---|---|
| simple_cli | Basic CLI execution | dotflow start --step docs_src.basic.simple_cli.simple_step |
| cli_with_callback | CLI with callback function | dotflow start --step docs_src.cli.cli_with_callback.simple_step --callback docs_src.cli.cli_with_callback.callback |
| cli_with_initial_context | CLI with initial context | dotflow start --step docs_src.cli.cli_with_initial_context.simple_step --initial-context abc |
| cli_with_mode | CLI with execution mode | dotflow start --step docs_src.cli.cli_with_mode.simple_step --mode sequential |
| cli_with_output_context | CLI with file storage output | dotflow start --step docs_src.cli.cli_with_output_context.simple_step --storage file |
| cli_with_path | CLI with custom storage path | dotflow start --step docs_src.cli.cli_with_path.simple_step --path .storage --storage file |
| Icon | Type | Description |
|---|---|---|
| ✔️ | FEATURE | New feature |
| | PEP8 | Formatting fixes following PEP8 |
| | ISSUE | Reference to an issue |
| 🪲 | BUG | Bug fix |
| | DOCS | Documentation changes |
| 📦 | PyPI | PyPI releases |
| ❤️ | TEST | Automated tests |
| ⬆️ | CI/CD | Changes in continuous integration/delivery |
| | SECURITY | Security improvements |
This project is licensed under the terms of the MIT License.
