Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 88 additions & 25 deletions devlib/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ class ConnectionBase(InitCheckpoint):
Base class for all connections.
"""
def __init__(self):
self._current_bg_cmds = WeakSet()
self._current_bg_cmds = set()
self._closed = False
self._close_lock = threading.Lock()
self.busybox = None
self.logger = logging.getLogger('Connection')

def cancel_running_command(self):
bg_cmds = set(self._current_bg_cmds)
Expand All @@ -83,11 +84,21 @@ def _close(self):
"""

def close(self):

def finish_bg():
bg_cmds = set(self._current_bg_cmds)
n = len(bg_cmds)
if n:
self.logger.debug(f'Canceling {n} background commands before closing connection')
for bg_cmd in bg_cmds:
bg_cmd.cancel()

# Locking the closing allows any thread to safely call close() as long
# as the connection can be closed from a thread that is not the one it
# started its life in.
with self._close_lock:
if not self._closed:
finish_bg()
self._close()
self._closed = True

Expand All @@ -109,7 +120,34 @@ class BackgroundCommand(ABC):
Instances of this class can be used as context managers, with the same
semantic as :class:`subprocess.Popen`.
"""

def __init__(self, conn):
self.conn = conn

# Poll currently opened background commands on that connection to make
# them deregister themselves if they are completed. This avoids
# accumulating terminated commands and therefore leaking associated
# resources if the user is not careful and does not use the context
# manager API.
for bg_cmd in set(conn._current_bg_cmds):
try:
bg_cmd.poll()
# We don't want anything to fail here because of another command
except Exception:
pass

conn._current_bg_cmds.add(self)

def _deregister(self):
try:
self.conn._current_bg_cmds.remove(self)
except KeyError:
pass

@abstractmethod
def _send_signal(self, sig):
pass

def send_signal(self, sig):
"""
Send a POSIX signal to the background command's process group ID
Expand All @@ -118,6 +156,11 @@ def send_signal(self, sig):
:param signal: Signal to send.
:type signal: signal.Signals
"""
try:
self._send_signal(signal.SIGKILL)
finally:
# Deregister if the command has finished
self.poll()

def kill(self):
"""
Expand All @@ -130,8 +173,11 @@ def cancel(self, kill_timeout=_KILL_TIMEOUT):
Try to gracefully terminate the process by sending ``SIGTERM``, then
waiting for ``kill_timeout`` to send ``SIGKILL``.
"""
if self.poll() is None:
self._cancel(kill_timeout=kill_timeout)
try:
if self.poll() is None:
self._cancel(kill_timeout=kill_timeout)
finally:
self._deregister()

@abstractmethod
def _cancel(self, kill_timeout):
Expand All @@ -141,10 +187,17 @@ def _cancel(self, kill_timeout):
pass

@abstractmethod
def _wait(self):
pass

def wait(self):
"""
Block until the background command completes, and return its exit code.
"""
try:
self._wait()
finally:
self._deregister()

def communicate(self, input=b'', timeout=None):
"""
Expand All @@ -162,10 +215,17 @@ def _communicate(self, input, timeout):
pass

@abstractmethod
def _poll(self):
pass

def poll(self):
"""
Return exit code if the command has exited, None otherwise.
"""
retcode = self._poll()
if retcode is not None:
self._deregister()
return retcode

@property
@abstractmethod
Expand Down Expand Up @@ -202,6 +262,9 @@ def pid(self):
"""

@abstractmethod
def _close(self):
pass

def close(self):
"""
Close all opened streams and then wait for command completion.
Expand All @@ -211,6 +274,10 @@ def close(self):
.. note:: If the command is writing to its stdout/stderr, it might be
blocked on that and die when the streams are closed.
"""
try:
return self._close()
finally:
self._deregister()

def __enter__(self):
return self
Expand All @@ -224,10 +291,11 @@ class PopenBackgroundCommand(BackgroundCommand):
:class:`subprocess.Popen`-based background command.
"""

def __init__(self, popen):
def __init__(self, conn, popen):
super().__init__(conn=conn)
self.popen = popen

def send_signal(self, sig):
def _send_signal(self, sig):
return os.killpg(self.popen.pid, sig)

@property
Expand All @@ -246,13 +314,13 @@ def stderr(self):
def pid(self):
return self.popen.pid

def wait(self):
def _wait(self):
return self.popen.wait()

def _communicate(self, input, timeout):
return _popen_communicate(self, self.popen, input, timeout)

def poll(self):
def _poll(self):
return self.popen.poll()

def _cancel(self, kill_timeout):
Expand All @@ -263,34 +331,32 @@ def _cancel(self, kill_timeout):
except subprocess.TimeoutExpired:
os.killpg(os.getpgid(popen.pid), signal.SIGKILL)

def close(self):
def _close(self):
self.popen.__exit__(None, None, None)
return self.popen.returncode

def __enter__(self):
super().__enter__()
self.popen.__enter__()
return self

def __exit__(self, *args, **kwargs):
self.popen.__exit__(*args, **kwargs)


class ParamikoBackgroundCommand(BackgroundCommand):
"""
:mod:`paramiko`-based background command.
"""
def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread):
super().__init__(conn=conn)
self.chan = chan
self.as_root = as_root
self.conn = conn
self._pid = pid
self._stdin = stdin
self._stdout = stdout
self._stderr = stderr
self.redirect_thread = redirect_thread
self.cmd = cmd

