From f6bf4ed48ad778b17ebaa042caa99775ea6ea52f Mon Sep 17 00:00:00 2001 From: Wolf Vollprecht Date: Fri, 28 Nov 2025 20:39:53 +0100 Subject: [PATCH 1/4] feat: add ProcessSignaler to expose child process PIDs --- src/shell/commands/executable.rs | 21 +++- src/shell/execute.rs | 63 +++++++++++ src/shell/mod.rs | 2 + src/shell/types.rs | 177 +++++++++++++++++++++++++++++++ tests/integration_test.rs | 114 ++++++++++++++++++++ 5 files changed, 372 insertions(+), 5 deletions(-) diff --git a/src/shell/commands/executable.rs b/src/shell/commands/executable.rs index 2d5a75d..86ae08a 100644 --- a/src/shell/commands/executable.rs +++ b/src/shell/commands/executable.rs @@ -59,18 +59,28 @@ impl ShellCommand for ExecutableCommand { context.state.track_child_process(&child); + // Notify about the spawned child process + let process_signaler = context.state.process_signaler().clone(); + if let Some(pid) = child.id() { + process_signaler.notify_spawn(pid); + } + // avoid deadlock since this is holding onto the pipes drop(sub_command); loop { tokio::select! { result = child.wait() => match result { - Ok(status) => return ExecuteResult::Continue( - status.code().unwrap_or(1), - Vec::new(), - Vec::new(), - ), + Ok(status) => { + process_signaler.notify_exit(); + return ExecuteResult::Continue( + status.code().unwrap_or(1), + Vec::new(), + Vec::new(), + ); + } Err(err) => { + process_signaler.notify_exit(); let _ = stderr.write_line(&format!("{}", err)); return ExecuteResult::from_exit_code(1); } @@ -83,6 +93,7 @@ impl ShellCommand for ExecutableCommand { if cfg!(not(unix)) && signal.causes_abort() { let _ = child.start_kill(); let status = child.wait().await.ok(); + process_signaler.notify_exit(); return ExecuteResult::from_exit_code( status.and_then(|s| s.code()).unwrap_or(signal.aborted_code()), ); diff --git a/src/shell/execute.rs b/src/shell/execute.rs index 23be78a..febdc7f 100644 --- a/src/shell/execute.rs +++ b/src/shell/execute.rs @@ -37,6 +37,7 @@ use crate::shell::types::EnvChange; use crate::shell::types::ExecuteResult; use crate::shell::types::FutureExecuteResult; use crate::shell::types::KillSignal; +use crate::shell::types::ProcessSignaler; use crate::shell::types::ShellPipeReader; use crate::shell::types::ShellPipeWriter; use crate::shell::types::ShellState; @@ -80,6 +81,68 @@ pub async fn execute( .await } +/// Executes a command list and returns the ProcessSignaler for monitoring child processes. +/// +/// This is useful when you need to track spawned child PIDs for signal forwarding. +/// The returned `ProcessSignaler` can be used to: +/// - Get the current foreground process PID via `current_pid()` +/// - Subscribe to process spawn notifications via `subscribe()` +/// +/// # Example +/// +/// ```ignore +/// use deno_task_shell::{execute_with_signaler, KillSignal, SignalKind}; +/// +/// let kill_signal = KillSignal::default(); +/// let (signaler, execute_future) = execute_with_signaler( +/// list, +/// env_vars, +/// cwd, +/// custom_commands, +/// kill_signal.clone(), +/// ); +/// +/// // Check the current child process +/// if let Some(child_pid) = signaler.current_pid() { +/// // Decide whether to forward signals based on process group +/// let child_pgid = unsafe { libc::getpgid(child_pid as i32) }; +/// let our_pgid = unsafe { libc::getpgid(0) }; +/// +/// if child_pgid != our_pgid { +/// kill_signal.send(SignalKind::SIGINT); +/// } +/// } +/// +/// let exit_code = execute_future.await; +/// ``` +pub fn execute_with_signaler( + list: SequentialList, + env_vars: HashMap, + cwd: PathBuf, + custom_commands: HashMap>, + kill_signal: KillSignal, +) -> (ProcessSignaler, impl std::future::Future) { + let signaler = ProcessSignaler::new(); + let state = ShellState::new_with_process_signaler( + env_vars, + cwd, + custom_commands, + kill_signal, + signaler.clone(), + ); + let future = async move { + execute_with_pipes( + list, + state, + ShellPipeReader::stdin(), + ShellPipeWriter::stdout(), + ShellPipeWriter::stderr(), + ) + .await + }; + (signaler, future) +} + /// Executes a `SequentialList` of commands with specified input and output pipes. /// /// This function accepts a list of commands, a shell state, and pipes for standard input, output, and error. diff --git a/src/shell/mod.rs b/src/shell/mod.rs index 7f30d6b..6e20cc1 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -6,11 +6,13 @@ pub use commands::ShellCommand; pub use commands::ShellCommandContext; pub use execute::execute; pub use execute::execute_with_pipes; +pub use execute::execute_with_signaler; pub use types::EnvChange; pub use types::ExecuteResult; pub use types::FutureExecuteResult; pub use types::KillSignal; pub use types::KillSignalDropGuard; +pub use types::ProcessSignaler; pub use types::ShellPipeReader; pub use types::ShellPipeWriter; pub use types::ShellState; diff --git a/src/shell/types.rs b/src/shell/types.rs index 3ff5ede..978efef 100644 --- a/src/shell/types.rs +++ b/src/shell/types.rs @@ -57,6 +57,7 @@ pub struct ShellState { kill_signal: KillSignal, process_tracker: ChildProcessTracker, tree_exit_code_cell: TreeExitCodeCell, + process_signaler: ProcessSignaler, } impl ShellState { @@ -65,6 +66,25 @@ impl ShellState { cwd: PathBuf, custom_commands: HashMap>, kill_signal: KillSignal, + ) -> Self { + Self::new_with_process_signaler( + env_vars, + cwd, + custom_commands, + kill_signal, + ProcessSignaler::new(), + ) + } + + /// Creates a new ShellState with a custom ProcessSignaler. + /// + /// Use this when you need to track child process PIDs for signal forwarding. + pub fn new_with_process_signaler( + env_vars: HashMap, + cwd: PathBuf, + custom_commands: HashMap>, + kill_signal: KillSignal, + process_signaler: ProcessSignaler, ) -> Self { assert!(cwd.is_absolute()); let mut commands = builtin_commands(); @@ -77,6 +97,7 @@ impl ShellState { kill_signal, process_tracker: ChildProcessTracker::new(), tree_exit_code_cell: Default::default(), + process_signaler, }; // ensure the data is normalized for (name, value) in env_vars { @@ -169,6 +190,11 @@ impl ShellState { &self.kill_signal } + /// Returns the process signaler for tracking child process PIDs. + pub fn process_signaler(&self) -> &ProcessSignaler { + &self.process_signaler + } + pub fn track_child_process(&self, child: &tokio::process::Child) { self.process_tracker.track(child); } @@ -632,6 +658,81 @@ impl KillSignalDropGuard { } } +#[derive(Debug, Default)] +struct ProcessSignalerInner { + /// The PID of the current foreground process, if any. + current_pid: Cell>, + /// Sender for process spawn notifications. + /// Lazily initialized on first subscribe. + sender: RefCell>>, +} + +/// Provides access to the currently running foreground child process. +/// +/// This is useful for signal forwarding scenarios where you need to check +/// if the child process is in the same process group as the parent. +/// +/// # Example +/// +/// ```ignore +/// let signaler = ProcessSignaler::new(); +/// let mut receiver = signaler.subscribe(); +/// +/// // In a signal handler, check if we should forward the signal +/// if let Some(child_pid) = signaler.current_pid() { +/// // Check if child is in same process group +/// let child_pgid = unsafe { libc::getpgid(child_pid as i32) }; +/// let our_pgid = unsafe { libc::getpgid(0) }; +/// +/// if child_pgid != our_pgid { +/// // Child in different process group, forward signal +/// kill_signal.send(SignalKind::SIGINT); +/// } +/// } +/// ``` +#[derive(Debug, Clone, Default)] +pub struct ProcessSignaler(Rc); + +impl ProcessSignaler { + /// Creates a new ProcessSignaler. + pub fn new() -> Self { + Self::default() + } + + /// Returns the PID of the current foreground child process, if any. + /// + /// Returns `None` if no child process is currently running. + pub fn current_pid(&self) -> Option { + self.0.current_pid.get() + } + + /// Subscribe to receive notifications when child processes spawn. + /// + /// Returns a receiver that yields PIDs of spawned processes. + /// The channel is lazily created on first subscription. + pub fn subscribe(&self) -> broadcast::Receiver { + let mut sender_ref = self.0.sender.borrow_mut(); + if sender_ref.is_none() { + let (sender, _) = broadcast::channel(16); + *sender_ref = Some(sender); + } + sender_ref.as_ref().unwrap().subscribe() + } + + /// Called internally when a child process is spawned. + pub(crate) fn notify_spawn(&self, pid: u32) { + self.0.current_pid.set(Some(pid)); + if let Some(sender) = self.0.sender.borrow().as_ref() { + let _ = sender.send(pid); + } + } + + /// Called internally when a child process exits. + pub(crate) fn notify_exit(&self) { + self.0.current_pid.set(None); + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum SignalKind { SIGTERM, @@ -715,6 +816,7 @@ impl From for i32 { #[cfg(test)] mod test { + use super::ProcessSignaler; use crate::KillSignal; use crate::SignalKind; @@ -860,4 +962,79 @@ mod test { Some(SignalKind::SIGTERM.aborted_code()) ); } + + #[test] + fn test_process_signaler_current_pid() { + let signaler = ProcessSignaler::new(); + assert_eq!(signaler.current_pid(), None); + + signaler.notify_spawn(1234); + assert_eq!(signaler.current_pid(), Some(1234)); + + signaler.notify_spawn(5678); + assert_eq!(signaler.current_pid(), Some(5678)); + + signaler.notify_exit(); + assert_eq!(signaler.current_pid(), None); + } + + #[test] + fn test_process_signaler_clone() { + let signaler = ProcessSignaler::new(); + let signaler_clone = signaler.clone(); + + signaler.notify_spawn(1234); + + // Both should see the same PID since they share the inner state + assert_eq!(signaler.current_pid(), Some(1234)); + assert_eq!(signaler_clone.current_pid(), Some(1234)); + + signaler_clone.notify_exit(); + assert_eq!(signaler.current_pid(), None); + assert_eq!(signaler_clone.current_pid(), None); + } + + #[tokio::test] + async fn test_process_signaler_subscribe() { + let signaler = ProcessSignaler::new(); + let mut receiver = signaler.subscribe(); + + // Spawn notification should be received + signaler.notify_spawn(1234); + + let pid = receiver.recv().await.unwrap(); + assert_eq!(pid, 1234); + + // Multiple spawns should be received + signaler.notify_spawn(5678); + let pid2 = receiver.recv().await.unwrap(); + assert_eq!(pid2, 5678); + } + + #[tokio::test] + async fn test_process_signaler_multiple_subscribers() { + let signaler = ProcessSignaler::new(); + let mut receiver1 = signaler.subscribe(); + let mut receiver2 = signaler.subscribe(); + + signaler.notify_spawn(1234); + + // Both receivers should get the notification + let pid1 = receiver1.recv().await.unwrap(); + let pid2 = receiver2.recv().await.unwrap(); + assert_eq!(pid1, 1234); + assert_eq!(pid2, 1234); + } + + #[test] + fn test_process_signaler_no_subscribers() { + // Should not panic when there are no subscribers + let signaler = ProcessSignaler::new(); + signaler.notify_spawn(1234); + signaler.notify_exit(); + + // PID should still be tracked even without subscribers + signaler.notify_spawn(5678); + assert_eq!(signaler.current_pid(), Some(5678)); + } } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index af2c5f7..26b34b0 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -6,6 +6,8 @@ use std::time::Instant; use deno_task_shell::ExecuteResult; use deno_task_shell::KillSignal; use deno_task_shell::SignalKind; +use deno_task_shell::execute_with_signaler; +use deno_task_shell::parser::parse; use futures::FutureExt; use self::test_builder::TestBuilder; @@ -1631,3 +1633,115 @@ fn no_such_file_error_text() -> &'static str { "No such file or directory (os error 2)" } } + +#[tokio::test] +async fn test_process_signaler_with_execute() { + use std::collections::HashMap; + + let list = parse("echo hello").unwrap(); + let kill_signal = KillSignal::default(); + + let (signaler, execute_future) = execute_with_signaler( + list, + HashMap::new(), + std::env::current_dir().unwrap(), + HashMap::new(), + kill_signal, + ); + + // Subscribe before executing to ensure we receive the notification + let _receiver = signaler.subscribe(); + + // Run the execution in a local set + let local_set = tokio::task::LocalSet::new(); + let exit_code = local_set.run_until(execute_future).await; + + assert_eq!(exit_code, 0); + + // We should have received a PID notification for the echo command + // (echo is a builtin, so no external process is spawned) + // The test validates that the mechanism works without errors +} + +#[tokio::test] +async fn test_process_signaler_tracks_external_command() { + use std::collections::HashMap; + use std::cell::Cell; + use std::rc::Rc; + + // Use an external command that definitely spawns a process + // Note: We use /usr/bin/sleep directly because `sleep` is a builtin in deno_task_shell + let list = parse("/usr/bin/sleep 0.1").unwrap(); + let kill_signal = KillSignal::default(); + + let (signaler, execute_future) = execute_with_signaler( + list, + HashMap::new(), + std::env::current_dir().unwrap(), + HashMap::new(), + kill_signal, + ); + + let signaler_clone = signaler.clone(); + let seen_pid = Rc::new(Cell::new(None::)); + let seen_pid_clone = seen_pid.clone(); + + // Run the execution in a local set + let local_set = tokio::task::LocalSet::new(); + let exit_code = local_set.run_until(async move { + // Spawn a local task to periodically check the current PID + let handle = tokio::task::spawn_local(async move { + // Check every 5ms for up to 1 second + for _ in 0..200 { + if let Some(pid) = signaler_clone.current_pid() { + seen_pid_clone.set(Some(pid)); + break; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }); + + // Yield to let the checker task start + tokio::task::yield_now().await; + + let exit_code = execute_future.await; + + // Wait for the checker task to complete + let _ = handle.await; + + exit_code + }).await; + + assert_eq!(exit_code, 0); + assert!( + seen_pid.get().is_some(), + "Should have seen a PID during execution of the sleep command, got: {:?}", + seen_pid.get() + ); +} + +#[tokio::test] +async fn test_process_signaler_current_pid_after_execution() { + use std::collections::HashMap; + + // Use an external command that takes some time + let list = parse("/usr/bin/sleep 0.01").unwrap(); + let kill_signal = KillSignal::default(); + + let (signaler, execute_future) = execute_with_signaler( + list, + HashMap::new(), + std::env::current_dir().unwrap(), + HashMap::new(), + kill_signal, + ); + + // Run the execution in a local set + let local_set = tokio::task::LocalSet::new(); + let exit_code = local_set.run_until(execute_future).await; + + assert_eq!(exit_code, 0); + + // After execution, current_pid should be None + assert_eq!(signaler.current_pid(), None); +} From 88b8bdcc524d75fc18013104c3e9959cff53ebed Mon Sep 17 00:00:00 2001 From: Wolf Vollprecht Date: Fri, 28 Nov 2025 20:49:41 +0100 Subject: [PATCH 2/4] format --- tests/integration_test.rs | 40 ++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 26b34b0..6a36318 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -1665,8 +1665,8 @@ async fn test_process_signaler_with_execute() { #[tokio::test] async fn test_process_signaler_tracks_external_command() { - use std::collections::HashMap; use std::cell::Cell; + use std::collections::HashMap; use std::rc::Rc; // Use an external command that definitely spawns a process @@ -1688,29 +1688,31 @@ async fn test_process_signaler_tracks_external_command() { // Run the execution in a local set let local_set = tokio::task::LocalSet::new(); - let exit_code = local_set.run_until(async move { - // Spawn a local task to periodically check the current PID - let handle = tokio::task::spawn_local(async move { - // Check every 5ms for up to 1 second - for _ in 0..200 { - if let Some(pid) = signaler_clone.current_pid() { - seen_pid_clone.set(Some(pid)); - break; + let exit_code = local_set + .run_until(async move { + // Spawn a local task to periodically check the current PID + let handle = tokio::task::spawn_local(async move { + // Check every 5ms for up to 1 second + for _ in 0..200 { + if let Some(pid) = signaler_clone.current_pid() { + seen_pid_clone.set(Some(pid)); + break; + } + tokio::time::sleep(Duration::from_millis(5)).await; } - tokio::time::sleep(Duration::from_millis(5)).await; - } - }); + }); - // Yield to let the checker task start - tokio::task::yield_now().await; + // Yield to let the checker task start + tokio::task::yield_now().await; - let exit_code = execute_future.await; + let exit_code = execute_future.await; - // Wait for the checker task to complete - let _ = handle.await; + // Wait for the checker task to complete + let _ = handle.await; - exit_code - }).await; + exit_code + }) + .await; assert_eq!(exit_code, 0); assert!( From 10e52bd36f620e3e73bc43f9765872baccf33cf7 Mon Sep 17 00:00:00 2001 From: Wolf Vollprecht Date: Sat, 29 Nov 2025 09:23:19 +0100 Subject: [PATCH 3/4] integrate iwth killsignal --- src/shell/commands/executable.rs | 14 +- src/shell/execute.rs | 63 -------- src/shell/mod.rs | 2 - src/shell/types.rs | 253 +++++++++++-------------------- tests/integration_test.rs | 115 -------------- 5 files changed, 93 insertions(+), 354 deletions(-) diff --git a/src/shell/commands/executable.rs b/src/shell/commands/executable.rs index 86ae08a..122c1ba 100644 --- a/src/shell/commands/executable.rs +++ b/src/shell/commands/executable.rs @@ -59,10 +59,10 @@ impl ShellCommand for ExecutableCommand { context.state.track_child_process(&child); - // Notify about the spawned child process - let process_signaler = context.state.process_signaler().clone(); + // Track the spawned child process in the kill signal + let kill_signal = context.state.kill_signal().clone(); if let Some(pid) = child.id() { - process_signaler.notify_spawn(pid); + kill_signal.set_child_process(pid); } // avoid deadlock since this is holding onto the pipes @@ -72,7 +72,7 @@ impl ShellCommand for ExecutableCommand { tokio::select! { result = child.wait() => match result { Ok(status) => { - process_signaler.notify_exit(); + kill_signal.clear_child_process(); return ExecuteResult::Continue( status.code().unwrap_or(1), Vec::new(), @@ -80,12 +80,12 @@ impl ShellCommand for ExecutableCommand { ); } Err(err) => { - process_signaler.notify_exit(); + kill_signal.clear_child_process(); let _ = stderr.write_line(&format!("{}", err)); return ExecuteResult::from_exit_code(1); } }, - signal = context.state.kill_signal().wait_any() => { + signal = kill_signal.wait_any() => { if let Some(_id) = child.id() { #[cfg(unix)] kill(_id as i32, signal); @@ -93,7 +93,7 @@ impl ShellCommand for ExecutableCommand { if cfg!(not(unix)) && signal.causes_abort() { let _ = child.start_kill(); let status = child.wait().await.ok(); - process_signaler.notify_exit(); + kill_signal.clear_child_process(); return ExecuteResult::from_exit_code( status.and_then(|s| s.code()).unwrap_or(signal.aborted_code()), ); diff --git a/src/shell/execute.rs b/src/shell/execute.rs index febdc7f..23be78a 100644 --- a/src/shell/execute.rs +++ b/src/shell/execute.rs @@ -37,7 +37,6 @@ use crate::shell::types::EnvChange; use crate::shell::types::ExecuteResult; use crate::shell::types::FutureExecuteResult; use crate::shell::types::KillSignal; -use crate::shell::types::ProcessSignaler; use crate::shell::types::ShellPipeReader; use crate::shell::types::ShellPipeWriter; use crate::shell::types::ShellState; @@ -81,68 +80,6 @@ pub async fn execute( .await } -/// Executes a command list and returns the ProcessSignaler for monitoring child processes. -/// -/// This is useful when you need to track spawned child PIDs for signal forwarding. -/// The returned `ProcessSignaler` can be used to: -/// - Get the current foreground process PID via `current_pid()` -/// - Subscribe to process spawn notifications via `subscribe()` -/// -/// # Example -/// -/// ```ignore -/// use deno_task_shell::{execute_with_signaler, KillSignal, SignalKind}; -/// -/// let kill_signal = KillSignal::default(); -/// let (signaler, execute_future) = execute_with_signaler( -/// list, -/// env_vars, -/// cwd, -/// custom_commands, -/// kill_signal.clone(), -/// ); -/// -/// // Check the current child process -/// if let Some(child_pid) = signaler.current_pid() { -/// // Decide whether to forward signals based on process group -/// let child_pgid = unsafe { libc::getpgid(child_pid as i32) }; -/// let our_pgid = unsafe { libc::getpgid(0) }; -/// -/// if child_pgid != our_pgid { -/// kill_signal.send(SignalKind::SIGINT); -/// } -/// } -/// -/// let exit_code = execute_future.await; -/// ``` -pub fn execute_with_signaler( - list: SequentialList, - env_vars: HashMap, - cwd: PathBuf, - custom_commands: HashMap>, - kill_signal: KillSignal, -) -> (ProcessSignaler, impl std::future::Future) { - let signaler = ProcessSignaler::new(); - let state = ShellState::new_with_process_signaler( - env_vars, - cwd, - custom_commands, - kill_signal, - signaler.clone(), - ); - let future = async move { - execute_with_pipes( - list, - state, - ShellPipeReader::stdin(), - ShellPipeWriter::stdout(), - ShellPipeWriter::stderr(), - ) - .await - }; - (signaler, future) -} - /// Executes a `SequentialList` of commands with specified input and output pipes. /// /// This function accepts a list of commands, a shell state, and pipes for standard input, output, and error. diff --git a/src/shell/mod.rs b/src/shell/mod.rs index 6e20cc1..7f30d6b 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -6,13 +6,11 @@ pub use commands::ShellCommand; pub use commands::ShellCommandContext; pub use execute::execute; pub use execute::execute_with_pipes; -pub use execute::execute_with_signaler; pub use types::EnvChange; pub use types::ExecuteResult; pub use types::FutureExecuteResult; pub use types::KillSignal; pub use types::KillSignalDropGuard; -pub use types::ProcessSignaler; pub use types::ShellPipeReader; pub use types::ShellPipeWriter; pub use types::ShellState; diff --git a/src/shell/types.rs b/src/shell/types.rs index 978efef..0c0fe68 100644 --- a/src/shell/types.rs +++ b/src/shell/types.rs @@ -57,7 +57,6 @@ pub struct ShellState { kill_signal: KillSignal, process_tracker: ChildProcessTracker, tree_exit_code_cell: TreeExitCodeCell, - process_signaler: ProcessSignaler, } impl ShellState { @@ -66,25 +65,6 @@ impl ShellState { cwd: PathBuf, custom_commands: HashMap>, kill_signal: KillSignal, - ) -> Self { - Self::new_with_process_signaler( - env_vars, - cwd, - custom_commands, - kill_signal, - ProcessSignaler::new(), - ) - } - - /// Creates a new ShellState with a custom ProcessSignaler. - /// - /// Use this when you need to track child process PIDs for signal forwarding. - pub fn new_with_process_signaler( - env_vars: HashMap, - cwd: PathBuf, - custom_commands: HashMap>, - kill_signal: KillSignal, - process_signaler: ProcessSignaler, ) -> Self { assert!(cwd.is_absolute()); let mut commands = builtin_commands(); @@ -97,7 +77,6 @@ impl ShellState { kill_signal, process_tracker: ChildProcessTracker::new(), tree_exit_code_cell: Default::default(), - process_signaler, }; // ensure the data is normalized for (name, value) in env_vars { @@ -190,11 +169,6 @@ impl ShellState { &self.kill_signal } - /// Returns the process signaler for tracking child process PIDs. - pub fn process_signaler(&self) -> &ProcessSignaler { - &self.process_signaler - } - pub fn track_child_process(&self, child: &tokio::process::Child) { self.process_tracker.track(child); } @@ -524,6 +498,17 @@ pub fn pipe() -> (ShellPipeReader, ShellPipeWriter) { ) } +/// Information about the current child process being tracked. +#[derive(Debug, Clone, Copy, Default)] +struct ChildProcessInfo { + /// The PID of the current foreground child process. + pid: Option, + /// The PGID of the current foreground child process (Unix only). + /// Cached at spawn time to avoid repeated syscalls. + #[cfg(unix)] + pgid: Option, +} + #[derive(Debug)] struct KillSignalInner { // WARNING: This should struct should not be made Sync. @@ -535,6 +520,8 @@ struct KillSignalInner { aborted_code: RefCell>, sender: broadcast::Sender, children: RefCell>>, + /// Information about the current child process. + child_process: Cell, } impl KillSignalInner { @@ -570,6 +557,7 @@ impl Default for KillSignal { aborted_code: RefCell::new(None), sender, children: Default::default(), + child_process: Cell::new(ChildProcessInfo::default()), })) } } @@ -588,6 +576,7 @@ impl KillSignal { aborted_code: RefCell::new(self.aborted_code()), sender, children: RefCell::new(Vec::new()), + child_process: Cell::new(ChildProcessInfo::default()), }); // Add the child to the parent's list of children @@ -633,6 +622,48 @@ impl KillSignal { // unwrap is ok because we're holding a sender in `self` receiver.recv().await.unwrap() } + + /// Returns the PID of the current foreground child process, if any. + /// + /// This is useful for signal forwarding scenarios where you need to check + /// if the child process is in the same process group as the parent. + pub fn current_child_pid(&self) -> Option { + self.0.child_process.get().pid + } + + /// Returns the PGID of the current foreground child process, if any. + /// + /// The PGID is cached at spawn time to avoid repeated syscalls. + /// This is useful for determining whether to forward signals: + /// if the child is in the same process group, the terminal's signal + /// will already reach it directly. + #[cfg(unix)] + pub fn current_child_pgid(&self) -> Option { + self.0.child_process.get().pgid + } + + /// Called internally when a child process is spawned. + /// + /// On Unix, this also caches the child's PGID. + pub(crate) fn set_child_process(&self, pid: u32) { + #[cfg(unix)] + let pgid = { + // Cache the PGID at spawn time + let pgid = unsafe { nix::libc::getpgid(pid as i32) }; + if pgid > 0 { Some(pgid) } else { None } + }; + + self.0.child_process.set(ChildProcessInfo { + pid: Some(pid), + #[cfg(unix)] + pgid, + }); + } + + /// Called internally when a child process exits. + pub(crate) fn clear_child_process(&self) { + self.0.child_process.set(ChildProcessInfo::default()); + } } /// Guard that on drop will send a signal on the associated `KillSignal`. @@ -658,81 +689,6 @@ impl KillSignalDropGuard { } } -#[derive(Debug, Default)] -struct ProcessSignalerInner { - /// The PID of the current foreground process, if any. - current_pid: Cell>, - /// Sender for process spawn notifications. - /// Lazily initialized on first subscribe. - sender: RefCell>>, -} - -/// Provides access to the currently running foreground child process. -/// -/// This is useful for signal forwarding scenarios where you need to check -/// if the child process is in the same process group as the parent. -/// -/// # Example -/// -/// ```ignore -/// let signaler = ProcessSignaler::new(); -/// let mut receiver = signaler.subscribe(); -/// -/// // In a signal handler, check if we should forward the signal -/// if let Some(child_pid) = signaler.current_pid() { -/// // Check if child is in same process group -/// let child_pgid = unsafe { libc::getpgid(child_pid as i32) }; -/// let our_pgid = unsafe { libc::getpgid(0) }; -/// -/// if child_pgid != our_pgid { -/// // Child in different process group, forward signal -/// kill_signal.send(SignalKind::SIGINT); -/// } -/// } -/// ``` -#[derive(Debug, Clone, Default)] -pub struct ProcessSignaler(Rc); - -impl ProcessSignaler { - /// Creates a new ProcessSignaler. - pub fn new() -> Self { - Self::default() - } - - /// Returns the PID of the current foreground child process, if any. - /// - /// Returns `None` if no child process is currently running. - pub fn current_pid(&self) -> Option { - self.0.current_pid.get() - } - - /// Subscribe to receive notifications when child processes spawn. - /// - /// Returns a receiver that yields PIDs of spawned processes. - /// The channel is lazily created on first subscription. - pub fn subscribe(&self) -> broadcast::Receiver { - let mut sender_ref = self.0.sender.borrow_mut(); - if sender_ref.is_none() { - let (sender, _) = broadcast::channel(16); - *sender_ref = Some(sender); - } - sender_ref.as_ref().unwrap().subscribe() - } - - /// Called internally when a child process is spawned. - pub(crate) fn notify_spawn(&self, pid: u32) { - self.0.current_pid.set(Some(pid)); - if let Some(sender) = self.0.sender.borrow().as_ref() { - let _ = sender.send(pid); - } - } - - /// Called internally when a child process exits. - pub(crate) fn notify_exit(&self) { - self.0.current_pid.set(None); - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum SignalKind { SIGTERM, @@ -816,7 +772,6 @@ impl From for i32 { #[cfg(test)] mod test { - use super::ProcessSignaler; use crate::KillSignal; use crate::SignalKind; @@ -964,77 +919,41 @@ mod test { } #[test] - fn test_process_signaler_current_pid() { - let signaler = ProcessSignaler::new(); - assert_eq!(signaler.current_pid(), None); + fn test_kill_signal_child_process_tracking() { + let kill_signal = KillSignal::default(); - signaler.notify_spawn(1234); - assert_eq!(signaler.current_pid(), Some(1234)); + // Initially no child process + assert_eq!(kill_signal.current_child_pid(), None); + #[cfg(unix)] + assert_eq!(kill_signal.current_child_pgid(), None); - signaler.notify_spawn(5678); - assert_eq!(signaler.current_pid(), Some(5678)); + // Set a child process + kill_signal.set_child_process(1234); + assert_eq!(kill_signal.current_child_pid(), Some(1234)); + // PGID is retrieved via syscall, so it might be None for a fake PID - signaler.notify_exit(); - assert_eq!(signaler.current_pid(), None); + // Clear the child process + kill_signal.clear_child_process(); + assert_eq!(kill_signal.current_child_pid(), None); + #[cfg(unix)] + assert_eq!(kill_signal.current_child_pgid(), None); } #[test] - fn test_process_signaler_clone() { - let signaler = ProcessSignaler::new(); - let signaler_clone = signaler.clone(); - - signaler.notify_spawn(1234); - - // Both should see the same PID since they share the inner state - assert_eq!(signaler.current_pid(), Some(1234)); - assert_eq!(signaler_clone.current_pid(), Some(1234)); - - signaler_clone.notify_exit(); - assert_eq!(signaler.current_pid(), None); - assert_eq!(signaler_clone.current_pid(), None); - } - - #[tokio::test] - async fn test_process_signaler_subscribe() { - let signaler = ProcessSignaler::new(); - let mut receiver = signaler.subscribe(); - - // Spawn notification should be received - signaler.notify_spawn(1234); - - let pid = receiver.recv().await.unwrap(); - assert_eq!(pid, 1234); - - // Multiple spawns should be received - signaler.notify_spawn(5678); - let pid2 = receiver.recv().await.unwrap(); - assert_eq!(pid2, 5678); - } - - #[tokio::test] - async fn test_process_signaler_multiple_subscribers() { - let signaler = ProcessSignaler::new(); - let mut receiver1 = signaler.subscribe(); - let mut receiver2 = signaler.subscribe(); - - signaler.notify_spawn(1234); - - // Both receivers should get the notification - let pid1 = receiver1.recv().await.unwrap(); - let pid2 = receiver2.recv().await.unwrap(); - assert_eq!(pid1, 1234); - assert_eq!(pid2, 1234); - } + fn test_child_signal_has_separate_child_process_tracking() { + let parent_signal = KillSignal::default(); + let child_signal = parent_signal.child_signal(); - #[test] - fn test_process_signaler_no_subscribers() { - // Should not panic when there are no subscribers - let signaler = ProcessSignaler::new(); - signaler.notify_spawn(1234); - signaler.notify_exit(); - - // PID should still be tracked even without subscribers - signaler.notify_spawn(5678); - assert_eq!(signaler.current_pid(), Some(5678)); + // Set child process on parent + parent_signal.set_child_process(1234); + assert_eq!(parent_signal.current_child_pid(), Some(1234)); + // Child signal should not see parent's child process + assert_eq!(child_signal.current_child_pid(), None); + + // Set child process on child signal + child_signal.set_child_process(5678); + assert_eq!(child_signal.current_child_pid(), Some(5678)); + // Parent should still see its own child process + assert_eq!(parent_signal.current_child_pid(), Some(1234)); } } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 6a36318..b36decf 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -6,8 +6,6 @@ use std::time::Instant; use deno_task_shell::ExecuteResult; use deno_task_shell::KillSignal; use deno_task_shell::SignalKind; -use deno_task_shell::execute_with_signaler; -use deno_task_shell::parser::parse; use futures::FutureExt; use self::test_builder::TestBuilder; @@ -1634,116 +1632,3 @@ fn no_such_file_error_text() -> &'static str { } } -#[tokio::test] -async fn test_process_signaler_with_execute() { - use std::collections::HashMap; - - let list = parse("echo hello").unwrap(); - let kill_signal = KillSignal::default(); - - let (signaler, execute_future) = execute_with_signaler( - list, - HashMap::new(), - std::env::current_dir().unwrap(), - HashMap::new(), - kill_signal, - ); - - // Subscribe before executing to ensure we receive the notification - let _receiver = signaler.subscribe(); - - // Run the execution in a local set - let local_set = tokio::task::LocalSet::new(); - let exit_code = local_set.run_until(execute_future).await; - - assert_eq!(exit_code, 0); - - // We should have received a PID notification for the echo command - // (echo is a builtin, so no external process is spawned) - // The test validates that the mechanism works without errors -} - -#[tokio::test] -async fn test_process_signaler_tracks_external_command() { - use std::cell::Cell; - use std::collections::HashMap; - use std::rc::Rc; - - // Use an external command that definitely spawns a process - // Note: We use /usr/bin/sleep directly because `sleep` is a builtin in deno_task_shell - let list = parse("/usr/bin/sleep 0.1").unwrap(); - let kill_signal = KillSignal::default(); - - let (signaler, execute_future) = execute_with_signaler( - list, - HashMap::new(), - std::env::current_dir().unwrap(), - HashMap::new(), - kill_signal, - ); - - let signaler_clone = signaler.clone(); - let seen_pid = Rc::new(Cell::new(None::)); - let seen_pid_clone = seen_pid.clone(); - - // Run the execution in a local set - let local_set = tokio::task::LocalSet::new(); - let exit_code = local_set - .run_until(async move { - // Spawn a local task to periodically check the current PID - let handle = tokio::task::spawn_local(async move { - // Check every 5ms for up to 1 second - for _ in 0..200 { - if let Some(pid) = signaler_clone.current_pid() { - seen_pid_clone.set(Some(pid)); - break; - } - tokio::time::sleep(Duration::from_millis(5)).await; - } - }); - - // Yield to let the checker task start - tokio::task::yield_now().await; - - let exit_code = execute_future.await; - - // Wait for the checker task to complete - let _ = handle.await; - - exit_code - }) - .await; - - assert_eq!(exit_code, 0); - assert!( - seen_pid.get().is_some(), - "Should have seen a PID during execution of the sleep command, got: {:?}", - seen_pid.get() - ); -} - -#[tokio::test] -async fn test_process_signaler_current_pid_after_execution() { - use std::collections::HashMap; - - // Use an external command that takes some time - let list = parse("/usr/bin/sleep 0.01").unwrap(); - let kill_signal = KillSignal::default(); - - let (signaler, execute_future) = execute_with_signaler( - list, - HashMap::new(), - std::env::current_dir().unwrap(), - HashMap::new(), - kill_signal, - ); - - // Run the execution in a local set - let local_set = tokio::task::LocalSet::new(); - let exit_code = local_set.run_until(execute_future).await; - - assert_eq!(exit_code, 0); - - // After execution, current_pid should be None - assert_eq!(signaler.current_pid(), None); -} From b6c834af92c4a71570fcccde7834e4af22c721cf Mon Sep 17 00:00:00 2001 From: Wolf Vollprecht Date: Mon, 1 Dec 2025 13:36:36 +0100 Subject: [PATCH 4/4] send PGID in SignalMessage to prevent double signaling --- src/shell/commands/executable.rs | 12 ++- src/shell/commands/mod.rs | 2 +- src/shell/mod.rs | 1 + src/shell/types.rs | 138 +++++++++++++++++++++++++++---- tests/integration_test.rs | 1 - 5 files changed, 133 insertions(+), 21 deletions(-) diff --git a/src/shell/commands/executable.rs b/src/shell/commands/executable.rs index 122c1ba..dfea6cf 100644 --- a/src/shell/commands/executable.rs +++ b/src/shell/commands/executable.rs @@ -68,6 +68,10 @@ impl ShellCommand for ExecutableCommand { // avoid deadlock since this is holding onto the pipes drop(sub_command); + // Get the child's PGID for signal forwarding decisions + #[cfg(unix)] + let child_pgid = kill_signal.current_child_pgid(); + loop { tokio::select! { result = child.wait() => match result { @@ -88,14 +92,16 @@ impl ShellCommand for ExecutableCommand { signal = kill_signal.wait_any() => { if let Some(_id) = child.id() { #[cfg(unix)] - kill(_id as i32, signal); + if signal.should_forward_to(child_pgid) { + kill(_id as i32, signal.kind); + } - if cfg!(not(unix)) && signal.causes_abort() { + if cfg!(not(unix)) && signal.kind.causes_abort() { let _ = child.start_kill(); let status = child.wait().await.ok(); kill_signal.clear_child_process(); return ExecuteResult::from_exit_code( - status.and_then(|s| s.code()).unwrap_or(signal.aborted_code()), + status.and_then(|s| s.code()).unwrap_or(signal.kind.aborted_code()), ); } } diff --git a/src/shell/commands/mod.rs b/src/shell/commands/mod.rs index 0a2d7e2..0f58bc9 100644 --- a/src/shell/commands/mod.rs +++ b/src/shell/commands/mod.rs @@ -131,7 +131,7 @@ macro_rules! execute_with_cancellation { result }, signal = $kill_signal.wait_aborted() => { - ExecuteResult::from_exit_code(signal.aborted_code()) + ExecuteResult::from_exit_code(signal.kind.aborted_code()) } } }; diff --git a/src/shell/mod.rs b/src/shell/mod.rs index 7f30d6b..f0a8d64 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -15,6 +15,7 @@ pub use types::ShellPipeReader; pub use types::ShellPipeWriter; pub use types::ShellState; pub use types::SignalKind; +pub use types::SignalMessage; pub use types::pipe; pub use which::CommandPathResolutionError; diff --git a/src/shell/types.rs b/src/shell/types.rs index 0c0fe68..d6f66ff 100644 --- a/src/shell/types.rs +++ b/src/shell/types.rs @@ -518,26 +518,26 @@ struct KillSignalInner { // then awaited after. If an abort happened between that then // it could be missed. aborted_code: RefCell>, - sender: broadcast::Sender, + sender: broadcast::Sender, children: RefCell>>, /// Information about the current child process. child_process: Cell, } impl KillSignalInner { - pub fn send(&self, signal_kind: SignalKind) { - if signal_kind.causes_abort() { + pub fn send(&self, signal: SignalMessage) { + if signal.kind.causes_abort() { let mut stored_aborted_code = self.aborted_code.borrow_mut(); if stored_aborted_code.is_none() { - *stored_aborted_code = Some(signal_kind.aborted_code()); + *stored_aborted_code = Some(signal.kind.aborted_code()); } } - _ = self.sender.send(signal_kind); + _ = self.sender.send(signal); // notify children self.children.borrow_mut().retain(|weak_child| { if let Some(child) = weak_child.upgrade() { - child.send(signal_kind); + child.send(signal); true } else { false // clean-up dropped children @@ -599,25 +599,35 @@ impl KillSignal { } } - /// Send a signal to commands being run. + /// Send a signal to commands being run (programmatic, always forwarded). pub fn send(&self, signal: SignalKind) { - self.0.send(signal) + self.0.send(SignalMessage::new(signal)) + } + + /// Send a signal that originated from a specific process group. + /// + /// Use this when forwarding signals that came from the terminal (e.g., Ctrl+C). + /// Children in the same process group as `origin_pgid` will not receive + /// a duplicate signal, since the terminal already sent it to them directly. + #[cfg(unix)] + pub fn send_from_pgid(&self, signal: SignalKind, origin_pgid: i32) { + self.0.send(SignalMessage::from_pgid(signal, origin_pgid)) } /// Waits for only signals deemed to abort a command. - pub async fn wait_aborted(&self) -> SignalKind { + pub async fn wait_aborted(&self) -> SignalMessage { let mut receiver = self.0.sender.subscribe(); loop { // unwrap is ok because we're holding a sender in `self` let signal = receiver.recv().await.unwrap(); - if signal.causes_abort() { + if signal.kind.causes_abort() { return signal; } } } /// Waits for any signal to be received. - pub async fn wait_any(&self) -> SignalKind { + pub async fn wait_any(&self) -> SignalMessage { let mut receiver = self.0.sender.subscribe(); // unwrap is ok because we're holding a sender in `self` receiver.recv().await.unwrap() @@ -700,6 +710,62 @@ pub enum SignalKind { Other(i32), } +/// A signal message that includes origin information to prevent double-signaling. +/// +/// When a signal originates from the terminal (e.g., Ctrl+C), it's sent to all +/// processes in the foreground process group. If the shell also forwards the +/// signal programmatically, children would receive it twice. The `origin_pgid` +/// field allows recipients to skip forwarding if the child is in the same +/// process group as the signal origin. +#[derive(Debug, Clone, Copy)] +pub struct SignalMessage { + pub kind: SignalKind, + /// The process group ID where this signal originated. + /// If Some, children in the same PGID already received this signal + /// from the terminal and should not be signaled again. + /// If None, the signal is programmatic and should always be forwarded. + #[cfg(unix)] + pub origin_pgid: Option, +} + +impl SignalMessage { + /// Create a new signal message for programmatic signals (always forwarded). + pub fn new(kind: SignalKind) -> Self { + Self { + kind, + #[cfg(unix)] + origin_pgid: None, + } + } + + /// Create a new signal message originating from a specific process group. + /// Children in the same PGID will not receive a duplicate signal. + #[cfg(unix)] + pub fn from_pgid(kind: SignalKind, pgid: i32) -> Self { + Self { + kind, + origin_pgid: Some(pgid), + } + } + + /// Check if a signal should be forwarded to a child process. + /// Returns true if the signal should be forwarded, false if the child + /// already received it from the terminal. + #[cfg(unix)] + pub fn should_forward_to(&self, child_pgid: Option) -> bool { + match (self.origin_pgid, child_pgid) { + (Some(origin), Some(child)) => origin != child, + _ => true, // Forward if we can't determine (programmatic or unknown PGID) + } + } + + /// On non-Unix platforms, always forward signals. + #[cfg(not(unix))] + pub fn should_forward_to(&self, _child_pgid: Option) -> bool { + true + } +} + impl SignalKind { pub fn causes_abort(&self) -> bool { match self { @@ -787,7 +853,7 @@ mod test { // Wait for the signal in the main task let signal = kill_signal.wait_any().await; - assert_eq!(signal, SignalKind::SIGTERM); + assert_eq!(signal.kind, SignalKind::SIGTERM); } #[tokio::test] @@ -810,7 +876,7 @@ mod test { ); for signal in [signals.0, signals.1, signals.2].into_iter() { - assert_eq!(signal, SignalKind::SIGKILL); + assert_eq!(signal.kind, SignalKind::SIGKILL); } assert_eq!(child_signal.aborted_code(), Some(128 + 9)); assert_eq!(sibling_signal.aborted_code(), Some(128 + 9)); @@ -846,7 +912,7 @@ mod test { // Wait for the aborting signal in the main task let signal = kill_signal.wait_aborted().await; - assert_eq!(signal, SignalKind::SIGABRT); + assert_eq!(signal.kind, SignalKind::SIGABRT); assert!(kill_signal.aborted_code().is_some()); } @@ -868,7 +934,7 @@ mod test { // Wait for the signal in the child let signal = child_signal.wait_aborted().await; - assert_eq!(signal, SignalKind::SIGQUIT); + assert_eq!(signal.kind, SignalKind::SIGQUIT); assert_eq!(parent_signal.aborted_code(), Some(128 + 3)); assert_eq!(child_signal.aborted_code(), Some(128 + 3)); } @@ -893,7 +959,7 @@ mod test { // Verify no panic occurred and the parent still functions let signal = parent_signal.wait_any().await; - assert_eq!(signal, SignalKind::SIGTERM); + assert_eq!(signal.kind, SignalKind::SIGTERM); } #[tokio::test] @@ -956,4 +1022,44 @@ mod test { // Parent should still see its own child process assert_eq!(parent_signal.current_child_pid(), Some(1234)); } + + #[cfg(unix)] + #[test] + fn test_signal_message_should_forward_to() { + use super::SignalMessage; + + // Programmatic signal (no origin_pgid) should always forward + let programmatic = SignalMessage::new(SignalKind::SIGINT); + assert!(programmatic.should_forward_to(Some(1000))); + assert!(programmatic.should_forward_to(Some(2000))); + assert!(programmatic.should_forward_to(None)); + + // Signal from PGID 1000 should NOT forward to child in same PGID + let from_tty = SignalMessage::from_pgid(SignalKind::SIGINT, 1000); + assert!(!from_tty.should_forward_to(Some(1000))); // Same PGID - don't forward + assert!(from_tty.should_forward_to(Some(2000))); // Different PGID - forward + assert!(from_tty.should_forward_to(None)); // Unknown PGID - forward (safe default) + } + + #[cfg(unix)] + #[tokio::test] + async fn test_send_from_pgid() { + let kill_signal = KillSignal::default(); + + // Spawn a task to send a signal from a specific PGID + let signal_sender = kill_signal.clone(); + deno_unsync::spawn(async move { + signal_sender.send_from_pgid(SignalKind::SIGINT, 12345); + }); + + // Wait for the signal + let signal = kill_signal.wait_any().await; + assert_eq!(signal.kind, SignalKind::SIGINT); + assert_eq!(signal.origin_pgid, Some(12345)); + + // Same PGID should not forward + assert!(!signal.should_forward_to(Some(12345))); + // Different PGID should forward + assert!(signal.should_forward_to(Some(99999))); + } } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index b36decf..af2c5f7 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -1631,4 +1631,3 @@ fn no_such_file_error_text() -> &'static str { "No such file or directory (os error 2)" } } -