
Conversation

@fjetter (Member) commented Mar 4, 2022

I encountered a problem where the default client and the corresponding config settings were not properly reset when multiple clients were used and they were not closed in the correct order.

For example, test_cancel_multi_client closes the clients in the same order they were started, which leaves a global config setting behind, e.g. scheduler="dask.distributed".

This led to state leaking between tests, so that some tests would behave differently depending on which tests had been executed before.

I noticed this happening in #5791 but have no idea why we're not seeing this on main. I added a pytest autouse fixture to ensure no tests are leaking global clients.
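
As an illustration only, here is a minimal sketch of what such an autouse fixture could look like; the fixture name and the exact check against the module-level _global_clients registry (referenced elsewhere in this thread) are assumptions, not the code added in this PR:

import pytest

from distributed.client import _global_clients  # module-level registry of default clients


@pytest.fixture(autouse=True)
def no_leaked_global_clients():
    # Hypothetical sketch: fail loudly if a test leaves a global default client behind
    yield
    leaked = {k: c for k, c in _global_clients.items() if c.status != "closed"}
    assert not leaked, f"Test leaked global clients: {leaked}"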

Closes #5772

Note: there are more issues about thread-safe get_clients that are not addressed by this: #3827, #5467

Shortcoming / out of scope

If a user defines multiple default clients and also modifies the scheduler or shuffle config keys themselves, closing the last global/default client will overwrite the manual user setting with whatever value existed before any Client was initialized (illustrated below). I'm fine with this since it should not happen in practice.
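
To make this shortcoming concrete, a small illustration of how dask.config.set snapshots and restores the keys it touches, stomping on a manual override made in between; the values are only examples:

import dask

# What a default client effectively applies when it becomes the global client
client_ctx = dask.config.set(scheduler="dask.distributed", shuffle="tasks")

# The user later overrides one of the same keys manually
dask.config.set(scheduler="synchronous")

# When the last default client closes, it restores its snapshot of those keys,
# reverting the manual override to whatever was set before any Client existed
client_ctx.__exit__(None, None, None)
print(dask.config.get("scheduler", None))  # no longer "synchronous"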

github-actions bot (Contributor) commented Mar 4, 2022

Unit Test Results

15 files (+2)   15 suites (+2)   8h 0m 18s ⏱️ (+1h 34m 45s)
2 707 tests (+3):   2 617 ✔️ (-1)   83 💤 (±0)   7 ❌ (+4)
20 012 runs (+2 392):   18 993 ✔️ (+2 322)   1 012 💤 (+66)   7 ❌ (+4)

For more details on these failures, see this check.

Results for commit 7fd04cf. ± Comparison against base commit a8a9a3f.

♻️ This comment has been updated with latest results.

@fjetter fjetter self-assigned this Mar 4, 2022
@fjetter fjetter force-pushed the multi_client_default branch from 23e5546 to b76dfb8 Compare March 7, 2022 15:40
@fjetter (Member Author) commented Mar 8, 2022

Possibly related failure in distributed/tests/test_worker_client.py::test_submit_different_names

@fjetter (Member Author) commented Mar 8, 2022

Turns out distributed/tests/test_worker_client.py::test_submit_different_names is easily reproducible, but the error condition is still confusing and I don't yet understand why the issue is not 100% reproducible.

The problem is that we're still hitting #2058 because #2066 was only a partial fix. We're not resolving addresses everywhere, so some internal checks compare localhost to 127.0.0.1, conclude they differ, and initialize a second client 💥
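
To illustrate the mismatch, a hypothetical normalization helper (not the code used in distributed, which has its own address-resolution utilities); a plain string comparison of the two addresses fails until both sides are resolved:

import socket


def normalize(addr: str) -> str:
    # Hypothetical helper: resolve the host part of "scheme://host:port" to an IP
    scheme, _, hostport = addr.partition("://")
    host, _, port = hostport.rpartition(":")
    return f"{scheme}://{socket.gethostbyname(host)}:{port}"


# Naive comparison treats these as different schedulers and spawns a second client
assert "tcp://localhost:8786" != "tcp://127.0.0.1:8786"

# After resolving, they refer to the same endpoint (on most systems localhost -> 127.0.0.1)
assert normalize("tcp://localhost:8786") == normalize("tcp://127.0.0.1:8786")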

        ReplayTaskClient(self)

    def _set_default_configs(self):
        self._set_config_stack.push(
Member:

dask.config.set.__enter__ is a no-op, so push and enter_context do the same thing here, but I think it's clearer to use enter_context anyway.

Suggested change:
-        self._set_config_stack.push(
+        self._set_config_stack.enter_context(
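
For context, a small self-contained illustration of the difference the reviewer is pointing at: ExitStack.push() only registers the object's __exit__, while enter_context() also calls __enter__ immediately, so for a context manager whose __enter__ is a no-op the two behave the same:

import contextlib


class Demo:
    def __enter__(self):
        print("enter")
        return self

    def __exit__(self, *exc):
        print("exit")


with contextlib.ExitStack() as stack:
    stack.push(Demo())           # __enter__ is NOT called, only __exit__ is registered
    stack.enter_context(Demo())  # __enter__ runs now, __exit__ is registered
# leaving the with-block prints "exit" twice (LIFO order)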


    _default_event_handlers = {"print": _handle_print, "warn": _handle_warn}

    _set_config_stack = contextlib.ExitStack()
Member:

This is a ClassVar, so there is a single global ExitStack at distributed.client.Client._set_config_stack, and ExitStack isn't thread-safe.

@graingert (Member) commented Mar 9, 2022

maybe something like this?

class _ConfigureDask:
    def __init__(self):
        self._rlock = threading.RLock()
        self._count = 0

    def __enter__(self):
        with self._rlock:
            self._count += 1
            if self._count == 1:
                self._set = dask.config.set(scheduler="dask.distributed", shuffle="tasks")

    def __exit__(self, *exc_info):
        with self._rlock:
            self._count -= 1
            if self._count == 0:
                self._set.__exit__(None, None, None)
                del self._set

_configure_dask = _ConfigureDask()
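
A possible usage pattern, assuming the _ConfigureDask sketch above: each default client enters the manager when it starts and exits when it closes, so the config is applied once by the first client and only restored when the last one goes away:

# Hypothetical usage of the _configure_dask instance defined above
with _configure_dask:       # first default client starts: config applied
    with _configure_dask:   # second default client starts: counter bumped, config untouched
        pass                # ... clients do work ...
    # second client closed: config still applied
# last client closed: original config restored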

@graingert (Member) commented Mar 9, 2022

or maybe something like this:

class _GlobalClientManager:
    def __init__(self):
        self._lock = threading.RLock()
        self._global_clients: weakref.WeakValueDictionary[
            int, Client
        ] = weakref.WeakValueDictionary()
        self._global_client_index = 0
        self._set = None

    def _get_global_client(self) -> Client | None:
        with self._lock:
            L = sorted(self._global_clients, reverse=True)
            for k in L:
                c = self._global_clients[k]
                if c.status != "closed":
                    return c
                else:
                    del self._global_clients[k]
            return None

    def _set_global_client(self, c: Client) -> None:
        with self._lock:
            if not self._set:
                self._set = dask.config.set(
                    scheduler="dask.distributed", shuffle="tasks"
                )
            self._global_clients[self._global_client_index] = c
            self._global_client_index += 1

    def _del_global_client(self, c: Client) -> None:
        with self._lock:
            for k in list(self._global_clients):
                try:
                    if self._global_clients[k] is c:
                        del self._global_clients[k]
                except KeyError:  # pragma: no cover
                    pass

            if not self._global_clients:
                self._set.__exit__(None, None, None)
                self._set = None


_global_client_manager = _GlobalClientManager()
_set_global_client = _global_client_manager._set_global_client
_get_global_client = _global_client_manager._get_global_client
_del_global_client = _global_client_manager._del_global_client

and it would fix #5772 also

Member:

I'm not sure what our appetite for metaclasses in the code base is, but I've personally liked using Multitons for heavyweight resource objects that should be unique per process for the arguments used to create them.

import weakref
from threading import Lock

class ClientMultiton(type):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__cache = weakref.WeakValueDictionary()
        self.__lock = Lock()

    def __call__(cls, *args, **kw):
        key = args + (frozenset(kw.items()),) if kw else args

        with cls.__lock:
            try:
                # Obtain existing instance for this key
                instance = cls.__cache[key]
            except KeyError:
                pass
            else:
                # Return instance if it's not closed
                if not instance.closed:
                    return instance

                # Delete the closed cache entry
                # and proceed with recreating the instance
                # for this key
                del cls.__cache[key]

            cls.__cache[key] = instance = type.__call__(cls, *args, **kw)
            return instance


class Client(metaclass=ClientMultiton):
    def __init__(self, address=None, timeout=100):
        self.address = address
        self.timeout = timeout
        self._closed = False
        self._lock = Lock()

    @property
    def closed(self):
        # Dangerous if self._closed can transition from False to True
        with self._lock:  
            return self._closed

    def close(self):
        with self._lock:
            if self._closed:
                return

            print(f"Closing {self.address}")
            self._closed = True

    def __del__(self):
        self.close()

if __name__ == "__main__":
    def inner():
        c1 = Client(address="tcp://172.0.0.1")
        assert Client(address="tcp://172.0.0.1") is c1
        assert Client(address="tcp://172.0.0.1") is Client(address="tcp://172.0.0.1")

        addresses = list(sorted(c.address for c in Client._ClientMultiton__cache.values()))
        assert addresses == ["tcp://172.0.0.1"]

        c2 = Client(address="tcp://172.0.0.2")
        assert c2 is not c1
        addresses = list(sorted(c.address for c in Client._ClientMultiton__cache.values()))
        assert addresses == ["tcp://172.0.0.1", "tcp://172.0.0.2"]

        c1.close()
        addresses = list(sorted(c.address for c in Client._ClientMultiton__cache.values()))
        assert addresses == ["tcp://172.0.0.1", "tcp://172.0.0.2"]

        c3 = Client(address="tcp://172.0.0.1")
        assert c3 is not c1
        assert not c3.closed
        addresses = list(sorted(c.address for c in Client._ClientMultiton__cache.values()))
        assert addresses == ["tcp://172.0.0.1", "tcp://172.0.0.2"]
    
    inner()
    print("Done")
    assert len(Client._ClientMultiton__cache) == 0

One of the nice things about the above pattern is that one can defer to the garbage collector for cleanup by simply removing any references to the object (in a Future for instance?)

Member Author:

I'm not sure what our appetite for metaclasses in the code base

Generally speaking I'm not a huge fan of metaclasses, mostly because they are used very rarely and most people have a hard time understanding them.

However, your suggestion looks pretty appealing and I'm intrigued. I'll look into this and would consider using the metaclass if it helps us reduce code complexity in other areas.

Member Author:

This could help us clean up a bit of code around get_client, and particularly in Worker._get_client it would come in handy to rely on this multiton.

The multiton pattern itself does not help us deal with the default client mechanism. Under the assumption that multiple clients are allowed, the complexity around default clients would merely shift from this manager class to the metaclass.
We can't remove all of the default client logic if we allow connections to multiple schedulers, or even allow multiple distinct clients to connect to the same scheduler. The latter is explicitly exercised in a few tests.
Just having tests around is not necessarily an indication that this is a useful feature to have, but for now I don't see how this pattern would help us much without changing behaviour, and I was hoping to stick with existing behaviour for now.

We should keep this in mind. I think having this would help in some cases and would allow us to write cleaner handling of clients overall. I expect this to be a bigger effort than what I feel comfortable doing in this PR.

Member:

We should keep this in mind. I think having this would help in some cases and would allow us to write cleaner handling of clients overall. I expect this to be a bigger effort than what I feel comfortable doing in this PR.

There are also examples of a lazily initialised Multiton (with support for a finaliser) under a BSD3 license in the following locations

https://github.com/ratt-ru/dask-ms/blob/master/daskms/patterns.py
https://github.com/ratt-ru/dask-ms/blob/master/daskms/tests/test_patterns.py

    async def _start(self, timeout=no_default, **kwargs):
        self.status = "connecting"
        if self._set_as_default:
            _set_global_client(self)
Member:

I think moving _set_global_client into async def _start is worse, as _start is likely to be called via Client(asynchronous=False) and so runs in an off-main-thread event-loop thread.
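
A generic sketch (not distributed's actual code) of the pattern being described: a synchronously constructed client typically drives its async _start on a background event-loop thread, so anything moved into _start, such as a _set_global_client call, executes off the main thread:

import asyncio
import threading

# Background event loop thread, as a sync Client(asynchronous=False) would use
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def _start():
    # Anything placed here executes in the loop thread, not the caller's thread
    return threading.current_thread().name


future = asyncio.run_coroutine_threadsafe(_start(), loop)
print("constructor called from:", threading.current_thread().name)  # the main thread
print("_start executed in:", future.result())                       # the loop thread
loop.call_soon_threadsafe(loop.stop)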

],
)
@gen_cluster()
async def test_submit_different_names(s, a, b):
Member Author:

Despite being marked flaky recently, this test was solid before the changes proposed in this PR. That was achieved rather coincidentally, since clients were added to the global default-client dict right after initialization, before they were actually started.
Running this test on main yields an exception pointing this out:

>   assert c_inner.status == "running"
E   AssertionError: assert 'newly-created' == 'running'
E     - running
E     + newly-created

What's happening on main is that

  • As soon as the futures are deserialized on the worker, a
    get_client("tcp://localhost:XXX") is called in Future.__setstate__
  • localhost is resolved as part of get_client
  • since now the input address matches with the worker address, we call Worker._get_client
  • Worker._get_client, however, does not match the default client (the one we
    created manually in the test):

        client.scheduler
        and client.scheduler.address == self.scheduler.address
        # The below conditions should only happen in case a second
        # cluster is alive, e.g. if a submitted task spawned its own
        # LocalCluster, see gh4565
        or (
            isinstance(client._start_arg, str)
            and client._start_arg == self.scheduler.address
            or isinstance(client._start_arg, Cluster)
            and client._start_arg.scheduler_address == self.scheduler.address
        )
  • It will create a new client using the Worker RPC, which always uses the
    resolved address:

        self._client = Client(
            self.scheduler,
            loop=self.loop,
            security=self.security,
            set_as_default=True,
            asynchronous=asynchronous,
            direct_to_workers=True,
            name="worker",
            timeout=timeout,
        )
        Worker._initialized_clients.add(self._client)
  • This new client will be set as Worker._client and becomes the new default
    client. main only filters global clients for status != closed, not for
    running.
  • Since the new one is the default client, the compute call will get the same
    one and it will always match addresses since it was created using the worker
    scheduler RPC
  • This Client is initialized with asynchronous=True but is never awaited. Therefore it is not actually up yet, which is why we see the newly-created status above.

Diff to this branch:

  • This branch requires a default client to actually be running. This leads us to
    initialize potentially many clients, causing this confusion.

An ideal world would...

  • Detect the default client as the manually created one using localhost
  • Recognize that the localhost client is talking to the same scheduler
  • Not initialize any further Clients

@gjoseph92 (Collaborator):

I only barely skimmed this, but just wondering: how does this relate to #5485 / #5467 (comment)? Basically, does this also handle asyncio correctly? Or is this addressing an orthogonal problem? I guess I would have expected to see some contextvars in here, but I'm probably misunderstanding the problem this is solving.

@fjetter (Member Author) commented Mar 11, 2022

Or is this addressing an orthogonal problem?

It relates, but I believe it is a different problem. The original problem I intended to solve is not related to threading at all, actually. Tom just pointed out that there are also race conditions in a threading context, which is why I introduced the global client manager with a lock.

I think the asyncio detection works reliably since we introduced the SyncMethodMixin with

def in_async_call(loop, default=False):
    """Whether this call is currently within an async call"""
    try:
        return loop.asyncio_loop is asyncio.get_running_loop()
    except RuntimeError:
        # No *running* loop in thread. If the event loop isn't running, it
        # _could_ be started later in this thread though. Return the default.
        if not loop.asyncio_loop.is_running():
            return default
        return False
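
A small usage sketch, assuming the in_async_call helper quoted above is defined (it also needs import asyncio and expects an object exposing the wrapped loop as .asyncio_loop; the LoopHandle stand-in below is hypothetical):

import asyncio


class LoopHandle:
    # Minimal stand-in for the IOLoop wrapper that in_async_call expects
    def __init__(self, asyncio_loop):
        self.asyncio_loop = asyncio_loop


async def main():
    handle = LoopHandle(asyncio.get_running_loop())
    assert in_async_call(handle)  # called from inside this loop's running task


asyncio.run(main())

idle_loop = asyncio.new_event_loop()
try:
    # No loop running in this thread, so we fall back to the default (False)
    assert not in_async_call(LoopHandle(idle_loop))
finally:
    idle_loop.close()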

@fjetter fjetter force-pushed the multi_client_default branch from d16d28b to 7e19f2a Compare March 15, 2022 12:37
@fjetter fjetter force-pushed the multi_client_default branch from 7e19f2a to 1d8a357 Compare March 22, 2022 14:13
@fjetter (Member Author) commented Mar 24, 2022

I believe test failures are unrelated. @graingert any further comments before I merge?

@graingert (Member) left a comment

still some left-over _set_config_stack

@fjetter fjetter force-pushed the multi_client_default branch from a2f5d9d to 662b5c6 Compare April 1, 2022 10:09
@fjetter fjetter changed the title Reset config properly if multiple default clients are used Ensure default client mechanism is threadsafe Apr 1, 2022
@fjetter fjetter force-pushed the multi_client_default branch from 7fd04cf to 103db32 Compare December 12, 2022 15:26
github-actions bot (Contributor)

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

18 files (±0)   18 suites (±0)   8h 14m 45s ⏱️ (-10m 9s)
3 259 tests (+4):   3 169 ✔️ (+2)   88 💤 (+2)   2 ❌ (±0)
29 340 runs (+36):   28 086 ✔️ (+10)   1 244 💤 (+18)   10 ❌ (+8)

For more details on these failures, see this check.

Results for commit caf75f5. ± Comparison against base commit 19deee3.

@fjetter (Member Author) commented Dec 13, 2022

I see many failures in test_package_install_restarts_on_nanny. Will check if this is related

@fjetter (Member Author) commented Dec 13, 2022

Indeed, test_package_install_restarts_on_nanny is related. It starts a worker client, and this client blocks something during shutdown.



Development

Successfully merging this pull request may close these issues.

RuntimeError: dictionary changed size during iteration from distributed.default_client()

4 participants