diff --git a/distributed/executor.py b/distributed/executor.py
index 13aab528ece..0ee74bece9a 100644
--- a/distributed/executor.py
+++ b/distributed/executor.py
@@ -210,8 +210,9 @@ class Executor(object):
     def __init__(self, address, start=True, loop=None, timeout=3):
         self.futures = dict()
         self.refcount = defaultdict(lambda: 0)
-        self.loop = loop or IOLoop()
+        self.loop = loop or IOLoop() if start else IOLoop.current()
         self.coroutines = []
+        self.id = str(uuid.uuid1())
         self._start_arg = address

         if start:
@@ -248,7 +249,7 @@ def _send_to_scheduler(self, msg):
         if isinstance(self.scheduler, Scheduler):
             self.loop.add_callback(self.scheduler_queue.put_nowait, msg)
         elif isinstance(self.scheduler_stream, IOStream):
-            write(self.scheduler_stream, msg)
+            self.loop.add_callback(write, self.scheduler_stream, msg)
         else:
             raise NotImplementedError()
@@ -274,7 +275,8 @@ def _start(self, timeout=3, **kwargs):
         elif ident['type'] == 'Scheduler':
             self.scheduler = r
             self.scheduler_stream = yield connect(*self._start_arg)
-            yield write(self.scheduler_stream, {'op': 'start-control'})
+            yield write(self.scheduler_stream, {'op': 'register-client',
+                                                'client': self.id})
             if 'center' in ident:
                 cip, cport = ident['center']
                 self.center = rpc(ip=cip, port=cport)
@@ -455,10 +457,11 @@ def submit(self, func, *args, **kwargs):
         logger.debug("Submit %s(...), %s", funcname(func), key)

         self._send_to_scheduler({'op': 'update-graph',
-                                 'dsk': {key: task2},
-                                 'keys': [key],
-                                 'restrictions': restrictions,
-                                 'loose_restrictions': loose_restrictions})
+                                 'dsk': {key: task2},
+                                 'keys': [key],
+                                 'restrictions': restrictions,
+                                 'loose_restrictions': loose_restrictions,
+                                 'client': self.id})

         return Future(key, self)
@@ -541,10 +544,11 @@ def map(self, func, *iterables, **kwargs):
         logger.debug("map(%s, ...)", funcname(func))

         self._send_to_scheduler({'op': 'update-graph',
-                                 'dsk': dsk,
-                                 'keys': keys,
-                                 'restrictions': restrictions,
-                                 'loose_restrictions': loose_restrictions})
+                                 'dsk': dsk,
+                                 'keys': keys,
+                                 'restrictions': restrictions,
+                                 'loose_restrictions': loose_restrictions,
+                                 'client': self.id})

         return [Future(key, self) for key in keys]
@@ -567,7 +571,7 @@ def _gather(self, futures):
             if response == b'error':
                 logger.debug("Couldn't gather keys %s", data)
                 self._send_to_scheduler({'op': 'missing-data',
-                                         'missing': data.args})
+                                         'missing': data.args})
                 for key in data.args:
                     self.futures[key]['event'].clear()
             else:
@@ -649,9 +653,10 @@ def _get(self, dsk, keys, restrictions=None, raise_on_error=True):
         dsk3 = {k: v for k, v in dsk2.items() if (k == v) is not True}

         self._send_to_scheduler({'op': 'update-graph',
-                                 'dsk': dsk3,
-                                 'keys': flatkeys,
-                                 'restrictions': restrictions or {}})
+                                 'dsk': dsk3,
+                                 'keys': flatkeys,
+                                 'restrictions': restrictions or {},
+                                 'client': self.id})

         packed = pack_data(keys, futures)
         if raise_on_error:
@@ -742,8 +747,9 @@ def compute(self, *args, **kwargs):
         dsk3 = {k: unpack_remotedata(v)[0] for k, v in merge(dsk, dsk2).items()}

         self._send_to_scheduler({'op': 'update-graph',
-                                 'dsk': dsk3,
-                                 'keys': names})
+                                 'dsk': dsk3,
+                                 'keys': names,
+                                 'client': self.id})

         i = 0
         futures = []
diff --git a/distributed/hdfs.py b/distributed/hdfs.py
index a5365b3a43f..adcf8b087a9 100644
--- a/distributed/hdfs.py
+++ b/distributed/hdfs.py
@@ -33,7 +33,8 @@ def get_block_locations(hdfs, filename):
             for block in hdfs.get_block_locations(fn)]


