Merged
40 changes: 23 additions & 17 deletions distributed/executor.py
@@ -210,8 +210,9 @@ class Executor(object):
     def __init__(self, address, start=True, loop=None, timeout=3):
         self.futures = dict()
         self.refcount = defaultdict(lambda: 0)
-        self.loop = loop or IOLoop()
+        self.loop = loop or IOLoop() if start else IOLoop.current()
         self.coroutines = []
+        self.id = str(uuid.uuid1())
         self._start_arg = address

         if start:
@@ -248,7 +249,7 @@ def _send_to_scheduler(self, msg):
         if isinstance(self.scheduler, Scheduler):
             self.loop.add_callback(self.scheduler_queue.put_nowait, msg)
         elif isinstance(self.scheduler_stream, IOStream):
-            write(self.scheduler_stream, msg)
+            self.loop.add_callback(write, self.scheduler_stream, msg)
         else:
             raise NotImplementedError()

@@ -274,7 +275,8 @@ def _start(self, timeout=3, **kwargs):
         elif ident['type'] == 'Scheduler':
             self.scheduler = r
             self.scheduler_stream = yield connect(*self._start_arg)
-            yield write(self.scheduler_stream, {'op': 'start-control'})
+            yield write(self.scheduler_stream, {'op': 'register-client',
+                                                'client': self.id})
             if 'center' in ident:
                 cip, cport = ident['center']
                 self.center = rpc(ip=cip, port=cport)
@@ -455,10 +457,11 @@ def submit(self, func, *args, **kwargs):

logger.debug("Submit %s(...), %s", funcname(func), key)
self._send_to_scheduler({'op': 'update-graph',
'dsk': {key: task2},
'keys': [key],
'restrictions': restrictions,
'loose_restrictions': loose_restrictions})
'dsk': {key: task2},
'keys': [key],
'restrictions': restrictions,
'loose_restrictions': loose_restrictions,
'client': self.id})

return Future(key, self)

@@ -541,10 +544,11 @@ def map(self, func, *iterables, **kwargs):

logger.debug("map(%s, ...)", funcname(func))
self._send_to_scheduler({'op': 'update-graph',
'dsk': dsk,
'keys': keys,
'restrictions': restrictions,
'loose_restrictions': loose_restrictions})
'dsk': dsk,
'keys': keys,
'restrictions': restrictions,
'loose_restrictions': loose_restrictions,
'client': self.id})

return [Future(key, self) for key in keys]

@@ -567,7 +571,7 @@ def _gather(self, futures):
             if response == b'error':
                 logger.debug("Couldn't gather keys %s", data)
                 self._send_to_scheduler({'op': 'missing-data',
-                                        'missing': data.args})
+                                         'missing': data.args})
                 for key in data.args:
                     self.futures[key]['event'].clear()
             else:
@@ -649,9 +653,10 @@ def _get(self, dsk, keys, restrictions=None, raise_on_error=True):
         dsk3 = {k: v for k, v in dsk2.items() if (k == v) is not True}

         self._send_to_scheduler({'op': 'update-graph',
-                                 'dsk': dsk3,
-                                 'keys': flatkeys,
-                                 'restrictions': restrictions or {}})
+                                 'dsk': dsk3,
+                                 'keys': flatkeys,
+                                 'restrictions': restrictions or {},
+                                 'client': self.id})

         packed = pack_data(keys, futures)
         if raise_on_error:
@@ -742,8 +747,9 @@ def compute(self, *args, **kwargs):
         dsk3 = {k: unpack_remotedata(v)[0] for k, v in merge(dsk, dsk2).items()}

         self._send_to_scheduler({'op': 'update-graph',
-                                 'dsk': dsk3,
-                                 'keys': names})
+                                 'dsk': dsk3,
+                                 'keys': names,
+                                 'client': self.id})

         i = 0
         futures = []
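Note on the executor changes: every graph-mutating path (`submit`, `map`, `_get`, `compute`) now stamps its `update-graph` message with the executor's `self.id`, so the scheduler can record which client wants which keys. Routing `write` through `self.loop.add_callback` in `_send_to_scheduler` also means stream writes always happen on the IOLoop rather than on the calling thread. A minimal sketch of the resulting message shape (field names come from the diff above; the toy graph and surrounding setup are invented for illustration):

```python
# Sketch of the message Executor.submit now sends to the scheduler.
import uuid
from operator import add

client_id = str(uuid.uuid1())          # mirrors self.id in Executor.__init__

msg = {'op': 'update-graph',
       'dsk': {'x': (add, 1, 2)},      # toy single-task graph
       'keys': ['x'],                  # keys this client wants back
       'restrictions': {},
       'loose_restrictions': set(),
       'client': client_id}            # new: lets the scheduler track ownership
```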
13 changes: 8 additions & 5 deletions distributed/hdfs.py
@@ -33,7 +33,8 @@ def get_block_locations(hdfs, filename):
             for block in hdfs.get_block_locations(fn)]


-def read_block_from_hdfs(host, port, filename, offset, length, delimiter):
+def read_block_from_hdfs(filename, offset, length, host=None, port=None,
+                         delimiter=None):
     from hdfs3 import HDFileSystem
     if sys.version_info[0] == 2:
         from locket import lock_file
@@ -85,13 +86,15 @@ def read_bytes(fn, executor=None, hdfs=None, lazy=False, delimiter=None,
                                      'dsk': {},
                                      'keys': [],
                                      'restrictions': restrictions,
-                                     'loose_restrictions': set(names)})
-        values = [Value(name, [{name: (read_block_from_hdfs, hdfs.host, hdfs.port, fn, offset, length, delimiter)}])
+                                     'loose_restrictions': set(names),
+                                     'client': executor.id})
+        values = [Value(name, [{name: (read_block_from_hdfs, fn, offset, length, hdfs.host, hdfs.port, delimiter)}])
                   for name, fn, offset, length in zip(names, filenames, offsets, lengths)]
         return values
     else:
-        return executor.map(hdfs.read_block, filenames, offsets, lengths,
-                            delimiter=delimiter, workers=workers, allow_other_workers=True)
+        return executor.map(read_block_from_hdfs, filenames, offsets, lengths,
+                            host=hdfs.host, port=hdfs.port, delimiter=delimiter,
+                            workers=workers, allow_other_workers=True)


 def buffer_to_csv(b, **kwargs):
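Note on the signature change: putting `filename`, `offset`, and `length` first lets `executor.map` feed them positionally from its iterables, while `host`, `port`, and `delimiter` are broadcast once as keywords. A hedged usage sketch, assuming a connected `Executor` named `executor` and the `workers` mapping computed earlier in `read_bytes` (the paths, namenode address, and block size are invented):

```python
# Usage sketch: how the new keyword signature composes with Executor.map.
# Per-block values travel positionally; shared connection details travel
# once as keywords and are forwarded to every task.
filenames = ['/data/part-00', '/data/part-01', '/data/part-02']
offsets = [0, 0, 0]                     # start of each block
lengths = [128 * 2**20] * 3             # one 128 MB block per file

futures = executor.map(read_block_from_hdfs, filenames, offsets, lengths,
                       host='namenode', port=8020, delimiter=b'\n',
                       workers=workers, allow_other_workers=True)
```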
67 changes: 56 additions & 11 deletions distributed/scheduler.py
@@ -68,7 +68,7 @@ class Scheduler(Server):
     * **dependencies:** ``{key: {key}}``:
         Dictionary showing which keys depend on which others
     * **dependents:** ``{key: {key}}``:
-        Dictionary showing which keys are dependent on which others
+        Dictionary showing which keys are dependent on which others
     * **waiting:** ``{key: {key}}``:
         Dictionary like dependencies but excludes keys already computed
     * **waiting_data:** ``{key: {key}}``:
@@ -131,7 +131,7 @@ def __init__(self, center=None, loop=None,
                  ip=None, services=None, **kwargs):
         self.scheduler_queues = [Queue()]
         self.report_queues = []
-        self.streams = []
+        self.streams = dict()
         self.status = None
         self.coroutines = []
         self.ip = ip or get_ip()
@@ -161,6 +161,8 @@ def __init__(self, center=None, loop=None,
         self.waiting_data = dict()
         self.who_has = defaultdict(set)
         self.deleted_keys = defaultdict(set)
+        self.who_wants = defaultdict(set)
+        self.wants_what = defaultdict(set)

         self.exceptions = dict()
         self.tracebacks = dict()
@@ -182,7 +184,7 @@
                          'release-held-data': self.release_held_data,
                          'restart': self.restart}

-        self.handlers = {'start-control': self.control_stream,
+        self.handlers = {'register-client': self.control_stream,
                          'scatter': self.scatter,
                          'register': self.add_worker,
                          'unregister': self.remove_worker,
@@ -576,7 +578,7 @@ def add_worker(self, stream=None, address=None, keys=(), ncores=None,
     def remove_client(self):
         logger.info("Lost connection to client")

-    def update_graph(self, dsk=None, keys=None, restrictions=None,
+    def update_graph(self, client=None, dsk=None, keys=None, restrictions=None,
                      loose_restrictions=None):
         """ Add new computations to the internal dask graph

@@ -586,6 +588,10 @@ def update_graph(self, dsk=None, keys=None, restrictions=None,
             if dsk[k] is k:
                 del dsk[k]

+        for k in keys:
+            self.who_wants[k].add(client)
+            self.wants_what[client].add(k)
+
         update_state(self.dask, self.dependencies, self.dependents,
                      self.held_data, self.who_has, self.in_play,
                      self.waiting, self.waiting_data, dsk, keys)
@@ -634,6 +640,42 @@ def release_held_data(self, keys=None):
         if keys2:
             self.delete_data(keys=keys2)  # async

+        changed = True
+        while changed:
+            changed = False
+            for key in keys:
+                if key in self.dask and not self.dependents.get(key):
+                    self.forget(key)
+                    changed = True
+
+    def forget(self, key):
+        """ Forget a key if no one cares about it
+
+        This removes all knowledge of how to produce a key from the scheduler.
+        This is almost exclusively called by release_held_data
+        """
+        assert not self.dependents[key] and key not in self.held_data
+        if key in self.dask:
+            del self.dask[key]
+            del self.dependents[key]
+            for dep in self.dependencies[key]:
+                s = self.dependents[dep]
+                s.remove(key)
+                if not s and dep not in self.held_data:
+                    self.forget(dep)
+            del self.dependencies[key]
+            if key in self.restrictions:
+                del self.restrictions[key]
+            if key in self.loose_restrictions:
+                self.loose_restrictions.remove(key)
+            del self.keyorder[key]
+            if key in self.exceptions:
+                del self.exceptions[key]
+            if key in self.exceptions_blame:
+                del self.exceptions_blame[key]
+        if key in self.who_has:
+            self.delete_keys([key])

     def heal_state(self):
         """ Recover from catastrophic change """
         logger.debug("Heal state")
@@ -664,7 +706,11 @@ def report(self, msg):
""" Publish updates to all listening Queues and Streams """
for q in self.report_queues:
q.put_nowait(msg)
for s in self.streams:
if 'key' in msg:
streams = {self.streams.get(c, ()) for c in self.who_wants[msg['key']]}
else:
streams = self.streams.values()
for s in streams:
try:
write(s, msg) # asynchrnous
except StreamClosedError:
@@ -687,20 +733,19 @@ def handle_queues(self, scheduler_queue, report_queue):
         return future

     @gen.coroutine
-    def control_stream(self, stream, address=None):
+    def control_stream(self, stream, address=None, client=None):
         """ Listen to messages from an IOStream """
-        ident = str(uuid.uuid1())
-        logger.info("Connection to %s, %s", type(self).__name__, ident)
-        self.streams.append(stream)
+        logger.info("Connection to %s, %s", type(self).__name__, client)
+        self.streams[client] = stream
         try:
             yield self.handle_messages(stream, stream)
         finally:
             if not stream.closed():
                 yield write(stream, {'op': 'stream-closed'})
                 stream.close()
-            self.streams.remove(stream)
+            del self.streams[client]
             logger.info("Close connection to %s, %s", type(self).__name__,
-                        ident)
+                        client)

     @gen.coroutine
     def handle_messages(self, in_queue, report):
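Note on the scheduler changes as a whole: `register-client` keys the `streams` dict, `update_graph` fills `who_wants`/`wants_what`, and `report` then sends keyed messages only to the clients that asked for that key, broadcasting everything else. A self-contained sketch of that routing (lists stand in for IOStreams; client and key names are invented):

```python
from collections import defaultdict

streams = {}                      # client id -> stream (here: a list of msgs)
who_wants = defaultdict(set)      # key -> client ids that asked for it

def register_client(client):
    streams[client] = []          # stand-in for the client's IOStream

def update_graph(client, keys):
    for k in keys:
        who_wants[k].add(client)

def report(msg):
    if 'key' in msg:              # keyed updates go only to interested clients
        targets = [streams[c] for c in who_wants[msg['key']] if c in streams]
    else:                         # everything else is broadcast
        targets = streams.values()
    for s in targets:
        s.append(msg)             # stand-in for write(stream, msg)

register_client('alice'); register_client('bob')
update_graph('alice', ['x'])
report({'op': 'key-in-memory', 'key': 'x'})
assert streams['alice'] and not streams['bob']
```

One difference from the diff is deliberate: the sketch filters out unregistered clients, whereas the patch's `self.streams.get(c, ())` would put an empty tuple into the set for a client with no stream.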
2 changes: 2 additions & 0 deletions distributed/tests/test_avro.py
@@ -82,6 +82,8 @@ def test_avro(s, a, b):
     assert isinstance(L, list)
     assert all(isinstance(x, Value) for x in L)

+    yield e._shutdown()
+

 def test_avro_sync(loop):
     avro_files = {'/tmp/test/1.avro': avro_bytes,