Merged

Commits (39)
3895a79  Do no allocate large temp buffers in _Pickler.dump (ogrisel, Nov 9, 2017)
9fc8d67  Add comment to justify iteration over chunks (ogrisel, Nov 9, 2017)
964a67f  Concatenate short bytes (ogrisel, Nov 9, 2017)
d7216c1  Add entry to NEWS (ogrisel, Nov 9, 2017)
410d73a  Reduce overhead for small bytes objects (ogrisel, Nov 10, 2017)
37b4e5b  Even more overhead reduction for small bytes buffers (ogrisel, Nov 10, 2017)
ba73a71  Fix no-copy for protocol < 4 (ogrisel, Nov 10, 2017)
8eac6a2  More overhead reduction for small bytes (ogrisel, Nov 10, 2017)
6adc17b  Direct write for large bytes in the C-pickler (ogrisel, Nov 10, 2017)
e795e66  Update NEWS to include the C pickler (ogrisel, Nov 11, 2017)
bb4d3eb  No-copy dump for large unicode in C pickler (ogrisel, Nov 12, 2017)
5d52bd7  Update NEWS to mention str [ci skip] (ogrisel, Nov 12, 2017)
054c94b  Simpler NEWS entry [ci skip] (ogrisel, Nov 12, 2017)
31c3afa  C-code style fixes (ogrisel, Nov 12, 2017)
dcce63a  Fixed leak (ogrisel, Nov 12, 2017)
d233208  Do not wrap large objects in frames (ogrisel, Nov 12, 2017)
dfe6314  Fix bug in function call result handling (ogrisel, Nov 12, 2017)
61611b2  Fix frameless blobs test for pickletools.optimize (ogrisel, Nov 12, 2017)
e67df81  Flush to file after each frame commit (ogrisel, Nov 12, 2017)
d1e9a7d  Extend NEWS entry to mention write-on-frame-commit (ogrisel, Nov 12, 2017)
7f08831  Make proto 4 Python pickler issue (2 * n_frames + 1) calls to write (ogrisel, Nov 17, 2017)
750ae86  Cleanup _Pickler_CommitFrame (ogrisel, Nov 17, 2017)
c0c8973  Get rid of the frameless_blobs=False exception: actually it works out… (ogrisel, Nov 17, 2017)
304571a  Implement frameless blobs for pickletools.optimize (ogrisel, Nov 17, 2017)
4ee2ee9  Do not skip test silently if self.pickler is renamed (ogrisel, Nov 17, 2017)
b4c978b  Remove on attribute lookup (ogrisel, Nov 17, 2017)
e60a3d5  Typo in comment (ogrisel, Nov 17, 2017)
8181630  Add comments for tests that require self.pickler (ogrisel, Nov 17, 2017)
25e4e04  revert variable renaming (ogrisel, Nov 24, 2017)
b57ef9d  Disable test_framed_write_sizes in OptimizedPickleTests (ogrisel, Nov 24, 2017)
727b5c2  Small code style fix (ogrisel, Nov 24, 2017)
8af9321  Add comments to explain tradeoffs in Python pickle frame commit (ogrisel, Nov 27, 2017)
c7bbffa  Implement no-copy support for delayed writers in Python pickler (ogrisel, Dec 3, 2017)
796c469  Fix renamed test_framed_write_sizes_with_delayed_writer in test_pickl… (ogrisel, Dec 3, 2017)
a2e87e9  Add comment to explain flush after commit in _Pickler_OpcodeBoundary (ogrisel, Jan 4, 2018)
8edaa64  Removed unused argument payload_size (ogrisel, Jan 4, 2018)
609ebfe  Update the changelog to reflect the new behavior of the Python pickler (ogrisel, Jan 4, 2018)
28f6297  Fix phrasing (ogrisel, Jan 4, 2018)
7c91f06  Fixed typos in comments (ogrisel, Jan 5, 2018)
Lib/pickle.py (40 additions & 10 deletions)
@@ -201,21 +201,46 @@ def commit_frame(self, force=False):
         if self.current_frame:
             f = self.current_frame
             if f.tell() >= self._FRAME_SIZE_TARGET or force:
-                with f.getbuffer() as data:
-                    n = len(data)
-                    write = self.file_write
-                    write(FRAME)
-                    write(pack("<Q", n))
-                    write(data)
-                f.seek(0)
-                f.truncate()
+                data = f.getbuffer()
+                write = self.file_write
+                # Issue a single call to the write method of the underlying
+                # file object for the frame opcode with the size of the
+                # frame. The concatenation is expected to be less expensive
+                # than issuing an additional call to write.
+                write(FRAME + pack("<Q", len(data)))
+
+                # Issue a separate call to write to append the frame
+                # contents without concatenation to the above to avoid a
+                # memory copy.
+                write(data)
+
+                # Start the new frame with a new io.BytesIO instance so that
+                # the file object can have delayed access to the previous frame
+                # contents via an unreleased memoryview of the previous
+                # io.BytesIO instance.
+                self.current_frame = io.BytesIO()

     def write(self, data):
         if self.current_frame:
             return self.current_frame.write(data)
         else:
             return self.file_write(data)

+    def write_large_bytes(self, header, payload):
+        write = self.file_write
+        if self.current_frame:
+            # Terminate the current frame and flush it to the file.
+            self.commit_frame(force=True)
+
+        # Perform direct write of the header and payload of the large binary
+        # object. Be careful not to concatenate the header and the payload
+        # prior to calling 'write' as we do not want to allocate a large
+        # temporary bytes object.
+        # We intentionally do not insert a protocol 4 frame opcode to make
+        # it possible to optimize file.read calls in the loader.
+        write(header)
+        write(payload)


 class _Unframer:
@@ -379,6 +404,7 @@ def __init__(self, file, protocol=None, *, fix_imports=True):
             raise TypeError("file must have a 'write' attribute")
         self.framer = _Framer(self._file_write)
         self.write = self.framer.write
+        self._write_large_bytes = self.framer.write_large_bytes
         self.memo = {}
         self.proto = int(protocol)
         self.bin = protocol >= 1
@@ -696,7 +722,9 @@ def save_bytes(self, obj):
         if n <= 0xff:
             self.write(SHORT_BINBYTES + pack("<B", n) + obj)
         elif n > 0xffffffff and self.proto >= 4:
-            self.write(BINBYTES8 + pack("<Q", n) + obj)
+            self._write_large_bytes(BINBYTES8 + pack("<Q", n), obj)
+        elif n >= self.framer._FRAME_SIZE_TARGET:
+            self._write_large_bytes(BINBYTES + pack("<I", n), obj)
         else:
             self.write(BINBYTES + pack("<I", n) + obj)
         self.memoize(obj)
@@ -709,7 +737,9 @@ def save_str(self, obj):
             if n <= 0xff and self.proto >= 4:
                 self.write(SHORT_BINUNICODE + pack("<B", n) + encoded)
             elif n > 0xffffffff and self.proto >= 4:
-                self.write(BINUNICODE8 + pack("<Q", n) + encoded)
+                self._write_large_bytes(BINUNICODE8 + pack("<Q", n), encoded)
+            elif n >= self.framer._FRAME_SIZE_TARGET:
+                self._write_large_bytes(BINUNICODE + pack("<I", n), encoded)
             else:
                 self.write(BINUNICODE + pack("<I", n) + encoded)
         else:
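As a sketch of the intended behavior (not part of the diff): dumping a bytes object larger than the frame size target should hand the payload to the file object's write() as a chunk of its own, without first copying it into a frame buffer. CountingWriter is a hypothetical helper, and the chunk pattern asserted below assumes an interpreter that includes this change.

import pickle

class CountingWriter:
    """File-like object that records every chunk passed to write()."""
    def __init__(self):
        self.chunks = []
    def write(self, chunk):
        # Copy defensively so the chunks can be inspected later.
        self.chunks.append(bytes(chunk))
        return len(chunk)

payload = b'x' * (4 * 1024 * 1024)  # well above the 64 KiB frame target
writer = CountingWriter()
pickle.Pickler(writer, protocol=4).dump(payload)

# On a patched build the 4 MiB payload reaches write() as its own chunk
# instead of being buffered into a temporary frame first.
assert any(len(c) >= len(payload) for c in writer.chunks)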
Lib/pickletools.py (8 additions & 3 deletions)
@@ -2279,7 +2279,7 @@ def optimize(p):
             if arg > proto:
                 proto = arg
             if pos == 0:
-                protoheader = p[pos: end_pos]
+                protoheader = p[pos:end_pos]
             else:
                 opcodes.append((pos, end_pos))
         else:
@@ -2295,6 +2295,7 @@
     pickler.framer.start_framing()
     idx = 0
     for op, arg in opcodes:
+        frameless = False
         if op is put:
             if arg not in newids:
                 continue
@@ -2305,8 +2306,12 @@
             data = pickler.get(newids[arg])
         else:
             data = p[op:arg]
-        pickler.framer.commit_frame()
-        pickler.write(data)
+            frameless = len(data) > pickler.framer._FRAME_SIZE_TARGET
+        pickler.framer.commit_frame(force=frameless)
+        if frameless:
+            pickler.framer.file_write(data)
+        else:
+            pickler.write(data)
     pickler.framer.end_framing()
     return out.getvalue()
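A quick way to exercise this path (a sketch, assuming a build with this change): optimize() now force-commits the current frame before a large blob and writes the blob directly, so a large payload round-trips through optimize() without being re-wrapped in a FRAME.

import pickle
import pickletools

data = pickle.dumps([b's' * 10, b'y' * (1024 * 1024)], protocol=4)
optimized = pickletools.optimize(data)

# The optimized pickle still loads to an equal object...
assert pickle.loads(optimized) == pickle.loads(data)

# ...and the opcode stream can be inspected to check that the large
# BINBYTES payload sits outside of any FRAME.
names = [op.name for op, arg, pos in pickletools.genops(optimized)]
print(names.count('FRAME'), names.count('BINBYTES'))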
Lib/test/pickletester.py (118 additions & 13 deletions)
@@ -2042,21 +2042,40 @@ def test_setitems_on_non_dicts(self):
     def check_frame_opcodes(self, pickled):
         """
         Check the arguments of FRAME opcodes in a protocol 4+ pickle.
+
+        Note that binary objects that are larger than FRAME_SIZE_TARGET are not
+        framed by default and are therefore considered a frame by themselves in
+        the following consistency check.
         """
-        frame_opcode_size = 9
-        last_arg = last_pos = None
+        last_arg = last_pos = last_frame_opcode_size = None
+        frameless_opcode_sizes = {
+            'BINBYTES': 5,
+            'BINUNICODE': 5,
+            'BINBYTES8': 9,
+            'BINUNICODE8': 9,
+        }
         for op, arg, pos in pickletools.genops(pickled):
-            if op.name != 'FRAME':
+            if op.name in frameless_opcode_sizes:
+                if len(arg) > self.FRAME_SIZE_TARGET:
+                    frame_opcode_size = frameless_opcode_sizes[op.name]
+                    arg = len(arg)
+                else:
+                    continue
+            elif op.name == 'FRAME':
+                frame_opcode_size = 9
+            else:
                 continue
+
             if last_pos is not None:
                 # The previous frame's size should be equal to the number
                 # of bytes up to the current frame.
-                frame_size = pos - last_pos - frame_opcode_size
+                frame_size = pos - last_pos - last_frame_opcode_size
                 self.assertEqual(frame_size, last_arg)
             last_arg, last_pos = arg, pos
+            last_frame_opcode_size = frame_opcode_size
         # The last frame's size should be equal to the number of bytes up
         # to the pickle's end.
-        frame_size = len(pickled) - last_pos - frame_opcode_size
+        frame_size = len(pickled) - last_pos - last_frame_opcode_size
         self.assertEqual(frame_size, last_arg)

     def test_framing_many_objects(self):
@@ -2076,15 +2095,36 @@ def test_framing_many_objects(self):

     def test_framing_large_objects(self):
         N = 1024 * 1024
-        obj = [b'x' * N, b'y' * N, b'z' * N]
+        obj = [b'x' * N, b'y' * N, 'z' * N]
         for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
-            with self.subTest(proto=proto):
-                pickled = self.dumps(obj, proto)
-                unpickled = self.loads(pickled)
-                self.assertEqual(obj, unpickled)
-                n_frames = count_opcode(pickle.FRAME, pickled)
-                self.assertGreaterEqual(n_frames, len(obj))
-                self.check_frame_opcodes(pickled)
+            for fast in [True, False]:
+                with self.subTest(proto=proto, fast=fast):
+                    if hasattr(self, 'pickler'):
+                        buf = io.BytesIO()
+                        pickler = self.pickler(buf, protocol=proto)
+                        pickler.fast = fast
+                        pickler.dump(obj)
+                        pickled = buf.getvalue()
+                    elif fast:
+                        continue
+                    else:
+                        # Fallback to self.dumps when fast=False and
+                        # self.pickler is not available.
+                        pickled = self.dumps(obj, proto)
+                    unpickled = self.loads(pickled)
+                    # More informative error message in case of failure.
+                    self.assertEqual([len(x) for x in obj],
+                                     [len(x) for x in unpickled])
+                    # Perform full equality check if the lengths match.
+                    self.assertEqual(obj, unpickled)
+                    n_frames = count_opcode(pickle.FRAME, pickled)
+                    if not fast:
+                        # One frame per memoize for each large object.
+                        self.assertGreaterEqual(n_frames, len(obj))
+                    else:
+                        # One frame at the beginning and one at the end.
+                        self.assertGreaterEqual(n_frames, 2)
+                    self.check_frame_opcodes(pickled)

     def test_optional_frames(self):
         if pickle.HIGHEST_PROTOCOL < 4:
@@ -2125,6 +2165,71 @@ def remove_frames(pickled, keep_frame=None):
                              count_opcode(pickle.FRAME, pickled))
             self.assertEqual(obj, self.loads(some_frames_pickle))

+    def test_framed_write_sizes_with_delayed_writer(self):
+        class ChunkAccumulator:
+            """Accumulate pickler output in a list of raw chunks."""
+
+            def __init__(self):
+                self.chunks = []
+
+            def write(self, chunk):
+                self.chunks.append(chunk)
+
+            def concatenate_chunks(self):
+                # Some chunks can be memoryview instances; we need to
+                # convert them to bytes to be able to call join.
+                return b"".join([c.tobytes() if hasattr(c, 'tobytes') else c
+                                 for c in self.chunks])
+
+        small_objects = [(str(i).encode('ascii'), i % 42, {'i': str(i)})
+                         for i in range(int(1e4))]
+
+        for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
+            # Protocol 4 packs groups of small objects into frames and issues
+            # calls to write only once or twice per frame:
+            # The C pickler issues one call to write per frame (header and
+            # contents) while the Python pickler issues two calls to write:
+            # one for the frame header and one for the frame binary contents.
+            writer = ChunkAccumulator()
+            self.pickler(writer, proto).dump(small_objects)
+
+            # Actually read the binary content of the chunks after the end
+            # of the call to dump: any memoryview passed to write should not
+            # be released otherwise this delayed access would not be possible.
+            pickled = writer.concatenate_chunks()
+            reconstructed = self.loads(pickled)
+            self.assertEqual(reconstructed, small_objects)
+            self.assertGreater(len(writer.chunks), 1)
+
+            n_frames, remainder = divmod(len(pickled), self.FRAME_SIZE_TARGET)
+            if remainder > 0:
+                n_frames += 1
+
+            # There should be at least one call to write per frame,
+            self.assertGreaterEqual(len(writer.chunks), n_frames)
+
+            # but not too many either: there can be one for the proto,
+            # one per-frame header and one per frame for the actual contents.
+            self.assertGreaterEqual(2 * n_frames + 1, len(writer.chunks))
+
+            chunk_sizes = [len(c) for c in writer.chunks[:-1]]
+            large_sizes = [s for s in chunk_sizes
+                           if s >= self.FRAME_SIZE_TARGET]
+            small_sizes = [s for s in chunk_sizes
+                           if s < self.FRAME_SIZE_TARGET]
+
+            # Large chunks should not be too large:
+            for chunk_size in large_sizes:
+                self.assertGreater(2 * self.FRAME_SIZE_TARGET, chunk_size)
+
+            last_chunk_size = len(writer.chunks[-1])
+            self.assertGreater(2 * self.FRAME_SIZE_TARGET, last_chunk_size)
+
+            # Small chunks (if any) should be very small
+            # (only proto and frame headers).
+            for chunk_size in small_sizes:
+                self.assertGreaterEqual(9, chunk_size)

     def test_nested_names(self):
         global Nested
         class Nested:
Lib/test/test_pickletools.py (3 additions)
@@ -16,6 +16,9 @@ def loads(self, buf, **kwds):
     # Test relies on precise output of dumps()
     test_pickle_to_2x = None

+    # Test relies on writing by chunks into a file object.
+    test_framed_write_sizes_with_delayed_writer = None
+
     def test_optimize_long_binget(self):
         data = [str(i) for i in range(257)]
         data.append(data[-1])
Misc/NEWS.d/next/Library/2017-11-10-00-05-08.bpo-31993.-OMNg8.rst (14 additions)

@@ -0,0 +1,14 @@
+The picklers no longer allocate temporary memory when dumping large
+``bytes`` and ``str`` objects into a file object. Instead the data is
+directly streamed into the underlying file object.
+
+Previously the C implementation would buffer all content and issue a
+single call to ``file.write`` at the end of the dump. With protocol 4
+this behavior has changed to issue one call to ``file.write`` per frame.
+
+The Python pickler with protocol 4 now dumps each frame's content as a
+memoryview of an ``io.BytesIO`` instance that is never reused, and the
+memoryview is no longer released after the call to write. This makes it
+possible for the file object to delay access to the memoryviews of
+previous frames without forcing any additional memory copy, as was
+already possible with the C pickler.
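A minimal sketch of the delayed-access pattern this entry describes (DelayedWriter is a hypothetical helper, and pickle._Pickler is the pure-Python pickler the entry refers to): the writer keeps references to the chunks passed to write() and only concatenates them after dump() returns, which assumes the frame memoryviews are never released.

import pickle

class DelayedWriter:
    """Store chunks passed to write() and read them after the dump."""
    def __init__(self):
        self.chunks = []
    def write(self, chunk):
        # chunk may be a memoryview of a frame; since the patched pickler
        # no longer releases it, holding a reference is safe and avoids
        # an immediate copy.
        self.chunks.append(chunk)

obj = list(range(10000))
writer = DelayedWriter()
pickle._Pickler(writer, protocol=4).dump(obj)

# Delayed concatenation: the frame memoryviews are still valid here.
payload = b"".join(bytes(c) for c in writer.chunks)
assert pickle.loads(payload) == obj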