diff --git a/devlib/connection.py b/devlib/connection.py index 020db209d..8c4105591 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -62,10 +62,11 @@ class ConnectionBase(InitCheckpoint): Base class for all connections. """ def __init__(self): - self._current_bg_cmds = WeakSet() + self._current_bg_cmds = set() self._closed = False self._close_lock = threading.Lock() self.busybox = None + self.logger = logging.getLogger('Connection') def cancel_running_command(self): bg_cmds = set(self._current_bg_cmds) @@ -83,11 +84,21 @@ def _close(self): """ def close(self): + + def finish_bg(): + bg_cmds = set(self._current_bg_cmds) + n = len(bg_cmds) + if n: + self.logger.debug(f'Canceling {n} background commands before closing connection') + for bg_cmd in bg_cmds: + bg_cmd.cancel() + # Locking the closing allows any thread to safely call close() as long # as the connection can be closed from a thread that is not the one it # started its life in. with self._close_lock: if not self._closed: + finish_bg() self._close() self._closed = True @@ -109,7 +120,34 @@ class BackgroundCommand(ABC): Instances of this class can be used as context managers, with the same semantic as :class:`subprocess.Popen`. """ + + def __init__(self, conn): + self.conn = conn + + # Poll currently opened background commands on that connection to make + # them deregister themselves if they are completed. This avoids + # accumulating terminated commands and therefore leaking associated + # resources if the user is not careful and does not use the context + # manager API. + for bg_cmd in set(conn._current_bg_cmds): + try: + bg_cmd.poll() + # We don't want anything to fail here because of another command + except Exception: + pass + + conn._current_bg_cmds.add(self) + + def _deregister(self): + try: + self.conn._current_bg_cmds.remove(self) + except KeyError: + pass + @abstractmethod + def _send_signal(self, sig): + pass + def send_signal(self, sig): """ Send a POSIX signal to the background command's process group ID @@ -118,6 +156,11 @@ def send_signal(self, sig): :param signal: Signal to send. :type signal: signal.Signals """ + try: + self._send_signal(signal.SIGKILL) + finally: + # Deregister if the command has finished + self.poll() def kill(self): """ @@ -130,8 +173,11 @@ def cancel(self, kill_timeout=_KILL_TIMEOUT): Try to gracefully terminate the process by sending ``SIGTERM``, then waiting for ``kill_timeout`` to send ``SIGKILL``. """ - if self.poll() is None: - self._cancel(kill_timeout=kill_timeout) + try: + if self.poll() is None: + self._cancel(kill_timeout=kill_timeout) + finally: + self._deregister() @abstractmethod def _cancel(self, kill_timeout): @@ -141,10 +187,17 @@ def _cancel(self, kill_timeout): pass @abstractmethod + def _wait(self): + pass + def wait(self): """ Block until the background command completes, and return its exit code. """ + try: + self._wait() + finally: + self._deregister() def communicate(self, input=b'', timeout=None): """ @@ -162,10 +215,17 @@ def _communicate(self, input, timeout): pass @abstractmethod + def _poll(self): + pass + def poll(self): """ Return exit code if the command has exited, None otherwise. """ + retcode = self._poll() + if retcode is not None: + self._deregister() + return retcode @property @abstractmethod @@ -202,6 +262,9 @@ def pid(self): """ @abstractmethod + def _close(self): + pass + def close(self): """ Close all opened streams and then wait for command completion. @@ -211,6 +274,10 @@ def close(self): .. note:: If the command is writing to its stdout/stderr, it might be blocked on that and die when the streams are closed. """ + try: + return self._close() + finally: + self._deregister() def __enter__(self): return self @@ -224,10 +291,11 @@ class PopenBackgroundCommand(BackgroundCommand): :class:`subprocess.Popen`-based background command. """ - def __init__(self, popen): + def __init__(self, conn, popen): + super().__init__(conn=conn) self.popen = popen - def send_signal(self, sig): + def _send_signal(self, sig): return os.killpg(self.popen.pid, sig) @property @@ -246,13 +314,13 @@ def stderr(self): def pid(self): return self.popen.pid - def wait(self): + def _wait(self): return self.popen.wait() def _communicate(self, input, timeout): return _popen_communicate(self, self.popen, input, timeout) - def poll(self): + def _poll(self): return self.popen.poll() def _cancel(self, kill_timeout): @@ -263,26 +331,24 @@ def _cancel(self, kill_timeout): except subprocess.TimeoutExpired: os.killpg(os.getpgid(popen.pid), signal.SIGKILL) - def close(self): + def _close(self): self.popen.__exit__(None, None, None) return self.popen.returncode def __enter__(self): + super().__enter__() self.popen.__enter__() return self - def __exit__(self, *args, **kwargs): - self.popen.__exit__(*args, **kwargs) - class ParamikoBackgroundCommand(BackgroundCommand): """ :mod:`paramiko`-based background command. """ def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread): + super().__init__(conn=conn) self.chan = chan self.as_root = as_root - self.conn = conn self._pid = pid self._stdin = stdin self._stdout = stdout @@ -290,7 +356,7 @@ def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirec self.redirect_thread = redirect_thread self.cmd = cmd - def send_signal(self, sig): + def _send_signal(self, sig): # If the command has already completed, we don't want to send a signal # to another process that might have gotten that PID in the meantime. if self.poll() is not None: @@ -304,7 +370,7 @@ def send_signal(self, sig): def pid(self): return self._pid - def wait(self): + def _wait(self): status = self.chan.recv_exit_status() # Ensure that the redirection thread is finished copying the content # from paramiko to the pipe. @@ -390,7 +456,7 @@ def create_out(): else: return (_stdout, _stderr) - def poll(self): + def _poll(self): # Wait for the redirection thread to finish, otherwise we would # indicate the caller that the command is finished and that the streams # are safe to drain, but actually the redirection thread is not @@ -424,7 +490,7 @@ def stdout(self): def stderr(self): return self._stderr - def close(self): + def _close(self): for x in (self.stdin, self.stdout, self.stderr): if x is not None: x.close() @@ -443,12 +509,12 @@ class AdbBackgroundCommand(BackgroundCommand): """ def __init__(self, conn, adb_popen, pid, as_root): - self.conn = conn + super().__init__(conn=conn) self.as_root = as_root self.adb_popen = adb_popen self._pid = pid - def send_signal(self, sig): + def _send_signal(self, sig): self.conn.execute( _kill_pgid_cmd(self.pid, sig, self.conn.busybox), as_root=self.as_root, @@ -470,14 +536,13 @@ def stderr(self): def pid(self): return self._pid - def wait(self): + def _wait(self): return self.adb_popen.wait() def _communicate(self, input, timeout): return _popen_communicate(self, self.adb_popen, input, timeout) - - def poll(self): + def _poll(self): return self.adb_popen.poll() def _cancel(self, kill_timeout): @@ -488,17 +553,15 @@ def _cancel(self, kill_timeout): self.send_signal(signal.SIGKILL) self.adb_popen.kill() - def close(self): + def _close(self): self.adb_popen.__exit__(None, None, None) return self.adb_popen.returncode def __enter__(self): + super().__enter__() self.adb_popen.__enter__() return self - def __exit__(self, *args, **kwargs): - self.adb_popen.__exit__(*args, **kwargs) - class TransferManagerBase(ABC): diff --git a/devlib/host.py b/devlib/host.py index a6796da5f..68c535d39 100644 --- a/devlib/host.py +++ b/devlib/host.py @@ -141,8 +141,7 @@ def preexec_fn(): shell=True, preexec_fn=preexec_fn, ) - bg_cmd = PopenBackgroundCommand(popen) - self._current_bg_cmds.add(bg_cmd) + bg_cmd = PopenBackgroundCommand(self, popen) return bg_cmd def _close(self): diff --git a/devlib/utils/android.py b/devlib/utils/android.py index b57910da6..1cae582e9 100755 --- a/devlib/utils/android.py +++ b/devlib/utils/android.py @@ -334,7 +334,12 @@ def _push_pull(self, action, sources, dest, timeout): adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server) else: with self.transfer_mgr.manage(sources, dest, action): - bg_cmd = adb_command_background(self.device, command, adb_server=self.adb_server) + bg_cmd = adb_command_background( + device=self.device, + conn=self, + command=command, + adb_server=self.adb_server + ) self.transfer_mgr.set_transfer_and_wait(bg_cmd) # pylint: disable=unused-argument @@ -363,7 +368,6 @@ def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as if as_root and self.connected_as_root: as_root = False bg_cmd = self._background(command, stdout, stderr, as_root) - self._current_bg_cmds.add(bg_cmd) return bg_cmd def _background(self, command, stdout, stderr, as_root): @@ -692,11 +696,11 @@ def adb_command(device, command, timeout=None, adb_server=None): return output -def adb_command_background(device, command, adb_server=None): +def adb_command_background(device, conn, command, adb_server=None): full_command = get_adb_command(device, command, adb_server) logger.debug(full_command) - proc = get_subprocess(full_command, shell=True) - cmd = PopenBackgroundCommand(proc) + popen = get_subprocess(full_command, shell=True) + cmd = PopenBackgroundCommand(conn=conn, popen=popen) return cmd diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py index 39d704645..b8485219c 100644 --- a/devlib/utils/ssh.py +++ b/devlib/utils/ssh.py @@ -553,10 +553,7 @@ def execute(self, command, timeout=None, check_exit_code=True, def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False): with _handle_paramiko_exceptions(command): - bg_cmd = self._background(command, stdout, stderr, as_root) - - self._current_bg_cmds.add(bg_cmd) - return bg_cmd + return self._background(command, stdout, stderr, as_root) def _background(self, command, stdout, stderr, as_root): orig_command = command @@ -696,9 +693,6 @@ def callback(out_streams, name, chunk): def _close(self): logger.debug('Logging out {}@{}'.format(self.username, self.host)) with _handle_paramiko_exceptions(): - bg_cmds = set(self._current_bg_cmds) - for bg_cmd in bg_cmds: - bg_cmd.close() self.client.close() def _execute_command(self, command, as_root, log, timeout, executor):