diff --git a/polling/frequent/activities.py b/polling/frequent/activities.py index 2b49e379..4e78b9e1 100644 --- a/polling/frequent/activities.py +++ b/polling/frequent/activities.py @@ -16,10 +16,11 @@ class ComposeGreetingInput: @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: test_service = TestService() + attempt = 1 while True: try: try: - result = await test_service.get_service_result(input) + result = await test_service.get_service_result(input, attempt) activity.logger.info(f"Exiting activity ${result}") return result except Exception as e: @@ -28,6 +29,7 @@ async def compose_greeting(input: ComposeGreetingInput) -> str: activity.heartbeat("Invoking activity") await asyncio.sleep(1) + attempt += 1 except asyncio.CancelledError: # activity was either cancelled or workflow was completed or worker shut down # if you need to clean up you can catch this. diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index 2bd71587..6572a3dd 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -16,4 +16,4 @@ async def compose_greeting(input: ComposeGreetingInput) -> str: test_service = TestService() # If this raises an exception because it's not done yet, the activity will # continually be scheduled for retry - return await test_service.get_service_result(input) + return await test_service.get_service_result(input, activity.info().attempt) diff --git a/polling/infrequent/workflows.py b/polling/infrequent/workflows.py index 35769573..bc2e4c4b 100644 --- a/polling/infrequent/workflows.py +++ b/polling/infrequent/workflows.py @@ -14,9 +14,9 @@ async def run(self, name: str) -> str: return await workflow.execute_activity( compose_greeting, ComposeGreetingInput("Hello", name), - start_to_close_timeout=timedelta(seconds=2), + start_to_close_timeout=timedelta(seconds=5), retry_policy=RetryPolicy( backoff_coefficient=1.0, - initial_interval=timedelta(seconds=60), + initial_interval=timedelta(seconds=30), ), ) diff --git a/polling/test_service.py b/polling/test_service.py index 3744998a..8698b838 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,14 +1,9 @@ class TestService: def __init__(self): - self.try_attempts = 0 self.error_attempts = 5 - async def get_service_result(self, input): - print( - f"Attempt {self.try_attempts}" - f" of {self.error_attempts} to invoke service" - ) - self.try_attempts += 1 - if self.try_attempts % self.error_attempts == 0: + async def get_service_result(self, input, attempt: int): + print(f"Attempt {attempt} of {self.error_attempts} to invoke service") + if attempt > self.error_attempts: return f"{input.greeting}, {input.name}!" raise Exception("service is down") diff --git a/tests/polling/infrequent/__init__.py b/tests/polling/infrequent/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/polling/infrequent/workflow_test.py b/tests/polling/infrequent/workflow_test.py new file mode 100644 index 00000000..31f3f987 --- /dev/null +++ b/tests/polling/infrequent/workflow_test.py @@ -0,0 +1,31 @@ +import uuid + +import pytest +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from polling.infrequent.activities import compose_greeting +from polling.infrequent.workflows import GreetingWorkflow + + +async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironment): + if not env.supports_time_skipping: + pytest.skip("Too slow to test with time-skipping disabled") + + # Start a worker that hosts the workflow and activity implementations. + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[GreetingWorkflow], + activities=[compose_greeting], + ): + handle = await client.start_workflow( + GreetingWorkflow.run, + "Temporal", + id=f"infrequent-polling-{uuid.uuid4()}", + task_queue=task_queue, + ) + result = await handle.result() + assert result == "Hello, Temporal!"