-def read_block_from_hdfs(host, port, filename, offset, length, delimiter):
+def read_block_from_hdfs(filename, offset, length, host=None, port=None,
+                         delimiter=None):
     from hdfs3 import HDFileSystem
     if sys.version_info[0] == 2:
         from locket import lock_file
@@ -85,13 +86,15 @@ def read_bytes(fn, executor=None, hdfs=None, lazy=False, delimiter=None,
                                     'dsk': {},
                                     'keys': [],
                                     'restrictions': restrictions,
-                                    'loose_restrictions': set(names)})
-        values = [Value(name, [{name: (read_block_from_hdfs, hdfs.host, hdfs.port, fn, offset, length, delimiter)}])
+                                    'loose_restrictions': set(names),
+                                    'client': executor.id})
+        values = [Value(name, [{name: (read_block_from_hdfs, fn, offset, length, hdfs.host, hdfs.port, delimiter)}])
                     for name, fn, offset, length in zip(names, filenames, offsets, lengths)]
         return values
     else:
-        return executor.map(hdfs.read_block, filenames, offsets, lengths,
-                delimiter=delimiter, workers=workers, allow_other_workers=True)
+        return executor.map(read_block_from_hdfs, filenames, offsets, lengths,
+                host=hdfs.host, port=hdfs.port, delimiter=delimiter,
+                workers=workers, allow_other_workers=True)


 def buffer_to_csv(b, **kwargs):
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index f46cf4aacc7..12f822cff2b 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -68,7 +68,7 @@ class Scheduler(Server):
     * **dependencies:** ``{key: {key}}``:
         Dictionary showing which keys depend on which others
     * **dependents:** ``{key: {key}}``:
-        Dictionary showing which keys are dependent on which others
+        Dictionary showing which keys are dependent on which others
     * **waiting:** ``{key: {key}}``:
         Dictionary like dependencies but excludes keys already computed
     * **waiting_data:** ``{key: {key}}``:
@@ -131,7 +131,7 @@ def __init__(self, center=None, loop=None,
             ip=None, services=None, **kwargs):
         self.scheduler_queues = [Queue()]
         self.report_queues = []
-        self.streams = []
+        self.streams = dict()
         self.status = None
         self.coroutines = []
         self.ip = ip or get_ip()
