diff --git a/README.md b/README.md index c8dc1cb..9fae026 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,66 @@ Bash.stdout(result) #=> "/tmp\n" ``` +### Async Execution & Cancellation + +`Bash.Session.execute_async/3` returns a handle so you can cancel a long-running +script — including running any user-defined `trap` handlers before exit. + +```elixir +{:ok, session} = Bash.Session.new() +{:ok, ast} = Bash.Parser.parse(""" + trap 'echo cleanup' EXIT + while true; do echo working; done + """) + +{:ok, exec_ref} = Bash.Session.execute_async(session, ast) + +# ... later, from anywhere with the session pid or the ref ... +:ok = Bash.Session.signal(exec_ref, :sigint) + +{:error, result} = Bash.Session.await(exec_ref, 5_000) +result.exit_code +#=> 130 + +# `cleanup` was emitted by the EXIT trap before the script exited +Bash.Session.get_output(session) |> elem(0) +#=> "working\n...working\ncleanup\n" +``` + +**Signals:** + + * `:sigint` — cooperative cancel, runs `INT` then `EXIT` traps, exit 130 + * `:sigterm` — cooperative cancel, runs `TERM` then `EXIT` traps, exit 143 + * `:sigkill` — untrappable hard kill, exit 137 (use when traps may hang) + +**Grace period:** Pass `grace: ms` to escalate a cooperative cancel to +`:sigkill` if it doesn't land within the window. Useful when a script may +be stuck in a non-yielding operation. + +```elixir +:ok = Bash.Session.signal(exec_ref, :sigint, grace: 5_000) +# If INT cancel hasn't landed within 5s, sigkill is sent automatically. +``` + +**Queueing:** Calling `execute_async/3` while another execution is in flight +queues the new one. Each chunk gets its own `ExecRef` and runs sequentially with +shared session state (variables, traps, cwd). To cancel a queued execution +before it runs, signal its ref — it's removed from the queue and `await/2` +returns the matching error result. 
+ +```elixir +{:ok, ref1} = Bash.Session.execute_async(session, slow_ast) +{:ok, ref2} = Bash.Session.execute_async(session, follow_up_ast) +{:ok, ref3} = Bash.Session.execute_async(session, never_run_ast) + +:ok = Bash.Session.signal(ref3, :sigint) # cancel before it runs +{:error, %{exit_code: 130}} = Bash.Session.await(ref3) + +# ref1 still running, ref2 still queued — both proceed normally +{:ok, _} = Bash.Session.await(ref1, 30_000) +{:ok, _} = Bash.Session.await(ref2, 30_000) +``` + ### Elixir Interop Define Elixir functions callable from Bash: diff --git a/lib/bash/ast/for_loop.ex b/lib/bash/ast/for_loop.ex index 3a0e738..cc4a872 100644 --- a/lib/bash/ast/for_loop.ex +++ b/lib/bash/ast/for_loop.ex @@ -231,6 +231,7 @@ defmodule Bash.AST.ForLoop do end defp execute_for_loop([item | rest], var_name, body, session_state, env_acc, count, _last_exit) do + Executor.check_cancel(session_state) # Set the loop variable in the session state with accumulated updates new_variables = Map.merge( diff --git a/lib/bash/ast/helpers.ex b/lib/bash/ast/helpers.ex index 6e3dc3c..46f9270 100644 --- a/lib/bash/ast/helpers.ex +++ b/lib/bash/ast/helpers.ex @@ -47,6 +47,8 @@ defmodule Bash.AST.Helpers do options: acc_options } + Executor.check_cancel(stmt_session) + case Executor.execute(stmt, stmt_session, nil) do {:ok, result, updates} -> variables_from_stmt = Map.get(updates, :variables, %{}) diff --git a/lib/bash/ast/while_loop.ex b/lib/bash/ast/while_loop.ex index facc6c7..f517f76 100644 --- a/lib/bash/ast/while_loop.ex +++ b/lib/bash/ast/while_loop.ex @@ -217,6 +217,8 @@ defmodule Bash.AST.WhileLoop do iteration, effective_stdin ) do + Executor.check_cancel(session_state) + # Safety check to prevent infinite loops if iteration >= @max_loop_iterations do # Write error to stderr sink diff --git a/lib/bash/builtin/trap.ex b/lib/bash/builtin/trap.ex index 7ece2fb..14e1cbc 100644 --- a/lib/bash/builtin/trap.ex +++ b/lib/bash/builtin/trap.ex @@ -423,4 +423,29 @@ defmodule 
Bash.Builtin.Trap do def get_return_trap(session_state) do get_trap(session_state, "RETURN") end + + # Run the trap registered for the given signal name (e.g. "INT", "EXIT"). + # No-op if no trap is set, the trap is :ignore, the signal name is invalid, + # or the session is already inside a trap (the :in_trap flag prevents + # recursion). Returns :ok regardless of trap success. + @doc false + @spec run(Bash.Session.t(), String.t()) :: :ok + def run(%{in_trap: true}, _signal_name), do: :ok + + def run(session_state, signal_name) do + session_state + |> get_trap(signal_name) + |> do_run(session_state) + end + + defp do_run(nil, _state), do: :ok + defp do_run(:ignore, _state), do: :ok + + defp do_run(command, state) when is_binary(command) do + with {:ok, ast} <- Bash.Parser.parse(command) do + Bash.AST.Helpers.execute_body(ast.statements, %{state | in_trap: true}, %{}) + end + + :ok + end end diff --git a/lib/bash/executor.ex b/lib/bash/executor.ex index 49b903d..bae77bd 100644 --- a/lib/bash/executor.ex +++ b/lib/bash/executor.ex @@ -84,4 +84,25 @@ defmodule Bash.Executor do def execute(token, _session_state, _stdin, _opts) do {:error, {:invalid_ast, token}} end + + @doc """ + Yield to a pending cooperative cancel. + + Loop iterators call this at the start of each iteration with the current + threaded session state. If the session has signalled this Task with + `{:cancel, signal}` (via `Bash.Session.signal/2` with `:sigint`/`:sigterm`), + this throws `{:cancelled, signal, state}` so the Task closure can run any + matching trap (using the current traps map) and return a cancellation + result. Returns `:ok` when no cancel is pending. 
+ """ + @spec check_cancel(map()) :: :ok | no_return() + def check_cancel(%{in_trap: true}), do: :ok + + def check_cancel(state) do + receive do + {:cancel, signal} -> throw({:cancelled, signal, state}) + after + 0 -> :ok + end + end end diff --git a/lib/bash/script.ex b/lib/bash/script.ex index 10a6da8..5d5b68b 100644 --- a/lib/bash/script.ex +++ b/lib/bash/script.ex @@ -20,11 +20,9 @@ defmodule Bash.Script do """ alias Bash.AST - alias Bash.AST.Helpers alias Bash.AST.Pipeline alias Bash.Builtin.Trap alias Bash.Executor - alias Bash.Parser alias Bash.Variable @type separator :: {:separator, String.t()} @@ -247,34 +245,7 @@ defmodule Bash.Script do defp mark_executed(%{exit_code: _} = stmt, code), do: %{stmt | exit_code: code} defp mark_executed(stmt, _code), do: stmt - # Execute EXIT trap if one is set - # The EXIT trap runs when the shell exits (script finishes) - defp execute_exit_trap(session_state) do - # Skip if already executing a trap (prevent infinite recursion) - if Map.get(session_state, :in_trap, false) do - :ok - else - case Trap.get_exit_trap(session_state) do - nil -> - :ok - - :ignore -> - :ok - - trap_command when is_binary(trap_command) -> - # Parse and execute the trap command - case Parser.parse(trap_command) do - {:ok, ast} -> - # Execute trap with in_trap flag to prevent recursion - trap_session = Map.put(session_state, :in_trap, true) - Helpers.execute_body(ast.statements, trap_session, %{}) - - {:error, _, _, _} -> - :ok - end - end - end - end + defp execute_exit_trap(session_state), do: Trap.run(session_state, "EXIT") # Execute statements sequentially, accumulating results defp execute_statements([], _session_state, executed, last_exit_code, output, updates) do @@ -321,6 +292,8 @@ defmodule Bash.Script do # In noexec mode, read but don't execute - continue with exit code 0 execute_statements(rest, session_state, [stmt | executed], 0, output, updates) else + Executor.check_cancel(updated_session) + case Executor.execute(stmt, 
updated_session, nil) do {:ok, executed_stmt, stmt_updates} -> new_output = output ++ extract_output(executed_stmt) diff --git a/lib/bash/session.ex b/lib/bash/session.ex index 6aba3c5..f027f61 100644 --- a/lib/bash/session.ex +++ b/lib/bash/session.ex @@ -65,6 +65,8 @@ defmodule Bash.Session do alias Bash.Variable alias Bash.AST.Function alias Bash.Execution + alias Bash.Builtin.Trap + alias Bash.Session.ExecRef defstruct [ :id, @@ -132,7 +134,10 @@ defmodule Bash.Session do # Callback for starting background jobs synchronously (used by Script executor) start_background_job_fn: nil, signal_jobs_fn: nil, - pipe_stdin: nil + pipe_stdin: nil, + current_execution: nil, + pending_executions: [], + in_trap: false ] @type t :: %__MODULE__{ @@ -175,9 +180,27 @@ defmodule Bash.Session do completed_jobs: [Job.t()], command_history: [CommandResult.t()], special_vars: %{String.t() => integer() | String.t() | nil}, - positional_params: [[String.t()]] + positional_params: [[String.t()]], + current_execution: running_execution() | nil, + pending_executions: [pending_execution()], + in_trap: boolean() } + @typep running_execution :: %{ + ref: reference(), + caller: pid(), + task_pid: pid(), + task_ref: reference(), + collector: pid() + } + + @typep pending_execution :: %{ + ref: reference(), + caller: pid(), + ast: term(), + opts: keyword() + } + # Client API @doc """ @@ -394,27 +417,203 @@ defmodule Bash.Session do end) """ + @spec execute(pid(), AST.t(), keyword()) :: term() def execute(session, ast, opts \\ []) do - GenServer.call(session, {:execute, ast, opts}, :infinity) + {:ok, exec_ref} = execute_async(session, ast, opts) + await(exec_ref, Keyword.get(opts, :timeout, :infinity)) end @doc """ - Executes a command AST within this session asynchronously. + Executes a command AST asynchronously and returns a handle. + + Returns `{:ok, exec_ref}` immediately. Pass the `exec_ref` to `await/2` + to retrieve the result, or to `signal/2` to interrupt the execution. 
- Returns immediately without waiting for the command to complete. - The result will be stored in the session's command history. + Bash semantics allow only one foreground execution per session at a time. + If another execution is already running, this call queues behind it; the + queued execution starts as soon as the current one finishes (or is + cancelled). State (variables, traps, working directory) flows from one + chunk to the next, so two `execute_async/3` calls behave like appending + AST chunks to a single sequential script — but each chunk has its own + awaitable handle and can be cancelled independently. ## Examples {:ok, session} = Session.new() - :ok = Session.execute_async(session, ast) - # Command executes in background + {:ok, exec_ref} = Session.execute_async(session, ast) + {:ok, result} = Session.await(exec_ref) + + # Cancel an in-flight execution; user-defined `trap` handlers run. + {:ok, exec_ref} = Session.execute_async(session, long_running_ast) + :ok = Session.signal(exec_ref, :sigint) + {:error, %CommandResult{exit_code: 130}} = Session.await(exec_ref) + + # Queue multiple chunks; each runs after the previous one completes. 
+      {:ok, ref1} = Session.execute_async(session, parsed("echo first"))
+      {:ok, ref2} = Session.execute_async(session, parsed("echo second"))
+      {:ok, _} = Session.await(ref1)
+      {:ok, _} = Session.await(ref2)
+
+      # Cancel a queued chunk before it runs:
+      {:ok, ref3} = Session.execute_async(session, parsed("never"))
+      :ok = Session.signal(ref3, :sigint)
+      {:error, %CommandResult{exit_code: 130}} = Session.await(ref3)
+
+  """
+  @spec execute_async(pid(), AST.t(), keyword()) :: {:ok, ExecRef.t()} | {:error, term()}
+  def execute_async(session, ast, opts \\ []) do
+    monitor = Process.monitor(session)
+
+    case GenServer.call(session, {:execute_async, ast, opts, self()}) do
+      {:ok, ref} ->
+        {:ok, %ExecRef{session: session, ref: ref, monitor: monitor}}
+
+      # Queue rejection (or other session-side refusal): drop the monitor so
+      # it doesn't leak, then surface the error. The spec must include this
+      # branch — it is reachable and callers need to match on it.
+      {:error, _} = error ->
+        Process.demonitor(monitor, [:flush])
+        error
+    end
+  end
+
+  @doc """
+  Awaits the result of an execution started with `execute_async/3`.
+
+  Blocks until the execution completes, is cancelled, or `timeout` elapses.
+
+  Returns:
+
+    * `{:ok | :error | :exit | :exec, CommandResult.t()}` — execution finished
+      (or was cancelled — exit code reflects the signal: 130/143/137 with
+      `error: :cancelled`).
+    * `{:error, :timeout}` — `timeout` exceeded; the execution is still in
+      flight or queued. Call again later or `signal/2` to cancel.
+    * `{:error, {:session_down, reason}}` — the session GenServer crashed
+      while this execution was outstanding.
+ + ## Examples + + {:ok, exec_ref} = Session.execute_async(session, ast) + + # Wait indefinitely + {:ok, result} = Session.await(exec_ref) + + # Wait at most 5 seconds; cancel and try again on timeout + case Session.await(exec_ref, 5_000) do + {:error, :timeout} -> + Session.signal(exec_ref, :sigint) + Session.await(exec_ref) + + result -> + result + end """ - def execute_async(session, ast) do - GenServer.cast(session, {:execute_async, ast}) + @spec await(ExecRef.t(), timeout()) :: + {:ok | :error | :exit | :exec, CommandResult.t()} + | {:error, :timeout | {:session_down, term()}} + def await(%ExecRef{ref: ref, monitor: monitor}, timeout \\ :infinity) do + receive do + {^ref, result} -> + Process.demonitor(monitor, [:flush]) + result + + {:DOWN, ^monitor, :process, _, reason} -> + {:error, {:session_down, reason}} + after + timeout -> {:error, :timeout} + end end + @doc """ + Sends a signal to a foreground execution. + + ## Targeting + + * Pass a session `pid()` — signals the currently running foreground + execution, if any. Pending (queued) executions are unaffected. + * Pass an `ExecRef.t()` — signals that specific execution, whether it's + currently running or still queued. + + Returns `:ok` if a signal was delivered, `{:error, :not_found}` if no + matching execution is in flight. + + ## Signals + + * `:sigint` — cooperative cancel. The script's `INT` and `EXIT` traps run + (with output captured in the session's collector); exit code 130. + * `:sigterm` — cooperative cancel. The script's `TERM` and `EXIT` traps + run; exit code 143. + * `:sigkill` — untrappable hard kill. No traps run. Use this when traps + could hang. Exit code 137. + + Cancelling a queued (not-yet-running) execution removes it from the queue + and produces an `{:error, %CommandResult{exit_code: ..., error: :cancelled}}` + via `await/2`. No traps run because the script never executed. + + For signaling background jobs (started via `&` or job control), use + `signal_job/3`. 
+ + ## Options + + * `:grace` — milliseconds to wait for a cooperative `:sigint`/`:sigterm` + to land before escalating to `:sigkill`. Useful when a script may be + stuck in a non-yielding builtin or a misbehaving trap. Ignored for + `:sigkill` and for queued (not-running) executions. + + ## Examples + + # Signal a specific execution by ref + {:ok, exec_ref} = Session.execute_async(session, long_ast) + :ok = Session.signal(exec_ref, :sigint) + {:error, %CommandResult{exit_code: 130}} = Session.await(exec_ref) + + # Signal whatever is running by session pid (no ref needed) + :ok = Session.signal(session, :sigterm) + + # Hard kill — bypasses traps + :ok = Session.signal(exec_ref, :sigkill) + + # Cooperative cancel with automatic escalation to :sigkill after 5s + :ok = Session.signal(exec_ref, :sigint, grace: 5_000) + + # Cancel a queued execution before it runs + {:ok, ref1} = Session.execute_async(session, current_ast) + {:ok, ref2} = Session.execute_async(session, queued_ast) + :ok = Session.signal(ref2, :sigint) + {:error, %CommandResult{exit_code: 130}} = Session.await(ref2) + + """ + @spec signal(pid() | ExecRef.t(), signal(), keyword()) :: :ok | {:error, :not_found} + def signal(session_or_ref, sig, opts \\ []) + + def signal(%ExecRef{session: pid, ref: ref}, sig, opts) do + GenServer.call(pid, {:signal_exec, ref, sig, opts}) + end + + def signal(session, sig, opts) when is_pid(session) do + GenServer.call(session, {:signal_current, sig, opts}) + end + + @type signal :: + :sigint + | :sigkill + | :sigterm + | :sighup + | :sigquit + | :sigusr1 + | :sigusr2 + | :sigcont + | :sigstop + | :int + | :kill + | :term + | :hup + | :quit + | :usr1 + | :usr2 + | :cont + | :stop + | integer() + @doc """ Start a background job and return its job number and OS PID. 
@@ -1132,185 +1331,532 @@ defmodule Bash.Session do {:reply, env_map, state} end - def handle_call({:execute, ast}, from, state) do - handle_call({:execute, ast, []}, from, state) - end + def handle_call({:execute_async, ast, opts, caller}, _from, state) do + ref = make_ref() - def handle_call({:execute, ast, opts}, from, state) do - {:ok, collector} = OutputCollector.start_link() - Process.link(collector) + if state.current_execution do + pending = %{ref: ref, caller: caller, ast: ast, opts: opts} + {:reply, {:ok, ref}, %{state | pending_executions: state.pending_executions ++ [pending]}} + else + {:reply, {:ok, ref}, start_execution(ref, ast, opts, caller, state)} + end + end - # Create sinks from opts or use collector-backed sinks - stdout_sink = - case Keyword.get(opts, :stdout_into) do - nil -> Sink.collector(collector) - callback when is_function(callback, 1) -> callback - collectable -> Sink.stream(collectable, stream_type: :stdout) - end + def handle_call({:signal_current, _sig, _opts}, _from, %{current_execution: nil} = state) do + {:reply, {:error, :not_found}, state} + end - stderr_sink = - case Keyword.get(opts, :stderr_into) do - nil -> Sink.collector(collector) - callback when is_function(callback, 1) -> callback - collectable -> Sink.stream(collectable, stream_type: :stderr) - end + def handle_call({:signal_current, sig, opts}, _from, state) do + {:reply, :ok, do_cancel(state.current_execution, sig, state, opts)} + end - # Also support on_output callback by wrapping both sinks - {stdout_sink, stderr_sink} = - case Keyword.get(opts, :on_output) do - nil -> - {stdout_sink, stderr_sink} - - callback when is_function(callback, 1) -> - # Create sinks that write to both collector and callback - wrapped_stdout = fn chunk -> - stdout_sink.(chunk) - callback.(chunk) - :ok - end + def handle_call( + {:signal_exec, ref, sig, opts}, + _from, + %{current_execution: %{ref: ref} = exec} = state + ) do + {:reply, :ok, do_cancel(exec, sig, state, opts)} + end - 
wrapped_stderr = fn chunk -> - stderr_sink.(chunk) - callback.(chunk) - :ok - end + def handle_call({:signal_exec, ref, sig, _opts}, _from, state) do + case Enum.split_with(state.pending_executions, &(&1.ref == ref)) do + {[pending], remaining} -> + result = %CommandResult{ + command: "cancelled", + exit_code: cancel_exit_code(sig), + error: :cancelled + } - {wrapped_stdout, wrapped_stderr} - end + send(pending.caller, {pending.ref, {:error, result}}) + {:reply, :ok, %{state | pending_executions: remaining}} - # Create a callback function for starting background jobs synchronously - # This allows Scripts to start jobs immediately and get the OS PID for $! - start_bg_job_fn = fn foreground_ast, current_state -> - start_background_job_sync(foreground_ast, current_state, state) + {[], _} -> + {:reply, {:error, :not_found}, state} end + end - # Create a callback function for sending signals to jobs/processes synchronously - # This allows Scripts to send signals immediately (kill builtin) - signal_jobs_fn = fn signal, targets, current_state -> - send_signals_sync(signal, targets, current_state, state) - end + def handle_call({:start_background_job, opts}, _from, state) do + command = Keyword.fetch!(opts, :command) + args = Keyword.get(opts, :args, []) - state_with_sinks = %{ - state - | output_collector: collector, - stdout_sink: stdout_sink, - stderr_sink: stderr_sink, - start_background_job_fn: start_bg_job_fn, - signal_jobs_fn: signal_jobs_fn - } + job_number = state.next_job_number - # No executor_opts needed - sinks are on state now - case execute_command_in_session(ast, state_with_sinks, []) do - {:background, foreground_ast, _session_state} -> - # Command should be run in the background - # Create sinks backed by the session's PERSISTENT output_collector for the job - # The temporary sinks are used for the job notification [1], then transferred - persistent_stdout_sink = Sink.collector(state.output_collector) - persistent_stderr_sink = 
Sink.collector(state.output_collector) + job_opts = [ + job_number: job_number, + command: command, + args: args, + session_pid: self(), + working_dir: state.working_dir, + env: + Map.new(state.variables, fn {k, v} -> + {k, Variable.get(v, nil)} + end) + |> Map.to_list(), + # Pass sinks so job output streams directly to destination + stdout_sink: state.stdout_sink, + stderr_sink: state.stderr_sink, + # Also pass the session's persistent output collector for later retrieval + output_collector: state.output_collector + ] - bg_state = %{ + case DynamicSupervisor.start_child(state.job_supervisor, {JobProcess, job_opts}) do + {:ok, job_pid} -> + new_state = %{ state - | stdout_sink: persistent_stdout_sink, - stderr_sink: persistent_stderr_sink + | jobs: Map.put(state.jobs, job_number, job_pid), + next_job_number: job_number + 1, + previous_job: state.current_job, + current_job: job_number } - # Use temp sinks for job notification output (the [1] message) - state_with_sinks.stdout_sink.({:stdout, ""}) - - {:reply, reply_value, new_state} = do_start_background_job(foreground_ast, bg_state) - # Transfer job notification output to session's persistent collector - transfer_and_cleanup_collector(collector, state.output_collector) - {:reply, reply_value, new_state} + # We don't know the OS PID yet - it will come via job_started message + {:reply, {:ok, job_number, nil}, new_state} - {:background, executed_script, state_updates, - {:background, foreground_ast, _bg_session_state}} -> - # Background command from within a Script - apply state updates and start the job - # Create sinks backed by the session's PERSISTENT output_collector for the job - persistent_stdout_sink = Sink.collector(state.output_collector) - persistent_stderr_sink = Sink.collector(state.output_collector) + {:error, reason} -> + {:reply, {:error, reason}, state} + end + end - bg_state = apply_state_updates(state, state_updates) + def handle_call(:list_jobs, _from, state) do + jobs = + state.jobs + |> 
Enum.map(fn {_job_num, pid} -> + try do + JobProcess.get_job(pid) + catch + :exit, _ -> nil + end + end) + |> Enum.reject(&is_nil/1) + |> Enum.sort_by(& &1.job_number) - bg_state = %{ - bg_state - | stdout_sink: persistent_stdout_sink, - stderr_sink: persistent_stderr_sink - } + {:reply, jobs, state} + end - {:reply, _bg_result, new_state} = do_start_background_job(foreground_ast, bg_state) + def handle_call({:get_job, job_number}, _from, state) do + case Map.get(state.jobs, job_number) do + nil -> + {:reply, {:error, :not_found}, state} - # Transfer output (including job notification [1]) to session's persistent collector - # so it can be read after the command completes - transfer_and_cleanup_collector(collector, state.output_collector) + pid -> + try do + job = JobProcess.get_job(pid) + {:reply, {:ok, job}, state} + catch + :exit, _ -> {:reply, {:error, :not_found}, state} + end + end + end - # Update executed_script to use session's persistent collector since temp was cleaned up - executed_script_with_collector = %{executed_script | collector: state.output_collector} + def handle_call({:foreground_job, job_spec}, _from, state) do + job_number = resolve_job_spec(job_spec, state) - final_state = - new_state - |> append_to_history(executed_script_with_collector) - |> update_exit_status(executed_script_with_collector) + case Map.get(state.jobs, job_number) do + nil -> + {:reply, {:error, :no_such_job}, state} - {:reply, {:ok, executed_script_with_collector}, final_state} + pid -> + # This blocks until the job completes + result = JobProcess.foreground(pid) + {:reply, result, state} + end + end - # Handle special job control builtin return values from Script execution - # These include the executed_script so we can return it with proper collector - {:foreground_job, job_number, executed_script, script_updates} -> - # Keep collector alive for Script result - handle_foreground_job_with_script( - job_number, - executed_script, - script_updates, - collector, - from, - 
state - ) + def handle_call({:background_job, job_spec}, _from, state) do + job_number = resolve_job_spec(job_spec, state) - {:background_job, job_numbers, executed_script, script_updates} -> - # Keep collector alive for Script result - handle_background_jobs_with_script( - job_numbers, - executed_script, - script_updates, - collector, - state - ) + case state.jobs[job_number] do + pid when is_pid(pid) -> + result = JobProcess.background(pid) + {:reply, result, state} - {:wait_for_jobs, job_specs, executed_script, script_updates} -> - # Transfer collected output to persistent collector - transfer_and_cleanup_collector(collector, state.output_collector) - # Update script to use persistent collector - executed_script_with_collector = %{executed_script | collector: state.output_collector} + _ -> + {:reply, {:error, :no_such_job}, state} + end + end - handle_wait_for_jobs_with_script( - job_specs, - executed_script_with_collector, - script_updates, - from, - state - ) + def handle_call({:wait_for_jobs, nil}, _from, state) do + # Wait for all jobs + exit_codes = + Enum.map(state.jobs, fn {_job_num, pid} -> + case JobProcess.wait(pid) do + {:ok, code} -> code + {:error, _} -> 1 + end + end) - {:signal_jobs, signal, targets, executed_script, script_updates} -> - # Keep collector alive for Script result, perform signal operation - handle_signal_jobs_with_script( - signal, - targets, - executed_script, - script_updates, - collector, - state - ) + {:reply, {:ok, exit_codes}, state} + end - # Legacy job control returns (without script) - for non-Script callers - {:foreground_job, job_number} -> - cleanup_collector(collector) - handle_foreground_job(job_number, from, state) + def handle_call({:wait_for_jobs, job_specs}, _from, state) do + exit_codes = + Enum.map(job_specs, fn job_spec -> + job_number = resolve_job_spec(job_spec, state) - {:background_job, job_numbers} -> - cleanup_collector(collector) - handle_background_jobs(job_numbers, state) + case Map.get(state.jobs, 
job_number) do + nil -> + 127 - {:wait_for_jobs, job_specs} -> + pid -> + case JobProcess.wait(pid) do + {:ok, code} -> code + {:error, _} -> 1 + end + end + end) + + {:reply, {:ok, exit_codes}, state} + end + + def handle_call({:signal_job, job_spec, signal}, _from, state) do + job_number = resolve_job_spec(job_spec, state) + + case Map.get(state.jobs, job_number) do + nil -> + {:reply, {:error, :no_such_job}, state} + + pid -> + result = JobProcess.signal(pid, signal) + {:reply, result, state} + end + end + + def handle_call(:pop_completed_jobs, _from, state) do + {:reply, state.completed_jobs, %{state | completed_jobs: []}} + end + + def handle_call(:get_state, _from, state) do + {:reply, state, state} + end + + def handle_call({:load_api, module}, _from, state) do + new_state = do_load_api(state, module) + {:reply, :ok, new_state} + end + + def handle_call(:get_command_history, _from, state) do + {:reply, state.command_history, state} + end + + def handle_call(:get_output, _from, state) do + {stdout, stderr} = OutputCollector.output(state.output_collector) + {:reply, {IO.iodata_to_binary(stdout), IO.iodata_to_binary(stderr)}, state} + end + + def handle_call(:flush_output, _from, state) do + chunks = OutputCollector.flush(state.output_collector) + + {stdout, stderr} = + Enum.reduce(chunks, {[], []}, fn + {:stdout, data}, {out, err} -> {[data | out], err} + {:stderr, data}, {out, err} -> {out, [data | err]} + end) + + {:reply, + {stdout |> Enum.reverse() |> IO.iodata_to_binary(), + stderr |> Enum.reverse() |> IO.iodata_to_binary()}, state} + end + + @impl GenServer + def handle_info({:job_started, %Job{} = job}, state) do + # Update $! 
to the OS PID of the most recent background job + new_state = + if job.os_pid do + %{state | special_vars: Map.put(state.special_vars, "!", to_string(job.os_pid))} + else + state + end + + {:noreply, new_state} + end + + def handle_info({:job_completed, %Job{} = job}, state) do + # Remove from active jobs and add to completed for notification + new_jobs = Map.delete(state.jobs, job.job_number) + new_completed = [job | state.completed_jobs] + + # Update current/previous job references + new_state = + cond do + state.current_job == job.job_number -> + %{state | current_job: state.previous_job, previous_job: nil} + + state.previous_job == job.job_number -> + %{state | previous_job: nil} + + true -> + state + end + + {:noreply, %{new_state | jobs: new_jobs, completed_jobs: new_completed}} + end + + def handle_info({:job_stopped, %Job{} = _job}, state), do: {:noreply, state} + def handle_info({:job_resumed, %Job{} = _job}, state), do: {:noreply, state} + + def handle_info( + {task_ref, execution_result}, + %{current_execution: %{task_ref: task_ref} = exec} = state + ) + when is_reference(task_ref) do + Process.demonitor(task_ref, [:flush]) + synthetic_from = {exec.caller, exec.ref} + + case handle_execution_result(execution_result, exec.collector, synthetic_from, state) do + {:reply, reply, new_state} -> + send(exec.caller, {exec.ref, reply}) + {:noreply, start_next_or_idle(new_state)} + + {:noreply, new_state} -> + # A continuation handler will reply later via the synthetic from. + {:noreply, start_next_or_idle(new_state)} + end + end + + # Grace period elapsed for a cooperative cancel that never landed. + # Escalate to :sigkill if the same execution is still current. + def handle_info( + {:force_kill, ref}, + %{current_execution: %{ref: ref} = exec} = state + ) do + {:noreply, do_cancel(exec, :sigkill, state, [])} + end + + def handle_info({:force_kill, _ref}, state), do: {:noreply, state} + + # Task crashed unexpectedly (not via do_cancel, which demonitors first). 
+ # Surface a crash result to the caller so await/2 doesn't hang. + def handle_info( + {:DOWN, task_ref, :process, _pid, reason}, + %{current_execution: %{task_ref: task_ref} = exec} = state + ) do + cleanup_collector(exec.collector) + + result = %CommandResult{ + command: "crashed", + exit_code: 1, + error: {:task_crashed, reason} + } + + send(exec.caller, {exec.ref, {:error, result}}) + {:noreply, start_next_or_idle(state)} + end + + def handle_info({:EXIT, pid, reason}, state) do + # Check if it's our job_supervisor - if so, we need to stop + if pid == state.job_supervisor do + {:stop, reason, state} + else + # Other linked processes exiting normally (like ports) - ignore + {:noreply, state} + end + end + + def handle_info({:continue_after_wait, executed_script, script_updates, from}, state) do + current_state = apply_state_updates(state, script_updates) + + # Set up sinks and callbacks for continuation output + # start_background_job_fn and signal_jobs_fn must be set so background jobs + # started during continuation are executed immediately (not deferred) + start_bg_job_fn = fn foreground_ast, current_session_state -> + start_background_job_sync(foreground_ast, current_session_state, state) + end + + signal_jobs_fn = fn signal, targets, current_session_state -> + send_signals_sync(signal, targets, current_session_state, state) + end + + continuation_state = %{ + current_state + | stdout_sink: Sink.collector(state.output_collector), + stderr_sink: Sink.collector(state.output_collector), + start_background_job_fn: start_bg_job_fn, + signal_jobs_fn: signal_jobs_fn + } + + case Script.continue_execution(executed_script, continuation_state) do + {:ok, final_script, continuation_updates} -> + new_state = + current_state + |> apply_state_updates(continuation_updates) + |> append_to_history(final_script) + |> update_exit_status(final_script) + + GenServer.reply(from, {:ok, final_script}) + {:noreply, new_state} + + {:wait_for_jobs, job_specs, continued_script, 
continuation_updates} -> + merged_updates = Map.merge(script_updates, continuation_updates) + + handle_wait_for_jobs_with_script( + job_specs, + continued_script, + merged_updates, + from, + current_state + ) + + {terminal, final_script, continuation_updates} + when terminal in [:exit, :error] -> + new_state = + current_state + |> apply_state_updates(continuation_updates) + |> append_to_history(final_script) + |> update_exit_status(final_script) + + GenServer.reply(from, {terminal, final_script}) + {:noreply, new_state} + + _ -> + # Fallback: no remaining statements, reply with what we have + new_state = + current_state + |> append_to_history(executed_script) + |> update_exit_status(executed_script) + + GenServer.reply(from, {:ok, executed_script}) + {:noreply, new_state} + end + end + + def handle_info(_msg, state), do: {:noreply, state} + + @impl GenServer + def terminate(_reason, state) do + stop_all_coprocs(state) + close_all_file_descriptors(state) + + if state.job_supervisor && Process.alive?(state.job_supervisor) do + DynamicSupervisor.stop(state.job_supervisor, :shutdown) + end + + :ok + end + + # Process the result of an execution Task and return either an immediate + # reply tuple (delivered via send to the caller) or :noreply when a + # continuation handler (e.g. fg/wait) will reply later via the synthetic + # `from`. 
+ defp handle_execution_result(execution_result, collector, from, state) do + case execution_result do + {:cancelled, sig} -> + transfer_and_cleanup_collector(collector, state.output_collector) + + result = %CommandResult{ + command: "cancelled", + exit_code: cancel_exit_code(sig), + error: :cancelled + } + + {:reply, {:error, result}, state} + + {:background, foreground_ast, _session_state} -> + # Command should be run in the background + persistent_stdout_sink = Sink.collector(state.output_collector) + persistent_stderr_sink = Sink.collector(state.output_collector) + + bg_state = %{ + state + | stdout_sink: persistent_stdout_sink, + stderr_sink: persistent_stderr_sink + } + + # Use temp collector sink for job notification output (the [1] message) + Sink.collector(collector).({:stdout, ""}) + + {:reply, reply_value, new_state} = do_start_background_job(foreground_ast, bg_state) + transfer_and_cleanup_collector(collector, state.output_collector) + {:reply, reply_value, new_state} + + {:background, executed_script, state_updates, + {:background, foreground_ast, _bg_session_state}} -> + # Background command from within a Script - apply state updates and start the job + # Create sinks backed by the session's PERSISTENT output_collector for the job + persistent_stdout_sink = Sink.collector(state.output_collector) + persistent_stderr_sink = Sink.collector(state.output_collector) + + bg_state = apply_state_updates(state, state_updates) + + bg_state = %{ + bg_state + | stdout_sink: persistent_stdout_sink, + stderr_sink: persistent_stderr_sink + } + + {:reply, _bg_result, new_state} = do_start_background_job(foreground_ast, bg_state) + + # Transfer output (including job notification [1]) to session's persistent collector + # so it can be read after the command completes + transfer_and_cleanup_collector(collector, state.output_collector) + + # Update executed_script to use session's persistent collector since temp was cleaned up + executed_script_with_collector = 
%{executed_script | collector: state.output_collector} + + final_state = + new_state + |> append_to_history(executed_script_with_collector) + |> update_exit_status(executed_script_with_collector) + + {:reply, {:ok, executed_script_with_collector}, final_state} + + # Handle special job control builtin return values from Script execution + # These include the executed_script so we can return it with proper collector + {:foreground_job, job_number, executed_script, script_updates} -> + # Keep collector alive for Script result + handle_foreground_job_with_script( + job_number, + executed_script, + script_updates, + collector, + from, + state + ) + + {:background_job, job_numbers, executed_script, script_updates} -> + # Keep collector alive for Script result + handle_background_jobs_with_script( + job_numbers, + executed_script, + script_updates, + collector, + state + ) + + {:wait_for_jobs, job_specs, executed_script, script_updates} -> + # Transfer collected output to persistent collector + transfer_and_cleanup_collector(collector, state.output_collector) + # Update script to use persistent collector + executed_script_with_collector = %{executed_script | collector: state.output_collector} + + handle_wait_for_jobs_with_script( + job_specs, + executed_script_with_collector, + script_updates, + from, + state + ) + + {:signal_jobs, signal, targets, executed_script, script_updates} -> + # Keep collector alive for Script result, perform signal operation + handle_signal_jobs_with_script( + signal, + targets, + executed_script, + script_updates, + collector, + state + ) + + # Legacy job control returns (without script) - for non-Script callers + {:foreground_job, job_number} -> + cleanup_collector(collector) + handle_foreground_job(job_number, from, state) + + {:background_job, job_numbers} -> + cleanup_collector(collector) + handle_background_jobs(job_numbers, state) + + {:wait_for_jobs, job_specs} -> # Transfer collected output to persistent collector before cleaning up 
transfer_and_cleanup_collector(collector, state.output_collector) handle_wait_for_jobs(job_specs, from, state) @@ -1387,428 +1933,227 @@ defmodule Bash.Session do should_errexit?(result, new_state) -> {:reply, {:exit, result}, new_state} - should_onecmd_exit?(new_state, state_updates) -> - {:reply, {:exit, result}, new_state} - - true -> - {:reply, {:error, result}, new_state} - end - - {:error, result} -> - # Error result without state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = - state - |> append_to_history(result) - |> update_exit_status(result) - - # Check errexit - # Check onecmd (-t) - exit after reading and executing one command - cond do - should_errexit?(result, new_state) -> - {:reply, {:exit, result}, new_state} - - should_onecmd_exit?(new_state) -> - {:reply, {:exit, result}, new_state} - - true -> - {:reply, {:error, result}, new_state} - end - - {:exit, result, state_updates} -> - # Script exit with state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = - state - |> append_to_history(result) - |> apply_state_updates(state_updates) - |> update_exit_status(result) - - {:reply, {:exit, result}, new_state} - - {:exit, result} -> - # Script exit without state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = - state - |> append_to_history(result) - |> update_exit_status(result) - - {:reply, {:exit, result}, new_state} - - {:exec, result, state_updates} -> - # Exec replaces shell with command - with state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = - state - |> append_to_history(result) - |> apply_state_updates(state_updates) - |> update_exit_status(result) - - {:reply, {:exec, result}, new_state} - - 
{:exec, result} -> - # Exec replaces shell with command - without state updates - # For Script results, keep collector alive for output reading - handle_collector_for_result(collector, result) - - new_state = - state - |> append_to_history(result) - |> update_exit_status(result) - - {:reply, {:exec, result}, new_state} - - result -> - # Other result patterns - cleanup collector - cleanup_collector(collector) - {:reply, result, state} - end - end - - def handle_call({:start_background_job, opts}, _from, state) do - command = Keyword.fetch!(opts, :command) - args = Keyword.get(opts, :args, []) - - job_number = state.next_job_number - - job_opts = [ - job_number: job_number, - command: command, - args: args, - session_pid: self(), - working_dir: state.working_dir, - env: - Map.new(state.variables, fn {k, v} -> - {k, Variable.get(v, nil)} - end) - |> Map.to_list(), - # Pass sinks so job output streams directly to destination - stdout_sink: state.stdout_sink, - stderr_sink: state.stderr_sink, - # Also pass the session's persistent output collector for later retrieval - output_collector: state.output_collector - ] - - case DynamicSupervisor.start_child(state.job_supervisor, {JobProcess, job_opts}) do - {:ok, job_pid} -> - new_state = %{ - state - | jobs: Map.put(state.jobs, job_number, job_pid), - next_job_number: job_number + 1, - previous_job: state.current_job, - current_job: job_number - } - - # We don't know the OS PID yet - it will come via job_started message - {:reply, {:ok, job_number, nil}, new_state} - - {:error, reason} -> - {:reply, {:error, reason}, state} - end - end - - def handle_call(:list_jobs, _from, state) do - jobs = - state.jobs - |> Enum.map(fn {_job_num, pid} -> - try do - JobProcess.get_job(pid) - catch - :exit, _ -> nil - end - end) - |> Enum.reject(&is_nil/1) - |> Enum.sort_by(& &1.job_number) - - {:reply, jobs, state} - end - - def handle_call({:get_job, job_number}, _from, state) do - case Map.get(state.jobs, job_number) do - nil -> - 
{:reply, {:error, :not_found}, state} - - pid -> - try do - job = JobProcess.get_job(pid) - {:reply, {:ok, job}, state} - catch - :exit, _ -> {:reply, {:error, :not_found}, state} - end - end - end - - def handle_call({:foreground_job, job_spec}, _from, state) do - job_number = resolve_job_spec(job_spec, state) - - case Map.get(state.jobs, job_number) do - nil -> - {:reply, {:error, :no_such_job}, state} - - pid -> - # This blocks until the job completes - result = JobProcess.foreground(pid) - {:reply, result, state} - end - end - - def handle_call({:background_job, job_spec}, _from, state) do - job_number = resolve_job_spec(job_spec, state) - - case state.jobs[job_number] do - pid when is_pid(pid) -> - result = JobProcess.background(pid) - {:reply, result, state} - - _ -> - {:reply, {:error, :no_such_job}, state} - end - end - - def handle_call({:wait_for_jobs, nil}, _from, state) do - # Wait for all jobs - exit_codes = - Enum.map(state.jobs, fn {_job_num, pid} -> - case JobProcess.wait(pid) do - {:ok, code} -> code - {:error, _} -> 1 - end - end) - - {:reply, {:ok, exit_codes}, state} - end - - def handle_call({:wait_for_jobs, job_specs}, _from, state) do - exit_codes = - Enum.map(job_specs, fn job_spec -> - job_number = resolve_job_spec(job_spec, state) - - case Map.get(state.jobs, job_number) do - nil -> - 127 + should_onecmd_exit?(new_state, state_updates) -> + {:reply, {:exit, result}, new_state} - pid -> - case JobProcess.wait(pid) do - {:ok, code} -> code - {:error, _} -> 1 - end + true -> + {:reply, {:error, result}, new_state} end - end) - {:reply, {:ok, exit_codes}, state} - end + {:error, result} -> + # Error result without state updates + # For Script results, keep collector alive for output reading + handle_collector_for_result(collector, result) - def handle_call({:signal_job, job_spec, signal}, _from, state) do - job_number = resolve_job_spec(job_spec, state) + new_state = + state + |> append_to_history(result) + |> update_exit_status(result) - case 
Map.get(state.jobs, job_number) do - nil -> - {:reply, {:error, :no_such_job}, state} + # Check errexit + # Check onecmd (-t) - exit after reading and executing one command + cond do + should_errexit?(result, new_state) -> + {:reply, {:exit, result}, new_state} - pid -> - result = JobProcess.signal(pid, signal) - {:reply, result, state} - end - end + should_onecmd_exit?(new_state) -> + {:reply, {:exit, result}, new_state} - def handle_call(:pop_completed_jobs, _from, state) do - {:reply, state.completed_jobs, %{state | completed_jobs: []}} - end + true -> + {:reply, {:error, result}, new_state} + end - def handle_call(:get_state, _from, state) do - {:reply, state, state} - end + {:exit, result, state_updates} -> + # Script exit with state updates + # For Script results, keep collector alive for output reading + handle_collector_for_result(collector, result) - def handle_call({:load_api, module}, _from, state) do - new_state = do_load_api(state, module) - {:reply, :ok, new_state} - end + new_state = + state + |> append_to_history(result) + |> apply_state_updates(state_updates) + |> update_exit_status(result) - def handle_call(:get_command_history, _from, state) do - {:reply, state.command_history, state} - end + {:reply, {:exit, result}, new_state} - def handle_call(:get_output, _from, state) do - {stdout, stderr} = OutputCollector.output(state.output_collector) - {:reply, {IO.iodata_to_binary(stdout), IO.iodata_to_binary(stderr)}, state} - end + {:exit, result} -> + # Script exit without state updates + # For Script results, keep collector alive for output reading + handle_collector_for_result(collector, result) - def handle_call(:flush_output, _from, state) do - chunks = OutputCollector.flush(state.output_collector) + new_state = + state + |> append_to_history(result) + |> update_exit_status(result) - {stdout, stderr} = - Enum.reduce(chunks, {[], []}, fn - {:stdout, data}, {out, err} -> {[data | out], err} - {:stderr, data}, {out, err} -> {out, [data | err]} - 
end) + {:reply, {:exit, result}, new_state} - {:reply, - {stdout |> Enum.reverse() |> IO.iodata_to_binary(), - stderr |> Enum.reverse() |> IO.iodata_to_binary()}, state} - end + {:exec, result, state_updates} -> + # Exec replaces shell with command - with state updates + # For Script results, keep collector alive for output reading + handle_collector_for_result(collector, result) - @impl GenServer - def handle_cast({:execute_async, ast}, state) do - # Execute command asynchronously - don't reply to caller - case execute_command_in_session(ast, state) do - {:ok, result, state_updates} -> - # Note: append to history first, then apply updates - # This allows history -c to clear including itself new_state = state |> append_to_history(result) |> apply_state_updates(state_updates) + |> update_exit_status(result) - {:noreply, new_state} + {:reply, {:exec, result}, new_state} - {:ok, result} -> - new_state = append_to_history(state, result) - {:noreply, new_state} + {:exec, result} -> + # Exec replaces shell with command - without state updates + # For Script results, keep collector alive for output reading + handle_collector_for_result(collector, result) - {:error, result} -> - new_state = append_to_history(state, result) - {:noreply, new_state} + new_state = + state + |> append_to_history(result) + |> update_exit_status(result) - _other -> - # For background jobs and other special cases, just continue - {:noreply, state} + {:reply, {:exec, result}, new_state} + + result -> + # Other result patterns - cleanup collector + cleanup_collector(collector) + {:reply, result, state} end end - @impl GenServer - def handle_info({:job_started, %Job{} = job}, state) do - # Update $! to the OS PID of the most recent background job - new_state = - if job.os_pid do - %{state | special_vars: Map.put(state.special_vars, "!", to_string(job.os_pid))} - else - state - end + # Spawn a Task running the AST and stash its bookkeeping in current_execution. 
+ defp start_execution(ref, ast, opts, caller, state) do + {:ok, collector} = OutputCollector.start_link() + Process.link(collector) - {:noreply, new_state} - end + session_pid = self() + original_state = state - def handle_info({:job_completed, %Job{} = job}, state) do - # Remove from active jobs and add to completed for notification - new_jobs = Map.delete(state.jobs, job.job_number) - new_completed = [job | state.completed_jobs] + task = + Task.async(fn -> + sinks = build_sinks(opts, collector) - # Update current/previous job references - new_state = - cond do - state.current_job == job.job_number -> - %{state | current_job: state.previous_job, previous_job: nil} + state_with_sinks = %{ + original_state + | output_collector: collector, + stdout_sink: sinks.stdout, + stderr_sink: sinks.stderr, + start_background_job_fn: fn fg, current -> + start_background_job_sync(fg, current, original_state, session_pid) + end, + signal_jobs_fn: fn s, t, current -> + send_signals_sync(s, t, current, original_state) + end + } - state.previous_job == job.job_number -> - %{state | previous_job: nil} + try do + execute_command_in_session(ast, state_with_sinks, []) + catch + :throw, {:cancelled, sig, runtime_state} -> + run_cancel_traps(sig, runtime_state) + {:cancelled, sig} + end + end) - true -> - state - end + exec = %{ + ref: ref, + caller: caller, + task_pid: task.pid, + task_ref: task.ref, + collector: collector + } - {:noreply, %{new_state | jobs: new_jobs, completed_jobs: new_completed}} + %{state | current_execution: exec} end - def handle_info({:job_stopped, %Job{} = _job}, state), do: {:noreply, state} - def handle_info({:job_resumed, %Job{} = _job}, state), do: {:noreply, state} + # Drain the head of the pending queue and start it; clears current_execution + # if the queue is empty. 
+ defp start_next_or_idle(%{pending_executions: []} = state) do + %{state | current_execution: nil} + end - def handle_info({:EXIT, pid, reason}, state) do - # Check if it's our job_supervisor - if so, we need to stop - if pid == state.job_supervisor do - {:stop, reason, state} - else - # Other linked processes exiting normally (like ports) - ignore - {:noreply, state} - end + defp start_next_or_idle(%{pending_executions: [next | rest]} = state) do + start_execution( + next.ref, + next.ast, + next.opts, + next.caller, + %{state | pending_executions: rest, current_execution: nil} + ) end - def handle_info({:continue_after_wait, executed_script, script_updates, from}, state) do - current_state = apply_state_updates(state, script_updates) + defp build_sinks(opts, collector) do + stdout = sink_from_opt(Keyword.get(opts, :stdout_into), collector, :stdout) + stderr = sink_from_opt(Keyword.get(opts, :stderr_into), collector, :stderr) - # Set up sinks and callbacks for continuation output - # start_background_job_fn and signal_jobs_fn must be set so background jobs - # started during continuation are executed immediately (not deferred) - start_bg_job_fn = fn foreground_ast, current_session_state -> - start_background_job_sync(foreground_ast, current_session_state, state) - end + case Keyword.get(opts, :on_output) do + nil -> + %{stdout: stdout, stderr: stderr} - signal_jobs_fn = fn signal, targets, current_session_state -> - send_signals_sync(signal, targets, current_session_state, state) + callback when is_function(callback, 1) -> + %{ + stdout: wrap_with_callback(stdout, callback), + stderr: wrap_with_callback(stderr, callback) + } end + end - continuation_state = %{ - current_state - | stdout_sink: Sink.collector(state.output_collector), - stderr_sink: Sink.collector(state.output_collector), - start_background_job_fn: start_bg_job_fn, - signal_jobs_fn: signal_jobs_fn - } + defp sink_from_opt(nil, collector, _type), do: Sink.collector(collector) + defp 
sink_from_opt(callback, _collector, _type) when is_function(callback, 1), do: callback - case Script.continue_execution(executed_script, continuation_state) do - {:ok, final_script, continuation_updates} -> - new_state = - current_state - |> apply_state_updates(continuation_updates) - |> append_to_history(final_script) - |> update_exit_status(final_script) + defp sink_from_opt(collectable, _collector, type), + do: Sink.stream(collectable, stream_type: type) - GenServer.reply(from, {:ok, final_script}) - {:noreply, new_state} + defp wrap_with_callback(sink, callback) do + fn chunk -> + sink.(chunk) + callback.(chunk) + :ok + end + end - {:wait_for_jobs, job_specs, continued_script, continuation_updates} -> - merged_updates = Map.merge(script_updates, continuation_updates) + # Cooperative cancel: signal the Task. It yields at the next loop iteration, + # runs any matching trap and the EXIT trap, then returns {:cancelled, sig} + # which flows through handle_execution_result like any other completion. + # If opts has :grace, schedules a force-kill if the cooperative cancel + # hasn't landed within that many milliseconds. + defp do_cancel(exec, sig, state, opts) when sig in [:int, :sigint, :term, :sigterm, 2, 15] do + send(exec.task_pid, {:cancel, sig}) - handle_wait_for_jobs_with_script( - job_specs, - continued_script, - merged_updates, - from, - current_state - ) + case Keyword.get(opts, :grace) do + nil -> :ok + grace when is_integer(grace) -> Process.send_after(self(), {:force_kill, exec.ref}, grace) + end - {terminal, final_script, continuation_updates} - when terminal in [:exit, :error] -> - new_state = - current_state - |> apply_state_updates(continuation_updates) - |> append_to_history(final_script) - |> update_exit_status(final_script) + state + end - GenServer.reply(from, {terminal, final_script}) - {:noreply, new_state} + # Hard cancel: untrappable kill. No traps run; partial output is preserved. 
+ defp do_cancel(exec, sig, state, _opts) when sig in [:kill, :sigkill, 9] do + Process.demonitor(exec.task_ref, [:flush]) + Process.exit(exec.task_pid, :kill) + transfer_and_cleanup_collector(exec.collector, state.output_collector) - _ -> - # Fallback: no remaining statements, reply with what we have - new_state = - current_state - |> append_to_history(executed_script) - |> update_exit_status(executed_script) + result = %CommandResult{ + command: "cancelled", + exit_code: 137, + error: :cancelled + } - GenServer.reply(from, {:ok, executed_script}) - {:noreply, new_state} - end + send(exec.caller, {exec.ref, {:error, result}}) + start_next_or_idle(state) end - def handle_info(_msg, state), do: {:noreply, state} - - @impl GenServer - def terminate(_reason, state) do - stop_all_coprocs(state) - close_all_file_descriptors(state) + defp cancel_exit_code(sig) when sig in [:int, :sigint, 2], do: 130 + defp cancel_exit_code(sig) when sig in [:term, :sigterm, 15], do: 143 + defp cancel_exit_code(_), do: 130 - if state.job_supervisor && Process.alive?(state.job_supervisor) do - DynamicSupervisor.stop(state.job_supervisor, :shutdown) - end + defp signal_name(sig) when sig in [:int, :sigint, 2], do: "INT" + defp signal_name(sig) when sig in [:term, :sigterm, 15], do: "TERM" + defp signal_name(_), do: "INT" + defp run_cancel_traps(sig, state) do + Trap.run(state, signal_name(sig)) + Trap.run(state, "EXIT") :ok end @@ -1877,7 +2222,7 @@ defmodule Bash.Session do defp resolve_job_spec(_, state), do: state.current_job - defp execute_command_in_session(ast, state, opts \\ []), + defp execute_command_in_session(ast, state, opts), do: Executor.execute(ast, state, nil, opts) # Check if errexit should cause shell to exit @@ -2082,7 +2427,12 @@ defmodule Bash.Session do # Start a background job synchronously and return the OS PID # This is used by Script executor to get $! 
immediately when & is encountered # Returns {os_pid_string, updated_session_state} or {:error, reason} - defp start_background_job_sync(foreground_ast, current_session_state, original_state) do + defp start_background_job_sync( + foreground_ast, + current_session_state, + original_state, + session_pid \\ self() + ) do {command, args, _command_string} = case foreground_ast do %Compound{} -> @@ -2108,7 +2458,8 @@ defmodule Bash.Session do args, foreground_ast, current_session_state, - original_state + original_state, + session_pid ) end end @@ -2118,7 +2469,8 @@ defmodule Bash.Session do args, _foreground_ast, current_session_state, - original_state + original_state, + session_pid ) do job_number = case Map.get(current_session_state, :next_job_number) do @@ -2134,7 +2486,7 @@ defmodule Bash.Session do job_number: job_number, command: command, args: args, - session_pid: self(), + session_pid: session_pid, working_dir: current_session_state.working_dir, env: Enum.map(current_session_state.variables, fn {k, v} -> @@ -2762,7 +3114,6 @@ defmodule Bash.Session do end defp extract_command_info(%Compound{statements: statements}, state) do - # Find the first actual command (skip operators) first_cmd = Enum.find(statements, fn {:operator, _} -> false @@ -2779,9 +3130,7 @@ defmodule Bash.Session do defp extract_command_info(_ast, _state), do: {"", []} # Build a command string from AST for display - defp build_command_string(ast) do - to_string(ast) - end + defp build_command_string(ast), do: to_string(ast) # Convert a Word to string, expanding variables defp word_to_string(%Bash.AST.Word{parts: parts}, state) do @@ -3031,19 +3380,9 @@ defmodule Bash.Session do # Transfer output from temporary collector to session's persistent collector before cleanup defp transfer_and_cleanup_collector(temp_collector, session_collector) do - # Get output from temporary collector {stdout_iodata, stderr_iodata} = OutputCollector.output(temp_collector) - - # Write to session's persistent collector 
(convert iodata to binary) - if IO.iodata_length(stdout_iodata) > 0 do - OutputCollector.write(session_collector, :stdout, IO.iodata_to_binary(stdout_iodata)) - end - - if IO.iodata_length(stderr_iodata) > 0 do - OutputCollector.write(session_collector, :stderr, IO.iodata_to_binary(stderr_iodata)) - end - - # Now cleanup the temporary collector + write_if_present(session_collector, :stdout, stdout_iodata) + write_if_present(session_collector, :stderr, stderr_iodata) cleanup_collector(temp_collector) end @@ -3056,49 +3395,41 @@ defmodule Bash.Session do # Transfer output from temp collector to session's persistent collector # For Scripts, also keep the temp collector alive for result.collector access defp transfer_to_persistent_collector(temp_collector, session_collector, %Bash.Script{}) do - # Copy output to session's persistent collector (for session_stdout access) {stdout_iodata, stderr_iodata} = OutputCollector.output(temp_collector) - - if IO.iodata_length(stdout_iodata) > 0 do - OutputCollector.write(session_collector, :stdout, IO.iodata_to_binary(stdout_iodata)) - end - - if IO.iodata_length(stderr_iodata) > 0 do - OutputCollector.write(session_collector, :stderr, IO.iodata_to_binary(stderr_iodata)) - end - - # Keep temp collector alive for result.collector access (get_stdout/get_stderr) + write_if_present(session_collector, :stdout, stdout_iodata) + write_if_present(session_collector, :stderr, stderr_iodata) :ok end defp transfer_to_persistent_collector(temp_collector, session_collector, _result) do - # Non-Script results: transfer and cleanup transfer_and_cleanup_collector(temp_collector, session_collector) end - # Helper to append command result to history - # Accept both CommandResult (legacy) and AST nodes with execution results + defp write_if_present(_collector, _stream, []), do: :ok + defp write_if_present(_collector, _stream, ""), do: :ok + + defp write_if_present(collector, stream, iodata), + do: OutputCollector.write(collector, stream, 
IO.iodata_to_binary(iodata)) + defp append_to_history(state, %CommandResult{} = result) do %{state | command_history: state.command_history ++ [result]} end defp append_to_history(state, %{exit_code: exit_code} = result) when not is_nil(exit_code) do - # AST node with execution results %{state | command_history: state.command_history ++ [result]} end defp append_to_history(state, _result), do: state - # Convert signal name to number defp signal_to_number(sig) when is_integer(sig), do: sig - defp signal_to_number(:sigterm), do: 15 - defp signal_to_number(:sigkill), do: 9 - defp signal_to_number(:sigstop), do: 19 - defp signal_to_number(:sigcont), do: 18 - defp signal_to_number(:sighup), do: 1 - defp signal_to_number(:sigint), do: 2 - defp signal_to_number(:sigquit), do: 3 - defp signal_to_number(:sigusr1), do: 10 - defp signal_to_number(:sigusr2), do: 12 + defp signal_to_number(sig) when sig in ~w[sigterm term]a, do: 15 + defp signal_to_number(sig) when sig in ~w[sigkill kill]a, do: 9 + defp signal_to_number(sig) when sig in ~w[sigstop stop]a, do: 19 + defp signal_to_number(sig) when sig in ~w[sigcont cont]a, do: 18 + defp signal_to_number(sig) when sig in ~w[sighup hup]a, do: 1 + defp signal_to_number(sig) when sig in ~w[sigint int]a, do: 2 + defp signal_to_number(sig) when sig in ~w[sigquit quit]a, do: 3 + defp signal_to_number(sig) when sig in ~w[sigusr1 usr1]a, do: 10 + defp signal_to_number(sig) when sig in ~w[sigusr2 usr2]a, do: 12 defp signal_to_number(other), do: other end diff --git a/lib/bash/session/exec_ref.ex b/lib/bash/session/exec_ref.ex new file mode 100644 index 0000000..b2c4f18 --- /dev/null +++ b/lib/bash/session/exec_ref.ex @@ -0,0 +1,20 @@ +defmodule Bash.Session.ExecRef do + @moduledoc """ + Opaque handle to an in-flight execution. + + Returned by `Bash.Session.execute_async/3` and accepted by + `Bash.Session.await/2` and `Bash.Session.signal/2`. 
+ + The caller holds `:monitor` so `await/2` can detect a session crash + without a round-trip through the GenServer. + """ + + @enforce_keys [:session, :ref, :monitor] + defstruct [:session, :ref, :monitor] + + @type t :: %__MODULE__{ + session: pid(), + ref: reference(), + monitor: reference() + } +end diff --git a/mix.exs b/mix.exs index 1dd06c0..c169692 100644 --- a/mix.exs +++ b/mix.exs @@ -138,7 +138,8 @@ defmodule Bash.MixProject do ], "Job Control": [ Bash.Job, - Bash.JobProcess + Bash.JobProcess, + Bash.Session.ExecRef ], Builtins: [ Bash.Builtin, diff --git a/test/bash/session_test.exs b/test/bash/session_test.exs index 6b23a34..48fe828 100644 --- a/test/bash/session_test.exs +++ b/test/bash/session_test.exs @@ -599,6 +599,540 @@ defmodule Bash.SessionTest do end end + describe "execute_async/3 and await/2" do + test "returns ExecRef and resolves via await", %{session: session} do + {:ok, ast} = Bash.Parser.parse("echo hello") + assert {:ok, %Session.ExecRef{} = ref} = Session.execute_async(session, ast) + assert {:ok, _result} = Session.await(ref) + end + + test "await with timeout returns {:error, :timeout} for long script", %{session: session} do + {:ok, ast} = Bash.Parser.parse("while true; do :; done") + {:ok, ref} = Session.execute_async(session, ast) + assert {:error, :timeout} = Session.await(ref, 50) + Session.signal(ref, :sigint) + end + + test "await reports session_down if session crashes mid-execution", %{session: session} do + {:ok, ast} = Bash.Parser.parse("while true; do :; done") + {:ok, ref} = Session.execute_async(session, ast) + + capture_log(fn -> + Process.exit(session, :kill) + assert {:error, {:session_down, :killed}} = Session.await(ref, 1000) + end) + end + + test "concurrent execute_async calls queue and run sequentially", %{session: session} do + Session.load_api(session, Bash.SessionTest.TestAPI) + pid = pid_arg(self()) + + # Each chunk reports when it starts. 
# NOTE(review): this chunk begins mid-test — the opening `describe`/`test` lines
# (and the start of this comment) are outside the visible region; the unmatched
# trailing `end`s below close scopes opened there.
      # … Sequential execution implies
      # ast1 reports start, finishes, ast2 reports start. Any parallel
      # execution would interleave or deliver ast2's start before ast1's
      # body completes.
      {:ok, ast1} =
        Bash.Parser.parse("session_test.notify '#{pid}'; echo a; echo b")

      {:ok, ast2} =
        Bash.Parser.parse("session_test.notify '#{pid}'; echo c; echo d")

      {:ok, ref1} = Session.execute_async(session, ast1)
      {:ok, ref2} = Session.execute_async(session, ast2)
      assert ref1 != ref2

      # ast1 begins immediately; ast2 must NOT have started yet
      assert_receive :script_running, 1000
      refute_received :script_running

      # await ast1; only after that does ast2 start
      assert {:ok, _} = Session.await(ref1, 1000)
      assert_receive :script_running, 1000

      assert {:ok, _} = Session.await(ref2, 1000)

      # Output appears in strictly sequential order.
      assert session_stdout(session) == "a\nb\nc\nd\n"
    end
  end

  describe "signal/2" do
    setup %{session: session} do
      Session.load_api(session, Bash.SessionTest.TestAPI)
      :ok
    end

    # Bash script that announces itself to `test_pid` and then spins forever.
    defp running_loop_script(test_pid) do
      """
      session_test.notify '#{pid_arg(test_pid)}'
      while true; do :; done
      """
    end

    # Starts the infinite-loop script asynchronously and blocks until the
    # script has actually begun executing (the notify has arrived).
    defp start_loop(session) do
      {:ok, ast} = Bash.Parser.parse(running_loop_script(self()))
      {:ok, ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000
      ref
    end

    test "signal(session, :sigint) cancels foreground with exit 130", %{session: session} do
      ref = start_loop(session)
      assert :ok = Session.signal(session, :sigint)
      assert {:error, %{exit_code: 130, error: :cancelled}} = Session.await(ref, 1000)
    end

    test "signal(ref, :sigint) cancels that specific execution", %{session: session} do
      ref = start_loop(session)
      assert :ok = Session.signal(ref, :sigint)
      assert {:error, %{exit_code: 130}} = Session.await(ref, 1000)
    end

    test "signal(ref, :sigkill) returns exit code 137", %{session: session} do
      ref = start_loop(session)
      assert :ok = Session.signal(ref, :sigkill)
      assert {:error, %{exit_code: 137}} = Session.await(ref, 1000)
    end

    test "signal(session, :sigint) is {:error, :not_found} when idle", %{session: session} do
      assert {:error, :not_found} = Session.signal(session, :sigint)
    end

    test "signal(stale_ref, :sigint) returns :not_found after that execution finishes",
         %{session: session} do
      {:ok, ast} = Bash.Parser.parse("echo done")
      {:ok, ref} = Session.execute_async(session, ast)
      {:ok, _} = Session.await(ref)
      assert {:error, :not_found} = Session.signal(ref, :sigint)
    end

    test "session state is preserved after cancellation", %{session: session} do
      run_script(session, "x=42")
      ref = start_loop(session)
      Session.signal(ref, :sigint)
      Session.await(ref, 1000)

      assert get_var(session, "x") == "42"
    end

    test "session is usable after cancellation", %{session: session} do
      ref = start_loop(session)
      Session.signal(ref, :sigint)
      Session.await(ref, 1000)

      result = run_script(session, "echo hello")
      assert get_stdout(result) == "hello\n"
    end

    test "stdout written before cancel is preserved on the session", %{session: session} do
      pid = pid_arg(self())

      {:ok, ast} =
        Bash.Parser.parse("""
        echo before-cancel
        session_test.notify '#{pid}'
        while true; do :; done
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000
      Session.signal(ref, :sigint)
      Session.await(ref, 1000)

      assert session_stdout(session) =~ "before-cancel"
    end

    test "stderr written before cancel is preserved on the session", %{session: session} do
      pid = pid_arg(self())

      {:ok, ast} =
        Bash.Parser.parse("""
        echo oops >&2
        session_test.notify '#{pid}'
        while true; do :; done
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000
      Session.signal(ref, :sigint)
      Session.await(ref, 1000)

      assert session_stderr(session) =~ "oops"
    end

    test "INT trap runs on :sigint and its output is collected", %{session: session} do
      pid = pid_arg(self())

      {:ok, ast} =
        Bash.Parser.parse("""
        trap 'echo trap-ran' INT
        session_test.notify '#{pid}'
        while true; do :; done
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000
      Session.signal(ref, :sigint)
      assert {:error, %{exit_code: 130}} = Session.await(ref, 1000)

      assert session_stdout(session) =~ "trap-ran"
    end

    test "EXIT trap also runs on :sigint", %{session: session} do
      pid = pid_arg(self())

      {:ok, ast} =
        Bash.Parser.parse("""
        trap 'echo on-exit' EXIT
        session_test.notify '#{pid}'
        while true; do :; done
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000
      Session.signal(ref, :sigint)
      assert {:error, %{exit_code: 130}} = Session.await(ref, 1000)

      assert session_stdout(session) =~ "on-exit"
    end

    test ":sigterm runs TERM trap and exits 143", %{session: session} do
      pid = pid_arg(self())

      {:ok, ast} =
        Bash.Parser.parse("""
        trap 'echo term-ran' TERM
        session_test.notify '#{pid}'
        while true; do :; done
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000
      Session.signal(ref, :sigterm)
      assert {:error, %{exit_code: 143}} = Session.await(ref, 1000)

      assert session_stdout(session) =~ "term-ran"
    end

    test ":sigkill bypasses traps", %{session: session} do
      pid = pid_arg(self())

      {:ok, ast} =
        Bash.Parser.parse("""
        trap 'echo should-not-run' INT
        trap 'echo also-not' EXIT
        session_test.notify '#{pid}'
        while true; do :; done
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000
      Session.signal(ref, :sigkill)
      assert {:error, %{exit_code: 137}} = Session.await(ref, 1000)

      refute session_stdout(session) =~ "should-not-run"
      refute session_stdout(session) =~ "also-not"
    end

    test "non-loop scripts also yield to cancellation between statements",
         %{session: session} do
      # The script cancels its own session mid-run, so cancel_now gets the
      # SESSION's pid here (not self()).
      pid = pid_arg(session)

      {:ok, ast} =
        Bash.Parser.parse("""
        echo first
        session_test.cancel_now '#{pid}'
        echo should-not-run
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert {:error, %{exit_code: 130}} = Session.await(ref, 1000)

      stdout = session_stdout(session)
      assert stdout =~ "first"
      refute stdout =~ "should-not-run"
    end

    test ":grace escalates to :sigkill when cooperative cancel doesn't land",
         %{session: session} do
      pid = pid_arg(self())

      # session_test.spin is a defbash with no yield points — cooperative
      # cancel can't land while it's running.
      {:ok, ast} =
        Bash.Parser.parse("""
        session_test.notify '#{pid}'
        session_test.spin
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000

      Session.signal(ref, :sigint, grace: 50)
      assert {:error, %{exit_code: 137, error: :cancelled}} = Session.await(ref, 1000)
    end

    test ":grace does not escalate when cancel lands within the window",
         %{session: session} do
      pid = pid_arg(self())

      # while-true yields every iteration — cancel lands fast.
      {:ok, ast} =
        Bash.Parser.parse("""
        session_test.notify '#{pid}'
        while true; do :; done
        """)

      {:ok, %Session.ExecRef{ref: internal_ref} = ref} = Session.execute_async(session, ast)
      assert_receive :script_running, 1000

      Session.signal(ref, :sigint, grace: 5_000)
      assert {:error, %{exit_code: 130}} = Session.await(ref, 1000)

      # Simulate the stale force_kill timer firing after the cooperative
      # cancel already cleared current_execution. The catch-all handle_info
      # clause must absorb it without affecting subsequent work.
      send(session, {:force_kill, internal_ref})

      assert {:error, :not_found} = Session.signal(session, :sigint)
      assert {:ok, _} = Session.execute(session, elem(Bash.Parser.parse("echo ok"), 1))
    end

    test "abnormal Task crash surfaces an error and clears current_execution",
         %{session: session} do
      {:ok, ast} = Bash.Parser.parse("session_test.boom")

      capture_log(fn ->
        {:ok, ref} = Session.execute_async(session, ast)
        assert {:error, %{error: {:task_crashed, _}}} = Session.await(ref, 1000)
      end)

      # Session is still alive and accepts new executions
      assert {:error, :not_found} = Session.signal(session, :sigint)
      assert {:ok, _result} = Session.execute(session, elem(Bash.Parser.parse("echo ok"), 1))
    end
  end

  describe "execute_async queue" do
    setup %{session: session} do
      Session.load_api(session, Bash.SessionTest.TestAPI)
      :ok
    end

    test "cancelling current drains pending and runs the next", %{session: session} do
      pid_self = pid_arg(self())

      {:ok, looping} =
        Bash.Parser.parse("""
        session_test.notify '#{pid_self}'
        while true; do :; done
        """)

      {:ok, follow_up} = Bash.Parser.parse("echo follow-up")

      {:ok, ref1} = Session.execute_async(session, looping)
      {:ok, ref2} = Session.execute_async(session, follow_up)

      assert_receive :script_running, 1000
      Session.signal(session, :sigint)

      assert {:error, %{exit_code: 130}} = Session.await(ref1, 1000)
      assert {:ok, _} = Session.await(ref2, 1000)
      assert session_stdout(session) =~ "follow-up"

      # State after cancel + drain: idle, ready for more
      assert {:error, :not_found} = Session.signal(session, :sigint)
    end

    test "signal(pending_ref, :sigint) cancels a queued execution with exit 130",
         %{session: session} do
      {:ok, ast1} =
        Bash.Parser.parse(
          "session_test.notify '#{pid_arg(self())}'; sleep_loop=true; while $sleep_loop; do :; done"
        )

      {:ok, ast2} = Bash.Parser.parse("echo should-not-run")

      {:ok, ref1} = Session.execute_async(session, ast1)
      {:ok, ref2} = Session.execute_async(session, ast2)

      assert_receive :script_running, 1000
      assert :ok = Session.signal(ref2, :sigint)
      assert {:error, %{exit_code: 130, error: :cancelled}} = Session.await(ref2, 1000)

      Session.signal(ref1, :sigint)
      Session.await(ref1, 1000)

      refute session_stdout(session) =~ "should-not-run"
    end

    test "signal(pending_ref, :sigterm) yields exit 143", %{session: session} do
      {:ok, ast1} =
        Bash.Parser.parse("session_test.notify '#{pid_arg(self())}'; while true; do :; done")

      {:ok, ast2} = Bash.Parser.parse("echo never")

      {:ok, ref1} = Session.execute_async(session, ast1)
      {:ok, ref2} = Session.execute_async(session, ast2)

      assert_receive :script_running, 1000
      assert :ok = Session.signal(ref2, :sigterm)
      assert {:error, %{exit_code: 143}} = Session.await(ref2, 1000)

      Session.signal(ref1, :sigint)
      Session.await(ref1, 1000)
    end

    test "signal(unknown_ref, :sigint) returns :not_found", %{session: session} do
      stale = %Session.ExecRef{
        session: session,
        ref: make_ref(),
        monitor: Process.monitor(session)
      }

      assert {:error, :not_found} = Session.signal(stale, :sigint)
    end

    test "cancelling pending does not affect current or other pending", %{session: session} do
      {:ok, looping} =
        Bash.Parser.parse("session_test.notify '#{pid_arg(self())}'; while true; do :; done")

      {:ok, ast2} = Bash.Parser.parse("echo two")
      {:ok, ast3} = Bash.Parser.parse("echo three")

      {:ok, ref1} = Session.execute_async(session, looping)
      {:ok, ref2} = Session.execute_async(session, ast2)
      {:ok, ref3} = Session.execute_async(session, ast3)

      assert_receive :script_running, 1000
      assert :ok = Session.signal(ref2, :sigint)

      Session.signal(ref1, :sigint)
      assert {:error, %{exit_code: 130}} = Session.await(ref1, 1000)
      assert {:error, %{exit_code: 130}} = Session.await(ref2, 1000)
      assert {:ok, _} = Session.await(ref3, 1000)

      assert session_stdout(session) =~ "three"
      refute session_stdout(session) =~ "two"
    end
  end

  describe "cancellation with command policy and virtual filesystem" do
    alias Bash.Filesystem.ETS, as: FS

    # Builds a session backed by an ETS virtual filesystem with external
    # commands disallowed; Registry/DynamicSupervisor names are derived from
    # the test context so concurrent tests don't collide.
    defp start_restricted_ets_session(context) do
      table = FS.new(%{"/workspace/data.txt" => "seed\n"})

      registry_name = Module.concat([context.module, ETSRegistry, context.test])
      supervisor_name = Module.concat([context.module, ETSSupervisor, context.test])

      start_supervised!({Registry, keys: :unique, name: registry_name}, id: registry_name)

      start_supervised!(
        {DynamicSupervisor, strategy: :one_for_one, name: supervisor_name},
        id: supervisor_name
      )

      {:ok, session} =
        Session.new(
          id: "#{context.test}",
          filesystem: {FS, table},
          working_dir: "/workspace",
          registry: registry_name,
          supervisor: supervisor_name,
          command_policy: [commands: :no_external],
          apis: [Bash.SessionTest.TestAPI]
        )

      {session, table}
    end

    # NOTE(review): defp inside `describe` is module-scoped, so earlier
    # describes may call this; if an identically-named helper already exists
    # above this chunk, the compiler will warn about duplicate clauses — confirm.
    defp pid_arg(pid), do: pid |> :erlang.pid_to_list() |> to_string()

    test "cancels mid-stream while writing to virtual filesystem", context do
      {session, table} = start_restricted_ets_session(context)
      pid = pid_arg(session)

      {:ok, ast} =
        Bash.Parser.parse("""
        i=0
        while [ $i -lt 100 ]; do
          echo "line $i" >> output.txt
          if [ $i -eq 5 ]; then
            session_test.cancel_now '#{pid}'
          fi
          i=$((i + 1))
        done
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert {:error, %{exit_code: 130}} = Session.await(ref, 5000)

      # Partial writes that landed before the cancel are preserved in the VFS
      assert {:ok, contents} = FS.read(table, "/workspace/output.txt")
      for i <- 1..5, do: assert(contents =~ "line #{i}")
      refute contents =~ "line 6"

      # The session is still alive and the VFS still works
      run_script(session, "echo done >> done.txt")
      assert {:ok, "done\n"} = FS.read(table, "/workspace/done.txt")
    end

    test "cancels mid-stream while reading from virtual filesystem", context do
      {session, _table} = start_restricted_ets_session(context)
      pid = pid_arg(session)

      run_script(session, "echo content > big.txt")

      {:ok, ast} =
        Bash.Parser.parse("""
        n=0
        while [ $n -lt 100 ]; do
          contents=$(cat big.txt)
          echo "$contents" >> log.txt
          n=$((n + 1))
          if [ $n -eq 10 ]; then
            session_test.cancel_now '#{pid}'
          fi
        done
        """)

      {:ok, ref} = Session.execute_async(session, ast)
      assert {:error, %{exit_code: 130}} = Session.await(ref, 5000)

      result = run_script(session, "echo still-here")
      assert get_stdout(result) =~ "still-here"
    end

    test "command policy still enforces after cancellation", context do
      {session, _table} = start_restricted_ets_session(context)
      pid = pid_arg(session)

      {:ok, ast} = Bash.Parser.parse("session_test.cancel_now '#{pid}'; :")
      {:ok, ref} = Session.execute_async(session, ast)
      Session.await(ref, 5000)

      # External commands remain blocked by policy after cancel
      result = run_script(session, "ls /etc")
      assert result.exit_code != 0
    end

    test "session state persists across cancel with VFS + policy", context do
      {session, table} = start_restricted_ets_session(context)
      pid = pid_arg(session)
      run_script(session, "x=hello-world")

      {:ok, ast} = Bash.Parser.parse("session_test.cancel_now '#{pid}'; :")
      {:ok, ref} = Session.execute_async(session, ast)
      Session.await(ref, 5000)

      assert get_var(session, "x") == "hello-world"
      assert {:ok, "seed\n"} = FS.read(table, "/workspace/data.txt")
    end
  end

  defmodule TestAPI do
    @moduledoc false
    use Bash.Interop, namespace: "session_test"

    # NOTE(review): the original patch has a hunk boundary here
    # (@@ -607,5 +1141,27 @@) — pre-existing TestAPI helpers, including the
    # "ping" defbash that does Bash.puts("pong\n"), sit between this line and
    # the additions below and are NOT shown in this chunk.

    # Sends SIGINT to the session whose pid is given as a bash string argument;
    # lets a script cancel its own session mid-run.
    defbash cancel_now([pid_str], _state) do
      pid = pid_str |> String.to_charlist() |> :erlang.list_to_pid()
      Bash.Session.signal(pid, :sigint)
      :ok
    end

    # Notifies the given pid that the script has started executing.
    defbash notify([pid_str], _state) do
      pid = pid_str |> String.to_charlist() |> :erlang.list_to_pid()
      send(pid, :script_running)
      :ok
    end

    # Raises to exercise the task-crash path.
    defbash boom(_args, _state) do
      raise "boom"
    end

    # Busy-loops forever with no yield points, so cooperative cancellation
    # cannot land (used by the :grace escalation test).
    defbash spin(_args, _state) do
      spin_loop()
    end

    defp spin_loop, do: spin_loop()
  end
end