Open
Labels
flaky test (Intermittent failures on CI.)
Description
https://github.com/dask/distributed/runs/6689711008?check_suite_focus=true#step:11:1715
__________________________________ test_basic __________________________________
ConnectionRefusedError: [Errno 111] Connection refused
The above exception was the direct cause of the following exception:
addr = 'tls://127.0.0.1:8786', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': <ssl.SSLContext object at 0x7f60118d8e40>}
scheme = 'tls', loc = '127.0.0.1:8786'
backend = <distributed.comm.tcp.TLSBackend object at 0x7f60331d9b80>
connector = <distributed.comm.tcp.TLSConnector object at 0x7f60130ef280>
comm = None, time_left = <function connect.<locals>.time_left at 0x7f6011f2b1f0>
backoff_base = 0.01
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
def time_left():
deadline = start + timeout
return max(0, deadline - time())
backoff_base = 0.01
attempt = 0
# Prefer multiple small attempts than one long attempt. This should protect
# primarily from DNS race conditions
# gh3104, gh4176, gh4167
intermediate_cap = timeout / 5
active_exception = None
while time_left() > 0:
try:
> comm = await asyncio.wait_for(
connector.connect(loc, deserialize=deserialize, **connection_args),
timeout=min(intermediate_cap, time_left()),
)
distributed/comm/core.py:289:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Task finished name='Task-1963' coro=<BaseTCPConnector.connect() done, defined at /home/runner/work/distributed/distri...<distributed.comm.tcp.TLSConnector object at 0x7f60130ef280>: ConnectionRefusedError: [Errno 111] Connection refused')>
timeout = 4.341820001602173
async def wait_for(fut, timeout, *, loop=None):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().
If the wait is cancelled, the task is also cancelled.
This function is a coroutine.
"""
if loop is None:
loop = events.get_running_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
if timeout is None:
return await fut
if timeout <= 0:
fut = ensure_future(fut, loop=loop)
if fut.done():
return fut.result()
await _cancel_and_wait(fut, loop=loop)
try:
return fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc
waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)
fut = ensure_future(fut, loop=loop)
fut.add_done_callback(cb)
try:
# wait until the future completes or the timeout
try:
await waiter
except exceptions.CancelledError:
if fut.done():
return fut.result()
else:
fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise
if fut.done():
> return fut.result()
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py:479:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <distributed.comm.tcp.TLSConnector object at 0x7f60130ef280>
address = '127.0.0.1:8786', deserialize = True
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': <ssl.SSLContext object at 0x7f60118d8e40>}
ip = '127.0.0.1', port = 8786
kwargs = {'ssl_options': <ssl.SSLContext object at 0x7f60118d8e40>}
async def connect(self, address, deserialize=True, **connection_args):
self._check_encryption(address, connection_args)
ip, port = parse_host_port(address)
kwargs = self._get_connect_args(**connection_args)
try:
stream = await self.client.connect(
ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
)
# Under certain circumstances tornado will have a closed connnection with an
# error and not raise a StreamClosedError.
#
# This occurs with tornado 5.x and openssl 1.1+
if stream.closed() and stream.error:
raise StreamClosedError(stream.error)
except StreamClosedError as e:
# The socket connect() call failed
> convert_stream_closed_error(self, e)
distributed/comm/tcp.py:451:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
obj = <distributed.comm.tcp.TLSConnector object at 0x7f60130ef280>
exc = ConnectionRefusedError(111, 'Connection refused')
def convert_stream_closed_error(obj, exc):
"""
Re-raise StreamClosedError as CommClosedError.
"""
if exc.real_error is not None:
# The stream was closed because of an underlying OS error
exc = exc.real_error
if ssl and isinstance(exc, ssl.SSLError):
if "UNKNOWN_CA" in exc.reason:
raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
> raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
E distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TLSConnector object at 0x7f60130ef280>: ConnectionRefusedError: [Errno 111] Connection refused
distributed/comm/tcp.py:148: CommClosedError
The above exception was the direct cause of the following exception:
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f601266ba60>
def test_basic(loop):
with popen(["dask-scheduler", "--no-dashboard"] + tls_args) as s:
with popen(
["dask-worker", "--no-dashboard", "tls://127.0.0.1:8786"] + tls_args
) as w:
> with Client(
"tls://127.0.0.1:8786", loop=loop, security=tls_security()
) as c:
distributed/cli/tests/test_tls_cli.py:35:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:942: in __init__
self.start(timeout=timeout)
distributed/client.py:1100: in start
sync(self.loop, self._start, **kwargs)
distributed/utils.py:387: in sync
raise exc.with_traceback(tb)
distributed/utils.py:360: in f
result = yield future
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:762: in run
value = future.result()
distributed/client.py:1192: in _start
await self._ensure_connected(timeout=timeout)
distributed/client.py:1255: in _ensure_connected
comm = await connect(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
addr = 'tls://127.0.0.1:8786', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': <ssl.SSLContext object at 0x7f60118d8e40>}
scheme = 'tls', loc = '127.0.0.1:8786'
backend = <distributed.comm.tcp.TLSBackend object at 0x7f60331d9b80>
connector = <distributed.comm.tcp.TLSConnector object at 0x7f60130ef280>
comm = None, time_left = <function connect.<locals>.time_left at 0x7f6011f2b1f0>
backoff_base = 0.01
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
def time_left():
deadline = start + timeout
return max(0, deadline - time())
backoff_base = 0.01
attempt = 0
# Prefer multiple small attempts than one long attempt. This should protect
# primarily from DNS race conditions
# gh3104, gh4176, gh4167
intermediate_cap = timeout / 5
active_exception = None
while time_left() > 0:
try:
comm = await asyncio.wait_for(
connector.connect(loc, deserialize=deserialize, **connection_args),
timeout=min(intermediate_cap, time_left()),
)
break
except FatalCommClosedError:
raise
# Note: CommClosed inherits from OSError
except (asyncio.TimeoutError, OSError) as exc:
active_exception = exc
# As descibed above, the intermediate timeout is used to distributed
# initial, bulk connect attempts homogeneously. In particular with
# the jitter upon retries we should not be worred about overloading
# any more DNS servers
intermediate_cap = timeout
# FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
upper_cap = min(time_left(), backoff_base * (2**attempt))
backoff = random.uniform(0, upper_cap)
attempt += 1
logger.debug(
"Could not connect to %s, waiting for %s before retrying", loc, backoff
)
await asyncio.sleep(backoff)
else:
> raise OSError(
f"Timed out trying to connect to {addr} after {timeout} s"
) from active_exception
E OSError: Timed out trying to connect to tls://127.0.0.1:8786 after 5 s
distributed/comm/core.py:315: OSError
----------------------------- Captured stdout call -----------------------------
--------------------------- Subprocess stdout/stderr---------------------------
2022-06-01 13:24:02,570 - distributed.nanny - INFO - Start Nanny at: 'tls://127.0.0.1:41059'
2022-06-01 13:24:03,395 - distributed.worker - INFO - Start worker at: tls://127.0.0.1:40863
2022-06-01 13:24:03,395 - distributed.worker - INFO - Listening to: tls://127.0.0.1:40863
2022-06-01 13:24:03,395 - distributed.worker - INFO - dashboard at: 127.0.0.1:38313
2022-06-01 13:24:03,395 - distributed.worker - INFO - Waiting to connect to: tls://127.0.0.1:8786
2022-06-01 13:24:03,395 - distributed.worker - INFO - -------------------------------------------------
2022-06-01 13:24:03,395 - distributed.worker - INFO - Threads: 2
2022-06-01 13:24:03,395 - distributed.worker - INFO - Memory: 6.78 GiB
2022-06-01 13:24:03,395 - distributed.worker - INFO - Local Directory: /home/runner/work/distributed/distributed/dask-worker-space/worker-3k415mh6
2022-06-01 13:24:03,395 - distributed.worker - INFO - -------------------------------------------------
2022-06-01 13:24:04,061 - distributed.worker - INFO - Registered to: tls://127.0.0.1:8786
2022-06-01 13:24:04,062 - distributed.worker - INFO - -------------------------------------------------
2022-06-01 13:24:04,065 - distributed.core - INFO - Starting established connection
2022-06-01 13:24:06,817 - distributed._signals - INFO - Received signal SIGINT (2)
2022-06-01 13:24:06,817 - distributed.nanny - INFO - Closing Nanny at 'tls://127.0.0.1:41059'.
2022-06-01 13:24:06,818 - distributed.nanny - INFO - Nanny asking worker to close
2022-06-01 13:24:06,819 - distributed.worker - INFO - Stopping worker at tls://127.0.0.1:40863
2022-06-01 13:24:06,821 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. Status: Status.closing
2022-06-01 13:24:06,974 - distributed.dask_worker - INFO - End worker
--------------------------------------------------------------------------------
--------------------------- Subprocess stdout/stderr---------------------------
2022-06-01 13:24:02,544 - distributed.scheduler - INFO - -----------------------------------------------
2022-06-01 13:24:02,550 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-06-01 13:24:02,555 - distributed.scheduler - INFO - State start
2022-06-01 13:24:02,558 - distributed.scheduler - INFO - -----------------------------------------------
2022-06-01 13:24:02,559 - distributed.scheduler - INFO - Clear task state
2022-06-01 13:24:02,559 - distributed.scheduler - INFO - Scheduler at: tls://10.1.0.71:8786
2022-06-01 13:24:02,560 - distributed.scheduler - INFO - dashboard at: :8787
2022-06-01 13:24:03,747 - distributed.scheduler - INFO - Register worker <WorkerState 'tls://127.0.0.1:40863', status: init, memory: 0, processing: 0>
2022-06-01 13:24:04,061 - distributed.scheduler - INFO - Starting worker compute stream, tls://127.0.0.1:40863
2022-06-01 13:24:04,061 - distributed.core - INFO - Starting established connection
2022-06-01 13:24:06,821 - distributed.scheduler - INFO - Remove worker <WorkerState 'tls://127.0.0.1:40863', status: closing, memory: 0, processing: 0>
2022-06-01 13:24:06,822 - distributed.core - INFO - Removing comms to tls://127.0.0.1:40863
2022-06-01 13:24:06,822 - distributed.scheduler - INFO - Lost all workers
2022-06-01 13:24:07,100 - distributed._signals - INFO - Received signal SIGINT (2)
2022-06-01 13:24:07,101 - distributed.scheduler - INFO - Scheduler closing...
2022-06-01 13:24:07,101 - distributed.scheduler - INFO - Scheduler closing all comms
2022-06-01 13:24:07,102 - distributed.scheduler - INFO - Stopped scheduler at 'tls://10.1.0.71:8786'
2022-06-01 13:24:07,102 - distributed.scheduler - INFO - End scheduler
--------------------------------------------------------------------------------
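For context, the retry loop in `distributed.comm.core.connect` shown in the traceback uses capped exponential backoff with full jitter (per the AWS post linked in the code comment), with short intermediate attempts so several tries fit inside the 5 s timeout. A minimal standalone sketch of that pattern, not the distributed implementation itself (`do_connect` is a hypothetical stand-in for `connector.connect(...)`):

```python
import asyncio
import random


async def connect_with_full_jitter(do_connect, timeout=5.0, backoff_base=0.01):
    """Sketch of capped exponential backoff with full jitter.

    ``do_connect`` is a hypothetical coroutine function standing in for the
    real connector; this illustrates the retry pattern, not distributed's code.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    def time_left():
        return max(0, deadline - loop.time())

    # Cap individual attempts so multiple short tries fit inside the timeout.
    intermediate_cap = timeout / 5
    attempt = 0
    active_exception = None
    while time_left() > 0:
        try:
            return await asyncio.wait_for(
                do_connect(), timeout=min(intermediate_cap, time_left())
            )
        except (asyncio.TimeoutError, OSError) as exc:
            active_exception = exc
            intermediate_cap = timeout
            # "Full jitter": sleep a uniform random amount below the exponential cap.
            upper_cap = min(time_left(), backoff_base * (2 ** attempt))
            await asyncio.sleep(random.uniform(0, upper_cap))
            attempt += 1
    raise OSError(f"Timed out after {timeout} s") from active_exception
```

To poke at the failure locally, running the single test from a development checkout (e.g. `pytest distributed/cli/tests/test_tls_cli.py::test_basic`, optionally repeated with pytest-repeat's `--count` option, assuming that plugin is installed) should exercise the same scheduler/worker subprocess startup path.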