4 changes: 3 additions & 1 deletion polling/frequent/activities.py
@@ -16,10 +16,11 @@ class ComposeGreetingInput:
 @activity.defn
 async def compose_greeting(input: ComposeGreetingInput) -> str:
     test_service = TestService()
+    attempt = 1
     while True:
         try:
             try:
-                result = await test_service.get_service_result(input)
+                result = await test_service.get_service_result(input, attempt)
dandavison (Contributor) commented on Nov 21, 2024:

Previously the sample did not pass the attempt number to the test service, and I think that is worth preserving: in real-world code, the service in question won't want to know about our attempt counting.

In other words, the way it is now will make some readers think that the solution to the polling-from-activity problem involves passing an attempt count to a service.

Is there a way to fix the algorithm without explicitly passing the attempt number to the service?

drewhoskins-temporal (Contributor Author) commented on Nov 21, 2024:

I understand and like the meta-point of wanting samples to look real-world, though this one feels shippable anyway. I could use a global, but it wouldn't work out-of-process, and anyway I'm not inspired to spend the time to change this.

Contributor commented:

OK, I do think it's worth finding a way to make your test pass without making the sample misleading. Here's a global variable version: #152 (I'm starting to see why the code had that modulo-arithmetic return condition!)
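
One reading of the modulo remark, as a minimal sketch: if the toy service's counter is shared across runs on the same worker, a plain threshold check makes every run after the first succeed immediately, whereas a modulo check makes each sequential run fail the same number of times. The names `ThresholdService`, `ModuloService`, and `calls_until_success` are hypothetical and are not taken from #152 or from the sample.

```python
class ServiceDown(Exception):
    """Raised while the toy service is still 'coming up'."""


class ThresholdService:
    # Hypothetical: counter shared by every instance in the process.
    calls = 0

    def get(self, threshold: int = 5) -> str:
        ThresholdService.calls += 1
        # Once the shared counter passes the threshold, every later call succeeds.
        if ThresholdService.calls >= threshold:
            return "ok"
        raise ServiceDown()


class ModuloService:
    # Hypothetical: same shared counter, but success only on every Nth call.
    calls = 0

    def get(self, every: int = 5) -> str:
        ModuloService.calls += 1
        if ModuloService.calls % every == 0:
            return "ok"
        raise ServiceDown()


def calls_until_success(service) -> int:
    """Poll the service until it answers; return how many calls that took."""
    n = 0
    while True:
        n += 1
        try:
            service.get()
            return n
        except ServiceDown:
            continue


# Two sequential "runs" against each kind of service.
threshold_runs = [calls_until_success(ThresholdService()) for _ in range(2)]
modulo_runs = [calls_until_success(ModuloService()) for _ in range(2)]
print(threshold_runs, modulo_runs)
```

In this sketch the modulo version only stays predictable when runs do not overlap; concurrent runs would still interleave their calls on the shared counter.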

drewhoskins-temporal (Contributor Author) commented on Nov 22, 2024:

Yeah, exactly, this doesn't work predictably when it's run multiple times on the same worker. I really don't think anybody's going to be misled by it as it is, and I would greatly prefer predictability and test isolation as a user.
That said, I wouldn't block your approach if you feel strongly enough.
Can you either accept my PR and then add on top of it, or just leave it as I have it?

Contributor commented:

GitHub was having problems with this PR after I brought it up-to-date with main and it's closed currently. Are you able to reopen it?

The code in this sample falls into two categories:

  1. Code that we are providing to users, essentially saying "follow this closely; this is what your code should look like". Note that users may well copy and paste code in this category. In particular: workflows.py, activities.py

  2. Private implementation detail of the sample, which exists only to make the sample work. Users will not copy and paste this code because, in their use cases, its role will be played by something specific to their domain. In particular: test_service.py

For (2) the basic idea of the sample is to create a toy service that is stateful, only responding after a few attempts. But that statefulness needs to be private to the test_service implementation, since it's modeling real-world slowness-to-come-up. The one thing we don't want to do is give users the impression that they need to count their attempts in their activity code and send them to a service. The sample in its current form takes care not to do that, and we don't need to make the sample worse in that respect.

There are a few options I think:

  1. We could move the counter in the test_service to the module level and use modulo arithmetic to address the issue that it might be used by multiple workflows.
  2. We could move the line test_service = TestService() in activities.py to the top level and use modulo arithmetic to address the issue that it might be used by multiple workflows.
  3. We could use a dict keyed by activity.info().workflow_id for the counter in test_service

All those would teach the lesson we're trying to teach correctly. (1) and (2) are suboptimal in that they'd result in some potentially confusing shared state if someone were to run multiple instances of the sample concurrently. So (3) is perhaps the best choice, seeing as it's hardly more complex than (1) and (2).

Contributor commented:

I've implemented option (3) in #152, which builds upon this PR (i.e. using the test you added).

                 activity.logger.info(f"Exiting activity ${result}")
                 return result
             except Exception as e:
@@ -28,6 +29,7 @@ async def compose_greeting(input: ComposeGreetingInput) -> str:

             activity.heartbeat("Invoking activity")
             await asyncio.sleep(1)
+            attempt += 1
         except asyncio.CancelledError:
             # activity was either cancelled or workflow was completed or worker shut down
             # if you need to clean up you can catch this.
2 changes: 1 addition & 1 deletion polling/infrequent/activities.py
@@ -16,4 +16,4 @@ async def compose_greeting(input: ComposeGreetingInput) -> str:
     test_service = TestService()
     # If this raises an exception because it's not done yet, the activity will
     # continually be scheduled for retry
-    return await test_service.get_service_result(input)
+    return await test_service.get_service_result(input, activity.info().attempt)
4 changes: 2 additions & 2 deletions polling/infrequent/workflows.py
@@ -14,9 +14,9 @@ async def run(self, name: str) -> str:
         return await workflow.execute_activity(
             compose_greeting,
             ComposeGreetingInput("Hello", name),
-            start_to_close_timeout=timedelta(seconds=2),
+            start_to_close_timeout=timedelta(seconds=5),
             retry_policy=RetryPolicy(
                 backoff_coefficient=1.0,
-                initial_interval=timedelta(seconds=60),
+                initial_interval=timedelta(seconds=30),
             ),
         )
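
As a rough worked example of what this retry policy implies: with `backoff_coefficient=1.0` the delay between retries stays constant at `initial_interval`, so five failed attempts cost about 5 × 30 s = 150 s of waiting before the sixth attempt. The helper `retry_delays` below is hypothetical, written only to show the arithmetic; it assumes the usual exponential-backoff rule (delay = initial interval × coefficient^n, optionally capped) and ignores activity run time.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    initial_interval: timedelta,
    backoff_coefficient: float,
    failures: int,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Delay before each retry, assuming exponential backoff with an optional cap."""
    delays = []
    for n in range(failures):
        delay = initial_interval * (backoff_coefficient**n)
        if maximum_interval is not None:
            delay = min(delay, maximum_interval)
        delays.append(delay)
    return delays


# Values from the diff above: 30 s initial interval, coefficient 1.0,
# and a service that fails five times before answering.
delays = retry_delays(timedelta(seconds=30), 1.0, failures=5)
total_wait = sum(delays, timedelta())
print(total_wait.total_seconds())  # 150.0
```

A wait of this length is presumably why the accompanying test is skipped when time skipping is unavailable.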
11 changes: 3 additions & 8 deletions polling/test_service.py
@@ -1,14 +1,9 @@
 class TestService:
     def __init__(self):
-        self.try_attempts = 0
         self.error_attempts = 5

-    async def get_service_result(self, input):
-        print(
-            f"Attempt {self.try_attempts}"
-            f" of {self.error_attempts} to invoke service"
-        )
-        self.try_attempts += 1
-        if self.try_attempts % self.error_attempts == 0:
+    async def get_service_result(self, input, attempt: int):
Contributor commented:

Suggested change
-    async def get_service_result(self, input, attempt: int):
+    async def get_service_result(self, input: ComposeGreetingInput, attempt: int):

drewhoskins-temporal (Contributor Author) commented on Nov 21, 2024:

I went a couple turns down this rabbit hole, but it turns out to require a shockingly large refactoring, which is not worth doing.

+        print(f"Attempt {attempt} of {self.error_attempts} to invoke service")
+        if attempt > self.error_attempts:
             return f"{input.greeting}, {input.name}!"
         raise Exception("service is down")
Empty file.
31 changes: 31 additions & 0 deletions tests/polling/infrequent/workflow_test.py
@@ -0,0 +1,31 @@
+import uuid
+
+import pytest
+from temporalio.client import Client
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
+from polling.infrequent.activities import compose_greeting
+from polling.infrequent.workflows import GreetingWorkflow
+
+
+async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironment):
+    if not env.supports_time_skipping:
+        pytest.skip("Too slow to test with time-skipping disabled")
+
+    # Start a worker that hosts the workflow and activity implementations.
+    task_queue = f"tq-{uuid.uuid4()}"
+    async with Worker(
+        client,
+        task_queue=task_queue,
+        workflows=[GreetingWorkflow],
+        activities=[compose_greeting],
+    ):
+        handle = await client.start_workflow(
+            GreetingWorkflow.run,
+            "Temporal",
+            id=f"infrequent-polling-{uuid.uuid4()}",
+            task_queue=task_queue,
+        )
+        result = await handle.result()
+        assert result == "Hello, Temporal!"