diff --git a/python/doc/source/index.rst b/python/doc/source/index.rst
index b933d2359f7..c35f20be863 100644
--- a/python/doc/source/index.rst
+++ b/python/doc/source/index.rst
@@ -18,10 +18,14 @@
 Apache Arrow (Python)
 =====================

-Arrow is a columnar in-memory analytics layer designed to accelerate big data.
-It houses a set of canonical in-memory representations of flat and hierarchical
-data along with multiple language-bindings for structure manipulation. It also
-provides IPC and common algorithm implementations.
+Apache Arrow is a cross-language development platform for in-memory data. It
+specifies a standardized language-independent columnar memory format for flat
+and hierarchical data, organized for efficient analytic operations on modern
+hardware. It also provides computational libraries and zero-copy streaming
+messaging and interprocess communication.
+
+The Arrow Python bindings have first-class integration with NumPy, pandas, and
+built-in Python objects.

 This is the documentation of the Python API of Apache Arrow. For more
 details on the format and other language bindings see
diff --git a/python/doc/source/ipc.rst b/python/doc/source/ipc.rst
index 17fe84e0363..6842cb5be9f 100644
--- a/python/doc/source/ipc.rst
+++ b/python/doc/source/ipc.rst
@@ -256,6 +256,83 @@ Lastly, we use this context as an additioanl argument to ``pyarrow.serialize``:

    buf = pa.serialize(val, context=context).to_buffer()
    restored_val = pa.deserialize(buf, context=context)

+The ``SerializationContext`` also has convenience methods ``serialize`` and
+``deserialize``, so these are equivalent statements:
+
+.. code-block:: python
+
+   buf = context.serialize(val).to_buffer()
+   restored_val = context.deserialize(buf)
+
+Component-based Serialization
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+For serializing Python objects containing some number of NumPy arrays, Arrow
+buffers, or other data types, it may be desirable to transport their serialized
+representation without having to produce an intermediate copy using the
+``to_buffer`` method. To motivate this, suppose we have a list of NumPy arrays:
+
+.. ipython:: python
+
+   import numpy as np
+   data = [np.random.randn(10, 10) for i in range(5)]
+
+The call ``pa.serialize(data)`` does not copy the memory inside each of these
+NumPy arrays. This serialized representation can then be decomposed into a
+dictionary containing a sequence of ``pyarrow.Buffer`` objects with metadata
+for each array and references to the memory inside the arrays. To do this, use
+the ``to_components`` method:
+
+.. ipython:: python
+
+   serialized = pa.serialize(data)
+   components = serialized.to_components()
+
+The particular details of the output of ``to_components`` are not too
+important. The objects in the ``'data'`` field are ``pyarrow.Buffer`` objects,
+which are zero-copy convertible to Python ``memoryview`` objects:
+
+.. ipython:: python
+
+   memoryview(components['data'][0])
+
+A memoryview can be converted back to a ``Buffer`` with ``pyarrow.frombuffer``:
+
+.. ipython:: python
+
+   mv = memoryview(components['data'][0])
+   buf = pa.frombuffer(mv)
+
+An object can be reconstructed from its component-based representation using
+``deserialize_components``:
+
+.. ipython:: python
+
+   restored_data = pa.deserialize_components(components)
+   restored_data[0]
+
+``deserialize_components`` is also available as a method on
+``SerializationContext`` objects.
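+
+As a sketch of how this could be used, the buffers in the ``'data'`` field can
+be copied into plain ``bytes`` for transport over a socket or message queue
+and wrapped back into Arrow buffers on the receiving side. The helper
+functions below (``components_to_bytes`` and ``components_from_bytes``) are
+only illustrative, not part of the pyarrow API, and copying into ``bytes``
+gives up the zero-copy property, which requires a shared-memory transport:
+
+.. code-block:: python
+
+   import pyarrow as pa
+
+   def components_to_bytes(components):
+       # Copy each pyarrow.Buffer into a standalone bytes object so the
+       # payload can travel over any bytes-based transport; the remaining
+       # entries of the dict are metadata and are passed through unchanged
+       payload = {k: v for k, v in components.items() if k != 'data'}
+       payload['data'] = [memoryview(buf).tobytes()
+                          for buf in components['data']]
+       return payload
+
+   def components_from_bytes(payload):
+       # Wrap the received bytes back into pyarrow.Buffer objects before
+       # handing the dict to deserialize_components
+       components = {k: v for k, v in payload.items() if k != 'data'}
+       components['data'] = [pa.frombuffer(raw) for raw in payload['data']]
+       return components
+
+   serialized = pa.serialize(data)
+   payload = components_to_bytes(serialized.to_components())
+   # ... send payload to another process (socket, pipe, queue, ...) ...
+   restored_data = pa.deserialize_components(components_from_bytes(payload))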
+ +Serializing pandas Objects +-------------------------- + +We provide a serialization context that has optimized handling of pandas +objects like ``DataFrame`` and ``Series``. This is the +``pyarrow.pandas_serialization_context`` member. Combined with component-based +serialization above, this enables zero-copy transport of pandas DataFrame +objects not containing any Python objects: + +.. ipython:: python + + import pandas as pd + df = pd.DataFrame({'a': [1, 2, 3, 4, 5]}) + context = pa.pandas_serialization_context + serialized_df = context.serialize(df) + df_components = serialized_df.to_components() + original_df = context.deserialize_components(df_components) + original_df + Feather Format -------------- diff --git a/python/manylinux1/README.md b/python/manylinux1/README.md index a74f7a27b93..3d462ff2f72 100644 --- a/python/manylinux1/README.md +++ b/python/manylinux1/README.md @@ -37,7 +37,7 @@ git clone ../../ arrow # Build the native baseimage docker build -t arrow-base-x86_64 -f Dockerfile-x86_64 . # Build the python packages -docker run --rm -t -i -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh +docker run --shm-size=2g --rm -t -i -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh # Now the new packages are located in the dist/ folder ls -l dist/ ``` diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py index a50ef96e79f..8459ec31bed 100644 --- a/python/pyarrow/pandas_compat.py +++ b/python/pyarrow/pandas_compat.py @@ -20,6 +20,7 @@ import json import re +import pandas.core.internals as _int import numpy as np import pandas as pd @@ -348,25 +349,85 @@ def get_datetimetz_type(values, dtype, type_): return values, type_ +# ---------------------------------------------------------------------- +# Converting pandas.DataFrame to a dict containing only NumPy arrays or other +# objects friendly to pyarrow.serialize -def make_datetimetz(tz): + +def dataframe_to_serialized_dict(frame): + block_manager = frame._data + + blocks = [] + axes = [ax for ax in block_manager.axes] + + for block in block_manager.blocks: + values = block.values + block_data = {} + + if isinstance(block, _int.DatetimeTZBlock): + block_data['timezone'] = values.tz.zone + values = values.values + elif isinstance(block, _int.CategoricalBlock): + block_data.update(dictionary=values.categories, + ordered=values.ordered) + values = values.codes + + block_data.update( + placement=block.mgr_locs.as_array, + block=values + ) + blocks.append(block_data) + + return { + 'blocks': blocks, + 'axes': axes + } + + +def serialized_dict_to_dataframe(data): + reconstructed_blocks = [_reconstruct_block(block) + for block in data['blocks']] + + block_mgr = _int.BlockManager(reconstructed_blocks, data['axes']) + return pd.DataFrame(block_mgr) + + +def _reconstruct_block(item): + # Construct the individual blocks converting dictionary types to pandas + # categorical types and Timestamps-with-timezones types to the proper + # pandas Blocks + + block_arr = item['block'] + placement = item['placement'] + if 'dictionary' in item: + cat = pd.Categorical.from_codes(block_arr, + categories=item['dictionary'], + ordered=item['ordered']) + block = _int.make_block(cat, placement=placement, + klass=_int.CategoricalBlock, + fastpath=True) + elif 'timezone' in item: + dtype = _make_datetimetz(item['timezone']) + block = _int.make_block(block_arr, placement=placement, + klass=_int.DatetimeTZBlock, + dtype=dtype, fastpath=True) + else: + block = _int.make_block(block_arr, placement=placement) + + return block + + +def 
_make_datetimetz(tz): from pyarrow.compat import DatetimeTZDtype return DatetimeTZDtype('ns', tz=tz) -def backwards_compatible_index_name(raw_name, logical_name): - pattern = r'^__index_level_\d+__$' - if raw_name == logical_name and re.match(pattern, raw_name) is not None: - return None - else: - return logical_name +# ---------------------------------------------------------------------- +# Converting pyarrow.Table efficiently to pandas.DataFrame def table_to_blockmanager(options, table, memory_pool, nthreads=1, categoricals=None): - import pandas.core.internals as _int - import pyarrow.lib as lib - index_columns = [] columns = [] column_indexes = [] @@ -405,37 +466,13 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1, index_arrays.append(pd.Series(values, dtype=col_pandas.dtype)) index_names.append( - backwards_compatible_index_name(raw_name, logical_name) + _backwards_compatible_index_name(raw_name, logical_name) ) block_table = block_table.remove_column( block_table.schema.get_field_index(raw_name) ) - # Convert an arrow table to Block from the internal pandas API - result = lib.table_to_blocks(options, block_table, nthreads, memory_pool) - - # Construct the individual blocks converting dictionary types to pandas - # categorical types and Timestamps-with-timezones types to the proper - # pandas Blocks - blocks = [] - for item in result: - block_arr = item['block'] - placement = item['placement'] - if 'dictionary' in item: - cat = pd.Categorical(block_arr, - categories=item['dictionary'], - ordered=item['ordered'], fastpath=True) - block = _int.make_block(cat, placement=placement, - klass=_int.CategoricalBlock, - fastpath=True) - elif 'timezone' in item: - dtype = make_datetimetz(item['timezone']) - block = _int.make_block(block_arr, placement=placement, - klass=_int.DatetimeTZBlock, - dtype=dtype, fastpath=True) - else: - block = _int.make_block(block_arr, placement=placement) - blocks.append(block) + blocks = _table_to_blocks(options, block_table, nthreads, memory_pool) # Construct the row index if len(index_arrays) > 1: @@ -477,31 +514,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1, # if we're reconstructing the index if has_pandas_metadata: - - # Get levels and labels, and provide sane defaults if the index has a - # single level to avoid if/else spaghetti. 
- levels = getattr(columns, 'levels', None) or [columns] - labels = getattr(columns, 'labels', None) or [ - pd.RangeIndex(len(level)) for level in levels - ] - - # Convert each level to the dtype provided in the metadata - levels_dtypes = [ - (level, col_index.get('numpy_type', level.dtype)) - for level, col_index in zip_longest( - levels, column_indexes, fillvalue={} - ) - ] - new_levels = [ - _level if _level.dtype == _dtype else _level.astype(_dtype) - for _level, _dtype in levels_dtypes - ] - - columns = pd.MultiIndex( - levels=new_levels, - labels=labels, - names=columns.names - ) + columns = _reconstruct_columns_from_metadata(columns, column_indexes) # ARROW-1751: flatten a single level column MultiIndex for pandas 0.21.0 columns = _flatten_single_level_multiindex(columns) @@ -510,6 +523,55 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1, return _int.BlockManager(blocks, axes) +def _backwards_compatible_index_name(raw_name, logical_name): + # Part of table_to_blockmanager + pattern = r'^__index_level_\d+__$' + if raw_name == logical_name and re.match(pattern, raw_name) is not None: + return None + else: + return logical_name + + +def _reconstruct_columns_from_metadata(columns, column_indexes): + # Part of table_to_blockmanager + + # Get levels and labels, and provide sane defaults if the index has a + # single level to avoid if/else spaghetti. + levels = getattr(columns, 'levels', None) or [columns] + labels = getattr(columns, 'labels', None) or [ + pd.RangeIndex(len(level)) for level in levels + ] + + # Convert each level to the dtype provided in the metadata + levels_dtypes = [ + (level, col_index.get('numpy_type', level.dtype)) + for level, col_index in zip_longest( + levels, column_indexes, fillvalue={} + ) + ] + new_levels = [ + _level if _level.dtype == _dtype else _level.astype(_dtype) + for _level, _dtype in levels_dtypes + ] + + return pd.MultiIndex( + levels=new_levels, + labels=labels, + names=columns.names + ) + + +def _table_to_blocks(options, block_table, nthreads, memory_pool): + # Part of table_to_blockmanager + + # Convert an arrow table to Block from the internal pandas API + result = pa.lib.table_to_blocks(options, block_table, nthreads, + memory_pool) + + # Defined above + return [_reconstruct_block(item) for item in result] + + def _flatten_single_level_multiindex(index): if isinstance(index, pd.MultiIndex) and index.nlevels == 1: levels, = index.levels diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index faf164b3ebd..cbc5e3b8408 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -152,6 +152,30 @@ cdef class SerializationContext: obj.__dict__.update(serialized_obj) return obj + def serialize(self, obj): + """ + Call pyarrow.serialize and pass this SerializationContext + """ + return serialize(obj, context=self) + + def serialize_to(self, object value, sink): + """ + Call pyarrow.serialize_to and pass this SerializationContext + """ + return serialize_to(value, sink, context=self) + + def deserialize(self, what): + """ + Call pyarrow.deserialize and pass this SerializationContext + """ + return deserialize(what, context=self) + + def deserialize_components(self, what): + """ + Call pyarrow.deserialize_components and pass this SerializationContext + """ + return deserialize_components(what, context=self) + _default_serialization_context = SerializationContext() diff --git a/python/pyarrow/serialization.py b/python/pyarrow/serialization.py index 08e6cce751b..b6d2b0258bd 
100644 --- a/python/pyarrow/serialization.py +++ b/python/pyarrow/serialization.py @@ -43,15 +43,19 @@ def _deserialize_numpy_array_list(data): return np.array(data[0], dtype=np.dtype(data[1])) -def _serialize_numpy_array_pickle(obj): - pickled = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL) +def _pickle_to_buffer(x): + pickled = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL) return frombuffer(pickled) -def _deserialize_numpy_array_pickle(data): +def _load_pickle_from_buffer(data): return pickle.loads(memoryview(data)) +_serialize_numpy_array_pickle = _pickle_to_buffer +_deserialize_numpy_array_pickle = _load_pickle_from_buffer + + def register_default_serialization_handlers(serialization_context): # ---------------------------------------------------------------------- @@ -107,38 +111,6 @@ def _deserialize_default_dict(data): custom_serializer=_serialize_numpy_array_list, custom_deserializer=_deserialize_numpy_array_list) - # ---------------------------------------------------------------------- - # Set up serialization for pandas Series and DataFrame - - try: - import pandas as pd - - def _serialize_pandas_series(obj): - return serialize_pandas(pd.DataFrame({obj.name: obj})) - - def _deserialize_pandas_series(data): - deserialized = deserialize_pandas(data) - return deserialized[deserialized.columns[0]] - - def _serialize_pandas_dataframe(obj): - return serialize_pandas(obj) - - def _deserialize_pandas_dataframe(data): - return deserialize_pandas(data) - - serialization_context.register_type( - pd.Series, 'pd.Series', - custom_serializer=_serialize_pandas_series, - custom_deserializer=_deserialize_pandas_series) - - serialization_context.register_type( - pd.DataFrame, 'pd.DataFrame', - custom_serializer=_serialize_pandas_dataframe, - custom_deserializer=_deserialize_pandas_dataframe) - except ImportError: - # no pandas - pass - # ---------------------------------------------------------------------- # Set up serialization for pytorch tensors @@ -165,8 +137,87 @@ def _deserialize_torch_tensor(data): register_default_serialization_handlers(_default_serialization_context) + +# ---------------------------------------------------------------------- +# pandas-specific serialization matters + + pandas_serialization_context = _default_serialization_context.clone() + +def _register_pandas_arrow_handlers(context): + try: + import pandas as pd + except ImportError: + return + + def _serialize_pandas_series(obj): + return serialize_pandas(pd.DataFrame({obj.name: obj})) + + def _deserialize_pandas_series(data): + deserialized = deserialize_pandas(data) + return deserialized[deserialized.columns[0]] + + def _serialize_pandas_dataframe(obj): + return serialize_pandas(obj) + + def _deserialize_pandas_dataframe(data): + return deserialize_pandas(data) + + context.register_type( + pd.Series, 'pd.Series', + custom_serializer=_serialize_pandas_series, + custom_deserializer=_deserialize_pandas_series) + + context.register_type( + pd.DataFrame, 'pd.DataFrame', + custom_serializer=_serialize_pandas_dataframe, + custom_deserializer=_deserialize_pandas_dataframe) + + +def _register_custom_pandas_handlers(context): + # ARROW-1784, faster path for pandas-only visibility + + try: + import pandas as pd + except ImportError: + return + + import pyarrow.pandas_compat as pdcompat + + def _serialize_pandas_dataframe(obj): + return pdcompat.dataframe_to_serialized_dict(obj) + + def _deserialize_pandas_dataframe(data): + return pdcompat.serialized_dict_to_dataframe(data) + + def _serialize_pandas_series(obj): + 
return _serialize_pandas_dataframe(pd.DataFrame({obj.name: obj})) + + def _deserialize_pandas_series(data): + deserialized = _deserialize_pandas_dataframe(data) + return deserialized[deserialized.columns[0]] + + context.register_type( + pd.Series, 'pd.Series', + custom_serializer=_serialize_pandas_series, + custom_deserializer=_deserialize_pandas_series) + + context.register_type( + pd.Index, 'pd.Index', + custom_serializer=_pickle_to_buffer, + custom_deserializer=_load_pickle_from_buffer) + + context.register_type( + pd.DataFrame, 'pd.DataFrame', + custom_serializer=_serialize_pandas_dataframe, + custom_deserializer=_deserialize_pandas_dataframe) + + +_register_pandas_arrow_handlers(_default_serialization_context) +_register_custom_pandas_handlers(pandas_serialization_context) + + pandas_serialization_context.register_type( np.ndarray, 'np.array', custom_serializer=_serialize_numpy_array_pickle, diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index a2a1d6b6910..960ea526362 100644 --- a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -60,70 +60,73 @@ def _alltypes_example(size=100): }) -class TestPandasConversion(object): +def _check_pandas_roundtrip(df, expected=None, nthreads=1, + expected_schema=None, + check_dtype=True, schema=None, + preserve_index=False, + as_batch=False): + klass = pa.RecordBatch if as_batch else pa.Table + table = klass.from_pandas(df, schema=schema, + preserve_index=preserve_index, + nthreads=nthreads) + + result = table.to_pandas(nthreads=nthreads) + if expected_schema: + assert table.schema.equals(expected_schema) + if expected is None: + expected = df + tm.assert_frame_equal(result, expected, check_dtype=check_dtype, + check_index_type=('equiv' if preserve_index + else False)) + + +def _check_series_roundtrip(s, type_=None): + arr = pa.array(s, from_pandas=True, type=type_) + + result = pd.Series(arr.to_pandas(), name=s.name) + if patypes.is_timestamp(arr.type) and arr.type.tz is not None: + result = (result.dt.tz_localize('utc') + .dt.tz_convert(arr.type.tz)) + + tm.assert_series_equal(s, result) + + +def _check_array_roundtrip(values, expected=None, mask=None, + type=None): + arr = pa.array(values, from_pandas=True, mask=mask, type=type) + result = arr.to_pandas() + + values_nulls = pd.isnull(values) + if mask is None: + assert arr.null_count == values_nulls.sum() + else: + assert arr.null_count == (mask | values_nulls).sum() + + if mask is None: + tm.assert_series_equal(pd.Series(result), pd.Series(values), + check_names=False) + else: + expected = pd.Series(np.ma.masked_array(values, mask=mask)) + tm.assert_series_equal(pd.Series(result), expected, + check_names=False) + + +def _check_array_from_pandas_roundtrip(np_array): + arr = pa.array(np_array, from_pandas=True) + result = arr.to_pandas() + npt.assert_array_equal(result, np_array) - def setUp(self): - pass - - def tearDown(self): - pass - - def _check_pandas_roundtrip(self, df, expected=None, nthreads=1, - expected_schema=None, - check_dtype=True, schema=None, - preserve_index=False, - as_batch=False): - klass = pa.RecordBatch if as_batch else pa.Table - table = klass.from_pandas(df, schema=schema, - preserve_index=preserve_index, - nthreads=nthreads) - - result = table.to_pandas(nthreads=nthreads) - if expected_schema: - assert table.schema.equals(expected_schema) - if expected is None: - expected = df - tm.assert_frame_equal(result, expected, check_dtype=check_dtype, - check_index_type=('equiv' if 
preserve_index - else False)) - - def _check_series_roundtrip(self, s, type_=None): - arr = pa.array(s, from_pandas=True, type=type_) - - result = pd.Series(arr.to_pandas(), name=s.name) - if patypes.is_timestamp(arr.type) and arr.type.tz is not None: - result = (result.dt.tz_localize('utc') - .dt.tz_convert(arr.type.tz)) - - tm.assert_series_equal(s, result) - - def _check_array_roundtrip(self, values, expected=None, mask=None, - type=None): - arr = pa.array(values, from_pandas=True, mask=mask, type=type) - result = arr.to_pandas() - - values_nulls = pd.isnull(values) - if mask is None: - assert arr.null_count == values_nulls.sum() - else: - assert arr.null_count == (mask | values_nulls).sum() - - if mask is None: - tm.assert_series_equal(pd.Series(result), pd.Series(values), - check_names=False) - else: - expected = pd.Series(np.ma.masked_array(values, mask=mask)) - tm.assert_series_equal(pd.Series(result), expected, - check_names=False) + +class TestPandasConversion(object): def test_all_none_objects(self): df = pd.DataFrame({'a': [None, None, None]}) - self._check_pandas_roundtrip(df) + _check_pandas_roundtrip(df) def test_all_none_category(self): df = pd.DataFrame({'a': [None, None, None]}) df['a'] = df['a'].astype('category') - self._check_pandas_roundtrip(df) + _check_pandas_roundtrip(df) def test_non_string_columns(self): df = pd.DataFrame({0: [1, 2, 3]}) @@ -133,14 +136,14 @@ def test_non_string_columns(self): def test_column_index_names_are_preserved(self): df = pd.DataFrame({'data': [1, 2, 3]}) df.columns.names = ['a'] - self._check_pandas_roundtrip(df, preserve_index=True) + _check_pandas_roundtrip(df, preserve_index=True) def test_multiindex_columns(self): columns = pd.MultiIndex.from_arrays([ ['one', 'two'], ['X', 'Y'] ]) df = pd.DataFrame([(1, 'a'), (2, 'b'), (3, 'c')], columns=columns) - self._check_pandas_roundtrip(df, preserve_index=True) + _check_pandas_roundtrip(df, preserve_index=True) def test_multiindex_columns_with_dtypes(self): columns = pd.MultiIndex.from_arrays( @@ -151,11 +154,11 @@ def test_multiindex_columns_with_dtypes(self): names=['level_1', 'level_2'], ) df = pd.DataFrame([(1, 'a'), (2, 'b'), (3, 'c')], columns=columns) - self._check_pandas_roundtrip(df, preserve_index=True) + _check_pandas_roundtrip(df, preserve_index=True) def test_integer_index_column(self): df = pd.DataFrame([(1, 'a'), (2, 'b'), (3, 'c')]) - self._check_pandas_roundtrip(df, preserve_index=True) + _check_pandas_roundtrip(df, preserve_index=True) def test_categorical_column_index(self): # I *really* hope no one uses category dtypes for single level column @@ -203,7 +206,7 @@ def test_categorical_row_index(self): df['a'] = df.a.astype('category') df = df.set_index('a') - self._check_pandas_roundtrip(df, preserve_index=True) + _check_pandas_roundtrip(df, preserve_index=True) def test_float_no_nulls(self): data = {} @@ -218,7 +221,7 @@ def test_float_no_nulls(self): df = pd.DataFrame(data) schema = pa.schema(fields) - self._check_pandas_roundtrip(df, expected_schema=schema) + _check_pandas_roundtrip(df, expected_schema=schema) def test_zero_copy_success(self): result = pa.array([0, 1, 2]).to_pandas(zero_copy_only=True) @@ -312,8 +315,8 @@ def test_float_object_nulls(self): expected = pd.DataFrame({'floats': pd.to_numeric(arr)}) field = pa.field('floats', pa.float64()) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, expected=expected, - expected_schema=schema) + _check_pandas_roundtrip(df, expected=expected, + expected_schema=schema) def test_int_object_nulls(self): arr = 
np.array([None, 1, np.int64(3)] * 5, dtype=object) @@ -321,8 +324,8 @@ def test_int_object_nulls(self): expected = pd.DataFrame({'ints': pd.to_numeric(arr)}) field = pa.field('ints', pa.int64()) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, expected=expected, - expected_schema=schema) + _check_pandas_roundtrip(df, expected=expected, + expected_schema=schema) def test_integer_no_nulls(self): data = OrderedDict() @@ -347,7 +350,7 @@ def test_integer_no_nulls(self): df = pd.DataFrame(data) schema = pa.schema(fields) - self._check_pandas_roundtrip(df, expected_schema=schema) + _check_pandas_roundtrip(df, expected_schema=schema) def test_integer_with_nulls(self): # pandas requires upcast to float dtype @@ -395,7 +398,7 @@ def test_boolean_no_nulls(self): df = pd.DataFrame({'bools': np.random.randn(num_values) > 0}) field = pa.field('bools', pa.bool_()) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, expected_schema=schema) + _check_pandas_roundtrip(df, expected_schema=schema) def test_boolean_nulls(self): # pandas requires upcast to object dtype @@ -425,7 +428,7 @@ def test_boolean_object_nulls(self): df = pd.DataFrame({'bools': arr}) field = pa.field('bools', pa.bool_()) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, expected_schema=schema) + _check_pandas_roundtrip(df, expected_schema=schema) def test_all_nulls_cast_numeric(self): arr = np.array([None], dtype=object) @@ -445,7 +448,7 @@ def test_unicode(self): field = pa.field('strings', pa.string()) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, expected_schema=schema) + _check_pandas_roundtrip(df, expected_schema=schema) def test_bytes_to_binary(self): values = [u('qux'), b'foo', None, 'bar', 'qux', np.nan] @@ -456,7 +459,7 @@ def test_bytes_to_binary(self): values2 = [b'qux', b'foo', None, b'bar', b'qux', np.nan] expected = pd.DataFrame({'strings': values2}) - self._check_pandas_roundtrip(df, expected) + _check_pandas_roundtrip(df, expected) @pytest.mark.large_memory def test_bytes_exceed_2gb(self): @@ -499,7 +502,7 @@ def test_timestamps_notimezone_no_nulls(self): }) field = pa.field('datetime64', pa.timestamp('ns')) schema = pa.schema([field]) - self._check_pandas_roundtrip( + _check_pandas_roundtrip( df, expected_schema=schema, ) @@ -514,7 +517,7 @@ def test_timestamps_notimezone_nulls(self): }) field = pa.field('datetime64', pa.timestamp('ns')) schema = pa.schema([field]) - self._check_pandas_roundtrip( + _check_pandas_roundtrip( df, expected_schema=schema, ) @@ -529,9 +532,9 @@ def test_timestamps_with_timezone(self): }) df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern') .to_frame()) - self._check_pandas_roundtrip(df) + _check_pandas_roundtrip(df) - self._check_series_roundtrip(df['datetime64']) + _check_series_roundtrip(df['datetime64']) # drop-in a null and ns instead of ms df = pd.DataFrame({ @@ -545,7 +548,7 @@ def test_timestamps_with_timezone(self): df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern') .to_frame()) - self._check_pandas_roundtrip(df) + _check_pandas_roundtrip(df) def test_datetime64_to_date32(self): # ARROW-1718 @@ -638,13 +641,13 @@ def test_timedelta(self): def test_column_of_arrays(self): df, schema = dataframe_with_arrays() - self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema) + _check_pandas_roundtrip(df, schema=schema, expected_schema=schema) table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) assert table.schema.equals(schema) for column in df.columns: field = 
schema.field_by_name(column) - self._check_array_roundtrip(df[column], type=field.type) + _check_array_roundtrip(df[column], type=field.type) def test_column_of_arrays_to_py(self): # Test regression in ARROW-1199 not caught in above test @@ -665,13 +668,13 @@ def test_column_of_arrays_to_py(self): def test_column_of_lists(self): df, schema = dataframe_with_lists() - self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema) + _check_pandas_roundtrip(df, schema=schema, expected_schema=schema) table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) assert table.schema.equals(schema) for column in df.columns: field = schema.field_by_name(column) - self._check_array_roundtrip(df[column], type=field.type) + _check_array_roundtrip(df[column], type=field.type) def test_column_of_lists_chunked(self): # ARROW-1357 @@ -723,7 +726,7 @@ def test_column_of_lists_strided(self): arr = df['int64'].values[::3] assert arr.strides[0] != 8 - self._check_array_roundtrip(arr) + _check_array_roundtrip(arr) def test_nested_lists_all_none(self): data = np.array([[None, None], None], dtype=object) @@ -742,8 +745,8 @@ def test_nested_lists_all_none(self): def test_threaded_conversion(self): df = _alltypes_example() - self._check_pandas_roundtrip(df, nthreads=2) - self._check_pandas_roundtrip(df, nthreads=2, as_batch=True) + _check_pandas_roundtrip(df, nthreads=2) + _check_pandas_roundtrip(df, nthreads=2, as_batch=True) def test_category(self): repeats = 5 @@ -761,7 +764,7 @@ def test_category(self): 'strings': v1 * repeats, 'strings2': v1 * repeats, 'strings3': v3 * repeats}) - self._check_pandas_roundtrip(df) + _check_pandas_roundtrip(df) arrays = [ pd.Categorical(v1 * repeats), @@ -769,7 +772,7 @@ def test_category(self): pd.Categorical(v3 * repeats) ] for values in arrays: - self._check_array_roundtrip(values) + _check_array_roundtrip(values) def test_mixed_types_fails(self): data = pd.DataFrame({'a': ['a', 1, 2.0]}) @@ -816,9 +819,9 @@ def test_strided_data_import(self): df = pd.DataFrame(case, columns=columns) col = df['a'] - self._check_pandas_roundtrip(df) - self._check_array_roundtrip(col) - self._check_array_roundtrip(col, mask=strided_mask) + _check_pandas_roundtrip(df) + _check_array_roundtrip(col) + _check_array_roundtrip(col, mask=strided_mask) def test_decimal_32_from_pandas(self): expected = pd.DataFrame({ @@ -978,11 +981,6 @@ def test_arrow_time_to_pandas(self): tm.assert_frame_equal(df, expected_df) - def _check_array_from_pandas_roundtrip(self, np_array): - arr = pa.array(np_array, from_pandas=True) - result = arr.to_pandas() - npt.assert_array_equal(result, np_array) - def test_numpy_datetime64_columns(self): datetime64_ns = np.array([ '2007-07-13T01:23:34.123456789', @@ -990,7 +988,7 @@ def test_numpy_datetime64_columns(self): '2006-01-13T12:34:56.432539784', '2010-08-13T05:46:57.437699912'], dtype='datetime64[ns]') - self._check_array_from_pandas_roundtrip(datetime64_ns) + _check_array_from_pandas_roundtrip(datetime64_ns) datetime64_us = np.array([ '2007-07-13T01:23:34.123456', @@ -998,7 +996,7 @@ def test_numpy_datetime64_columns(self): '2006-01-13T12:34:56.432539', '2010-08-13T05:46:57.437699'], dtype='datetime64[us]') - self._check_array_from_pandas_roundtrip(datetime64_us) + _check_array_from_pandas_roundtrip(datetime64_us) datetime64_ms = np.array([ '2007-07-13T01:23:34.123', @@ -1006,7 +1004,7 @@ def test_numpy_datetime64_columns(self): '2006-01-13T12:34:56.432', '2010-08-13T05:46:57.437'], dtype='datetime64[ms]') - 
self._check_array_from_pandas_roundtrip(datetime64_ms) + _check_array_from_pandas_roundtrip(datetime64_ms) datetime64_s = np.array([ '2007-07-13T01:23:34', @@ -1014,7 +1012,7 @@ def test_numpy_datetime64_columns(self): '2006-01-13T12:34:56', '2010-08-13T05:46:57'], dtype='datetime64[s]') - self._check_array_from_pandas_roundtrip(datetime64_s) + _check_array_from_pandas_roundtrip(datetime64_s) def test_numpy_datetime64_day_unit(self): datetime64_d = np.array([ @@ -1023,7 +1021,7 @@ def test_numpy_datetime64_day_unit(self): '2006-01-15', '2010-08-19'], dtype='datetime64[D]') - self._check_array_from_pandas_roundtrip(datetime64_d) + _check_array_from_pandas_roundtrip(datetime64_d) def test_all_nones(self): def _check_series(s): @@ -1070,8 +1068,8 @@ def test_partial_schema(self): pa.field('c', pa.int64()) ]) - self._check_pandas_roundtrip(df, schema=partial_schema, - expected_schema=expected_schema) + _check_pandas_roundtrip(df, schema=partial_schema, + expected_schema=expected_schema) def test_structarray(self): ints = pa.array([None, 2, 3], type=pa.int64()) @@ -1106,7 +1104,7 @@ def test_infer_lists(self): pa.field('nested_strs', pa.list_(pa.list_(pa.string()))) ]) - self._check_pandas_roundtrip(df, expected_schema=expected_schema) + _check_pandas_roundtrip(df, expected_schema=expected_schema) def test_infer_numpy_array(self): data = OrderedDict([ @@ -1120,7 +1118,7 @@ def test_infer_numpy_array(self): pa.field('ints', pa.list_(pa.int64())) ]) - self._check_pandas_roundtrip(df, expected_schema=expected_schema) + _check_pandas_roundtrip(df, expected_schema=expected_schema) def test_metadata_with_mixed_types(self): df = pd.DataFrame({'data': [b'some_bytes', u'some_unicode']}) @@ -1175,12 +1173,12 @@ def test_table_str_to_categorical(self): def test_table_batch_empty_dataframe(self): df = pd.DataFrame({}) - self._check_pandas_roundtrip(df) - self._check_pandas_roundtrip(df, as_batch=True) + _check_pandas_roundtrip(df) + _check_pandas_roundtrip(df, as_batch=True) df2 = pd.DataFrame({}, index=[0, 1, 2]) - self._check_pandas_roundtrip(df2, preserve_index=True) - self._check_pandas_roundtrip(df2, as_batch=True, preserve_index=True) + _check_pandas_roundtrip(df2, preserve_index=True) + _check_pandas_roundtrip(df2, as_batch=True, preserve_index=True) def test_array_from_pandas_date_with_mask(self): m = np.array([True, False, True]) @@ -1222,6 +1220,51 @@ def test_array_from_pandas_typed_array_with_mask(self, t, data, expected): type=pa.list_(t())).equals(result) +def _fully_loaded_dataframe_example(): + from distutils.version import LooseVersion + + index = pd.MultiIndex.from_arrays([ + pd.date_range('2000-01-01', periods=5).repeat(2), + np.tile(np.array(['foo', 'bar'], dtype=object), 5) + ]) + + c1 = pd.date_range('2000-01-01', periods=10) + data = { + 0: c1, + 1: c1.tz_localize('utc'), + 2: c1.tz_localize('US/Eastern'), + 3: c1[::2].tz_localize('utc').repeat(2).astype('category'), + 4: ['foo', 'bar'] * 5, + 5: pd.Series(['foo', 'bar'] * 5).astype('category').values, + 6: [True, False] * 5, + 7: np.random.randn(10), + 8: np.random.randint(0, 100, size=10), + 9: pd.period_range('2013', periods=10, freq='M') + } + + if LooseVersion(pd.__version__) >= '0.21': + # There is an issue with pickling IntervalIndex in pandas 0.20.x + data[10] = pd.interval_range(start=1, freq=1, periods=10) + + return pd.DataFrame(data, index=index) + + +def _check_serialize_components_roundtrip(df): + ctx = pa.pandas_serialization_context + + components = ctx.serialize(df).to_components() + deserialized = 
ctx.deserialize_components(components) + + tm.assert_frame_equal(df, deserialized) + + +def test_serialize_deserialize_pandas(): + # ARROW-1784, serialize and deserialize DataFrame by decomposing + # BlockManager + df = _fully_loaded_dataframe_example() + _check_serialize_components_roundtrip(df) + + def _pytime_from_micros(val): microseconds = val % 1000000 val //= 1000000