2 changes: 1 addition & 1 deletion distributed/dashboard/tests/test_components.py
@@ -30,7 +30,7 @@ async def test_profile_plot(c, s, a, b):
assert len(p.source.data["left"]) >= 1


@gen_cluster(client=True, clean_kwargs={"threads": False})
@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False})
Member Author commented:

Instance checks are disabled on a variety of tests, largely due to #6308.

Garbage collecting Scheduler objects is useful, but secondary to clearing up TaskStates and their expensive run_spec attributes.

We should open an issue to track re-enabling these checks.
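For reference, the opt-out pattern applied throughout this PR looks roughly like the sketch below (the test names and bodies are illustrative placeholders, not code from this diff):

```python
from distributed import LocalCluster
from distributed.utils_test import clean, gen_cluster


# Async tests opt out through the decorator's clean_kwargs...
@gen_cluster(client=True, clean_kwargs={"instances": False})
async def test_async_example(c, s, a, b):
    # thread/process leak checks still run; only instance checks are skipped
    assert await c.submit(sum, [1, 2, 3]) == 6


# ...while sync tests opt out through the clean() context manager directly.
def test_sync_example():
    with clean(threads=False, instances=False):
        with LocalCluster(n_workers=1, dashboard_address=":0"):
            pass
```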

async def test_profile_time_plot(c, s, a, b):
from bokeh.io import curdoc

5 changes: 4 additions & 1 deletion distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -55,7 +55,9 @@
scheduler.PROFILING = False # type: ignore


@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
@gen_cluster(
client=True, scheduler_kwargs={"dashboard": True}, clean_kwargs={"instances": False}
)
async def test_simple(c, s, a, b):
port = s.http_server.port

@@ -883,6 +885,7 @@ async def test_lots_of_tasks(c, s, a, b):
@gen_cluster(
client=True,
scheduler_kwargs={"dashboard": True},
clean_kwargs={"instances": False},
config={
"distributed.scheduler.dashboard.tls.key": get_cert("tls-key.pem"),
"distributed.scheduler.dashboard.tls.cert": get_cert("tls-cert.pem"),
4 changes: 3 additions & 1 deletion distributed/dashboard/tests/test_worker_bokeh.py
@@ -153,7 +153,9 @@ async def test_CommunicatingStream(c, s, a, b):


@gen_cluster(
client=True, clean_kwargs={"threads": False}, worker_kwargs={"dashboard": True}
client=True,
clean_kwargs={"threads": False, "instances": False},
worker_kwargs={"dashboard": True},
)
async def test_prometheus(c, s, a, b):
pytest.importorskip("prometheus_client")
8 changes: 8 additions & 0 deletions distributed/deploy/spec.py
@@ -630,6 +630,14 @@ def from_name(cls, name: str):
"""Create an instance of this class to represent an existing cluster by name."""
raise NotImplementedError()

def clear_instance(self):
del self.scheduler

@classmethod
def clear_instances(cls):
for instance in cls._instances:
instance.clear_instance()


async def run_spec(spec: dict, *args):
workers = {}
4 changes: 2 additions & 2 deletions distributed/deploy/tests/test_local.py
@@ -371,7 +371,7 @@ async def test_memory_limit_none():


def test_cleanup():
with clean(threads=False):
with clean(threads=False, instances=False):
c = LocalCluster(n_workers=2, silence_logs=False, dashboard_address=":0")
port = c.scheduler.port
c.close()
@@ -882,7 +882,7 @@ def test_starts_up_sync(loop):
def test_dont_select_closed_worker():
# Make sure distributed does not try to reuse a client from a
# closed cluster (https://github.com/dask/distributed/issues/2840).
with clean(threads=False):
with clean(threads=False, instances=False):
cluster = LocalCluster(n_workers=0, dashboard_address=":0")
c = Client(cluster)
cluster.scale(2)
4 changes: 2 additions & 2 deletions distributed/http/scheduler/tests/test_scheduler_http.py
@@ -77,7 +77,7 @@ async def test_prefix(c, s, a, b):
assert is_valid_xml(body)


@gen_cluster(client=True, clean_kwargs={"threads": False})
@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False})
async def test_prometheus(c, s, a, b):
pytest.importorskip("prometheus_client")
from prometheus_client.parser import text_string_to_metric_families
@@ -103,7 +103,7 @@ async def test_prometheus(c, s, a, b):
assert client.samples[0].value == 1.0


@gen_cluster(client=True, clean_kwargs={"threads": False})
@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False})
async def test_prometheus_collect_task_states(c, s, a, b):
pytest.importorskip("prometheus_client")
from prometheus_client.parser import text_string_to_metric_families
2 changes: 1 addition & 1 deletion distributed/http/scheduler/tests/test_semaphore_http.py
@@ -5,7 +5,7 @@
from distributed.utils_test import gen_cluster


@gen_cluster(client=True, clean_kwargs={"threads": False})
@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False})
async def test_prometheus_collect_task_states(c, s, a, b):
pytest.importorskip("prometheus_client")
from prometheus_client.parser import text_string_to_metric_families
25 changes: 23 additions & 2 deletions distributed/scheduler.py
@@ -6697,7 +6697,7 @@ async def get_story(self, keys=()):

transition_story = story

def reschedule(self, key=None, worker=None):
def reschedule(self, key=None, worker=None, stimulus_id=None):
Member Author commented:

There's a smaller PR addressing this: #6307

"""Reschedule a task

Things may have shifted and this task may now be better suited to run
@@ -6715,7 +6715,7 @@ def reschedule(self, key=None, worker=None):
return
if worker and ts.processing_on.address != worker:
return
self.transitions({key: "released"}, f"reschedule-{time()}")
self.transitions({key: "released"}, stimulus_id or f"reschedule-{time()}")

#####################
# Utility functions #
@@ -7268,6 +7268,27 @@ def request_remove_replicas(self, addr: str, keys: list, *, stimulus_id: str):
}
)

def clear_instance(self):
self.extensions.clear()
self.plugins.clear()
self.services.clear()
self.listeners.clear()
self.handlers.clear()
self.periodic_callbacks.clear()
self.stream_handlers.clear()
self.stream_comms.clear()
self.transitions_table.clear()
self.log.clear()
self.transition_log.clear()

del self.http_application
del self.http_server

@classmethod
def clear_instances(cls):
for instance in cls._instances:
instance.clear_instance()


def _remove_from_processing(
state: SchedulerState, ts: TaskState
11 changes: 7 additions & 4 deletions distributed/tests/test_client.py
@@ -2888,7 +2888,12 @@ async def test_rebalance(c, s, a, b):
assert len(b.data) == 50


@gen_cluster(nthreads=[("", 1)] * 3, client=True, config=REBALANCE_MANAGED_CONFIG)
@gen_cluster(
nthreads=[("", 1)] * 3,
client=True,
config=REBALANCE_MANAGED_CONFIG,
clean_kwargs={"instances": False},
)
async def test_rebalance_workers_and_keys(client, s, a, b, c):
"""Test Client.rebalance(). These are just to test the Client wrapper around
Scheduler.rebalance(); for more thorough tests on the latter see test_scheduler.py.
@@ -4471,9 +4476,6 @@ def assert_no_data_loss(scheduler):

@gen_cluster(client=True)
async def test_interleave_computations(c, s, a, b):
import distributed

distributed.g = s
xs = [delayed(slowinc)(i, delay=0.02) for i in range(30)]
ys = [delayed(slowdec)(x, delay=0.02) for x in xs]
zs = [delayed(slowadd)(x, y, delay=0.02) for x, y in zip(xs, ys)]
@@ -4919,6 +4921,7 @@ def test_quiet_client_close(loop):


@pytest.mark.slow
@pytest.mark.parametrize("loop", [{"instances": False}], indirect=True)
def test_quiet_client_close_when_cluster_is_closed_before_client(loop):
with captured_logger(logging.getLogger("tornado.application")) as logger:
cluster = LocalCluster(loop=loop, n_workers=1, dashboard_address=":0")
2 changes: 1 addition & 1 deletion distributed/tests/test_cluster_dump.py
@@ -55,7 +55,7 @@ def blocked_inc(x, event):
return x + 1


@gen_cluster(client=True)
@gen_cluster(client=True, clean_kwargs={"instances": False})
async def test_cluster_dump_state(c, s, a, b, tmp_path):
filename = tmp_path / "dump"
futs = c.map(inc, range(2))
4 changes: 2 additions & 2 deletions distributed/tests/test_publish.py
@@ -11,7 +11,7 @@
from distributed.utils_test import gen_cluster, inc


@gen_cluster()
@gen_cluster(clean_kwargs={"instances": False})
async def test_publish_simple(s, a, b):
c = Client(s.address, asynchronous=True)
f = Client(s.address, asynchronous=True)
@@ -259,7 +259,7 @@ async def test_datasets_async(c, s, a, b):
len(c.datasets)


@gen_cluster(client=True)
@gen_cluster(client=True, clean_kwargs={"instances": False})
async def test_pickle_safe(c, s, a, b):
async with Client(s.address, asynchronous=True, serializers=["msgpack"]) as c2:
await c2.publish_dataset(x=[1, 2, 3])
17 changes: 13 additions & 4 deletions distributed/tests/test_scheduler.py
@@ -2180,7 +2180,11 @@ async def test_gather_no_workers(c, s, a, b):
@pytest.mark.slow
@pytest.mark.parametrize("reschedule_different_worker", [True, False])
@pytest.mark.parametrize("swap_data_insert_order", [True, False])
@gen_cluster(client=True, client_kwargs={"direct_to_workers": False})
@gen_cluster(
client=True,
client_kwargs={"direct_to_workers": False},
clean_kwargs={"instances": False},
)
async def test_gather_allow_worker_reconnect(
c, s, a, b, reschedule_different_worker, swap_data_insert_order
):
@@ -2806,7 +2810,12 @@ async def test_rebalance_managed_memory(c, s, a, b):
assert len(b.data) == 50


@gen_cluster(nthreads=[("", 1)] * 3, client=True, config=REBALANCE_MANAGED_CONFIG)
@gen_cluster(
nthreads=[("", 1)] * 3,
client=True,
config=REBALANCE_MANAGED_CONFIG,
clean_kwargs={"instances": False},
)
async def test_rebalance_workers_and_keys(client, s, a, b, c):
futures = await client.scatter(range(100), workers=[a.address])
assert (len(a.data), len(b.data), len(c.data)) == (100, 0, 0)
@@ -3250,7 +3259,7 @@ async def test_worker_heartbeat_after_cancel(c, s, *workers):
await asyncio.gather(*(w.heartbeat() for w in workers))


@gen_cluster(client=True, nthreads=[("", 1)])
@gen_cluster(client=True, nthreads=[("", 1)], clean_kwargs={"instances": False})
async def test_worker_reconnect_task_memory(c, s, a):
a.periodic_callbacks["heartbeat"].stop()

@@ -3270,7 +3279,7 @@ async def test_worker_reconnect_task_memory(c, s, a):
}


@gen_cluster(client=True, nthreads=[("", 1)])
@gen_cluster(client=True, nthreads=[("", 1)], clean_kwargs={"instances": False})
async def test_worker_reconnect_task_memory_with_resources(c, s, a):
async with Worker(s.address, resources={"A": 1}) as b:
while s.workers[b.address].status != Status.running:
2 changes: 1 addition & 1 deletion distributed/tests/test_semaphore.py
@@ -189,7 +189,7 @@ def f(x, release=True):


@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@gen_cluster(client=True, timeout=120, clean_kwargs={"instances": False})
async def test_close_async(c, s, a, b):
sem = await Semaphore(name="test")

2 changes: 1 addition & 1 deletion distributed/tests/test_stories.py
@@ -104,7 +104,7 @@ async def get_story(self, *args, **kw):
raise CommClosedError


@gen_cluster(client=True, Worker=WorkerBrokenStory)
@gen_cluster(client=True, Worker=WorkerBrokenStory, clean_kwargs={"instances": False})
@pytest.mark.parametrize("on_error", ["ignore", "raise"])
async def test_client_story_failed_worker(c, s, a, b, on_error):
f = c.submit(inc, 1)
6 changes: 5 additions & 1 deletion distributed/tests/test_worker_client.py
@@ -256,7 +256,7 @@ def test_secede_without_stealing_issue_1262():
"""
# run the loop as an inner function so all workers are closed
# and exceptions can be examined
@gen_cluster(client=True, scheduler_kwargs={"extensions": {}})
@gen_cluster(
client=True,
scheduler_kwargs={"extensions": {}},
clean_kwargs={"instances": False},
)
async def secede_test(c, s, a, b):
def func(x):
with worker_client() as wc:
2 changes: 1 addition & 1 deletion distributed/utils.py
@@ -1152,7 +1152,7 @@ def clear(self):
self.deque.clear()

@classmethod
def clear_all_instances(cls):
def clear_instances(cls):
"""
Clear the internal storage of all live DequeHandlers.
"""
47 changes: 35 additions & 12 deletions distributed/utils_test.py
@@ -28,7 +28,9 @@
from typing import Any, Generator, Literal

from distributed.compatibility import MACOS
from distributed.profile import wait_profiler
from distributed.scheduler import Scheduler
from distributed.utils import has_keyword

try:
import ssl
@@ -138,9 +140,11 @@ async def cleanup_global_workers():
await worker.close(report=False, executor_wait=False)


@pytest.fixture
def loop():
with check_instances():
@pytest.fixture(params=[{"instances": True}])
def loop(request):
instances = request.param["instances"]

with check_instances() if instances else nullcontext():
with pristine_loop() as loop:
# Monkey-patch IOLoop.start to wait for loop stop
orig_start = loop.start
@@ -1768,6 +1772,8 @@ def check_instances():

_global_clients.clear()

Scheduler.clear_instances()

for w in Worker._instances:
with suppress(RuntimeError): # closed IOLoop
w.loop.add_callback(w.close, report=False, executor_wait=False)
@@ -1796,22 +1802,39 @@ def check_instances():
for n in Nanny._instances
), {n: n.status for n in Nanny._instances}

# assert not list(SpecCluster._instances) # TODO
assert all(c.status == Status.closed for c in SpecCluster._instances), list(
SpecCluster._instances
)
SpecCluster._instances.clear()
SpecCluster.clear_instances()
DequeHandler.clear_instances()

from _pytest.logging import LogCaptureHandler

for v in logging.Logger.manager.loggerDict.values():
if not isinstance(v, logging.PlaceHolder):
for h in v.handlers:
if isinstance(h, LogCaptureHandler):
h.reset()

has_keyword.cache_clear()

wait_profiler()
gc.collect()

if Scheduler._instances:
s = next(iter(Scheduler._instances))
import objgraph

objgraph.show_backrefs([s], filename="scheduler.png")
Member Author commented:

The objgraph import should be removed upon PR approval
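If we want a dependency-free way to poke at the same information locally, something like this sketch could work (a hypothetical helper, not part of the PR; it only lists immediate referrers rather than drawing a full graph):

```python
import gc


def print_referrers(obj, limit=10):
    """Print the types and truncated reprs of objects that reference `obj`."""
    for referrer in gc.get_referrers(obj)[:limit]:
        print(type(referrer).__name__, repr(referrer)[:120])
```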

assert not Scheduler._instances

SpecCluster._instances.clear()
Nanny._instances.clear()
DequeHandler.clear_all_instances()


@contextmanager
def clean(threads=True, instances=True, timeout=1, processes=True):
with check_thread_leak() if threads else nullcontext():
with pristine_loop() as loop:
with check_process_leak(check=processes):
with check_instances() if instances else nullcontext():
with check_instances() if instances else nullcontext():
Member Author commented:

The IOLoop needs to be cleaned up first; otherwise, callbacks still scheduled on the loop (such as Scheduler.remove_worker) hold references to Scheduler instances and cause check_instances to fail.

I'm not aware of any drawbacks to this ordering.
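To make the failure mode concrete, here is a minimal standalone sketch (plain asyncio stands in for the Tornado IOLoop, which wraps an asyncio event loop; `DummyScheduler` is a hypothetical stand-in, not distributed code):

```python
import asyncio
import gc
import weakref


class DummyScheduler:
    """Stand-in for Scheduler, only to demonstrate the reference chain."""

    def remove_worker(self):
        pass


loop = asyncio.new_event_loop()
s = DummyScheduler()
ref = weakref.ref(s)

# A pending callback stores the bound method, which keeps the instance alive.
loop.call_soon(s.remove_worker)

del s
gc.collect()
assert ref() is not None  # still reachable through the loop's callback queue

# Closing the loop drops its pending callbacks, so the instance can be collected.
loop.close()
gc.collect()
assert ref() is None
```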

with pristine_loop() as loop:
with check_process_leak(check=processes):
with check_active_rpc(loop, timeout):
reset_config()
