From 863be2f739d300e590537223dc5056aea3036275 Mon Sep 17 00:00:00 2001 From: Eric Lunderberg Date: Thu, 9 May 2024 21:02:52 +0000 Subject: [PATCH] [Disco] Treat hangup of disco worker process as kShutdown Prior to this commit, each disco worker needed to receive `DiscoAction::kShutdown` in order to close cleanly. While this is sent from the destructor of `ProcessSessionObj`, which owns the worker processes, this does not guarantee that the disco workers will receive the shutdown command. For example, the controller process holding the `ProcessSessionObj` may reach a timeout and be terminated, preventing it from sending the `DiscoAction::kShutdown` command. This commit updates the disco worker to check for a closed pipe that occurs between two packets, and to treat this as if the `DiscoAction::kShutdown` command were received. A closed pipe that occurs at any other location is still treated as an error and reported. --- src/runtime/disco/process_session.cc | 38 +++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/src/runtime/disco/process_session.cc b/src/runtime/disco/process_session.cc index 6474db479e94..6687a64e7f85 100644 --- a/src/runtime/disco/process_session.cc +++ b/src/runtime/disco/process_session.cc @@ -48,11 +48,21 @@ class DiscoPipeMessageQueue : private dmlc::Stream, private DiscoProtocol(num_args); + type_codes = ArenaAlloc(num_args); + TVMArgsSetter setter(values, type_codes); + setter(0, static_cast(DiscoAction::kShutDown)); + setter(1, 0); + } else { + RPCReference::RecvPackedSeq(&values, &type_codes, &num_args, this); + } return TVMArgs(values, type_codes, num_args); } @@ -62,18 +72,38 @@ class DiscoPipeMessageQueue : private dmlc::Stream, private DiscoProtocolRecycleAll(); RPCCode code = RPCCode::kReturn; this->Read(&code); + return false; } size_t Read(void* data, size_t size) final {