Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion otherlibs/stdune/src/signal_stubs.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#include <caml/mlvalues.h>

#ifdef _WIN32
#include <caml/fail.h>
#else
#include <caml/unixsupport.h>
#include <signal.h>
#include <sys/ioctl.h>
#endif
Expand Down
4 changes: 2 additions & 2 deletions src/dune_scheduler/dune
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
fiber
memo
threads.posix
dune_trace
dune_util)
dune_trace
dune_util)
(synopsis "Internal Dune library, do not use!"))
9 changes: 6 additions & 3 deletions src/dune_scheduler/process_watcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ let dyn_of_process_state = function
| Zombie _ -> Dyn.variant "Zombie" []
;;

(* This mutable table is safe: it does not interact with the state we track in
the build system. *)
type t =
{ mutex : Mutex.t Lazy.t
; something_is_running : Condition.t option
Expand Down Expand Up @@ -214,7 +212,12 @@ let init events =
in
if Sys.win32
then (
let (_ : Thread.t) = Thread0.spawn ~name:"process-watcher" (fun () -> run_win32 t) in
let (_ : Thread.t) =
Thread0.spawn ~name:"process-watcher" (fun () ->
run_win32 t)
in
());
t
;;

(* Teardown hook for a process watcher. As written it is a no-op: the watcher
argument is ignored and [()] is returned immediately. *)
let shutdown (_ : t) = ()
1 change: 1 addition & 0 deletions src/dune_scheduler/process_watcher.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ val is_running : t -> Pid.t -> bool
val killall : t -> int -> unit

val wait_unix : t -> Fiber.fill list
val shutdown : t -> unit
1 change: 1 addition & 0 deletions src/dune_scheduler/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ let kill_and_wait_for_all_processes t =
then (
Unix.kill (Unix.getpid ()) (Signal.to_int Thread.signal_watcher_interrupt);
Thread.join t.signal_watcher);
Process_watcher.shutdown t.process_watcher;
!saw_signal
;;

Expand Down
16 changes: 12 additions & 4 deletions src/dune_scheduler/signal_watcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@ open Stdune

let signos = List.map Thread0.interrupt_signals ~f:Signal.to_int

(* Banner text passed to [prerr_endline] by the signal watcher loop in this
file. NOTE(review): judging by the name and the banner wording, this is meant
for spurious SIGCHLD-related wakeups on macOS -- confirm the exact condition
against the watcher loop. *)
let macos_sigchld_warning =
{|

*****************************************
* Ignoring invalid macOS SIGCHLD return *
*****************************************

|}
;;

let warning =
{|

Expand Down Expand Up @@ -55,10 +65,8 @@ let run ~print_ctrl_c_warning q : unit =
if n = 2 && print_ctrl_c_warning then prerr_endline warning;
if n = 3 then sys_exit 1
| _ ->
(* we only blocked the signals above *)
Code_error.raise
"signal watcher received an unexpected signal"
[ "signal", Signal.to_dyn signal ]
prerr_endline macos_sigchld_warning;
Event.Queue.send_job_completed_ready q
done
;;

Expand Down
7 changes: 4 additions & 3 deletions src/dune_scheduler/thread0.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ let signal_watcher_interrupt : Signal.t = Usr1
let signal_watcher_debug : Signal.t = Usr2

(* These are the signals that will make the scheduler attempt to terminate dune
or signal to dune to reap a process *)
or signal to dune to reap a process.
*)
let interrupt_signals : Signal.t list =
[ signal_watcher_interrupt; signal_watcher_debug; Chld; Int; Quit; Term ]
;;

(* In addition, the scheduler also blocks some other signals so that only
designated threads can handle them by unblocking *)
designated threads can handle them by unblocking. *)
let blocked_signals : Signal.t list = Terminal_signals.signals @ interrupt_signals

let block_signals =
Expand All @@ -35,7 +36,7 @@ let create =
then Thread.create
else
(* On unix, we make sure to block signals globally before starting a
thread so that only the signal watcher thread can receive signals. *)
thread so that only the signal watcher thread can receive signals. *)
fun f x ->
Lazy.force block_signals;
Thread.create f x
Expand Down
4 changes: 2 additions & 2 deletions src/dune_scheduler/thread0.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ val signal_watcher_interrupt : Signal.t
(** Magic signal to make dune debugging info *)
val signal_watcher_debug : Signal.t

val join : t -> unit
val interrupt_signals : Signal.t list
val join : t -> unit
val wait_signal : int list -> int
val spawn : name:string -> (unit -> unit) -> Thread.t
val delay : float -> unit
val wait_signal : int list -> int
70 changes: 70 additions & 0 deletions test/expect-tests/dune_scheduler/async_io_tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,65 @@ let config =
}
;;

