diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index fb85d32fee2..0d87595bf75 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -2,7 +2,7 @@ import msgpack -from .compression import compressions, decompress, maybe_compress +from .compression import decompress, maybe_compress from .serialize import ( Serialize, Serialized, @@ -111,55 +111,3 @@ def _decode_default(obj): except Exception: logger.critical("Failed to deserialize", exc_info=True) raise - - -def dumps_msgpack(msg, compression=None): - """Dump msg into header and payload, both bytestrings - - All of the message must be msgpack encodable - - See Also: - loads_msgpack - """ - header = {} - payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True) - - fmt, payload = maybe_compress(payload, compression=compression) - if fmt: - header["compression"] = fmt - - if header: - header_bytes = msgpack.dumps(header, use_bin_type=True) - else: - header_bytes = b"" - - return [header_bytes, payload] - - -def loads_msgpack(header, payload): - """Read msgpack header and payload back to Python object - - See Also: - dumps_msgpack - """ - header = bytes(header) - if header: - header = msgpack.loads( - header, object_hook=msgpack_decode_default, use_list=False, **msgpack_opts - ) - else: - header = {} - - if header.get("compression"): - try: - decompress = compressions[header["compression"]]["decompress"] - payload = decompress(payload) - except KeyError: - raise ValueError( - "Data is compressed as %s but we don't have this" - " installed" % str(header["compression"]) - ) - - return msgpack.loads( - payload, object_hook=msgpack_decode_default, use_list=False, **msgpack_opts - )