Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions message_passing/waiting_for_handlers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Waiting for message handlers

This workflow demonstrates how to wait for signal and update handlers to
finish in the following circumstances:

- Before a successful return
- On failure
- On cancellation

Your workflow can also exit via Continue-As-New. In that case you would
usually wait for the handlers to finish immediately before the call to
continue_as_new(); that's not illustrated in this sample.


To run, open two terminals and `cd` to this directory in them.

Run the worker in one terminal:

poetry run python worker.py

And run the workflow-starter code in the other terminal:

poetry run python starter.py


Here's the output you'll see:

```
workflow exit type: SUCCESS
🟢 caller received update result
🟢 caller received workflow result


workflow exit type: FAILURE
🟢 caller received update result
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow


workflow exit type: CANCELLATION
🟢 caller received update result
```
21 changes: 21 additions & 0 deletions message_passing/waiting_for_handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from dataclasses import dataclass
from enum import IntEnum

TASK_QUEUE = "my-task-queue"
WORKFLOW_ID = "my-workflow-id"


class WorkflowExitType(IntEnum):
SUCCESS = 0
FAILURE = 1
CANCELLATION = 2


@dataclass
class WorkflowInput:
exit_type: WorkflowExitType


@dataclass
class WorkflowResult:
data: str
8 changes: 8 additions & 0 deletions message_passing/waiting_for_handlers/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import asyncio

from temporalio import activity


@activity.defn
async def activity_executed_by_update_handler():
await asyncio.sleep(1)
69 changes: 69 additions & 0 deletions message_passing/waiting_for_handlers/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio

from temporalio import client, common

from message_passing.waiting_for_handlers import (
TASK_QUEUE,
WORKFLOW_ID,
WorkflowExitType,
WorkflowInput,
)
from message_passing.waiting_for_handlers.workflows import WaitingForHandlersWorkflow


async def starter(exit_type: WorkflowExitType):
cl = await client.Client.connect("localhost:7233")
wf_handle = await cl.start_workflow(
WaitingForHandlersWorkflow.run,
WorkflowInput(exit_type=exit_type),
id=WORKFLOW_ID,
task_queue=TASK_QUEUE,
id_conflict_policy=common.WorkflowIDConflictPolicy.TERMINATE_EXISTING,
)
await _check_run(wf_handle, exit_type)


async def _check_run(
wf_handle: client.WorkflowHandle,
exit_type: WorkflowExitType,
):
try:
up_handle = await wf_handle.start_update(
WaitingForHandlersWorkflow.my_update,
wait_for_stage=client.WorkflowUpdateStage.ACCEPTED,
)
except Exception as e:
print(f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}")

if exit_type == WorkflowExitType.CANCELLATION:
await wf_handle.cancel()

try:
await up_handle.result()
print(" 🟢 caller received update result")
except Exception as e:
print(
f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}"
)

try:
await wf_handle.result()
print(" 🟢 caller received workflow result")
except BaseException as e:
print(
f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}"
)


async def main():
for exit_type in [
WorkflowExitType.SUCCESS,
WorkflowExitType.FAILURE,
WorkflowExitType.CANCELLATION,
]:
Comment on lines +59 to +63
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit complicated for a simple waiting sample IMO, but nothing wrong with it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this sample is partly to teach users about what errors they'll see under different sorts of workflow exit. See the README:

workflow exit type: SUCCESS
    🟢 caller received update result
    🟢 caller received workflow result


workflow exit type: FAILURE
    🟢 caller received update result
    🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow


workflow exit type: CANCELLATION
    🟢 caller received update result

print(f"\n\nworkflow exit type: {exit_type.name}")
await starter(exit_type)


if __name__ == "__main__":
asyncio.run(main())
40 changes: 40 additions & 0 deletions message_passing/waiting_for_handlers/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.waiting_for_handlers import TASK_QUEUE
from message_passing.waiting_for_handlers.activities import (
activity_executed_by_update_handler,
)
from message_passing.waiting_for_handlers.workflows import WaitingForHandlersWorkflow

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[WaitingForHandlersWorkflow],
activities=[
activity_executed_by_update_handler,
],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
95 changes: 95 additions & 0 deletions message_passing/waiting_for_handlers/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio
from datetime import timedelta

from temporalio import exceptions, workflow

from message_passing.waiting_for_handlers import (
WorkflowExitType,
WorkflowInput,
WorkflowResult,
)
from message_passing.waiting_for_handlers.activities import (
activity_executed_by_update_handler,
)


def is_workflow_exit_exception(e: BaseException) -> bool:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not accurate for all users to copy because users can customize what is a workflow exit exception. If we feel this is a utility users need, we should consider exposing on the SDK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it says that in the comment:

    # 👉 If you have set additional failure_exception_types you should also
    # check for these here.

