From 878eca3390f9128fc07fc02a95a0450222a630f3 Mon Sep 17 00:00:00 2001 From: Sheer El Showk Date: Mon, 2 Nov 2020 13:05:33 +0100 Subject: [PATCH 1/7] Better task duration estimates for outliers. Rather than always using the average task duration as an estimate we flag "outliers" (tasks that are taking 2x longer than expected duration) and we set expected duration to be twice their current running time. NOTE: also added check for missing key in worker metrics code. --- distributed/scheduler.py | 39 +++++++++++++++----- distributed/tests/test_worker.py | 63 +++++++++++++++++++++++++++++++- distributed/worker.py | 3 +- 3 files changed, 93 insertions(+), 12 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d0637b680fb..6497f37d835 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4068,6 +4068,28 @@ def decide_worker(self, ts): return worker + def set_duration_estimate(self, ts, ws): + """Estimate task duration using worker state and task state. + + If a task takes longer than twice the current average duration we + estimate the task duration to be 2x current-runtime, otherwise we set it + to be the average duration. + """ + # we might want to pull these vars from settings? + support_outliers = True + threshold = 2 + + duration = self.get_task_duration(ts) + comm = self.get_comm_cost(ts, ws) + new_duration = duration + comm + exec_time = -1 + if 'executing' in ws.metrics and ts.key in ws.metrics['executing']: + exec_time = ws.metrics['executing'][ts.key] + if support_outliers and exec_time > threshold * duration: + new_duration = 2 * exec_time + ws.processing[ts] = new_duration + return ws.processing[ts] + def transition_waiting_processing(self, key): try: ts = self.tasks[key] @@ -4086,13 +4108,10 @@ def transition_waiting_processing(self, key): return {} worker = ws.address - duration = self.get_task_duration(ts) - comm = self.get_comm_cost(ts, ws) - - ws.processing[ts] = duration + comm + duration_estimate = self.set_duration_estimate(ts, ws) ts.processing_on = ws - ws.occupancy += duration + comm - self.total_occupancy += duration + comm + ws.occupancy += duration_estimate + self.total_occupancy += duration_estimate ts.state = "processing" self.consume_resources(ts, ws) self.check_idle_saturated(ws) @@ -5304,10 +5323,7 @@ def _reevaluate_occupancy_worker(self, ws): new = 0 nbytes = 0 for ts in ws.processing: - duration = self.get_task_duration(ts) - comm = self.get_comm_cost(ts, ws) - ws.processing[ts] = duration + comm - new += duration + comm + new += self.set_duration_estimate(ts, ws) ws.occupancy = new self.total_occupancy += new - old @@ -5597,6 +5613,9 @@ def heartbeat_interval(n): return n / 200 + 1 + + + class KilledWorker(Exception): def __init__(self, task, last_worker): super().__init__(task, last_worker) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 55e0e8c387e..e06a8c9334a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1692,7 +1692,6 @@ async def test_heartbeat_executing(cleanup): assert f.key in ws.metrics["executing"] await f - @pytest.mark.asyncio @pytest.mark.parametrize("reconnect", [True, False]) async def test_heartbeat_comm_closed(cleanup, monkeypatch, reconnect): @@ -1715,6 +1714,68 @@ def bad_heartbeat_worker(*args, **kwargs): assert w.status == Status.closed assert "Heartbeat to scheduler failed" in logger.getvalue() +@pytest.mark.asyncio +async def test_outlier_timings(cleanup): + """Test adaptive duration estimates for outliers (defined as tasks running + 2x average duration). + """ + async with await Scheduler() as s: + async with await Worker(s.address) as w: + async with Client(s.address, asynchronous=True) as c: + # error tolerance on estimated duration (needed cause exec times + # depend on worker heartbeats) + tolerance = 0.2 + ws = s.workers[w.address] + # initialize a task to set initial duration average to 0.1 + r = c.submit(slowinc, 2.2, delay=0.1) + await r + # sleep times for some tasks with a shifting average duration + times = [5.]*3 + [11.]*3 + [15.] + # Store expected task duration after t seconds + # (NOTE t is the _sum_ of sleep times). + # at any t: + # - if t < 2 * cur_avg then duration = cur_avg + # - if t > 2 * cur_avg then duration = 2 * t + # the cur_avg will depend on tasks that already completed by t + sleep2duration = [ + # avg at this point will be 0.1 + (0.13, 0.1), + # we're in the outlier regime + (1, 2), # 1s + (1, 4), # 2s + (2, 8), # 4s + # the new average is ~4.5 so no longer an outlier + (3, 5.), # 7s + # we're an outlier again so expect 2x time + (3.5, 21), # 10.5s + # back to average + (1.5, 11.), # 12s + (3, 11.), # 15s + ] + key2time=dict() + # start a bunch of tasks with different sleep intervals + for n,td in enumerate(times): + r = c.submit(slowinc, n, delay=td) + key2time[r.key] = (td,r) + t0=time() + # every time we wait up we check that any running tasks have + # apporpriate durations + for (sleep_time, exp_avg) in sleep2duration: + await asyncio.sleep(sleep_time) + # transition any finished tasks so their averages get + # updated in the scheduler + for key,(td,r) in key2time.items(): + if td < sleep_time: + await r + for ts in ws.processing: + #logger.info(f"***************************************************************") + #logger.info(f"TIMES *** {time() - t0} t={sleep_time}, avg[{ts}] = {ws.processing[ts]}") + #logger.info(f"***************************************************************") + expected_duration = ws.processing[ts] + # expected duraction should be close to d0 + assert expected_duration > (1-tolerance) * exp_avg + assert expected_duration < (1+tolerance) * exp_avg + @pytest.mark.asyncio async def test_bad_local_directory(cleanup): diff --git a/distributed/worker.py b/distributed/worker.py index 830bc458ac6..504f4d91cdb 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -784,7 +784,8 @@ async def get_metrics(self): }, executing={ key: now - self.tasks[key].start_time - for key in self.active_threads.values() + for key in self.active_threads.values() + if key in self.tasks }, ) custom = {} From f5681d7a9b7c46176560557ba5cb2777b6646c05 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 4 Dec 2020 13:59:28 -0600 Subject: [PATCH 2/7] Update test --- distributed/tests/test_steal.py | 16 ++++++++ distributed/tests/test_worker.py | 63 -------------------------------- 2 files changed, 16 insertions(+), 63 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 8277ede1833..bee0f06544e 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -809,3 +809,19 @@ async def test_worker_stealing_interval(c, s, a, b): with dask.config.set({"distributed.scheduler.work-stealing-interval": 2}): ws = WorkStealing(s) assert ws._pc.callback_time == 2 + + +@gen_cluster(client=True) +async def test_balance_with_longer_task(c, s, a, b): + np = pytest.importorskip("numpy") + + await c.submit(slowinc, 0, delay=0) # scheduler learns that slowinc is very fast + x = await c.scatter(np.arange(10000), workers=[a.address]) + y = c.submit( + slowinc, 1, delay=5, workers=[a.address], priority=1 + ) # a surprisingly long task + z = c.submit( + inc, x, workers=[a.address], allow_other_workers=True, priority=0 + ) # a task after y, suggesting a, but open to b + await z + assert z.key in b.data diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index bd4909eda22..b41e174bf5f 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1720,69 +1720,6 @@ def bad_heartbeat_worker(*args, **kwargs): assert "Heartbeat to scheduler failed" in logger.getvalue() -@pytest.mark.asyncio -async def test_outlier_timings(cleanup): - """Test adaptive duration estimates for outliers (defined as tasks running - 2x average duration). - """ - async with await Scheduler() as s: - async with await Worker(s.address) as w: - async with Client(s.address, asynchronous=True) as c: - # error tolerance on estimated duration (needed cause exec times - # depend on worker heartbeats) - tolerance = 0.2 - ws = s.workers[w.address] - # initialize a task to set initial duration average to 0.1 - r = c.submit(slowinc, 2.2, delay=0.1) - await r - # sleep times for some tasks with a shifting average duration - times = [5.0] * 3 + [11.0] * 3 + [15.0] - # Store expected task duration after t seconds - # (NOTE t is the _sum_ of sleep times). - # at any t: - # - if t < 2 * cur_avg then duration = cur_avg - # - if t > 2 * cur_avg then duration = 2 * t - # the cur_avg will depend on tasks that already completed by t - sleep2duration = [ - # avg at this point will be 0.1 - (0.13, 0.1), - # we're in the outlier regime - (1, 2), # 1s - (1, 4), # 2s - (2, 8), # 4s - # the new average is ~4.5 so no longer an outlier - (3, 5.0), # 7s - # we're an outlier again so expect 2x time - (3.5, 21), # 10.5s - # back to average - (1.5, 11.0), # 12s - (3, 11.0), # 15s - ] - key2time = dict() - # start a bunch of tasks with different sleep intervals - for n, td in enumerate(times): - r = c.submit(slowinc, n, delay=td) - key2time[r.key] = (td, r) - t0 = time() - # every time we wait up we check that any running tasks have - # apporpriate durations - for (sleep_time, exp_avg) in sleep2duration: - await asyncio.sleep(sleep_time) - # transition any finished tasks so their averages get - # updated in the scheduler - for key, (td, r) in key2time.items(): - if td < sleep_time: - await r - for ts in ws.processing: - # logger.info(f"***************************************************************") - # logger.info(f"TIMES *** {time() - t0} t={sleep_time}, avg[{ts}] = {ws.processing[ts]}") - # logger.info(f"***************************************************************") - expected_duration = ws.processing[ts] - # expected duraction should be close to d0 - assert expected_duration > (1 - tolerance) * exp_avg - assert expected_duration < (1 + tolerance) * exp_avg - - @pytest.mark.asyncio async def test_bad_local_directory(cleanup): async with await Scheduler() as s: From 5b10cfabdcd03a34e0d63fbf24613ee0643c240b Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 4 Dec 2020 14:42:27 -0600 Subject: [PATCH 3/7] Add WorkerState.executing --- distributed/scheduler.py | 33 +++++++++++++++++++++----------- distributed/tests/test_worker.py | 14 +++++++------- distributed/worker.py | 11 ++++++----- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ef6722135f2..1a494234512 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -225,6 +225,12 @@ class WorkerState: This attribute is kept in sync with :attr:`TaskState.processing_on`. + .. attribute:: executing: {TaskState: duration} + + A dictionary of tasks that are currently being run on this worker. + Each task state is asssociated with the duration in seconds which + the task has been running. + .. attribute:: has_what: {TaskState} The set of tasks which currently reside on this worker. @@ -287,6 +293,7 @@ class WorkerState: "address", "bandwidth", "extra", + "executing", "has_what", "_hash", "last_seen", @@ -320,6 +327,7 @@ def __init__( versions=None, nanny=None, extra=None, + executing=None, ): self.address = address self.pid = pid @@ -347,6 +355,7 @@ def __init__( self.used_resources = {} self.extra = extra or {} + self.executing = executing or {} def __hash__(self): return self._hash @@ -387,6 +396,7 @@ def clean(self): extra=self.extra, ) ws.processing = {ts.key: cost for ts, cost in self.processing.items()} + ws.executing = {ts.key: duration for ts, duration in self.executing.items()} return ws def __repr__(self): @@ -1695,6 +1705,7 @@ def heartbeat_worker( resources=None, host_info=None, metrics=None, + executing=None, ): address = self.coerce_address(address, resolve_address) address = normalize_address(address) @@ -1733,6 +1744,11 @@ def heartbeat_worker( ws.last_seen = time() + if executing is not None: + ws.executing = { + self.tasks[key]: duration for key, duration in executing.items() + } + if metrics: ws.metrics = metrics @@ -4258,19 +4274,14 @@ def set_duration_estimate(self, ts, ws): estimate the task duration to be 2x current-runtime, otherwise we set it to be the average duration. """ - # we might want to pull these vars from settings? - support_outliers = True - threshold = 2 - duration = self.get_task_duration(ts) comm = self.get_comm_cost(ts, ws) - new_duration = duration + comm - exec_time = -1 - if "executing" in ws.metrics and ts.key in ws.metrics["executing"]: - exec_time = ws.metrics["executing"][ts.key] - if support_outliers and exec_time > threshold * duration: - new_duration = 2 * exec_time - ws.processing[ts] = new_duration + total_duration = duration + comm + if ts in ws.executing: + exec_time = ws.executing[ts] + if exec_time > 2 * duration: + total_duration = 2 * exec_time + ws.processing[ts] = total_duration return ws.processing[ts] def transition_waiting_processing(self, key): diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index b41e174bf5f..92f09a6bbc8 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1681,19 +1681,19 @@ async def test_update_latency(cleanup): @pytest.mark.asyncio -async def test_heartbeat_executing(cleanup): +async def test_workerstate_executing(cleanup): async with await Scheduler() as s: async with await Worker(s.address) as w: async with Client(s.address, asynchronous=True) as c: ws = s.workers[w.address] # Initially there are no active tasks - assert not ws.metrics["executing"] - # Submit a task and ensure the worker's heartbeat includes the task - # in it's executing + assert not ws.executing + # Submit a task and ensure the WorkerState is updated with the task + # it's executing f = c.submit(slowinc, 1, delay=1) - while not ws.metrics["executing"]: - await w.heartbeat() - assert f.key in ws.metrics["executing"] + while not ws.executing: + await asyncio.sleep(0.01) + assert s.tasks[f.key] in ws.executing await f diff --git a/distributed/worker.py b/distributed/worker.py index f1896d40c81..e1b0225fc49 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -791,6 +791,7 @@ def local_dir(self): async def get_metrics(self): now = time() core = dict( + executing=self.executing_count, in_memory=len(self.data), ready=len(self.ready), in_flight=self.in_flight_tasks, @@ -799,11 +800,6 @@ async def get_metrics(self): "workers": dict(self.bandwidth_workers), "types": keymap(typename, self.bandwidth_types), }, - executing={ - key: now - self.tasks[key].start_time - for key in self.active_threads.values() - if key in self.tasks - }, ) custom = {} for k, metric in self.metrics.items(): @@ -931,6 +927,11 @@ async def heartbeat(self): address=self.contact_address, now=time(), metrics=await self.get_metrics(), + executing={ + key: start - self.tasks[key].start_time + for key in self.active_threads.values() + if key in self.tasks + }, ) end = time() middle = (start + end) / 2 From 7a61d5b39e462f4fcc74bdcf1edeb8af574e74f9 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 4 Dec 2020 19:47:09 -0600 Subject: [PATCH 4/7] Apply suggestions from code review Co-authored-by: jakirkham --- distributed/scheduler.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 790b508f15d..9ac39a9aeb2 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4416,7 +4416,7 @@ def decide_worker(self, ts): return worker - def set_duration_estimate(self, ts, ws): + def set_duration_estimate(self, ts: TaskState, ws: WorkerState): """Estimate task duration using worker state and task state. If a task takes longer than twice the current average duration we @@ -4426,12 +4426,12 @@ def set_duration_estimate(self, ts, ws): duration = self.get_task_duration(ts) comm = self.get_comm_cost(ts, ws) total_duration = duration + comm - if ts in ws.executing: - exec_time = ws.executing[ts] + if ts in ws._executing: + exec_time = ws._executing[ts] if exec_time > 2 * duration: total_duration = 2 * exec_time - ws.processing[ts] = total_duration - return ws.processing[ts] + ws._processing[ts] = total_duration + return ws._processing[ts] def transition_waiting_processing(self, key): try: From ec3e194ea3ec8eb0ec45d5d54c5f1486b4f424cd Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 4 Dec 2020 22:00:31 -0600 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: jakirkham --- distributed/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9ac39a9aeb2..f5380a67ddf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4423,11 +4423,11 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState): estimate the task duration to be 2x current-runtime, otherwise we set it to be the average duration. """ - duration = self.get_task_duration(ts) - comm = self.get_comm_cost(ts, ws) - total_duration = duration + comm + duration: double = self.get_task_duration(ts) + comm: double = self.get_comm_cost(ts, ws) + total_duration: double = duration + comm if ts in ws._executing: - exec_time = ws._executing[ts] + exec_time: double = ws._executing[ts] if exec_time > 2 * duration: total_duration = 2 * exec_time ws._processing[ts] = total_duration From 47f0f45ec9721ae518383644dfbe9f8d2a2674e9 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 7 Dec 2020 19:54:42 -0600 Subject: [PATCH 6/7] Update distributed/scheduler.py Co-authored-by: jakirkham --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b96a089b6c2..e5f1158226b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -525,8 +525,8 @@ def clean(self): extra=self._extra, ) ts: TaskState - ws._processing = {ts.key: cost for ts, cost in self._processing.items()} - ws._executing = {ts.key: duration for ts, duration in self._executing.items()} + ws._processing = {ts._key: cost for ts, cost in self._processing.items()} + ws._executing = {ts._key: duration for ts, duration in self._executing.items()} return ws def __repr__(self): From e9205b405c002796c0b746dec95860b65f799e63 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 18 Dec 2020 18:01:45 -0600 Subject: [PATCH 7/7] Trigger CI