Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ impl Default for RetryConfig {
}

impl RetryConfig {
pub(crate) const fn task_poll_retry_policy() -> Self {
/// The default retry policy for task poll requests (workflow, activity, nexus).
/// Retries indefinitely with exponential backoff (200ms initial, 2x multiplier, 10s max).
pub const fn task_poll_retry_policy() -> Self {
Self {
initial_interval: Duration::from_millis(200),
randomization_factor: 0.2,
Expand Down
15 changes: 14 additions & 1 deletion client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const LONG_POLL_FATAL_GRACE: Duration = Duration::from_secs(60);
/// Client wrapper that retries failed requests according to a retry policy.
pub struct RetryClient<SG> {
// The wrapped inner client that actually issues the requests.
client: SG,
// Retry policy applied to ordinary (non-long-poll) calls.
retry_config: Arc<RetryConfig>,
// Optional replacement policy for task long-poll requests; when `None`,
// `RetryConfig::task_poll_retry_policy()` (unlimited retries) is used.
task_poll_retry_override: Option<Arc<RetryConfig>>,
}

impl<SG> RetryClient<SG> {
Expand All @@ -33,8 +34,17 @@ impl<SG> RetryClient<SG> {
Self {
client,
retry_config: Arc::new(retry_config),
task_poll_retry_override: None,
}
}

/// Override the retry policy used for task poll requests (workflow, activity, nexus).
/// By default, task polls use an unlimited-retry policy. Use this to set a finite
/// `max_retries` so the worker exits after repeated poll failures.
pub fn with_task_poll_retry_config(mut self, config: RetryConfig) -> Self {
self.task_poll_retry_override = Some(Arc::new(config));
self
}
}

impl<SG> RetryClient<SG> {
Expand Down Expand Up @@ -71,7 +81,10 @@ impl<SG> RetryClient<SG> {
retry_short_circuit = ext.get::<NoRetryOnMatching>().cloned();
}
let retry_cfg = if call_type == CallType::TaskLongPoll {
RetryConfig::task_poll_retry_policy()
self.task_poll_retry_override
.as_deref()
.cloned()
.unwrap_or_else(RetryConfig::task_poll_retry_policy)
} else {
(*self.retry_config).clone()
};
Expand Down
8 changes: 8 additions & 0 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ pub struct WorkerConfig {
#[builder(default = "Duration::from_secs(5)")]
pub local_timeout_buffer_for_activities: Duration,

/// Maximum number of retries for task poll requests (workflow, activity, nexus) before the
/// error is propagated. 0 means unlimited retries (default, preserving existing behavior).
/// Setting this to a nonzero value causes the worker to exit with an error after that many
/// consecutive poll failures, allowing the orchestrator (e.g. Kubernetes) to restart the pod
/// with fresh connections.
#[builder(default = "0")]
pub max_task_poll_retries: usize,

/// Any error types listed here will cause any workflow being processed by this worker to fail,
/// rather than simply failing the workflow task.
#[builder(default)]
Expand Down
9 changes: 8 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,14 @@ pub(crate) fn init_worker_client(
if let Some(ref id_override) = config.client_identity_override {
client.options_mut().identity.clone_from(id_override);
}
RetryClient::new(client, RetryConfig::default())
let retry_client = RetryClient::new(client, RetryConfig::default());
if config.max_task_poll_retries > 0 {
let mut poll_cfg = RetryConfig::task_poll_retry_policy();
poll_cfg.max_retries = config.max_task_poll_retries;
retry_client.with_task_poll_retry_config(poll_cfg)
} else {
retry_client
}
}

/// Creates a unique sticky queue name for a worker, iff the config allows for 1 or more cached
Expand Down
Loading