diff --git a/distributed/batched.py b/distributed/batched.py index 47ba98df2ed..2aeabf6005e 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -64,12 +64,7 @@ def send_next(self, wait=True): wait_time = min(self.last_transmission + self.interval - now, self.interval) yield gen.sleep(wait_time) - while self.stream._write_buffer: - try: - yield gen.with_timeout(timedelta(milliseconds=10), - self.last_send) # hangs otherwise? - except gen.TimeoutError: - pass + yield self.last_send self.buffer, payload = [], self.buffer self.last_payload = payload self.last_transmission = now diff --git a/distributed/core.py b/distributed/core.py index 4f1895bfd4e..a5ca0ab2d9e 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -242,10 +242,12 @@ def write(stream, msg): futures.append(stream.write(frames[-1])) - if WINDOWS: - yield futures[-1] - else: - yield futures + while stream._write_buffer: + try: + yield gen.with_timeout(timedelta(seconds=0.01), futures[-1]) + break + except gen.TimeoutError: + pass def pingpong(stream):