Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,12 +507,25 @@ async def _run_container_engine_cmd(self, engine_cmd, kind):
websocket = await websockets.connect(self.websocket_url)
websocket_errors = (socket.gaierror, websockets.WebSocketException, websockets.ConnectionClosedError, ConnectionRefusedError)

# Function to read a line, if the line is larger than the buffer size we will
# return the buffer so we can continue reading until we get a newline, rather
# than getting a LimitOverrunError
async def _readline_or_chunk(stream):
try:
return await stream.readuntil(b"\n")
except asyncio.exceptions.IncompleteReadError as e:
# Just return what has been read so far
return e.partial
except asyncio.exceptions.LimitOverrunError as e:
# If we get a LimitOverrunError, we will return the buffer so we can continue reading
return await stream.read(e.consumed)

while any(v["continue"] for k, v in self.logs[kind].items() if k in ['stdout', 'stderr']):
try:
logs = [self.logs[kind][key] for key in ('stdout', 'stderr')]
for value in logs:
try:
out = await asyncio.wait_for(value["stream"].readline(), timeout=.1)
out = await asyncio.wait_for(_readline_or_chunk(value["stream"]), timeout=.1)
if out:
value["data"] += out
print("WS: " + str(out))
Expand Down