Labels: flaky test (intermittent failures on CI)
Description
Effectively identical to #6759.
Notice the `OSError: Timed out trying to connect to tcp://127.0.0.1:49612 after 5 s` buried midway through. The `Client` gives up after 5 s because the scheduler subprocess is not yet accepting connections; the `OSError` then propagates out of the `with subprocess.Popen(...)` block, whose `__exit__` blocks in `wait()` on a scheduler that never receives its signal, so the test hangs until pytest's 300 s timeout kills it.
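For context, the retry loop in `distributed.comm.core.connect` (quoted in full in the traceback below) makes several short connection attempts, each initially capped at `timeout / 5`, sleeping with full-jitter exponential backoff in between. A minimal sketch of just that schedule, assuming a hypothetical `try_connect` coroutine in place of the real `connector.connect(...)`:

    import asyncio
    import random
    import time

    async def connect_with_retries(try_connect, timeout=5.0, backoff_base=0.01):
        # Sketch of the schedule used by distributed.comm.core.connect (not the real code)
        start = time.monotonic()

        def time_left():
            return max(0, start + timeout - time.monotonic())

        intermediate_cap = timeout / 5  # first attempts are deliberately short
        attempt = 0
        active_exception = None
        while time_left() > 0:
            try:
                # Each attempt is bounded by both the cap and the remaining budget
                return await asyncio.wait_for(
                    try_connect(), timeout=min(intermediate_cap, time_left())
                )
            except (asyncio.TimeoutError, OSError) as exc:
                active_exception = exc
                intermediate_cap = timeout  # later attempts may use the full budget
                # Full jitter: sleep a uniform time up to an exponentially growing cap
                upper_cap = min(time_left(), backoff_base * 2**attempt)
                attempt += 1
                await asyncio.sleep(random.uniform(0, upper_cap))
        raise OSError(f"Timed out trying to connect after {timeout} s") from active_exception

All retries share the single 5 s budget from `distributed.comm.timeouts.connect`, so a scheduler that takes longer than that to start listening — plausible on a loaded CI runner — exhausts the budget and produces exactly the error above.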
____________________ test_signal_handling[Signals.SIGTERM] _____________________
ConnectionRefusedError: [Errno 61] Connection refused
The above exception was the direct cause of the following exception:
addr = 'tcp://127.0.0.1:49612', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:49612'
backend = <distributed.comm.tcp.TCPBackend object at 0x7f8dc5f83a90>
connector = <distributed.comm.tcp.TCPConnector object at 0x7f8dcdf50fd0>
comm = None, time_left = <function connect.<locals>.time_left at 0x7f8dcc9008b0>
backoff_base = 0.01
    async def connect(
        addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
    ):
        """
        Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
        and yield a ``Comm`` object. If the connection attempt fails, it is
        retried until the *timeout* is expired.
        """
        if timeout is None:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        timeout = parse_timedelta(timeout, default="seconds")

        scheme, loc = parse_address(addr)
        backend = registry.get_backend(scheme)
        connector = backend.get_connector()
        comm = None
        start = time()

        def time_left():
            deadline = start + timeout
            return max(0, deadline - time())

        backoff_base = 0.01
        attempt = 0

        # Prefer multiple small attempts over one long attempt. This should protect
        # primarily from DNS race conditions
        # gh3104, gh4176, gh4167
        intermediate_cap = timeout / 5
        active_exception = None

        while time_left() > 0:
            try:
>               comm = await asyncio.wait_for(
                    connector.connect(loc, deserialize=deserialize, **connection_args),
                    timeout=min(intermediate_cap, time_left()),
                )
distributed/comm/core.py:291:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Task finished name='Task-468' coro=<BaseTCPConnector.connect() done, defined at /Users/runner/work/distributed/distri... <distributed.comm.tcp.TCPConnector object at 0x7f8dcdf50fd0>: ConnectionRefusedError: [Errno 61] Connection refused')>
timeout = 2.4676921367645264
    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.

        Coroutine will be wrapped in Task.

        Returns result of the Future or coroutine. When a timeout occurs,
        it cancels the task and raises TimeoutError. To avoid the task
        cancellation, wrap it in shield().

        If the wait is cancelled, the task is also cancelled.

        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        if timeout is None:
            return await fut

        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)

            if fut.done():
                return fut.result()

            await _cancel_and_wait(fut, loop=loop)
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()

        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)

        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)

        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise

            if fut.done():
>               return fut.result()
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:494:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <distributed.comm.tcp.TCPConnector object at 0x7f8dcdf50fd0>
address = '127.0.0.1:49612', deserialize = True
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
ip = '127.0.0.1', port = 49612, kwargs = {}
    async def connect(self, address, deserialize=True, **connection_args):
        self._check_encryption(address, connection_args)
        ip, port = parse_host_port(address)
        kwargs = self._get_connect_args(**connection_args)

        try:
            stream = await self.client.connect(
                ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
            )

            # Under certain circumstances tornado will have a closed connection with an
            # error and not raise a StreamClosedError.
            #
            # This occurs with tornado 5.x and openssl 1.1+
            if stream.closed() and stream.error:
                raise StreamClosedError(stream.error)

        except StreamClosedError as e:
            # The socket connect() call failed
>           convert_stream_closed_error(self, e)
distributed/comm/tcp.py:461:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
obj = <distributed.comm.tcp.TCPConnector object at 0x7f8dcdf50fd0>
exc = ConnectionRefusedError(61, 'Connection refused')
    def convert_stream_closed_error(obj, exc):
        """
        Re-raise StreamClosedError as CommClosedError.
        """
        if exc.real_error is not None:
            # The stream was closed because of an underlying OS error
            exc = exc.real_error
            if isinstance(exc, ssl.SSLError):
                if "UNKNOWN_CA" in exc.reason:
                    raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
>       raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
E distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7f8dcdf50fd0>: ConnectionRefusedError: [Errno 61] Connection refused
distributed/comm/tcp.py:142: CommClosedError
The above exception was the direct cause of the following exception:
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f8dcc8c4700>
sig = <Signals.SIGTERM: 15>
    @pytest.mark.slow
    @pytest.mark.skipif(WINDOWS, reason="POSIX only")
    @pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM])
    def test_signal_handling(loop, sig):
        port = open_port()
        with subprocess.Popen(
            [
                "python",
                "-m",
                "distributed.cli.dask_scheduler",
                f"--port={port}",
                "--dashboard-address=:0",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        ) as scheduler:
            # Wait for scheduler to start
>           with Client(f"127.0.0.1:{port}", loop=loop) as c:
distributed/cli/tests/test_dask_scheduler.py:548:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Client: No scheduler connected>, address = '127.0.0.1:49612'
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f8dcc8c4700>
timeout = 5, set_as_default = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
asynchronous = False, name = None, heartbeat_interval = 5, serializers = None
deserializers = None
extensions = {'pubsub': <class 'distributed.pubsub.PubSubClientExtension'>}
direct_to_workers = None, connection_limit = 512, kwargs = {}
scheduler_info_interval = 2, preload = [], preload_argv = []
    def __init__(
        self,
        address=None,
        loop=None,
        timeout=no_default,
        set_as_default=True,
        scheduler_file=None,
        security=None,
        asynchronous=False,
        name=None,
        heartbeat_interval=None,
        serializers=None,
        deserializers=None,
        extensions=DEFAULT_EXTENSIONS,
        direct_to_workers=None,
        connection_limit=512,
        **kwargs,
    ):
        if timeout == no_default:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        if timeout is not None:
            timeout = parse_timedelta(timeout, "s")
        self._timeout = timeout

        self.futures = dict()
        self.refcount = defaultdict(lambda: 0)
        self._handle_report_task = None
        if name is None:
            name = dask.config.get("client-name", None)
        self.id = (
            type(self).__name__
            + ("-" + name + "-" if name else "-")
            + str(uuid.uuid1(clock_seq=os.getpid()))
        )
        self.generation = 0
        self.status = "newly-created"
        self._pending_msg_buffer = []
        self.extensions = {}
        self.scheduler_file = scheduler_file
        self._startup_kwargs = kwargs
        self.cluster = None
        self.scheduler = None
        self._scheduler_identity = {}
        # A reentrant-lock on the refcounts for futures associated with this
        # client. Should be held by individual operations modifying refcounts,
        # or any bulk operation that needs to ensure the set of futures doesn't
        # change during operation.
        self._refcount_lock = threading.RLock()
        self.datasets = Datasets(self)
        self._serializers = serializers
        if deserializers is None:
            deserializers = serializers
        self._deserializers = deserializers
        self.direct_to_workers = direct_to_workers

        # Communication
        self.scheduler_comm = None

        if address is None:
            address = dask.config.get("scheduler-address", None)
            if address:
                logger.info("Config value `scheduler-address` found: %s", address)

        if address is not None and kwargs:
            raise ValueError(f"Unexpected keyword arguments: {sorted(kwargs)}")

        if isinstance(address, (rpc, PooledRPCCall)):
            self.scheduler = address
        elif isinstance(getattr(address, "scheduler_address", None), str):
            # It's a LocalCluster or LocalCluster-compatible object
            self.cluster = address
            status = getattr(self.cluster, "status")
            if status and status in [Status.closed, Status.closing]:
                raise RuntimeError(
                    f"Trying to connect to an already closed or closing Cluster {self.cluster}."
                )
            with suppress(AttributeError):
                loop = address.loop
            if security is None:
                security = getattr(self.cluster, "security", None)
        elif address is not None and not isinstance(address, str):
            raise TypeError(
                "Scheduler address must be a string or a Cluster instance, got {}".format(
                    type(address)
                )
            )

        # If connecting to an address and no explicit security is configured, attempt
        # to load security credentials with a security loader (if configured).
        if security is None and isinstance(address, str):
            security = _maybe_call_security_loader(address)

        if security is None:
            security = Security()
        elif isinstance(security, dict):
            security = Security(**security)
        elif security is True:
            security = Security.temporary()
            self._startup_kwargs["security"] = security
        elif not isinstance(security, Security):  # pragma: no cover
            raise TypeError("security must be a Security object")

        self.security = security

        if name == "worker":
            self.connection_args = self.security.get_connection_args("worker")
        else:
            self.connection_args = self.security.get_connection_args("client")

        self._asynchronous = asynchronous
        self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
        self.io_loop = self.loop = self._loop_runner.loop

        self._connecting_to_scheduler = False
        self._gather_keys = None
        self._gather_future = None

        if heartbeat_interval is None:
            heartbeat_interval = dask.config.get("distributed.client.heartbeat")
        heartbeat_interval = parse_timedelta(heartbeat_interval, default="ms")

        scheduler_info_interval = parse_timedelta(
            dask.config.get("distributed.client.scheduler-info-interval", default="ms")
        )

        self._periodic_callbacks = dict()
        self._periodic_callbacks["scheduler-info"] = PeriodicCallback(
            self._update_scheduler_info, scheduler_info_interval * 1000
        )
        self._periodic_callbacks["heartbeat"] = PeriodicCallback(
            self._heartbeat, heartbeat_interval * 1000
        )

        self._start_arg = address
        self._set_as_default = set_as_default
        if set_as_default:
            self._set_config = dask.config.set(
                scheduler="dask.distributed", shuffle="tasks"
            )
        self._event_handlers = {}

        self._stream_handlers = {
            "key-in-memory": self._handle_key_in_memory,
            "lost-data": self._handle_lost_data,
            "cancelled-key": self._handle_cancelled_key,
            "task-retried": self._handle_retried_key,
            "task-erred": self._handle_task_erred,
            "restart": self._handle_restart,
            "error": self._handle_error,
            "event": self._handle_event,
        }

        self._state_handlers = {
            "memory": self._handle_key_in_memory,
            "lost": self._handle_lost_data,
            "erred": self._handle_task_erred,
        }

        self.rpc = ConnectionPool(
            limit=connection_limit,
            serializers=serializers,
            deserializers=deserializers,
            deserialize=True,
            connection_args=self.connection_args,
            timeout=timeout,
            server=self,
        )

        self.extensions = {
            name: extension(self) for name, extension in extensions.items()
        }

        preload = dask.config.get("distributed.client.preload")
        preload_argv = dask.config.get("distributed.client.preload-argv")
        self.preloads = preloading.process_preloads(self, preload, preload_argv)

>       self.start(timeout=timeout)
distributed/client.py:940:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Client: No scheduler connected>, kwargs = {'timeout': 5}
    def start(self, **kwargs):
        """Start scheduler running in separate thread"""
        if self.status != "newly-created":
            return

        self._loop_runner.start()

        if self._set_as_default:
            _set_global_client(self)

        if self.asynchronous:
            self._started = asyncio.ensure_future(self._start(**kwargs))
        else:
>           sync(self.loop, self._start, **kwargs)
distributed/client.py:1098:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f8dcc8c4700>
func = <bound method Client._start of <Client: No scheduler connected>>
callback_timeout = None, args = (), kwargs = {'timeout': 5}
f = <function sync.<locals>.f at 0x7f8dcdc9a280>
wait = <function sync.<locals>.wait at 0x7f8dcdc9a700>, typ = <class 'OSError'>
exc = OSError('Timed out trying to connect to tcp://127.0.0.1:49612 after 5 s')
tb = <traceback object at 0x7f8dcdd4fec0>
    def sync(loop, func, *args, callback_timeout=None, **kwargs):
        """
        Run coroutine in loop running in separate thread.
        """
        callback_timeout = _parse_timedelta(callback_timeout, "s")
        if loop.asyncio_loop.is_closed():
            raise RuntimeError("IOLoop is closed")

        e = threading.Event()
        main_tid = threading.get_ident()
        result = error = future = None  # set up non-locals

        @gen.coroutine
        def f():
            nonlocal result, error, future
            try:
                if main_tid == threading.get_ident():
                    raise RuntimeError("sync() called from thread of running loop")
                yield gen.moment
                future = func(*args, **kwargs)
                if callback_timeout is not None:
                    future = asyncio.wait_for(future, callback_timeout)
                future = asyncio.ensure_future(future)
                result = yield future
            except Exception:
                error = sys.exc_info()
            finally:
                e.set()

        def cancel():
            if future is not None:
                future.cancel()

        def wait(timeout):
            try:
                return e.wait(timeout)
            except KeyboardInterrupt:
                loop.add_callback(cancel)
                raise

        loop.add_callback(f)
        if callback_timeout is not None:
            if not wait(callback_timeout):
                raise TimeoutError(f"timed out after {callback_timeout} s.")
        else:
            while not e.is_set():
                wait(10)

        if error:
            typ, exc, tb = error
>           raise exc.with_traceback(tb)
distributed/utils.py:405:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    @gen.coroutine
    def f():
        nonlocal result, error, future
        try:
            if main_tid == threading.get_ident():
                raise RuntimeError("sync() called from thread of running loop")
            yield gen.moment
            future = func(*args, **kwargs)
            if callback_timeout is not None:
                future = asyncio.wait_for(future, callback_timeout)
            future = asyncio.ensure_future(future)
>           result = yield future
distributed/utils.py:378:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.gen.Runner object at 0x7f8dcdc9efd0>
    def run(self) -> None:
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if future is None:
                    raise Exception("No pending future")
                if not future.done():
                    return
                self.future = None
                try:
                    exc_info = None

                    try:
>                       value = future.result()
../../../miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/gen.py:762:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Client: No scheduler connected>, timeout = 5, kwargs = {}
address = '127.0.0.1:49612'
    async def _start(self, timeout=no_default, **kwargs):
        self.status = "connecting"
        await self.rpc.start()

        if timeout == no_default:
            timeout = self._timeout
        if timeout is not None:
            timeout = parse_timedelta(timeout, "s")

        address = self._start_arg
        if self.cluster is not None:
            # Ensure the cluster is started (no-op if already running)
            try:
                await self.cluster
            except Exception:
                logger.info(
                    "Tried to start cluster and received an error. Proceeding.",
                    exc_info=True,
                )
            address = self.cluster.scheduler_address
        elif self.scheduler_file is not None:
            while not os.path.exists(self.scheduler_file):
                await asyncio.sleep(0.01)
            for i in range(10):
                try:
                    with open(self.scheduler_file) as f:
                        cfg = json.load(f)
                    address = cfg["address"]
                    break
                except (ValueError, KeyError):  # JSON file not yet flushed
                    await asyncio.sleep(0.01)
        elif self._start_arg is None:
            from distributed.deploy import LocalCluster

            self.cluster = await LocalCluster(
                loop=self.loop,
                asynchronous=self._asynchronous,
                **self._startup_kwargs,
            )
            address = self.cluster.scheduler_address

        self._gather_semaphore = asyncio.Semaphore(5)

        if self.scheduler is None:
            self.scheduler = self.rpc(address)
        self.scheduler_comm = None

        try:
>           await self._ensure_connected(timeout=timeout)
distributed/client.py:1178:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Client: No scheduler connected>, timeout = 5
    async def _ensure_connected(self, timeout=None):
        if (
            self.scheduler_comm
            and not self.scheduler_comm.closed()
            or self._connecting_to_scheduler
            or self.scheduler is None
        ):
            return

        self._connecting_to_scheduler = True

        try:
>           comm = await connect(
                self.scheduler.address, timeout=timeout, **self.connection_args
            )
distributed/client.py:1241:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
addr = 'tcp://127.0.0.1:49612', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:49612'
backend = <distributed.comm.tcp.TCPBackend object at 0x7f8dc5f83a90>
connector = <distributed.comm.tcp.TCPConnector object at 0x7f8dcdf50fd0>
comm = None, time_left = <function connect.<locals>.time_left at 0x7f8dcc9008b0>
backoff_base = 0.01
    async def connect(
        addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
    ):
        """
        Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
        and yield a ``Comm`` object. If the connection attempt fails, it is
        retried until the *timeout* is expired.
        """
        if timeout is None:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        timeout = parse_timedelta(timeout, default="seconds")

        scheme, loc = parse_address(addr)
        backend = registry.get_backend(scheme)
        connector = backend.get_connector()
        comm = None
        start = time()

        def time_left():
            deadline = start + timeout
            return max(0, deadline - time())

        backoff_base = 0.01
        attempt = 0

        # Prefer multiple small attempts over one long attempt. This should protect
        # primarily from DNS race conditions
        # gh3104, gh4176, gh4167
        intermediate_cap = timeout / 5
        active_exception = None

        while time_left() > 0:
            try:
                comm = await asyncio.wait_for(
                    connector.connect(loc, deserialize=deserialize, **connection_args),
                    timeout=min(intermediate_cap, time_left()),
                )
                break
            except FatalCommClosedError:
                raise
            # Note: CommClosed inherits from OSError
            except (asyncio.TimeoutError, OSError) as exc:
                active_exception = exc

                # As described above, the intermediate timeout is used to distribute
                # initial, bulk connect attempts homogeneously. In particular with
                # the jitter upon retries we should not be worried about overloading
                # any more DNS servers
                intermediate_cap = timeout

                # FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
                upper_cap = min(time_left(), backoff_base * (2**attempt))
                backoff = random.uniform(0, upper_cap)
                attempt += 1
                logger.debug(
                    "Could not connect to %s, waiting for %s before retrying", loc, backoff
                )
                await asyncio.sleep(backoff)
        else:
>           raise OSError(
                f"Timed out trying to connect to {addr} after {timeout} s"
            ) from active_exception
E           OSError: Timed out trying to connect to tcp://127.0.0.1:49612 after 5 s

distributed/comm/core.py:317: OSError

[Dozens of repeated CoverageWarnings from coverage/data.py:130 were interleaved here, alternating "Data file '...' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object" and "Couldn't use data file '...': database disk image is malformed" (e.g. for '/Users/runner/work/distributed/distributed/.coverage.Mac-1658296738475.local.78023.140429'), each followed by "data._warn(str(exc))".]
During handling of the above exception, another exception occurred:
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f8dcc8c4700>
sig = <Signals.SIGTERM: 15>
    @pytest.mark.slow
    @pytest.mark.skipif(WINDOWS, reason="POSIX only")
    @pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM])
    def test_signal_handling(loop, sig):
        port = open_port()
        with subprocess.Popen(
            [
                "python",
                "-m",
                "distributed.cli.dask_scheduler",
                f"--port={port}",
                "--dashboard-address=:0",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        ) as scheduler:
            # Wait for scheduler to start
>           with Client(f"127.0.0.1:{port}", loop=loop) as c:

distributed/cli/tests/test_dask_scheduler.py:548:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.8/subprocess.py:937: in __exit__
    self.wait()
../../../miniconda3/envs/dask-distributed/lib/python3.8/subprocess.py:1083: in wait
    return self._wait(timeout=timeout)
../../../miniconda3/envs/dask-distributed/lib/python3.8/subprocess.py:1806: in _wait
    (pid, sts) = self._try_wait(0)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <subprocess.Popen object at 0x7f8dcc8c4dc0>, wait_flags = 0

    def _try_wait(self, wait_flags):
        """All callers to this function MUST hold self._waitpid_lock."""
        try:
>           (pid, sts) = os.waitpid(self.pid, wait_flags)
E Failed: Timeout >300.0s
../../../miniconda3/envs/dask-distributed/lib/python3.8/subprocess.py:1764: Failed
----------------------------- Captured stderr call -----------------------------
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
~~~~~~~~~~~~~~ Stack of ThreadPoolExecutor-17_0 (123145390088192) ~~~~~~~~~~~~~~
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/concurrent/futures/thread.py", line 78, in _worker
work_item = work_queue.get(block=True)
~~~~~~~~~~~~~~~~~~~~~~ Stack of IO loop (123145373298688) ~~~~~~~~~~~~~~~~~~~~~~
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 485, in run_loop
loop.start()
File "/Users/runner/work/distributed/distributed/distributed/utils_test.py", line 158, in start
orig_start()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/platform/asyncio.py", line 199, in start
self.asyncio_loop.run_forever()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
self._run_once()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
event_list = self._selector.select(timeout)
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/selectors.py", line 558, in select
kev_list = self._selector.control(None, max_ev, timeout)
~~~~~~~~~~~~~~~~~~ Stack of Dask-Offload_0 (123145356509184) ~~~~~~~~~~~~~~~~~~~
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/concurrent/futures/thread.py", line 78, in _worker
work_item = work_queue.get(block=True)
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
https://github.com/dask/distributed/runs/7423197754?check_suite_focus=true#step:11:3917
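For what it's worth, the `# Wait for scheduler to start` comment in the test is aspirational: nothing waits except `Client(...)` itself, with its 5 s connect budget. One conceivable hardening, sketched below under the assumption that polling suffices (`wait_for_port` is a hypothetical helper, not part of distributed), is to block until the scheduler's port accepts TCP connections before constructing the `Client`:

    import socket
    import time

    def wait_for_port(host, port, timeout=30.0):
        # Hypothetical helper: block until (host, port) accepts TCP connections,
        # raising TimeoutError if it never does within `timeout` seconds.
        deadline = time.monotonic() + timeout
        while True:
            try:
                with socket.create_connection((host, port), timeout=1):
                    return
            except OSError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"{host}:{port} not reachable after {timeout} s")
                time.sleep(0.1)

    # In the test, before `with Client(f"127.0.0.1:{port}", loop=loop) as c:`
    # wait_for_port("127.0.0.1", port)

That would turn the connect-timeout race into an explicit, generous wait; whether to do this or simply raise `distributed.comm.timeouts.connect` for the test is a judgment call.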