From f07d5a49070f65137127e19ec3d4fda6d1bf2c54 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 23 May 2018 08:52:10 -0400 Subject: [PATCH 1/8] wip - Use tuples in msgpack --- distributed/client.py | 12 ++++++------ distributed/core.py | 2 +- distributed/protocol/core.py | 4 ++-- distributed/protocol/serialize.py | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 8be4f945fb9..b016de7ef8a 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -960,7 +960,7 @@ def _handle_report(self): continue else: break - if not isinstance(msgs, list): + if not isinstance(msgs, (list, tuple)): msgs = [msgs] breakout = False @@ -2665,7 +2665,7 @@ def ncores(self, workers=None, **kwargs): if (isinstance(workers, tuple) and all(isinstance(i, (str, tuple)) for i in workers)): workers = list(workers) - if workers is not None and not isinstance(workers, (list, set)): + if workers is not None and not isinstance(workers, (tuple, list, set)): workers = [workers] return self.sync(self.scheduler.ncores, workers=workers, **kwargs) @@ -2731,7 +2731,7 @@ def has_what(self, workers=None, **kwargs): if (isinstance(workers, tuple) and all(isinstance(i, (str, tuple)) for i in workers)): workers = list(workers) - if workers is not None and not isinstance(workers, (list, set)): + if workers is not None and not isinstance(workers, (tuple, list, set)): workers = [workers] return self.sync(self.scheduler.has_what, workers=workers, **kwargs) @@ -2760,7 +2760,7 @@ def processing(self, workers=None): if (isinstance(workers, tuple) and all(isinstance(i, (str, tuple)) for i in workers)): workers = list(workers) - if workers is not None and not isinstance(workers, (list, set)): + if workers is not None and not isinstance(workers, (tuple, list, set)): workers = [workers] return self.sync(self.scheduler.processing, workers=workers) @@ -2910,7 +2910,7 @@ def get_metadata(self, keys, default=no_default): -------- Client.set_metadata """ - if not isinstance(keys, list): + if not isinstance(keys, (tuple, list)): keys = [keys] return self.sync(self.scheduler.get_metadata, keys=keys, default=default) @@ -3011,7 +3011,7 @@ def set_metadata(self, key, value): -------- get_metadata """ - if not isinstance(key, list): + if not isinstance(key, (tuple, list)): key = [key] return self.sync(self.scheduler.set_metadata, keys=key, value=value) diff --git a/distributed/core.py b/distributed/core.py index c00ecf088b9..f36643ed8ea 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -359,7 +359,7 @@ def handle_stream(self, comm, extra=None, every_cycle=[]): try: while not closed: msgs = yield comm.read() - if not isinstance(msgs, list): + if not isinstance(msgs, (list, tuple)): msgs = [msgs] if not comm.closed(): diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index 24e980351d0..a5f5548b4a1 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -160,7 +160,7 @@ def loads_msgpack(header, payload): dumps_msgpack """ if header: - header = msgpack.loads(header, encoding='utf8') + header = msgpack.loads(header, encoding='utf8', use_list=False) else: header = {} @@ -172,4 +172,4 @@ def loads_msgpack(header, payload): raise ValueError("Data is compressed as %s but we don't have this" " installed" % str(header['compression'])) - return msgpack.loads(payload, encoding='utf8') + return msgpack.loads(payload, encoding='utf8', use_list=False) diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index 4e397abee1a..27cb33abe28 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -63,7 +63,7 @@ def msgpack_dumps(x): def msgpack_loads(header, frames): - return msgpack.loads(b''.join(frames), encoding='utf8') + return msgpack.loads(b''.join(frames), encoding='utf8', use_list=False) def serialization_error_loads(header, frames): @@ -441,7 +441,7 @@ def deserialize_bytes(b): frames = unpack_frames(b) header, frames = frames[0], frames[1:] if header: - header = msgpack.loads(header, encoding='utf8') + header = msgpack.loads(header, encoding='utf8', use_list=False) else: header = {} frames = decompress(header, frames) From 2a9b431e92115ceb51bd75920db2407d241c4228 Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Tue, 29 May 2018 12:06:43 -0400 Subject: [PATCH 2/8] Fixed failing tests --- distributed/client.py | 2 +- distributed/diagnostics/tests/test_eventstream.py | 2 +- distributed/protocol/core.py | 7 ++++++- distributed/protocol/tests/test_protocol.py | 2 +- distributed/tests/test_batched.py | 10 +++++----- distributed/tests/test_client.py | 4 ++-- distributed/tests/test_publish.py | 6 +++--- distributed/tests/test_queues.py | 2 +- distributed/tests/test_variable.py | 2 +- 9 files changed, 21 insertions(+), 16 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index b016de7ef8a..e0155875fe1 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3040,7 +3040,7 @@ def get_versions(self, check=False): if check: # we care about the required & optional packages matching def to_packages(d): - return dict(sum(d['packages'].values(), [])) + return dict(sum(map(list, d['packages'].values()), [])) client_versions = to_packages(result['client']) versions = [('scheduler', to_packages(result['scheduler']))] versions.extend((w, to_packages(d)) diff --git a/distributed/diagnostics/tests/test_eventstream.py b/distributed/diagnostics/tests/test_eventstream.py index ee7f343d421..9cb07dac123 100644 --- a/distributed/diagnostics/tests/test_eventstream.py +++ b/distributed/diagnostics/tests/test_eventstream.py @@ -57,7 +57,7 @@ def test_eventstream_remote(c, s, a, b): total = [] while len(total) < 10: msgs = yield comm.read() - assert isinstance(msgs, list) + assert isinstance(msgs, (list, tuple)) total.extend(msgs) assert time() < start + 5 diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index a5f5548b4a1..41dd933caad 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -122,7 +122,12 @@ def loads(frames, deserialize=True, deserializers=None): else: value = Serialized(head, fs) - get_in(key[:-1], msg)[key[-1]] = value + # TODO: This is problematic with tuple assignment + val_holder = get_in(key[:-1], msg) + if isinstance(val_holder, tuple): + msg[key[0]] = list(val_holder) + val_holder = get_in(key[:-1], msg) + val_holder[key[-1]] = value return msg except Exception: diff --git a/distributed/protocol/tests/test_protocol.py b/distributed/protocol/tests/test_protocol.py index 0906d7687d3..509640e47fa 100644 --- a/distributed/protocol/tests/test_protocol.py +++ b/distributed/protocol/tests/test_protocol.py @@ -56,7 +56,7 @@ def test_small(): def test_small_and_big(): - d = {'x': [1, 2, 3], 'y': b'0' * 10000000} + d = {'x': (1, 2, 3), 'y': b'0' * 10000000} L = dumps(d) assert loads(L) == d # assert loads([small_header, small]) == {'x': [1, 2, 3]} diff --git a/distributed/tests/test_batched.py b/distributed/tests/test_batched.py index eac2bbfbeff..8643147de75 100644 --- a/distributed/tests/test_batched.py +++ b/distributed/tests/test_batched.py @@ -69,9 +69,9 @@ def test_BatchedSend(): b.send('HELLO') result = yield comm.read() - assert result == ['hello', 'hello', 'world'] + assert list(result) == ['hello', 'hello', 'world'] result = yield comm.read() - assert result == ['HELLO', 'HELLO'] + assert list(result) == ['HELLO', 'HELLO'] assert b.byte_count > 1 @@ -88,7 +88,7 @@ def test_send_before_start(): b.start(comm) result = yield comm.read() - assert result == ['hello', 'world'] + assert list(result) == ['hello', 'world'] @gen_test() @@ -104,7 +104,7 @@ def test_send_after_stream_start(): result = yield comm.read() if len(result) < 2: result += yield comm.read() - assert result == ['hello', 'world'] + assert list(result) == ['hello', 'world'] @gen_test() @@ -295,7 +295,7 @@ def test_serializers(): assert 'function' in value msg = yield comm.read() - assert msg == [{'x': 123}, {'x': 'hello'}] + assert list(msg) == [{'x': 123}, {'x': 'hello'}] with pytest.raises(gen.TimeoutError): msg = yield gen.with_timeout(timedelta(milliseconds=100), comm.read()) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index cc6f992e911..cedc5326234 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3340,7 +3340,7 @@ def test_default_get(): @gen_cluster(client=True) def test_get_processing(c, s, a, b): processing = yield c.processing() - assert processing == valmap(list, s.processing) + assert processing == valmap(tuple, s.processing) futures = c.map(slowinc, range(10), delay=0.1, workers=[a.address], allow_other_workers=True) @@ -3351,7 +3351,7 @@ def test_get_processing(c, s, a, b): assert set(x) == {a.address, b.address} x = yield c.processing(workers=[a.address]) - assert isinstance(x[a.address], list) + assert isinstance(x[a.address], (list, tuple)) @gen_cluster(client=True) diff --git a/distributed/tests/test_publish.py b/distributed/tests/test_publish.py index bf6b2097d5b..7c27f4fef3d 100644 --- a/distributed/tests/test_publish.py +++ b/distributed/tests/test_publish.py @@ -28,10 +28,10 @@ def test_publish_simple(s, a, b): assert "data" in str(exc_info.value) result = yield c.scheduler.publish_list() - assert result == ['data'] + assert list(result) == ['data'] result = yield f.scheduler.publish_list() - assert result == ['data'] + assert list(result) == ['data'] yield c.close() yield f.close() @@ -221,7 +221,7 @@ def test_pickle_safe(c, s, a, b): try: yield c2.publish_dataset(x=[1, 2, 3]) result = yield c2.get_dataset('x') - assert result == [1, 2, 3] + assert list(result) == [1, 2, 3] with pytest.raises(TypeError): yield c2.publish_dataset(y=lambda x: x) diff --git a/distributed/tests/test_queues.py b/distributed/tests/test_queues.py index 715aa99d967..5a26e0cbc1b 100644 --- a/distributed/tests/test_queues.py +++ b/distributed/tests/test_queues.py @@ -51,7 +51,7 @@ def test_queue_with_data(c, s, a, b): yield x.put([1, 'hello']) data = yield xx.get() - assert data == [1, 'hello'] + assert list(data) == [1, 'hello'] with pytest.raises(gen.TimeoutError): yield x.get(timeout=0.1) diff --git a/distributed/tests/test_variable.py b/distributed/tests/test_variable.py index db529f12afe..03407ef2812 100644 --- a/distributed/tests/test_variable.py +++ b/distributed/tests/test_variable.py @@ -47,7 +47,7 @@ def test_queue_with_data(c, s, a, b): yield x.set([1, 'hello']) data = yield xx.get() - assert data == [1, 'hello'] + assert list(data) == [1, 'hello'] def test_sync(loop): From 9b904f392151a3d0c590fbf18a624dbddd49411d Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Wed, 30 May 2018 17:57:06 -0400 Subject: [PATCH 3/8] Fixed another edge case? --- distributed/protocol/core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index 41dd933caad..fb85b97bd3d 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -125,6 +125,8 @@ def loads(frames, deserialize=True, deserializers=None): # TODO: This is problematic with tuple assignment val_holder = get_in(key[:-1], msg) if isinstance(val_holder, tuple): + if isinstance(msg, tuple): + msg = list(msg) msg[key[0]] = list(val_holder) val_holder = get_in(key[:-1], msg) val_holder[key[-1]] = value From eb0ec320bbba04a8a1e49b451a469d6d6aa9f27b Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Wed, 30 May 2018 18:18:45 -0400 Subject: [PATCH 4/8] Added additional test for dealing with deep serialization of tuples --- distributed/protocol/core.py | 25 +++++++++++++------- distributed/protocol/tests/test_serialize.py | 7 ++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index fb85b97bd3d..c3eac6a5b53 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -122,14 +122,23 @@ def loads(frames, deserialize=True, deserializers=None): else: value = Serialized(head, fs) - # TODO: This is problematic with tuple assignment - val_holder = get_in(key[:-1], msg) - if isinstance(val_holder, tuple): - if isinstance(msg, tuple): - msg = list(msg) - msg[key[0]] = list(val_holder) - val_holder = get_in(key[:-1], msg) - val_holder[key[-1]] = value + from toolz import get_in + + def put_in(keys, coll, val): + """Inverse of get_in, but does type promotion in the case of lists""" + from cytoolz import reduce + import operator + if len(keys) > 0: + holder = reduce(operator.getitem, keys[:-1], coll) + if isinstance(holder, tuple): + holder = list(holder) + coll = put_in(keys[:-1], coll, holder) + holder[keys[-1]] = val + if len(keys) == 0: + coll = val + return coll + + msg = put_in(key, msg, value) return msg except Exception: diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index bc75082c350..a1cedf5f2f7 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -170,6 +170,13 @@ def test_empty_loads(): assert isinstance(e2[0], Empty) +def test_empty_loads_deep(): + from distributed.protocol import loads, dumps + e = Empty() + e2 = loads(dumps([[[to_serialize(e)]]])) + assert isinstance(e2[0][0][0], Empty) + + def test_serialize_bytes(): for x in [1, 'abc', np.arange(5)]: b = serialize_bytes(x) From 2b6978cef3e39bf866776be5c4b179b94e3898c1 Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Wed, 30 May 2018 18:59:39 -0400 Subject: [PATCH 5/8] Fix client tests that tested comparisons with deserialized dicts --- distributed/tests/test_client.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index cedc5326234..7cc9be216d1 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3384,6 +3384,14 @@ def test_get_foo(c, s, a, b): assert valmap(sorted, x) == {futures[0].key: sorted(s.who_has[futures[0].key])} +def assert_dict_key_equal(expected, actual): + assert set(expected.keys()) == set(actual.keys()) + for k in actual.keys(): + ev = expected[k] + av = actual[k] + assert list(ev) == list(av) + + @gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 3) def test_get_foo_lost_keys(c, s, u, v, w): x = c.submit(inc, 1, workers=[u.address]) @@ -3393,27 +3401,27 @@ def test_get_foo_lost_keys(c, s, u, v, w): ua, va, wa = u.address, v.address, w.address d = yield c.scheduler.has_what() - assert d == {ua: [x.key], va: [y.key], wa: []} + assert_dict_key_equal(d, {ua: [x.key], va: [y.key], wa: []}) d = yield c.scheduler.has_what(workers=[ua, va]) - assert d == {ua: [x.key], va: [y.key]} + assert_dict_key_equal(d, {ua: [x.key], va: [y.key]}) d = yield c.scheduler.who_has() - assert d == {x.key: [ua], y.key: [va]} + assert_dict_key_equal(d, {x.key: [ua], y.key: [va]}) d = yield c.scheduler.who_has(keys=[x.key, y.key]) - assert d == {x.key: [ua], y.key: [va]} + assert_dict_key_equal(d, {x.key: [ua], y.key: [va]}) yield u._close() yield v._close() d = yield c.scheduler.has_what() - assert d == {wa: []} + assert_dict_key_equal(d, {wa: []}) d = yield c.scheduler.has_what(workers=[ua, va]) - assert d == {ua: [], va: []} + assert_dict_key_equal(d, {ua: [], va: []}) # The scattered key cannot be recomputed so it is forgotten d = yield c.scheduler.who_has() - assert d == {x.key: []} + assert_dict_key_equal(d, {x.key: []}) # ... but when passed explicitly, it is included in the result d = yield c.scheduler.who_has(keys=[x.key, y.key]) - assert d == {x.key: [], y.key: []} + assert_dict_key_equal(d, {x.key: [], y.key: []}) @slow From 0d2b3b1a161d089e5d544251571dc4e3ea5350e5 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 11 Jun 2018 08:53:39 -0400 Subject: [PATCH 6/8] clean up tuple/list change --- distributed/client.py | 13 +++++++------ distributed/core.py | 4 ++-- distributed/diagnostics/tests/test_eventstream.py | 2 +- distributed/protocol/core.py | 13 +++++-------- distributed/tests/test_batched.py | 8 ++++---- 5 files changed, 19 insertions(+), 21 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index e0155875fe1..87255dd3499 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -961,7 +961,7 @@ def _handle_report(self): else: break if not isinstance(msgs, (list, tuple)): - msgs = [msgs] + msgs = (msgs,) breakout = False for msg in msgs: @@ -2910,8 +2910,8 @@ def get_metadata(self, keys, default=no_default): -------- Client.set_metadata """ - if not isinstance(keys, (tuple, list)): - keys = [keys] + if not isinstance(keys, (list, tuple)): + keys = (keys,) return self.sync(self.scheduler.get_metadata, keys=keys, default=default) @@ -3011,8 +3011,8 @@ def set_metadata(self, key, value): -------- get_metadata """ - if not isinstance(key, (tuple, list)): - key = [key] + if not isinstance(key, list): + key = (key,) return self.sync(self.scheduler.set_metadata, keys=key, value=value) def get_versions(self, check=False): @@ -3040,7 +3040,8 @@ def get_versions(self, check=False): if check: # we care about the required & optional packages matching def to_packages(d): - return dict(sum(map(list, d['packages'].values()), [])) + L = list(d['packages'].values()) + return dict(sum(L, type(L[0])())) client_versions = to_packages(result['client']) versions = [('scheduler', to_packages(result['scheduler']))] versions.extend((w, to_packages(d)) diff --git a/distributed/core.py b/distributed/core.py index f36643ed8ea..b152a58cf6d 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -359,8 +359,8 @@ def handle_stream(self, comm, extra=None, every_cycle=[]): try: while not closed: msgs = yield comm.read() - if not isinstance(msgs, (list, tuple)): - msgs = [msgs] + if not isinstance(msgs, (tuple, list)): + msgs = (msgs,) if not comm.closed(): for msg in msgs: diff --git a/distributed/diagnostics/tests/test_eventstream.py b/distributed/diagnostics/tests/test_eventstream.py index 9cb07dac123..7504a59846b 100644 --- a/distributed/diagnostics/tests/test_eventstream.py +++ b/distributed/diagnostics/tests/test_eventstream.py @@ -57,7 +57,7 @@ def test_eventstream_remote(c, s, a, b): total = [] while len(total) < 10: msgs = yield comm.read() - assert isinstance(msgs, (list, tuple)) + assert isinstance(msgs, tuple) total.extend(msgs) assert time() < start + 5 diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index c3eac6a5b53..9bf8203f9f8 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -1,13 +1,14 @@ from __future__ import print_function, division, absolute_import import logging +import operator import msgpack try: - from cytoolz import get_in + from cytoolz import get_in, reduce except ImportError: - from toolz import get_in + from toolz import get_in, reduce from .compression import compressions, maybe_compress, decompress from .serialize import (serialize, deserialize, Serialize, Serialized, @@ -122,19 +123,15 @@ def loads(frames, deserialize=True, deserializers=None): else: value = Serialized(head, fs) - from toolz import get_in - def put_in(keys, coll, val): """Inverse of get_in, but does type promotion in the case of lists""" - from cytoolz import reduce - import operator - if len(keys) > 0: + if keys: holder = reduce(operator.getitem, keys[:-1], coll) if isinstance(holder, tuple): holder = list(holder) coll = put_in(keys[:-1], coll, holder) holder[keys[-1]] = val - if len(keys) == 0: + else: coll = val return coll diff --git a/distributed/tests/test_batched.py b/distributed/tests/test_batched.py index 8643147de75..1fcf026b68b 100644 --- a/distributed/tests/test_batched.py +++ b/distributed/tests/test_batched.py @@ -69,9 +69,9 @@ def test_BatchedSend(): b.send('HELLO') result = yield comm.read() - assert list(result) == ['hello', 'hello', 'world'] + assert result == ('hello', 'hello', 'world') result = yield comm.read() - assert list(result) == ['HELLO', 'HELLO'] + assert result == ('HELLO', 'HELLO') assert b.byte_count > 1 @@ -88,7 +88,7 @@ def test_send_before_start(): b.start(comm) result = yield comm.read() - assert list(result) == ['hello', 'world'] + assert result == ('hello', 'world') @gen_test() @@ -104,7 +104,7 @@ def test_send_after_stream_start(): result = yield comm.read() if len(result) < 2: result += yield comm.read() - assert list(result) == ['hello', 'world'] + assert result == ('hello', 'world') @gen_test() From 59b8eebbfba6e45e7d0b9f12655ae8307bd9acf6 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 11 Jun 2018 11:13:20 -0400 Subject: [PATCH 7/8] cleanup --- distributed/tests/test_publish.py | 6 +++--- distributed/tests/test_queues.py | 4 ++-- distributed/tests/test_variable.py | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/distributed/tests/test_publish.py b/distributed/tests/test_publish.py index 7c27f4fef3d..a331b7b957a 100644 --- a/distributed/tests/test_publish.py +++ b/distributed/tests/test_publish.py @@ -28,10 +28,10 @@ def test_publish_simple(s, a, b): assert "data" in str(exc_info.value) result = yield c.scheduler.publish_list() - assert list(result) == ['data'] + assert result == ('data',) result = yield f.scheduler.publish_list() - assert list(result) == ['data'] + assert result == ('data',) yield c.close() yield f.close() @@ -221,7 +221,7 @@ def test_pickle_safe(c, s, a, b): try: yield c2.publish_dataset(x=[1, 2, 3]) result = yield c2.get_dataset('x') - assert list(result) == [1, 2, 3] + assert result == (1, 2, 3) with pytest.raises(TypeError): yield c2.publish_dataset(y=lambda x: x) diff --git a/distributed/tests/test_queues.py b/distributed/tests/test_queues.py index 5a26e0cbc1b..a5a8d63d8fa 100644 --- a/distributed/tests/test_queues.py +++ b/distributed/tests/test_queues.py @@ -48,10 +48,10 @@ def test_queue_with_data(c, s, a, b): xx = yield Queue('x') assert x.client is c - yield x.put([1, 'hello']) + yield x.put((1, 'hello')) data = yield xx.get() - assert list(data) == [1, 'hello'] + assert data == (1, 'hello') with pytest.raises(gen.TimeoutError): yield x.get(timeout=0.1) diff --git a/distributed/tests/test_variable.py b/distributed/tests/test_variable.py index 03407ef2812..6debaa80719 100644 --- a/distributed/tests/test_variable.py +++ b/distributed/tests/test_variable.py @@ -44,10 +44,10 @@ def test_queue_with_data(c, s, a, b): xx = Variable('x') assert x.client is c - yield x.set([1, 'hello']) + yield x.set((1, 'hello')) data = yield xx.get() - assert list(data) == [1, 'hello'] + assert data == (1, 'hello') def test_sync(loop): From 41a457a14c4ce386caf8c5ca3ed84b2dd0ad412f Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 11 Jun 2018 11:14:00 -0400 Subject: [PATCH 8/8] flake8 --- distributed/protocol/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index 9bf8203f9f8..9209aa06184 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -6,9 +6,9 @@ import msgpack try: - from cytoolz import get_in, reduce + from cytoolz import reduce except ImportError: - from toolz import get_in, reduce + from toolz import reduce from .compression import compressions, maybe_compress, decompress from .serialize import (serialize, deserialize, Serialize, Serialized,