diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core
index e18982ec7..1cbca021d 160000
--- a/temporalio/bridge/sdk-core
+++ b/temporalio/bridge/sdk-core
@@ -1 +1 @@
-Subproject commit e18982ec72be62e357a5ea418b1670c8b2fee55f
+Subproject commit 1cbca021d29763c52129730644a95d6f8cf68931
diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs
index d9a6487b6..85833f440 100644
--- a/temporalio/bridge/src/worker.rs
+++ b/temporalio/bridge/src/worker.rs
@@ -15,9 +15,8 @@ use temporal_sdk_core::api::errors::PollError;
 use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput};
 use temporal_sdk_core_api::errors::WorkflowErrorType;
 use temporal_sdk_core_api::worker::{
-    PollerBehavior, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext,
-    SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait,
-    SlotSupplierPermit,
+    SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext,
+    SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit,
 };
 use temporal_sdk_core_api::Worker;
 use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
@@ -49,9 +48,9 @@ pub struct WorkerConfig {
     identity_override: Option<String>,
     max_cached_workflows: usize,
     tuner: TunerHolder,
-    max_concurrent_workflow_task_polls: usize,
+    workflow_task_poller_behavior: PollerBehavior,
     nonsticky_to_sticky_poll_ratio: f32,
-    max_concurrent_activity_task_polls: usize,
+    activity_task_poller_behavior: PollerBehavior,
     no_remote_activities: bool,
     sticky_queue_schedule_to_start_timeout_millis: u64,
     max_heartbeat_throttle_interval_millis: u64,
@@ -63,6 +62,42 @@ pub struct WorkerConfig {
     nondeterminism_as_workflow_fail_for_types: HashSet<String>,
 }
 
+#[derive(FromPyObject)]
+pub struct PollerBehaviorSimpleMaximum {
+    pub simple_maximum: usize,
+}
+
+#[derive(FromPyObject)]
+pub struct PollerBehaviorAutoscaling {
+    pub minimum: usize,
+    pub maximum: usize,
+    pub initial: usize,
+}
+
+/// Recreates [temporal_sdk_core_api::worker::PollerBehavior]
+#[derive(FromPyObject)]
+pub enum PollerBehavior {
+    SimpleMaximum(PollerBehaviorSimpleMaximum),
+    Autoscaling(PollerBehaviorAutoscaling),
+}
+
+impl From<PollerBehavior> for temporal_sdk_core_api::worker::PollerBehavior {
+    fn from(value: PollerBehavior) -> Self {
+        match value {
+            PollerBehavior::SimpleMaximum(simple) => {
+                temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum)
+            }
+            PollerBehavior::Autoscaling(auto) => {
+                temporal_sdk_core_api::worker::PollerBehavior::Autoscaling {
+                    minimum: auto.minimum,
+                    maximum: auto.maximum,
+                    initial: auto.initial,
+                }
+            }
+        }
+    }
+}
+
 /// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy]
 #[derive(FromPyObject)]
 pub enum WorkerVersioningStrategy {
@@ -626,14 +661,10 @@ fn convert_worker_config(
         .versioning_strategy(converted_versioning_strategy)
         .client_identity_override(conf.identity_override)
         .max_cached_workflows(conf.max_cached_workflows)
-        .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
-            conf.max_concurrent_workflow_task_polls,
-        ))
+        .workflow_task_poller_behavior(conf.workflow_task_poller_behavior)
         .tuner(Arc::new(converted_tuner))
         .nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
