Skip to content

Flaky test_multiple_workers #6754

@gjoseph92

Description

@gjoseph92

Just one failure on Windows (unusual compared to other #6731 failures, which are predominantly on macOS).

____________________________ test_multiple_workers ____________________________
addr = 'tcp://127.0.0.1:51201', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:51201'
backend = <distributed.comm.tcp.TCPBackend object at 0x000001B178175F10>
connector = <distributed.comm.tcp.TCPConnector object at 0x000001B1017F1250>
comm = None
time_left = <function connect.<locals>.time_left at 0x000001B17F793A60>
backoff_base = 0.01
async def connect(
        addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
    ):
"""
    Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
    and yield a ``Comm`` object.  If the connection attempt fails, it is
    retried until the *timeout* is expired.
    """
if timeout is None:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        timeout = parse_timedelta(timeout, default="seconds")
        scheme, loc = parse_address(addr)
        backend = registry.get_backend(scheme)
        connector = backend.get_connector()
        comm = None
        start = time()
def time_left():
            deadline = start + timeout
return max(0, deadline - time())
        backoff_base = 0.01
        attempt = 0
# Prefer multiple small attempts than one long attempt. This should protect
# primarily from DNS race conditions
# gh3104, gh4176, gh4167
        intermediate_cap = timeout / 5
        active_exception = None
while time_left() > 0:
try:
>               comm = await asyncio.wait_for(
                    connector.connect(loc, deserialize=deserialize, **connection_args),
                    timeout=min(intermediate_cap, time_left()),
                )
distributed\comm\core.py:291: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Task cancelled name='Task-323' coro=<BaseTCPConnector.connect() done, defined at d:\a\distributed\distributed\distributed\comm\tcp.py:443>>
timeout = 0.0052509307861328125
async def wait_for(fut, timeout, *, loop=None):
"""Wait for the single Future or coroutine to complete, with timeout.
    Coroutine will be wrapped in Task.
    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().
    If the wait is cancelled, the task is also cancelled.
    This function is a coroutine.
    """
if loop is None:
            loop = events.get_running_loop()
else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
if timeout is None:
return await fut
if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
if fut.done():
return fut.result()
await _cancel_and_wait(fut, loop=loop)
try:
                fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc
else:
raise exceptions.TimeoutError()
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
try:
# wait until the future completes or the timeout
try:
await waiter
except exceptions.CancelledError:
if fut.done():
return fut.result()
else:
                    fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise
if fut.done():
return fut.result()
else:
                fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
>               raise exceptions.TimeoutError()
E               asyncio.exceptions.TimeoutError
C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:501: TimeoutError
The above exception was the direct cause of the following exception:
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x000001B1010D8880>
def test_multiple_workers(loop):
        scheduler_address = f"127.0.0.1:{open_port()}"
with popen(["dask-scheduler", "--no-dashboard", "--host", scheduler_address]) as s:
with popen(["dask-worker", scheduler_address, "--no-dashboard"]) as a:
with popen(["dask-worker", scheduler_address, "--no-dashboard"]) as b:
>                   with Client(scheduler_address, loop=loop) as c:
distributed\cli\tests\test_dask_scheduler.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\client.py:940: in __init__
self.start(timeout=timeout)
distributed\client.py:1098: in start
    sync(self.loop, self._start, **kwargs)
distributed\utils.py:405: in sync
raise exc.with_traceback(tb)
distributed\utils.py:378: in f
    result = yield future
C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\gen.py:762: in run
    value = future.result()
distributed\client.py:1178: in _start
await self._ensure_connected(timeout=timeout)
distributed\client.py:1241: in _ensure_connected
    comm = await connect(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
addr = 'tcp://127.0.0.1:51201', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:51201'
backend = <distributed.comm.tcp.TCPBackend object at 0x000001B178175F10>
connector = <distributed.comm.tcp.TCPConnector object at 0x000001B1017F1250>
comm = None
time_left = <function connect.<locals>.time_left at 0x000001B17F793A60>
backoff_base = 0.01
async def connect(
        addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
    ):
"""
    Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
    and yield a ``Comm`` object.  If the connection attempt fails, it is
    retried until the *timeout* is expired.
    """
if timeout is None:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        timeout = parse_timedelta(timeout, default="seconds")
        scheme, loc = parse_address(addr)
        backend = registry.get_backend(scheme)
        connector = backend.get_connector()
        comm = None
        start = time()
def time_left():
            deadline = start + timeout
return max(0, deadline - time())
        backoff_base = 0.01
        attempt = 0
# Prefer multiple small attempts than one long attempt. This should protect
# primarily from DNS race conditions
# gh3104, gh4176, gh4167
        intermediate_cap = timeout / 5
        active_exception = None
while time_left() > 0:
try:
                comm = await asyncio.wait_for(
                    connector.connect(loc, deserialize=deserialize, **connection_args),
                    timeout=min(intermediate_cap, time_left()),
                )
break
except FatalCommClosedError:
raise
# Note: CommClosed inherits from OSError
except (asyncio.TimeoutError, OSError) as exc:
                active_exception = exc
# As descibed above, the intermediate timeout is used to distributed
# initial, bulk connect attempts homogeneously. In particular with
# the jitter upon retries we should not be worred about overloading
# any more DNS servers
                intermediate_cap = timeout
# FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
                upper_cap = min(time_left(), backoff_base * (2**attempt))
                backoff = random.uniform(0, upper_cap)
                attempt += 1
                logger.debug(
"Could not connect to %s, waiting for %s before retrying", loc, backoff
                )
await asyncio.sleep(backoff)
else:
>           raise OSError(
f"Timed out trying to connect to {addr} after {timeout} s"
            ) from active_exception
E           OSError: Timed out trying to connect to tcp://127.0.0.1:51201 after 5 s
distributed\comm\core.py:317: OSError
---------------------------- Captured stderr call -----------------------------
[2022-07-10 18:09:15,453](https://github.com/dask/distributed/runs/7272306647?check_suite_focus=true#step:11:2023) - distributed.scheduler - INFO - -----------------------------------------------
2022-07-10 18:09:15,472 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-07-10 18:09:15,480 - distributed.scheduler - INFO - State start
2022-07-10 18:09:15,515 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-10 18:09:15,516 - distributed.scheduler - INFO - Clear task state
2022-07-10 18:09:15,516 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:51201
2022-07-10 18:09:15,517 - distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787

Metadata

Metadata

Assignees

No one assigned

    Labels

    flaky test: Intermittent failures on CI.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions