From 462f3a77cf2ac4bfa958fffea4beafd04a3dee71 Mon Sep 17 00:00:00 2001 From: Caio Nogueira Date: Wed, 25 Feb 2026 12:03:15 +0000 Subject: [PATCH 1/2] Updates Python Workflows docs to new sdk changes --- .../workers/languages/python/examples.mdx | 10 +-- src/content/docs/workflows/python/dag.mdx | 33 ++++++--- src/content/docs/workflows/python/index.mdx | 10 +-- .../workflows/python/python-workers-api.mdx | 73 ++++++++++++++----- 4 files changed, 84 insertions(+), 42 deletions(-) diff --git a/src/content/docs/workers/languages/python/examples.mdx b/src/content/docs/workers/languages/python/examples.mdx index 47b8249680832ab..382c1b53802e505 100644 --- a/src/content/docs/workers/languages/python/examples.mdx +++ b/src/content/docs/workers/languages/python/examples.mdx @@ -213,20 +213,20 @@ from workers import WorkflowEntrypoint class MyWorkflow(WorkflowEntrypoint): async def run(self, event, step): - @step.do("dependency a") + @step.do() async def step_a(): # do some work return 10 - @step.do("dependency b") + @step.do() async def step_b(): # do some work return 20 - @step.do("my final step", depends=[step_a, step_b], concurrent=True) - async def my_final_step(result_a=0, result_b=0): + @step.do(concurrent=True) + async def my_final_step(step_a, step_b): # should return 30 - return result_a + result_b + return step_a + step_b await my_final_step() ``` diff --git a/src/content/docs/workflows/python/dag.mdx b/src/content/docs/workflows/python/dag.mdx index 8bdba984a0e6cf3..e5f081863718129 100644 --- a/src/content/docs/workflows/python/dag.mdx +++ b/src/content/docs/workflows/python/dag.mdx @@ -5,10 +5,10 @@ sidebar: order: 4 --- -The Python Workflows SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run). +The Python Workflows SDK supports DAG workflows in a declarative way, using `step.do` and parameter names to define dependencies (other steps that must complete before a step can run). ```python -from workers import Response, WorkflowEntrypoint +from workers import Response, WorkflowEntrypoint, WorkerEntrypoint class PythonWorkflowStarter(WorkflowEntrypoint): async def run(self, event, step): @@ -20,30 +20,39 @@ class PythonWorkflowStarter(WorkflowEntrypoint): await step.sleep('demo sleep', '10 seconds') - @step.do('dependency1') + @step.do() async def dep_1(): # does stuff print('executing dep1') + return 'dep1' - @step.do('dependency2') + @step.do() async def dep_2(): # does stuff print('executing dep2') + return 'dep2' - @step.do('demo do', depends=[dep_1, dep_2], concurrent=True) - async def final_step(res1, res2): + @step.do(concurrent=True) + async def final_step(dep_1, dep_2): # does stuff - print('something') + print(f'{dep_1} {dep_2}') await await_step(final_step) -async def on_fetch(request, env): - await env.MY_WORKFLOW.create() - return Response("Hello world!") +class Default(WorkerEntrypoint): + async def fetch(self, request): + await self.env.MY_WORKFLOW.create() + return Response("Hello world!") ``` -On this example, `dep_1` and `dep_2` are run concurrently before execution of `final_step`, which depends on both of them. +In this example, `dep_1` and `dep_2` are run concurrently before execution of `final_step`, which depends on both of them. -Having `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it will be skipped and its return value will be reused. +Having `concurrent=True` allows dependencies to be resolved concurrently. If a dependency has already completed, it will be skipped and its return value will be reused. + +:::note + +Older compatibility behavior supports explicit dependency lists with `depends=[...]`. For new workflows, use parameter names to express step dependencies. + +::: This pattern is useful for diamond shaped workflows, where a step depends on two or more other steps that can run concurrently. diff --git a/src/content/docs/workflows/python/index.mdx b/src/content/docs/workflows/python/index.mdx index 023ddaad0db9721..0b9f09fd8f64bae 100644 --- a/src/content/docs/workflows/python/index.mdx +++ b/src/content/docs/workflows/python/index.mdx @@ -31,7 +31,7 @@ class MyWorkflow(WorkflowEntrypoint): For example, a Workflow may be defined as: ```python -from workers import Response, WorkflowEntrypoint +from workers import Response, WorkflowEntrypoint, WorkerEntrypoint class PythonWorkflowStarter(WorkflowEntrypoint): async def run(self, event, step): @@ -49,9 +49,10 @@ class PythonWorkflowStarter(WorkflowEntrypoint): await step_1() await step_2() -async def on_fetch(request, env): - await env.MY_WORKFLOW.create() - return Response("Hello world!") +class Default(WorkerEntrypoint): + async def fetch(self, request): + await self.env.MY_WORKFLOW.create() + return Response("Hello world!") ``` You must add both `python_workflows` and `python_workers` compatibility flags to your Wrangler configuration file. @@ -64,7 +65,6 @@ You must add both `python_workflows` and `python_workers` compatibility flags to "main": "src/entry.py", "compatibility_flags": [ "python_workers", - "experimental", "python_workflows" ], "compatibility_date": "$today", diff --git a/src/content/docs/workflows/python/python-workers-api.mdx b/src/content/docs/workflows/python/python-workers-api.mdx index 3573cb02df6b591..3162e48d623a5fb 100644 --- a/src/content/docs/workflows/python/python-workers-api.mdx +++ b/src/content/docs/workflows/python/python-workers-api.mdx @@ -21,18 +21,33 @@ class MyWorkflow(WorkflowEntrypoint): ## WorkflowStep -* step.do(name, depends=[], concurrent=False, config=None) is a decorator that allows you to define a step in a workflow. - * `name` - the name of the step. - * `depends` - an optional list of steps that must complete before this step can run. See [DAG Workflows](/workflows/python/dag). - * `concurrent` - an optional boolean that indicates whether this step can run concurrently with other steps. - * `config` - an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object. +- step.do(name=None, *, concurrent=False, config=None) is a + decorator that allows you to define a step in a workflow. + +`name` is an optional name for the step. If omitted, the function name (`func.__name__`) is used. + +`concurrent` is an optional boolean that indicates whether dependencies for this step can run concurrently. + +`config` is an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object. + +All parameters except `name` are keyword-only. + +Dependencies are resolved implicitly by parameter name. If a step function parameter name matches a previously declared step function, its result is injected into the step. + +If you define a `ctx` parameter, the step context is injected into that argument. + +:::note + +Older compatibility behavior supports explicit dependency lists with `depends=[...]`. For new workflows, prefer implicit dependency resolution by parameter name. + +::: ```python from workers import WorkflowEntrypoint class MyWorkflow(WorkflowEntrypoint): async def run(self, event, step): - @step.do("my first step") + @step.do() async def my_first_step(): # do some work return "Hello World!" @@ -45,20 +60,22 @@ Note that the decorator doesn't make the call to the step, it just returns a cal When returning state from a step, you must make sure that the returned value is serializable. Since steps run through an FFI layer, the returned value gets type translated via [FFI.](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.to_js) Refer to [Pyodide's documentation](https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-pyproxy-to-js) regarding type conversions for more information. -* step.sleep(name, duration) +- step.sleep(name, duration) + +`name` is the name of the step. - * `name` - the name of the step. - * `duration` - the duration to sleep until, in either seconds or as a `WorkflowDuration` compatible string. +`duration` is the duration to sleep until, in either seconds or as a `WorkflowDuration` compatible string. ```python async def run(self, event, step): await step.sleep("my-sleep-step", "10 seconds") ``` -* step.sleep_until(name, timestamp) +- step.sleep_until(name, timestamp) - * `name` - the name of the step. - * `timestamp` - a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until. +`name` is the name of the step. + +`timestamp` is a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until. ```python import datetime @@ -67,11 +84,13 @@ async def run(self, event, step): await step.sleep_until("my-sleep-step", datetime.datetime.now() + datetime.timedelta(seconds=10)) ``` -* step.wait_for_event(name, event_type, timeout="24 hours") +- step.wait_for_event(name, event_type, timeout="24 hours") + +`name` is the name of the step. + +`event_type` is the type of event to wait for. - * `name` - the name of the step. - * `event_type` - the type of event to wait for. - * `timeout` - the timeout for the `wait_for_event` call. The default timeout is 24 hours. +`timeout` is the timeout for the `wait_for_event` call. The default timeout is 24 hours. ```python async def run(self, event, step): @@ -82,10 +101,10 @@ async def run(self, event, step): The `event` parameter is a dictionary that contains the payload passed to the workflow instance, along with other metadata: -* payload - the payload passed to the workflow instance. -* timestamp - the timestamp that the workflow was triggered. -* instanceId - the ID of the current workflow instance. -* workflowName - the name of the workflow. +- payload - the payload passed to the workflow instance. +- timestamp - the timestamp that the workflow was triggered. +- instanceId - the ID of the current workflow instance. +- workflowName - the name of the workflow. ## Error Handling @@ -141,6 +160,20 @@ class DemoWorkflowClass(WorkflowEntrypoint): pass ``` +### Access step context (`ctx`) + +```python +from workers import WorkflowEntrypoint + +class CtxWorkflow(WorkflowEntrypoint): + async def run(self, event, step): + @step.do() + async def read_context(ctx): + return ctx["attempt"] + + return await read_context() +``` + ### Create an instance via binding Note that `env` is a JavaScript object exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). You can access the binding like you would on a JavaScript worker. Refer to the [Workflow binding documentation](/workflows/build/workers-api/#workflow) to learn more about the methods available. From b7d6aadc52f7e8366f5558885e7d737fb43128b4 Mon Sep 17 00:00:00 2001 From: "ask-bonk[bot]" Date: Wed, 25 Feb 2026 13:34:46 +0000 Subject: [PATCH 2/2] Reformatted params as indented bullets Co-authored-by: Oxyjun --- .../workflows/python/python-workers-api.mdx | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/src/content/docs/workflows/python/python-workers-api.mdx b/src/content/docs/workflows/python/python-workers-api.mdx index 3162e48d623a5fb..aded391f4657b58 100644 --- a/src/content/docs/workflows/python/python-workers-api.mdx +++ b/src/content/docs/workflows/python/python-workers-api.mdx @@ -21,16 +21,13 @@ class MyWorkflow(WorkflowEntrypoint): ## WorkflowStep -- step.do(name=None, *, concurrent=False, config=None) is a - decorator that allows you to define a step in a workflow. +{/* prettier-ignore */} +- step.do(name=None, *, concurrent=False, config=None) — a decorator that allows you to define a step in a workflow. + - `name` — an optional name for the step. If omitted, the function name (`func.__name__`) is used. + - `concurrent` — an optional boolean that indicates whether dependencies for this step can run concurrently. + - `config` — an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object. -`name` is an optional name for the step. If omitted, the function name (`func.__name__`) is used. - -`concurrent` is an optional boolean that indicates whether dependencies for this step can run concurrently. - -`config` is an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object. - -All parameters except `name` are keyword-only. + All parameters except `name` are keyword-only. Dependencies are resolved implicitly by parameter name. If a step function parameter name matches a previously declared step function, its result is injected into the step. @@ -60,22 +57,20 @@ Note that the decorator doesn't make the call to the step, it just returns a cal When returning state from a step, you must make sure that the returned value is serializable. Since steps run through an FFI layer, the returned value gets type translated via [FFI.](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.to_js) Refer to [Pyodide's documentation](https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-pyproxy-to-js) regarding type conversions for more information. +{/* prettier-ignore */} - step.sleep(name, duration) - -`name` is the name of the step. - -`duration` is the duration to sleep until, in either seconds or as a `WorkflowDuration` compatible string. + - `name` — the name of the step. + - `duration` — the duration to sleep until, in either seconds or as a `WorkflowDuration` compatible string. ```python async def run(self, event, step): await step.sleep("my-sleep-step", "10 seconds") ``` +{/* prettier-ignore */} - step.sleep_until(name, timestamp) - -`name` is the name of the step. - -`timestamp` is a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until. + - `name` — the name of the step. + - `timestamp` — a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until. ```python import datetime @@ -84,13 +79,11 @@ async def run(self, event, step): await step.sleep_until("my-sleep-step", datetime.datetime.now() + datetime.timedelta(seconds=10)) ``` +{/* prettier-ignore */} - step.wait_for_event(name, event_type, timeout="24 hours") - -`name` is the name of the step. - -`event_type` is the type of event to wait for. - -`timeout` is the timeout for the `wait_for_event` call. The default timeout is 24 hours. + - `name` — the name of the step. + - `event_type` — the type of event to wait for. + - `timeout` — the timeout for the `wait_for_event` call. The default timeout is 24 hours. ```python async def run(self, event, step):