-        .activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
-            conf.max_concurrent_activity_task_polls,
-        ))
+        .activity_task_poller_behavior(conf.activity_task_poller_behavior)
         .no_remote_activities(conf.no_remote_activities)
         .sticky_queue_schedule_to_start_timeout(Duration::from_millis(
             conf.sticky_queue_schedule_to_start_timeout_millis,
diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py
index 3ef78ec76..8c9d2e9e0 100644
--- a/temporalio/bridge/worker.py
+++ b/temporalio/bridge/worker.py
@@ -48,9 +48,9 @@ class WorkerConfig:
     identity_override: Optional[str]
     max_cached_workflows: int
     tuner: TunerHolder
-    max_concurrent_workflow_task_polls: int
+    workflow_task_poller_behavior: PollerBehavior
     nonsticky_to_sticky_poll_ratio: float
-    max_concurrent_activity_task_polls: int
+    activity_task_poller_behavior: PollerBehavior
     no_remote_activities: bool
     sticky_queue_schedule_to_start_timeout_millis: int
     max_heartbeat_throttle_interval_millis: int
@@ -62,6 +62,28 @@ class WorkerConfig:
     nondeterminism_as_workflow_fail_for_types: Set[str]
 
 
+@dataclass
+class PollerBehaviorSimpleMaximum:
+    """Python representation of the Rust struct for simple poller behavior."""
+
+    simple_maximum: int
+
+
+@dataclass
+class PollerBehaviorAutoscaling:
+    """Python representation of the Rust struct for autoscaling poller behavior."""
+
+    minimum: int
+    maximum: int
+    initial: int
+
+
+PollerBehavior: TypeAlias = Union[
+    PollerBehaviorSimpleMaximum,
+    PollerBehaviorAutoscaling,
+]
+
+
 @dataclass
 class WorkerDeploymentVersion:
     """Python representation of the Rust struct for configuring a worker deployment version."""
diff --git a/temporalio/worker/__init__.py b/temporalio/worker/__init__.py
index 96e37d3b3..bd053fcf2 100644
--- a/temporalio/worker/__init__.py
+++ b/temporalio/worker/__init__.py
@@ -43,6 +43,9 @@
     WorkflowSlotInfo,
 )
 from ._worker import (
+    PollerBehavior,
+    PollerBehaviorAutoscaling,
+    PollerBehaviorSimpleMaximum,
     Worker,
     WorkerConfig,
     WorkerDeploymentConfig,
@@ -65,6 +68,9 @@
     "ReplayerConfig",
     "WorkflowReplayResult",
     "WorkflowReplayResults",
+    "PollerBehavior",
+    "PollerBehaviorSimpleMaximum",
+    "PollerBehaviorAutoscaling",
     # Interceptor base classes
     "Interceptor",
     "ActivityInboundInterceptor",
diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py
index 238d64ace..964184196 100644
--- a/temporalio/worker/_replayer.py
+++ b/temporalio/worker/_replayer.py
@@ -242,9 +242,7 @@ def on_eviction_hook(
                             1
                         ),
                     ),
-                    max_concurrent_workflow_task_polls=1,
                     nonsticky_to_sticky_poll_ratio=1,
-                    max_concurrent_activity_task_polls=1,
                     no_remote_activities=True,
                     sticky_queue_schedule_to_start_timeout_millis=1000,
                     max_heartbeat_throttle_interval_millis=1000,
@@ -255,6 +253,12 @@ def on_eviction_hook(
                     versioning_strategy=temporalio.bridge.worker.WorkerVersioningStrategyNone(
                         build_id=self._config["build_id"] or load_default_build_id(),
                     ),
+                    workflow_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
+                        1
+                    ),
+                    activity_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
+                        1
+                    ),
                 ),
             )
             # Start worker
diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py
index f0b446111..786203dca 100644
--- a/temporalio/worker/_worker.py
+++ b/temporalio/worker/_worker.py
@@ -18,10 +18,11 @@
     Optional,
     Sequence,
     Type,
+    Union,
     cast,
 )
 
-from typing_extensions import TypedDict
+from typing_extensions import TypeAlias, TypedDict
 
 import temporalio.activity
 import temporalio.api.common.v1
@@ -48,6 +49,48 @@
 logger = logging.getLogger(__name__)
 
 
+@dataclass(frozen=True)
+class PollerBehaviorSimpleMaximum:
+    """A poller behavior that will attempt to poll as long as a slot is available, up to the
+    provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
+    """
+
+    maximum: int = 5
+
+    def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
+        return temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
+            simple_maximum=self.maximum
+        )
+
+
+@dataclass(frozen=True)
+class PollerBehaviorAutoscaling:
+    """A poller behavior that will automatically scale the number of pollers based on feedback
+    from the server. A slot must be available before beginning polling.
+    """
+
+    minimum: int = 1
+    """At least this many poll calls will always be attempted (assuming slots are available)."""
+    maximum: int = 100
+    """At most this many poll calls will ever be open at once. Must be >= `minimum`."""
+    initial: int = 5
+    """This many polls will be attempted initially before scaling kicks in. Must be between
+    `minimum` and `maximum`."""
+
+    def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
+        return temporalio.bridge.worker.PollerBehaviorAutoscaling(
+            minimum=self.minimum,
+            maximum=self.maximum,
+            initial=self.initial,
+        )
+
+
+PollerBehavior: TypeAlias = Union[
+    PollerBehaviorSimpleMaximum,
+    PollerBehaviorAutoscaling,
+]
+
+
 class Worker:
     """Worker to process workflows and/or activities.
 
@@ -76,9 +119,9 @@ def __init__(
         max_concurrent_activities: Optional[int] = None,
         max_concurrent_local_activities: Optional[int] = None,
         tuner: Optional[WorkerTuner] = None,
-        max_concurrent_workflow_task_polls: int = 5,
+        max_concurrent_workflow_task_polls: Optional[int] = None,
         nonsticky_to_sticky_poll_ratio: float = 0.2,
-        max_concurrent_activity_task_polls: int = 5,
+        max_concurrent_activity_task_polls: Optional[int] = None,
         no_remote_activities: bool = False,
         sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
         max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
@@ -94,6 +137,12 @@ def __init__(
         use_worker_versioning: bool = False,
         disable_safe_workflow_eviction: bool = False,
         deployment_config: Optional[WorkerDeploymentConfig] = None,
+        workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
+            maximum=5
+        ),
+        activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
+            maximum=5
+        ),
     ) -> None:
         """Create a worker to process workflows and/or activities.
 
