Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 123 additions & 1 deletion devlib/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import threading
import time
import logging
import select
import fcntl

from devlib.utils.misc import InitCheckpoint

Expand All @@ -36,6 +38,24 @@
def _kill_pgid_cmd(pgid, sig, busybox):
return '{} kill -{} -{}'.format(busybox, sig.value, pgid)

def _popen_communicate(bg, popen, input, timeout):
try:
stdout, stderr = popen.communicate(input=input, timeout=timeout)
except subprocess.TimeoutExpired:
bg.cancel()
raise

ret = popen.returncode
if ret:
raise subprocess.CalledProcessError(
ret,
popen.args,
stdout,
stderr,
)
else:
return (stdout, stderr)


class ConnectionBase(InitCheckpoint):
"""
Expand Down Expand Up @@ -126,6 +146,21 @@ def wait(self):
Block until the background command completes, and return its exit code.
"""

def communicate(self, input=b'', timeout=None):
"""
Block until the background command completes while reading stdout and stderr.
Return ``tuple(stdout, stderr)``. If the return code is non-zero,
raises a :exc:`subprocess.CalledProcessError` exception.
"""
try:
return self._communicate(input=input, timeout=timeout)
finally:
self.close()

@abstractmethod
def _communicate(self, input, timeout):
pass

@abstractmethod
def poll(self):
"""
Expand Down Expand Up @@ -214,6 +249,9 @@ def pid(self):
def wait(self):
return self.popen.wait()

def _communicate(self, input, timeout):
return _popen_communicate(self, self.popen, input, timeout)

def poll(self):
return self.popen.poll()

Expand Down Expand Up @@ -241,7 +279,7 @@ class ParamikoBackgroundCommand(BackgroundCommand):
"""
:mod:`paramiko`-based background command.
"""
def __init__(self, conn, chan, pid, as_root, stdin, stdout, stderr, redirect_thread):
def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread):
self.chan = chan
self.as_root = as_root
self.conn = conn
Expand All @@ -250,6 +288,7 @@ def __init__(self, conn, chan, pid, as_root, stdin, stdout, stderr, redirect_thr
self._stdout = stdout
self._stderr = stderr
self.redirect_thread = redirect_thread
self.cmd = cmd

def send_signal(self, sig):
# If the command has already completed, we don't want to send a signal
Expand All @@ -272,6 +311,85 @@ def wait(self):
self.redirect_thread.join()
return status

def _communicate(self, input, timeout):
stdout = self._stdout
stderr = self._stderr
stdin = self._stdin
chan = self.chan

# For some reason, file descriptors in the read-list of select() can
# still end up blocking in .read(), so make the non-blocking to avoid a
# deadlock. Since _communicate() will consume all input and all output
# until the command dies, we can do whatever we want with the pipe
# without affecting external users.
for s in (stdout, stderr):
fcntl.fcntl(s.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

out = {stdout: [], stderr: []}
ret = None
can_send = True

select_timeout = 1
if timeout is not None:
select_timeout = min(select_timeout, 1)

def create_out():
return (
b''.join(out[stdout]),
b''.join(out[stderr])
)

start = monotonic()

while ret is None:
# Even if ret is not None anymore, we need to drain the streams
ret = self.poll()

if timeout is not None and ret is None and monotonic() - start >= timeout:
self.cancel()
_stdout, _stderr = create_out()
raise subprocess.TimeoutExpired(self.cmd, timeout, _stdout, _stderr)

can_send &= (not chan.closed) & bool(input)
wlist = [chan] if can_send else []

if can_send and chan.send_ready():
try:
n = chan.send(input)
# stdin might have been closed already
except OSError:
can_send = False
chan.shutdown_write()
else:
input = input[n:]
if not input:
# Send EOF on stdin
chan.shutdown_write()

rs, ws, _ = select.select(
[x for x in (stdout, stderr) if not x.closed],
wlist,
[],
select_timeout,
)

for r in rs:
chunk = r.read()
if chunk:
out[r].append(chunk)

_stdout, _stderr = create_out()

if ret:
raise subprocess.CalledProcessError(
ret,
self.cmd,
_stdout,
_stderr,
)
else:
return (_stdout, _stderr)

def poll(self):
# Wait for the redirection thread to finish, otherwise we would
# indicate the caller that the command is finished and that the streams
Expand Down Expand Up @@ -355,6 +473,10 @@ def pid(self):
def wait(self):
return self.adb_popen.wait()

def _communicate(self, input, timeout):
return _popen_communicate(self, self.adb_popen, input, timeout)


def poll(self):
return self.adb_popen.poll()

Expand Down
2 changes: 2 additions & 0 deletions devlib/utils/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as
return bg_cmd

def _background(self, command, stdout, stderr, as_root):
orig_command = command
stdout, stderr, command = redirect_streams(stdout, stderr, command)

command = "printf '%s\n' $$; exec sh -c {}".format(quote(command))
Expand Down Expand Up @@ -708,6 +709,7 @@ def callback(out_streams, name, chunk):
stdout=out_streams['stdout'][0],
stderr=out_streams['stderr'][0],
redirect_thread=redirect_thread,
cmd=orig_command,
)

def _close(self):
Expand Down