(* Print the captured [stdout] and then [stderr] of a child process on our own
   standard output. Each stream is trimmed first and skipped entirely when
   nothing but whitespace remains. *)
let print_relevant_output stdout stderr =
  let emit raw =
    match String.trim raw with
    | "" -> ()
    | trimmed -> print_endline trimmed
  in
  emit stdout;
  emit stderr
;;

(* Path of the [signal_cleanup_repro.exe] helper, looked up in the parent of
   the directory that contains the running test executable. Raises a
   [Code_error] (with the cwd, the test executable and the expected path) when
   no file exists at that location. *)
let signal_cleanup_repro_prog =
  let build_dir = Filename.dirname (Filename.dirname Sys.executable_name) in
  let prog = Filename.concat build_dir "signal_cleanup_repro.exe" in
  match Sys.file_exists prog with
  | true -> prog
  | false ->
    Code_error.raise
      "could not locate signal cleanup repro executable"
      [ "cwd", Dyn.string (Sys.getcwd ())
      ; "test_executable", Dyn.string Sys.executable_name
      ; "expected", Dyn.string prog
      ]
;;

(* Environment handed to the repro executable: the initial environment plus
   the [DUNE_SIGNAL_REPRO_*] settings (signal-based shutdown, one iteration,
   128 jobs, 2ms delay), converted to the array form expected by [Unix]. *)
let signal_cleanup_repro_env () =
  let env = Env.initial in
  let env = Env.add env ~var:"DUNE_SIGNAL_REPRO_SHUTDOWN" ~value:"signal" in
  let env = Env.add env ~var:"DUNE_SIGNAL_REPRO_ITERS" ~value:"1" in
  let env = Env.add env ~var:"DUNE_SIGNAL_REPRO_JOBS" ~value:"128" in
  let env = Env.add env ~var:"DUNE_SIGNAL_REPRO_DELAY" ~value:"0.002" in
  Array.of_list (Env.to_unix env)
;;

(* Run the repro executable once, in a fresh process, and report whether it
   succeeded.

   Returns [true] only when the child exits with code 0 and its trimmed stdout
   is exactly ["ok after 1 iterations"] (the iteration count matches the
   [DUNE_SIGNAL_REPRO_ITERS] value set by [signal_cleanup_repro_env]).

   On any failure the child's exit status (when it is not a clean exit) and
   its non-empty stdout/stderr are printed so that the expect-test diff shows
   why the run failed, and [false] is returned. *)
let run_signal_cleanup_repro () =
  let describe_status : Unix.process_status -> string = function
    | WEXITED code -> Printf.sprintf "repro exited with code %d" code
    | WSIGNALED signal -> Printf.sprintf "repro was killed by signal %d" signal
    | WSTOPPED signal -> Printf.sprintf "repro was stopped by signal %d" signal
  in
  let ic, oc, ec =
    Unix.open_process_args_full
      signal_cleanup_repro_prog
      [| signal_cleanup_repro_prog |]
      (signal_cleanup_repro_env ())
  in
  (* The child gets no input: close its stdin right away. *)
  close_out oc;
  (* NOTE: the two pipes are drained one after the other. This is fine for the
     small amount of output the repro produces, but a child that fills the
     stderr pipe before closing stdout would block forever. *)
  let stdout = In_channel.input_all ic in
  let stderr = In_channel.input_all ec in
  match Unix.close_process_full (ic, oc, ec) with
  | WEXITED 0 ->
    let stdout = String.trim stdout in
    if String.equal stdout "ok after 1 iterations"
    then true
    else (
      print_relevant_output stdout stderr;
      false)
  | status ->
    (* Report how the child died: a process killed by a signal may leave both
       streams empty, which would otherwise make the failure undiagnosable. *)
    print_endline (describe_status status);
    print_relevant_output stdout stderr;
    false
;;

let%expect_test "read readiness" =
(Scheduler.Run.go config ~on_event:(fun _ _ -> ())
@@ fun () ->
Expand Down Expand Up @@ -91,3 +150,14 @@ let%expect_test "cancel task" =
(fun () -> Async_io.Task.cancel task));
[%expect {| successfully cancelled |}]
;;

(* Launch the repro executable up to 20 times, each in a fresh process,
   stopping at the first failing run. The success line is printed only when
   every run passed; a failing run prints its own diagnostics instead. *)
let%expect_test "SIGCHLD wakeups survive interrupted high-concurrency builds" =
  let rec all_runs_pass remaining =
    remaining = 0 || (run_signal_cleanup_repro () && all_runs_pass (remaining - 1))
  in
  if all_runs_pass 20 then print_endline "passed 20 fresh interrupted runs";
  [%expect {| passed 20 fresh interrupted runs |}]
