Conversation

@mrocklin (Member) commented Mar 3, 2017

This implements a custom (and more efficient) serializer for Pandas dataframes and series. It works by calling serialize on each series' .values attribute and then merging the results. This redirects either to numpy (the common case) or pickle (in cases like categoricals), so we get the benefits in the common case without covering the entire pandas abstraction.

I'm not sure that we should merge this; I suspect that there are quite a few corner cases that it does not handle. It's useful to have it up here for benchmarking and review purposes.
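For context, a minimal sketch of the approach described above (illustrative only; serialize_pandas_sketch is a made-up name and this is not the code in the PR), assuming distributed.protocol.serialize, which returns a (header, frames) pair:

from distributed.protocol import serialize

def serialize_pandas_sketch(df):
    # Serialize each column's values independently: numpy-backed columns go
    # through the numpy serializer, everything else falls back to pickle.
    headers, all_frames = [], []
    for column in df.columns:
        header, frames = serialize(df[column].values)
        header['frame-count'] = len(frames)
        headers.append(header)
        all_frames.extend(frames)
    return {'columns': list(df.columns), 'headers': headers}, all_frames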

compression = [None]
lengths = [len(head)]
for column in df.columns:
    x = df[column].values
Review comment (Contributor):

Using .values will cause coercion to numpy datatypes for most things (except categoricals); in other words, datetimes w/tz are converted to UTC (and thus this is not idempotent). Better to use df[column]._values, which preserves the structure.
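A quick illustration of the coercion (the exact return types vary across pandas versions):

import pandas as pd

s = pd.Series(pd.date_range('2017-01-01', periods=3, tz='US/Eastern'))

s.values    # plain datetime64[ns] ndarray converted to UTC -- the timezone is lost
s._values   # pandas' internal tz-aware values, so the timezone round-trips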

pd.DataFrame({'x': [b'a', b'b', b'c']}),
pd.DataFrame({'x': pd.Categorical(['a', 'b', 'a'], ordered=True)}),
pd.DataFrame({'x': pd.Categorical(['a', 'b', 'a'], ordered=False)}),
pd.Series(np.arange(10000000)),
Review comment (Contributor):

Should test various dtypes (int16, float32), datetimes, timedelta, period (pd.period_range), and datetime w/tz (pd.date_range(..., tz='US/Eastern')).
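For example, extra cases along those lines might look like the following (illustrative values only, extending the test list shown above):

pd.Series(np.arange(100, dtype='int16')),
pd.Series(np.arange(100, dtype='float32')),
pd.Series(pd.date_range('2000-01-01', periods=100)),
pd.Series(pd.timedelta_range('1 day', periods=100)),
pd.Series(pd.period_range('2000-01-01', periods=100, freq='D')),
pd.Series(pd.date_range('2000-01-01', periods=100, tz='US/Eastern')),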

@mrocklin force-pushed the pandas-serialization branch from ecf3d54 to 16ae338 on March 9, 2017 at 17:52
@mrocklin (Member, Author) commented Jun 1, 2017

Closing this for now. This isn't a high priority and we're waiting for Arrow/Pandas to handle this on their end.

@mrocklin closed this Jun 1, 2017
@ogrisel (Contributor) commented Nov 8, 2017

@mrocklin the fact that worker.data.slow is falling back to cloudpickle.dumps is causing large memory usage spikes when evicting data. This can typically cause the worker to get killed by the nanny when working with dataframe partitions that are on the order of 1 GB.

It would be great to have a no-copy serialization mechanism, at least for spilling data to disk on workers. This PR looked promising as it could access the underlying numpy data blocks as sliceable memoryviews and therefore pass those as slices of memoryview to msgpack to do no-copy serialization to disk.

I am not sure how pyarrow could allow some no-copy view on pandas 0.21 dataframes. It seems that pyarrow.Table.from_pandas(df) makes at least one memory copy (based on some psutil experiments).

@mrocklin (Member, Author) commented Nov 8, 2017

It is possible to resurrect this work. It was put on hold partially because there wasn't a motivating use case. It sounds like now we do have a motivating use case.

I would be more comfortable with this direction if we were to whitelist when it can be used. For example, I expect it to be error-prone when dealing with categoricals, timestamps with timezones, and so on.

> I am not sure how pyarrow could allow some no-copy view on pandas 0.21 dataframes. It seems that pyarrow.Table.from_pandas(df) makes at least one memory copy (based on some psutil experiments).

@wesm @xhochy do either of you have thoughts on this? My understanding here is the same as @ogrisel's: today Arrow is probably not a suitable mechanism for a pandas -> list of memoryviews conversion in a zero-copy manner, and this is probably out of scope near term. Is this understanding accurate?

@mrocklin (Member, Author) commented Nov 8, 2017

Also @xhochy I think that @ogrisel is trying to resolve problems related to what @bluenote10 ran into regarding memory use on Pandas dataframes.

@xhochy commented Nov 8, 2017

Zero-copy serialization of Pandas DataFrames to Arrow will only be possible for a very narrow scope:

  • integer columns: ✅
  • float columns without nulls (or treating nan as non-null in the case of nans): ✅
  • string columns: ❌
  • categorical columns: ❌
  • boolean columns: ❌
  • datetime columns: ✅ (not sure if we do this atm)
  • object columns: ❌

We can serialise all of these column types, and some can also be reconstructed without any loss of information/type, but zero-copy is quite hard, as there are some minor differences between the memory layouts of Arrow and Pandas.

@wesm commented Nov 8, 2017

I'm open to building something outside the box to help here. Per http://arrow.apache.org/blog/2017/10/15/fast-python-serialization-with-ray-and-arrow/, we can serialize general Python lists and dicts containing NumPy arrays. So without too much effort, we could disassemble the BlockManager into a dictionary using a custom serializer and then do a zero-copy reconstruction from the resulting buffer.
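A rough sketch of that idea using the pa.serialize/pa.deserialize API discussed below (the decompose helper is made up, a real version would walk the BlockManager blocks rather than individual columns, and this API has since been deprecated in newer pyarrow releases):

import numpy as np
import pandas as pd
import pyarrow as pa

def decompose(df):
    # Break the frame into plain NumPy arrays plus lightweight metadata.
    return {
        'columns': list(map(str, df.columns)),
        'index': np.asarray(df.index),
        'data': {str(col): df[col].values for col in df.columns},
    }

df = pd.DataFrame({'x': np.arange(1000), 'y': np.random.randn(1000)})
payload = pa.serialize(decompose(df))            # zero-copy for primitive dtypes
roundtripped = pa.deserialize(payload.to_buffer())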

@wesm commented Nov 8, 2017

I can prototype something and put up a patch if that sounds good?

@mrocklin (Member, Author) commented Nov 8, 2017

I would be very happy to have others deal with this problem :)

To be clear on our objectives, we're looking to avoid any significant memory allocation. My understanding of the blogpost above is that it is still about turning Python objects into bytes, which we don't want to do (except for metadata).

> and then do a zero-copy reconstruction from the resulting buffer.

Again, to be clear, it probably isn't feasible to construct or consume a single buffer here (this would, I think, imply concatenation and so memory allocation). Instead I would assume that we would need to pass around a sequence of bytes-like objects where those bytes-like objects were directly taken from the blocks.

@wesm commented Nov 8, 2017

Hopefully this helps explain what is going on:

In [1]: import pyarrow as pa

In [2]: import numpy as np

In [3]: data = {i: np.random.randn(1000, 1000) for i in range(10)}

In [4]: prepped = pa.serialize(data)

In [5]: %timeit prepped = pa.serialize(data)
27.8 µs ± 3.26 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [6]: as_buf = prepped.to_buffer()

In [7]: %timeit as_buf = prepped.to_buffer()
11.5 ms ± 89.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [8]: data_deserialized = pa.deserialize(as_buf)

In [9]: %timeit data_deserialized = pa.deserialize(as_buf)
34.8 µs ± 227 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

pyarrow.serialize is a zero-copy operation, unless you have PyObject* arrays. The to_buffer call creates a single memoryview-compatible object containing the whole payload. You can also write the stream to a file-like object.

pa.deserialize is likewise zero-copy (assuming you don't have any object arrays, which require unpickling): it returns the original data, but the result holds memory references into the input buffer.

So what I'm proposing is to decompose a pandas DataFrame into a collection of NumPy arrays representing the BlockManager, and then push them through this zero-copy serialization machinery.

@wesm commented Nov 8, 2017

Writing to some other object looks like

In [10]: import io

In [11]: buf = io.BytesIO()

In [12]: prepped.write_to(buf)

@mrocklin (Member, Author) commented Nov 8, 2017

Dask's communication machinery deals with bytes or memoryview objects; it doesn't use file-like objects. Is it possible to pass the memoryview object through without an allocation?

For example in NumPy arrays we do something like the following:

def serialize_numpy(x):
    header = {'shape': x.shape, 'dtype': x.dtype, 'strides': x.strides}
    frames = [x.data]
    return header, frames

This PR did more or less the same thing, except that the header was much more complex and the list of frames was much longer (probably one per block or one per column).
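For completeness, a hypothetical counterpart for the receiving side (assuming a single C-contiguous frame; a real implementation would also honour the strides stored in the header):

import numpy as np

def deserialize_numpy(header, frames):
    [frame] = frames
    # Viewing the received buffer avoids a copy, as long as the frame
    # arrived as a contiguous bytes-like object.
    return np.frombuffer(frame, dtype=header['dtype']).reshape(header['shape'])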

@wesm commented Nov 8, 2017

That would be a little bit more work, but it's certainly possible and would be valuable. The purpose of ARROW-1509 (https://issues.apache.org/jira/browse/ARROW-1509) is to make the framing of the serialized payload less monolithic, in the way you describe.

cc @pcmoritz @robertnishihara @cpcloud

@pitrou (Member) commented Nov 8, 2017

> The to_buffer call creates a single memoryview-compatible object containing the whole payload.

Which is a copy, right? Can you instead grow a to_buffers() method which returns a list of buffers without copying data?

@mrocklin (Member, Author) commented Nov 8, 2017

If Arrow is able to efficiently convert Python objects into a header plus a list of bytes/memoryviews without allocating much, and then take that same representation and convert back to Python objects, then it would be easy to integrate into Dask's communication machinery (and presumably anything that relies on sockets/tornado/asyncio).

We have an immediate need for this for Pandas dataframes in particular due to memory issues when dealing with large dataframes in small containers. Do you have a sense for when this work might start/complete?

@wesm commented Nov 8, 2017

> Which is a copy, right? Can you instead grow a to_buffers() method which returns a list of buffers without copying data?

Yes, this is exactly something that ARROW-1509 will permit quite easily.

In theory this is all about a day or so of work for someone who knows what they are doing. I will create a JIRA to scope this for 0.8.0, so I think it can be completed in the next two weeks; whenever it's merged it will appear in the Linux nightlies (https://anaconda.org/twosigma/pyarrow), so it can be validated by others without having to set up the build environment.

I can do the development work myself if no one else steps up to take a look; it doesn't seem like it will be that difficult.

@mrocklin (Member, Author) commented Nov 8, 2017

Awesome. I'll be quite glad if this can help us resolve our memory issues. Thanks for engaging here, @wesm.

@wesm commented Nov 8, 2017

I have some pressing stuff to wrap up this week so the earliest I could take a look would be early next week. Will keep you in the loop

@wesm commented Nov 9, 2017

@wesm commented Nov 16, 2017

I haven't gotten to this yet, but it's on deck for either sometime this weekend or first thing next week (Thanksgiving week being quiet will be good for writing a lot of patches)

@mrocklin (Member, Author) commented

This would definitely be nice, but it isn't critical to Dask. No major rush on our end.

@ogrisel (Contributor) commented Dec 6, 2017

Just a heads up to let you know that I am working on a generic improvement in cloudpickle itself to implement memoryview-based nocopy semantics for large pydata objects: cloudpipe/cloudpickle#138

This could be useful for other objects with nested numpy arrays (e.g. scipy sparse matrices, big scikit-learn estimators such as random forests...).

@wesm commented Dec 6, 2017

That's cool; this sounds like a very similar strategy to what we've been doing in the Arrow serializer.

@pitrou (Member) commented Dec 6, 2017

> Just a heads up to let you know that I am working on a generic improvement in cloudpickle itself to implement memoryview-based nocopy semantics for large pydata objects

This is a bit different though, as it only does "nocopy" pickling if you pickle to a file-like object (i.e. call dump() rather than dumps()), which will be tricky if your file object is non-blocking :-)

This is an inherent limitation of pickle, as it outputs a single stream of bytes, while the pyarrow approach returns a list of buffers.

@ogrisel (Contributor) commented Dec 7, 2017

> This is an inherent limitation of pickle, as it outputs a single stream of bytes, while the pyarrow approach returns a list of buffers.

One of the goals of my PR is to actually support the "list of buffers" pattern in cloudpickle. Check the use of ChunksAppender in the test_nocopy_numpy_array test:

https://github.com/cloudpipe/cloudpickle/pull/138/files#diff-ad0a048ef2f4c0dc3eaf2baa553b134eR980

The call to materialize() happens after the end of the pickling process and is meant to simulate the kind of async access to the buffers done by tornado in dask distributed.

This is actually the motivation for not calling .release() on the frame buffer of the Python pickler in upstream CPython, and for using a new BytesIO instance for each protocol 4 frame instead.

@pitrou (Member) commented Dec 7, 2017

> This is actually the motivation for not calling .release() on the frame buffer of the Python pickler in upstream CPython, and for using a new BytesIO instance for each protocol 4 frame instead.

I'm worried about the implications in terms of buffer lifetime. This will need a bit more analysis IMHO.

@ogrisel (Contributor) commented Dec 8, 2017

FYI I have updated cloudpipe/cloudpickle#138 and I can now get nocopy list-of-buffers semantics for pandas data frames, numpy arrays and scipy sparse matrices under CPython 3.5 and 3.6.

@mrocklin (Member, Author) commented Dec 8, 2017

This is really impressive. Have you tried this with dask? If so, have the memory spikes you were seeing before when writing to disk been resolved?

@ogrisel (Contributor) commented Dec 8, 2017

I have not yet tried it with dask; I am battling to understand the details of refcounting to find a way to ensure that my mutable bytes hack is safe.

But I am pretty sure that this new cloudpickle + the throttled GC will solve the memory usage spikes when workers spill to disk.

@ogrisel (Contributor) commented Dec 8, 2017

I think I found a solution. I plan to run some tests with dask next week.

@jakirkham (Member) commented

Obviously this is an old thread and things have changed since then, but I wanted to link this comment ( #614 (comment) ) showing how zero-copy serialization with pickle protocol 5 efficiently handles Pandas serialization today. This is not unique to Pandas and works with anything that supports pickle protocol 5.
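For reference, a minimal sketch of that protocol-5 pattern (Python 3.8+; variable names are illustrative, and whether a given block is handed over out-of-band depends on the numpy/pandas versions and on memory layout):

import pickle
import numpy as np
import pandas as pd

df = pd.DataFrame({'x': np.arange(1_000_000), 'y': np.random.randn(1_000_000)})

buffers = []
# The block memory is handed out as out-of-band PickleBuffer objects
# instead of being copied into the pickle byte stream.
header = pickle.dumps(df, protocol=5, buffer_callback=buffers.append)
roundtripped = pickle.loads(header, buffers=buffers)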