def send_signal(self, sig):
def _send_signal(self, sig):
# If the command has already completed, we don't want to send a signal
# to another process that might have gotten that PID in the meantime.
if self.poll() is not None:
Expand All @@ -304,7 +370,7 @@ def send_signal(self, sig):
def pid(self):
return self._pid

def wait(self):
def _wait(self):
status = self.chan.recv_exit_status()
# Ensure that the redirection thread is finished copying the content
# from paramiko to the pipe.
Expand Down Expand Up @@ -390,7 +456,7 @@ def create_out():
else:
return (_stdout, _stderr)

def poll(self):
def _poll(self):
# Wait for the redirection thread to finish, otherwise we would
# indicate the caller that the command is finished and that the streams
# are safe to drain, but actually the redirection thread is not
Expand Down Expand Up @@ -424,7 +490,7 @@ def stdout(self):
def stderr(self):
return self._stderr

def close(self):
def _close(self):
for x in (self.stdin, self.stdout, self.stderr):
if x is not None:
x.close()
Expand All @@ -443,12 +509,12 @@ class AdbBackgroundCommand(BackgroundCommand):
"""

def __init__(self, conn, adb_popen, pid, as_root):
self.conn = conn
super().__init__(conn=conn)
self.as_root = as_root
self.adb_popen = adb_popen
self._pid = pid

def send_signal(self, sig):
def _send_signal(self, sig):
self.conn.execute(
_kill_pgid_cmd(self.pid, sig, self.conn.busybox),
as_root=self.as_root,
Expand All @@ -470,14 +536,13 @@ def stderr(self):
def pid(self):
return self._pid

def wait(self):
def _wait(self):
return self.adb_popen.wait()

def _communicate(self, input, timeout):
return _popen_communicate(self, self.adb_popen, input, timeout)


def poll(self):
def _poll(self):
return self.adb_popen.poll()

def _cancel(self, kill_timeout):
Expand All @@ -488,17 +553,15 @@ def _cancel(self, kill_timeout):
self.send_signal(signal.SIGKILL)
self.adb_popen.kill()

def close(self):
def _close(self):
self.adb_popen.__exit__(None, None, None)
return self.adb_popen.returncode

def __enter__(self):
super().__enter__()
self.adb_popen.__enter__()
return self

def __exit__(self, *args, **kwargs):
self.adb_popen.__exit__(*args, **kwargs)


class TransferManagerBase(ABC):

Expand Down
3 changes: 1 addition & 2 deletions devlib/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ def preexec_fn():
shell=True,
preexec_fn=preexec_fn,
)
bg_cmd = PopenBackgroundCommand(popen)
self._current_bg_cmds.add(bg_cmd)
bg_cmd = PopenBackgroundCommand(self, popen)
return bg_cmd

def _close(self):
Expand Down
14 changes: 9 additions & 5 deletions devlib/utils/android.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,12 @@ def _push_pull(self, action, sources, dest, timeout):
adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server)
else:
with self.transfer_mgr.manage(sources, dest, action):
bg_cmd = adb_command_background(self.device, command, adb_server=self.adb_server)
bg_cmd = adb_command_background(
device=self.device,
conn=self,
command=command,
adb_server=self.adb_server
)
self.transfer_mgr.set_transfer_and_wait(bg_cmd)

# pylint: disable=unused-argument
Expand Down Expand Up @@ -363,7 +368,6 @@ def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as
if as_root and self.connected_as_root:
as_root = False
bg_cmd = self._background(command, stdout, stderr, as_root)
self._current_bg_cmds.add(bg_cmd)
return bg_cmd

def _background(self, command, stdout, stderr, as_root):
Expand Down Expand Up @@ -692,11 +696,11 @@ def adb_command(device, command, timeout=None, adb_server=None):
return output


def adb_command_background(device, command, adb_server=None):
def adb_command_background(device, conn, command, adb_server=None):
full_command = get_adb_command(device, command, adb_server)
logger.debug(full_command)
proc = get_subprocess(full_command, shell=True)
cmd = PopenBackgroundCommand(proc)
popen = get_subprocess(full_command, shell=True)
cmd = PopenBackgroundCommand(conn=conn, popen=popen)
return cmd


Expand Down
8 changes: 1 addition & 7 deletions devlib/utils/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,7 @@ def execute(self, command, timeout=None, check_exit_code=True,

def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False):
with _handle_paramiko_exceptions(command):
bg_cmd = self._background(command, stdout, stderr, as_root)

self._current_bg_cmds.add(bg_cmd)
return bg_cmd
return self._background(command, stdout, stderr, as_root)

def _background(self, command, stdout, stderr, as_root):
orig_command = command
Expand Down Expand Up @@ -696,9 +693,6 @@ def callback(out_streams, name, chunk):
def _close(self):
logger.debug('Logging out {}@{}'.format(self.username, self.host))
with _handle_paramiko_exceptions():
bg_cmds = set(self._current_bg_cmds)
for bg_cmd in bg_cmds:
bg_cmd.close()
self.client.close()

def _execute_command(self, command, as_root, log, timeout, executor):
Expand Down