From 48cb7f88b764121342182daf56a940acfec31d90 Mon Sep 17 00:00:00 2001
From: Chris Harris
Date: Tue, 5 Mar 2024 08:06:50 -0800
Subject: [PATCH] Prevent LimitOverrunError with large output lines

If a submission writes an output line larger than the stream buffer size
(default 64k), a LimitOverrunError will be raised. Rather than using
readline(...), use readuntil(...); in the case of an overrun, just return
the current buffer, and the rest of the line will be returned with the
next read.

Signed-off-by: Chris Harris
---
 compute_worker/compute_worker.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py
index 491b84e5f..24e7d25bc 100644
--- a/compute_worker/compute_worker.py
+++ b/compute_worker/compute_worker.py
@@ -507,12 +507,25 @@ async def _run_container_engine_cmd(self, engine_cmd, kind):
         websocket = await websockets.connect(self.websocket_url)
         websocket_errors = (socket.gaierror, websockets.WebSocketException, websockets.ConnectionClosedError, ConnectionRefusedError)
+        # Function to read a line, if the line is larger than the buffer size we will
+        # return the buffer so we can continue reading until we get a newline, rather
+        # than getting a LimitOverrunError
+        async def _readline_or_chunk(stream):
+            try:
+                return await stream.readuntil(b"\n")
+            except asyncio.exceptions.IncompleteReadError as e:
+                # Just return what has been read so far
+                return e.partial
+            except asyncio.exceptions.LimitOverrunError as e:
+                # If we get a LimitOverrunError, we will return the buffer so we can continue reading
+                return await stream.read(e.consumed)
+
         while any(v["continue"] for k, v in self.logs[kind].items() if k in ['stdout', 'stderr']):
             try:
                 logs = [self.logs[kind][key] for key in ('stdout', 'stderr')]
                 for value in logs:
                     try:
-                        out = await asyncio.wait_for(value["stream"].readline(), timeout=.1)
+                        out = await asyncio.wait_for(_readline_or_chunk(value["stream"]), timeout=.1)
                         if out:
                             value["data"] += out
                             print("WS: " + str(out))
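
Not part of the patch: a minimal, self-contained sketch of the behaviour the
helper above relies on, using only the standard asyncio.StreamReader API. The
readline_or_chunk name mirrors the patch's helper, and the 16-byte limit is an
artificial stand-in for the default 64k buffer so the overrun is easy to
trigger with a single over-long line.

import asyncio


async def readline_or_chunk(stream):
    """Read one line, or whatever is buffered if the line exceeds the limit."""
    try:
        return await stream.readuntil(b"\n")
    except asyncio.exceptions.IncompleteReadError as e:
        # EOF arrived before a newline: return what was read so far
        return e.partial
    except asyncio.exceptions.LimitOverrunError as e:
        # Buffer filled up before a newline was found: drain the buffered bytes;
        # the rest of the line is returned by subsequent calls
        return await stream.read(e.consumed)


async def main():
    reader = asyncio.StreamReader(limit=16)   # artificially small limit
    reader.feed_data(b"x" * 40 + b"\n")       # one "line" longer than the limit
    reader.feed_eof()

    # Calling reader.readuntil(b"\n") directly here raises LimitOverrunError;
    # the helper returns the same data in pieces instead.
    while True:
        chunk = await readline_or_chunk(reader)
        if not chunk:
            break
        print(len(chunk), chunk)


asyncio.run(main())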