From fb2f927e392a704e9ada34f2e1ca70aaee86b6b3 Mon Sep 17 00:00:00 2001 From: Drew Hoskins Date: Wed, 20 Nov 2024 16:50:02 -0800 Subject: [PATCH 1/4] Fix Infrequent Polling sample --- polling/frequent/activities.py | 4 ++- polling/infrequent/activities.py | 2 +- polling/infrequent/workflows.py | 4 +-- polling/test_service.py | 8 +++--- tests/polling/infrequent/workflow_test.py | 30 +++++++++++++++++++++++ 5 files changed, 39 insertions(+), 9 deletions(-) create mode 100644 tests/polling/infrequent/workflow_test.py diff --git a/polling/frequent/activities.py b/polling/frequent/activities.py index 2b49e379..4e78b9e1 100644 --- a/polling/frequent/activities.py +++ b/polling/frequent/activities.py @@ -16,10 +16,11 @@ class ComposeGreetingInput: @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: test_service = TestService() + attempt = 1 while True: try: try: - result = await test_service.get_service_result(input) + result = await test_service.get_service_result(input, attempt) activity.logger.info(f"Exiting activity ${result}") return result except Exception as e: @@ -28,6 +29,7 @@ async def compose_greeting(input: ComposeGreetingInput) -> str: activity.heartbeat("Invoking activity") await asyncio.sleep(1) + attempt += 1 except asyncio.CancelledError: # activity was either cancelled or workflow was completed or worker shut down # if you need to clean up you can catch this. diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index 2bd71587..6572a3dd 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -16,4 +16,4 @@ async def compose_greeting(input: ComposeGreetingInput) -> str: test_service = TestService() # If this raises an exception because it's not done yet, the activity will # continually be scheduled for retry - return await test_service.get_service_result(input) + return await test_service.get_service_result(input, activity.info().attempt) diff --git a/polling/infrequent/workflows.py b/polling/infrequent/workflows.py index 35769573..bc2e4c4b 100644 --- a/polling/infrequent/workflows.py +++ b/polling/infrequent/workflows.py @@ -14,9 +14,9 @@ async def run(self, name: str) -> str: return await workflow.execute_activity( compose_greeting, ComposeGreetingInput("Hello", name), - start_to_close_timeout=timedelta(seconds=2), + start_to_close_timeout=timedelta(seconds=5), retry_policy=RetryPolicy( backoff_coefficient=1.0, - initial_interval=timedelta(seconds=60), + initial_interval=timedelta(seconds=30), ), ) diff --git a/polling/test_service.py b/polling/test_service.py index 3744998a..9e30db80 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,14 +1,12 @@ class TestService: def __init__(self): - self.try_attempts = 0 self.error_attempts = 5 - async def get_service_result(self, input): + async def get_service_result(self, input, attempt: int): print( - f"Attempt {self.try_attempts}" + f"Attempt {attempt}" f" of {self.error_attempts} to invoke service" ) - self.try_attempts += 1 - if self.try_attempts % self.error_attempts == 0: + if attempt % self.error_attempts == 0: return f"{input.greeting}, {input.name}!" raise Exception("service is down") diff --git a/tests/polling/infrequent/workflow_test.py b/tests/polling/infrequent/workflow_test.py new file mode 100644 index 00000000..2aa2b3bd --- /dev/null +++ b/tests/polling/infrequent/workflow_test.py @@ -0,0 +1,30 @@ +import uuid +import pytest +from polling.infrequent.activities import compose_greeting +from polling.infrequent.workflows import GreetingWorkflow +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + + +async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironment): + if not env.supports_time_skipping: + pytest.skip( + "Too slow to test with time-skipping disabled") + + # Start a worker that hosts the workflow and activity implementations. + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[GreetingWorkflow], + activities=[compose_greeting], + ): + handle = await client.start_workflow( + GreetingWorkflow.run, + "Temporal", + id=f"infrequent-polling-{uuid.uuid4()}", + task_queue=task_queue, + ) + result = await handle.result() + assert result == "Hello, Temporal!" \ No newline at end of file From 0204af5b6c236fd8e2ec849e6e42417fc7daa2bc Mon Sep 17 00:00:00 2001 From: Drew Hoskins Date: Wed, 20 Nov 2024 17:02:41 -0800 Subject: [PATCH 2/4] poe format --- polling/test_service.py | 5 +---- tests/polling/infrequent/workflow_test.py | 13 +++++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/polling/test_service.py b/polling/test_service.py index 9e30db80..9323cd09 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -3,10 +3,7 @@ def __init__(self): self.error_attempts = 5 async def get_service_result(self, input, attempt: int): - print( - f"Attempt {attempt}" - f" of {self.error_attempts} to invoke service" - ) + print(f"Attempt {attempt}" f" of {self.error_attempts} to invoke service") if attempt % self.error_attempts == 0: return f"{input.greeting}, {input.name}!" raise Exception("service is down") diff --git a/tests/polling/infrequent/workflow_test.py b/tests/polling/infrequent/workflow_test.py index 2aa2b3bd..31f3f987 100644 --- a/tests/polling/infrequent/workflow_test.py +++ b/tests/polling/infrequent/workflow_test.py @@ -1,17 +1,18 @@ import uuid + import pytest -from polling.infrequent.activities import compose_greeting -from polling.infrequent.workflows import GreetingWorkflow from temporalio.client import Client from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker +from polling.infrequent.activities import compose_greeting +from polling.infrequent.workflows import GreetingWorkflow + async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironment): if not env.supports_time_skipping: - pytest.skip( - "Too slow to test with time-skipping disabled") - + pytest.skip("Too slow to test with time-skipping disabled") + # Start a worker that hosts the workflow and activity implementations. task_queue = f"tq-{uuid.uuid4()}" async with Worker( @@ -27,4 +28,4 @@ async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironm task_queue=task_queue, ) result = await handle.result() - assert result == "Hello, Temporal!" \ No newline at end of file + assert result == "Hello, Temporal!" From 61cb1bf2ddd6a871783cf97adad9e14bc904d3c6 Mon Sep 17 00:00:00 2001 From: Drew Hoskins Date: Wed, 20 Nov 2024 17:11:28 -0800 Subject: [PATCH 3/4] Add __init__.py --- tests/polling/infrequent/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/polling/infrequent/__init__.py diff --git a/tests/polling/infrequent/__init__.py b/tests/polling/infrequent/__init__.py new file mode 100644 index 00000000..e69de29b From f77df0aaa1c0da4fb3325e8dfa7aaf65274b7660 Mon Sep 17 00:00:00 2001 From: Drew Hoskins <166441821+drewhoskins-temporal@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:22:48 -0800 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Dan Davison --- polling/test_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polling/test_service.py b/polling/test_service.py index 9323cd09..8698b838 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -3,7 +3,7 @@ def __init__(self): self.error_attempts = 5 async def get_service_result(self, input, attempt: int): - print(f"Attempt {attempt}" f" of {self.error_attempts} to invoke service") - if attempt % self.error_attempts == 0: + print(f"Attempt {attempt} of {self.error_attempts} to invoke service") + if attempt > self.error_attempts: return f"{input.greeting}, {input.name}!" raise Exception("service is down")