diff --git a/trio/_subprocess/linux_waitpid.py b/trio/_subprocess/linux_waitpid.py new file mode 100644 index 0000000000..33cd1c01f1 --- /dev/null +++ b/trio/_subprocess/linux_waitpid.py @@ -0,0 +1,55 @@ +import attr +import functools +import os +import outcome +from typing import Any + +from .. import _core +from .._sync import Event +from .._threads import run_sync_in_worker_thread + + +@attr.s +class WaitpidState: + pid = attr.ib() + event = attr.ib(default=attr.Factory(Event)) + outcome = attr.ib(default=None) + + +# https://github.com/python-trio/trio/issues/618 +class StubLimiter: + def release_on_behalf_of(self, x): + pass + + async def acquire_on_behalf_of(self, x): + pass + + +waitpid_limiter = StubLimiter() + + +# adapted from +# https://github.com/python-trio/trio/issues/4#issuecomment-398967572 +async def _task(state: WaitpidState) -> None: + """The waitpid thread runner task. This must be spawned as a system + task.""" + partial = functools.partial( + os.waitpid, # function + state.pid, # pid + 0 # no options + ) + + tresult = await run_sync_in_worker_thread( + outcome.capture, partial, cancellable=True, limiter=waitpid_limiter + ) + state.outcome = tresult + state.event.set() + + +async def waitpid(pid: int) -> Any: + """Waits for a child process with the specified PID to finish running.""" + waiter = WaitpidState(pid=pid) + _core.spawn_system_task(_task, waiter) + + await waiter.event.wait() + return waiter.outcome.unwrap() diff --git a/trio/tests/subprocess/test_waitpid_linux.py b/trio/tests/subprocess/test_waitpid_linux.py new file mode 100644 index 0000000000..53a37c61a1 --- /dev/null +++ b/trio/tests/subprocess/test_waitpid_linux.py @@ -0,0 +1,39 @@ +import sys + +import os +import pytest +import signal + +from ... import _core +from ..._subprocess.linux_waitpid import waitpid + +pytestmark = pytest.mark.skipif( + sys.platform != "linux", reason="linux waitpid only works on linux" +) + + +async def test_waitpid(): + pid = os.spawnvp(os.P_NOWAIT, "/bin/false", ("false",)) + result = await waitpid(pid) + # exit code is a 16-bit int: (code, signal) + assert result[0] == pid + assert os.WIFEXITED(result[1]) and os.WEXITSTATUS(result[1]) == 1 + + pid2 = os.spawnvp(os.P_NOWAIT, "/bin/true", ("true",)) + result = await waitpid(pid2) + assert result[0] == pid2 + assert os.WIFEXITED(result[1]) and os.WEXITSTATUS(result[1]) == 0 + + pid3 = os.spawnvp(os.P_NOWAIT, "/bin/sleep", ("/bin/sleep", "5")) + os.kill(pid3, signal.SIGKILL) + result = await waitpid(pid3) + assert result[0] == pid3 + status = os.WTERMSIG(result[1]) + assert os.WIFSIGNALED(result[1]) and status == 9 + + +async def test_waitpid_no_process(): + with pytest.raises(ChildProcessError): + # this PID does exist, but it's ourselves + # which doesn't work + await waitpid(os.getpid())