From a3671e45ce0465f1776be1293dee4930d874511d Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 13 Jul 2020 18:28:17 -0700 Subject: [PATCH 1/8] Skip coercing to `bytes` in `merge_frames` As the frames we receive are typically mutable, non-`bytes` objects like `bytearray`s or NumPy `ndarray`s, coercing to `bytes` at this stage triggers a copy of all frames. As we are going to toss those copied versions anyways when joining them into a larger `bytes` object, this ends up being wasteful with memory. Fortunately `bytes.join(...)` accepts any and all `bytes`-like objects. So instead just pass them all through as-is to `bytes.join(...)`, which is free and doesn't require a copy. Should cut down on the memory usage in this part of the code. --- distributed/protocol/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index fa020dae909..defbda2ba4f 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -1,7 +1,7 @@ import struct import msgpack -from ..utils import ensure_bytes, nbytes +from ..utils import nbytes BIG_BYTES_SHARD_SIZE = 2 ** 26 @@ -84,7 +84,7 @@ def merge_frames(header, frames): if len(L) == 1: # no work necessary out.extend(L) else: - out.append(b"".join(map(ensure_bytes, L))) + out.append(b"".join(L)) return out From 1e327eb35d4b4a703a5d5ef65eef464efd02bf7b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 13 Jul 2020 19:53:47 -0700 Subject: [PATCH 2/8] Create empty `bytes` object explicitly Instead of using `b""`, create the `bytes` object explicitly. 
--- distributed/protocol/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index defbda2ba4f..9bdbaa8f117 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -84,7 +84,7 @@ def merge_frames(header, frames): if len(L) == 1: # no work necessary out.extend(L) else: - out.append(b"".join(L)) + out.append(bytes().join(L)) return out From 4b3be4d88a18485c9ee4a0e304298b7c9c18310a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 13 Jul 2020 19:53:48 -0700 Subject: [PATCH 3/8] Use `bytearray`s to join frames Instead of using a `bytes` object to merge frames together, use a `bytearray`. This ensures the result is mutable, which downstream objects from serialization may care about. --- distributed/protocol/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index 9bdbaa8f117..0ae4c945411 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -84,7 +84,7 @@ def merge_frames(header, frames): if len(L) == 1: # no work necessary out.extend(L) else: - out.append(bytes().join(L)) + out.append(bytearray().join(L)) return out From 70dc67e3eee411c2ad47b7fbeddb6f8bd34ce8a8 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 13 Jul 2020 19:53:50 -0700 Subject: [PATCH 4/8] Just use append with singleton list --- distributed/protocol/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index 0ae4c945411..a4a28da30c4 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -82,7 +82,7 @@ def merge_frames(header, frames): frames.append(mv[l:]) l = 0 if len(L) == 1: # no work necessary - out.extend(L) + out.append(L[0]) else: out.append(bytearray().join(L)) return out From 37479b6461542318b83c90227cd2a724672d946e Mon Sep 17 00:00:00 2001 From: John 
Kirkham Date: Mon, 13 Jul 2020 19:53:51 -0700 Subject: [PATCH 5/8] Add `ensure_bytearray` Provides a utility method to coerce the input into a `bytearray` (copying if needed). --- distributed/utils.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/distributed/utils.py b/distributed/utils.py index dec1b6b79d3..14249acea23 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -944,6 +944,48 @@ def ensure_bytes(s): ) from e +def ensure_bytearray(s): + """Attempt to turn `s` into `bytearray`. + + Parameters + ---------- + s : Any + The object to be converted. Will correctly handle + + * str + * bytes + * objects implementing the buffer protocol (memoryview, ndarray, etc.) + + Returns + ------- + b : bytearray + + Raises + ------ + TypeError + When `s` cannot be converted + + Examples + -------- + + >>> ensure_bytearray('123') + bytearray(b'123') + >>> ensure_bytearray(b'123') + bytearray(b'123') + """ + if isinstance(s, bytearray): + return s + elif hasattr(s, "encode"): + return bytearray(s.encode()) + else: + try: + return bytearray(s) + except Exception as e: + raise TypeError( + "Object %s is neither a bytes object nor has an encode method" % s + ) from e + + def divide_n_among_bins(n, bins): """ From 7968897ac7eca725eeaa1377cdd9ec18f7d9e781 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 13 Jul 2020 19:53:53 -0700 Subject: [PATCH 6/8] Ensure `bytearray`s in the trivial case --- distributed/protocol/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index a4a28da30c4..b32de137a7c 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -62,7 +62,7 @@ def merge_frames(header, frames): assert sum(lengths) == sum(map(nbytes, frames)) if all(len(f) == l for f, l in zip(frames, lengths)): - return frames + return list(map(ensure_bytearray, frames)) frames = frames[::-1] lengths = lengths[::-1] From 
f31f68a2623efa1131530294430d6420b9841a02 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 13 Jul 2020 19:53:55 -0700 Subject: [PATCH 7/8] Ensure we return a `bytearray` in singleton case --- distributed/protocol/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index b32de137a7c..c287949bc53 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -1,7 +1,7 @@ import struct import msgpack -from ..utils import nbytes +from ..utils import ensure_bytearray, nbytes BIG_BYTES_SHARD_SIZE = 2 ** 26 @@ -82,7 +82,7 @@ def merge_frames(header, frames): frames.append(mv[l:]) l = 0 if len(L) == 1: # no work necessary - out.append(L[0]) + out.append(ensure_bytearray(L[0])) else: out.append(bytearray().join(L)) return out From 8f34e7fb1094bce4023d90520bfcaad2b64c017c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 13 Jul 2020 19:53:56 -0700 Subject: [PATCH 8/8] Test NumPy array is writable after serialization --- distributed/protocol/tests/test_numpy.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/distributed/protocol/tests/test_numpy.py b/distributed/protocol/tests/test_numpy.py index 0e299632902..d09b5e435ad 100644 --- a/distributed/protocol/tests/test_numpy.py +++ b/distributed/protocol/tests/test_numpy.py @@ -100,6 +100,14 @@ def test_dumps_serialize_numpy(x): np.testing.assert_equal(x, y) +def test_dumps_numpy_writable(): + a1 = np.arange(1000) + a1.flags.writeable = False + (a2,) = loads(dumps([to_serialize(a1)])) + assert (a1 == a2).all() + assert a2.flags.writeable + + @pytest.mark.parametrize( "x", [