From a52cf4ad3f27341603381fa3ce3b3fe2bcc7d26f Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 11 Nov 2016 07:20:50 -0500 Subject: [PATCH 1/2] Timeout and check on write The core.write coroutine waits on IOStream.write futures. However, there are some cases where IOStream.write futures don't complete. Notably, if something else writes to the same stream before we're done waiting, then the first future is forgotten and hangs forever. As the Tornado documentation for IOStream.write puts it: if `write` is called again before the previously returned `Future` has resolved, the previous future will be orphaned and will never resolve. This solution wraps the wait in a timeout and checks the stream's write_buffer as a secondary check for completion. There are probably better solutions, but this seems to work fine. --- distributed/core.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 4f1895bfd4e..a5ca0ab2d9e 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -242,10 +242,12 @@ def write(stream, msg): futures.append(stream.write(frames[-1])) - if WINDOWS: - yield futures[-1] - else: - yield futures + while stream._write_buffer: + try: + yield gen.with_timeout(timedelta(seconds=0.01), futures[-1]) + break + except gen.TimeoutError: + pass def pingpong(stream): From 6259c607300c8ace86fcb1326d502d686e2c042c Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 11 Nov 2016 07:27:38 -0500 Subject: [PATCH 2/2] Remove timeout trick from batched This was recently moved to core.write --- distributed/batched.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index 47ba98df2ed..2aeabf6005e 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -64,12 +64,7 @@ def send_next(self, wait=True): wait_time = min(self.last_transmission + self.interval - now, self.interval) yield gen.sleep(wait_time) - while self.stream._write_buffer: - try: - yield 
gen.with_timeout(timedelta(milliseconds=10), - self.last_send) # hangs otherwise? - except gen.TimeoutError: - pass + yield self.last_send self.buffer, payload = [], self.buffer self.last_payload = payload self.last_transmission = now