diff --git a/client/src/lib.rs b/client/src/lib.rs index e6b0c87a7..409c46f90 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -232,7 +232,9 @@ impl Default for RetryConfig { } impl RetryConfig { - pub(crate) const fn task_poll_retry_policy() -> Self { + /// The default retry policy for task poll requests (workflow, activity, nexus), applied by `RetryClient` when no task poll override is configured. + /// Retries indefinitely with exponential backoff (200ms initial, 0.2 randomization factor, 2x multiplier, 10s max). + pub const fn task_poll_retry_policy() -> Self { Self { initial_interval: Duration::from_millis(200), randomization_factor: 0.2, diff --git a/client/src/retry.rs b/client/src/retry.rs index 9bc8d5ce1..34eead2d0 100644 --- a/client/src/retry.rs +++ b/client/src/retry.rs @@ -25,6 +25,7 @@ const LONG_POLL_FATAL_GRACE: Duration = Duration::from_secs(60); pub struct RetryClient { client: SG, retry_config: Arc, + task_poll_retry_override: Option>, } impl RetryClient { @@ -33,8 +34,17 @@ impl RetryClient { Self { client, retry_config: Arc::new(retry_config), + task_poll_retry_override: None, } } + + /// Override the retry policy used for task poll requests (workflow, activity, nexus). + /// Without an override, task polls use the unlimited-retry [`RetryConfig::task_poll_retry_policy`]; the config given here replaces that policy as a whole. + /// Use it to set a finite `max_retries` so the worker exits after repeated poll failures. 
+ pub fn with_task_poll_retry_config(mut self, config: RetryConfig) -> Self { + self.task_poll_retry_override = Some(Arc::new(config)); + self + } } impl RetryClient { @@ -71,7 +81,10 @@ impl RetryClient { retry_short_circuit = ext.get::().cloned(); } let retry_cfg = if call_type == CallType::TaskLongPoll { - RetryConfig::task_poll_retry_policy() + self.task_poll_retry_override + .as_deref() + .cloned() + .unwrap_or_else(RetryConfig::task_poll_retry_policy) } else { (*self.retry_config).clone() }; diff --git a/core-api/src/worker.rs b/core-api/src/worker.rs index 38e6e503c..24fe02552 100644 --- a/core-api/src/worker.rs +++ b/core-api/src/worker.rs @@ -122,6 +122,14 @@ pub struct WorkerConfig { #[builder(default = "Duration::from_secs(5)")] pub local_timeout_buffer_for_activities: Duration, + /// Maximum number of retries for task poll requests (workflow, activity, nexus) before the + /// error is propagated. 0 (the default) means unlimited retries, preserving existing behavior. + /// A nonzero value is applied as `max_retries` on the default task poll retry policy, causing the worker to exit with an error after that many + /// consecutive poll failures, allowing the orchestrator (e.g. Kubernetes) to restart the pod + /// with fresh connections. + #[builder(default = "0")] + pub max_task_poll_retries: usize, + /// Any error types listed here will cause any workflow being processed by this worker to fail, /// rather than simply failing the workflow task. 
#[builder(default)] diff --git a/core/src/lib.rs b/core/src/lib.rs index dcaf1c147..06db1cf30 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -134,7 +134,14 @@ pub(crate) fn init_worker_client( if let Some(ref id_override) = config.client_identity_override { client.options_mut().identity.clone_from(id_override); } - RetryClient::new(client, RetryConfig::default()) + let retry_client = RetryClient::new(client, RetryConfig::default()); + if config.max_task_poll_retries > 0 { + let mut poll_cfg = RetryConfig::task_poll_retry_policy(); + poll_cfg.max_retries = config.max_task_poll_retries; + retry_client.with_task_poll_retry_config(poll_cfg) + } else { + retry_client + } } /// Creates a unique sticky queue name for a worker, iff the config allows for 1 or more cached