and we have temporalio/features#589 for the utility.

"""
True if the exception is of a type that will cause the workflow to exit.

This is as opposed to exceptions that cause a workflow task failure, which
are retried automatically by Temporal.
"""
# 👉 If you have set additional failure_exception_types you should also
# check for these here.
return isinstance(e, (asyncio.CancelledError, exceptions.FailureError))


@workflow.defn
class WaitingForHandlersWorkflow:
@workflow.run
async def run(self, input: WorkflowInput) -> WorkflowResult:
"""
This workflow.run method demonstrates a pattern that can be used to wait for signal and
update handlers to finish in the following circumstances:

- On successful workflow return
- On workflow cancellation
- On workflow failure

Your workflow can also exit via Continue-As-New. In that case you would usually wait for
the handlers to finish immediately before the call to continue_as_new(); that's not
illustrated in this sample.

If you additionally need to perform cleanup or compensation on workflow failure or
cancellation, see the message_passing/waiting_for_handlers_and_compensation sample.
"""
try:
# 👉 Use this `try...except` style, instead of waiting for message
# handlers to finish in a `finally` block. The reason is that some
# exception types cause a workflow task failure as opposed to
# workflow exit, in which case we do *not* want to wait for message
# handlers to finish.
result = await self._my_workflow_application_logic(input)
await workflow.wait_condition(workflow.all_handlers_finished)
return result
# 👉 Catch BaseException since asyncio.CancelledError does not inherit
# from Exception.
except BaseException as e:
if is_workflow_exit_exception(e):
await workflow.wait_condition(workflow.all_handlers_finished)
raise
Comment on lines +56 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For simplicity reasons, I wonder if this sample should allow a workflow failure to not wait for a handler. That might be what some users want anyways (to fail the workflow now instead of waiting to fail).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that showing them how to wait for the handler when the workflow fails is useful since it points them towards how to implement graceful cleanup of a handler execution.


# Methods below this point can be ignored unless you are interested in
# the implementation details of this sample.

def __init__(self) -> None:
self._update_started = False

@workflow.update
async def my_update(self) -> str:
self._update_started = True
await workflow.execute_activity(
activity_executed_by_update_handler,
start_to_close_timeout=timedelta(seconds=10),
)
return "update-result"

async def _my_workflow_application_logic(
self, input: WorkflowInput
) -> WorkflowResult:
# The main workflow logic is implemented in a separate method in order
# to separate "platform-level" concerns (waiting for handlers to finish
# and error handling) from application logic.

# Wait until handlers have started, so that we are demonstrating that we
# wait for them to finish.
await workflow.wait_condition(lambda: self._update_started)
if input.exit_type == WorkflowExitType.SUCCESS:
return WorkflowResult(data="workflow-result")
elif input.exit_type == WorkflowExitType.FAILURE:
raise exceptions.ApplicationError("deliberately failing workflow")
elif input.exit_type == WorkflowExitType.CANCELLATION:
# Block forever; the starter will send a workflow cancellation request.
await asyncio.Future()
raise AssertionError("unreachable")
39 changes: 39 additions & 0 deletions message_passing/waiting_for_handlers_and_compensation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Waiting for message handlers, and performing compensation and cleanup in message handlers

This sample demonstrates how to do the following:

1. Ensure that all update/signal handlers are finished before a successful
workflow return, and on workflow cancellation and failure.
2. Perform compensation/cleanup in an update handler when the workflow is
cancelled or fails.

For a simpler sample showing how to do (1) without (2), see [safe_message_handlers](../safe_message_handlers/README.md).

To run, open two terminals and `cd` to this directory in them.

Run the worker in one terminal:

poetry run python worker.py

And run the workflow-starter code in the other terminal:

poetry run python starter.py


Here's the output you'll see:

```
workflow exit type: SUCCESS
🟢 caller received update result
🟢 caller received workflow result


workflow exit type: FAILURE
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow


workflow exit type: CANCELLATION
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited
🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from dataclasses import dataclass
from enum import IntEnum

TASK_QUEUE = "my-task-queue"
WORKFLOW_ID = "my-workflow-id"


class WorkflowExitType(IntEnum):
SUCCESS = 0
FAILURE = 1
CANCELLATION = 2


@dataclass
class WorkflowInput:
exit_type: WorkflowExitType


@dataclass
class WorkflowResult:
data: str
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import asyncio

from temporalio import activity


@activity.defn
async def activity_executed_to_perform_workflow_compensation():
await asyncio.sleep(1)


@activity.defn
async def activity_executed_by_update_handler():
await asyncio.sleep(1)


@activity.defn
async def activity_executed_by_update_handler_to_perform_compensation():
await asyncio.sleep(1)
Loading