Conversation

@douglasdavis
Member

This adds an additional check to keep quiet clusters (as Matt describes in #4637) from repeatedly using the same worker (the new test fails without the added sum-of-occupancies check). I understand this is a CPU-sensitive area, and perhaps summing the occupancies is too expensive to use here. Very happy to hear suggestions for alternatives.
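
For illustration, a minimal sketch of the kind of guard described above; this is not the actual patch, and it assumes WorkerState-like candidates with an occupancy attribute:

import itertools

# Round-robin counter used only when occupancy carries no signal.
_idle_counter = itertools.count()

def pick_worker(candidates):
    """Pick the least-occupied worker, but when the whole pool is quiet
    (the sum of occupancies is zero) rotate through workers instead of
    always returning the same first worker."""
    candidates = list(candidates)
    if sum(ws.occupancy for ws in candidates) == 0:
        return candidates[next(_idle_counter) % len(candidates)]
    return min(candidates, key=lambda ws: ws.occupancy)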

@mrocklin
Member

mrocklin commented Mar 26, 2021 via email

@douglasdavis force-pushed the gh4637 branch 5 times, most recently from 56d13f7 to 5635353 on April 1, 2021 00:45
@mrocklin
Member

mrocklin commented Apr 1, 2021

How are things going here, @douglasdavis? I would be happy to meet up and debrief if you prefer. Sometimes it's nice to talk through these things with someone.

@douglasdavis
Member Author

douglasdavis commented Apr 1, 2021

Sounds like a good idea! I pushed a commit yesterday inspired by your idea to zoom to the nth task and search from there; I think it would be useful to chat about it.

Member

@jakirkham left a comment

Included some comments regarding typing things in the Scheduler for Cythonization. Maybe not the most critical thing at the moment given the code is still being worked out, but I wanted to share this perspective as well. Hopefully this shows how one can approach it when appropriate.
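
To make the suggestion concrete, a minimal sketch (purely illustrative, not the Scheduler's actual code) of the annotation style that helps Cython in pure-Python mode; the WorkerInfo class and count_idle function are hypothetical:

import cython  # the cython shadow module; these decorators are no-ops when uncompiled

@cython.cclass
class WorkerInfo:
    # Typed attributes become C-level fields when compiled with Cython.
    occupancy: cython.double
    nthreads: cython.Py_ssize_t

    def __init__(self, occupancy: float = 0.0, nthreads: int = 1):
        self.occupancy = occupancy
        self.nthreads = nthreads

@cython.ccall
def count_idle(workers: list) -> cython.Py_ssize_t:
    # Typed locals let Cython generate typed loops instead of generic Python calls.
    n: cython.Py_ssize_t = 0
    ws: WorkerInfo
    for ws in workers:
        if ws.occupancy == 0.0:
            n += 1
    return n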

@douglasdavis
Member Author

douglasdavis commented Apr 2, 2021

Included some comments regarding typing things in the Scheduler for Cythonization. Maybe not the most critical thing at the moment given the code is still being worked out, but I wanted to share this perspective as well. Hopefully this shows how one can approach it when appropriate.

Many thanks for pointing that out, @jakirkham - I committed the typing suggestions to make sure I stick with them for the rest of the PR.

douglasdavis and others added 10 commits April 2, 2021 12:24
use type annotated attribute

Co-authored-by: jakirkham <jakirkham@gmail.com>
type annotations

Co-authored-by: jakirkham <jakirkham@gmail.com>
type annotations

Co-authored-by: jakirkham <jakirkham@gmail.com>
type annotations

Co-authored-by: jakirkham <jakirkham@gmail.com>
Co-authored-by: jakirkham <jakirkham@gmail.com>
douglasdavis and others added 2 commits April 6, 2021 17:47
Co-authored-by: jakirkham <jakirkham@gmail.com>
@douglasdavis
Member Author

douglasdavis commented Apr 9, 2021 via email

Member

@jakirkham left a comment

Since we are using worker_pool.values() in a few places, maybe it makes sense to construct this object once and reuse it elsewhere? Included a few relevant suggestions below.

@douglasdavis
Member Author

Since we are using worker_pool.values() in a few places, maybe it makes sense to construct this object once and reuse it elsewhere? Included a few relevant suggestions below.

agreed!
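
A minimal sketch of that idea, assuming worker_pool is a plain dict mapping address to worker state (the names here are illustrative, not the PR's actual code):

worker_pool = {}  # address -> WorkerState; illustrative only
worker_pool_dv = worker_pool.values()  # built once; a dict view stays live as the dict changes

def total_occupancy():
    # Reuse the cached view instead of calling worker_pool.values()
    # at every call site.
    return sum(ws.occupancy for ws in worker_pool_dv)

def all_idle():
    return all(ws.occupancy == 0 for ws in worker_pool_dv)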

Member

@jakirkham left a comment

LGTM. Thanks Doug! 😄

Member

@jrbourbeau left a comment

Thanks @douglasdavis! The test_retries failure here looks like a genuine (non-flaky) test failure. I'll take a look at this (though if you have any thoughts, that would also be welcome : ) )

@jakirkham
Member

Do we have an issue tracking those flaky tests? I think I'm also seeing these in another PR.

@jrbourbeau
Member

The list of current flaky tests is here

@jakirkham
Member

Thanks. Do we know why these are affecting all jobs now?

@jrbourbeau
Member

It seems test flakiness has generally increased over the past few weeks (I'll open an issue about this), but it's not at the rate where it would impact all jobs. Can you point me to the PR you're referring to?

@jakirkham
Member

By all jobs I mean all jobs in a PR. For example, all of the jobs for this PR are failing.

@jrbourbeau
Member

Gotcha -- it looks like there are genuine, non-flaky test failures both in this PR and over in #4650 (see #4650 (review))

@jrbourbeau
Member

@douglasdavis it looks like distributed/tests/test_client_executor.py::test_retries was unintentionally passing before this PR. Specifically, `varying` behaves differently depending on whether the workers in a cluster all exist in the same process (like they do when we use gen_cluster) or are all in separate processes (like they are when we use the client fixture in test_retries).

I recommend bumping up the retries= in test_retries to the following:

diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py
index 555bcb86..3942a2ca 100644
--- a/distributed/tests/test_client_executor.py
+++ b/distributed/tests/test_client_executor.py
@@ -210,26 +210,24 @@ def test_unsupported_arguments(client, s, a, b):
 def test_retries(client):
     args = [ZeroDivisionError("one"), ZeroDivisionError("two"), 42]

-    with client.get_executor(retries=3, pure=False) as e:
+    with client.get_executor(retries=5, pure=False) as e:
         future = e.submit(varying(args))
         assert future.result() == 42

-    with client.get_executor(retries=2) as e:
+    with client.get_executor(retries=4) as e:
         future = e.submit(varying(args))
         result = future.result()
         assert result == 42

-    with client.get_executor(retries=1) as e:
+    with client.get_executor(retries=2) as e:
         future = e.submit(varying(args))
-        with pytest.raises(ZeroDivisionError) as exc_info:
+        with pytest.raises(ZeroDivisionError, match="two"):
             res = future.result()
-        exc_info.match("two")

     with client.get_executor(retries=0) as e:
         future = e.submit(varying(args))
-        with pytest.raises(ZeroDivisionError) as exc_info:
+        with pytest.raises(ZeroDivisionError, match="one"):
             res = future.result()
-        exc_info.match("one")

which should resolve the issue. Alternatively, one could try to use gen_cluster in test_retries, but I don't know if client.get_executor works with asynchronous clients (I tried this out briefly but couldn't get tests to pass).
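
For context, a rough sketch of how a `varying`-style helper behaves (the real helper lives in distributed/utils_test.py; this is only an approximation): each call advances a counter stored in the calling process and returns or raises the next item, so when retries can land on different worker processes each process starts its own count from zero and more retries are needed to reach the final, non-exception item.

import uuid

_call_counts = {}  # per-process call counts, one entry per varying() instance

def varying(items):
    """Approximation of the test helper: successive calls *within one
    process* step through items, raising any item that is an exception."""
    key = uuid.uuid4().hex

    def func():
        i = _call_counts.get(key, 0)
        _call_counts[key] = i + 1
        item = items[min(i, len(items) - 1)]  # clamp once exhausted; illustrative
        if isinstance(item, Exception):
            raise item
        return item

    return func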

@douglasdavis
Member Author

Thanks @jrbourbeau! I'm still getting a test failure locally; not exactly sure what's going on. I went ahead and committed your suggested changes to see how the full CI suite goes. More details about the local failure I'm seeing are below.

This import error is popping up:

ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core'

There is indeed no dumps_msgpack importable from the distributed.protocol.core module.

full report from `pytest distributed/tests -k test_retries`:

(dask-dev) ddavis@strange ~/software/repos/distributed (gh4637) $ pytest distributed/tests -k test_retries -vv
========================================== test session starts ==========================================
platform linux -- Python 3.9.4, pytest-6.2.3, py-1.10.0, pluggy-0.13.1 -- /home/ddavis/.pyenv/versions/3.9.4/envs/dask-dev/bin/python3.9
cachedir: .pytest_cache
rootdir: /home/ddavis/software/repos/distributed, configfile: setup.cfg
plugins: flaky-3.7.0, asyncio-0.14.0
collected 1192 items / 1188 deselected / 4 selected

distributed/tests/test_client.py::test_retries_get FAILED [ 25%]
distributed/tests/test_client.py::test_retries_dask_array <- distributed/utils_test.py FAILED [ 50%]
distributed/tests/test_client_executor.py::test_retries FAILED [ 75%]
distributed/tests/test_scheduler.py::test_retries <- distributed/utils_test.py FAILED [100%]

=============================================== FAILURES ================================================
___________________________________________ test_retries_get ____________________________________________

c = <Client: 'tcp://127.0.0.1:44311' processes=2 threads=2, memory=62.58 GiB>

def test_retries_get(c):
    args = [ZeroDivisionError("one"), ZeroDivisionError("two"), 3]
    x = delayed(varying(args))()
  assert x.compute(retries=5) == 3

distributed/tests/test_client.py:302:


../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/dask/base.py:284: in compute
(result,) = compute(self, traverse=False, **kwargs)
../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/dask/base.py:566: in compute
results = schedule(dsk, keys, **kwargs)
distributed/client.py:2646: in get
futures = self._graph_to_futures(
distributed/client.py:2554: in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset)


self = <dask.highlevelgraph.HighLevelGraph object at 0x7faee221ca30>
client = <Client: 'tcp://127.0.0.1:44311' processes=2 threads=2, memory=62.58 GiB>
client_keys = {'func-79d80bbd-d455-4145-a5ca-4aa55baffa9e'}

def __dask_distributed_pack__(self, client, client_keys: Iterable[Hashable]) -> Any:
    """Pack the high level graph for Scheduler -> Worker communication

    The approach is to delegate the packaging to each layer in the high level graph
    by calling .__dask_distributed_pack__() and .__dask_distributed_annotations_pack__()
    on each layer. If the layer doesn't implement packaging, we materialize the
    layer and pack it.

    Parameters
    ----------
    client : distributed.Client
        The client calling this function.
    client_keys : Iterable
        List of keys requested by the client.

    Returns
    -------
    data: list of header and payload
        Packed high level graph serialized by dumps_msgpack
    """
  from distributed.protocol.core import dumps_msgpack

E ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/ddavis/software/repos/distributed/distributed/protocol/core.py)

../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/dask/highlevelgraph.py:946: ImportError
----------------------------------------- Captured stderr setup -----------------------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:44311
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:38663
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:46493
distributed.worker - INFO - Listening to: tcp://127.0.0.1:46493
distributed.worker - INFO - Listening to: tcp://127.0.0.1:38663
distributed.worker - INFO - dashboard at: 127.0.0.1:34269
distributed.worker - INFO - dashboard at: 127.0.0.1:46841
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:44311
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:44311
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 31.29 GiB
distributed.worker - INFO - Memory: 31.29 GiB
distributed.worker - INFO - Local Directory: /home/ddavis/software/repos/distributed/_test_worker-073c4115-a5d6-4b38-bcb0-ed7ca81d803e/dask-worker-space/worker-e8jd6kxq
distributed.worker - INFO - Local Directory: /home/ddavis/software/repos/distributed/_test_worker-d80affb8-e03b-47fd-a8e5-b95c3782fa56/dask-worker-space/worker-4rmx_22m
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:46493', name: tcp://127.0.0.1:46493, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:46493
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:44311
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:38663', name: tcp://127.0.0.1:38663, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:38663
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:44311
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-4e88d74f-9ca0-11eb-b766-7085c25d8254
distributed.core - INFO - Starting established connection
--------------------------------------- Captured stderr teardown ----------------------------------------
distributed.scheduler - INFO - Remove client Client-4e88d74f-9ca0-11eb-b766-7085c25d8254
distributed.scheduler - INFO - Remove client Client-4e88d74f-9ca0-11eb-b766-7085c25d8254
distributed.scheduler - INFO - Close client connection: Client-4e88d74f-9ca0-11eb-b766-7085c25d8254
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:46493
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:38663
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:46493', name: tcp://127.0.0.1:46493, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:46493
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:38663', name: tcp://127.0.0.1:38663, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:38663
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
________________________________________ test_retries_dask_array ________________________________________

def test_func():
    result = None
    workers = []
    with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

        async def coro():
            with dask.config.set(config):
                s = False
                for _ in range(60):
                    try:
                        s, ws = await start_cluster(
                            nthreads,
                            scheduler,
                            loop,
                            security=security,
                            Worker=Worker,
                            scheduler_kwargs=scheduler_kwargs,
                            worker_kwargs=worker_kwargs,
                        )
                    except Exception as e:
                        logger.error(
                            "Failed to start gen_cluster: "
                            f"{e.__class__.__name__}: {e}; retrying",
                            exc_info=True,
                        )
                        await asyncio.sleep(1)
                    else:
                        workers[:] = ws
                        args = [s] + workers
                        break
                if s is False:
                    raise Exception("Could not start cluster")
                if client:
                    c = await Client(
                        s.address,
                        loop=loop,
                        security=security,
                        asynchronous=True,
                        **client_kwargs,
                    )
                    args = [c] + args
                try:
                    future = func(*args)
                    if timeout:
                        future = asyncio.wait_for(future, timeout)
                    result = await future
                    if s.validate:
                        s.validate_state()
                finally:
                    if client and c.status not in ("closing", "closed"):
                        await c._close(fast=s.status == Status.closed)
                    await end_cluster(s, workers)
                    await asyncio.wait_for(cleanup_global_workers(), 1)

                try:
                    c = await default_client()
                except ValueError:
                    pass
                else:
                    await c._close(fast=True)

                def get_unclosed():
                    return [c for c in Comm._instances if not c.closed()] + [
                        c
                        for c in _global_clients.values()
                        if c.status != "closed"
                    ]

                try:
                    start = time()
                    while time() < start + 60:
                        gc.collect()
                        if not get_unclosed():
                            break
                        await asyncio.sleep(0.05)
                    else:
                        if allow_unclosed:
                            print(f"Unclosed Comms: {get_unclosed()}")
                        else:
                            raise RuntimeError("Unclosed Comms", get_unclosed())
                finally:
                    Comm._instances.clear()
                    _global_clients.clear()

                return result
      result = loop.run_sync(
            coro, timeout=timeout * 2 if timeout else timeout
        )

distributed/utils_test.py:955:


../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/tornado/ioloop.py:530: in run_sync
return future_cell[0].result()
distributed/utils_test.py:914: in coro
result = await future
../../../.pyenv/versions/3.9.4/lib/python3.9/asyncio/tasks.py:481: in wait_for
return fut.result()
distributed/tests/test_client.py:375: in test_retries_dask_array
future = c.compute(x.sum(), retries=2)
distributed/client.py:2849: in compute
futures_dict = self._graph_to_futures(
distributed/client.py:2554: in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset)


self = <dask.highlevelgraph.HighLevelGraph object at 0x7faee20d9d00>, client = <Client: not connected>
client_keys = {'finalize-4d16e7984a435462f464d04214f86e01'}

def __dask_distributed_pack__(self, client, client_keys: Iterable[Hashable]) -> Any:
    """Pack the high level graph for Scheduler -> Worker communication

    The approach is to delegate the packaging to each layer in the high level graph
    by calling .__dask_distributed_pack__() and .__dask_distributed_annotations_pack__()
    on each layer. If the layer doesn't implement packaging, we materialize the
    layer and pack it.

    Parameters
    ----------
    client : distributed.Client
        The client calling this function.
    client_keys : Iterable
        List of keys requested by the client.

    Returns
    -------
    data: list of header and payload
        Packed high level graph serialized by dumps_msgpack
    """
  from distributed.protocol.core import dumps_msgpack

E ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/ddavis/software/repos/distributed/distributed/protocol/core.py)

../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/dask/highlevelgraph.py:946: ImportError
----------------------------------------- Captured stderr call ------------------------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:35655
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:36387
distributed.worker - INFO - Listening to: tcp://127.0.0.1:36387
distributed.worker - INFO - dashboard at: 127.0.0.1:33309
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:35655
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 31.29 GiB
distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/dask-worker-space/worker-lilb48qn
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:42597
distributed.worker - INFO - Listening to: tcp://127.0.0.1:42597
distributed.worker - INFO - dashboard at: 127.0.0.1:41751
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:35655
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 2
distributed.worker - INFO - Memory: 31.29 GiB
distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/dask-worker-space/worker-ynjqj6rs
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:42597', name: 1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:42597
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:36387', name: 0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:36387
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:35655
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:35655
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-4ff42110-9ca0-11eb-b766-7085c25d8254
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-4ff42110-9ca0-11eb-b766-7085c25d8254
distributed.scheduler - INFO - Remove client Client-4ff42110-9ca0-11eb-b766-7085c25d8254
distributed.scheduler - INFO - Close client connection: Client-4ff42110-9ca0-11eb-b766-7085c25d8254
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:36387
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:42597
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:36387', name: 0, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:36387
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:42597', name: 1, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:42597
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
_____________________________________________ test_retries ______________________________________________

client = <Client: 'tcp://127.0.0.1:37433' processes=2 threads=2, memory=62.58 GiB>

def test_retries(client):
    args = [ZeroDivisionError("one"), ZeroDivisionError("two"), 42]

    with client.get_executor(retries=5, pure=False) as e:
      future = e.submit(varying(args))

distributed/tests/test_client_executor.py:214:


distributed/cfexecutor.py:91: in submit
future = self._client.submit(fn, *args, **merge(self._kwargs, kwargs))
distributed/client.py:1586: in submit
futures = self._graph_to_futures(
distributed/client.py:2554: in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset)


self = <dask.highlevelgraph.HighLevelGraph object at 0x7faee1d6dbb0>
client = <Client: 'tcp://127.0.0.1:37433' processes=2 threads=2, memory=62.58 GiB>
client_keys = {'func-58c70458-9c56-4a8d-86be-4b2b26ed831e'}

def __dask_distributed_pack__(self, client, client_keys: Iterable[Hashable]) -> Any:
    """Pack the high level graph for Scheduler -> Worker communication

    The approach is to delegate the packaging to each layer in the high level graph
    by calling .__dask_distributed_pack__() and .__dask_distributed_annotations_pack__()
    on each layer. If the layer doesn't implement packaging, we materialize the
    layer and pack it.

    Parameters
    ----------
    client : distributed.Client
        The client calling this function.
    client_keys : Iterable
        List of keys requested by the client.

    Returns
    -------
    data: list of header and payload
        Packed high level graph serialized by dumps_msgpack
    """
  from distributed.protocol.core import dumps_msgpack

E ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/ddavis/software/repos/distributed/distributed/protocol/core.py)

../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/dask/highlevelgraph.py:946: ImportError
----------------------------------------- Captured stderr setup -----------------------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:37433
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:38385
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:44107
distributed.worker - INFO - Listening to: tcp://127.0.0.1:38385
distributed.worker - INFO - dashboard at: 127.0.0.1:42139
distributed.worker - INFO - Listening to: tcp://127.0.0.1:44107
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:37433
distributed.worker - INFO - dashboard at: 127.0.0.1:44941
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:37433
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 31.29 GiB
distributed.worker - INFO - Local Directory: /home/ddavis/software/repos/distributed/_test_worker-c8d63ec0-b144-44d6-bf0b-76585c501ef6/dask-worker-space/worker-hl29mkzn
distributed.worker - INFO - Memory: 31.29 GiB
distributed.worker - INFO - Local Directory: /home/ddavis/software/repos/distributed/_test_worker-35eada34-412a-441f-97f3-6e0946f83504/dask-worker-space/worker-0ce3k0kv
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:44107', name: tcp://127.0.0.1:44107, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:44107
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:37433
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:38385', name: tcp://127.0.0.1:38385, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:38385
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:37433
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-50663b45-9ca0-11eb-b766-7085c25d8254
distributed.core - INFO - Starting established connection
--------------------------------------- Captured stderr teardown ----------------------------------------
distributed.scheduler - INFO - Remove client Client-50663b45-9ca0-11eb-b766-7085c25d8254
distributed.scheduler - INFO - Remove client Client-50663b45-9ca0-11eb-b766-7085c25d8254
distributed.scheduler - INFO - Close client connection: Client-50663b45-9ca0-11eb-b766-7085c25d8254
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:44107
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:38385
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:44107', name: tcp://127.0.0.1:44107, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:44107
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:38385', name: tcp://127.0.0.1:38385, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:38385
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
_____________________________________________ test_retries ______________________________________________

def test_func():
    result = None
    workers = []
    with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

        async def coro():
            with dask.config.set(config):
                s = False
                for _ in range(60):
                    try:
                        s, ws = await start_cluster(
                            nthreads,
                            scheduler,
                            loop,
                            security=security,
                            Worker=Worker,
                            scheduler_kwargs=scheduler_kwargs,
                            worker_kwargs=worker_kwargs,
                        )
                    except Exception as e:
                        logger.error(
                            "Failed to start gen_cluster: "
                            f"{e.__class__.__name__}: {e}; retrying",
                            exc_info=True,
                        )
                        await asyncio.sleep(1)
                    else:
                        workers[:] = ws
                        args = [s] + workers
                        break
                if s is False:
                    raise Exception("Could not start cluster")
                if client:
                    c = await Client(
                        s.address,
                        loop=loop,
                        security=security,
                        asynchronous=True,
                        **client_kwargs,
                    )
                    args = [c] + args
                try:
                    future = func(*args)
                    if timeout:
                        future = asyncio.wait_for(future, timeout)
                    result = await future
                    if s.validate:
                        s.validate_state()
                finally:
                    if client and c.status not in ("closing", "closed"):
                        await c._close(fast=s.status == Status.closed)
                    await end_cluster(s, workers)
                    await asyncio.wait_for(cleanup_global_workers(), 1)

                try:
                    c = await default_client()
                except ValueError:
                    pass
                else:
                    await c._close(fast=True)

                def get_unclosed():
                    return [c for c in Comm._instances if not c.closed()] + [
                        c
                        for c in _global_clients.values()
                        if c.status != "closed"
                    ]

                try:
                    start = time()
                    while time() < start + 60:
                        gc.collect()
                        if not get_unclosed():
                            break
                        await asyncio.sleep(0.05)
                    else:
                        if allow_unclosed:
                            print(f"Unclosed Comms: {get_unclosed()}")
                        else:
                            raise RuntimeError("Unclosed Comms", get_unclosed())
                finally:
                    Comm._instances.clear()
                    _global_clients.clear()

                return result
      result = loop.run_sync(
            coro, timeout=timeout * 2 if timeout else timeout
        )

distributed/utils_test.py:955:


../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/tornado/ioloop.py:530: in run_sync
return future_cell[0].result()
distributed/utils_test.py:914: in coro
result = await future
../../../.pyenv/versions/3.9.4/lib/python3.9/asyncio/tasks.py:481: in wait_for
return fut.result()
distributed/tests/test_scheduler.py:1400: in test_retries
future = c.submit(varying(args), retries=3)
distributed/client.py:1586: in submit
futures = self._graph_to_futures(
distributed/client.py:2554: in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset)


self = <dask.highlevelgraph.HighLevelGraph object at 0x7faee20c90a0>, client = <Client: not connected>
client_keys = {'func-24c1cd9c6d5696647fb020a299f60ee9'}

def __dask_distributed_pack__(self, client, client_keys: Iterable[Hashable]) -> Any:
    """Pack the high level graph for Scheduler -> Worker communication

    The approach is to delegate the packaging to each layer in the high level graph
    by calling .__dask_distributed_pack__() and .__dask_distributed_annotations_pack__()
    on each layer. If the layer doesn't implement packaging, we materialize the
    layer and pack it.

    Parameters
    ----------
    client : distributed.Client
        The client calling this function.
    client_keys : Iterable
        List of keys requested by the client.

    Returns
    -------
    data: list of header and payload
        Packed high level graph serialized by dumps_msgpack
    """
  from distributed.protocol.core import dumps_msgpack

E ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/ddavis/software/repos/distributed/distributed/protocol/core.py)

../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/dask/highlevelgraph.py:946: ImportError
----------------------------------------- Captured stderr call ------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:35647
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:41435
distributed.worker - INFO - Listening to: tcp://127.0.0.1:41435
distributed.worker - INFO - dashboard at: 127.0.0.1:38335
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:35647
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 31.29 GiB
distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/dask-worker-space/worker-rppfb0t5
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:34335
distributed.worker - INFO - Listening to: tcp://127.0.0.1:34335
distributed.worker - INFO - dashboard at: 127.0.0.1:36775
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:35647
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 2
distributed.worker - INFO - Memory: 31.29 GiB
distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/dask-worker-space/worker-6tkp49gn
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:41435', name: 0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:41435
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:34335', name: 1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:34335
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:35647
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://127.0.0.1:35647
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-51b04972-9ca0-11eb-b766-7085c25d8254
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-51b04972-9ca0-11eb-b766-7085c25d8254
distributed.scheduler - INFO - Remove client Client-51b04972-9ca0-11eb-b766-7085c25d8254
distributed.scheduler - INFO - Close client connection: Client-51b04972-9ca0-11eb-b766-7085c25d8254
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:41435
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:34335
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:41435', name: 0, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:41435
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:34335', name: 1, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:34335
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
=========================================== warnings summary ============================================
../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/_pytest/config/__init__.py:1233
/home/ddavis/.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/_pytest/config/__init__.py:1233: PytestConfigWarning: Unknown config option: timeout

self._warn_or_fail_if_strict(f"Unknown config option: {key}\n")

../../../.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/_pytest/config/__init__.py:1233
/home/ddavis/.pyenv/versions/3.9.4/envs/dask-dev/lib/python3.9/site-packages/_pytest/config/__init__.py:1233: PytestConfigWarning: Unknown config option: timeout_method

self._warn_or_fail_if_strict(f"Unknown config option: {key}\n")

-- Docs: https://docs.pytest.org/en/stable/warnings.html
========================================= slowest 20 durations ==========================================
2.04s teardown distributed/tests/test_client.py::test_retries_get
2.04s teardown distributed/tests/test_client_executor.py::test_retries
0.52s setup distributed/tests/test_client.py::test_retries_get
0.52s setup distributed/tests/test_client_executor.py::test_retries
0.03s call distributed/tests/test_client.py::test_retries_dask_array
0.02s call distributed/tests/test_scheduler.py::test_retries
0.00s call distributed/tests/test_client.py::test_retries_get
0.00s call distributed/tests/test_client_executor.py::test_retries
0.00s teardown distributed/tests/test_client.py::test_retries_dask_array
0.00s teardown distributed/tests/test_scheduler.py::test_retries
0.00s setup distributed/tests/test_client.py::test_retries_dask_array
0.00s setup distributed/tests/test_scheduler.py::test_retries
============================ 4 failed, 1188 deselected, 2 warnings in 6.62s =============================

@jakirkham
Member

jakirkham commented Apr 13, 2021

May need to merge with latest main. That function was removed (#4677).

Edit: The other possibility is that an older version of distributed is still installed in that environment and is getting imported instead.
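
One quick way to check the second possibility (a small sketch; the exact output depends on your environment):

# Confirm which copy of distributed actually gets imported, e.g. when a
# stale `pip install .` shadows a newer `pip install -e .` checkout.
import distributed
import distributed.protocol.core as core

print(distributed.__version__)
print(distributed.__file__)            # should point at the development checkout
print(hasattr(core, "dumps_msgpack"))  # False once #4677 removed it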

@douglasdavis
Member Author

douglasdavis commented Apr 13, 2021

Thanks @jakirkham, a fresh env did the trick. Not 100% sure, but I think I had mixed `pip install -e .` with an old `pip install .`.

@jakirkham
Member

Yeah happens to me periodically as well 🙂

Member

@jrbourbeau left a comment

Thanks @douglasdavis! This is in

Development

Successfully merging this pull request may close these issues.

Repeatedly use the same worker on first task
