From c5bb4e4038c590fa2fa7d1cb9d2ace4a03c8a8d1 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 20 Nov 2024 20:41:38 -0500 Subject: [PATCH 1/6] Lint fixes --- polling/frequent/activities.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/polling/frequent/activities.py b/polling/frequent/activities.py index 2b49e379..7acfcc34 100644 --- a/polling/frequent/activities.py +++ b/polling/frequent/activities.py @@ -1,5 +1,4 @@ import asyncio -import time from dataclasses import dataclass from temporalio import activity @@ -22,7 +21,7 @@ async def compose_greeting(input: ComposeGreetingInput) -> str: result = await test_service.get_service_result(input) activity.logger.info(f"Exiting activity ${result}") return result - except Exception as e: + except Exception: # swallow exception since service is down activity.logger.debug("Failed, trying again shortly", exc_info=True) From ddd7b83bb95dd3fcfb2f21e91a2458a4af043970 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 21 Nov 2024 13:48:40 -0500 Subject: [PATCH 2/6] Fix infrequent polling sample --- polling/frequent/activities.py | 9 +------ polling/infrequent/activities.py | 10 +------- polling/periodic_sequence/activities.py | 10 ++------ polling/periodic_sequence/workflows.py | 6 ++--- polling/test_service.py | 23 ++++++++++++----- tests/polling/infrequent/__init__.py | 0 tests/polling/infrequent/workflow_test.py | 31 +++++++++++++++++++++++ 7 files changed, 53 insertions(+), 36 deletions(-) create mode 100644 tests/polling/infrequent/__init__.py create mode 100644 tests/polling/infrequent/workflow_test.py diff --git a/polling/frequent/activities.py b/polling/frequent/activities.py index 7acfcc34..96fed1af 100644 --- a/polling/frequent/activities.py +++ b/polling/frequent/activities.py @@ -1,15 +1,8 @@ import asyncio -from dataclasses import dataclass from temporalio import activity -from polling.test_service import TestService - - -@dataclass -class ComposeGreetingInput: - greeting: str - name: str +from polling.test_service import ComposeGreetingInput, TestService @activity.defn diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index 2bd71587..cbed7027 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -1,14 +1,6 @@ -from dataclasses import dataclass - from temporalio import activity -from polling.test_service import TestService - - -@dataclass -class ComposeGreetingInput: - greeting: str - name: str +from polling.test_service import ComposeGreetingInput, TestService @activity.defn diff --git a/polling/periodic_sequence/activities.py b/polling/periodic_sequence/activities.py index 1a1196c6..87b69890 100644 --- a/polling/periodic_sequence/activities.py +++ b/polling/periodic_sequence/activities.py @@ -1,14 +1,8 @@ -from dataclasses import dataclass +from typing import Any, NoReturn from temporalio import activity -@dataclass -class ComposeGreetingInput: - greeting: str - name: str - - @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +async def compose_greeting(input: Any) -> NoReturn: raise RuntimeError("Service is down") diff --git a/polling/periodic_sequence/workflows.py b/polling/periodic_sequence/workflows.py index d38d41ce..917170c1 100644 --- a/polling/periodic_sequence/workflows.py +++ b/polling/periodic_sequence/workflows.py @@ -6,10 +6,8 @@ from temporalio.exceptions import ActivityError with workflow.unsafe.imports_passed_through(): - from polling.periodic_sequence.activities import ( - ComposeGreetingInput, - compose_greeting, - ) + from polling.periodic_sequence.activities import compose_greeting + from polling.test_service import ComposeGreetingInput @workflow.defn diff --git a/polling/test_service.py b/polling/test_service.py index 3744998a..42a54ab4 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,14 +1,23 @@ +from dataclasses import dataclass + + +@dataclass +class ComposeGreetingInput: + greeting: str + name: str + + +try_attempts = 0 + + class TestService: def __init__(self): - self.try_attempts = 0 self.error_attempts = 5 async def get_service_result(self, input): - print( - f"Attempt {self.try_attempts}" - f" of {self.error_attempts} to invoke service" - ) - self.try_attempts += 1 - if self.try_attempts % self.error_attempts == 0: + global try_attempts + print(f"Attempt {try_attempts} of {self.error_attempts} to invoke service") + try_attempts += 1 + if try_attempts % self.error_attempts == 0: return f"{input.greeting}, {input.name}!" raise Exception("service is down") diff --git a/tests/polling/infrequent/__init__.py b/tests/polling/infrequent/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/polling/infrequent/workflow_test.py b/tests/polling/infrequent/workflow_test.py new file mode 100644 index 00000000..31f3f987 --- /dev/null +++ b/tests/polling/infrequent/workflow_test.py @@ -0,0 +1,31 @@ +import uuid + +import pytest +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from polling.infrequent.activities import compose_greeting +from polling.infrequent.workflows import GreetingWorkflow + + +async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironment): + if not env.supports_time_skipping: + pytest.skip("Too slow to test with time-skipping disabled") + + # Start a worker that hosts the workflow and activity implementations. + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[GreetingWorkflow], + activities=[compose_greeting], + ): + handle = await client.start_workflow( + GreetingWorkflow.run, + "Temporal", + id=f"infrequent-polling-{uuid.uuid4()}", + task_queue=task_queue, + ) + result = await handle.result() + assert result == "Hello, Temporal!" From 6fcc52c1b31b61d0e2e65d05a75d91f0175affb7 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 25 Nov 2024 12:16:39 -0500 Subject: [PATCH 3/6] Isolate counters associated with different workflows --- polling/frequent/activities.py | 5 ++--- polling/infrequent/activities.py | 5 ++--- polling/test_service.py | 26 +++++++++++++------------- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/polling/frequent/activities.py b/polling/frequent/activities.py index 96fed1af..a112b417 100644 --- a/polling/frequent/activities.py +++ b/polling/frequent/activities.py @@ -2,16 +2,15 @@ from temporalio import activity -from polling.test_service import ComposeGreetingInput, TestService +from polling.test_service import ComposeGreetingInput, get_service_result @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: - test_service = TestService() while True: try: try: - result = await test_service.get_service_result(input) + result = await get_service_result(input) activity.logger.info(f"Exiting activity ${result}") return result except Exception: diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index cbed7027..b3db1aed 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -1,11 +1,10 @@ from temporalio import activity -from polling.test_service import ComposeGreetingInput, TestService +from polling.test_service import ComposeGreetingInput, get_service_result @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: - test_service = TestService() # If this raises an exception because it's not done yet, the activity will # continually be scheduled for retry - return await test_service.get_service_result(input) + return await get_service_result(input) diff --git a/polling/test_service.py b/polling/test_service.py index 42a54ab4..79d8ea7a 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,5 +1,11 @@ +from collections import Counter from dataclasses import dataclass +from temporalio import activity + +attempts = Counter() +ERROR_ATTEMPTS = 5 + @dataclass class ComposeGreetingInput: @@ -7,17 +13,11 @@ class ComposeGreetingInput: name: str -try_attempts = 0 - - -class TestService: - def __init__(self): - self.error_attempts = 5 +async def get_service_result(input): + attempts[activity.info().workflow_id] += 1 + attempt = attempts[activity.info().workflow_id] - async def get_service_result(self, input): - global try_attempts - print(f"Attempt {try_attempts} of {self.error_attempts} to invoke service") - try_attempts += 1 - if try_attempts % self.error_attempts == 0: - return f"{input.greeting}, {input.name}!" - raise Exception("service is down") + print(f"Attempt {attempt} of {ERROR_ATTEMPTS} to invoke service") + if attempt == ERROR_ATTEMPTS: + return f"{input.greeting}, {input.name}!" + raise Exception("service is down") From ecd5b1e0d36432cb7c4d753a58868e83e12ea20b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 25 Nov 2024 12:25:25 -0500 Subject: [PATCH 4/6] Add type annotation --- polling/test_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polling/test_service.py b/polling/test_service.py index 79d8ea7a..9227b8f4 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,9 +1,9 @@ -from collections import Counter from dataclasses import dataclass +from typing import Counter from temporalio import activity -attempts = Counter() +attempts = Counter[str]() ERROR_ATTEMPTS = 5 From 14bce90727d108381d723dad2c0a09ec6cf477ed Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 25 Nov 2024 15:41:06 -0500 Subject: [PATCH 5/6] s/macos-12/macos-13/ --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c32f9da1..8bcb6c9e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ jobs: os: [ubuntu-latest, macos-intel, macos-arm, windows-latest] include: - os: macos-intel - runsOn: macos-12 + runsOn: macos-13 - os: macos-arm runsOn: macos-14 # macOS ARM 3.8 does not have an available Python build at From bd524ab7402b753afc27dc5bf14a5c3b4341b01d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 25 Nov 2024 15:43:35 -0500 Subject: [PATCH 6/6] Clean up --- polling/test_service.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/polling/test_service.py b/polling/test_service.py index 9227b8f4..490f8476 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -14,10 +14,10 @@ class ComposeGreetingInput: async def get_service_result(input): - attempts[activity.info().workflow_id] += 1 - attempt = attempts[activity.info().workflow_id] + workflow_id = activity.info().workflow_id + attempts[workflow_id] += 1 - print(f"Attempt {attempt} of {ERROR_ATTEMPTS} to invoke service") - if attempt == ERROR_ATTEMPTS: + print(f"Attempt {attempts[workflow_id]} of {ERROR_ATTEMPTS} to invoke service") + if attempts[workflow_id] == ERROR_ATTEMPTS: return f"{input.greeting}, {input.name}!" raise Exception("service is down")