12 changes: 8 additions & 4 deletions python/doc/source/index.rst
@@ -18,10 +18,14 @@
Apache Arrow (Python)
=====================

Arrow is a columnar in-memory analytics layer designed to accelerate big data.
It houses a set of canonical in-memory representations of flat and hierarchical
data along with multiple language-bindings for structure manipulation. It also
provides IPC and common algorithm implementations.
Apache Arrow is a cross-language development platform for in-memory data. It
specifies a standardized language-independent columnar memory format for flat
and hierarchical data, organized for efficient analytic operations on modern
hardware. It also provides computational libraries and zero-copy streaming
messaging and interprocess communication.

The Arrow Python bindings have first-class integration with NumPy, pandas, and
built-in Python objects.
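
As a minimal sketch of this integration (assuming only that NumPy and pandas
are installed alongside ``pyarrow``):

.. code-block:: python

    import numpy as np
    import pandas as pd
    import pyarrow as pa

    # Build an Arrow array from a NumPy array
    arr = pa.array(np.array([1, 2, 3]))

    # Convert a pandas DataFrame to an Arrow Table and back
    df = pd.DataFrame({'a': [1, 2, 3]})
    table = pa.Table.from_pandas(df)
    df_roundtrip = table.to_pandas()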

This is the documentation of the Python API of Apache Arrow. For more details
on the format and other language bindings see
77 changes: 77 additions & 0 deletions python/doc/source/ipc.rst
@@ -256,6 +256,83 @@ Lastly, we use this context as an additional argument to ``pyarrow.serialize``:
buf = pa.serialize(val, context=context).to_buffer()
restored_val = pa.deserialize(buf, context=context)

The ``SerializationContext`` also has convenience methods ``serialize`` and
``deserialize``, so these are equivalent statements:

.. code-block:: python

buf = context.serialize(val).to_buffer()
restored_val = context.deserialize(buf)

Component-based Serialization
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For serializing Python objects containing some number of NumPy arrays, Arrow
buffers, or other data types, it may be desirable to transport their serialized
representation without having to produce an intermediate copy using the
``to_buffer`` method. To motivate this, suppose we have a list of NumPy arrays:

.. ipython:: python

import numpy as np
data = [np.random.randn(10, 10) for i in range(5)]

The call ``pa.serialize(data)`` does not copy the memory inside each of these
NumPy arrays. The serialized representation can then be decomposed into a
dictionary containing a sequence of ``pyarrow.Buffer`` objects that hold the
metadata for each array and references to the memory inside the arrays. To do
this, use the ``to_components`` method:

.. ipython:: python

serialized = pa.serialize(data)
components = serialized.to_components()

The particular details of the output of ``to_components`` are not too
important. The objects in the ``'data'`` field are ``pyarrow.Buffer`` objects,
which are zero-copy convertible to Python ``memoryview`` objects:

.. ipython:: python

memoryview(components['data'][0])

A memoryview can be converted back to a ``Buffer`` with ``pyarrow.frombuffer``:

.. ipython:: python

mv = memoryview(components['data'][0])
buf = pa.frombuffer(mv)

An object can be reconstructed from its component-based representation using
``deserialize_components``:

.. ipython:: python

restored_data = pa.deserialize_components(components)
restored_data[0]

``deserialize_components`` is also available as a method on
``SerializationContext`` objects.
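
For example (a minimal sketch, assuming ``context`` is the
``SerializationContext`` created earlier):

.. code-block:: python

    # Equivalent to pa.deserialize_components(components, context=context)
    restored = context.deserialize_components(components)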

Serializing pandas Objects
--------------------------

We provide a serialization context that has optimized handling of pandas
objects like ``DataFrame`` and ``Series``, available as the
``pyarrow.pandas_serialization_context`` member. Combined with the
component-based serialization above, this enables zero-copy transport of
pandas DataFrame objects not containing any Python objects:

.. ipython:: python

import pandas as pd
df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})
context = pa.pandas_serialization_context
serialized_df = context.serialize(df)
df_components = serialized_df.to_components()
original_df = context.deserialize_components(df_components)
original_df
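
To check that the round trip preserved the data (a minimal sketch using the
built-in pandas comparison method):

.. code-block:: python

    assert original_df.equals(df)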

Feather Format
--------------

2 changes: 1 addition & 1 deletion python/manylinux1/README.md
@@ -37,7 +37,7 @@ git clone ../../ arrow
# Build the native baseimage
docker build -t arrow-base-x86_64 -f Dockerfile-x86_64 .
# Build the python packages
docker run --rm -t -i -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
docker run --shm-size=2g --rm -t -i -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
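# (--shm-size raises the container's /dev/shm above Docker's 64MB default;
#  assumed here: the build script's tests need the extra shared memory)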
# Now the new packages are located in the dist/ folder
ls -l dist/
```
184 changes: 123 additions & 61 deletions python/pyarrow/pandas_compat.py
@@ -20,6 +20,7 @@
import json
import re

import pandas.core.internals as _int
import numpy as np
import pandas as pd

@@ -348,25 +349,85 @@ def get_datetimetz_type(values, dtype, type_):

return values, type_

# ----------------------------------------------------------------------
# Converting pandas.DataFrame to a dict containing only NumPy arrays or other
# objects friendly to pyarrow.serialize

def make_datetimetz(tz):

def dataframe_to_serialized_dict(frame):
[Member Author comment: @jreback let me know if I missed anything on these functions]

block_manager = frame._data

blocks = []
axes = [ax for ax in block_manager.axes]

for block in block_manager.blocks:
values = block.values
block_data = {}

if isinstance(block, _int.DatetimeTZBlock):
block_data['timezone'] = values.tz.zone
values = values.values
elif isinstance(block, _int.CategoricalBlock):
block_data.update(dictionary=values.categories,
ordered=values.ordered)
values = values.codes

block_data.update(
placement=block.mgr_locs.as_array,
block=values
)
blocks.append(block_data)

return {
'blocks': blocks,
'axes': axes
}


def serialized_dict_to_dataframe(data):
reconstructed_blocks = [_reconstruct_block(block)
for block in data['blocks']]

block_mgr = _int.BlockManager(reconstructed_blocks, data['axes'])
return pd.DataFrame(block_mgr)


def _reconstruct_block(item):
# Construct the individual blocks converting dictionary types to pandas
# categorical types and Timestamps-with-timezones types to the proper
# pandas Blocks

block_arr = item['block']
placement = item['placement']
if 'dictionary' in item:
cat = pd.Categorical.from_codes(block_arr,
categories=item['dictionary'],
ordered=item['ordered'])
block = _int.make_block(cat, placement=placement,
klass=_int.CategoricalBlock,
fastpath=True)
elif 'timezone' in item:
dtype = _make_datetimetz(item['timezone'])
block = _int.make_block(block_arr, placement=placement,
klass=_int.DatetimeTZBlock,
dtype=dtype, fastpath=True)
else:
block = _int.make_block(block_arr, placement=placement)

return block


def _make_datetimetz(tz):
from pyarrow.compat import DatetimeTZDtype
return DatetimeTZDtype('ns', tz=tz)


def backwards_compatible_index_name(raw_name, logical_name):
pattern = r'^__index_level_\d+__$'
if raw_name == logical_name and re.match(pattern, raw_name) is not None:
return None
else:
return logical_name
# ----------------------------------------------------------------------
# Converting pyarrow.Table efficiently to pandas.DataFrame


def table_to_blockmanager(options, table, memory_pool, nthreads=1,
categoricals=None):
import pandas.core.internals as _int
import pyarrow.lib as lib

index_columns = []
columns = []
column_indexes = []
@@ -405,37 +466,13 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1,

index_arrays.append(pd.Series(values, dtype=col_pandas.dtype))
index_names.append(
backwards_compatible_index_name(raw_name, logical_name)
_backwards_compatible_index_name(raw_name, logical_name)
)
block_table = block_table.remove_column(
block_table.schema.get_field_index(raw_name)
)

# Convert an arrow table to Block from the internal pandas API
result = lib.table_to_blocks(options, block_table, nthreads, memory_pool)

# Construct the individual blocks converting dictionary types to pandas
# categorical types and Timestamps-with-timezones types to the proper
# pandas Blocks
blocks = []
for item in result:
block_arr = item['block']
placement = item['placement']
if 'dictionary' in item:
cat = pd.Categorical(block_arr,
categories=item['dictionary'],
ordered=item['ordered'], fastpath=True)
block = _int.make_block(cat, placement=placement,
klass=_int.CategoricalBlock,
fastpath=True)
elif 'timezone' in item:
dtype = make_datetimetz(item['timezone'])
block = _int.make_block(block_arr, placement=placement,
klass=_int.DatetimeTZBlock,
dtype=dtype, fastpath=True)
else:
block = _int.make_block(block_arr, placement=placement)
blocks.append(block)
blocks = _table_to_blocks(options, block_table, nthreads, memory_pool)

# Construct the row index
if len(index_arrays) > 1:
@@ -477,31 +514,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1,

# if we're reconstructing the index
if has_pandas_metadata:

# Get levels and labels, and provide sane defaults if the index has a
# single level to avoid if/else spaghetti.
levels = getattr(columns, 'levels', None) or [columns]
labels = getattr(columns, 'labels', None) or [
pd.RangeIndex(len(level)) for level in levels
]

# Convert each level to the dtype provided in the metadata
levels_dtypes = [
(level, col_index.get('numpy_type', level.dtype))
for level, col_index in zip_longest(
levels, column_indexes, fillvalue={}
)
]
new_levels = [
_level if _level.dtype == _dtype else _level.astype(_dtype)
for _level, _dtype in levels_dtypes
]

columns = pd.MultiIndex(
levels=new_levels,
labels=labels,
names=columns.names
)
columns = _reconstruct_columns_from_metadata(columns, column_indexes)

# ARROW-1751: flatten a single level column MultiIndex for pandas 0.21.0
columns = _flatten_single_level_multiindex(columns)
@@ -510,6 +523,55 @@
return _int.BlockManager(blocks, axes)


def _backwards_compatible_index_name(raw_name, logical_name):
# Part of table_to_blockmanager
pattern = r'^__index_level_\d+__$'
if raw_name == logical_name and re.match(pattern, raw_name) is not None:
return None
else:
return logical_name


def _reconstruct_columns_from_metadata(columns, column_indexes):
# Part of table_to_blockmanager

# Get levels and labels, and provide sane defaults if the index has a
# single level to avoid if/else spaghetti.
levels = getattr(columns, 'levels', None) or [columns]
labels = getattr(columns, 'labels', None) or [
pd.RangeIndex(len(level)) for level in levels
]

# Convert each level to the dtype provided in the metadata
levels_dtypes = [
(level, col_index.get('numpy_type', level.dtype))
for level, col_index in zip_longest(
levels, column_indexes, fillvalue={}
)
]
new_levels = [
_level if _level.dtype == _dtype else _level.astype(_dtype)
for _level, _dtype in levels_dtypes
]

return pd.MultiIndex(
levels=new_levels,
labels=labels,
names=columns.names
)


def _table_to_blocks(options, block_table, nthreads, memory_pool):
# Part of table_to_blockmanager

# Convert an arrow table to Block from the internal pandas API
result = pa.lib.table_to_blocks(options, block_table, nthreads,
memory_pool)

# Defined above
return [_reconstruct_block(item) for item in result]


def _flatten_single_level_multiindex(index):
if isinstance(index, pd.MultiIndex) and index.nlevels == 1:
levels, = index.levels
24 changes: 24 additions & 0 deletions python/pyarrow/serialization.pxi
@@ -152,6 +152,30 @@ cdef class SerializationContext:
obj.__dict__.update(serialized_obj)
return obj

def serialize(self, obj):
"""
Call pyarrow.serialize and pass this SerializationContext
"""
return serialize(obj, context=self)

def serialize_to(self, object value, sink):
"""
Call pyarrow.serialize_to and pass this SerializationContext
"""
return serialize_to(value, sink, context=self)

def deserialize(self, what):
"""
Call pyarrow.deserialize and pass this SerializationContext
"""
return deserialize(what, context=self)

def deserialize_components(self, what):
"""
Call pyarrow.deserialize_components and pass this SerializationContext
"""
return deserialize_components(what, context=self)


_default_serialization_context = SerializationContext()
