diff --git a/devlib/connection.py b/devlib/connection.py index ecc30714c..4772bd48b 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -27,6 +27,8 @@ import threading import time import logging +import select +import fcntl from devlib.utils.misc import InitCheckpoint @@ -36,6 +38,24 @@ def _kill_pgid_cmd(pgid, sig, busybox): return '{} kill -{} -{}'.format(busybox, sig.value, pgid) +def _popen_communicate(bg, popen, input, timeout): + try: + stdout, stderr = popen.communicate(input=input, timeout=timeout) + except subprocess.TimeoutExpired: + bg.cancel() + raise + + ret = popen.returncode + if ret: + raise subprocess.CalledProcessError( + ret, + popen.args, + stdout, + stderr, + ) + else: + return (stdout, stderr) + class ConnectionBase(InitCheckpoint): """ @@ -126,6 +146,21 @@ def wait(self): Block until the background command completes, and return its exit code. """ + def communicate(self, input=b'', timeout=None): + """ + Block until the background command completes while reading stdout and stderr. + Return ``tuple(stdout, stderr)``. If the return code is non-zero, + raises a :exc:`subprocess.CalledProcessError` exception. + """ + try: + return self._communicate(input=input, timeout=timeout) + finally: + self.close() + + @abstractmethod + def _communicate(self, input, timeout): + pass + @abstractmethod def poll(self): """ @@ -214,6 +249,9 @@ def pid(self): def wait(self): return self.popen.wait() + def _communicate(self, input, timeout): + return _popen_communicate(self, self.popen, input, timeout) + def poll(self): return self.popen.poll() @@ -241,7 +279,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): """ :mod:`paramiko`-based background command. """ - def __init__(self, conn, chan, pid, as_root, stdin, stdout, stderr, redirect_thread): + def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread): self.chan = chan self.as_root = as_root self.conn = conn @@ -250,6 +288,7 @@ def __init__(self, conn, chan, pid, as_root, stdin, stdout, stderr, redirect_thr self._stdout = stdout self._stderr = stderr self.redirect_thread = redirect_thread + self.cmd = cmd def send_signal(self, sig): # If the command has already completed, we don't want to send a signal @@ -272,6 +311,85 @@ def wait(self): self.redirect_thread.join() return status + def _communicate(self, input, timeout): + stdout = self._stdout + stderr = self._stderr + stdin = self._stdin + chan = self.chan + + # For some reason, file descriptors in the read-list of select() can + # still end up blocking in .read(), so make the non-blocking to avoid a + # deadlock. Since _communicate() will consume all input and all output + # until the command dies, we can do whatever we want with the pipe + # without affecting external users. + for s in (stdout, stderr): + fcntl.fcntl(s.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) + + out = {stdout: [], stderr: []} + ret = None + can_send = True + + select_timeout = 1 + if timeout is not None: + select_timeout = min(select_timeout, 1) + + def create_out(): + return ( + b''.join(out[stdout]), + b''.join(out[stderr]) + ) + + start = monotonic() + + while ret is None: + # Even if ret is not None anymore, we need to drain the streams + ret = self.poll() + + if timeout is not None and ret is None and monotonic() - start >= timeout: + self.cancel() + _stdout, _stderr = create_out() + raise subprocess.TimeoutExpired(self.cmd, timeout, _stdout, _stderr) + + can_send &= (not chan.closed) & bool(input) + wlist = [chan] if can_send else [] + + if can_send and chan.send_ready(): + try: + n = chan.send(input) + # stdin might have been closed already + except OSError: + can_send = False + chan.shutdown_write() + else: + input = input[n:] + if not input: + # Send EOF on stdin + chan.shutdown_write() + + rs, ws, _ = select.select( + [x for x in (stdout, stderr) if not x.closed], + wlist, + [], + select_timeout, + ) + + for r in rs: + chunk = r.read() + if chunk: + out[r].append(chunk) + + _stdout, _stderr = create_out() + + if ret: + raise subprocess.CalledProcessError( + ret, + self.cmd, + _stdout, + _stderr, + ) + else: + return (_stdout, _stderr) + def poll(self): # Wait for the redirection thread to finish, otherwise we would # indicate the caller that the command is finished and that the streams @@ -355,6 +473,10 @@ def pid(self): def wait(self): return self.adb_popen.wait() + def _communicate(self, input, timeout): + return _popen_communicate(self, self.adb_popen, input, timeout) + + def poll(self): return self.adb_popen.poll() diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py index 6d6bfc562..3cff55ccd 100644 --- a/devlib/utils/ssh.py +++ b/devlib/utils/ssh.py @@ -591,6 +591,7 @@ def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as return bg_cmd def _background(self, command, stdout, stderr, as_root): + orig_command = command stdout, stderr, command = redirect_streams(stdout, stderr, command) command = "printf '%s\n' $$; exec sh -c {}".format(quote(command)) @@ -708,6 +709,7 @@ def callback(out_streams, name, chunk): stdout=out_streams['stdout'][0], stderr=out_streams['stderr'][0], redirect_thread=redirect_thread, + cmd=orig_command, ) def _close(self):