diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 491b84e5f..24e7d25bc 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -507,12 +507,25 @@ async def _run_container_engine_cmd(self, engine_cmd, kind): websocket = await websockets.connect(self.websocket_url) websocket_errors = (socket.gaierror, websockets.WebSocketException, websockets.ConnectionClosedError, ConnectionRefusedError) + # Function to read a line, if the line is larger than the buffer size we will + # return the buffer so we can continue reading until we get a newline, rather + # than getting a LimitOverrunError + async def _readline_or_chunk(stream): + try: + return await stream.readuntil(b"\n") + except asyncio.exceptions.IncompleteReadError as e: + # Just return what has been read so far + return e.partial + except asyncio.exceptions.LimitOverrunError as e: + # If we get a LimitOverrunError, we will return the buffer so we can continue reading + return await stream.read(e.consumed) + while any(v["continue"] for k, v in self.logs[kind].items() if k in ['stdout', 'stderr']): try: logs = [self.logs[kind][key] for key in ('stdout', 'stderr')] for value in logs: try: - out = await asyncio.wait_for(value["stream"].readline(), timeout=.1) + out = await asyncio.wait_for(_readline_or_chunk(value["stream"]), timeout=.1) if out: value["data"] += out print("WS: " + str(out))