;;
13 changes: 10 additions & 3 deletions test/expect-tests/dune_scheduler/dune
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
(library
(name dune_scheduler_tests)
(inline_tests)
(modules async_io_tests)
(inline_tests
(deps signal_cleanup_repro.exe))
(preprocess
(pps ppx_expect))
(libraries
Expand All @@ -9,9 +11,14 @@
unix
threads.posix
fiber
;; This is because of the (implicit_transitive_deps false)
;; in dune-project
; This is because of the (implicit_transitive_deps false)
; in dune-project
ppx_expect.config
ppx_expect.config_types
base
ppx_inline_test.config))

(executable
(name signal_cleanup_repro)
(modules signal_cleanup_repro)
(libraries stdune dune_scheduler unix threads.posix fiber))
144 changes: 144 additions & 0 deletions test/expect-tests/dune_scheduler/signal_cleanup_repro.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
open Stdune
module Event = Dune_scheduler__Event
module Process_watcher = Dune_scheduler__Process_watcher
module Signal_watcher = Dune_scheduler__Signal_watcher
module Thread0 = Dune_scheduler__Thread0

(* Debug logger, chosen once at start-up: messages go to stderr when
   [DUNE_SIGNAL_REPRO_DEBUG] is set (to any value) and are dropped otherwise. *)
let debug =
  if Option.is_some (Sys.getenv_opt "DUNE_SIGNAL_REPRO_DEBUG")
  then prerr_endline
  else ignore
;;

(* [env_int name ~default] reads the environment variable [name] as an
   integer. Returns [default] when the variable is unset. Raises a
   [Code_error] naming the variable and the offending text when the variable
   is set but does not parse as an integer. *)
let env_int name ~default =
  match Sys.getenv_opt name with
  | None -> default
  | Some value ->
    (match Int.of_string value with
     | Some parsed -> parsed
     | None ->
       Code_error.raise
         "invalid integer in environment variable"
         [ "variable", Dyn.string name; "value", Dyn.string value ])
;;

(* [env_float name ~default] reads the environment variable [name] as a
   float. Returns [default] when the variable is unset. Raises a [Code_error]
   naming the variable and the offending text when the variable is set but
   does not parse as a float. *)
let env_float name ~default =
  match Sys.getenv_opt name with
  | None -> default
  | Some value ->
    (match Float.of_string value with
     | Some parsed -> parsed
     | None ->
       Code_error.raise
         "invalid float in environment variable"
         [ "variable", Dyn.string name; "value", Dyn.string value ])
;;

(* Path of the [sh] binary, resolved once through the search path of the
   initial environment. Fails at module initialisation when no [sh] is found. *)
let sh =
  let search_path = Env_path.path Env.initial in
  Path.to_string (Option.value_exn (Bin.which ~path:search_path "sh"))
;;

(* Start [sh -c command] as a child process that shares our stdin, stdout and
   stderr, and return its pid. *)
let spawn_sleep command =
  let child =
    Unix.create_process sh [| sh; "-c"; command |] Unix.stdin Unix.stdout Unix.stderr
  in
  Pid.of_int child
;;

(* Unblock every signal listed in [Thread0.interrupt_signals] for the calling
   thread; the previous mask is discarded. *)
let reset_signal_mask () =
  let (_previous_mask : int list) =
    Thread0.interrupt_signals
    |> List.map ~f:Signal.to_int
    |> Unix.sigprocmask SIG_UNBLOCK
  in
  ()
;;

(* Tunables, each overridable through an environment variable. *)

(* Number of child processes spawned by each iteration (default 64). *)
let jobs_per_iteration = env_int "DUNE_SIGNAL_REPRO_JOBS" ~default:64
(* Number of spawn/interrupt/cleanup iterations to run (default 100). *)
let iterations = env_int "DUNE_SIGNAL_REPRO_ITERS" ~default:100
(* Delay, in seconds, before an iteration triggers its shutdown (default 5ms). *)
let interrupt_after = env_float "DUNE_SIGNAL_REPRO_DELAY" ~default:0.005
(* Delay, in seconds, after which the watchdog thread aborts the whole run
(default 5s). *)
let timeout_after = env_float "DUNE_SIGNAL_REPRO_TIMEOUT" ~default:5.0
(* Shell commands assigned round-robin to the spawned children. *)
let commands = [| "sleep 0.01"; "sleep 0.02"; "sleep 0.03"; "sleep 0.05" |]

(* How an iteration triggers its shutdown: by pushing a shutdown event onto
   the event queue directly ([Queue]), or by sending SIGINT to the current
   process ([Signal]). *)
type shutdown_mode =
  | Queue
  | Signal

(* Shutdown mode selected by [DUNE_SIGNAL_REPRO_SHUTDOWN]: ["signal"] or
   ["queue"], defaulting to [Queue] when unset. Any other value raises a
   [Code_error]. *)
