From a50d45ce09b6f609a3959e6ac03195207c7e218d Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 6 Apr 2023 18:14:45 +0100 Subject: [PATCH 1/4] connection: Terminate background commands on close() Ensure all background commands are terminated before we close the connection. --- devlib/connection.py | 11 +++++++++++ devlib/utils/ssh.py | 3 --- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/devlib/connection.py b/devlib/connection.py index 020db209d..9699d7656 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -66,6 +66,7 @@ def __init__(self): self._closed = False self._close_lock = threading.Lock() self.busybox = None + self.logger = logging.getLogger('Connection') def cancel_running_command(self): bg_cmds = set(self._current_bg_cmds) @@ -83,11 +84,21 @@ def _close(self): """ def close(self): + + def finish_bg(): + bg_cmds = set(self._current_bg_cmds) + n = len(bg_cmds) + if n: + self.logger.debug(f'Canceling {n} background commands before closing connection') + for bg_cmd in bg_cmds: + bg_cmd.cancel() + # Locking the closing allows any thread to safely call close() as long # as the connection can be closed from a thread that is not the one it # started its life in. with self._close_lock: if not self._closed: + finish_bg() self._close() self._closed = True diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py index 39d704645..c3f71f55c 100644 --- a/devlib/utils/ssh.py +++ b/devlib/utils/ssh.py @@ -696,9 +696,6 @@ def callback(out_streams, name, chunk): def _close(self): logger.debug('Logging out {}@{}'.format(self.username, self.host)) with _handle_paramiko_exceptions(): - bg_cmds = set(self._current_bg_cmds) - for bg_cmd in bg_cmds: - bg_cmd.close() self.client.close() def _execute_command(self, command, as_root, log, timeout, executor): From 0e16b778ef0e8fa3e815927be3e84fbef6900837 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 6 Apr 2023 21:02:17 +0100 Subject: [PATCH 2/4] connection: Add BackgroundCommand.__init__(conn) Add a constructor to BackgroundCommand so that the command knows the connection it's tied to. --- devlib/connection.py | 11 ++++++++--- devlib/host.py | 2 +- devlib/utils/android.py | 13 +++++++++---- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/devlib/connection.py b/devlib/connection.py index 9699d7656..b10fde39a 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -120,6 +120,10 @@ class BackgroundCommand(ABC): Instances of this class can be used as context managers, with the same semantic as :class:`subprocess.Popen`. """ + + def __init__(self, conn): + self.conn = conn + @abstractmethod def send_signal(self, sig): """ @@ -235,7 +239,8 @@ class PopenBackgroundCommand(BackgroundCommand): :class:`subprocess.Popen`-based background command. """ - def __init__(self, popen): + def __init__(self, conn, popen): + super().__init__(conn=conn) self.popen = popen def send_signal(self, sig): @@ -291,9 +296,9 @@ class ParamikoBackgroundCommand(BackgroundCommand): :mod:`paramiko`-based background command. """ def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread): + super().__init__(conn=conn) self.chan = chan self.as_root = as_root - self.conn = conn self._pid = pid self._stdin = stdin self._stdout = stdout @@ -454,7 +459,7 @@ class AdbBackgroundCommand(BackgroundCommand): """ def __init__(self, conn, adb_popen, pid, as_root): - self.conn = conn + super().__init__(conn=conn) self.as_root = as_root self.adb_popen = adb_popen self._pid = pid diff --git a/devlib/host.py b/devlib/host.py index a6796da5f..d067d807c 100644 --- a/devlib/host.py +++ b/devlib/host.py @@ -141,7 +141,7 @@ def preexec_fn(): shell=True, preexec_fn=preexec_fn, ) - bg_cmd = PopenBackgroundCommand(popen) + bg_cmd = PopenBackgroundCommand(self, popen) self._current_bg_cmds.add(bg_cmd) return bg_cmd diff --git a/devlib/utils/android.py b/devlib/utils/android.py index b57910da6..50bca3011 100755 --- a/devlib/utils/android.py +++ b/devlib/utils/android.py @@ -334,7 +334,12 @@ def _push_pull(self, action, sources, dest, timeout): adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server) else: with self.transfer_mgr.manage(sources, dest, action): - bg_cmd = adb_command_background(self.device, command, adb_server=self.adb_server) + bg_cmd = adb_command_background( + device=self.device, + conn=self, + command=command, + adb_server=self.adb_server + ) self.transfer_mgr.set_transfer_and_wait(bg_cmd) # pylint: disable=unused-argument @@ -692,11 +697,11 @@ def adb_command(device, command, timeout=None, adb_server=None): return output -def adb_command_background(device, command, adb_server=None): +def adb_command_background(device, conn, command, adb_server=None): full_command = get_adb_command(device, command, adb_server) logger.debug(full_command) - proc = get_subprocess(full_command, shell=True) - cmd = PopenBackgroundCommand(proc) + popen = get_subprocess(full_command, shell=True) + cmd = PopenBackgroundCommand(conn=conn, popen=popen) return cmd From 57139040e3ecccc126850f4d23eb3753dc46e8e8 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 6 Apr 2023 21:07:29 +0100 Subject: [PATCH 3/4] connection: Make BackgroundCommand deregister itself Instead of loosely tracking the current BackgroundCommand for a connection in _current_bg_cmds WeakSet attribute, use a normal set and make the BackgroundCommand deregister itself upon termination. This allows canceling any outstanding BackgroundCommand when the connection is closed. Currently, destroying a BackgroundCommand will not cancel the command but devlib will simply loose track of it, and some threads will likely fail in the background if they try to use the now broken connection. --- devlib/connection.py | 78 +++++++++++++++++++++++++++++------------ devlib/host.py | 1 - devlib/utils/android.py | 1 - devlib/utils/ssh.py | 5 +-- 4 files changed, 57 insertions(+), 28 deletions(-) diff --git a/devlib/connection.py b/devlib/connection.py index b10fde39a..f098a54d8 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -62,7 +62,7 @@ class ConnectionBase(InitCheckpoint): Base class for all connections. """ def __init__(self): - self._current_bg_cmds = WeakSet() + self._current_bg_cmds = set() self._closed = False self._close_lock = threading.Lock() self.busybox = None @@ -123,8 +123,18 @@ class BackgroundCommand(ABC): def __init__(self, conn): self.conn = conn + conn._current_bg_cmds.add(self) + + def _deregister(self): + try: + self.conn._current_bg_cmds.remove(self) + except KeyError: + pass @abstractmethod + def _send_signal(self, sig): + pass + def send_signal(self, sig): """ Send a POSIX signal to the background command's process group ID @@ -133,6 +143,11 @@ def send_signal(self, sig): :param signal: Signal to send. :type signal: signal.Signals """ + try: + self._send_signal(signal.SIGKILL) + finally: + # Deregister if the command has finished + self.poll() def kill(self): """ @@ -145,8 +160,11 @@ def cancel(self, kill_timeout=_KILL_TIMEOUT): Try to gracefully terminate the process by sending ``SIGTERM``, then waiting for ``kill_timeout`` to send ``SIGKILL``. """ - if self.poll() is None: - self._cancel(kill_timeout=kill_timeout) + try: + if self.poll() is None: + self._cancel(kill_timeout=kill_timeout) + finally: + self._deregister() @abstractmethod def _cancel(self, kill_timeout): @@ -156,10 +174,17 @@ def _cancel(self, kill_timeout): pass @abstractmethod + def _wait(self): + pass + def wait(self): """ Block until the background command completes, and return its exit code. """ + try: + self._wait() + finally: + self._deregister() def communicate(self, input=b'', timeout=None): """ @@ -177,10 +202,17 @@ def _communicate(self, input, timeout): pass @abstractmethod + def _poll(self): + pass + def poll(self): """ Return exit code if the command has exited, None otherwise. """ + retcode = self._poll() + if retcode is not None: + self._deregister() + return retcode @property @abstractmethod @@ -217,6 +249,9 @@ def pid(self): """ @abstractmethod + def _close(self): + pass + def close(self): """ Close all opened streams and then wait for command completion. @@ -226,6 +261,10 @@ def close(self): .. note:: If the command is writing to its stdout/stderr, it might be blocked on that and die when the streams are closed. """ + try: + return self._close() + finally: + self._deregister() def __enter__(self): return self @@ -243,7 +282,7 @@ def __init__(self, conn, popen): super().__init__(conn=conn) self.popen = popen - def send_signal(self, sig): + def _send_signal(self, sig): return os.killpg(self.popen.pid, sig) @property @@ -262,13 +301,13 @@ def stderr(self): def pid(self): return self.popen.pid - def wait(self): + def _wait(self): return self.popen.wait() def _communicate(self, input, timeout): return _popen_communicate(self, self.popen, input, timeout) - def poll(self): + def _poll(self): return self.popen.poll() def _cancel(self, kill_timeout): @@ -279,17 +318,15 @@ def _cancel(self, kill_timeout): except subprocess.TimeoutExpired: os.killpg(os.getpgid(popen.pid), signal.SIGKILL) - def close(self): + def _close(self): self.popen.__exit__(None, None, None) return self.popen.returncode def __enter__(self): + super().__enter__() self.popen.__enter__() return self - def __exit__(self, *args, **kwargs): - self.popen.__exit__(*args, **kwargs) - class ParamikoBackgroundCommand(BackgroundCommand): """ @@ -306,7 +343,7 @@ def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirec self.redirect_thread = redirect_thread self.cmd = cmd - def send_signal(self, sig): + def _send_signal(self, sig): # If the command has already completed, we don't want to send a signal # to another process that might have gotten that PID in the meantime. if self.poll() is not None: @@ -320,7 +357,7 @@ def send_signal(self, sig): def pid(self): return self._pid - def wait(self): + def _wait(self): status = self.chan.recv_exit_status() # Ensure that the redirection thread is finished copying the content # from paramiko to the pipe. @@ -406,7 +443,7 @@ def create_out(): else: return (_stdout, _stderr) - def poll(self): + def _poll(self): # Wait for the redirection thread to finish, otherwise we would # indicate the caller that the command is finished and that the streams # are safe to drain, but actually the redirection thread is not @@ -440,7 +477,7 @@ def stdout(self): def stderr(self): return self._stderr - def close(self): + def _close(self): for x in (self.stdin, self.stdout, self.stderr): if x is not None: x.close() @@ -464,7 +501,7 @@ def __init__(self, conn, adb_popen, pid, as_root): self.adb_popen = adb_popen self._pid = pid - def send_signal(self, sig): + def _send_signal(self, sig): self.conn.execute( _kill_pgid_cmd(self.pid, sig, self.conn.busybox), as_root=self.as_root, @@ -486,14 +523,13 @@ def stderr(self): def pid(self): return self._pid - def wait(self): + def _wait(self): return self.adb_popen.wait() def _communicate(self, input, timeout): return _popen_communicate(self, self.adb_popen, input, timeout) - - def poll(self): + def _poll(self): return self.adb_popen.poll() def _cancel(self, kill_timeout): @@ -504,17 +540,15 @@ def _cancel(self, kill_timeout): self.send_signal(signal.SIGKILL) self.adb_popen.kill() - def close(self): + def _close(self): self.adb_popen.__exit__(None, None, None) return self.adb_popen.returncode def __enter__(self): + super().__enter__() self.adb_popen.__enter__() return self - def __exit__(self, *args, **kwargs): - self.adb_popen.__exit__(*args, **kwargs) - class TransferManagerBase(ABC): diff --git a/devlib/host.py b/devlib/host.py index d067d807c..68c535d39 100644 --- a/devlib/host.py +++ b/devlib/host.py @@ -142,7 +142,6 @@ def preexec_fn(): preexec_fn=preexec_fn, ) bg_cmd = PopenBackgroundCommand(self, popen) - self._current_bg_cmds.add(bg_cmd) return bg_cmd def _close(self): diff --git a/devlib/utils/android.py b/devlib/utils/android.py index 50bca3011..1cae582e9 100755 --- a/devlib/utils/android.py +++ b/devlib/utils/android.py @@ -368,7 +368,6 @@ def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as if as_root and self.connected_as_root: as_root = False bg_cmd = self._background(command, stdout, stderr, as_root) - self._current_bg_cmds.add(bg_cmd) return bg_cmd def _background(self, command, stdout, stderr, as_root): diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py index c3f71f55c..b8485219c 100644 --- a/devlib/utils/ssh.py +++ b/devlib/utils/ssh.py @@ -553,10 +553,7 @@ def execute(self, command, timeout=None, check_exit_code=True, def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False): with _handle_paramiko_exceptions(command): - bg_cmd = self._background(command, stdout, stderr, as_root) - - self._current_bg_cmds.add(bg_cmd) - return bg_cmd + return self._background(command, stdout, stderr, as_root) def _background(self, command, stdout, stderr, as_root): orig_command = command From ab263194275464332d12383e8a4d53a9db71b16f Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 6 Apr 2023 21:39:53 +0100 Subject: [PATCH 4/4] connection: Ensure we don't leak too many BackgroundCommand Make BackgroundCommand.__init__() poll all current BackgroundCommands on the associated connection so they deregister themselves if they are completed. This ensures that a BackgroundCommand-heavy application that also does not close them properly will not accumulate useless instances forever and leak associated resources like Popen objects. --- devlib/connection.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/devlib/connection.py b/devlib/connection.py index f098a54d8..8c4105591 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -123,6 +123,19 @@ class BackgroundCommand(ABC): def __init__(self, conn): self.conn = conn + + # Poll currently opened background commands on that connection to make + # them deregister themselves if they are completed. This avoids + # accumulating terminated commands and therefore leaking associated + # resources if the user is not careful and does not use the context + # manager API. + for bg_cmd in set(conn._current_bg_cmds): + try: + bg_cmd.poll() + # We don't want anything to fail here because of another command + except Exception: + pass + conn._current_bg_cmds.add(self) def _deregister(self):