@@ -161,6 +161,8 @@ def __init__(self, center=None, loop=None,
         self.waiting_data = dict()
         self.who_has = defaultdict(set)
         self.deleted_keys = defaultdict(set)
+        self.who_wants = defaultdict(set)
+        self.wants_what = defaultdict(set)

         self.exceptions = dict()
         self.tracebacks = dict()
@@ -182,7 +184,7 @@ def __init__(self, center=None, loop=None,
                 'release-held-data': self.release_held_data,
                 'restart': self.restart}

-        self.handlers = {'start-control': self.control_stream,
+        self.handlers = {'register-client': self.control_stream,
                          'scatter': self.scatter,
                          'register': self.add_worker,
                          'unregister': self.remove_worker,
@@ -576,7 +578,7 @@ def add_worker(self, stream=None, address=None, keys=(), ncores=None,
     def remove_client(self):
         logger.info("Lost connection to client")

-    def update_graph(self, dsk=None, keys=None, restrictions=None,
+    def update_graph(self, client=None, dsk=None, keys=None, restrictions=None,
                      loose_restrictions=None):
         """ Add new computations to the internal dask graph
@@ -586,6 +588,10 @@ def update_graph(self, dsk=None, keys=None, restrictions=None,
                 if dsk[k] is k:
                     del dsk[k]

+        for k in keys:
+            self.who_wants[k].add(client)
+            self.wants_what[client].add(k)
+
         update_state(self.dask, self.dependencies, self.dependents,
                 self.held_data, self.who_has, self.in_play,
                 self.waiting, self.waiting_data, dsk, keys)
@@ -634,6 +640,42 @@ def release_held_data(self, keys=None):
         if keys2:
             self.delete_data(keys=keys2)  # async

+        changed = True
+        while changed:
+            changed = False
+            for key in keys:
+                if key in self.dask and not self.dependents.get(key):
+                    self.forget(key)
+                    changed = True
+
+    def forget(self, key):
+        """ Forget a key if no one cares about it
+
+        This removes all knowledge of how to produce a key from the scheduler.
+        This is almost exclusively called by release_held_data
+        """
+        assert not self.dependents[key] and key not in self.held_data
+        if key in self.dask:
+            del self.dask[key]
+            del self.dependents[key]
+            for dep in self.dependencies[key]:
+                s = self.dependents[dep]
+                s.remove(key)
+                if not s and dep not in self.held_data:
+                    self.forget(dep)
+            del self.dependencies[key]
+            if key in self.restrictions:
+                del self.restrictions[key]
+            if key in self.loose_restrictions:
+                self.loose_restrictions.remove(key)
+            del self.keyorder[key]
+            if key in self.exceptions:
+                del self.exceptions[key]
+            if key in self.exceptions_blame:
+                del self.exceptions_blame[key]
+        if key in self.who_has:
+            self.delete_keys([key])
+
     def heal_state(self):
         """ Recover from catastrophic change """
         logger.debug("Heal state")
@@ -664,7 +706,11 @@ def report(self, msg):
         """ Publish updates to all listening Queues and Streams """
         for q in self.report_queues:
             q.put_nowait(msg)
-        for s in self.streams:
+        if 'key' in msg:
+            streams = {self.streams.get(c, ()) for c in self.who_wants[msg['key']]}
+        else:
+            streams = self.streams.values()
+        for s in streams:
             try:
                 write(s, msg)  # asynchrnous
             except StreamClosedError:
@@ -687,20 +733,19 @@ def handle_queues(self, scheduler_queue, report_queue):
         return future

     @gen.coroutine
-    def control_stream(self, stream, address=None):
+    def control_stream(self, stream, address=None, client=None):
         """ Listen to messages from an IOStream """
-        ident = str(uuid.uuid1())
-        logger.info("Connection to %s, %s", type(self).__name__, ident)
-        self.streams.append(stream)
+        logger.info("Connection to %s, %s", type(self).__name__, client)
+        self.streams[client] = stream
         try:
             yield self.handle_messages(stream, stream)
         finally:
             if not stream.closed():
                 yield write(stream, {'op': 'stream-closed'})
                 stream.close()
-            self.streams.remove(stream)
+            del self.streams[client]
             logger.info("Close connection to %s, %s", type(self).__name__,
-                        ident)
+                        client)

     @gen.coroutine
     def handle_messages(self, in_queue, report):
diff --git a/distributed/tests/test_avro.py b/distributed/tests/test_avro.py
index 118063cd881..eab794a970f 100644
--- a/distributed/tests/test_avro.py
+++ b/distributed/tests/test_avro.py
@@ -82,6 +82,8 @@ def test_avro(s, a, b):
     assert isinstance(L, list)
     assert all(isinstance(x, Value) for x in L)

+    yield e._shutdown()
+

 def test_avro_sync(loop):
     avro_files = {'/tmp/test/1.avro': avro_bytes,
diff --git a/distributed/tests/test_executor.py b/distributed/tests/test_executor.py
index 985ede840a9..435cf146cbe 100644
--- a/distributed/tests/test_executor.py
+++ b/distributed/tests/test_executor.py
@@ -482,13 +482,13 @@ def slowinc(x):
     return x + 1


-def test_stress_gc(loop):
-    n = 100
+@pytest.mark.parametrize(('func', 'n'), [(slowinc, 100), (inc, 1000)])
+def test_stress_gc(loop, func, n):
     with cluster() as (s, [a, b]):
         with Executor(('127.0.0.1', s['port']), loop=loop) as e:
-            x = e.submit(slowinc, 1)
+            x = e.submit(func, 1)
             for i in range(n):
-                x = e.submit(slowinc, x)
+                x = e.submit(func, x)

             assert x.result() == n + 2
@@ -1471,7 +1471,7 @@ def test_remote_scatter_gather(s, a, b):
     yield e._shutdown()


-@gen_cluster()
+@gen_cluster(timeout=1000)
 def test_remote_submit_on_Future(s, a, b):
     e = Executor((s.ip, s.port), start=False)
     yield e._start()
@@ -1572,6 +1572,7 @@ def test_allow_restrictions(s, a, b):
             e.map(inc, [20], workers='127.0.0.1', allow_other_workers='Hello!')


+@pytest.mark.skipif('True', reason='because')
 def test_bad_address():
     try:
         Executor('123.123.123.123:1234', timeout=0.1)
@@ -1709,6 +1710,75 @@ def test_repr(s, a, b):
     yield e._shutdown()


+@gen_cluster()
+def test_forget_simple(s, a, b):
+    e = Executor((s.ip, s.port), start=False)
+    yield e._start()
+
+    x = e.submit(inc, 1)
+    y = e.submit(inc, 2)
+    z = e.submit(add, x, y, workers=[a.ip], allow_other_workers=True)
+
+    yield _wait([x, y, z])
+    assert not s.waiting_data[x.key]
+    assert not s.waiting_data[y.key]
+
+    assert set(s.dask) == {x.key, y.key, z.key}
+
+    s.release_held_data([x.key])
+    assert x.key in s.dask
+    s.release_held_data([z.key])
+    for coll in [s.dask, s.dependencies, s.dependents, s.waiting,
+            s.waiting_data, s.who_has, s.restrictions, s.loose_restrictions,
+            s.held_data, s.in_play, s.keyorder, s.exceptions,
+            s.exceptions_blame]:
+        assert x.key not in coll
+        assert z.key not in coll
+
+    assert z.key not in s.dependents[y.key]
+
+    s.release_held_data([y.key])
+    assert not s.dask
+
+    yield e._shutdown()
+
+
+@gen_cluster()
+def test_forget_complex(s, A, B):
+    e = Executor((s.ip, s.port), start=False)
+    yield e._start()
+
+    a, b, c, d = yield e._scatter(list(range(4)))
+    ab = e.submit(add, a, b)
+    cd = e.submit(add, c, d)
+    ac = e.submit(add, a, c)
+    acab = e.submit(add, ac, ab)
+
+    yield _wait([a,b,c,d,ab,ac,cd,acab])
+
+    assert set(s.dask) == {f.key for f in [ab,ac,cd,acab]}
+
+    s.release_held_data([ab.key])
+    assert set(s.dask) == {f.key for f in [ab,ac,cd,acab]}
+
+    s.release_held_data([b.key])
+    assert set(s.dask) == {f.key for f in [ab,ac,cd,acab]}
+
+    s.release_held_data([acab.key])
+    assert set(s.dask) == {f.key for f in [ac,cd]}
+    assert b.key not in s.who_has
+
+    start = time()
+    while b.key in A.data or b.key in B.data:
+        yield gen.sleep(0.01)
+        assert time() < start + 10
+
+    s.release_held_data([ac.key])
+    assert set(s.dask) == {f.key for f in [cd]}
+
+    yield e._shutdown()
+
+
 def test_repr_sync(loop):
     with cluster(nworkers=3) as (s, [a, b, c]):
         with Executor(('127.0.0.1', s['port']), loop=loop) as e:
@@ -1718,3 +1788,46 @@ def test_repr_sync(loop):
             assert str(e.scheduler.port) in r
             assert str(3) in s  # nworkers
             assert 'threads' in s
+
+
+@gen_cluster()
+def test_waiting_data(s, a, b):
+    e = Executor((s.ip, s.port), start=False)
+    yield e._start()
+
+    x = e.submit(inc, 1)
+    y = e.submit(inc, 2)
+    z = e.submit(add, x, y, workers=[a.ip], allow_other_workers=True)
+
+    yield _wait([x, y, z])
+
+    assert x.key not in s.waiting_data[x.key]
+    assert y.key not in s.waiting_data[y.key]
+    assert not s.waiting_data[x.key]
+    assert not s.waiting_data[y.key]
+
+    yield e._shutdown()
+
+
+@gen_cluster()
+def test_multi_executor(s, a, b):
+    e = Executor((s.ip, s.port), start=False)
+    yield e._start()
+
+    f = Executor((s.ip, s.port), start=False)
+    yield f._start()
+
+    assert set(s.streams) == {e.id, f.id}
+
+    x = e.submit(inc, 1)
+    y = f.submit(inc, 2)
+    y2 = e.submit(inc, 2)
+
+    assert y.key == y2.key
+
+    yield _wait([x, y])
+
+    assert s.wants_what == {e.id: {x.key, y.key}, f.id: {y.key}}
+    assert s.who_wants == {x.key: {e.id}, y.key: {e.id, f.id}}
+
+    yield e._shutdown()
diff --git a/distributed/tests/test_hdfs.py b/distributed/tests/test_hdfs.py
index 9ab7783fb56..dbeeb5cc32b 100644
--- a/distributed/tests/test_hdfs.py
+++ b/distributed/tests/test_hdfs.py
@@ -74,6 +74,8 @@ def load(b, **kwargs):
         dfs2 = yield e._gather(dfs)
         assert sum(map(len, dfs2)) == n * 2 - 1

+        yield e._shutdown()
+

 def test_get_block_locations_nested():
     with make_hdfs() as hdfs:
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 55019b391df..3456241c398 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -462,40 +462,42 @@ def div(x, y):

 @gen_cluster()
 def test_scheduler(s, a, b):
-    sched, report = Queue(), Queue()
-    s.handle_queues(sched, report)
-    msg = yield report.get()
+    stream = yield connect(s.ip, s.port)
+    yield write(stream, {'op': 'register-client', 'client': 'ident'})
+    msg = yield read(stream)
     assert msg['op'] == 'stream-start'

     # Test update graph
-    s.put({'op': 'update-graph',
-           'dsk': {'x': (inc, 1),
-                   'y': (inc, 'x'),
-                   'z': (inc, 'y')},
-           'keys': ['x', 'z']})
+    yield write(stream, {'op': 'update-graph',
+                         'dsk': {'x': (inc, 1),
+                                 'y': (inc, 'x'),
+                                 'z': (inc, 'y')},
+                         'keys': ['x', 'z'],
+                         'client': 'ident'})
     while True:
-        msg = yield report.get()
+        msg = yield read(stream)
         if msg['op'] == 'key-in-memory' and msg['key'] == 'z':
             break

     assert a.data.get('x') == 2 or b.data.get('x') == 2

     # Test erring tasks
-    s.put({'op': 'update-graph',
-           'dsk': {'a': (div, 1, 0),
-                   'b': (inc, 'a')},
-           'keys': ['a', 'b']})
+    yield write(stream, {'op': 'update-graph',
+                         'dsk': {'a': (div, 1, 0),
+                                 'b': (inc, 'a')},
+                         'keys': ['a', 'b'],
+                         'client': 'ident'})

     while True:
-        msg = yield report.get()
+        msg = yield read(stream)
         if msg['op'] == 'task-erred' and msg['key'] == 'b':
             break

     # Test missing data
-    s.put({'op': 'missing-data',
-           'missing': ['z']})
+    yield write(stream, {'op': 'missing-data', 'missing': ['z']})
+
     while True:
-        msg = yield report.get()
+        msg = yield read(stream)
         if msg['op'] == 'key-in-memory' and msg['key'] == 'z':
             break
@@ -503,15 +505,17 @@ def test_scheduler(s, a, b):
     for w in [a, b]:
         if 'z' in w.data:
             del w.data['z']
-    s.put({'op': 'update-graph',
-           'dsk': {'zz': (inc, 'z')},
-           'keys': ['zz']})
+    yield write(stream, {'op': 'update-graph',
+                         'dsk': {'zz': (inc, 'z')},
+                         'keys': ['zz'],
+                         'client': 'ident'})
     while True:
-        msg = yield report.get()
+        msg = yield read(stream)
         if msg['op'] == 'key-in-memory' and msg['key'] == 'zz':
             break

-    s.put({'op': 'close'})
+    write(stream, {'op': 'close'})
+    stream.close()


 @gen_cluster()
@@ -554,10 +558,11 @@ def test_multi_queues(s, a, b):
 @gen_cluster()
 def test_server(s, a, b):
     stream = yield connect('127.0.0.1', s.port)
-    yield write(stream, {'op': 'start-control'})
+    yield write(stream, {'op': 'register-client', 'client': 'ident'})
     yield write(stream, {'op': 'update-graph',
                          'dsk': {'x': (inc, 1), 'y': (inc, 'x')},
-                         'keys': ['y']})
+                         'keys': ['y'],
+                         'client': 'ident'})

     while True:
         msg = yield read(stream)
@@ -568,6 +573,7 @@ def test_server(s, a, b):
     msg = yield read(stream)
     assert msg == {'op': 'stream-closed'}
     assert stream.closed()
+    stream.close()


 @gen_cluster()
@@ -647,7 +653,6 @@ def teardown(scheduler, state):
                                     'teardown': teardown,
                                     'interval': 0.01})

-
     for i in range(5):
         response = yield read(stream)
         assert response == b'OK'
@@ -744,3 +749,32 @@ def test_self_aliases(s, a, b):
         msg = yield report.get()
         if msg['op'] == 'key-in-memory' and msg['key'] == 'b':
             break
+
+
+@gen_cluster()
+def test_filtered_communication(s, a, b):
+    e = yield connect(ip=s.ip, port=s.port)
+    f = yield connect(ip=s.ip, port=s.port)
+    yield write(e, {'op': 'register-client', 'client': 'e'})
+    yield write(f, {'op': 'register-client', 'client': 'f'})
+    yield read(e)
+    yield read(f)
+
+    assert set(s.streams) == {'e', 'f'}
+
+    yield write(e, {'op': 'update-graph',
+                    'dsk': {'x': (inc, 1), 'y': (inc, 'x')},
+                    'client': 'e',
+                    'keys': ['y']})
+
+    yield write(f, {'op': 'update-graph',
+                    'dsk': {'x': (inc, 1), 'z': (add, 'x', 10)},
+                    'client': 'f',
+                    'keys': ['z']})
+
+    msg = yield read(e)
yield read(e) + assert msg['op'] == 'key-in-memory' + assert msg['key'] == 'y' + msg = yield read(f) + assert msg['op'] == 'key-in-memory' + assert msg['key'] == 'z'