From 557bc194aa61379f1f3d394d41e0cffd2e1e93e4 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Tue, 2 Dec 2025 14:24:13 +0300 Subject: [PATCH 01/10] Add BranchKey data structure --- compiler/rustc_data_structures/src/sync.rs | 2 + .../src/sync/branch_key.rs | 39 +++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 compiler/rustc_data_structures/src/sync/branch_key.rs diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 3881f3c2aa841..4ddb2b5391393 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -46,12 +46,14 @@ pub use self::parallel::{ pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; pub use self::worker_local::{Registry, WorkerLocal}; pub use crate::marker::*; +pub use self::branch_key::BranchKey; mod freeze; mod lock; mod parallel; mod vec; mod worker_local; +mod branch_key; /// Keep the conditional imports together in a submodule, so that import-sorting /// doesn't split them up. diff --git a/compiler/rustc_data_structures/src/sync/branch_key.rs b/compiler/rustc_data_structures/src/sync/branch_key.rs new file mode 100644 index 0000000000000..bb4205b4998e5 --- /dev/null +++ b/compiler/rustc_data_structures/src/sync/branch_key.rs @@ -0,0 +1,39 @@ +use std::cmp; + +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct BranchKey(u128); + +impl BranchKey { + pub fn root() -> Self { + Self(0x80000000_00000000_00000000_00000000) + } + + pub fn bit_branch(self) -> [Self; 2] { + let trailing_zeroes = self.0.trailing_zeros(); + assert!(trailing_zeroes >= 1, "query branch space is exhausted"); + [BranchKey(self.0 ^ (0b11 << (trailing_zeroes - 1))), BranchKey(self.0 ^ (0b01 << (trailing_zeroes - 1)))] + } + + pub fn bits_branch_iter(self, bits: u32) -> impl Iterator { + let trailing_zeroes = self.0.trailing_zeros(); + let allocated_shift = trailing_zeroes + .checked_sub(bits) + .unwrap_or_else(|| panic!("query branch space is exhausted to fit {bits} bits")); + let step = 1 << (allocated_shift + 1); + let zero = self.0 & !(1 << trailing_zeroes) | (1 << allocated_shift); + (0..1 << bits).map(move |n| { + BranchKey(zero + step * n) + }) + } + + pub fn n_branch_iter(self, n: u32) -> impl Iterator { + let iter = self.bits_branch_iter(n.saturating_sub(1).checked_ilog2().map_or(0, |b| b + 1)); + iter.take(n.try_into().unwrap()) + } + + pub fn raw_cmp(self, other: Self) -> cmp::Ordering { + self.0.cmp(&other.0) + } +} + + From 95c00dfa276e63b1ce8a95fa6237a4f3e831cc3f Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Tue, 2 Dec 2025 14:26:41 +0300 Subject: [PATCH 02/10] Use weak arc pointers to detect unused latches --- compiler/rustc_query_system/src/query/job.rs | 37 +++++++++---------- .../rustc_query_system/src/query/plumbing.rs | 4 +- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index 7d9b594d501ff..a5124948c435c 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use std::io::Write; use std::iter; use std::num::NonZero; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use parking_lot::{Condvar, Mutex}; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; @@ -54,8 +54,8 @@ impl QueryJobId { map.get(&self).unwrap().job.parent } - fn latch(self, map: &QueryMap) -> Option<&QueryLatch> { - map.get(&self).unwrap().job.latch.as_ref() + fn latch(self, map: &QueryMap) -> &Weak> { + &map.get(&self).unwrap().job.latch } } @@ -77,7 +77,7 @@ pub struct QueryJob { pub parent: Option, /// The latch that is used to wait on this job. - latch: Option>, + latch: Weak>, } impl Clone for QueryJob { @@ -90,14 +90,17 @@ impl QueryJob { /// Creates a new query job. #[inline] pub fn new(id: QueryJobId, span: Span, parent: Option) -> Self { - QueryJob { id, span, parent, latch: None } + QueryJob { id, span, parent, latch: Weak::new() } } - pub(super) fn latch(&mut self) -> QueryLatch { - if self.latch.is_none() { - self.latch = Some(QueryLatch::new()); + pub(super) fn latch(&mut self) -> Arc> { + if let Some(latch) = self.latch.upgrade() { + latch + } else { + let latch = Arc::new(QueryLatch::new()); + self.latch = Arc::downgrade(&latch); + latch } - self.latch.as_ref().unwrap().clone() } /// Signals to waiters that the query is complete. @@ -106,7 +109,7 @@ impl QueryJob { /// as there are no concurrent jobs which could be waiting on us #[inline] pub fn signal_complete(self) { - if let Some(latch) = self.latch { + if let Some(latch) = self.latch.upgrade() { latch.set(); } } @@ -187,19 +190,13 @@ struct QueryLatchInfo { #[derive(Debug)] pub(super) struct QueryLatch { - info: Arc>>, -} - -impl Clone for QueryLatch { - fn clone(&self) -> Self { - Self { info: Arc::clone(&self.info) } - } + info: Mutex>, } impl QueryLatch { fn new() -> Self { QueryLatch { - info: Arc::new(Mutex::new(QueryLatchInfo { complete: false, waiters: Vec::new() })), + info: Mutex::new(QueryLatchInfo { complete: false, waiters: Vec::new() }), } } @@ -296,7 +293,7 @@ where } // Visit the explicit waiters which use condvars and are resumable - if let Some(latch) = query.latch(query_map) { + if let Some(latch) = query.latch(query_map).upgrade() { for (i, waiter) in latch.info.lock().waiters.iter().enumerate() { if let Some(waiter_query) = waiter.query { if visit(waiter.span, waiter_query).is_some() { @@ -486,7 +483,7 @@ fn remove_cycle( let (waitee_query, waiter_idx) = waiter.unwrap(); // Extract the waiter we want to resume - let waiter = waitee_query.latch(query_map).unwrap().extract_waiter(waiter_idx); + let waiter = waitee_query.latch(query_map).upgrade().unwrap().extract_waiter(waiter_idx); // Set the cycle error so it will be picked up when resumed *waiter.cycle.lock() = Some(error); diff --git a/compiler/rustc_query_system/src/query/plumbing.rs b/compiler/rustc_query_system/src/query/plumbing.rs index dea47c8fa787e..dc0827db49a2c 100644 --- a/compiler/rustc_query_system/src/query/plumbing.rs +++ b/compiler/rustc_query_system/src/query/plumbing.rs @@ -296,7 +296,7 @@ fn wait_for_query( qcx: Qcx, span: Span, key: Q::Key, - latch: QueryLatch, + latch: &QueryLatch, current: Option, ) -> (Q::Value, Option) where @@ -394,7 +394,7 @@ where // Only call `wait_for_query` if we're using a Rayon thread pool // as it will attempt to mark the worker thread as blocked. - return wait_for_query(query, qcx, span, key, latch, current_job_id); + return wait_for_query(query, qcx, span, key, &latch, current_job_id); } let id = job.id; From 44c5a429a080041e178c777d8382a8f4c472eca4 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Wed, 3 Dec 2025 14:19:34 +0300 Subject: [PATCH 03/10] Format changes --- compiler/rustc_data_structures/src/sync.rs | 4 ++-- compiler/rustc_data_structures/src/sync/branch_key.rs | 11 +++++------ compiler/rustc_query_system/src/query/job.rs | 4 +--- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 4ddb2b5391393..39101f0899ff8 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -36,6 +36,7 @@ pub use parking_lot::{ }; pub use self::atomic::AtomicU64; +pub use self::branch_key::BranchKey; pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard}; #[doc(no_inline)] pub use self::lock::{Lock, LockGuard, Mode}; @@ -46,14 +47,13 @@ pub use self::parallel::{ pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; pub use self::worker_local::{Registry, WorkerLocal}; pub use crate::marker::*; -pub use self::branch_key::BranchKey; +mod branch_key; mod freeze; mod lock; mod parallel; mod vec; mod worker_local; -mod branch_key; /// Keep the conditional imports together in a submodule, so that import-sorting /// doesn't split them up. diff --git a/compiler/rustc_data_structures/src/sync/branch_key.rs b/compiler/rustc_data_structures/src/sync/branch_key.rs index bb4205b4998e5..0e9e02bb61526 100644 --- a/compiler/rustc_data_structures/src/sync/branch_key.rs +++ b/compiler/rustc_data_structures/src/sync/branch_key.rs @@ -11,7 +11,10 @@ impl BranchKey { pub fn bit_branch(self) -> [Self; 2] { let trailing_zeroes = self.0.trailing_zeros(); assert!(trailing_zeroes >= 1, "query branch space is exhausted"); - [BranchKey(self.0 ^ (0b11 << (trailing_zeroes - 1))), BranchKey(self.0 ^ (0b01 << (trailing_zeroes - 1)))] + [ + BranchKey(self.0 ^ (0b11 << (trailing_zeroes - 1))), + BranchKey(self.0 ^ (0b01 << (trailing_zeroes - 1))), + ] } pub fn bits_branch_iter(self, bits: u32) -> impl Iterator { @@ -21,9 +24,7 @@ impl BranchKey { .unwrap_or_else(|| panic!("query branch space is exhausted to fit {bits} bits")); let step = 1 << (allocated_shift + 1); let zero = self.0 & !(1 << trailing_zeroes) | (1 << allocated_shift); - (0..1 << bits).map(move |n| { - BranchKey(zero + step * n) - }) + (0..1 << bits).map(move |n| BranchKey(zero + step * n)) } pub fn n_branch_iter(self, n: u32) -> impl Iterator { @@ -35,5 +36,3 @@ impl BranchKey { self.0.cmp(&other.0) } } - - diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index a5124948c435c..ac8caff942d0f 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -195,9 +195,7 @@ pub(super) struct QueryLatch { impl QueryLatch { fn new() -> Self { - QueryLatch { - info: Mutex::new(QueryLatchInfo { complete: false, waiters: Vec::new() }), - } + QueryLatch { info: Mutex::new(QueryLatchInfo { complete: false, waiters: Vec::new() }) } } /// Awaits for the query job to complete. From fecf664390b3a3a852ccdd4e975cb7adf8db3a50 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Wed, 3 Dec 2025 14:22:04 +0300 Subject: [PATCH 04/10] Move parallel interfaces to rustc_middle --- Cargo.lock | 1 + compiler/rustc_codegen_ssa/src/base.rs | 2 +- compiler/rustc_data_structures/src/sync.rs | 4 +- .../src/sync/parallel.rs | 182 ----------------- .../rustc_incremental/src/persist/save.rs | 2 +- compiler/rustc_interface/src/passes.rs | 3 +- compiler/rustc_lint/src/late.rs | 2 +- compiler/rustc_metadata/src/rmeta/encoder.rs | 2 +- compiler/rustc_middle/Cargo.toml | 1 + compiler/rustc_middle/src/hir/map.rs | 3 +- compiler/rustc_middle/src/hir/mod.rs | 2 +- compiler/rustc_middle/src/lib.rs | 1 + compiler/rustc_middle/src/sync.rs | 185 ++++++++++++++++++ compiler/rustc_monomorphize/src/collector.rs | 2 +- .../rustc_monomorphize/src/partitioning.rs | 3 +- 15 files changed, 200 insertions(+), 195 deletions(-) create mode 100644 compiler/rustc_middle/src/sync.rs diff --git a/Cargo.lock b/Cargo.lock index 07de5d2216855..7340dcba75df1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4257,6 +4257,7 @@ dependencies = [ "bitflags", "either", "gsgdt", + "parking_lot", "polonius-engine", "rustc_abi", "rustc_apfloat", diff --git a/compiler/rustc_codegen_ssa/src/base.rs b/compiler/rustc_codegen_ssa/src/base.rs index 8ab0b367f08a6..c1baa326ab9d4 100644 --- a/compiler/rustc_codegen_ssa/src/base.rs +++ b/compiler/rustc_codegen_ssa/src/base.rs @@ -10,7 +10,6 @@ use rustc_ast::expand::allocator::{ }; use rustc_data_structures::fx::{FxHashMap, FxIndexSet}; use rustc_data_structures::profiling::{get_resident_set_size, print_time_passes_entry}; -use rustc_data_structures::sync::{IntoDynSyncSend, par_map}; use rustc_data_structures::unord::UnordMap; use rustc_hir::attrs::{AttributeKind, DebuggerVisualizerType, OptimizeAttr}; use rustc_hir::def_id::{CRATE_DEF_ID, DefId, LOCAL_CRATE}; @@ -25,6 +24,7 @@ use rustc_middle::mir::BinOp; use rustc_middle::mir::interpret::ErrorHandled; use rustc_middle::mir::mono::{CodegenUnit, CodegenUnitNameBuilder, MonoItem, MonoItemPartitions}; use rustc_middle::query::Providers; +use rustc_middle::sync::{IntoDynSyncSend, par_map}; use rustc_middle::ty::layout::{HasTyCtxt, HasTypingEnv, LayoutOf, TyAndLayout}; use rustc_middle::ty::{self, Instance, Ty, TyCtxt}; use rustc_middle::{bug, span_bug}; diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 39101f0899ff8..732bb1879b494 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -41,9 +41,7 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard}; #[doc(no_inline)] pub use self::lock::{Lock, LockGuard, Mode}; pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode}; -pub use self::parallel::{ - broadcast, join, par_for_each_in, par_map, parallel_guard, scope, spawn, try_par_for_each_in, -}; +pub use self::parallel::{ParallelGuard, broadcast, parallel_guard, spawn}; pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; pub use self::worker_local::{Registry, WorkerLocal}; pub use crate::marker::*; diff --git a/compiler/rustc_data_structures/src/sync/parallel.rs b/compiler/rustc_data_structures/src/sync/parallel.rs index b515c0bee8a6e..15039567f6859 100644 --- a/compiler/rustc_data_structures/src/sync/parallel.rs +++ b/compiler/rustc_data_structures/src/sync/parallel.rs @@ -43,54 +43,6 @@ pub fn parallel_guard(f: impl FnOnce(&ParallelGuard) -> R) -> R { ret } -fn serial_join(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA, - B: FnOnce() -> RB, -{ - let (a, b) = parallel_guard(|guard| { - let a = guard.run(oper_a); - let b = guard.run(oper_b); - (a, b) - }); - (a.unwrap(), b.unwrap()) -} - -/// Runs a list of blocks in parallel. The first block is executed immediately on -/// the current thread. Use that for the longest running block. -#[macro_export] -macro_rules! parallel { - (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => { - parallel!(impl $fblock [$block, $($c,)*] [$($rest),*]) - }; - (impl $fblock:block [$($blocks:expr,)*] []) => { - $crate::sync::parallel_guard(|guard| { - $crate::sync::scope(|s| { - $( - let block = $crate::sync::FromDyn::from(|| $blocks); - s.spawn(move |_| { - guard.run(move || block.into_inner()()); - }); - )* - guard.run(|| $fblock); - }); - }); - }; - ($fblock:block, $($blocks:block),*) => { - if $crate::sync::is_dyn_thread_safe() { - // Reverse the order of the later blocks since Rayon executes them in reverse order - // when using a single thread. This ensures the execution order matches that - // of a single threaded rustc. - parallel!(impl $fblock [] [$($blocks),*]); - } else { - $crate::sync::parallel_guard(|guard| { - guard.run(|| $fblock); - $(guard.run(|| $blocks);)* - }); - } - }; - } - pub fn spawn(func: impl FnOnce() + DynSend + 'static) { if mode::is_dyn_thread_safe() { let func = FromDyn::from(func); @@ -102,140 +54,6 @@ pub fn spawn(func: impl FnOnce() + DynSend + 'static) { } } -// This function only works when `mode::is_dyn_thread_safe()`. -pub fn scope<'scope, OP, R>(op: OP) -> R -where - OP: FnOnce(&rustc_thread_pool::Scope<'scope>) -> R + DynSend, - R: DynSend, -{ - let op = FromDyn::from(op); - rustc_thread_pool::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner() -} - -#[inline] -pub fn join(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA + DynSend, - B: FnOnce() -> RB + DynSend, -{ - if mode::is_dyn_thread_safe() { - let oper_a = FromDyn::from(oper_a); - let oper_b = FromDyn::from(oper_b); - let (a, b) = parallel_guard(|guard| { - rustc_thread_pool::join( - move || guard.run(move || FromDyn::from(oper_a.into_inner()())), - move || guard.run(move || FromDyn::from(oper_b.into_inner()())), - ) - }); - (a.unwrap().into_inner(), b.unwrap().into_inner()) - } else { - serial_join(oper_a, oper_b) - } -} - -fn par_slice( - items: &mut [I], - guard: &ParallelGuard, - for_each: impl Fn(&mut I) + DynSync + DynSend, -) { - struct State<'a, F> { - for_each: FromDyn, - guard: &'a ParallelGuard, - group: usize, - } - - fn par_rec( - items: &mut [I], - state: &State<'_, F>, - ) { - if items.len() <= state.group { - for item in items { - state.guard.run(|| (state.for_each)(item)); - } - } else { - let (left, right) = items.split_at_mut(items.len() / 2); - let mut left = state.for_each.derive(left); - let mut right = state.for_each.derive(right); - rustc_thread_pool::join(move || par_rec(*left, state), move || par_rec(*right, state)); - } - } - - let state = State { - for_each: FromDyn::from(for_each), - guard, - group: std::cmp::max(items.len() / 128, 1), - }; - par_rec(items, &state) -} - -pub fn par_for_each_in>( - t: T, - for_each: impl Fn(&I) + DynSync + DynSend, -) { - parallel_guard(|guard| { - if mode::is_dyn_thread_safe() { - let mut items: Vec<_> = t.into_iter().collect(); - par_slice(&mut items, guard, |i| for_each(&*i)) - } else { - t.into_iter().for_each(|i| { - guard.run(|| for_each(&i)); - }); - } - }); -} - -/// This runs `for_each` in parallel for each iterator item. If one or more of the -/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned -/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which -/// are all equivalent. -pub fn try_par_for_each_in( - t: T, - for_each: impl Fn(&::Item) -> Result<(), E> + DynSync + DynSend, -) -> Result<(), E> -where - ::Item: DynSend, -{ - parallel_guard(|guard| { - if mode::is_dyn_thread_safe() { - let mut items: Vec<_> = t.into_iter().collect(); - - let error = Mutex::new(None); - - par_slice(&mut items, guard, |i| { - if let Err(err) = for_each(&*i) { - *error.lock() = Some(err); - } - }); - - if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) } - } else { - t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and) - } - }) -} - -pub fn par_map, R: DynSend, C: FromIterator>( - t: T, - map: impl Fn(I) -> R + DynSync + DynSend, -) -> C { - parallel_guard(|guard| { - if mode::is_dyn_thread_safe() { - let map = FromDyn::from(map); - - let mut items: Vec<(Option, Option)> = - t.into_iter().map(|i| (Some(i), None)).collect(); - - par_slice(&mut items, guard, |i| { - i.1 = Some(map(i.0.take().unwrap())); - }); - - items.into_iter().filter_map(|i| i.1).collect() - } else { - t.into_iter().filter_map(|i| guard.run(|| map(i))).collect() - } - }) -} - pub fn broadcast(op: impl Fn(usize) -> R + DynSync) -> Vec { if mode::is_dyn_thread_safe() { let op = FromDyn::from(op); diff --git a/compiler/rustc_incremental/src/persist/save.rs b/compiler/rustc_incremental/src/persist/save.rs index 58fea3278a839..445001bc3fbda 100644 --- a/compiler/rustc_incremental/src/persist/save.rs +++ b/compiler/rustc_incremental/src/persist/save.rs @@ -2,10 +2,10 @@ use std::fs; use std::sync::Arc; use rustc_data_structures::fx::FxIndexMap; -use rustc_data_structures::sync::join; use rustc_middle::dep_graph::{ DepGraph, SerializedDepGraph, WorkProduct, WorkProductId, WorkProductMap, }; +use rustc_middle::sync::join; use rustc_middle::ty::TyCtxt; use rustc_serialize::Encodable as RustcEncodable; use rustc_serialize::opaque::{FileEncodeResult, FileEncoder}; diff --git a/compiler/rustc_interface/src/passes.rs b/compiler/rustc_interface/src/passes.rs index a0383b187de51..5e024d319d0e1 100644 --- a/compiler/rustc_interface/src/passes.rs +++ b/compiler/rustc_interface/src/passes.rs @@ -12,7 +12,7 @@ use rustc_codegen_ssa::{CodegenResults, CrateInfo}; use rustc_data_structures::jobserver::Proxy; use rustc_data_structures::steal::Steal; use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal}; -use rustc_data_structures::{parallel, thousands}; +use rustc_data_structures::thousands; use rustc_errors::timings::TimingSection; use rustc_expand::base::{ExtCtxt, LintStoreExpand}; use rustc_feature::Features; @@ -27,6 +27,7 @@ use rustc_metadata::EncodedMetadata; use rustc_metadata::creader::CStore; use rustc_middle::arena::Arena; use rustc_middle::dep_graph::DepsType; +use rustc_middle::parallel; use rustc_middle::ty::{self, CurrentGcx, GlobalCtxt, RegisteredTools, TyCtxt}; use rustc_middle::util::Providers; use rustc_parse::lexer::StripTokens; diff --git a/compiler/rustc_lint/src/late.rs b/compiler/rustc_lint/src/late.rs index ccfba715a1be3..9f0366781cd13 100644 --- a/compiler/rustc_lint/src/late.rs +++ b/compiler/rustc_lint/src/late.rs @@ -7,10 +7,10 @@ use std::any::Any; use std::cell::Cell; use rustc_data_structures::stack::ensure_sufficient_stack; -use rustc_data_structures::sync::join; use rustc_hir::def_id::{LocalDefId, LocalModDefId}; use rustc_hir::{self as hir, AmbigArg, HirId, intravisit as hir_visit}; use rustc_middle::hir::nested_filter; +use rustc_middle::sync::join; use rustc_middle::ty::{self, TyCtxt}; use rustc_session::Session; use rustc_session::lint::LintPass; diff --git a/compiler/rustc_metadata/src/rmeta/encoder.rs b/compiler/rustc_metadata/src/rmeta/encoder.rs index b15ed34fc3569..0863ce3f2b1bd 100644 --- a/compiler/rustc_metadata/src/rmeta/encoder.rs +++ b/compiler/rustc_metadata/src/rmeta/encoder.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use rustc_data_structures::fx::{FxIndexMap, FxIndexSet}; use rustc_data_structures::memmap::{Mmap, MmapMut}; -use rustc_data_structures::sync::{join, par_for_each_in}; use rustc_data_structures::temp_dir::MaybeTempDir; use rustc_data_structures::thousands::usize_with_underscores; use rustc_feature::Features; @@ -21,6 +20,7 @@ use rustc_middle::dep_graph::WorkProductId; use rustc_middle::middle::dependency_format::Linkage; use rustc_middle::mir::interpret; use rustc_middle::query::Providers; +use rustc_middle::sync::{join, par_for_each_in}; use rustc_middle::traits::specialization_graph; use rustc_middle::ty::AssocContainer; use rustc_middle::ty::codec::TyEncoder; diff --git a/compiler/rustc_middle/Cargo.toml b/compiler/rustc_middle/Cargo.toml index fbcce16cedca8..669d87182ae18 100644 --- a/compiler/rustc_middle/Cargo.toml +++ b/compiler/rustc_middle/Cargo.toml @@ -8,6 +8,7 @@ edition = "2024" bitflags = "2.4.1" either = "1.5.0" gsgdt = "0.1.2" +parking_lot = "0.12" polonius-engine = "0.13.0" rustc_abi = { path = "../rustc_abi" } rustc_apfloat = "0.2.0" diff --git a/compiler/rustc_middle/src/hir/map.rs b/compiler/rustc_middle/src/hir/map.rs index 5da762ef85650..e2fe506aede3d 100644 --- a/compiler/rustc_middle/src/hir/map.rs +++ b/compiler/rustc_middle/src/hir/map.rs @@ -7,7 +7,7 @@ use rustc_ast::visit::{VisitorResult, walk_list}; use rustc_data_structures::fingerprint::Fingerprint; use rustc_data_structures::stable_hasher::{HashStable, StableHasher}; use rustc_data_structures::svh::Svh; -use rustc_data_structures::sync::{DynSend, DynSync, par_for_each_in, try_par_for_each_in}; +use rustc_data_structures::sync::{DynSend, DynSync}; use rustc_hir::attrs::AttributeKind; use rustc_hir::def::{DefKind, Res}; use rustc_hir::def_id::{DefId, LOCAL_CRATE, LocalDefId, LocalModDefId}; @@ -15,6 +15,7 @@ use rustc_hir::definitions::{DefKey, DefPath, DefPathHash}; use rustc_hir::intravisit::Visitor; use rustc_hir::*; use rustc_hir_pretty as pprust_hir; +use rustc_middle::sync::{par_for_each_in, try_par_for_each_in}; use rustc_span::def_id::StableCrateId; use rustc_span::{ErrorGuaranteed, Ident, Span, Symbol, kw, with_metavar_spans}; diff --git a/compiler/rustc_middle/src/hir/mod.rs b/compiler/rustc_middle/src/hir/mod.rs index 217ecbab059ec..28864007edf2c 100644 --- a/compiler/rustc_middle/src/hir/mod.rs +++ b/compiler/rustc_middle/src/hir/mod.rs @@ -9,7 +9,6 @@ pub mod place; use rustc_data_structures::fingerprint::Fingerprint; use rustc_data_structures::sorted_map::SortedMap; use rustc_data_structures::stable_hasher::{HashStable, StableHasher}; -use rustc_data_structures::sync::{DynSend, DynSync, try_par_for_each_in}; use rustc_hir::def::{DefKind, Res}; use rustc_hir::def_id::{DefId, LocalDefId, LocalModDefId}; use rustc_hir::lints::DelayedLint; @@ -18,6 +17,7 @@ use rustc_macros::{Decodable, Encodable, HashStable}; use rustc_span::{ErrorGuaranteed, ExpnId, Span}; use crate::query::Providers; +use crate::sync::{DynSend, DynSync, try_par_for_each_in}; use crate::ty::TyCtxt; /// Gather the LocalDefId for each item-like within a module, including items contained within diff --git a/compiler/rustc_middle/src/lib.rs b/compiler/rustc_middle/src/lib.rs index ee3e89e57bd42..b19e0b04c1483 100644 --- a/compiler/rustc_middle/src/lib.rs +++ b/compiler/rustc_middle/src/lib.rs @@ -81,6 +81,7 @@ pub mod lint; pub mod metadata; pub mod middle; pub mod mir; +pub mod sync; pub mod thir; pub mod traits; pub mod ty; diff --git a/compiler/rustc_middle/src/sync.rs b/compiler/rustc_middle/src/sync.rs new file mode 100644 index 0000000000000..a937b5893fed2 --- /dev/null +++ b/compiler/rustc_middle/src/sync.rs @@ -0,0 +1,185 @@ +use parking_lot::Mutex; +pub use rustc_data_structures::marker::{DynSend, DynSync}; +pub use rustc_data_structures::sync::*; + +fn serial_join(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce() -> RA, + B: FnOnce() -> RB, +{ + let (a, b) = parallel_guard(|guard| { + let a = guard.run(oper_a); + let b = guard.run(oper_b); + (a, b) + }); + (a.unwrap(), b.unwrap()) +} + +/// Runs a list of blocks in parallel. The first block is executed immediately on +/// the current thread. Use that for the longest running block. +#[macro_export] +macro_rules! parallel { + (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => { + parallel!(impl $fblock [$block, $($c,)*] [$($rest),*]) + }; + (impl $fblock:block [$($blocks:expr,)*] []) => { + $crate::sync::parallel_guard(|guard| { + $crate::sync::scope(|s| { + $( + let block = $crate::sync::FromDyn::from(|| $blocks); + s.spawn(move |_| { + guard.run(move || block.into_inner()()); + }); + )* + guard.run(|| $fblock); + }); + }); + }; + ($fblock:block, $($blocks:block),*) => { + if $crate::sync::is_dyn_thread_safe() { + // Reverse the order of the later blocks since Rayon executes them in reverse order + // when using a single thread. This ensures the execution order matches that + // of a single threaded rustc. + parallel!(impl $fblock [] [$($blocks),*]); + } else { + $crate::sync::parallel_guard(|guard| { + guard.run(|| $fblock); + $(guard.run(|| $blocks);)* + }); + } + }; + } + +// This function only works when `is_dyn_thread_safe()`. +pub fn scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&rustc_thread_pool::Scope<'scope>) -> R + DynSend, + R: DynSend, +{ + let op = FromDyn::from(op); + rustc_thread_pool::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner() +} + +#[inline] +pub fn join(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce() -> RA + DynSend, + B: FnOnce() -> RB + DynSend, +{ + if is_dyn_thread_safe() { + let oper_a = FromDyn::from(oper_a); + let oper_b = FromDyn::from(oper_b); + let (a, b) = parallel_guard(|guard| { + rustc_thread_pool::join( + move || guard.run(move || FromDyn::from(oper_a.into_inner()())), + move || guard.run(move || FromDyn::from(oper_b.into_inner()())), + ) + }); + (a.unwrap().into_inner(), b.unwrap().into_inner()) + } else { + serial_join(oper_a, oper_b) + } +} + +fn par_slice( + items: &mut [I], + guard: &ParallelGuard, + for_each: impl Fn(&mut I) + DynSync + DynSend, +) { + struct State<'a, F> { + for_each: FromDyn, + guard: &'a ParallelGuard, + group: usize, + } + + fn par_rec( + items: &mut [I], + state: &State<'_, F>, + ) { + if items.len() <= state.group { + for item in items { + state.guard.run(|| (state.for_each)(item)); + } + } else { + let (left, right) = items.split_at_mut(items.len() / 2); + let mut left = state.for_each.derive(left); + let mut right = state.for_each.derive(right); + rustc_thread_pool::join(move || par_rec(*left, state), move || par_rec(*right, state)); + } + } + + let state = State { + for_each: FromDyn::from(for_each), + guard, + group: std::cmp::max(items.len() / 128, 1), + }; + par_rec(items, &state) +} + +pub fn par_for_each_in>( + t: T, + for_each: impl Fn(&I) + DynSync + DynSend, +) { + parallel_guard(|guard| { + if is_dyn_thread_safe() { + let mut items: Vec<_> = t.into_iter().collect(); + par_slice(&mut items, guard, |i| for_each(&*i)) + } else { + t.into_iter().for_each(|i| { + guard.run(|| for_each(&i)); + }); + } + }); +} + +/// This runs `for_each` in parallel for each iterator item. If one or more of the +/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned +/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which +/// are all equivalent. +pub fn try_par_for_each_in( + t: T, + for_each: impl Fn(&::Item) -> Result<(), E> + DynSync + DynSend, +) -> Result<(), E> +where + ::Item: DynSend, +{ + parallel_guard(|guard| { + if is_dyn_thread_safe() { + let mut items: Vec<_> = t.into_iter().collect(); + + let error = Mutex::new(None); + + par_slice(&mut items, guard, |i| { + if let Err(err) = for_each(&*i) { + *error.lock() = Some(err); + } + }); + + if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) } + } else { + t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and) + } + }) +} + +pub fn par_map, R: DynSend, C: FromIterator>( + t: T, + map: impl Fn(I) -> R + DynSync + DynSend, +) -> C { + parallel_guard(|guard| { + if is_dyn_thread_safe() { + let map = FromDyn::from(map); + + let mut items: Vec<(Option, Option)> = + t.into_iter().map(|i| (Some(i), None)).collect(); + + par_slice(&mut items, guard, |i| { + i.1 = Some(map(i.0.take().unwrap())); + }); + + items.into_iter().filter_map(|i| i.1).collect() + } else { + t.into_iter().filter_map(|i| guard.run(|| map(i))).collect() + } + }) +} diff --git a/compiler/rustc_monomorphize/src/collector.rs b/compiler/rustc_monomorphize/src/collector.rs index cd699436e0120..1d71cedd2c856 100644 --- a/compiler/rustc_monomorphize/src/collector.rs +++ b/compiler/rustc_monomorphize/src/collector.rs @@ -211,7 +211,6 @@ use std::cell::OnceCell; use std::ops::ControlFlow; use rustc_data_structures::fx::FxIndexMap; -use rustc_data_structures::sync::{MTLock, par_for_each_in}; use rustc_data_structures::unord::{UnordMap, UnordSet}; use rustc_hir as hir; use rustc_hir::attrs::InlineAttr; @@ -227,6 +226,7 @@ use rustc_middle::mir::mono::{ use rustc_middle::mir::visit::Visitor as MirVisitor; use rustc_middle::mir::{self, Body, Location, MentionedItem, traversal}; use rustc_middle::query::TyCtxtAt; +use rustc_middle::sync::{MTLock, par_for_each_in}; use rustc_middle::ty::adjustment::{CustomCoerceUnsized, PointerCoercion}; use rustc_middle::ty::layout::ValidityRequirement; use rustc_middle::ty::{ diff --git a/compiler/rustc_monomorphize/src/partitioning.rs b/compiler/rustc_monomorphize/src/partitioning.rs index 1c8d6db08c316..3ecddbbc731c1 100644 --- a/compiler/rustc_monomorphize/src/partitioning.rs +++ b/compiler/rustc_monomorphize/src/partitioning.rs @@ -99,14 +99,12 @@ use std::io::Write; use std::path::{Path, PathBuf}; use rustc_data_structures::fx::{FxIndexMap, FxIndexSet}; -use rustc_data_structures::sync; use rustc_data_structures::unord::{UnordMap, UnordSet}; use rustc_hir::LangItem; use rustc_hir::attrs::{InlineAttr, Linkage}; use rustc_hir::def::DefKind; use rustc_hir::def_id::{DefId, DefIdSet, LOCAL_CRATE}; use rustc_hir::definitions::DefPathDataName; -use rustc_middle::bug; use rustc_middle::middle::codegen_fn_attrs::CodegenFnAttrFlags; use rustc_middle::middle::exported_symbols::{SymbolExportInfo, SymbolExportLevel}; use rustc_middle::mir::mono::{ @@ -116,6 +114,7 @@ use rustc_middle::mir::mono::{ use rustc_middle::ty::print::{characteristic_def_id_of_type, with_no_trimmed_paths}; use rustc_middle::ty::{self, InstanceKind, TyCtxt}; use rustc_middle::util::Providers; +use rustc_middle::{bug, sync}; use rustc_session::CodegenUnits; use rustc_session::config::{DumpMonoStatsFormat, SwitchWithOptPath}; use rustc_span::Symbol; From 23975f0f2cc0a11e158a6137cf73834da0f4c3d1 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Wed, 3 Dec 2025 21:07:39 +0300 Subject: [PATCH 05/10] Integrate query_branch tracking into parallel interfaces --- .../src/sync/branch_key.rs | 39 ++++++----- compiler/rustc_middle/src/sync.rs | 66 +++++++++++++++++-- compiler/rustc_middle/src/ty/context/tls.rs | 13 +++- compiler/rustc_query_impl/src/plumbing.rs | 2 + 4 files changed, 93 insertions(+), 27 deletions(-) diff --git a/compiler/rustc_data_structures/src/sync/branch_key.rs b/compiler/rustc_data_structures/src/sync/branch_key.rs index 0e9e02bb61526..24d97ecbeb13d 100644 --- a/compiler/rustc_data_structures/src/sync/branch_key.rs +++ b/compiler/rustc_data_structures/src/sync/branch_key.rs @@ -4,35 +4,38 @@ use std::cmp; pub struct BranchKey(u128); impl BranchKey { - pub fn root() -> Self { + pub const fn root() -> Self { Self(0x80000000_00000000_00000000_00000000) } - pub fn bit_branch(self) -> [Self; 2] { - let trailing_zeroes = self.0.trailing_zeros(); - assert!(trailing_zeroes >= 1, "query branch space is exhausted"); - [ - BranchKey(self.0 ^ (0b11 << (trailing_zeroes - 1))), - BranchKey(self.0 ^ (0b01 << (trailing_zeroes - 1))), - ] - } - - pub fn bits_branch_iter(self, bits: u32) -> impl Iterator { + fn bits_branch(self, branch_num: u128, bits: u32) -> BranchKey { let trailing_zeroes = self.0.trailing_zeros(); let allocated_shift = trailing_zeroes .checked_sub(bits) .unwrap_or_else(|| panic!("query branch space is exhausted to fit {bits} bits")); - let step = 1 << (allocated_shift + 1); - let zero = self.0 & !(1 << trailing_zeroes) | (1 << allocated_shift); - (0..1 << bits).map(move |n| BranchKey(zero + step * n)) + BranchKey( + self.0 & !(1 << trailing_zeroes) + | (1 << allocated_shift) + | (branch_num << (allocated_shift + 1)), + ) } - pub fn n_branch_iter(self, n: u32) -> impl Iterator { - let iter = self.bits_branch_iter(n.saturating_sub(1).checked_ilog2().map_or(0, |b| b + 1)); - iter.take(n.try_into().unwrap()) + pub fn branch(self, branch_num: u128, branch_space: u128) -> BranchKey { + debug_assert!( + branch_num < branch_space, + "branch_num = {branch_num} should be less than branch_space = {branch_space}" + ); + // floor(log2(n - 1)) + 1 == ceil(log2(n)) + self.bits_branch(branch_num, (branch_space - 1).checked_ilog2().map_or(0, |b| b + 1)) } - pub fn raw_cmp(self, other: Self) -> cmp::Ordering { + pub fn disjoint_cmp(self, other: Self) -> cmp::Ordering { self.0.cmp(&other.0) } } + +impl Default for BranchKey { + fn default() -> Self { + BranchKey::root() + } +} diff --git a/compiler/rustc_middle/src/sync.rs b/compiler/rustc_middle/src/sync.rs index a937b5893fed2..073771e786c70 100644 --- a/compiler/rustc_middle/src/sync.rs +++ b/compiler/rustc_middle/src/sync.rs @@ -2,6 +2,8 @@ use parking_lot::Mutex; pub use rustc_data_structures::marker::{DynSend, DynSync}; pub use rustc_data_structures::sync::*; +pub use crate::ty::tls; + fn serial_join(oper_a: A, oper_b: B) -> (RA, RB) where A: FnOnce() -> RA, @@ -23,11 +25,13 @@ macro_rules! parallel { parallel!(impl $fblock [$block, $($c,)*] [$($rest),*]) }; (impl $fblock:block [$($blocks:expr,)*] []) => { + #[allow(unreachable_code)] + let n = 1 $(+ 'a: { break 'a 1; let _ = || $blocks; })*; $crate::sync::parallel_guard(|guard| { - $crate::sync::scope(|s| { + $crate::sync::scope(n, |mut s| { $( let block = $crate::sync::FromDyn::from(|| $blocks); - s.spawn(move |_| { + s.spawn(move || { guard.run(move || block.into_inner()()); }); )* @@ -51,13 +55,36 @@ macro_rules! parallel { } // This function only works when `is_dyn_thread_safe()`. -pub fn scope<'scope, OP, R>(op: OP) -> R +pub fn scope<'scope, OP, R>(spawn_limit: u128, op: OP) -> R where - OP: FnOnce(&rustc_thread_pool::Scope<'scope>) -> R + DynSend, + OP: for<'a, 'tcx> FnOnce(Scope<'a, 'scope>) -> R + DynSend, R: DynSend, { let op = FromDyn::from(op); - rustc_thread_pool::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner() + rustc_thread_pool::scope(|scope| { + FromDyn::from(op.into_inner()(Scope { scope, next_branch: 0, branch_limit: spawn_limit })) + }) + .into_inner() +} + +pub struct Scope<'a, 'scope> { + scope: &'a rustc_thread_pool::Scope<'scope>, + branch_limit: u128, + next_branch: u128, +} + +impl<'a, 'scope> Scope<'a, 'scope> { + pub fn spawn(&mut self, f: F) + where + F: FnOnce() + Send + 'scope, + { + if self.next_branch >= self.branch_limit { + panic!("number of spawns exceeded the spawn_limit = {}", self.branch_limit); + } + let query_branch = self.next_branch; + self.next_branch += 1; + branch_context(query_branch, self.branch_limit, || self.scope.spawn(|_| f())); + } } #[inline] @@ -70,7 +97,7 @@ where let oper_a = FromDyn::from(oper_a); let oper_b = FromDyn::from(oper_b); let (a, b) = parallel_guard(|guard| { - rustc_thread_pool::join( + raw_branched_join( move || guard.run(move || FromDyn::from(oper_a.into_inner()())), move || guard.run(move || FromDyn::from(oper_b.into_inner()())), ) @@ -104,7 +131,7 @@ fn par_slice( let (left, right) = items.split_at_mut(items.len() / 2); let mut left = state.for_each.derive(left); let mut right = state.for_each.derive(right); - rustc_thread_pool::join(move || par_rec(*left, state), move || par_rec(*right, state)); + raw_branched_join(move || par_rec(*left, state), move || par_rec(*right, state)); } } @@ -183,3 +210,28 @@ pub fn par_map, R: DynSend, C: FromIterato } }) } + +fn raw_branched_join(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, +{ + rustc_thread_pool::join(|| branch_context(0, 2, oper_a), || branch_context(1, 2, oper_b)) +} + +fn branch_context(branch_num: u128, branch_space: u128, f: F) -> R +where + F: FnOnce() -> R, +{ + tls::with_context_opt(|icx| { + if let Some(icx) = icx { + let icx = tls::ImplicitCtxt { + query_branch: icx.query_branch.branch(branch_num, branch_space), + ..*icx + }; + tls::enter_context(&icx, f) + } else { + f() + } + }) +} diff --git a/compiler/rustc_middle/src/ty/context/tls.rs b/compiler/rustc_middle/src/ty/context/tls.rs index fa9995898ac20..a9c0dcd88202c 100644 --- a/compiler/rustc_middle/src/ty/context/tls.rs +++ b/compiler/rustc_middle/src/ty/context/tls.rs @@ -1,6 +1,6 @@ use std::{mem, ptr}; -use rustc_data_structures::sync; +use rustc_data_structures::sync::{self, BranchKey}; use super::{GlobalCtxt, TyCtxt}; use crate::dep_graph::TaskDepsRef; @@ -23,6 +23,9 @@ pub struct ImplicitCtxt<'a, 'tcx> { /// Used to prevent queries from calling too deeply. pub query_depth: usize, + /// Branch per query, used to calculate deterministic query cycles + pub query_branch: BranchKey, + /// The current dep graph task. This is used to add dependencies to queries /// when executing them. pub task_deps: TaskDepsRef<'a>, @@ -31,7 +34,13 @@ pub struct ImplicitCtxt<'a, 'tcx> { impl<'a, 'tcx> ImplicitCtxt<'a, 'tcx> { pub fn new(gcx: &'tcx GlobalCtxt<'tcx>) -> Self { let tcx = TyCtxt { gcx }; - ImplicitCtxt { tcx, query: None, query_depth: 0, task_deps: TaskDepsRef::Ignore } + ImplicitCtxt { + tcx, + query: None, + query_depth: 0, + query_branch: BranchKey::root(), + task_deps: TaskDepsRef::Ignore, + } } } diff --git a/compiler/rustc_query_impl/src/plumbing.rs b/compiler/rustc_query_impl/src/plumbing.rs index 39b6fac4ebc0b..24064c0cae6f8 100644 --- a/compiler/rustc_query_impl/src/plumbing.rs +++ b/compiler/rustc_query_impl/src/plumbing.rs @@ -20,6 +20,7 @@ use rustc_middle::query::Key; use rustc_middle::query::on_disk_cache::{ AbsoluteBytePos, CacheDecoder, CacheEncoder, EncodedDepNodeIndex, }; +use rustc_middle::sync::BranchKey; use rustc_middle::ty::codec::TyEncoder; use rustc_middle::ty::print::with_reduced_queries; use rustc_middle::ty::tls::{self, ImplicitCtxt}; @@ -162,6 +163,7 @@ impl<'tcx> QueryContext for QueryCtxt<'tcx> { tcx: self.tcx, query: Some(token), query_depth: current_icx.query_depth + depth_limit as usize, + query_branch: BranchKey::root(), task_deps: current_icx.task_deps, }; From c7a4e8bb6266d1c061a1399ae39972b877787c55 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Fri, 5 Dec 2025 15:46:21 +0300 Subject: [PATCH 06/10] Parse query cycle --- compiler/rustc_interface/src/interface.rs | 2 +- compiler/rustc_middle/src/sync.rs | 11 +- compiler/rustc_middle/src/ty/context/tls.rs | 16 +- compiler/rustc_query_impl/src/plumbing.rs | 14 +- compiler/rustc_query_system/src/query/job.rs | 484 ++++++++---------- compiler/rustc_query_system/src/query/mod.rs | 6 +- .../rustc_query_system/src/query/plumbing.rs | 18 +- 7 files changed, 263 insertions(+), 288 deletions(-) diff --git a/compiler/rustc_interface/src/interface.rs b/compiler/rustc_interface/src/interface.rs index c0f8f33692e8c..49feac95b8e68 100644 --- a/compiler/rustc_interface/src/interface.rs +++ b/compiler/rustc_interface/src/interface.rs @@ -589,7 +589,7 @@ pub fn try_print_query_stack( if let Some(icx) = icx { ty::print::with_no_queries!(print_query_stack( QueryCtxt::new(icx.tcx), - icx.query, + icx.query.map(|i| i.id), dcx, limit_frames, file, diff --git a/compiler/rustc_middle/src/sync.rs b/compiler/rustc_middle/src/sync.rs index 073771e786c70..55c4aef18fb9f 100644 --- a/compiler/rustc_middle/src/sync.rs +++ b/compiler/rustc_middle/src/sync.rs @@ -1,6 +1,7 @@ use parking_lot::Mutex; pub use rustc_data_structures::marker::{DynSend, DynSync}; pub use rustc_data_structures::sync::*; +use rustc_query_system::query::QueryInclusion; pub use crate::ty::tls; @@ -224,9 +225,15 @@ where F: FnOnce() -> R, { tls::with_context_opt(|icx| { - if let Some(icx) = icx { + if let Some(icx) = icx + && let Some(QueryInclusion { id, branch, real_depth }) = icx.query + { let icx = tls::ImplicitCtxt { - query_branch: icx.query_branch.branch(branch_num, branch_space), + query: Some(QueryInclusion { + id, + branch: branch.branch(branch_num, branch_space), + real_depth, + }), ..*icx }; tls::enter_context(&icx, f) diff --git a/compiler/rustc_middle/src/ty/context/tls.rs b/compiler/rustc_middle/src/ty/context/tls.rs index a9c0dcd88202c..6bd8b253e24c5 100644 --- a/compiler/rustc_middle/src/ty/context/tls.rs +++ b/compiler/rustc_middle/src/ty/context/tls.rs @@ -1,6 +1,7 @@ use std::{mem, ptr}; -use rustc_data_structures::sync::{self, BranchKey}; +use rustc_data_structures::sync; +use rustc_query_system::query::QueryInclusion; use super::{GlobalCtxt, TyCtxt}; use crate::dep_graph::TaskDepsRef; @@ -18,14 +19,11 @@ pub struct ImplicitCtxt<'a, 'tcx> { /// The current query job, if any. This is updated by `JobOwner::start` in /// `ty::query::plumbing` when executing a query. - pub query: Option, + pub query: Option, /// Used to prevent queries from calling too deeply. pub query_depth: usize, - /// Branch per query, used to calculate deterministic query cycles - pub query_branch: BranchKey, - /// The current dep graph task. This is used to add dependencies to queries /// when executing them. pub task_deps: TaskDepsRef<'a>, @@ -34,13 +32,7 @@ pub struct ImplicitCtxt<'a, 'tcx> { impl<'a, 'tcx> ImplicitCtxt<'a, 'tcx> { pub fn new(gcx: &'tcx GlobalCtxt<'tcx>) -> Self { let tcx = TyCtxt { gcx }; - ImplicitCtxt { - tcx, - query: None, - query_depth: 0, - query_branch: BranchKey::root(), - task_deps: TaskDepsRef::Ignore, - } + ImplicitCtxt { tcx, query: None, query_depth: 0, task_deps: TaskDepsRef::Ignore } } } diff --git a/compiler/rustc_query_impl/src/plumbing.rs b/compiler/rustc_query_impl/src/plumbing.rs index 24064c0cae6f8..abc2ce71a2c79 100644 --- a/compiler/rustc_query_impl/src/plumbing.rs +++ b/compiler/rustc_query_impl/src/plumbing.rs @@ -28,7 +28,7 @@ use rustc_middle::ty::{self, TyCtxt}; use rustc_query_system::dep_graph::{DepNodeParams, HasDepContext}; use rustc_query_system::ich::StableHashingContext; use rustc_query_system::query::{ - QueryCache, QueryConfig, QueryContext, QueryJobId, QueryMap, QuerySideEffect, + QueryCache, QueryConfig, QueryContext, QueryInclusion, QueryJobId, QueryMap, QuerySideEffect, QueryStackDeferred, QueryStackFrame, QueryStackFrameExtra, force_query, }; use rustc_query_system::{QueryOverflow, QueryOverflowNote}; @@ -85,7 +85,7 @@ impl<'tcx> QueryContext for QueryCtxt<'tcx> { } #[inline] - fn current_query_job(self) -> Option { + fn current_query_inclusion(self) -> Option { tls::with_related_context(self.tcx, |icx| icx.query) } @@ -161,9 +161,15 @@ impl<'tcx> QueryContext for QueryCtxt<'tcx> { // Update the `ImplicitCtxt` to point to our new query job. let new_icx = ImplicitCtxt { tcx: self.tcx, - query: Some(token), + query: Some(QueryInclusion { + id: token, + branch: BranchKey::root(), + real_depth: NonZero::new( + current_icx.query.map_or(0, |q| q.real_depth.get()).wrapping_add(1), + ) + .expect("real query depth exceeded type bounds"), + }), query_depth: current_icx.query_depth + depth_limit as usize, - query_branch: BranchKey::root(), task_deps: current_icx.task_deps, }; diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index ac8caff942d0f..5651475b74d8f 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -1,12 +1,15 @@ +use std::collections::hash_map; use std::fmt::Debug; use std::hash::Hash; use std::io::Write; -use std::iter; use std::num::NonZero; use std::sync::{Arc, Weak}; +use std::thread::ThreadId; +use std::{cmp, iter}; use parking_lot::{Condvar, Mutex}; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; +use rustc_data_structures::sync::BranchKey; use rustc_errors::{Diag, DiagCtxtHandle}; use rustc_hir::def::DefKind; use rustc_session::Session; @@ -50,7 +53,7 @@ impl QueryJobId { map.get(&self).unwrap().job.span } - fn parent(self, map: &QueryMap) -> Option { + fn parent(self, map: &QueryMap) -> Option { map.get(&self).unwrap().job.parent } @@ -74,7 +77,10 @@ pub struct QueryJob { pub span: Span, /// The parent query job which created this job and is implicitly waiting on it. - pub parent: Option, + pub parent: Option, + + /// Id of the query's execution thread. + pub thread_id: ThreadId, /// The latch that is used to wait on this job. latch: Weak>, @@ -82,15 +88,30 @@ pub struct QueryJob { impl Clone for QueryJob { fn clone(&self) -> Self { - Self { id: self.id, span: self.span, parent: self.parent, latch: self.latch.clone() } + Self { + id: self.id, + span: self.span, + parent: self.parent, + thread_id: self.thread_id, + latch: self.latch.clone(), + } } } impl QueryJob { /// Creates a new query job. #[inline] - pub fn new(id: QueryJobId, span: Span, parent: Option) -> Self { - QueryJob { id, span, parent, latch: Weak::new() } + pub fn new( + id: QueryJobId, + span: Span, + parent: Option, + thread_id: ThreadId, + ) -> Self { + QueryJob { id, span, parent, thread_id, latch: Weak::new() } + } + + pub fn real_depth(&self) -> usize { + self.parent.as_ref().map_or(0, |i| i.real_depth.get()) } pub(super) fn latch(&mut self) -> Arc> { @@ -119,12 +140,11 @@ impl QueryJobId { pub(super) fn find_cycle_in_stack( &self, query_map: QueryMap, - current_job: &Option, + mut current_job: Option, span: Span, ) -> CycleError { // Find the waitee amongst `current_job` parents let mut cycle = Vec::new(); - let mut current_job = Option::clone(current_job); while let Some(job) = current_job { let info = query_map.get(&job).unwrap(); @@ -143,11 +163,11 @@ impl QueryJobId { .job .parent .as_ref() - .map(|parent| (info.job.span, parent.query(&query_map))); + .map(|parent| (info.job.span, parent.id.query(&query_map))); return CycleError { usage, cycle }; } - current_job = info.job.parent; + current_job = info.job.parent.map(|i| i.id); } panic!("did not find a cycle") @@ -159,29 +179,43 @@ impl QueryJobId { let mut depth = 1; let info = query_map.get(&self).unwrap(); let dep_kind = info.query.dep_kind; - let mut current_id = info.job.parent; + let mut current = info.job.parent; let mut last_layout = (info.clone(), depth); - while let Some(id) = current_id { - let info = query_map.get(&id).unwrap(); + while let Some(inclusion) = current { + let info = query_map.get(&inclusion.id).unwrap(); if info.query.dep_kind == dep_kind { depth += 1; last_layout = (info.clone(), depth); } - current_id = info.job.parent; + current = info.job.parent; } last_layout } } +#[derive(Clone, Copy, Debug)] +pub struct QueryInclusion { + pub id: QueryJobId, + pub branch: BranchKey, + pub real_depth: NonZero, +} + #[derive(Debug)] struct QueryWaiter { - query: Option, + query: Option, + thread_id: ThreadId, condvar: Condvar, span: Span, cycle: Mutex>>, } +impl QueryWaiter { + fn real_depth(&self) -> usize { + self.query.as_ref().map_or(0, |i| i.real_depth.get()) + } +} + #[derive(Debug)] struct QueryLatchInfo { complete: bool, @@ -202,11 +236,16 @@ impl QueryLatch { pub(super) fn wait_on( &self, qcx: impl QueryContext, - query: Option, + query: Option, span: Span, ) -> Result<(), CycleError> { - let waiter = - Arc::new(QueryWaiter { query, span, cycle: Mutex::new(None), condvar: Condvar::new() }); + let waiter = Arc::new(QueryWaiter { + query, + span, + thread_id: std::thread::current().id(), + cycle: Mutex::new(None), + condvar: Condvar::new(), + }); self.wait_on_inner(qcx, &waiter); // FIXME: Get rid of this lock. We have ownership of the QueryWaiter // although another thread may still have a Arc reference so we cannot @@ -266,110 +305,6 @@ impl QueryLatch { /// A resumable waiter of a query. The usize is the index into waiters in the query's latch type Waiter = (QueryJobId, usize); -/// Visits all the non-resumable and resumable waiters of a query. -/// Only waiters in a query are visited. -/// `visit` is called for every waiter and is passed a query waiting on `query_ref` -/// and a span indicating the reason the query waited on `query_ref`. -/// If `visit` returns Some, this function returns. -/// For visits of non-resumable waiters it returns the return value of `visit`. -/// For visits of resumable waiters it returns Some(Some(Waiter)) which has the -/// required information to resume the waiter. -/// If all `visit` calls returns None, this function also returns None. -fn visit_waiters( - query_map: &QueryMap, - query: QueryJobId, - mut visit: F, -) -> Option> -where - F: FnMut(Span, QueryJobId) -> Option>, -{ - // Visit the parent query which is a non-resumable waiter since it's on the same stack - if let Some(parent) = query.parent(query_map) - && let Some(cycle) = visit(query.span(query_map), parent) - { - return Some(cycle); - } - - // Visit the explicit waiters which use condvars and are resumable - if let Some(latch) = query.latch(query_map).upgrade() { - for (i, waiter) in latch.info.lock().waiters.iter().enumerate() { - if let Some(waiter_query) = waiter.query { - if visit(waiter.span, waiter_query).is_some() { - // Return a value which indicates that this waiter can be resumed - return Some(Some((query, i))); - } - } - } - } - - None -} - -/// Look for query cycles by doing a depth first search starting at `query`. -/// `span` is the reason for the `query` to execute. This is initially DUMMY_SP. -/// If a cycle is detected, this initial value is replaced with the span causing -/// the cycle. -fn cycle_check( - query_map: &QueryMap, - query: QueryJobId, - span: Span, - stack: &mut Vec<(Span, QueryJobId)>, - visited: &mut FxHashSet, -) -> Option> { - if !visited.insert(query) { - return if let Some(p) = stack.iter().position(|q| q.1 == query) { - // We detected a query cycle, fix up the initial span and return Some - - // Remove previous stack entries - stack.drain(0..p); - // Replace the span for the first query with the cycle cause - stack[0].0 = span; - Some(None) - } else { - None - }; - } - - // Query marked as visited is added it to the stack - stack.push((span, query)); - - // Visit all the waiters - let r = visit_waiters(query_map, query, |span, successor| { - cycle_check(query_map, successor, span, stack, visited) - }); - - // Remove the entry in our stack if we didn't find a cycle - if r.is_none() { - stack.pop(); - } - - r -} - -/// Finds out if there's a path to the compiler root (aka. code which isn't in a query) -/// from `query` without going through any of the queries in `visited`. -/// This is achieved with a depth first search. -fn connected_to_root( - query_map: &QueryMap, - query: QueryJobId, - visited: &mut FxHashSet, -) -> bool { - // We already visited this or we're deliberately ignoring it - if !visited.insert(query) { - return false; - } - - // This query is connected to the root (it has no query parent), return true - if query.parent(query_map).is_none() { - return true; - } - - visit_waiters(query_map, query, |_, successor| { - connected_to_root(query_map, successor, visited).then_some(None) - }) - .is_some() -} - // Deterministically pick an query from a list fn pick_query<'a, I: Clone, T, F>(query_map: &QueryMap, queries: &'a [T], f: F) -> &'a T where @@ -391,159 +326,188 @@ where .unwrap() } -/// Looks for query cycles starting from the last query in `jobs`. -/// If a cycle is found, all queries in the cycle is removed from `jobs` and -/// the function return true. -/// If a cycle was not found, the starting query is removed from `jobs` and -/// the function returns false. -fn remove_cycle( - query_map: &QueryMap, - jobs: &mut Vec, - wakelist: &mut Vec>>, -) -> bool { - let mut visited = FxHashSet::default(); - let mut stack = Vec::new(); - // Look for a cycle starting with the last query in `jobs` - if let Some(waiter) = - cycle_check(query_map, jobs.pop().unwrap(), DUMMY_SP, &mut stack, &mut visited) - { - // The stack is a vector of pairs of spans and queries; reverse it so that - // the earlier entries require later entries - let (mut spans, queries): (Vec<_>, Vec<_>) = stack.into_iter().rev().unzip(); - - // Shift the spans so that queries are matched with the span for their waitee - spans.rotate_right(1); - - // Zip them back together - let mut stack: Vec<_> = iter::zip(spans, queries).collect(); - - // Remove the queries in our cycle from the list of jobs to look at - for r in &stack { - if let Some(pos) = jobs.iter().position(|j| j == &r.1) { - jobs.remove(pos); - } - } - - // Find the queries in the cycle which are - // connected to queries outside the cycle - let entry_points = stack - .iter() - .filter_map(|&(span, query)| { - if query.parent(query_map).is_none() { - // This query is connected to the root (it has no query parent) - Some((span, query, None)) - } else { - let mut waiters = Vec::new(); - // Find all the direct waiters who lead to the root - visit_waiters(query_map, query, |span, waiter| { - // Mark all the other queries in the cycle as already visited - let mut visited = FxHashSet::from_iter(stack.iter().map(|q| q.1)); - - if connected_to_root(query_map, waiter, &mut visited) { - waiters.push((span, waiter)); - } - - None - }); - if waiters.is_empty() { - None - } else { - // Deterministically pick one of the waiters to show to the user - let waiter = *pick_query(query_map, &waiters, |s| *s); - Some((span, query, Some(waiter))) - } - } - }) - .collect::)>>(); - - // Deterministically pick an entry point - let (_, entry_point, usage) = pick_query(query_map, &entry_points, |e| (e.0, e.1)); - - // Shift the stack so that our entry point is first - let entry_point_pos = stack.iter().position(|(_, query)| query == entry_point); - if let Some(pos) = entry_point_pos { - stack.rotate_left(pos); - } - - let usage = usage.as_ref().map(|(span, query)| (*span, query.query(query_map))); - - // Create the cycle error - let error = CycleError { - usage, - cycle: stack - .iter() - .map(|&(s, ref q)| QueryInfo { span: s, query: q.query(query_map) }) - .collect(), - }; - - // We unwrap `waiter` here since there must always be one - // edge which is resumable / waited using a query latch - let (waitee_query, waiter_idx) = waiter.unwrap(); - - // Extract the waiter we want to resume - let waiter = waitee_query.latch(query_map).upgrade().unwrap().extract_waiter(waiter_idx); - - // Set the cycle error so it will be picked up when resumed - *waiter.cycle.lock() = Some(error); - - // Put the waiter on the list of things to resume - wakelist.push(waiter); - - true - } else { - false - } -} - /// Detects query cycles by using depth first search over all active query jobs. /// If a query cycle is found it will break the cycle by finding an edge which /// uses a query latch and then resuming that waiter. /// There may be multiple cycles involved in a deadlock, so this searches /// all active queries for cycles before finally resuming all the waiters at once. +#[allow(rustc::potential_query_instability)] pub fn break_query_cycles( query_map: QueryMap, registry: &rustc_thread_pool::Registry, ) { - let mut wakelist = Vec::new(); - // It is OK per the comments: - // - https://github.com/rust-lang/rust/pull/131200#issuecomment-2798854932 - // - https://github.com/rust-lang/rust/pull/131200#issuecomment-2798866392 - #[allow(rustc::potential_query_instability)] - let mut jobs: Vec = query_map.keys().cloned().collect(); - - let mut found_cycle = false; - - while jobs.len() > 0 { - if remove_cycle(&query_map, &mut jobs, &mut wakelist) { - found_cycle = true; - } + use std::cmp::Ordering::*; + + #[derive(Debug)] + struct QueryWaitIntermediate { + depth: usize, + inner: Option>, } - // Check that a cycle was found. It is possible for a deadlock to occur without - // a query cycle if a query which can be waited on uses Rayon to do multithreading - // internally. Such a query (X) may be executing on 2 threads (A and B) and A may - // wait using Rayon on B. Rayon may then switch to executing another query (Y) - // which in turn will wait on X causing a deadlock. We have a false dependency from - // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here - // only considers the true dependency and won't detect a cycle. - if !found_cycle { - panic!( - "deadlock detected as we're unable to find a query cycle to break\n\ - current query map:\n{:#?}", - query_map - ); + #[derive(Debug)] + enum QueryWait { + /// Waits on a running query + Waiter { waited_on: QueryJobId, waiter: Arc> }, + /// Waits other for tasks inside of `join` or `scope` + Direct { waited_on: Vec }, } - // Mark all the thread we're about to wake up as unblocked. This needs to be done before - // we wake the threads up as otherwise Rayon could detect a deadlock if a thread we - // resumed fell asleep and this thread had yet to mark the remaining threads as unblocked. - for _ in 0..wakelist.len() { - rustc_thread_pool::mark_unblocked(registry); + impl QueryWaitIntermediate { + fn from_depth(depth: usize) -> Self { + QueryWaitIntermediate { depth, inner: None } + } + + fn try_finalize(self) -> Option> { + self.inner + } } - for waiter in wakelist.into_iter() { - waiter.condvar.notify_one(); + let mut waits = FxHashMap::>::default(); + for query in query_map.values() { + // Account for every query + let query_depth = query.job.real_depth(); + let entry = waits.entry(query.job.thread_id); + match entry { + hash_map::Entry::Vacant(entry) => { + entry.insert(QueryWaitIntermediate::from_depth(query_depth)); + } + hash_map::Entry::Occupied(mut entry) => { + let wait = entry.get_mut(); + match (query_depth.cmp(&wait.depth), &mut wait.inner) { + (Less, _) => (), + (Equal, None) => { + panic!("encountered two queries on the same thread but at the same depth") + } + // Update thread's depth + (Greater, None) => wait.depth = query_depth, + + (Equal, Some(_)) => (), + (Greater, Some(QueryWait::Waiter { .. })) => { + panic!("query is deeper than thread's waiter") + } + // Overwrite direct wait cause a deeper query is found + (Greater, Some(QueryWait::Direct { .. })) => { + *wait = QueryWaitIntermediate::from_depth(query_depth) + } + } + } + } + + if let Some(inclusion) = query.job.parent { + let parent = &query_map[&inclusion.id]; + if parent.job.thread_id != query.job.thread_id { + // Consider adding a `QueryWaitDep::Direct` wait + let depth = parent.job.real_depth(); + let entry = waits.entry(parent.job.thread_id); + match entry { + hash_map::Entry::Vacant(entry) => { + entry.insert(QueryWaitIntermediate { + depth, + inner: Some(QueryWait::Direct { waited_on: vec![query.job.id] }), + }); + } + hash_map::Entry::Occupied(mut entry) => { + let wait = entry.get_mut(); + match (depth.cmp(&wait.depth), &mut wait.inner) { + (Less, _) => (), + (Equal, None) | (Greater, None | Some(QueryWait::Direct { .. })) => { + *wait = QueryWaitIntermediate { + depth, + inner: Some(QueryWait::Direct { + waited_on: vec![query.job.id], + }), + } + } + (Equal, Some(QueryWait::Direct { waited_on })) => { + if waited_on.contains(&query.job.id) { + panic!("trying to push another direct dependency") + } + waited_on.push(query.job.id) + } + (Equal, Some(QueryWait::Waiter { .. })) => { + panic!( + "query can only wait on a running query or in `join`/`scope`" + ) + } + (Greater, Some(QueryWait::Waiter { .. })) => { + panic!("query is deeper than thread's waiter") + } + } + } + } + } + } + + let Some(latch) = query.job.latch.upgrade() else { + continue; + }; + let lock = latch.info.try_lock().unwrap(); + assert!(!lock.complete); + for waiter in &lock.waiters { + let depth = waiter.real_depth(); + let old = waits.insert( + waiter.thread_id, + QueryWaitIntermediate { + depth, + inner: Some(QueryWait::Waiter { + waited_on: query.job.id, + waiter: waiter.clone(), + }), + }, + ); + // waiter has to be in the thread's deepest query + if let Some(wait) = old { + assert!(wait.depth <= depth); + if wait.depth == depth { + assert!(wait.inner.is_none()) + } + } + } } + + let waits: FxHashMap<_, _> = waits + .into_iter() + .map(|(k, v)| (k, v.try_finalize().expect("failed to process a query cycle"))) + .collect(); + for wait in waits.values() { + match wait { + QueryWait::Waiter { .. } => continue, + QueryWait::Direct { waited_on } => { + let parent = waited_on[0].parent(&query_map).unwrap().id; + for waited_on in &waited_on[1..] { + assert_eq!(parent, waited_on.parent(&query_map).unwrap().id) + } + } + } + } + + panic!("fuh: {waits:#?}") + + // // Check that a cycle was found. It is possible for a deadlock to occur without + // // a query cycle if a query which can be waited on uses Rayon to do multithreading + // // internally. Such a query (X) may be executing on 2 threads (A and B) and A may + // // wait using Rayon on B. Rayon may then switch to executing another query (Y) + // // which in turn will wait on X causing a deadlock. We have a false dependency from + // // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here + // // only considers the true dependency and won't detect a cycle. + // if !found_cycle { + // panic!( + // "deadlock detected as we're unable to find a query cycle to break\n\ + // current query map:\n{:#?}", + // query_map + // ); + // } + + // // Mark all the thread we're about to wake up as unblocked. This needs to be done before + // // we wake the threads up as otherwise Rayon could detect a deadlock if a thread we + // // resumed fell asleep and this thread had yet to mark the remaining threads as unblocked. + // for _ in 0..wakelist.len() { + // rustc_thread_pool::mark_unblocked(registry); + // } + + // for waiter in wakelist.into_iter() { + // waiter.condvar.notify_one(); + // } } #[inline(never)] @@ -648,7 +612,7 @@ pub fn print_query_stack( ); } - current_query = query_info.job.parent; + current_query = query_info.job.parent.map(|i| i.id); count_total += 1; } diff --git a/compiler/rustc_query_system/src/query/mod.rs b/compiler/rustc_query_system/src/query/mod.rs index ce3456d532e69..39817e1e014d2 100644 --- a/compiler/rustc_query_system/src/query/mod.rs +++ b/compiler/rustc_query_system/src/query/mod.rs @@ -8,8 +8,8 @@ pub use self::plumbing::*; mod job; pub use self::job::{ - QueryInfo, QueryJob, QueryJobId, QueryJobInfo, QueryMap, break_query_cycles, print_query_stack, - report_cycle, + QueryInclusion, QueryInfo, QueryJob, QueryJobId, QueryJobInfo, QueryMap, break_query_cycles, + print_query_stack, report_cycle, }; mod caches; @@ -159,7 +159,7 @@ pub trait QueryContext: HasDepContext { fn next_job_id(self) -> QueryJobId; /// Get the query information from the TLS context. - fn current_query_job(self) -> Option; + fn current_query_inclusion(self) -> Option; fn collect_active_jobs( self, diff --git a/compiler/rustc_query_system/src/query/plumbing.rs b/compiler/rustc_query_system/src/query/plumbing.rs index dc0827db49a2c..801678f3b11f0 100644 --- a/compiler/rustc_query_system/src/query/plumbing.rs +++ b/compiler/rustc_query_system/src/query/plumbing.rs @@ -24,7 +24,9 @@ use crate::dep_graph::{DepContext, DepGraphData, DepNode, DepNodeIndex, DepNodeP use crate::ich::StableHashingContext; use crate::query::caches::QueryCache; use crate::query::job::{QueryInfo, QueryJob, QueryJobId, QueryJobInfo, QueryLatch, report_cycle}; -use crate::query::{QueryContext, QueryMap, QueryStackFrame, SerializedDepNodeIndex}; +use crate::query::{ + QueryContext, QueryInclusion, QueryMap, QueryStackFrame, SerializedDepNodeIndex, +}; #[inline] fn equivalent_key(k: &K) -> impl Fn(&(K, V)) -> bool + '_ { @@ -286,7 +288,11 @@ where // We need the complete map to ensure we find a cycle to break. let query_map = qcx.collect_active_jobs(false).ok().expect("failed to collect active queries"); - let error = try_execute.find_cycle_in_stack(query_map, &qcx.current_query_job(), span); + let error = try_execute.find_cycle_in_stack( + query_map, + qcx.current_query_inclusion().map(|i| i.id), + span, + ); (mk_cycle(query, qcx, error.lift(qcx)), None) } @@ -297,7 +303,7 @@ fn wait_for_query( span: Span, key: Q::Key, latch: &QueryLatch, - current: Option, + current: Option, ) -> (Q::Value, Option) where Q: QueryConfig, @@ -369,14 +375,14 @@ where } } - let current_job_id = qcx.current_query_job(); + let current_inclusion = qcx.current_query_inclusion(); match state_lock.entry(key_hash, equivalent_key(&key), |(k, _)| sharded::make_hash(k)) { Entry::Vacant(entry) => { // Nothing has computed or is computing the query, so we start a new job and insert it in the // state map. let id = qcx.next_job_id(); - let job = QueryJob::new(id, span, current_job_id); + let job = QueryJob::new(id, span, current_inclusion, std::thread::current().id()); entry.insert((key, QueryResult::Started(job))); // Drop the lock before we start executing the query @@ -394,7 +400,7 @@ where // Only call `wait_for_query` if we're using a Rayon thread pool // as it will attempt to mark the worker thread as blocked. - return wait_for_query(query, qcx, span, key, &latch, current_job_id); + return wait_for_query(query, qcx, span, key, &latch, current_inclusion); } let id = job.id; From 7d041f264cc0ebdb6a2f9884752e3e468c23bdbb Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Mon, 8 Dec 2025 16:07:49 +0300 Subject: [PATCH 07/10] Parse thread's query stack start --- compiler/rustc_query_system/src/query/job.rs | 171 ++++++++----------- 1 file changed, 73 insertions(+), 98 deletions(-) diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index 5651475b74d8f..28c178675e0c6 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -5,10 +5,10 @@ use std::io::Write; use std::num::NonZero; use std::sync::{Arc, Weak}; use std::thread::ThreadId; -use std::{cmp, iter}; +use std::ops; use parking_lot::{Condvar, Mutex}; -use rustc_data_structures::fx::{FxHashMap, FxHashSet}; +use rustc_data_structures::fx::FxHashMap; use rustc_data_structures::sync::BranchKey; use rustc_errors::{Diag, DiagCtxtHandle}; use rustc_hir::def::DefKind; @@ -336,12 +336,11 @@ pub fn break_query_cycles( query_map: QueryMap, registry: &rustc_thread_pool::Registry, ) { - use std::cmp::Ordering::*; - #[derive(Debug)] - struct QueryWaitIntermediate { - depth: usize, - inner: Option>, + struct QueryStackIntermediate { + start: Option, + depth: ops::RangeInclusive, + wait: Option>, } #[derive(Debug)] @@ -352,89 +351,42 @@ pub fn break_query_cycles( Direct { waited_on: Vec }, } - impl QueryWaitIntermediate { + impl QueryStackIntermediate { fn from_depth(depth: usize) -> Self { - QueryWaitIntermediate { depth, inner: None } + QueryStackIntermediate { start: None, depth: depth..=depth, wait: None } } - fn try_finalize(self) -> Option> { - self.inner + fn update_depth(&mut self, depth: usize) { + let (start, end) = self.depth.clone().into_inner(); + if depth < start { + self.depth = depth..=end; + } + if end < depth { + self.depth = start..=depth + } } } - let mut waits = FxHashMap::>::default(); + let mut stacks = FxHashMap::>::default(); for query in query_map.values() { - // Account for every query let query_depth = query.job.real_depth(); - let entry = waits.entry(query.job.thread_id); + let entry = stacks.entry(query.job.thread_id); match entry { hash_map::Entry::Vacant(entry) => { - entry.insert(QueryWaitIntermediate::from_depth(query_depth)); + entry.insert(QueryStackIntermediate::from_depth(query_depth)); } hash_map::Entry::Occupied(mut entry) => { - let wait = entry.get_mut(); - match (query_depth.cmp(&wait.depth), &mut wait.inner) { - (Less, _) => (), - (Equal, None) => { - panic!("encountered two queries on the same thread but at the same depth") - } - // Update thread's depth - (Greater, None) => wait.depth = query_depth, - - (Equal, Some(_)) => (), - (Greater, Some(QueryWait::Waiter { .. })) => { - panic!("query is deeper than thread's waiter") - } - // Overwrite direct wait cause a deeper query is found - (Greater, Some(QueryWait::Direct { .. })) => { - *wait = QueryWaitIntermediate::from_depth(query_depth) - } - } + entry.get_mut().update_depth(query_depth); } } if let Some(inclusion) = query.job.parent { let parent = &query_map[&inclusion.id]; if parent.job.thread_id != query.job.thread_id { - // Consider adding a `QueryWaitDep::Direct` wait - let depth = parent.job.real_depth(); - let entry = waits.entry(parent.job.thread_id); - match entry { - hash_map::Entry::Vacant(entry) => { - entry.insert(QueryWaitIntermediate { - depth, - inner: Some(QueryWait::Direct { waited_on: vec![query.job.id] }), - }); - } - hash_map::Entry::Occupied(mut entry) => { - let wait = entry.get_mut(); - match (depth.cmp(&wait.depth), &mut wait.inner) { - (Less, _) => (), - (Equal, None) | (Greater, None | Some(QueryWait::Direct { .. })) => { - *wait = QueryWaitIntermediate { - depth, - inner: Some(QueryWait::Direct { - waited_on: vec![query.job.id], - }), - } - } - (Equal, Some(QueryWait::Direct { waited_on })) => { - if waited_on.contains(&query.job.id) { - panic!("trying to push another direct dependency") - } - waited_on.push(query.job.id) - } - (Equal, Some(QueryWait::Waiter { .. })) => { - panic!( - "query can only wait on a running query or in `join`/`scope`" - ) - } - (Greater, Some(QueryWait::Waiter { .. })) => { - panic!("query is deeper than thread's waiter") - } - } - } - } + // Register the thread's query stack beginning + let stack = stacks.get_mut(&query.job.thread_id).unwrap(); + assert!(stack.start.is_none(), "found two active queries at a thread's begining"); + stack.start = Some(query.job.id); } } @@ -444,44 +396,67 @@ pub fn break_query_cycles( let lock = latch.info.try_lock().unwrap(); assert!(!lock.complete); for waiter in &lock.waiters { - let depth = waiter.real_depth(); - let old = waits.insert( - waiter.thread_id, - QueryWaitIntermediate { - depth, - inner: Some(QueryWait::Waiter { - waited_on: query.job.id, - waiter: waiter.clone(), - }), - }, + let waiting_stack = stacks + .entry(waiter.thread_id) + .or_insert_with(|| QueryStackIntermediate::from_depth(waiter.real_depth() - 1)); + assert!( + waiting_stack.wait.is_none(), + "found two active queries a thread is waiting for" ); - // waiter has to be in the thread's deepest query - if let Some(wait) = old { - assert!(wait.depth <= depth); - if wait.depth == depth { - assert!(wait.inner.is_none()) + waiting_stack.wait = + Some(QueryWait::Waiter { waited_on: query.job.id, waiter: Arc::clone(&waiter) }); + } + } + + // Figure out what queries leftover stacks are blocked on + let mut root_thread = None; + let thread_ids: Vec<_> = stacks.keys().copied().collect(); + for thread_id in &thread_ids { + let stack = &stacks[thread_id]; + if let Some(start_id) = stack.start { + let start = &query_map[&start_id]; + let inclusion = start.job.parent.unwrap(); + let parent = &query_map[&inclusion.id]; + assert_eq!(inclusion.real_depth.get(), *stack.depth.start()); + let waiting_stack = stacks.get_mut(&parent.job.thread_id).unwrap(); + if *waiting_stack.depth.end() == (inclusion.real_depth.get() - 1) { + match &mut waiting_stack.wait { + None => { + waiting_stack.wait = Some(QueryWait::Direct { waited_on: vec![start_id] }) + } + Some(QueryWait::Direct { waited_on }) => { + assert!(!waited_on.contains(&start_id)); + waited_on.push(start_id); + } + Some(QueryWait::Waiter { .. }) => (), } } + } else { + assert!(root_thread.is_none(), "found multiple threads without start"); + root_thread = Some(*thread_id); } } - let waits: FxHashMap<_, _> = waits - .into_iter() - .map(|(k, v)| (k, v.try_finalize().expect("failed to process a query cycle"))) - .collect(); - for wait in waits.values() { - match wait { - QueryWait::Waiter { .. } => continue, + let root_thread = root_thread.expect("no root thread was found"); + + for stack in stacks.values() { + match stack.wait.as_ref().expect("failed to figure out what active thread is waiting") { + QueryWait::Waiter { waited_on: _, waiter } => { + assert_eq!(waiter.real_depth() - 1, *stack.depth.end()) + } QueryWait::Direct { waited_on } => { - let parent = waited_on[0].parent(&query_map).unwrap().id; - for waited_on in &waited_on[1..] { - assert_eq!(parent, waited_on.parent(&query_map).unwrap().id) + let waited_on_query = &query_map[&waited_on[0]]; + let query_inclusion = waited_on_query.job.parent.unwrap(); + let parent_id = query_inclusion.id; + for waited_on_id in &waited_on[1..] { + assert_eq!(parent_id, query_map[waited_on_id].job.parent.unwrap().id); } + assert_eq!(query_inclusion.real_depth.get() - 1, *stack.depth.end()); } } } - panic!("fuh: {waits:#?}") + panic!("fuh: {stacks:#x?}") // // Check that a cycle was found. It is possible for a deadlock to occur without // // a query cycle if a query which can be waited on uses Rayon to do multithreading From dae8d1d28331223035c4369b1e326c71aead4dd4 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Tue, 9 Dec 2025 17:30:06 +0300 Subject: [PATCH 08/10] Finish draft impl of new cycle detection --- .../src/sync/branch_key.rs | 27 +- compiler/rustc_query_system/src/query/job.rs | 241 ++++++++++++++---- 2 files changed, 213 insertions(+), 55 deletions(-) diff --git a/compiler/rustc_data_structures/src/sync/branch_key.rs b/compiler/rustc_data_structures/src/sync/branch_key.rs index 24d97ecbeb13d..cde71f6c4b63b 100644 --- a/compiler/rustc_data_structures/src/sync/branch_key.rs +++ b/compiler/rustc_data_structures/src/sync/branch_key.rs @@ -1,6 +1,6 @@ use std::cmp; -#[derive(Clone, Copy, Debug, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct BranchKey(u128); impl BranchKey { @@ -8,16 +8,14 @@ impl BranchKey { Self(0x80000000_00000000_00000000_00000000) } - fn bits_branch(self, branch_num: u128, bits: u32) -> BranchKey { - let trailing_zeroes = self.0.trailing_zeros(); - let allocated_shift = trailing_zeroes - .checked_sub(bits) - .unwrap_or_else(|| panic!("query branch space is exhausted to fit {bits} bits")); - BranchKey( - self.0 & !(1 << trailing_zeroes) + fn bits_branch(self, branch_num: u128, bits: u32) -> Result { + let trailing_zeros = self.0.trailing_zeros(); + let allocated_shift = trailing_zeros.checked_sub(bits).ok_or(BranchNestingError(()))?; + Ok(BranchKey( + self.0 & !(1 << trailing_zeros) | (1 << allocated_shift) | (branch_num << (allocated_shift + 1)), - ) + )) } pub fn branch(self, branch_num: u128, branch_space: u128) -> BranchKey { @@ -27,13 +25,24 @@ impl BranchKey { ); // floor(log2(n - 1)) + 1 == ceil(log2(n)) self.bits_branch(branch_num, (branch_space - 1).checked_ilog2().map_or(0, |b| b + 1)) + .expect("query branch space is exhausted") } pub fn disjoint_cmp(self, other: Self) -> cmp::Ordering { self.0.cmp(&other.0) } + + pub fn nest(self, then: Self) -> Result { + let trailing_zeros = then.0.trailing_zeros(); + let branch_num = then.0.wrapping_shr(trailing_zeros + 1); + let bits = u128::BITS - trailing_zeros; + self.bits_branch(branch_num, bits) + } } +#[derive(Debug)] +pub struct BranchNestingError(()); + impl Default for BranchKey { fn default() -> Self { BranchKey::root() diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index 28c178675e0c6..5535764de0f88 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -1,19 +1,21 @@ -use std::collections::hash_map; +use std::collections::{BTreeMap, hash_map}; use std::fmt::Debug; use std::hash::Hash; use std::io::Write; use std::num::NonZero; use std::sync::{Arc, Weak}; use std::thread::ThreadId; -use std::ops; +use std::{iter, ops}; use parking_lot::{Condvar, Mutex}; -use rustc_data_structures::fx::FxHashMap; +use rustc_data_structures::fx::{FxHashMap, FxHashSet}; +use rustc_data_structures::indexmap::{self, IndexMap, IndexSet}; use rustc_data_structures::sync::BranchKey; use rustc_errors::{Diag, DiagCtxtHandle}; use rustc_hir::def::DefKind; use rustc_session::Session; use rustc_span::{DUMMY_SP, Span}; +use smallvec::SmallVec; use super::QueryStackFrameExtra; use crate::dep_graph::DepContext; @@ -337,21 +339,13 @@ pub fn break_query_cycles( registry: &rustc_thread_pool::Registry, ) { #[derive(Debug)] - struct QueryStackIntermediate { + struct QueryStackIntermediate { start: Option, depth: ops::RangeInclusive, - wait: Option>, + wait: Option, } - #[derive(Debug)] - enum QueryWait { - /// Waits on a running query - Waiter { waited_on: QueryJobId, waiter: Arc> }, - /// Waits other for tasks inside of `join` or `scope` - Direct { waited_on: Vec }, - } - - impl QueryStackIntermediate { + impl QueryStackIntermediate { fn from_depth(depth: usize) -> Self { QueryStackIntermediate { start: None, depth: depth..=depth, wait: None } } @@ -367,27 +361,59 @@ pub fn break_query_cycles( } } - let mut stacks = FxHashMap::>::default(); + #[derive(Debug)] + enum QueryWait { + /// Waits on a running query + Waiter { waited_on: QueryJobId, waiter_idx: usize }, + /// Waits other for tasks inside of `join` or `scope` + Direct { waited_on: Vec }, + } + + impl QueryWait { + fn waiting_query(&self, query_map: &QueryMap) -> QueryJobId { + match self { + QueryWait::Waiter { waited_on, waiter_idx } => { + query_map[waited_on] + .job + .latch + .upgrade() + .unwrap() + .info + .try_lock() + .unwrap() + .waiters[*waiter_idx] + .query + .unwrap() + .id + } + QueryWait::Direct { waited_on } => query_map[&waited_on[0]].job.parent.unwrap().id, + } + } + } + + let mut stacks = FxHashMap::::default(); for query in query_map.values() { let query_depth = query.job.real_depth(); let entry = stacks.entry(query.job.thread_id); - match entry { + let stack = match entry { hash_map::Entry::Vacant(entry) => { - entry.insert(QueryStackIntermediate::from_depth(query_depth)); + entry.insert(QueryStackIntermediate::from_depth(query_depth)) } hash_map::Entry::Occupied(mut entry) => { - entry.get_mut().update_depth(query_depth); + let stack = entry.into_mut(); + stack.update_depth(query_depth); + stack } - } + }; - if let Some(inclusion) = query.job.parent { - let parent = &query_map[&inclusion.id]; - if parent.job.thread_id != query.job.thread_id { - // Register the thread's query stack beginning - let stack = stacks.get_mut(&query.job.thread_id).unwrap(); - assert!(stack.start.is_none(), "found two active queries at a thread's begining"); - stack.start = Some(query.job.id); - } + if query + .job + .parent + .is_none_or(|inclusion| query_map[&inclusion.id].job.thread_id != query.job.thread_id) + { + // Register the thread's query stack beginning + assert!(stack.start.is_none(), "found two active queries at a thread's begining"); + stack.start = Some(query.job.id); } let Some(latch) = query.job.latch.upgrade() else { @@ -395,7 +421,7 @@ pub fn break_query_cycles( }; let lock = latch.info.try_lock().unwrap(); assert!(!lock.complete); - for waiter in &lock.waiters { + for (waiter_idx, waiter) in lock.waiters.iter().enumerate() { let waiting_stack = stacks .entry(waiter.thread_id) .or_insert_with(|| QueryStackIntermediate::from_depth(waiter.real_depth() - 1)); @@ -403,46 +429,55 @@ pub fn break_query_cycles( waiting_stack.wait.is_none(), "found two active queries a thread is waiting for" ); - waiting_stack.wait = - Some(QueryWait::Waiter { waited_on: query.job.id, waiter: Arc::clone(&waiter) }); + waiting_stack.wait = Some(QueryWait::Waiter { waited_on: query.job.id, waiter_idx }); } } // Figure out what queries leftover stacks are blocked on - let mut root_thread = None; - let thread_ids: Vec<_> = stacks.keys().copied().collect(); + let mut root_query = None; + let mut thread_ids: Vec<_> = stacks.keys().copied().collect(); for thread_id in &thread_ids { let stack = &stacks[thread_id]; - if let Some(start_id) = stack.start { - let start = &query_map[&start_id]; - let inclusion = start.job.parent.unwrap(); + let start = stack.start.unwrap(); + if let Some(inclusion) = query_map[&start].job.parent { let parent = &query_map[&inclusion.id]; assert_eq!(inclusion.real_depth.get(), *stack.depth.start()); let waiting_stack = stacks.get_mut(&parent.job.thread_id).unwrap(); if *waiting_stack.depth.end() == (inclusion.real_depth.get() - 1) { match &mut waiting_stack.wait { - None => { - waiting_stack.wait = Some(QueryWait::Direct { waited_on: vec![start_id] }) - } + None => waiting_stack.wait = Some(QueryWait::Direct { waited_on: vec![start] }), Some(QueryWait::Direct { waited_on }) => { - assert!(!waited_on.contains(&start_id)); - waited_on.push(start_id); + assert!(!waited_on.contains(&start)); + waited_on.push(start); } Some(QueryWait::Waiter { .. }) => (), } } } else { - assert!(root_thread.is_none(), "found multiple threads without start"); - root_thread = Some(*thread_id); + assert!(root_query.is_none(), "found multiple threads without start"); + root_query = Some(start); } } - let root_thread = root_thread.expect("no root thread was found"); + let root_query = root_query.expect("no root query was found"); for stack in stacks.values() { match stack.wait.as_ref().expect("failed to figure out what active thread is waiting") { - QueryWait::Waiter { waited_on: _, waiter } => { - assert_eq!(waiter.real_depth() - 1, *stack.depth.end()) + QueryWait::Waiter { waited_on, waiter_idx } => { + assert_eq!( + query_map[waited_on] + .job + .latch + .upgrade() + .unwrap() + .info + .try_lock() + .unwrap() + .waiters[*waiter_idx] + .real_depth() + - 1, + *stack.depth.end() + ) } QueryWait::Direct { waited_on } => { let waited_on_query = &query_map[&waited_on[0]]; @@ -456,7 +491,121 @@ pub fn break_query_cycles( } } - panic!("fuh: {stacks:#x?}") + fn collect_branches(query_id: QueryJobId, query_map: &QueryMap) -> Vec { + let query = &query_map[&query_id]; + let Some(inclusion) = query.job.parent.as_ref() else { return Vec::new() }; + // Skip trivial branches + if inclusion.branch == BranchKey::root() { + return collect_branches(inclusion.id, query_map); + } + let mut out = collect_branches(inclusion.id, query_map); + out.push(inclusion.branch); + out + } + let branches: FxHashMap<_, _> = thread_ids + .iter() + .map(|t| { + ( + *t, + collect_branches( + stacks[t].wait.as_ref().unwrap().waiting_query(&query_map), + &query_map, + ), + ) + }) + .collect(); + + thread_ids.sort_by_key(|t| branches[t].as_slice()); + + let branch_enumerations: FxHashMap<_, _> = + thread_ids.iter().enumerate().map(|(v, k)| (*k, v)).collect(); + + let mut subqueries = FxHashMap::<_, BTreeMap>::default(); + for query in query_map.values() { + let Some(inclusion) = &query.job.parent else { + continue; + }; + let old = subqueries + .entry(inclusion.id) + .or_default() + .insert(inclusion.branch, (query.job.id, usize::MAX)); + assert!(old.is_none()); + } + + for stack in stacks.values() { + let &QueryWait::Waiter { waited_on, waiter_idx } = stack.wait.as_ref().unwrap() else { + continue; + }; + + let inclusion = + query_map[&waited_on].job.latch.upgrade().unwrap().info.try_lock().unwrap().waiters + [waiter_idx] + .query + .unwrap(); + let old = subqueries + .entry(inclusion.id) + .or_default() + .insert(inclusion.branch, (waited_on, waiter_idx)); + assert!(old.is_none()); + } + + let mut visited = IndexMap::new(); + let mut last_usage = None; + let mut last_waiter_idx = usize::MAX; + let mut current = root_query; + while let indexmap::map::Entry::Vacant(entry) = visited.entry(current) { + entry.insert((last_usage, last_waiter_idx)); + last_usage = Some(current); + (current, last_waiter_idx) = *subqueries + .get(¤t) + .unwrap_or_else(|| { + panic!( + "deadlock detected as we're unable to find a query cycle to break\n\ + current query map:\n{:#?}", + query_map + ) + }) + .first_key_value() + .unwrap() + .1; + } + let usage = visited[¤t].0; + let mut iter = visited.keys().rev(); + let mut cycle = Vec::new(); + loop { + let query_id = *iter.next().unwrap(); + let query = &query_map[&query_id]; + cycle.push(QueryInfo { span: query.job.span, query: query.query.clone() }); + if query_id == current { + break; + } + } + + cycle.reverse(); + let cycle_error = CycleError { + usage: usage.map(|id| { + let query = &query_map[&id]; + (query.job.span, query.query.clone()) + }), + cycle, + }; + + let (waited_on, waiter_idx) = if last_waiter_idx != usize::MAX { + (current, last_waiter_idx) + } else { + let (&waited_on, &(_, waiter_idx)) = + visited.iter().rev().find(|(_, (_, waiter_idx))| *waiter_idx != usize::MAX).unwrap(); + (waited_on, waiter_idx) + }; + let waited_on = &query_map[&waited_on]; + let latch = waited_on.job.latch.upgrade().unwrap(); + let latch_info_lock = latch.info.try_lock().unwrap(); + let waiter = &latch_info_lock.waiters[waiter_idx]; + let mut cycle_lock = waiter.cycle.try_lock().unwrap(); + assert!(cycle_lock.is_none()); + *cycle_lock = Some(cycle_error); + rustc_thread_pool::mark_unblocked(registry); + waiter.condvar.notify_one(); // // Check that a cycle was found. It is possible for a deadlock to occur without // // a query cycle if a query which can be waited on uses Rayon to do multithreading From df8fb4e5592242f1ba8cd5bdad17e1513510075e Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Wed, 10 Dec 2025 15:30:21 +0300 Subject: [PATCH 09/10] Clean up unused code for now --- compiler/rustc_middle/src/ty/context/tls.rs | 1 - compiler/rustc_query_system/src/query/job.rs | 109 +------------------ 2 files changed, 6 insertions(+), 104 deletions(-) diff --git a/compiler/rustc_middle/src/ty/context/tls.rs b/compiler/rustc_middle/src/ty/context/tls.rs index 6bd8b253e24c5..58c284f5e45e8 100644 --- a/compiler/rustc_middle/src/ty/context/tls.rs +++ b/compiler/rustc_middle/src/ty/context/tls.rs @@ -5,7 +5,6 @@ use rustc_query_system::query::QueryInclusion; use super::{GlobalCtxt, TyCtxt}; use crate::dep_graph::TaskDepsRef; -use crate::query::plumbing::QueryJobId; /// This is the implicit state of rustc. It contains the current /// `TyCtxt` and query. It is updated when creating a local interner or diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index 5535764de0f88..03c8aa4b49bf7 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -3,19 +3,18 @@ use std::fmt::Debug; use std::hash::Hash; use std::io::Write; use std::num::NonZero; +use std::ops; use std::sync::{Arc, Weak}; use std::thread::ThreadId; -use std::{iter, ops}; use parking_lot::{Condvar, Mutex}; -use rustc_data_structures::fx::{FxHashMap, FxHashSet}; -use rustc_data_structures::indexmap::{self, IndexMap, IndexSet}; +use rustc_data_structures::fx::FxHashMap; +use rustc_data_structures::indexmap::{self, IndexMap}; use rustc_data_structures::sync::BranchKey; use rustc_errors::{Diag, DiagCtxtHandle}; use rustc_hir::def::DefKind; use rustc_session::Session; -use rustc_span::{DUMMY_SP, Span}; -use smallvec::SmallVec; +use rustc_span::Span; use super::QueryStackFrameExtra; use crate::dep_graph::DepContext; @@ -50,18 +49,6 @@ impl QueryJobId { fn query(self, map: &QueryMap) -> QueryStackFrame { map.get(&self).unwrap().query.clone() } - - fn span(self, map: &QueryMap) -> Span { - map.get(&self).unwrap().job.span - } - - fn parent(self, map: &QueryMap) -> Option { - map.get(&self).unwrap().job.parent - } - - fn latch(self, map: &QueryMap) -> &Weak> { - &map.get(&self).unwrap().job.latch - } } #[derive(Clone, Debug)] @@ -293,39 +280,6 @@ impl QueryLatch { waiter.condvar.notify_one(); } } - - /// Removes a single waiter from the list of waiters. - /// This is used to break query cycles. - fn extract_waiter(&self, waiter: usize) -> Arc> { - let mut info = self.info.lock(); - debug_assert!(!info.complete); - // Remove the waiter from the list of waiters - info.waiters.remove(waiter) - } -} - -/// A resumable waiter of a query. The usize is the index into waiters in the query's latch -type Waiter = (QueryJobId, usize); - -// Deterministically pick an query from a list -fn pick_query<'a, I: Clone, T, F>(query_map: &QueryMap, queries: &'a [T], f: F) -> &'a T -where - F: Fn(&T) -> (Span, QueryJobId), -{ - // Deterministically pick an entry point - // FIXME: Sort this instead - queries - .iter() - .min_by_key(|v| { - let (span, query) = f(v); - let hash = query.query(query_map).hash; - // Prefer entry points which have valid spans for nicer error messages - // We add an integer to the tuple ensuring that entry points - // with valid spans are picked first - let span_cmp = if span == DUMMY_SP { 1 } else { 0 }; - (span_cmp, hash) - }) - .unwrap() } /// Detects query cycles by using depth first search over all active query jobs. @@ -369,28 +323,6 @@ pub fn break_query_cycles( Direct { waited_on: Vec }, } - impl QueryWait { - fn waiting_query(&self, query_map: &QueryMap) -> QueryJobId { - match self { - QueryWait::Waiter { waited_on, waiter_idx } => { - query_map[waited_on] - .job - .latch - .upgrade() - .unwrap() - .info - .try_lock() - .unwrap() - .waiters[*waiter_idx] - .query - .unwrap() - .id - } - QueryWait::Direct { waited_on } => query_map[&waited_on[0]].job.parent.unwrap().id, - } - } - } - let mut stacks = FxHashMap::::default(); for query in query_map.values() { let query_depth = query.job.real_depth(); @@ -399,7 +331,7 @@ pub fn break_query_cycles( hash_map::Entry::Vacant(entry) => { entry.insert(QueryStackIntermediate::from_depth(query_depth)) } - hash_map::Entry::Occupied(mut entry) => { + hash_map::Entry::Occupied(entry) => { let stack = entry.into_mut(); stack.update_depth(query_depth); stack @@ -435,7 +367,7 @@ pub fn break_query_cycles( // Figure out what queries leftover stacks are blocked on let mut root_query = None; - let mut thread_ids: Vec<_> = stacks.keys().copied().collect(); + let thread_ids: Vec<_> = stacks.keys().copied().collect(); for thread_id in &thread_ids { let stack = &stacks[thread_id]; let start = stack.start.unwrap(); @@ -491,35 +423,6 @@ pub fn break_query_cycles( } } - fn collect_branches(query_id: QueryJobId, query_map: &QueryMap) -> Vec { - let query = &query_map[&query_id]; - let Some(inclusion) = query.job.parent.as_ref() else { return Vec::new() }; - // Skip trivial branches - if inclusion.branch == BranchKey::root() { - return collect_branches(inclusion.id, query_map); - } - let mut out = collect_branches(inclusion.id, query_map); - out.push(inclusion.branch); - out - } - let branches: FxHashMap<_, _> = thread_ids - .iter() - .map(|t| { - ( - *t, - collect_branches( - stacks[t].wait.as_ref().unwrap().waiting_query(&query_map), - &query_map, - ), - ) - }) - .collect(); - - thread_ids.sort_by_key(|t| branches[t].as_slice()); - - let branch_enumerations: FxHashMap<_, _> = - thread_ids.iter().enumerate().map(|(v, k)| (*k, v)).collect(); - let mut subqueries = FxHashMap::<_, BTreeMap>::default(); for query in query_map.values() { let Some(inclusion) = &query.job.parent else { From ba45f0b7d89e3b0722b20d2d25292983a5120fea Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Wed, 10 Dec 2025 17:01:26 +0300 Subject: [PATCH 10/10] Get rid of warnings for now --- compiler/rustc_middle/src/query/plumbing.rs | 1 - compiler/rustc_query_system/src/query/job.rs | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/compiler/rustc_middle/src/query/plumbing.rs b/compiler/rustc_middle/src/query/plumbing.rs index 8d01d9482ed4b..e9775fd2e0faa 100644 --- a/compiler/rustc_middle/src/query/plumbing.rs +++ b/compiler/rustc_middle/src/query/plumbing.rs @@ -6,7 +6,6 @@ use rustc_hir::hir_id::OwnerId; use rustc_macros::HashStable; use rustc_query_system::HandleCycleError; use rustc_query_system::dep_graph::{DepNodeIndex, SerializedDepNodeIndex}; -pub(crate) use rustc_query_system::query::QueryJobId; use rustc_query_system::query::*; use rustc_span::{ErrorGuaranteed, Span}; pub use sealed::IntoQueryParam; diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index 03c8aa4b49bf7..5241734113f64 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -195,6 +195,8 @@ struct QueryWaiter { query: Option, thread_id: ThreadId, condvar: Condvar, + // remove this after making sure PR it's ok to do + #[allow(dead_code)] span: Span, cycle: Mutex>>, }