From 2d1837a4be383e76285a035a11064a17e9345586 Mon Sep 17 00:00:00 2001 From: Luke Sandberg Date: Fri, 3 Apr 2026 11:39:27 -0700 Subject: [PATCH 1/6] fetch: respect HTTP Cache-Control headers with TTL-based invalidation (#91729) --- .../crates/turbo-tasks-fetch/src/client.rs | 259 ++++++++++++++++-- .../crates/turbo-tasks-fetch/tests/fetch.rs | 231 +++++++++++++++- 2 files changed, 470 insertions(+), 20 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fetch/src/client.rs b/turbopack/crates/turbo-tasks-fetch/src/client.rs index 1c374a301976..209186911797 100644 --- a/turbopack/crates/turbo-tasks-fetch/src/client.rs +++ b/turbopack/crates/turbo-tasks-fetch/src/client.rs @@ -1,9 +1,18 @@ -use std::{hash::Hash, sync::LazyLock}; +use std::{ + cmp::max, + fmt::{Display, Formatter}, + hash::Hash, + sync::LazyLock, + time::{Duration, SystemTime}, +}; use anyhow::Result; use quick_cache::sync::Cache; use turbo_rcstr::RcStr; -use turbo_tasks::{Completion, ReadRef, Vc, duration_span}; +use turbo_tasks::{ + Completion, FxIndexSet, InvalidationReason, InvalidationReasonKind, Invalidator, ReadRef, + ResolvedVc, Vc, duration_span, util::StaticOrArc, +}; use crate::{FetchError, FetchResult, HttpResponse, HttpResponseBody}; @@ -19,8 +28,21 @@ static CLIENT_CACHE: LazyLock, reqwest::Client> /// This is needed because [`reqwest::ClientBuilder`] does not implement the required traits. This /// factory cannot be a closure because closures do not implement `Eq` or `Hash`. #[turbo_tasks::value(shared)] -#[derive(Hash, Default)] -pub struct FetchClientConfig {} +#[derive(Hash)] +pub struct FetchClientConfig { + /// Minimum cache TTL in seconds. Responses with a `Cache-Control: max-age` shorter than this + /// will be clamped to this value. This prevents pathologically short timeouts from causing an + /// invalidation bomb. Defaults to 1 hour. + pub min_cache_control_secs: u64, +} + +impl Default for FetchClientConfig { + fn default() -> Self { + Self { + min_cache_control_secs: 60 * 60, + } + } +} impl FetchClientConfig { /// Returns a cached instance of `reqwest::Client` it exists, otherwise constructs a new one. @@ -78,17 +100,66 @@ impl FetchClientConfig { } } +/// Invalidation was caused by a max-age deadline returned by a server +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct HttpTimeout {} + +impl InvalidationReason for HttpTimeout { + fn kind(&self) -> Option> { + Some(StaticOrArc::Static(&HTTP_TIMEOUT_KIND)) + } +} + +impl Display for HttpTimeout { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "http max-age timeout") + } +} + +/// Invalidation kind for [Write] +#[derive(PartialEq, Eq, Hash)] +struct HttpTimeoutKind; + +static HTTP_TIMEOUT_KIND: HttpTimeoutKind = HttpTimeoutKind; + +impl InvalidationReasonKind for HttpTimeoutKind { + fn fmt( + &self, + reasons: &FxIndexSet>, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "{} fetches timed out", reasons.len(),) + } +} + +/// Internal result from `fetch_inner` that includes the invalidator for TTL-based re-fetching. +#[turbo_tasks::value(shared)] +struct FetchInnerResult { + result: ResolvedVc, + /// Invalidator for the `fetch_inner` task. Used by the outer `fetch` to set up a timer that + /// triggers re-fetching when the Cache-Control max-age expires. + invalidator: Option, + /// Absolute deadline (seconds since UNIX epoch) after which the cached response should be + /// re-fetched. Computed as `now + max-age` at fetch time. An absolute timestamp is used + /// instead of a relative duration so that the remaining TTL is correct on warm cache restore. + deadline_secs: Option, +} + #[turbo_tasks::value_impl] impl FetchClientConfig { + /// Performs the actual HTTP request. This task is `network` but NOT `session_dependent`, so + /// its cached result survives restarts. The outer `fetch` task (which IS `session_dependent`) + /// reads the cached invalidator and sets up a timer for TTL-based re-fetching. #[turbo_tasks::function(network)] - pub async fn fetch( + async fn fetch_inner( self: Vc, url: RcStr, user_agent: Option, - ) -> Result> { + ) -> Result> { let url_ref = &*url; let this = self.await?; - let response_result: reqwest::Result = async move { + let min_cache_control_secs = this.min_cache_control_secs; + let response_result: reqwest::Result<(HttpResponse, Option)> = async move { let reqwest_client = this.try_get_cached_reqwest_client()?; let mut builder = reqwest_client.get(url_ref); @@ -103,6 +174,7 @@ impl FetchClientConfig { .and_then(|r| r.error_for_status())?; let status = response.status().as_u16(); + let max_age = parse_cache_control(response.headers()); let body = { let _span = duration_span!("fetch response", url = url_ref); @@ -110,27 +182,144 @@ impl FetchClientConfig { } .to_vec(); - Ok(HttpResponse { - status, - body: HttpResponseBody(body).resolved_cell(), - }) + Ok(( + HttpResponse { + status, + body: HttpResponseBody(body).resolved_cell(), + }, + max_age, + )) } .await; match response_result { - Ok(resp) => Ok(Vc::cell(Ok(resp.resolved_cell()))), + Ok((resp, max_age_secs)) => { + if let Some(max_age_secs) = max_age_secs { + let max_age_secs = max(max_age_secs, min_cache_control_secs); + let deadline_secs = { + // Transform the relative offset to an absolute deadline so it can be + // cached. + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("system clock is before UNIX epoch") + .as_secs(); + now + max_age_secs + }; + let invalidator = turbo_tasks::get_invalidator(); + Ok(FetchInnerResult { + result: ResolvedVc::cell(Ok(resp.resolved_cell())), + invalidator, + deadline_secs: Some(deadline_secs), + } + .cell()) + } else { + Completion::session_dependent().await?; + Ok(FetchInnerResult { + result: ResolvedVc::cell(Ok(resp.resolved_cell())), + invalidator: None, + deadline_secs: None, + } + .cell()) + } + } Err(err) => { - // the client failed to construct or the HTTP request failed - // Mark session dependent so we get retried in the next sessions - // In dev our caller will keep going, but in prod builds this will fail the build - // anyway. + // Read session_dependent_completion so that this task is re-dirtied on session + // restore. This ensures transient errors (network down, DNS failure) are retried + // on the next session without a timer or busy-loop. Completion::session_dependent().as_side_effect().await?; - Ok(Vc::cell(Err( - FetchError::from_reqwest_error(&err, &url).resolved_cell() - ))) + Ok(FetchInnerResult { + result: ResolvedVc::cell(Err( + FetchError::from_reqwest_error(&err, &url).resolved_cell() + )), + + invalidator: None, + deadline_secs: None, + } + .cell()) + } + } + } + + /// Fetches the given URL and returns the response. Results are cached across sessions using + /// TTL from the response's `Cache-Control: max-age` header. + /// + /// This is the outer task in a two-task pattern: + /// - `fetch` (session_dependent): always re-executes on restore, reads the cached inner result, + /// and spawns a timer for mid-session TTL expiry. + /// - `fetch_inner` (network, NOT session_dependent): performs the actual HTTP request and stays + /// cached across restarts. Returns an `Invalidator` that the outer task uses to trigger + /// re-fetching when the TTL expires. + #[turbo_tasks::function(network, session_dependent)] + pub async fn fetch( + self: Vc, + url: RcStr, + user_agent: Option, + ) -> Result> { + let FetchInnerResult { + result, + deadline_secs, + invalidator, + } = *self.fetch_inner(url, user_agent).await?; + + // Set up a timer to invalidate fetch_inner when the TTL expires. + // On warm cache restore, this re-executes (session_dependent), reads the persisted + // deadline from fetch_inner's cached result, and starts a timer for the remaining time. + // + // Skip when dependency tracking is disabled (e.g. one-shot `next build`) since + // invalidation panics without dependency tracking and the timer would be wasted work. + if turbo_tasks::turbo_tasks().is_tracking_dependencies() + && let (Some(deadline_secs), Some(invalidator)) = (deadline_secs, invalidator) + { + // transform absolute deadline back to a relative duration for the sleep call + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("system clock is before UNIX epoch") + .as_secs(); + let remaining = Duration::from_secs(deadline_secs.saturating_sub(now)); + // NOTE: in the case where the deadline is expired on session start this timeout will + // immediately invalidate and race with us returning. This is basically fine since in + // the most common case the actual fetch result is identical so this gives us a kind of + // 'stale while revalidate' feature. + // alternatively we could synchronously invalidate and re-execute `fetch-inner` but that + // simply adds latency in the common case where our fetch is identical. + // NOTE(2): if for some reason `fetch` is re-executed but `fetch-inner` isn't we could + // end up with multiple timers. Currently there is no known case where this could + // happen, if it somehow does we could end up with redundant invalidations and + // re-fetches. The solution is to detect this with a mutable hash map on + // FetchClientConfig to track outstanding timers and cancel them. + turbo_tasks::spawn(async move { + tokio::time::sleep(remaining).await; + invalidator.invalidate_with_reason(&*turbo_tasks::turbo_tasks(), HttpTimeout {}); + }); + } + + Ok(*result) + } +} + +/// Parses the `max-age` directive from a `Cache-Control` header value. +/// Returns the max-age in seconds, or `None` if not present or unparseable. +/// None means we shouldn't cache longer than the current session +fn parse_cache_control(headers: &reqwest::header::HeaderMap) -> Option { + let value = headers.get(reqwest::header::CACHE_CONTROL)?.to_str().ok()?; + let mut max_age = None; + for directive in value.split(',') { + let (key, val) = { + if let Some(index) = directive.find('=') { + (directive[0..index].trim(), Some(&directive[index + 1..])) + } else { + (directive.trim(), None) } + }; + if key.eq_ignore_ascii_case("max-age") + && let Some(val) = val + { + max_age = val.trim().parse().ok(); + } else if key.eq_ignore_ascii_case("no-cache") || key.eq_ignore_ascii_case("no-store") { + return None; } } + max_age } #[doc(hidden)] @@ -142,3 +331,35 @@ pub fn __test_only_reqwest_client_cache_clear() { pub fn __test_only_reqwest_client_cache_len() -> usize { CLIENT_CACHE.len() } + +#[cfg(test)] +mod tests { + use reqwest::header::{CACHE_CONTROL, HeaderMap, HeaderValue}; + + use super::parse_cache_control; + + fn headers(value: &str) -> HeaderMap { + let mut h = HeaderMap::new(); + h.insert(CACHE_CONTROL, HeaderValue::from_str(value).unwrap()); + h + } + + #[test] + fn max_age() { + assert_eq!(parse_cache_control(&headers("max-age=300")), Some(300)); + assert_eq!(parse_cache_control(&headers("MAX-AGE = 300")), Some(300)); + assert_eq!( + parse_cache_control(&headers("public, max-age=3600, must-revalidate")), + Some(3600) + ); + } + + #[test] + fn no_cache_headers() { + assert_eq!(parse_cache_control(&headers("NO-CACHE")), None); + assert_eq!(parse_cache_control(&headers("no-cache")), None); + assert_eq!(parse_cache_control(&headers("no-store")), None); + assert_eq!(parse_cache_control(&headers("max-age=300, no-store")), None); + assert_eq!(parse_cache_control(&HeaderMap::new()), None); + } +} diff --git a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs index 5b9002c5dbd2..bf49601bee88 100644 --- a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs +++ b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use anyhow::Result; use tokio::sync::Mutex as TokioMutex; use turbo_rcstr::{RcStr, rcstr}; -use turbo_tasks::{ReadRef, Vc}; +use turbo_tasks::{ReadRef, TurboTasksApi, Vc}; use turbo_tasks_fetch::{ __test_only_reqwest_client_cache_clear, __test_only_reqwest_client_cache_len, FetchClientConfig, FetchErrorKind, FetchIssue, @@ -299,6 +299,235 @@ async fn errors_on_404() { .unwrap() } +/// Helper: create a TT instance for fetch tests. When `initial` is true, clears the cache +/// directory first (cold start). When false, reuses existing cache (warm restore). +fn create_fetch_tt(name: &str, initial: bool) -> Arc { + REGISTRATION.create_turbo_tasks(name, initial) +} + +#[turbo_tasks::function(operation)] +async fn fetch_body(url: RcStr) -> Result> { + let client_vc = FetchClientConfig { + min_cache_control_secs: 0, + } + .cell(); + let response = &*client_vc + .fetch(url, /* user_agent */ None) + .await? + .unwrap() + .await?; + Ok(response.body.to_string()) +} + +/// Test that the TTL timer invalidates `fetch_inner` within a session. +/// +/// 1. Server returns body "v1" with `max-age=1` +/// 2. First fetch returns "v1" +/// 3. Server changes to return "v2" +/// 4. Wait 2s for TTL to expire (timer fires, invalidates fetch_inner) +/// 5. Strongly consistent read returns "v2" +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn ttl_invalidates_within_session() { + let _guard = GLOBAL_TEST_LOCK.lock().await; + let mut server = mockito::Server::new_async().await; + let url = RcStr::from(format!("{}/ttl-within", server.url())); + + server + .mock("GET", "/ttl-within") + .with_body("v1") + .with_header("Cache-Control", "max-age=1") + .create_async() + .await; + + let tt = create_fetch_tt("ttl_invalidates_within_session", true); + let body = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + let body = fetch_body(url).read_strongly_consistent().await?; + Ok((*body).clone()) + } + }) + .await + .unwrap(); + assert_eq!(&*body, "v1"); + + // Change the server response + server.reset(); + server + .mock("GET", "/ttl-within") + .with_body("v2") + .with_header("Cache-Control", "max-age=1") + .create_async() + .await; + + // Wait for the TTL timer to fire (max-age=1, so wait 2s to be safe) + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // The timer should have invalidated fetch_inner, so a new strongly consistent read + // should re-fetch and return the updated body. + let body = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + let body = fetch_body(url).read_strongly_consistent().await?; + Ok((*body).clone()) + } + }) + .await + .unwrap(); + assert_eq!(&*body, "v2"); + + tt.stop_and_wait().await; +} + +/// Test that after a session restore, an expired TTL causes a re-fetch. +/// +/// 1. Server returns "v1" with `max-age=1` +/// 2. Fetch, stop TT +/// 3. Wait for TTL to expire +/// 4. Create new TT (warm restore), server now returns "v2" +/// 5. Fetch should return "v2" (deadline expired, timer fires immediately on restore) +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn ttl_invalidates_on_session_restore() { + let _guard = GLOBAL_TEST_LOCK.lock().await; + let mut server = mockito::Server::new_async().await; + let url = RcStr::from(format!("{}/ttl-restore", server.url())); + + server + .mock("GET", "/ttl-restore") + .with_body("v1") + .with_header("Cache-Control", "max-age=1") + .create_async() + .await; + + // Session 1: fetch and cache + let tt = create_fetch_tt("ttl_invalidates_on_session_restore", true); + let body = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + let body = fetch_body(url).read_strongly_consistent().await?; + Ok((*body).clone()) + } + }) + .await + .unwrap(); + assert_eq!(&*body, "v1"); + tt.stop_and_wait().await; + + // Wait for TTL to expire + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Change server response + server.reset(); + server + .mock("GET", "/ttl-restore") + .with_body("v2") + .with_header("Cache-Control", "max-age=1") + .create_async() + .await; + + // Session 2: warm restore — TTL expired, should re-fetch. + // On restore, `fetch` (session_dependent) re-executes and reads the cached `fetch_inner` + // result. The deadline is expired, so it spawns a zero-duration timer. That timer + // invalidates `fetch_inner` asynchronously, which triggers a second round of execution. + // We need to read twice: the first read returns the stale cached value, then wait for the + // timer-triggered re-execution to settle. + let tt = create_fetch_tt("ttl_invalidates_on_session_restore", false); + turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + // First read returns the stale cached value, but triggers the timer + let _body = fetch_body(url).read_strongly_consistent().await?; + Ok(()) + } + }) + .await + .unwrap(); + + // Wait for the timer to fire and re-execution to settle + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let body = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + let body = fetch_body(url).read_strongly_consistent().await?; + Ok((*body).clone()) + } + }) + .await + .unwrap(); + assert_eq!(&*body, "v2"); + tt.stop_and_wait().await; +} + +/// Test that fetch errors are retried on session restore. +/// +/// 1. Server returns connection refused (error) +/// 2. Fetch returns error +/// 3. Stop TT, start new session +/// 4. Server now returns 200 +/// 5. Fetch should succeed (error was session-dependent, retried on restore) +/// +/// TODO: Consider retrying errors within a session with backoff. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn errors_retried_on_session_restore() { + let _guard = GLOBAL_TEST_LOCK.lock().await; + let mut server = mockito::Server::new_async().await; + let url = RcStr::from(format!("{}/error-restore", server.url())); + + // Session 1: server returns 500 + server + .mock("GET", "/error-restore") + .with_status(500) + .create_async() + .await; + + let tt = create_fetch_tt("errors_retried_on_session_restore", true); + let is_err = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + #[turbo_tasks::function(operation)] + async fn fetch_is_err(url: RcStr) -> Result> { + let client_vc = FetchClientConfig::default().cell(); + let result = &*client_vc.fetch(url, None).await?; + Ok(Vc::cell(result.is_err())) + } + let is_err = *fetch_is_err(url).read_strongly_consistent().await?; + Ok(is_err) + } + }) + .await + .unwrap(); + assert!(is_err, "first fetch should be an error"); + tt.stop_and_wait().await; + + // Session 2: server now returns 200 + server.reset(); + server + .mock("GET", "/error-restore") + .with_body("success") + .create_async() + .await; + + let tt = create_fetch_tt("errors_retried_on_session_restore", false); + let is_err = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + #[turbo_tasks::function(operation)] + async fn fetch_is_err2(url: RcStr) -> Result> { + let client_vc = FetchClientConfig::default().cell(); + let result = &*client_vc.fetch(url, None).await?; + Ok(Vc::cell(result.is_err())) + } + let is_err = *fetch_is_err2(url).read_strongly_consistent().await?; + Ok(is_err) + } + }) + .await + .unwrap(); + assert!(!is_err, "second fetch should succeed after session restore"); + tt.stop_and_wait().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn client_cache() { let mut server = mockito::Server::new_async().await; From 3da9cf94e20bd2f98fb8359c60cce6a5018b8892 Mon Sep 17 00:00:00 2001 From: Luke Sandberg Date: Fri, 24 Apr 2026 17:24:29 -0700 Subject: [PATCH 2/6] review feedback --- .../crates/turbo-tasks-fetch/src/client.rs | 66 +++++++++---------- .../crates/turbo-tasks-fetch/tests/fetch.rs | 39 ++++------- 2 files changed, 47 insertions(+), 58 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fetch/src/client.rs b/turbopack/crates/turbo-tasks-fetch/src/client.rs index 209186911797..5649c26dca85 100644 --- a/turbopack/crates/turbo-tasks-fetch/src/client.rs +++ b/turbopack/crates/turbo-tasks-fetch/src/client.rs @@ -33,13 +33,13 @@ pub struct FetchClientConfig { /// Minimum cache TTL in seconds. Responses with a `Cache-Control: max-age` shorter than this /// will be clamped to this value. This prevents pathologically short timeouts from causing an /// invalidation bomb. Defaults to 1 hour. - pub min_cache_control_secs: u64, + pub min_cache_control: Duration, } impl Default for FetchClientConfig { fn default() -> Self { Self { - min_cache_control_secs: 60 * 60, + min_cache_control: Duration::from_secs(60 * 60), } } } @@ -102,7 +102,7 @@ impl FetchClientConfig { /// Invalidation was caused by a max-age deadline returned by a server #[derive(PartialEq, Eq, Hash)] -pub(crate) struct HttpTimeout {} +pub(crate) struct HttpTimeout; impl InvalidationReason for HttpTimeout { fn kind(&self) -> Option> { @@ -116,7 +116,7 @@ impl Display for HttpTimeout { } } -/// Invalidation kind for [Write] +/// Invalidation kind for [HttpTimeout] #[derive(PartialEq, Eq, Hash)] struct HttpTimeoutKind; @@ -128,7 +128,7 @@ impl InvalidationReasonKind for HttpTimeoutKind { reasons: &FxIndexSet>, f: &mut Formatter<'_>, ) -> std::fmt::Result { - write!(f, "{} fetches timed out", reasons.len(),) + write!(f, "{} fetches timed out", reasons.len()) } } @@ -158,7 +158,7 @@ impl FetchClientConfig { ) -> Result> { let url_ref = &*url; let this = self.await?; - let min_cache_control_secs = this.min_cache_control_secs; + let min_cache_control_secs = this.min_cache_control; let response_result: reqwest::Result<(HttpResponse, Option)> = async move { let reqwest_client = this.try_get_cached_reqwest_client()?; @@ -195,21 +195,21 @@ impl FetchClientConfig { match response_result { Ok((resp, max_age_secs)) => { if let Some(max_age_secs) = max_age_secs { - let max_age_secs = max(max_age_secs, min_cache_control_secs); + let max_age_secs = max(max_age_secs, min_cache_control_secs.as_secs()); let deadline_secs = { // Transform the relative offset to an absolute deadline so it can be // cached. - let now = SystemTime::now() + SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) - .expect("system clock is before UNIX epoch") - .as_secs(); - now + max_age_secs + // If the system clock is borked, just don't respect deadlines + .ok() + .map(|d| d.as_secs() + max_age_secs) }; let invalidator = turbo_tasks::get_invalidator(); Ok(FetchInnerResult { result: ResolvedVc::cell(Ok(resp.resolved_cell())), invalidator, - deadline_secs: Some(deadline_secs), + deadline_secs, } .cell()) } else { @@ -231,7 +231,6 @@ impl FetchClientConfig { result: ResolvedVc::cell(Err( FetchError::from_reqwest_error(&err, &url).resolved_cell() )), - invalidator: None, deadline_secs: None, } @@ -271,26 +270,27 @@ impl FetchClientConfig { && let (Some(deadline_secs), Some(invalidator)) = (deadline_secs, invalidator) { // transform absolute deadline back to a relative duration for the sleep call - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("system clock is before UNIX epoch") - .as_secs(); - let remaining = Duration::from_secs(deadline_secs.saturating_sub(now)); - // NOTE: in the case where the deadline is expired on session start this timeout will - // immediately invalidate and race with us returning. This is basically fine since in - // the most common case the actual fetch result is identical so this gives us a kind of - // 'stale while revalidate' feature. - // alternatively we could synchronously invalidate and re-execute `fetch-inner` but that - // simply adds latency in the common case where our fetch is identical. - // NOTE(2): if for some reason `fetch` is re-executed but `fetch-inner` isn't we could - // end up with multiple timers. Currently there is no known case where this could - // happen, if it somehow does we could end up with redundant invalidations and - // re-fetches. The solution is to detect this with a mutable hash map on - // FetchClientConfig to track outstanding timers and cancel them. - turbo_tasks::spawn(async move { - tokio::time::sleep(remaining).await; - invalidator.invalidate_with_reason(&*turbo_tasks::turbo_tasks(), HttpTimeout {}); - }); + // IF the system clock is broken, just don't bother. + if let Ok(now) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + let remaining = Duration::from_secs(deadline_secs.saturating_sub(now.as_secs())); + // NOTE: in the case where the deadline is expired on session start this timeout + // will immediately invalidate and race with us returning. This is + // basically fine since in the most common case the actual fetch + // result is identical so this gives us a kind of 'stale while + // revalidate' feature. alternatively we could synchronously + // invalidate and re-execute `fetch-inner` but that simply adds + // latency in the common case where our fetch is identical. NOTE(2): + // if for some reason `fetch` is re-executed but `fetch-inner` isn't we could + // end up with multiple timers. Currently there is no known case where this could + // happen, if it somehow does we could end up with redundant invalidations and + // re-fetches. The solution is to detect this with a mutable hash map on + // FetchClientConfig to track outstanding timers and cancel them. + turbo_tasks::spawn(async move { + tokio::time::sleep(remaining).await; + invalidator + .invalidate_with_reason(&*turbo_tasks::turbo_tasks(), HttpTimeout {}); + }); + } } Ok(*result) diff --git a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs index bf49601bee88..80644dc11848 100644 --- a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs +++ b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use anyhow::Result; use tokio::sync::Mutex as TokioMutex; use turbo_rcstr::{RcStr, rcstr}; -use turbo_tasks::{ReadRef, TurboTasksApi, Vc}; +use turbo_tasks::{ReadRef, Vc}; use turbo_tasks_fetch::{ __test_only_reqwest_client_cache_clear, __test_only_reqwest_client_cache_len, FetchClientConfig, FetchErrorKind, FetchIssue, @@ -299,12 +299,6 @@ async fn errors_on_404() { .unwrap() } -/// Helper: create a TT instance for fetch tests. When `initial` is true, clears the cache -/// directory first (cold start). When false, reuses existing cache (warm restore). -fn create_fetch_tt(name: &str, initial: bool) -> Arc { - REGISTRATION.create_turbo_tasks(name, initial) -} - #[turbo_tasks::function(operation)] async fn fetch_body(url: RcStr) -> Result> { let client_vc = FetchClientConfig { @@ -339,7 +333,7 @@ async fn ttl_invalidates_within_session() { .create_async() .await; - let tt = create_fetch_tt("ttl_invalidates_within_session", true); + let tt = REGISTRATION.create_turbo_tasks("ttl_invalidates_within_session", true); let body = turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { @@ -400,7 +394,7 @@ async fn ttl_invalidates_on_session_restore() { .await; // Session 1: fetch and cache - let tt = create_fetch_tt("ttl_invalidates_on_session_restore", true); + let tt = REGISTRATION.create_turbo_tasks("ttl_invalidates_on_session_restore", true); let body = turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { @@ -431,7 +425,7 @@ async fn ttl_invalidates_on_session_restore() { // invalidates `fetch_inner` asynchronously, which triggers a second round of execution. // We need to read twice: the first read returns the stale cached value, then wait for the // timer-triggered re-execution to settle. - let tt = create_fetch_tt("ttl_invalidates_on_session_restore", false); + let tt = REGISTRATION.create_turbo_tasks("ttl_invalidates_on_session_restore", false); turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { @@ -459,6 +453,13 @@ async fn ttl_invalidates_on_session_restore() { tt.stop_and_wait().await; } +#[turbo_tasks::function(operation)] +async fn fetch_is_err(url: RcStr) -> Result> { + let client_vc = FetchClientConfig::default().cell(); + let result = &*client_vc.fetch(url, None).await?; + Ok(Vc::cell(result.is_err())) +} + /// Test that fetch errors are retried on session restore. /// /// 1. Server returns connection refused (error) @@ -481,16 +482,10 @@ async fn errors_retried_on_session_restore() { .create_async() .await; - let tt = create_fetch_tt("errors_retried_on_session_restore", true); + let tt = REGISTRATION.create_turbo_tasks("errors_retried_on_session_restore", true); let is_err = turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { - #[turbo_tasks::function(operation)] - async fn fetch_is_err(url: RcStr) -> Result> { - let client_vc = FetchClientConfig::default().cell(); - let result = &*client_vc.fetch(url, None).await?; - Ok(Vc::cell(result.is_err())) - } let is_err = *fetch_is_err(url).read_strongly_consistent().await?; Ok(is_err) } @@ -508,17 +503,11 @@ async fn errors_retried_on_session_restore() { .create_async() .await; - let tt = create_fetch_tt("errors_retried_on_session_restore", false); + let tt = REGISTRATION.create_turbo_tasks("errors_retried_on_session_restore", false); let is_err = turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { - #[turbo_tasks::function(operation)] - async fn fetch_is_err2(url: RcStr) -> Result> { - let client_vc = FetchClientConfig::default().cell(); - let result = &*client_vc.fetch(url, None).await?; - Ok(Vc::cell(result.is_err())) - } - let is_err = *fetch_is_err2(url).read_strongly_consistent().await?; + let is_err = *fetch_is_err(url).read_strongly_consistent().await?; Ok(is_err) } }) From 0a9ab82c0a1cea5ee1c73732d732069728351dbe Mon Sep 17 00:00:00 2001 From: Luke Sandberg Date: Fri, 1 May 2026 22:53:59 -0700 Subject: [PATCH 3/6] fix --- turbopack/crates/turbo-tasks-fetch/tests/fetch.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs index 80644dc11848..ad66381d4166 100644 --- a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs +++ b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs @@ -3,7 +3,7 @@ #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this #![cfg(test)] -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use anyhow::Result; use tokio::sync::Mutex as TokioMutex; @@ -302,7 +302,7 @@ async fn errors_on_404() { #[turbo_tasks::function(operation)] async fn fetch_body(url: RcStr) -> Result> { let client_vc = FetchClientConfig { - min_cache_control_secs: 0, + min_cache_control: Duration::ZERO, } .cell(); let response = &*client_vc From df11034cfc3eeb282526d3f246ded1c37e3b0cf6 Mon Sep 17 00:00:00 2001 From: Luke Sandberg Date: Fri, 1 May 2026 23:51:17 -0700 Subject: [PATCH 4/6] fix dependent session dependence --- turbopack/crates/turbo-tasks-fetch/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/turbopack/crates/turbo-tasks-fetch/src/client.rs b/turbopack/crates/turbo-tasks-fetch/src/client.rs index 5649c26dca85..2db05755535a 100644 --- a/turbopack/crates/turbo-tasks-fetch/src/client.rs +++ b/turbopack/crates/turbo-tasks-fetch/src/client.rs @@ -226,7 +226,7 @@ impl FetchClientConfig { // Read session_dependent_completion so that this task is re-dirtied on session // restore. This ensures transient errors (network down, DNS failure) are retried // on the next session without a timer or busy-loop. - Completion::session_dependent().as_side_effect().await?; + Completion::session_dependent().await?; Ok(FetchInnerResult { result: ResolvedVc::cell(Err( FetchError::from_reqwest_error(&err, &url).resolved_cell() From 4ad107c1d3363deac35a6fa408a178b19c5bc6ea Mon Sep 17 00:00:00 2001 From: Luke Sandberg Date: Wed, 13 May 2026 08:47:05 -0700 Subject: [PATCH 5/6] resolve merge issues --- .../crates/turbo-tasks-fetch/tests/fetch.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs index ad66381d4166..409454eb03db 100644 --- a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs +++ b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs @@ -14,7 +14,7 @@ use turbo_tasks_fetch::{ FetchClientConfig, FetchErrorKind, FetchIssue, }; use turbo_tasks_fs::{DiskFileSystem, FileSystem, FileSystemPath}; -use turbo_tasks_testing::{Registration, register, run_once}; +use turbo_tasks_testing::{Registration, TestInstance, register, run_once}; use turbopack_core::issue::{Issue, IssueSeverity, StyledString}; static REGISTRATION: Registration = register!(); @@ -299,7 +299,7 @@ async fn errors_on_404() { .unwrap() } -#[turbo_tasks::function(operation)] +#[turbo_tasks::function(operation, root)] async fn fetch_body(url: RcStr) -> Result> { let client_vc = FetchClientConfig { min_cache_control: Duration::ZERO, @@ -333,7 +333,8 @@ async fn ttl_invalidates_within_session() { .create_async() .await; - let tt = REGISTRATION.create_turbo_tasks("ttl_invalidates_within_session", true); + let TestInstance { tt, .. } = + REGISTRATION.create_turbo_tasks("ttl_invalidates_within_session", true); let body = turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { @@ -394,7 +395,8 @@ async fn ttl_invalidates_on_session_restore() { .await; // Session 1: fetch and cache - let tt = REGISTRATION.create_turbo_tasks("ttl_invalidates_on_session_restore", true); + let TestInstance { tt, .. } = + REGISTRATION.create_turbo_tasks("ttl_invalidates_on_session_restore", true); let body = turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { @@ -425,7 +427,8 @@ async fn ttl_invalidates_on_session_restore() { // invalidates `fetch_inner` asynchronously, which triggers a second round of execution. // We need to read twice: the first read returns the stale cached value, then wait for the // timer-triggered re-execution to settle. - let tt = REGISTRATION.create_turbo_tasks("ttl_invalidates_on_session_restore", false); + let TestInstance { tt, .. } = + REGISTRATION.create_turbo_tasks("ttl_invalidates_on_session_restore", false); turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { @@ -453,7 +456,7 @@ async fn ttl_invalidates_on_session_restore() { tt.stop_and_wait().await; } -#[turbo_tasks::function(operation)] +#[turbo_tasks::function(operation, root)] async fn fetch_is_err(url: RcStr) -> Result> { let client_vc = FetchClientConfig::default().cell(); let result = &*client_vc.fetch(url, None).await?; @@ -482,7 +485,8 @@ async fn errors_retried_on_session_restore() { .create_async() .await; - let tt = REGISTRATION.create_turbo_tasks("errors_retried_on_session_restore", true); + let TestInstance { tt, .. } = + REGISTRATION.create_turbo_tasks("errors_retried_on_session_restore", true); let is_err = turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { @@ -503,7 +507,8 @@ async fn errors_retried_on_session_restore() { .create_async() .await; - let tt = REGISTRATION.create_turbo_tasks("errors_retried_on_session_restore", false); + let TestInstance { tt, .. } = + REGISTRATION.create_turbo_tasks("errors_retried_on_session_restore", false); let is_err = turbo_tasks::run_once(tt.clone(), { let url = url.clone(); async move { From 5357124d6fe40c4b5d5d0a8fb8e34a9c28ccd72d Mon Sep 17 00:00:00 2001 From: Luke Sandberg Date: Wed, 13 May 2026 16:38:47 -0700 Subject: [PATCH 6/6] simplfiy syntax fewer braces --- turbopack/crates/turbo-tasks-fetch/src/client.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fetch/src/client.rs b/turbopack/crates/turbo-tasks-fetch/src/client.rs index 2db05755535a..f9c156a0c1a1 100644 --- a/turbopack/crates/turbo-tasks-fetch/src/client.rs +++ b/turbopack/crates/turbo-tasks-fetch/src/client.rs @@ -196,15 +196,13 @@ impl FetchClientConfig { Ok((resp, max_age_secs)) => { if let Some(max_age_secs) = max_age_secs { let max_age_secs = max(max_age_secs, min_cache_control_secs.as_secs()); - let deadline_secs = { - // Transform the relative offset to an absolute deadline so it can be - // cached. - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - // If the system clock is borked, just don't respect deadlines - .ok() - .map(|d| d.as_secs() + max_age_secs) - }; + // Transform the relative offset to an absolute deadline so it can be + // cached. + let deadline_secs = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + // If the system clock is borked, just don't respect deadlines + .ok() + .map(|d| d.as_secs() + max_age_secs); let invalidator = turbo_tasks::get_invalidator(); Ok(FetchInnerResult { result: ResolvedVc::cell(Ok(resp.resolved_cell())),