let shutdown_mode =
  match Sys.getenv_opt "DUNE_SIGNAL_REPRO_SHUTDOWN" with
  | None | Some "queue" -> Queue
  | Some "signal" -> Signal
  | Some other ->
    Code_error.raise "unknown signal repro shutdown mode" [ "mode", Dyn.string other ]
;;

(* Run one spawn / interrupt / cleanup cycle:

   1. spawn [jobs_per_iteration] short-lived [sleep] children and register
      each of them with [process_watcher];
   2. start a helper thread that, after [interrupt_after] seconds, triggers a
      shutdown according to [shutdown_mode];
   3. consume events until the [Shutdown (Signal Int)] event arrives, reaping
      finished children whenever [Job_complete_ready] is seen;
   4. kill the remaining children with SIGKILL and keep consuming events until
      the queue reports no pending jobs. *)
let cleanup_iteration events process_watcher =
  debug "starting iteration";
  let command_count = Array.length commands in
  let children =
    List.init jobs_per_iteration ~f:(fun index ->
      spawn_sleep commands.(index mod command_count))
  in
  List.iter children ~f:(fun pid ->
    Process_watcher.register_job
      process_watcher
      ({ pid; is_process_group_leader = false; ivar = Fiber.Ivar.create () }
       : Event.job));
  (* Collect whatever children have terminated; the fills are not needed. *)
  let reap_finished () =
    ignore (Process_watcher.wait_unix process_watcher : Fiber.fill list)
  in
  let trigger_shutdown () =
    Thread.delay interrupt_after;
    match shutdown_mode with
    | Signal -> Unix.kill (Unix.getpid ()) Sys.sigint
    | Queue -> Event.Queue.send_shutdown events (Signal Int)
  in
  let (_ : Thread.t) = Thread.create trigger_shutdown () in
  let rec await_sigint () =
    match Event.Queue.next events with
    | Shutdown (Signal Int) -> ()
    | Job_complete_ready ->
      reap_finished ();
      await_sigint ()
    | Shutdown _
    | Fiber_fill_ivar _
    | File_watcher_task _
    | Build_inputs_changed _
    | File_system_sync _
    | File_system_watcher_terminated -> await_sigint ()
  in
  await_sigint ();
  debug "got shutdown";
  Process_watcher.killall process_watcher Sys.sigkill;
  (* Keep pumping the queue until every registered job has been accounted for. *)
  let rec drain () =
    if Event.Queue.pending_jobs events > 0
    then (
      (match Event.Queue.next events with
       | Job_complete_ready -> reap_finished ()
       | Shutdown _
       | Fiber_fill_ivar _
       | File_watcher_task _
       | Build_inputs_changed _
       | File_system_sync _
       | File_system_watcher_terminated -> ());
      drain ())
  in
  drain ()
;;

(* Entry point of the repro executable.

   Unblocks the interrupt signals, starts the signal watcher and the process
   watcher on a fresh event queue, runs [iterations] cleanup cycles, then
   sends [Thread0.signal_watcher_interrupt] to itself and joins the signal
   watcher thread. Prints ["ok after N iterations"] on success.

   Exit codes: 0 on success, 1 when an exception escapes (message and
   backtrace go to stderr), 2 when the watchdog fires after [timeout_after]
   seconds. *)
let () =
  Printexc.record_backtrace true;
  try
    reset_signal_mask ();
    let events = Event.Queue.create () in
    debug "created queue";
    let signal_watcher = Signal_watcher.init ~print_ctrl_c_warning:false events in
    debug "started signal watcher";
    let finished = ref false in
    (* Watchdog: abort the whole process if the run has not finished in time,
       so that a hang shows up as a failure instead of blocking the caller. *)
    let (_ : Thread.t) =
      Thread.create
        (fun () ->
           Thread.delay timeout_after;
           if not !finished
           then (
             prerr_endline
               "signal watcher cleanup repro timed out while exercising SIGINT";
             exit 2))
        ()
    in
    let process_watcher = Process_watcher.init events in
    debug "started process watcher";
    for _ = 1 to iterations do
      cleanup_iteration events process_watcher
    done;
    debug "finished iterations";
    (* Send the watcher its interrupt signal, then wait for its thread. *)
    Unix.kill (Unix.getpid ()) (Signal.to_int Thread0.signal_watcher_interrupt);
    Thread0.join signal_watcher;
    finished := true;
    debug "joined signal watcher";
    Printf.printf "ok after %d iterations\n%!" iterations
  with
  | exn ->
    (* Grab the backtrace before running any other code: anything that raises
       and catches an exception internally (including exception printers) may
       overwrite the recorded backtrace. *)
    let backtrace = Printexc.get_backtrace () in
    prerr_endline (Printexc.to_string exn);
    prerr_string backtrace;
    exit 1
;;
Empty file.
Loading