@@ -152,10 +201,17 @@ def __init__(
                 ``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and
                 ``max_concurrent_local_activities`` arguments.
 
+                Defaults to fixed-size 100 slots for each slot kind if unset and none of the
+                max_* arguments are provided.
+
                 WARNING: This argument is experimental
             max_concurrent_workflow_task_polls: Maximum number of concurrent poll
                 workflow task requests we will perform at a time on this worker's
                 task queue.
+
+                If set, will override any value passed to ``workflow_task_poller_behavior``.
+
+                WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead
             nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls *
                 this number = the number of max pollers that will be allowed for
                 the nonsticky queue when sticky tasks are enabled. If both
@@ -166,6 +222,10 @@ def __init__(
             max_concurrent_activity_task_polls: Maximum number of concurrent poll
                 activity task requests we will perform at a time on this worker's
                 task queue.
+
+                If set, will override any value passed to ``activity_task_poller_behavior``.
+
+                WARNING: Deprecated, use ``activity_task_poller_behavior`` instead
             no_remote_activities: If true, this worker will only handle workflow
                 tasks and local activities, it will not poll for activity tasks.
             sticky_queue_schedule_to_start_timeout: How long a workflow task is
@@ -231,6 +291,10 @@ def __init__(
             deployment_config: Deployment config for the worker. Exclusive with `build_id` and
                 `use_worker_versioning`.
                 WARNING: This is an experimental feature and may change in the future.
+            workflow_task_poller_behavior: Specify the behavior of workflow task polling.
+                Defaults to a 5-poller maximum.
+            activity_task_poller_behavior: Specify the behavior of activity task polling.
+                Defaults to a 5-poller maximum.
         """
         if not activities and not workflows:
             raise ValueError("At least one activity or workflow must be specified")
@@ -393,6 +457,15 @@ def __init__(
                 build_id=build_id
             )
 
+        if max_concurrent_workflow_task_polls is not None:
+            workflow_task_poller_behavior = PollerBehaviorSimpleMaximum(
+                maximum=max_concurrent_workflow_task_polls
+            )
+        if max_concurrent_activity_task_polls is not None:
+            activity_task_poller_behavior = PollerBehaviorSimpleMaximum(
+                maximum=max_concurrent_activity_task_polls
+            )
+
         # Create bridge worker last. We have empirically observed that if it is
         # created before an error is raised from the activity worker
         # constructor, a deadlock/hang will occur presumably while trying to
@@ -408,9 +481,7 @@ def __init__(
                 identity_override=identity,
                 max_cached_workflows=max_cached_workflows,
                 tuner=bridge_tuner,
-                max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls,
                 nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio,
-                max_concurrent_activity_task_polls=max_concurrent_activity_task_polls,
                 # We have to disable remote activities if a user asks _or_ if we
                 # are not running an activity worker at all. Otherwise shutdown
                 # will not proceed properly.
@@ -440,6 +511,8 @@ def __init__(
                     else set()
                 ),
                 versioning_strategy=versioning_strategy,
+                workflow_task_poller_behavior=workflow_task_poller_behavior._to_bridge(),
+                activity_task_poller_behavior=activity_task_poller_behavior._to_bridge(),
             ),
         )
 
@@ -696,9 +769,9 @@ class WorkerConfig(TypedDict, total=False):
     max_concurrent_activities: Optional[int]
     max_concurrent_local_activities: Optional[int]
     tuner: Optional[WorkerTuner]
-    max_concurrent_workflow_task_polls: int
+    max_concurrent_workflow_task_polls: Optional[int]
     nonsticky_to_sticky_poll_ratio: float
-    max_concurrent_activity_task_polls: int
+    max_concurrent_activity_task_polls: Optional[int]
     no_remote_activities: bool
     sticky_queue_schedule_to_start_timeout: timedelta
     max_heartbeat_throttle_interval: timedelta
@@ -714,6 +787,8 @@ class WorkerConfig(TypedDict, total=False):
     use_worker_versioning: bool
     disable_safe_workflow_eviction: bool
     deployment_config: Optional[WorkerDeploymentConfig]
+    workflow_task_poller_behavior: PollerBehavior
+    activity_task_poller_behavior: PollerBehavior
 
 
 @dataclass
diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py
index 76eb6a238..c66f3b145 100644
--- a/tests/worker/test_worker.py
+++ b/tests/worker/test_worker.py
@@ -6,6 +6,7 @@
 import uuid
 from datetime import timedelta
 from typing import Any, Awaitable, Callable, Optional, Sequence
+from urllib.request import urlopen
 
 import temporalio.api.enums.v1
 import temporalio.worker._worker
@@ -20,6 +21,7 @@
 )
 from temporalio.client import BuildIdOpAddNewDefault, Client, TaskReachabilityType
 from temporalio.common import RawValue, VersioningBehavior
+from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
 from temporalio.service import RPCError
 from temporalio.testing import WorkflowEnvironment
 from temporalio.worker import (
@@ -27,6 +29,7 @@
     CustomSlotSupplier,
     FixedSizeSlotSupplier,
     LocalActivitySlotInfo,
+    PollerBehaviorAutoscaling,
     ResourceBasedSlotConfig,
     ResourceBasedSlotSupplier,
     ResourceBasedTunerConfig,
@@ -41,7 +44,12 @@
     WorkflowSlotInfo,
 )
 from temporalio.workflow import VersioningIntent
-from tests.helpers import assert_eventually, new_worker, worker_versioning_enabled
+from tests.helpers import (
+    assert_eventually,
+    find_free_port,
+    new_worker,
+    worker_versioning_enabled,
+)
 
 # Passing through because Python 3.9 has an import bug at
 # https://github.com/python/cpython/issues/91351
@@ -919,6 +927,58 @@ async def test_workflows_can_use_default_versioning_behavior(
     )
 
 
+async def test_can_run_autoscaling_polling_worker(
+    client: Client, env: WorkflowEnvironment
+):
+    # Create new runtime with Prom server
+    prom_addr = f"127.0.0.1:{find_free_port()}"
+    runtime = Runtime(
+        telemetry=TelemetryConfig(
+            metrics=PrometheusConfig(bind_address=prom_addr),
+        )
+    )
+    client = await Client.connect(
+        client.service_client.config.target_host,
+        namespace=client.namespace,
+        runtime=runtime,
+    )
+
+    async with new_worker(
+        client,
+        WaitOnSignalWorkflow,
+        activities=[say_hello],
+        workflow_task_poller_behavior=PollerBehaviorAutoscaling(initial=2),
+        activity_task_poller_behavior=PollerBehaviorAutoscaling(initial=2),
+    ) as w:
+        # Give pollers a beat to start
+        await asyncio.sleep(0.3)
+
+        with urlopen(url=f"http://{prom_addr}/metrics") as f:
+            prom_str: str = f.read().decode("utf-8")
+        prom_lines = prom_str.splitlines()
+        matches = [line for line in prom_lines if "temporal_num_pollers" in line]
+        activity_pollers = [l for l in matches if "activity_task" in l]
+        assert len(activity_pollers) == 1
+        assert activity_pollers[0].endswith("2")
+        workflow_pollers = [l for l in matches if "workflow_task" in l]
+        assert len(workflow_pollers) == 2
+        # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on
+        # initialization timing.
+        assert workflow_pollers[0].endswith("2") or workflow_pollers[0].endswith("1")
+        assert workflow_pollers[1].endswith("2") or workflow_pollers[1].endswith("1")
+
+        async def do_workflow():
+            wf = await client.start_workflow(
+                WaitOnSignalWorkflow.run,
+                id=f"resource-based-{uuid.uuid4()}",
+                task_queue=w.task_queue,
+            )
+            await wf.signal(WaitOnSignalWorkflow.my_signal, "finish")
+            await wf.result()
+
+        await asyncio.gather(*[do_workflow() for _ in range(20)])
+
+
 async def wait_until_worker_deployment_visible(
     client: Client, version: WorkerDeploymentVersion
 ) -> DescribeWorkerDeploymentResponse: