From 413f5336b2c4a9177c85d1f4c9cab4ddbb3c9994 Mon Sep 17 00:00:00 2001 From: lostbean Date: Sun, 12 Apr 2026 10:28:01 -0300 Subject: [PATCH] Add cancel_execution/1 API for in-flight execution Session.execute/3 blocks with :infinity timeout. The only way to stop a running script was Session.stop/1, destroying the entire session. Execution now runs in a spawned Task, allowing the GenServer to handle cancel_execution calls while a script is in progress. The cancelled execution returns {:error, result} with exit code 130 (SIGINT). Session state is preserved after cancellation. Implementation: - Spawn execution in Task.async from handle_call({:execute, ...}) - Store {from, ref, pid, collector} as current_execution in state - handle_info processes Task completion via handle_execution_result/4 - cancel_execution kills the Task and replies to both callers - Sink setup moved into Task to fix process-dictionary-based Collectables - Background job functions receive explicit session_pid for routing --- lib/bash/session.ex | 677 ++++++++++++++++++++----------------- test/bash/session_test.exs | 55 +++ 2 files changed, 416 insertions(+), 316 deletions(-) diff --git a/lib/bash/session.ex b/lib/bash/session.ex index 6aba3c5..1455b41 100644 --- a/lib/bash/session.ex +++ b/lib/bash/session.ex @@ -132,7 +132,8 @@ defmodule Bash.Session do # Callback for starting background jobs synchronously (used by Script executor) start_background_job_fn: nil, signal_jobs_fn: nil, - pipe_stdin: nil + pipe_stdin: nil, + current_execution: nil ] @type t :: %__MODULE__{ @@ -175,7 +176,8 @@ defmodule Bash.Session do completed_jobs: [Job.t()], command_history: [CommandResult.t()], special_vars: %{String.t() => integer() | String.t() | nil}, - positional_params: [[String.t()]] + positional_params: [[String.t()]], + current_execution: {GenServer.from(), reference(), pid(), pid()} | nil } # Client API @@ -398,6 +400,29 @@ defmodule Bash.Session do GenServer.call(session, {:execute, ast, opts}, :infinity) end + @doc """ + Cancels the currently executing script and returns the session to idle. + + The cancelled execution returns `{:error, result}` where `result` has + exit code 130 (matching bash's SIGINT convention). Session state + (variables, functions, working directory) is preserved. + + Returns `:ok` if an execution was cancelled, or `:ok` if no execution + is in progress (no-op). + + ## Examples + + {:ok, session} = Session.new() + task = Task.async(fn -> Session.execute(session, ast) end) + :ok = Session.cancel_execution(session) + {:error, result} = Task.await(task) + + """ + @spec cancel_execution(pid()) :: :ok + def cancel_execution(session) when is_pid(session) do + GenServer.call(session, :cancel_execution) + end + @doc """ Executes a command AST within this session asynchronously. @@ -1140,338 +1165,89 @@ defmodule Bash.Session do {:ok, collector} = OutputCollector.start_link() Process.link(collector) - # Create sinks from opts or use collector-backed sinks - stdout_sink = - case Keyword.get(opts, :stdout_into) do - nil -> Sink.collector(collector) - callback when is_function(callback, 1) -> callback - collectable -> Sink.stream(collectable, stream_type: :stdout) - end + session_pid = self() - stderr_sink = - case Keyword.get(opts, :stderr_into) do - nil -> Sink.collector(collector) - callback when is_function(callback, 1) -> callback - collectable -> Sink.stream(collectable, stream_type: :stderr) - end - - # Also support on_output callback by wrapping both sinks - {stdout_sink, stderr_sink} = - case Keyword.get(opts, :on_output) do - nil -> - {stdout_sink, stderr_sink} - - callback when is_function(callback, 1) -> - # Create sinks that write to both collector and callback - wrapped_stdout = fn chunk -> - stdout_sink.(chunk) - callback.(chunk) - :ok + task = + Task.async(fn -> + stdout_sink = + case Keyword.get(opts, :stdout_into) do + nil -> Sink.collector(collector) + callback when is_function(callback, 1) -> callback + collectable -> Sink.stream(collectable, stream_type: :stdout) end - wrapped_stderr = fn chunk -> - stderr_sink.(chunk) - callback.(chunk) - :ok + stderr_sink = + case Keyword.get(opts, :stderr_into) do + nil -> Sink.collector(collector) + callback when is_function(callback, 1) -> callback + collectable -> Sink.stream(collectable, stream_type: :stderr) end - {wrapped_stdout, wrapped_stderr} - end - - # Create a callback function for starting background jobs synchronously - # This allows Scripts to start jobs immediately and get the OS PID for $! - start_bg_job_fn = fn foreground_ast, current_state -> - start_background_job_sync(foreground_ast, current_state, state) - end - - # Create a callback function for sending signals to jobs/processes synchronously - # This allows Scripts to send signals immediately (kill builtin) - signal_jobs_fn = fn signal, targets, current_state -> - send_signals_sync(signal, targets, current_state, state) - end - - state_with_sinks = %{ - state - | output_collector: collector, - stdout_sink: stdout_sink, - stderr_sink: stderr_sink, - start_background_job_fn: start_bg_job_fn, - signal_jobs_fn: signal_jobs_fn - } - - # No executor_opts needed - sinks are on state now - case execute_command_in_session(ast, state_with_sinks, []) do - {:background, foreground_ast, _session_state} -> - # Command should be run in the background - # Create sinks backed by the session's PERSISTENT output_collector for the job - # The temporary sinks are used for the job notification [1], then transferred - persistent_stdout_sink = Sink.collector(state.output_collector) - persistent_stderr_sink = Sink.collector(state.output_collector) - - bg_state = %{ - state - | stdout_sink: persistent_stdout_sink, - stderr_sink: persistent_stderr_sink - } - - # Use temp sinks for job notification output (the [1] message) - state_with_sinks.stdout_sink.({:stdout, ""}) - - {:reply, reply_value, new_state} = do_start_background_job(foreground_ast, bg_state) - # Transfer job notification output to session's persistent collector - transfer_and_cleanup_collector(collector, state.output_collector) - {:reply, reply_value, new_state} - - {:background, executed_script, state_updates, - {:background, foreground_ast, _bg_session_state}} -> - # Background command from within a Script - apply state updates and start the job - # Create sinks backed by the session's PERSISTENT output_collector for the job - persistent_stdout_sink = Sink.collector(state.output_collector) - persistent_stderr_sink = Sink.collector(state.output_collector) - - bg_state = apply_state_updates(state, state_updates) - - bg_state = %{ - bg_state - | stdout_sink: persistent_stdout_sink, - stderr_sink: persistent_stderr_sink - } - - {:reply, _bg_result, new_state} = do_start_background_job(foreground_ast, bg_state) - - # Transfer output (including job notification [1]) to session's persistent collector - # so it can be read after the command completes - transfer_and_cleanup_collector(collector, state.output_collector) - - # Update executed_script to use session's persistent collector since temp was cleaned up - executed_script_with_collector = %{executed_script | collector: state.output_collector} - - final_state = - new_state - |> append_to_history(executed_script_with_collector) - |> update_exit_status(executed_script_with_collector) - - {:reply, {:ok, executed_script_with_collector}, final_state} - - # Handle special job control builtin return values from Script execution - # These include the executed_script so we can return it with proper collector - {:foreground_job, job_number, executed_script, script_updates} -> - # Keep collector alive for Script result - handle_foreground_job_with_script( - job_number, - executed_script, - script_updates, - collector, - from, - state - ) - - {:background_job, job_numbers, executed_script, script_updates} -> - # Keep collector alive for Script result - handle_background_jobs_with_script( - job_numbers, - executed_script, - script_updates, - collector, - state - ) - - {:wait_for_jobs, job_specs, executed_script, script_updates} -> - # Transfer collected output to persistent collector - transfer_and_cleanup_collector(collector, state.output_collector) - # Update script to use persistent collector - executed_script_with_collector = %{executed_script | collector: state.output_collector} - - handle_wait_for_jobs_with_script( - job_specs, - executed_script_with_collector, - script_updates, - from, - state - ) - - {:signal_jobs, signal, targets, executed_script, script_updates} -> - # Keep collector alive for Script result, perform signal operation - handle_signal_jobs_with_script( - signal, - targets, - executed_script, - script_updates, - collector, - state - ) - - # Legacy job control returns (without script) - for non-Script callers - {:foreground_job, job_number} -> - cleanup_collector(collector) - handle_foreground_job(job_number, from, state) + {stdout_sink, stderr_sink} = + case Keyword.get(opts, :on_output) do + nil -> + {stdout_sink, stderr_sink} - {:background_job, job_numbers} -> - cleanup_collector(collector) - handle_background_jobs(job_numbers, state) - - {:wait_for_jobs, job_specs} -> - # Transfer collected output to persistent collector before cleaning up - transfer_and_cleanup_collector(collector, state.output_collector) - handle_wait_for_jobs(job_specs, from, state) - - {:signal_jobs, signal, targets} -> - cleanup_collector(collector) - handle_signal_jobs(signal, targets, state) - - {:ok, result, state_updates} -> - # Command succeeded with state updates (e.g., cd, ForLoop, history) - # Transfer output to session's persistent collector for session_stdout access - transfer_to_persistent_collector(collector, state.output_collector, result) - - # Note: append to history first, then apply updates - # This allows history -c to clear including itself - new_state = - state - |> append_to_history(result) - |> apply_state_updates(state_updates) - |> update_exit_status(result) + callback when is_function(callback, 1) -> + wrapped_stdout = fn chunk -> + stdout_sink.(chunk) + callback.(chunk) + :ok + end - # Check errexit - exit if enabled and command had non-zero exit - # Check onecmd (-t) - exit after reading and executing one command - # Pass state_updates to avoid triggering on the command that sets onecmd - cond do - should_errexit?(result, new_state) -> - {:reply, {:exit, result}, new_state} + wrapped_stderr = fn chunk -> + stderr_sink.(chunk) + callback.(chunk) + :ok + end - should_onecmd_exit?(new_state, state_updates) -> - {:reply, {:exit, result}, new_state} + {wrapped_stdout, wrapped_stderr} + end - true -> - {:reply, {:ok, result}, new_state} + start_bg_job_fn = fn foreground_ast, current_state -> + start_background_job_sync(foreground_ast, current_state, state, session_pid) end - {:ok, result} -> - # Normal result without state updates - # Transfer output to session's persistent collector for session_stdout access - transfer_to_persistent_collector(collector, state.output_collector, result) - - new_state = - state - |> append_to_history(result) - |> update_exit_status(result) - - # Check errexit - # Check onecmd (-t) - exit after reading and executing one command - cond do - should_errexit?(result, new_state) -> - {:reply, {:exit, result}, new_state} - - should_onecmd_exit?(new_state) -> - {:reply, {:exit, result}, new_state} - - true -> - {:reply, {:ok, result}, new_state} + signal_jobs_fn = fn signal, targets, current_state -> + send_signals_sync(signal, targets, current_state, state) end - {:error, result, state_updates} -> - # Error result with state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = + state_with_sinks = %{ state - |> append_to_history(result) - |> apply_state_updates(state_updates) - |> update_exit_status(result) - - # Check errexit - # Check onecmd (-t) - exit after reading and executing one command - # Pass state_updates to avoid triggering on the command that sets onecmd - cond do - should_errexit?(result, new_state) -> - {:reply, {:exit, result}, new_state} - - should_onecmd_exit?(new_state, state_updates) -> - {:reply, {:exit, result}, new_state} - - true -> - {:reply, {:error, result}, new_state} - end - - {:error, result} -> - # Error result without state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = - state - |> append_to_history(result) - |> update_exit_status(result) - - # Check errexit - # Check onecmd (-t) - exit after reading and executing one command - cond do - should_errexit?(result, new_state) -> - {:reply, {:exit, result}, new_state} - - should_onecmd_exit?(new_state) -> - {:reply, {:exit, result}, new_state} - - true -> - {:reply, {:error, result}, new_state} - end - - {:exit, result, state_updates} -> - # Script exit with state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = - state - |> append_to_history(result) - |> apply_state_updates(state_updates) - |> update_exit_status(result) - - {:reply, {:exit, result}, new_state} - - {:exit, result} -> - # Script exit without state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = - state - |> append_to_history(result) - |> update_exit_status(result) + | output_collector: collector, + stdout_sink: stdout_sink, + stderr_sink: stderr_sink, + start_background_job_fn: start_bg_job_fn, + signal_jobs_fn: signal_jobs_fn + } - {:reply, {:exit, result}, new_state} + execute_command_in_session(ast, state_with_sinks, []) + end) - {:exec, result, state_updates} -> - # Exec replaces shell with command - with state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) + Process.unlink(task.pid) - new_state = - state - |> append_to_history(result) - |> apply_state_updates(state_updates) - |> update_exit_status(result) + {:noreply, %{state | current_execution: {from, task.ref, task.pid, collector}}} + end - {:reply, {:exec, result}, new_state} + def handle_call(:cancel_execution, _from, %{current_execution: nil} = state) do + {:reply, :ok, state} + end - {:exec, result} -> - # Exec replaces shell with command - without state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) + def handle_call(:cancel_execution, _from, state) do + {exec_from, task_ref, task_pid, collector} = state.current_execution + Process.demonitor(task_ref, [:flush]) + Process.exit(task_pid, :kill) - new_state = - state - |> append_to_history(result) - |> update_exit_status(result) + cleanup_collector(collector) - {:reply, {:exec, result}, new_state} + cancel_result = %CommandResult{ + command: "cancelled", + exit_code: 130, + error: :cancelled + } - result -> - # Other result patterns - cleanup collector - cleanup_collector(collector) - {:reply, result, state} - end + GenServer.reply(exec_from, {:error, cancel_result}) + {:reply, :ok, %{state | current_execution: nil}} end def handle_call({:start_background_job, opts}, _from, state) do @@ -1798,6 +1574,39 @@ defmodule Bash.Session do end end + def handle_info( + {ref, execution_result}, + %{current_execution: {from, ref, _task_pid, collector}} = state + ) do + Process.demonitor(ref, [:flush]) + + case handle_execution_result(execution_result, collector, from, state) do + {:reply, reply, new_state} -> + GenServer.reply(from, reply) + {:noreply, %{new_state | current_execution: nil}} + + {:noreply, new_state} -> + {:noreply, %{new_state | current_execution: nil}} + end + end + + def handle_info( + {:DOWN, ref, :process, _pid, reason}, + %{current_execution: {from, ref, _task_pid, collector}} = state + ) + when reason != :normal do + cleanup_collector(collector) + + cancel_result = %CommandResult{ + command: "cancelled", + exit_code: 130, + error: :cancelled + } + + GenServer.reply(from, {:error, cancel_result}) + {:noreply, %{state | current_execution: nil}} + end + def handle_info(_msg, state), do: {:noreply, state} @impl GenServer @@ -1812,6 +1621,235 @@ defmodule Bash.Session do :ok end + # Process the result from an execution task. + # Returns {:reply, reply, new_state} or {:noreply, new_state}. + defp handle_execution_result(execution_result, collector, from, state) do + case execution_result do + {:background, foreground_ast, _session_state} -> + persistent_stdout_sink = Sink.collector(state.output_collector) + persistent_stderr_sink = Sink.collector(state.output_collector) + + bg_state = %{ + state + | stdout_sink: persistent_stdout_sink, + stderr_sink: persistent_stderr_sink + } + + Sink.collector(collector).({:stdout, ""}) + + {:reply, reply_value, new_state} = do_start_background_job(foreground_ast, bg_state) + transfer_and_cleanup_collector(collector, state.output_collector) + {:reply, reply_value, new_state} + + {:background, executed_script, state_updates, + {:background, foreground_ast, _bg_session_state}} -> + persistent_stdout_sink = Sink.collector(state.output_collector) + persistent_stderr_sink = Sink.collector(state.output_collector) + + bg_state = apply_state_updates(state, state_updates) + + bg_state = %{ + bg_state + | stdout_sink: persistent_stdout_sink, + stderr_sink: persistent_stderr_sink + } + + {:reply, _bg_result, new_state} = do_start_background_job(foreground_ast, bg_state) + + transfer_and_cleanup_collector(collector, state.output_collector) + + executed_script_with_collector = %{executed_script | collector: state.output_collector} + + final_state = + new_state + |> append_to_history(executed_script_with_collector) + |> update_exit_status(executed_script_with_collector) + + {:reply, {:ok, executed_script_with_collector}, final_state} + + {:foreground_job, job_number, executed_script, script_updates} -> + handle_foreground_job_with_script( + job_number, + executed_script, + script_updates, + collector, + from, + state + ) + + {:background_job, job_numbers, executed_script, script_updates} -> + handle_background_jobs_with_script( + job_numbers, + executed_script, + script_updates, + collector, + state + ) + + {:wait_for_jobs, job_specs, executed_script, script_updates} -> + transfer_and_cleanup_collector(collector, state.output_collector) + executed_script_with_collector = %{executed_script | collector: state.output_collector} + + handle_wait_for_jobs_with_script( + job_specs, + executed_script_with_collector, + script_updates, + from, + state + ) + + {:signal_jobs, signal, targets, executed_script, script_updates} -> + handle_signal_jobs_with_script( + signal, + targets, + executed_script, + script_updates, + collector, + state + ) + + {:foreground_job, job_number} -> + cleanup_collector(collector) + handle_foreground_job(job_number, from, state) + + {:background_job, job_numbers} -> + cleanup_collector(collector) + handle_background_jobs(job_numbers, state) + + {:wait_for_jobs, job_specs} -> + transfer_and_cleanup_collector(collector, state.output_collector) + handle_wait_for_jobs(job_specs, from, state) + + {:signal_jobs, signal, targets} -> + cleanup_collector(collector) + handle_signal_jobs(signal, targets, state) + + {:ok, result, state_updates} -> + transfer_to_persistent_collector(collector, state.output_collector, result) + + new_state = + state + |> append_to_history(result) + |> apply_state_updates(state_updates) + |> update_exit_status(result) + + cond do + should_errexit?(result, new_state) -> + {:reply, {:exit, result}, new_state} + + should_onecmd_exit?(new_state, state_updates) -> + {:reply, {:exit, result}, new_state} + + true -> + {:reply, {:ok, result}, new_state} + end + + {:ok, result} -> + transfer_to_persistent_collector(collector, state.output_collector, result) + + new_state = + state + |> append_to_history(result) + |> update_exit_status(result) + + cond do + should_errexit?(result, new_state) -> + {:reply, {:exit, result}, new_state} + + should_onecmd_exit?(new_state) -> + {:reply, {:exit, result}, new_state} + + true -> + {:reply, {:ok, result}, new_state} + end + + {:error, result, state_updates} -> + handle_collector_for_result(collector, result) + + new_state = + state + |> append_to_history(result) + |> apply_state_updates(state_updates) + |> update_exit_status(result) + + cond do + should_errexit?(result, new_state) -> + {:reply, {:exit, result}, new_state} + + should_onecmd_exit?(new_state, state_updates) -> + {:reply, {:exit, result}, new_state} + + true -> + {:reply, {:error, result}, new_state} + end + + {:error, result} -> + handle_collector_for_result(collector, result) + + new_state = + state + |> append_to_history(result) + |> update_exit_status(result) + + cond do + should_errexit?(result, new_state) -> + {:reply, {:exit, result}, new_state} + + should_onecmd_exit?(new_state) -> + {:reply, {:exit, result}, new_state} + + true -> + {:reply, {:error, result}, new_state} + end + + {:exit, result, state_updates} -> + handle_collector_for_result(collector, result) + + new_state = + state + |> append_to_history(result) + |> apply_state_updates(state_updates) + |> update_exit_status(result) + + {:reply, {:exit, result}, new_state} + + {:exit, result} -> + handle_collector_for_result(collector, result) + + new_state = + state + |> append_to_history(result) + |> update_exit_status(result) + + {:reply, {:exit, result}, new_state} + + {:exec, result, state_updates} -> + handle_collector_for_result(collector, result) + + new_state = + state + |> append_to_history(result) + |> apply_state_updates(state_updates) + |> update_exit_status(result) + + {:reply, {:exec, result}, new_state} + + {:exec, result} -> + handle_collector_for_result(collector, result) + + new_state = + state + |> append_to_history(result) + |> update_exit_status(result) + + {:reply, {:exec, result}, new_state} + + result -> + cleanup_collector(collector) + {:reply, result, state} + end + end + defp stop_all_coprocs(state) do state.file_descriptors |> Enum.reduce(MapSet.new(), fn @@ -2082,7 +2120,12 @@ defmodule Bash.Session do # Start a background job synchronously and return the OS PID # This is used by Script executor to get $! immediately when & is encountered # Returns {os_pid_string, updated_session_state} or {:error, reason} - defp start_background_job_sync(foreground_ast, current_session_state, original_state) do + defp start_background_job_sync( + foreground_ast, + current_session_state, + original_state, + session_pid \\ self() + ) do {command, args, _command_string} = case foreground_ast do %Compound{} -> @@ -2108,7 +2151,8 @@ defmodule Bash.Session do args, foreground_ast, current_session_state, - original_state + original_state, + session_pid ) end end @@ -2118,7 +2162,8 @@ defmodule Bash.Session do args, _foreground_ast, current_session_state, - original_state + original_state, + session_pid ) do job_number = case Map.get(current_session_state, :next_job_number) do @@ -2134,7 +2179,7 @@ defmodule Bash.Session do job_number: job_number, command: command, args: args, - session_pid: self(), + session_pid: session_pid, working_dir: current_session_state.working_dir, env: Enum.map(current_session_state.variables, fn {k, v} -> diff --git a/test/bash/session_test.exs b/test/bash/session_test.exs index dac40c7..ab6b321 100644 --- a/test/bash/session_test.exs +++ b/test/bash/session_test.exs @@ -596,6 +596,61 @@ defmodule Bash.SessionTest do end end + describe "cancel_execution" do + test "cancels a running execution and returns error with exit code 130", %{session: session} do + run_script(session, "x=42") + + {:ok, ast} = Bash.Parser.parse("while true; do echo x; done") + + task = + Task.async(fn -> + Session.execute(session, ast) + end) + + Process.sleep(50) + assert :ok = Session.cancel_execution(session) + assert {:error, result} = Task.await(task, 5000) + assert result.exit_code == 130 + end + + test "session state is preserved after cancellation", %{session: session} do + run_script(session, "x=42") + + {:ok, ast} = Bash.Parser.parse("while true; do echo x; done") + + task = + Task.async(fn -> + Session.execute(session, ast) + end) + + Process.sleep(50) + Session.cancel_execution(session) + Task.await(task, 5000) + + assert get_var(session, "x") == "42" + end + + test "session is usable after cancellation", %{session: session} do + {:ok, ast} = Bash.Parser.parse("while true; do echo x; done") + + task = + Task.async(fn -> + Session.execute(session, ast) + end) + + Process.sleep(50) + Session.cancel_execution(session) + Task.await(task, 5000) + + result = run_script(session, "echo hello") + assert get_stdout(result) == "hello\n" + end + + test "cancel_execution is a no-op when nothing is running", %{session: session} do + assert :ok = Session.cancel_execution(session) + end + end + defmodule TestAPI do @moduledoc false use Bash.Interop, namespace: "session_test"