From 59477b5567f3990d15a350853fd9a8d48b06b250 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Wed, 5 Apr 2017 14:55:35 -0400 Subject: [PATCH 01/10] ARROW-881: [Python] Reconstruct Pandas DataFrame indexes using custom_metadata --- cpp/src/arrow/type-test.cc | 16 ++ cpp/src/arrow/type.cc | 11 +- cpp/src/arrow/type.h | 7 +- python/pyarrow/__init__.py | 3 +- python/pyarrow/_parquet.pxd | 5 + python/pyarrow/_parquet.pyx | 61 ++++-- python/pyarrow/array.pxi | 92 ++++---- python/pyarrow/includes/common.pxd | 2 +- python/pyarrow/includes/libarrow.pxd | 6 +- python/pyarrow/io.pxi | 2 +- python/pyarrow/ipc.py | 38 ++++ python/pyarrow/lib.pxd | 32 +-- python/pyarrow/memory.pxi | 2 +- python/pyarrow/pandas_compat.py | 102 +++++++++ python/pyarrow/parquet.py | 39 +++- python/pyarrow/table.pxi | 225 ++++++++++++++------ python/pyarrow/tests/pandas_examples.py | 8 +- python/pyarrow/tests/test_convert_pandas.py | 45 ++-- python/pyarrow/tests/test_ipc.py | 51 +++++ python/pyarrow/tests/test_parquet.py | 55 +++-- python/pyarrow/tests/test_table.py | 2 +- 21 files changed, 616 insertions(+), 188 deletions(-) create mode 100644 python/pyarrow/pandas_compat.py diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc index e73adecdcb5..1fbb683da6c 100644 --- a/cpp/src/arrow/type-test.cc +++ b/cpp/src/arrow/type-test.cc @@ -152,6 +152,22 @@ TEST_F(TestSchema, GetFieldByName) { ASSERT_TRUE(result == nullptr); } +TEST_F(TestSchema, GetFieldIndex) { + auto f0 = field("f0", int32()); + auto f1 = field("f1", uint8(), false); + auto f2 = field("f2", utf8()); + auto f3 = field("f3", list(int16())); + + vector> fields = {f0, f1, f2, f3}; + auto schema = std::make_shared(fields); + + ASSERT_EQ(0, schema->GetFieldIndex(fields[0]->name())); + ASSERT_EQ(1, schema->GetFieldIndex(fields[1]->name())); + ASSERT_EQ(2, schema->GetFieldIndex(fields[2]->name())); + ASSERT_EQ(3, schema->GetFieldIndex(fields[3]->name())); + ASSERT_EQ(-1, schema->GetFieldIndex("not-found")); +} + TEST_F(TestSchema, TestMetadataConstruction) { auto f0 = field("f0", int32()); auto f1 = field("f1", uint8(), false); diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index afb30272c01..891045e689b 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -265,7 +265,12 @@ bool Schema::Equals(const Schema& other) const { return true; } -std::shared_ptr Schema::GetFieldByName(const std::string& name) { +std::shared_ptr Schema::GetFieldByName(const std::string& name) const { + int64_t i = GetFieldIndex(name); + return i == -1 ? 
nullptr : fields_[i]; +} + +int64_t Schema::GetFieldIndex(const std::string& name) const { if (fields_.size() > 0 && name_to_index_.size() == 0) { for (size_t i = 0; i < fields_.size(); ++i) { name_to_index_[fields_[i]->name()] = static_cast(i); @@ -274,9 +279,9 @@ std::shared_ptr Schema::GetFieldByName(const std::string& name) { auto it = name_to_index_.find(name); if (it == name_to_index_.end()) { - return nullptr; + return -1; } else { - return fields_[it->second]; + return it->second; } } diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 40615f790fb..3e85291f043 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -699,7 +699,10 @@ class ARROW_EXPORT Schema { std::shared_ptr field(int i) const { return fields_[i]; } // Returns nullptr if name not found - std::shared_ptr GetFieldByName(const std::string& name); + std::shared_ptr GetFieldByName(const std::string& name) const; + + // Returns -1 if name not found + int64_t GetFieldIndex(const std::string& name) const; const std::vector>& fields() const { return fields_; } std::shared_ptr metadata() const { return metadata_; } @@ -720,7 +723,7 @@ class ARROW_EXPORT Schema { private: std::vector> fields_; - std::unordered_map name_to_index_; + mutable std::unordered_map name_to_index_; std::shared_ptr metadata_; }; diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 632a443ed0d..0f34121f653 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -104,7 +104,8 @@ def jemalloc_memory_pool(): from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter, RecordBatchStreamReader, RecordBatchStreamWriter, open_stream, - open_file) + open_file, + serialize_pandas, deserialize_pandas) localfs = LocalFilesystem.get_instance() diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 9f6edc0b31d..2f6b9a9d648 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -20,6 +20,7 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus, CTable, CMemoryPool, + CKeyValueMetadata, RandomAccessFile, OutputStream) @@ -164,6 +165,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: unique_ptr[CRowGroupMetaData] RowGroup(int i) const SchemaDescriptor* schema() + shared_ptr[const CKeyValueMetadata] key_value_metadata() const cdef cppclass ReaderProperties: pass @@ -229,8 +231,11 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil: cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil: CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, + const shared_ptr[const CKeyValueMetadata]& key_value_metadata, shared_ptr[CSchema]* out) + CStatus ToParquetSchema(const CSchema* arrow_schema, + const shared_ptr[const CKeyValueMetadata]& key_value_metadata, shared_ptr[SchemaDescriptor]* out) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 51bd938c79a..77ef7adadef 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -40,15 +40,15 @@ cdef class RowGroupMetaData: cdef: unique_ptr[CRowGroupMetaData] up_metadata CRowGroupMetaData* metadata - object parent + FileMetaData parent def __cinit__(self): pass - cdef init_from_file(self, FileMetaData parent, int i): + cdef void init_from_file(self, FileMetaData parent, int i): if i < 0 or i >= parent.num_row_groups: raise IndexError('{0} out of bounds'.format(i)) - self.up_metadata = parent.metadata.RowGroup(i) + self.up_metadata = 
parent._metadata.RowGroup(i) self.metadata = self.up_metadata.get() self.parent = parent @@ -80,15 +80,15 @@ cdef class RowGroupMetaData: cdef class FileMetaData: cdef: shared_ptr[CFileMetaData] sp_metadata - CFileMetaData* metadata - object _schema + CFileMetaData* _metadata + ParquetSchema _schema def __cinit__(self): pass cdef init(self, const shared_ptr[CFileMetaData]& metadata): self.sp_metadata = metadata - self.metadata = metadata.get() + self._metadata = metadata.get() def __repr__(self): return """{0} @@ -116,27 +116,27 @@ cdef class FileMetaData: property serialized_size: def __get__(self): - return self.metadata.size() + return self._metadata.size() property num_columns: def __get__(self): - return self.metadata.num_columns() + return self._metadata.num_columns() property num_rows: def __get__(self): - return self.metadata.num_rows() + return self._metadata.num_rows() property num_row_groups: def __get__(self): - return self.metadata.num_row_groups() + return self._metadata.num_row_groups() property format_version: def __get__(self): - cdef ParquetVersion version = self.metadata.version() + cdef ParquetVersion version = self._metadata.version() if version == ParquetVersion_V1: return '1.0' if version == ParquetVersion_V2: @@ -149,7 +149,7 @@ cdef class FileMetaData: property created_by: def __get__(self): - return frombytes(self.metadata.created_by()) + return frombytes(self._metadata.created_by()) def row_group(self, int i): """ @@ -159,14 +159,26 @@ cdef class FileMetaData: result.init_from_file(self, i) return result + property metadata: + + def __get__(self): + cdef: + unordered_map[c_string, c_string] metadata + const CKeyValueMetadata* underlying_metadata + underlying_metadata = self._metadata.key_value_metadata().get() + if underlying_metadata != NULL: + underlying_metadata.ToUnorderedMap(&metadata) + return metadata + else: + return None + cdef class ParquetSchema: cdef: - object parent # the FileMetaData owning the SchemaDescriptor + FileMetaData parent # the FileMetaData owning the SchemaDescriptor const SchemaDescriptor* schema def __cinit__(self): - self.parent = None self.schema = NULL def __repr__(self): @@ -186,7 +198,7 @@ cdef class ParquetSchema: cdef init_from_filemeta(self, FileMetaData container): self.parent = container - self.schema = container.metadata.schema() + self.schema = container._metadata.schema() def __len__(self): return self.schema.num_columns() @@ -211,7 +223,9 @@ cdef class ParquetSchema: shared_ptr[CSchema] sp_arrow_schema with nogil: - check_status(FromParquetSchema(self.schema, &sp_arrow_schema)) + check_status(FromParquetSchema( + self.schema, self.parent._metadata.key_value_metadata(), + &sp_arrow_schema)) return pyarrow_wrap_schema(sp_arrow_schema) @@ -232,7 +246,7 @@ cdef class ParquetSchema: cdef class ColumnSchema: cdef: - object parent + ParquetSchema parent const ColumnDescriptor* descr def __cinit__(self): @@ -463,7 +477,7 @@ cdef class ParquetReader: """ cdef: FileMetaData container = self.metadata - const CFileMetaData* metadata = container.metadata + const CFileMetaData* metadata = container._metadata int i = 0 if self.column_idx_map is None: @@ -488,12 +502,13 @@ cdef class ParquetReader: return array -cdef check_compression_name(name): +cdef int check_compression_name(name) except -1: if name.upper() not in ['NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI']: raise ArrowException("Unsupported compression: " + name) + return 0 -cdef ParquetCompression compression_from_name(object name): +cdef ParquetCompression 
compression_from_name(str name): name = name.upper() if name == "SNAPPY": return ParquetCompression_SNAPPY @@ -546,7 +561,7 @@ cdef class ParquetWriter: maybe_unbox_memory_pool(memory_pool), sink, properties, &self.writer)) - cdef _set_version(self, WriterProperties.Builder* props): + cdef void _set_version(self, WriterProperties.Builder* props): if self.version is not None: if self.version == "1.0": props.version(ParquetVersion_V1) @@ -555,7 +570,7 @@ cdef class ParquetWriter: else: raise ArrowException("Unsupported Parquet format version") - cdef _set_compression_props(self, WriterProperties.Builder* props): + cdef void _set_compression_props(self, WriterProperties.Builder* props): if isinstance(self.compression, basestring): check_compression_name(self.compression) props.compression(compression_from_name(self.compression)) @@ -564,7 +579,7 @@ cdef class ParquetWriter: check_compression_name(codec) props.compression(column, compression_from_name(codec)) - cdef _set_dictionary_props(self, WriterProperties.Builder* props): + cdef void _set_dictionary_props(self, WriterProperties.Builder* props): if isinstance(self.use_dictionary, bool): if self.use_dictionary: props.enable_dictionary() diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index c132269c563..5930de39271 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -159,7 +159,7 @@ cdef class Field: def __cinit__(self): pass - cdef init(self, const shared_ptr[CField]& field): + cdef void init(self, const shared_ptr[CField]& field): self.sp_field = field self.field = field.get() self.type = pyarrow_wrap_data_type(field.get().type()) @@ -264,11 +264,11 @@ cdef class Schema: return result - cdef init(self, const vector[shared_ptr[CField]]& fields): + cdef void init(self, const vector[shared_ptr[CField]]& fields): self.schema = new CSchema(fields) self.sp_schema.reset(self.schema) - cdef init_schema(self, const shared_ptr[CSchema]& schema): + cdef void init_schema(self, const shared_ptr[CSchema]& schema): self.schema = schema.get() self.sp_schema = schema @@ -310,6 +310,9 @@ cdef class Schema: """ return pyarrow_wrap_field(self.schema.GetFieldByName(tobytes(name))) + def get_field_index(self, name): + return self.schema.GetFieldIndex(tobytes(name)) + def add_metadata(self, dict metadata): """ Add metadata as dict of string keys and values to Schema @@ -352,9 +355,9 @@ cdef class Schema: return self.__str__() -cdef box_metadata(const CKeyValueMetadata* metadata): +cdef dict box_metadata(const CKeyValueMetadata* metadata): cdef unordered_map[c_string, c_string] result - if metadata != NULL: + if metadata != nullptr: metadata.ToUnorderedMap(&result) return result else: @@ -813,45 +816,60 @@ cdef class Date64Value(ArrayValue): ap.Value(self.index) / 1000).date() +cdef dict DATETIME_CONVERSION_FUNCTIONS + +try: + import pandas as pd +except ImportError: + DATETIME_CONVERSION_FUNCTIONS = { + TimeUnit_SECOND: lambda x, tzinfo: ( + datetime.datetime.utcfromtimestamp(x).replace(tzinfo=tzinfo) + ), + TimeUnit_MILLI: lambda x, tzinfo: ( + datetime.datetime.utcfromtimestamp(x / 1e3).replace(tzinfo=tzinfo) + ), + TimeUnit_MICRO: lambda x, tzinfo: ( + datetime.datetime.utcfromtimestamp(x / 1e6).replace(tzinfo=tzinfo) + ), + } +else: + DATETIME_CONVERSION_FUNCTIONS = { + TimeUnit_SECOND: lambda x, tzinfo: pd.Timestamp( + x * 1000000000, tz=tzinfo, unit='ns', + ), + TimeUnit_MILLI: lambda x, tzinfo: pd.Timestamp( + x * 1000000, tz=tzinfo, unit='ns', + ), + TimeUnit_MICRO: lambda x, tzinfo: pd.Timestamp( + x * 1000, 
tz=tzinfo, unit='ns', + ), + TimeUnit_NANO: lambda x, tzinfo: pd.Timestamp( + x, tz=tzinfo, unit='ns', + ) + } + + cdef class TimestampValue(ArrayValue): def as_py(self): cdef: CTimestampArray* ap = self.sp_array.get() - CTimestampType* dtype = ap.type().get() - int64_t val = ap.Value(self.index) + CTimestampType* dtype = ap.type().get() + int64_t value = ap.Value(self.index) - timezone = None - tzinfo = None - if dtype.timezone().size() > 0: - timezone = frombytes(dtype.timezone()) + if not dtype.timezone().empty(): import pytz - tzinfo = pytz.timezone(timezone) + tzinfo = pytz.timezone(frombytes(dtype.timezone())) + else: + tzinfo = None try: - pd = _pandas() - if dtype.unit() == TimeUnit_SECOND: - val = val * 1000000000 - elif dtype.unit() == TimeUnit_MILLI: - val = val * 1000000 - elif dtype.unit() == TimeUnit_MICRO: - val = val * 1000 - return pd.Timestamp(val, tz=tzinfo) - except ImportError: - if dtype.unit() == TimeUnit_SECOND: - result = datetime.datetime.utcfromtimestamp(val) - elif dtype.unit() == TimeUnit_MILLI: - result = datetime.datetime.utcfromtimestamp(float(val) / 1000) - elif dtype.unit() == TimeUnit_MICRO: - result = datetime.datetime.utcfromtimestamp( - float(val) / 1000000) - else: - # TimeUnit_NANO - raise NotImplementedError("Cannot convert nanosecond " - "timestamps without pandas") - if timezone is not None: - result = result.replace(tzinfo=tzinfo) - return result + converter = DATETIME_CONVERSION_FUNCTIONS[dtype.unit()] + except KeyError: + raise ValueError( + 'Cannot convert nanosecond timestamps without pandas' + ) + return converter(value, tzinfo=tzinfo) cdef class FloatValue(ArrayValue): @@ -1042,7 +1060,7 @@ def array(object sequence, DataType type=None, MemoryPool memory_pool=None): cdef class Array: - cdef init(self, const shared_ptr[CArray]& sp_array): + cdef void init(self, const shared_ptr[CArray]& sp_array): self.sp_array = sp_array self.ap = sp_array.get() self.type = pyarrow_wrap_data_type(self.sp_array.get().type()) @@ -1251,7 +1269,7 @@ cdef class Array: cdef class Tensor: - cdef init(self, const shared_ptr[CTensor]& sp_tensor): + cdef void init(self, const shared_ptr[CTensor]& sp_tensor): self.sp_tensor = sp_tensor self.tp = sp_tensor.get() self.type = pyarrow_wrap_data_type(self.tp.type()) diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index cc3b4b6fdaf..73bfb4ff508 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -18,7 +18,7 @@ # distutils: language = c++ from libc.stdint cimport * -from libcpp cimport bool as c_bool +from libcpp cimport bool as c_bool, nullptr from libcpp.memory cimport shared_ptr, unique_ptr, make_shared from libcpp.string cimport string as c_string from libcpp.vector cimport vector diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index a7e2733ca81..9df31c80ccf 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -29,6 +29,7 @@ cdef extern from "arrow/util/key_value_metadata.h" namespace "arrow" nogil: void Append(const c_string& key, const c_string& value) void ToUnorderedMap(unordered_map[c_string, c_string]*) const + cdef extern from "arrow/api.h" namespace "arrow" nogil: enum Type" arrow::Type::type": @@ -205,7 +206,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CField] field(int i) shared_ptr[const CKeyValueMetadata] metadata() - shared_ptr[CField] GetFieldByName(c_string& name) + shared_ptr[CField] GetFieldByName(const c_string& name) 
+ int64_t GetFieldIndex(const c_string& name) int num_fields() c_string ToString() @@ -686,8 +688,10 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil: cdef cppclass PyBytesReader(CBufferReader): PyBytesReader(object fo) + cdef extern from 'arrow/python/init.h': int arrow_init_numpy() except -1 + cdef extern from 'arrow/python/config.h' namespace 'arrow::py': void set_numpy_nan(object o) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index a153f222700..23eb6ef1082 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -462,7 +462,7 @@ cdef class Buffer: def __cinit__(self): pass - cdef init(self, const shared_ptr[CBuffer]& buffer): + cdef void init(self, const shared_ptr[CBuffer]& buffer): self.buffer = buffer self.shape[0] = self.size self.strides[0] = (1) diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 8338de33a0d..a09e264cb4f 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -17,6 +17,7 @@ # Arrow file and stream reader/writer classes, and other messaging tools +import pyarrow as pa import pyarrow.lib as lib @@ -119,3 +120,40 @@ def open_file(source, footer_offset=None): reader : RecordBatchFileReader """ return RecordBatchFileReader(source, footer_offset=footer_offset) + + +def serialize_pandas(df): + """Serialize a pandas DataFrame into a buffer protocol compatible object. + + Parameters + ---------- + df : pandas.DataFrame + + Returns + ------- + buf : buffer + An object compatible with the buffer protocol + """ + batch = pa.RecordBatch.from_pandas(df) + sink = pa.InMemoryOutputStream() + writer = pa.RecordBatchFileWriter(sink, batch.schema) + writer.write_batch(batch) + writer.close() + return sink.get_result() + + +def deserialize_pandas(buf, nthreads=1): + """Deserialize a buffer protocol compatible object into a pandas DataFrame. 
+ + Parameters + ---------- + buf : buffer + An object compatible with the buffer protocol + + Returns + ------- + df : pandas.DataFrame + """ + buffer_reader = pa.BufferReader(buf) + reader = pa.RecordBatchFileReader(buffer_reader) + return reader.read_all().to_pandas(nthreads=nthreads) diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index d3d03aaaefa..4a2ab8698f6 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -33,7 +33,7 @@ cdef class MemoryPool: cdef: CMemoryPool* pool - cdef init(self, CMemoryPool* pool) + cdef void init(self, CMemoryPool* pool) cdef class LoggingMemoryPool(MemoryPool): @@ -89,7 +89,7 @@ cdef class Field: cdef readonly: DataType type - cdef init(self, const shared_ptr[CField]& field) + cdef void init(self, const shared_ptr[CField]& field) cdef class Schema: @@ -97,8 +97,8 @@ cdef class Schema: shared_ptr[CSchema] sp_schema CSchema* schema - cdef init(self, const vector[shared_ptr[CField]]& fields) - cdef init_schema(self, const shared_ptr[CSchema]& schema) + cdef void init(self, const vector[shared_ptr[CField]]& fields) + cdef void init_schema(self, const shared_ptr[CSchema]& schema) cdef class Scalar: @@ -155,7 +155,7 @@ cdef class Array: cdef readonly: DataType type - cdef init(self, const shared_ptr[CArray]& sp_array) + cdef void init(self, const shared_ptr[CArray]& sp_array) cdef getitem(self, int64_t i) @@ -167,7 +167,7 @@ cdef class Tensor: cdef readonly: DataType type - cdef init(self, const shared_ptr[CTensor]& sp_tensor) + cdef void init(self, const shared_ptr[CTensor]& sp_tensor) cdef class NullArray(Array): @@ -266,8 +266,8 @@ cdef class ChunkedArray: shared_ptr[CChunkedArray] sp_chunked_array CChunkedArray* chunked_array - cdef init(self, const shared_ptr[CChunkedArray]& chunked_array) - cdef _check_nullptr(self) + cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array) + cdef int _check_nullptr(self) except -1 cdef class Column: @@ -275,8 +275,8 @@ cdef class Column: shared_ptr[CColumn] sp_column CColumn* column - cdef init(self, const shared_ptr[CColumn]& column) - cdef _check_nullptr(self) + cdef void init(self, const shared_ptr[CColumn]& column) + cdef int _check_nullptr(self) except -1 cdef class Table: @@ -284,8 +284,8 @@ cdef class Table: shared_ptr[CTable] sp_table CTable* table - cdef init(self, const shared_ptr[CTable]& table) - cdef _check_nullptr(self) + cdef void init(self, const shared_ptr[CTable]& table) + cdef int _check_nullptr(self) except -1 cdef class RecordBatch: @@ -294,8 +294,8 @@ cdef class RecordBatch: CRecordBatch* batch Schema _schema - cdef init(self, const shared_ptr[CRecordBatch]& table) - cdef _check_nullptr(self) + cdef void init(self, const shared_ptr[CRecordBatch]& table) + cdef int _check_nullptr(self) except -1 cdef class Buffer: @@ -304,7 +304,7 @@ cdef class Buffer: Py_ssize_t shape[1] Py_ssize_t strides[1] - cdef init(self, const shared_ptr[CBuffer]& buffer) + cdef void init(self, const shared_ptr[CBuffer]& buffer) cdef class NativeFile: @@ -335,3 +335,5 @@ cdef public object pyarrow_wrap_tensor(const shared_ptr[CTensor]& sp_tensor) cdef public object pyarrow_wrap_column(const shared_ptr[CColumn]& ccolumn) cdef public object pyarrow_wrap_table(const shared_ptr[CTable]& ctable) cdef public object pyarrow_wrap_batch(const shared_ptr[CRecordBatch]& cbatch) + +cdef dict box_metadata(const CKeyValueMetadata* sp_metadata) diff --git a/python/pyarrow/memory.pxi b/python/pyarrow/memory.pxi index 15d59d237ad..6671a012550 100644 --- a/python/pyarrow/memory.pxi +++ 
b/python/pyarrow/memory.pxi @@ -21,7 +21,7 @@ cdef class MemoryPool: - cdef init(self, CMemoryPool* pool): + cdef void init(self, CMemoryPool* pool): self.pool = pool def bytes_allocated(self): diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py new file mode 100644 index 00000000000..a470aa682c7 --- /dev/null +++ b/python/pyarrow/pandas_compat.py @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import re +import json +import pandas as pd + +import six + +from pyarrow.compat import PY2 + + +INDEX_LEVEL_NAME_REGEX = re.compile(r'^__index_level_\d+__$') + + +def is_unnamed_index_level(name): + return INDEX_LEVEL_NAME_REGEX.match(name) is not None + + +def infer_dtype(column): + try: + return pd.api.types.infer_dtype(column) + except AttributeError: + return pd.lib.infer_dtype(column) + + +def get_column_metadata(column, name): + inferred_dtype = infer_dtype(column) + dtype = column.dtype + + if hasattr(dtype, 'categories'): + extra_metadata = { + 'num_categories': len(column.cat.categories), + 'ordered': column.cat.ordered, + } + elif hasattr(dtype, 'tz'): + extra_metadata = {'timezone': str(dtype.tz)} + else: + extra_metadata = None + + if not isinstance(name, six.string_types): + raise TypeError( + 'Column name must be a string. Got column {} of type {}'.format( + name, type(name).__name__ + ) + ) + + return { + 'name': name, + 'pandas_type': { + 'string': 'bytes' if PY2 else 'unicode', + 'datetime64': ( + 'datetimetz' if hasattr(dtype, 'tz') + else 'datetime' + ), + 'integer': str(dtype), + 'floating': str(dtype), + }.get(inferred_dtype, inferred_dtype), + 'numpy_dtype': str(dtype), + 'metadata': extra_metadata, + } + + +def index_level_name(index, i): + return index.name or '__index_level_{:d}__'.format(i) + + +def construct_metadata(df, index_levels): + return { + b'pandas': json.dumps( + { + 'index_columns': [ + index_level_name(level, i) + for i, level in enumerate(index_levels) + ], + 'columns': [ + get_column_metadata(df[name], name=name) + for name in df.columns + ] + [ + get_column_metadata( + level, name=index_level_name(level, i) + ) + for i, level in enumerate(index_levels) + ], + 'pandas_version': pd.__version__, + } + ).encode('utf8') + } diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 050ec3176d7..6169ac64ee2 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -15,6 +15,9 @@ # specific language governing permissions and limitations # under the License. 
+import itertools +import json + import six import numpy as np @@ -99,6 +102,25 @@ def read(self, columns=None, nthreads=1): Content of the file as a table (of columns) """ column_indices = self._get_column_indices(columns) + if nthreads is not None: + self.reader.set_num_threads(nthreads) + + return self.reader.read_all(column_indices=column_indices) + + def read_pandas(self, columns=None, nthreads=1): + column_indices = self._get_column_indices(columns) + custom_metadata = self.metadata.metadata + + if custom_metadata and b'pandas' in custom_metadata: + index_columns = json.loads( + custom_metadata[b'pandas'].decode('utf8') + )['index_columns'] + else: + index_columns = [] + + if column_indices and index_columns: + column_indices += index_columns + if nthreads is not None: self.reader.set_num_threads(nthreads) return self.reader.read_all(column_indices=column_indices) @@ -106,9 +128,7 @@ def read(self, columns=None, nthreads=1): def _get_column_indices(self, column_names): if column_names is None: return None - else: - return [self.reader.column_name_idx(column) - for column in column_names] + return list(map(self.reader.column_name_idx, column_names)) # ---------------------------------------------------------------------- @@ -618,6 +638,19 @@ def read_table(source, columns=None, nthreads=1, metadata=None): return pf.read(columns=columns, nthreads=nthreads) +def read_pandas(source, columns=None, nthreads=1, metadata=None): + if is_string(source): + fs = LocalFilesystem.get_instance() + if fs.isdir(source): + raise NotImplementedError( + 'Reading Parquet files with DataFrame index metadata is not ' + 'yet supported' + ) + + pf = ParquetFile(source, metadata=metadata) + return pf.read_pandas(columns=columns, nthreads=nthreads) + + def write_table(table, where, row_group_size=None, version='1.0', use_dictionary=True, compression='snappy', **kwargs): """ diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index bd8cce41400..677be7fe6ad 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -15,8 +15,18 @@ # specific language governing permissions and limitations # under the License. +import json + from collections import OrderedDict +try: + import pandas as pd +except ImportError: + # The pure-Python based API works without a pandas installation + pass +else: + import pyarrow.pandas_compat as pdcompat + cdef class ChunkedArray: """ @@ -30,14 +40,18 @@ cdef class ChunkedArray: def __cinit__(self): self.chunked_array = NULL - cdef init(self, const shared_ptr[CChunkedArray]& chunked_array): + cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array): self.sp_chunked_array = chunked_array self.chunked_array = chunked_array.get() - cdef _check_nullptr(self): + cdef int _check_nullptr(self) except -1: if self.chunked_array == NULL: - raise ReferenceError("ChunkedArray object references a NULL " - "pointer. Not initialized.") + raise ReferenceError( + "{} object references a NULL pointer. 
Not initialized.".format( + type(self).__name__ + ) + ) + return 0 def length(self): self._check_nullptr() @@ -111,7 +125,7 @@ cdef class Column: def __cinit__(self): self.column = NULL - cdef init(self, const shared_ptr[CColumn]& column): + cdef void init(self, const shared_ptr[CColumn]& column): self.sp_column = column self.column = column.get() @@ -142,7 +156,7 @@ cdef class Column: check_status(libarrow.ConvertColumnToPandas(self.sp_column, self, &out)) - return _pandas().Series(wrap_array_output(out), name=self.name) + return pd.Series(wrap_array_output(out), name=self.name) def equals(self, Column other): """ @@ -175,14 +189,17 @@ cdef class Column: """ return self.data.to_pylist() - cdef _check_nullptr(self): + cdef int _check_nullptr(self) except -1: if self.column == NULL: - raise ReferenceError("Column object references a NULL pointer." - "Not initialized.") + raise ReferenceError( + "{} object references a NULL pointer. Not initialized.".format( + type(self).__name__ + ) + ) + return 0 def __len__(self): - self._check_nullptr() - return self.column.length() + return self.length() def length(self): self._check_nullptr() @@ -248,8 +265,9 @@ cdef class Column: return chunked_array -cdef shared_ptr[const CKeyValueMetadata] key_value_metadata_from_dict( - dict metadata): +cdef shared_ptr[const CKeyValueMetadata] unbox_metadata(dict metadata): + if metadata is None: + return nullptr cdef: unordered_map[c_string, c_string] unordered_metadata = metadata return ( @@ -289,27 +307,45 @@ cdef int _schema_from_arrays( else: raise TypeError(type(arrays[0])) - schema.reset(new CSchema(fields, key_value_metadata_from_dict(metadata))) + schema.reset(new CSchema(fields, unbox_metadata(metadata))) return 0 -cdef tuple _dataframe_to_arrays(df, bint timestamps_to_ms, Schema schema): +cdef tuple _dataframe_to_arrays( + df, + bint timestamps_to_ms, + Schema schema, + bint preserve_index +): cdef: list names = [] list arrays = [] + list index_levels = [] DataType type = None - dict metadata = {} + dict metadata + + if preserve_index: + index_levels.extend(getattr(df.index, 'levels', [df.index])) for name in df.columns: col = df[name] if schema is not None: type = schema.field_by_name(name).type - arr = Array.from_pandas(col, type=type, - timestamps_to_ms=timestamps_to_ms) + arr = arrays.append( + Array.from_pandas( + col, type=type, timestamps_to_ms=timestamps_to_ms + ) + ) names.append(name) - arrays.append(arr) + for i, level in enumerate(index_levels): + arrays.append( + Array.from_pandas(level, timestamps_to_ms=timestamps_to_ms) + ) + names.append(pdcompat.index_level_name(level, i)) + + metadata = pdcompat.construct_metadata(df, index_levels) return names, arrays, metadata @@ -327,13 +363,18 @@ cdef class RecordBatch: self.batch = NULL self._schema = None - cdef init(self, const shared_ptr[CRecordBatch]& batch): + cdef void init(self, const shared_ptr[CRecordBatch]& batch): self.sp_batch = batch self.batch = batch.get() - cdef _check_nullptr(self): + cdef int _check_nullptr(self) except -1: if self.batch == NULL: - raise ReferenceError("Object not initialized") + raise ReferenceError( + "{} object references a NULL pointer. 
Not initialized.".format( + type(self).__name__ + ) + ) + return 0 def __len__(self): self._check_nullptr() @@ -455,22 +496,27 @@ cdef class RecordBatch: return Table.from_batches([self]).to_pandas(nthreads=nthreads) @classmethod - def from_pandas(cls, df, schema=None): + def from_pandas(cls, df, Schema schema=None, bint preserve_index=True): """ Convert pandas.DataFrame to an Arrow RecordBatch Parameters ---------- df: pandas.DataFrame - schema: pyarrow.Schema (optional) + schema: pyarrow.Schema, optional The expected schema of the RecordBatch. This can be used to indicate the type of columns if we cannot infer it automatically. + preserve_index : bool, optional + Whether to store the index as an additional column in the resulting + ``RecordBatch``. Returns ------- pyarrow.RecordBatch """ - names, arrays, metadata = _dataframe_to_arrays(df, False, schema) + names, arrays, metadata = _dataframe_to_arrays( + df, False, schema, preserve_index + ) return cls.from_arrays(arrays, names, metadata) @staticmethod @@ -503,7 +549,7 @@ cdef class RecordBatch: raise ValueError('Record batch cannot contain no arrays (for now)') num_rows = len(arrays[0]) - _schema_from_arrays(arrays, names, metadata or {}, &schema) + _schema_from_arrays(arrays, names, metadata, &schema) c_arrays.reserve(len(arrays)) for arr in arrays: @@ -513,19 +559,55 @@ cdef class RecordBatch: return pyarrow_wrap_batch(batch) -cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): - cdef: - PyObject* result_obj - CColumn* col - int i - +cdef table_to_blockmanager(const shared_ptr[CTable]& ctable, int nthreads): import pandas.core.internals as _int from pandas import RangeIndex, Categorical from pyarrow.compat import DatetimeTZDtype + cdef: + Table table = pyarrow_wrap_table(ctable) + Table block_table = pyarrow_wrap_table(ctable) + Schema schema = table.schema + + size_t row_count = table.num_rows + size_t total_columns = table.num_columns + + dict metadata = schema.metadata + dict pandas_metadata = None + + list index_columns = [] + list index_arrays = [] + + if metadata is not None and b'pandas' in metadata: + pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8')) + index_columns = pandas_metadata['index_columns'] + + cdef: + Column col + int64_t i + + for name in index_columns: + i = schema.get_field_index(name) + if i != -1: + col = table.column(i) + index_name = None if pdcompat.is_unnamed_index_level(name) else name + index_arrays.append( + pd.Index(col.to_pandas().values, name=index_name) + ) + block_table = block_table.remove_column( + block_table.schema.get_field_index(name) + ) + + cdef: + PyObject* result_obj + shared_ptr[CTable] c_block_table = block_table.sp_table + with nogil: - check_status(libarrow.ConvertTableToPandas(table, nthreads, - &result_obj)) + check_status( + libarrow.ConvertTableToPandas( + c_block_table, nthreads, &result_obj + ) + ) result = PyObject_to_object(result_obj) @@ -549,12 +631,13 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): block = _int.make_block(block_arr, placement=placement) blocks.append(block) - names = [] - for i in range(table.get().num_columns()): - col = table.get().column(i).get() - names.append(frombytes(col.name())) + cdef list axes = [ + [column.name for column in block_table.itercolumns()], + pd.MultiIndex.from_arrays( + index_arrays + ) if index_arrays else pd.RangeIndex(row_count), + ] - axes = [names, RangeIndex(table.get().num_rows())] return _int.BlockManager(blocks, axes) @@ -572,16 +655,18 @@ cdef class Table: 
self.table = NULL def __repr__(self): - return 'pyarrow.Table\n{0}'.format(str(self.schema)) + return 'pyarrow.{}\n{}'.format(type(self).__name__, str(self.schema)) - cdef init(self, const shared_ptr[CTable]& table): + cdef void init(self, const shared_ptr[CTable]& table): self.sp_table = table self.table = table.get() - cdef _check_nullptr(self): - if self.table == NULL: - raise ReferenceError("Table object references a NULL pointer." - "Not initialized.") + cdef int _check_nullptr(self) except -1: + if self.table == nullptr: + raise ReferenceError( + "Table object references a NULL pointer. Not initialized." + ) + return 0 def equals(self, Table other): """ @@ -609,22 +694,29 @@ cdef class Table: return result @classmethod - def from_pandas(cls, df, timestamps_to_ms=False, schema=None): + def from_pandas( + cls, + df, + bint timestamps_to_ms=False, + Schema schema=None, + bint preserve_index=True + ): """ Convert pandas.DataFrame to an Arrow Table Parameters ---------- - df: pandas.DataFrame - - timestamps_to_ms: bool + df : pandas.DataFrame + timestamps_to_ms : bool Convert datetime columns to ms resolution. This is needed for compability with other functionality like Parquet I/O which only supports milliseconds. - - schema: pyarrow.Schema (optional) + schema : pyarrow.Schema, optional The expected schema of the Arrow Table. This can be used to indicate the type of columns if we cannot infer it automatically. + preserve_index : bool, optional + Whether to store the index as an additional column in the resulting + ``Table``. Returns ------- @@ -642,9 +734,12 @@ cdef class Table: >>> pa.Table.from_pandas(df) """ - names, arrays, metadata = _dataframe_to_arrays(df, - timestamps_to_ms=timestamps_to_ms, - schema=schema) + names, arrays, metadata = _dataframe_to_arrays( + df, + timestamps_to_ms=timestamps_to_ms, + schema=schema, + preserve_index=preserve_index + ) return cls.from_arrays(arrays, names=names, metadata=metadata) @staticmethod @@ -671,7 +766,7 @@ cdef class Table: shared_ptr[CTable] table size_t K = len(arrays) - _schema_from_arrays(arrays, names, metadata or {}, &schema) + _schema_from_arrays(arrays, names, metadata, &schema) columns.reserve(K) @@ -734,7 +829,7 @@ cdef class Table: nthreads = cpu_count() mgr = table_to_blockmanager(self.sp_table, nthreads) - return _pandas().DataFrame(mgr) + return pd.DataFrame(mgr) def to_pydict(self): """ @@ -744,11 +839,16 @@ cdef class Table: ------- OrderedDict """ - entries = [] - for i in range(self.table.num_columns()): - name = self.column(i).name - column = self.column(i).to_pylist() - entries.append((name, column)) + cdef: + size_t i + size_t num_columns = self.table.num_columns() + list entries = [] + Column column + + for i in range(num_columns): + column = self.column(i) + entries.append((column.name, column.to_pylist())) + return OrderedDict(entries) @property @@ -846,8 +946,7 @@ cdef class Table: """ Add column to Table at position. Returns new table """ - cdef: - shared_ptr[CTable] c_table + cdef shared_ptr[CTable] c_table with nogil: check_status(self.table.AddColumn(i, column.sp_column, &c_table)) diff --git a/python/pyarrow/tests/pandas_examples.py b/python/pyarrow/tests/pandas_examples.py index 313a3ae9f17..17ad4b22b9c 100644 --- a/python/pyarrow/tests/pandas_examples.py +++ b/python/pyarrow/tests/pandas_examples.py @@ -23,7 +23,7 @@ import pyarrow as pa -def dataframe_with_arrays(): +def dataframe_with_arrays(include_index=False): """ Dataframe with numpy arrays columns of every possible primtive type. 
@@ -72,13 +72,15 @@ def dataframe_with_arrays(): dtype='datetime64[ms]'), ] + if include_index: + fields.append(pa.field('__index_level_0__', pa.int64())) df = pd.DataFrame(arrays) schema = pa.schema(fields) return df, schema -def dataframe_with_lists(): +def dataframe_with_lists(include_index=False): """ Dataframe with list columns of every possible primtive type. @@ -113,6 +115,8 @@ def dataframe_with_lists(): [u"1", u"2", u"3"] ] + if include_index: + fields.append(pa.field('__index_level_0__', pa.int64())) df = pd.DataFrame(arrays) schema = pa.schema(fields) diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index be35905fc75..ca304558c5f 100644 --- a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -67,9 +67,10 @@ def tearDown(self): def _check_pandas_roundtrip(self, df, expected=None, nthreads=1, timestamps_to_ms=False, expected_schema=None, - check_dtype=True, schema=None): + check_dtype=True, schema=None, + check_index=False): table = pa.Table.from_pandas(df, timestamps_to_ms=timestamps_to_ms, - schema=schema) + schema=schema, preserve_index=check_index) result = table.to_pandas(nthreads=nthreads) if expected_schema: assert table.schema.equals(expected_schema) @@ -299,8 +300,11 @@ def test_timestamps_notimezone_no_nulls(self): }) field = pa.field('datetime64', pa.timestamp('ms')) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, timestamps_to_ms=True, - expected_schema=schema) + self._check_pandas_roundtrip( + df, + timestamps_to_ms=True, + expected_schema=schema, + ) df = pd.DataFrame({ 'datetime64': np.array([ @@ -311,8 +315,11 @@ def test_timestamps_notimezone_no_nulls(self): }) field = pa.field('datetime64', pa.timestamp('ns')) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, timestamps_to_ms=False, - expected_schema=schema) + self._check_pandas_roundtrip( + df, + timestamps_to_ms=False, + expected_schema=schema, + ) def test_timestamps_notimezone_nulls(self): df = pd.DataFrame({ @@ -324,8 +331,11 @@ def test_timestamps_notimezone_nulls(self): }) field = pa.field('datetime64', pa.timestamp('ms')) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, timestamps_to_ms=True, - expected_schema=schema) + self._check_pandas_roundtrip( + df, + timestamps_to_ms=True, + expected_schema=schema, + ) df = pd.DataFrame({ 'datetime64': np.array([ @@ -336,8 +346,11 @@ def test_timestamps_notimezone_nulls(self): }) field = pa.field('datetime64', pa.timestamp('ns')) schema = pa.schema([field]) - self._check_pandas_roundtrip(df, timestamps_to_ms=False, - expected_schema=schema) + self._check_pandas_roundtrip( + df, + timestamps_to_ms=False, + expected_schema=schema, + ) def test_timestamps_with_timezone(self): df = pd.DataFrame({ @@ -370,7 +383,7 @@ def test_date_infer(self): None, datetime.date(1970, 1, 1), datetime.date(2040, 2, 26)]}) - table = pa.Table.from_pandas(df) + table = pa.Table.from_pandas(df, preserve_index=False) field = pa.field('date', pa.date32()) schema = pa.schema([field]) assert table.schema.equals(schema) @@ -446,7 +459,7 @@ def test_timedelta(self): def test_column_of_arrays(self): df, schema = dataframe_with_arrays() self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema) - table = pa.Table.from_pandas(df, schema=schema) + table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) assert table.schema.equals(schema) for column in df.columns: @@ -456,7 +469,7 @@ def test_column_of_arrays(self): def 
test_column_of_lists(self): df, schema = dataframe_with_lists() self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema) - table = pa.Table.from_pandas(df, schema=schema) + table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) assert table.schema.equals(schema) for column in df.columns: @@ -543,7 +556,7 @@ def test_decimal_32_from_pandas(self): decimal.Decimal('1234.439'), ] }) - converted = pa.Table.from_pandas(expected) + converted = pa.Table.from_pandas(expected, preserve_index=False) field = pa.field('decimals', pa.decimal(7, 3)) schema = pa.schema([field]) assert converted.schema.equals(schema) @@ -566,7 +579,7 @@ def test_decimal_64_from_pandas(self): decimal.Decimal('129534.123731'), ] }) - converted = pa.Table.from_pandas(expected) + converted = pa.Table.from_pandas(expected, preserve_index=False) field = pa.field('decimals', pa.decimal(12, 6)) schema = pa.schema([field]) assert converted.schema.equals(schema) @@ -589,7 +602,7 @@ def test_decimal_128_from_pandas(self): -decimal.Decimal('314292388910493.12343437128'), ] }) - converted = pa.Table.from_pandas(expected) + converted = pa.Table.from_pandas(expected, preserve_index=False) field = pa.field('decimals', pa.decimal(26, 11)) schema = pa.schema([field]) assert converted.schema.equals(schema) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 994876de3c1..eeea39ab194 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -240,6 +240,57 @@ def test_get_record_batch_size(): assert pa.get_record_batch_size(batch) > (N * itemsize) +def test_pandas_serialize_round_trip(): + index = pd.Index([1, 2, 3], name='my_index') + columns = ['foo', 'bar'] + df = pd.DataFrame( + {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')}, + index=index, columns=columns + ) + buf = pa.serialize_pandas(df) + result = pa.deserialize_pandas(buf) + assert_frame_equal(result, df) + + +def test_pandas_serialize_round_trip_nthreads(): + index = pd.Index([1, 2, 3], name='my_index') + columns = ['foo', 'bar'] + df = pd.DataFrame( + {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')}, + index=index, columns=columns + ) + buf = pa.serialize_pandas(df) + result = pa.deserialize_pandas(buf, nthreads=2) + assert_frame_equal(result, df) + + +def test_pandas_serialize_round_trip_multi_index(): + index1 = pd.Index([1, 2, 3], name='level_1') + index2 = pd.Index(list('def'), name=None) + index = pd.MultiIndex.from_arrays([index1, index2]) + + columns = ['foo', 'bar'] + df = pd.DataFrame( + {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')}, + index=index, + columns=columns, + ) + buf = pa.serialize_pandas(df) + result = pa.deserialize_pandas(buf) + assert_frame_equal(result, df) + + +@pytest.mark.xfail( + raises=TypeError, + reason='Non string columns are not supported', +) +def test_pandas_serialize_round_trip_not_string_columns(): + df = pd.DataFrame(list(zip([1.5, 1.6, 1.7], 'abc'))) + buf = pa.serialize_pandas(df) + result = pa.deserialize_pandas(buf) + assert_frame_equal(result, df) + + def write_file(batch, sink): writer = pa.RecordBatchFileWriter(sink, batch.schema) writer.write_batch(batch) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 5dbe6574756..644efaafc06 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -19,6 +19,7 @@ import datetime import io import os +import json import pytest from pyarrow.compat import guid, u @@ -31,15 +32,11 @@ import pandas.util.testing as tm -try: - import 
pyarrow.parquet as pq - HAVE_PARQUET = True -except ImportError: - HAVE_PARQUET = False +# Skip all parquet tests if we can't import pyarrow.parquet +pq = pytest.importorskip('pyarrow.parquet') -# XXX: Make Parquet tests opt-in rather than skip-if-not-build -parquet = pytest.mark.skipif(not HAVE_PARQUET, - reason='Parquet support not built') +# Ignore these with pytest ... -m 'not parquet' +parquet = pytest.mark.parquet @parquet @@ -91,8 +88,14 @@ def test_pandas_parquet_2_0_rountrip(tmpdir): filename = tmpdir.join('pandas_rountrip.parquet') arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True) + assert b'pandas' in arrow_table.schema.metadata + pq.write_table(arrow_table, filename.strpath, version="2.0") - table_read = pq.read_table(filename.strpath) + table_read = pq.read_pandas(filename.strpath) + assert b'pandas' in table_read.schema.metadata + + assert arrow_table.schema.metadata == table_read.schema.metadata + df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) @@ -167,7 +170,6 @@ def _test_dataframe(size=10000, seed=0): 'int32': _random_integers(size, np.int32), 'int64': _random_integers(size, np.int64), 'float32': np.random.randn(size).astype(np.float32), - 'float64': np.random.randn(size), 'float64': np.arange(size, dtype=np.float64), 'bool': np.random.randn(size) > 0, 'strings': [tm.rands(10) for i in range(size)] @@ -270,7 +272,7 @@ def test_parquet_metadata_api(): meta = fileh.metadata repr(meta) assert meta.num_rows == len(df) - assert meta.num_columns == ncols + assert meta.num_columns == ncols + 1 # +1 for index assert meta.num_row_groups == 1 assert meta.format_version == '2.0' assert 'parquet-cpp' in meta.created_by @@ -278,7 +280,7 @@ def test_parquet_metadata_api(): # Schema schema = fileh.schema assert meta.schema is schema - assert len(schema) == ncols + assert len(schema) == ncols + 1 # +1 for index repr(schema) col = schema[0] @@ -292,7 +294,7 @@ def test_parquet_metadata_api(): assert col.logical_type == 'NONE' with pytest.raises(IndexError): - schema[ncols] + schema[ncols + 1] # +1 for index with pytest.raises(IndexError): schema[-1] @@ -302,7 +304,7 @@ def test_parquet_metadata_api(): repr(rg_meta) assert rg_meta.num_rows == len(df) - assert rg_meta.num_columns == ncols + assert rg_meta.num_columns == ncols + 1 # +1 for index @parquet @@ -502,9 +504,22 @@ def test_read_single_row_group(): result = pa.concat_tables(row_groups) tm.assert_frame_equal(df, result.to_pandas()) + +@parquet +def test_read_single_row_group_with_column_subset(): + N, K = 10000, 4 + df = alltypes_sample(size=N) + a_table = pa.Table.from_pandas(df, timestamps_to_ms=True) + + buf = io.BytesIO() + pq.write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.0') + + buf.seek(0) + pf = pq.ParquetFile(buf) + cols = df.columns[:2] - row_groups = [pf.read_row_group(i, columns=cols) - for i in range(K)] + row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)] result = pa.concat_tables(row_groups) tm.assert_frame_equal(df[cols], result.to_pandas()) @@ -696,6 +711,9 @@ def read_multiple_files(paths, columns=None, nthreads=None, **kwargs): assert result.equals(expected) + with pytest.raises(NotImplementedError): + pq.read_pandas(dirpath) + # Read with provided metadata metadata = pq.ParquetFile(paths[0]).metadata @@ -706,10 +724,11 @@ def read_multiple_files(paths, columns=None, nthreads=None, **kwargs): assert result3.equals(expected) # Read column subset - to_read = [result[0], result[3], result[6]] + to_read = [result[0], result[2], 
result[6], result[result.num_columns - 1]] + result = pa.localfs.read_parquet( dirpath, columns=[c.name for c in to_read]) - expected = pa.Table.from_arrays(to_read) + expected = pa.Table.from_arrays(to_read, metadata=result.schema.metadata) assert result.equals(expected) # Read with multiple threads diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 72ce6967edf..ed2201171a9 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -142,7 +142,7 @@ def test_recordbatchlist_to_pandas(): table = pa.Table.from_batches([batch1, batch2]) result = table.to_pandas() - data = pd.concat([data1, data2], ignore_index=True) + data = pd.concat([data1, data2]) assert_frame_equal(data, result) From c35970cabc9df4a1d50bc4bc643cfa5dda92ebe7 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 15 May 2017 19:01:10 -0400 Subject: [PATCH 02/10] Add test for no index written and pq.read_pandas --- python/pyarrow/tests/test_parquet.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 644efaafc06..a21235409d4 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -100,6 +100,29 @@ def test_pandas_parquet_2_0_rountrip(tmpdir): tm.assert_frame_equal(df, df_read) +@parquet +def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tmpdir): + df = alltypes_sample(size=10000) + + filename = tmpdir.join('pandas_rountrip.parquet') + arrow_table = pa.Table.from_pandas( + df, timestamps_to_ms=True, preserve_index=False + ) + js = json.loads(arrow_table.schema.metadata[b'pandas']) + assert not js['index_columns'] + + pq.write_table(arrow_table, filename.strpath, version="2.0") + table_read = pq.read_pandas(filename.strpath) + + js = json.loads(table_read.schema.metadata[b'pandas']) + assert not js['index_columns'] + + assert arrow_table.schema.metadata == table_read.schema.metadata + + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + @parquet def test_pandas_parquet_1_0_rountrip(tmpdir): size = 10000 From 21a88295f7905f8200161edfc54ce031840ff456 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 15 May 2017 19:01:47 -0400 Subject: [PATCH 03/10] Add docs to pq.read_pandas --- python/pyarrow/parquet.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 6169ac64ee2..1a11a8598ae 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -639,6 +639,29 @@ def read_table(source, columns=None, nthreads=1, metadata=None): def read_pandas(source, columns=None, nthreads=1, metadata=None): + """ + Read a Table from Parquet format, reconstructing the index values if + available. + + Parameters + ---------- + source: str or pyarrow.io.NativeFile + Location of Parquet dataset. If a string passed, can be a single file + name or directory name. For passing Python file objects or byte + buffers, see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader. + columns: list + If not None, only these columns will be read from the file. + nthreads : int, default 1 + Number of columns to read in parallel. 
Requires that the underlying + file source is threadsafe + metadata : FileMetaData + If separately computed + + Returns + ------- + pyarrow.Table + Content of the file as a table (of columns) + """ if is_string(source): fs = LocalFilesystem.get_instance() if fs.isdir(source): From 2fa1f16dbd3b9be8a20b34e24afafbd83c352040 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 15 May 2017 19:02:02 -0400 Subject: [PATCH 04/10] Do not write index_column metadata if not requested --- python/pyarrow/pandas_compat.py | 18 ++++++++++-------- python/pyarrow/table.pxi | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py index a470aa682c7..2f72d6a6382 100644 --- a/python/pyarrow/pandas_compat.py +++ b/python/pyarrow/pandas_compat.py @@ -79,23 +79,25 @@ def index_level_name(index, i): return index.name or '__index_level_{:d}__'.format(i) -def construct_metadata(df, index_levels): +def construct_metadata(df, index_levels, preserve_index): return { b'pandas': json.dumps( { 'index_columns': [ index_level_name(level, i) for i, level in enumerate(index_levels) - ], + ] if preserve_index else [], 'columns': [ get_column_metadata(df[name], name=name) for name in df.columns - ] + [ - get_column_metadata( - level, name=index_level_name(level, i) - ) - for i, level in enumerate(index_levels) - ], + ] + ( + [ + get_column_metadata( + level, name=index_level_name(level, i) + ) + for i, level in enumerate(index_levels) + ] if preserve_index else [] + ), 'pandas_version': pd.__version__, } ).encode('utf8') diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 677be7fe6ad..c9915c1be3c 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -345,7 +345,7 @@ cdef tuple _dataframe_to_arrays( ) names.append(pdcompat.index_level_name(level, i)) - metadata = pdcompat.construct_metadata(df, index_levels) + metadata = pdcompat.construct_metadata(df, index_levels, preserve_index) return names, arrays, metadata From 32c5e64b138a16d9658c68d77fb7cf9d7359da58 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 16 May 2017 10:45:19 -0400 Subject: [PATCH 05/10] Add test for read_pandas subset --- python/pyarrow/tests/test_parquet.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index a21235409d4..857c2e63f42 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -212,6 +212,18 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir): tm.assert_frame_equal(df, df_read) +@parquet +def test_read_pandas_column_subset(tmpdir): + df = _test_dataframe(10000) + arrow_table = pa.Table.from_pandas(df) + imos = pa.BufferOutputStream() + pq.write_table(arrow_table, imos, version="2.0") + buf = imos.get_result() + reader = pa.BufferReader(buf) + df_read = pq.read_pandas(reader, columns=['strings', 'uint8']).to_pandas() + tm.assert_frame_equal(df[['strings', 'uint8']], df_read) + + @parquet def test_pandas_parquet_pyfile_roundtrip(tmpdir): filename = tmpdir.join('pandas_pyfile_roundtrip.parquet').strpath From 2198dc5fd4a8acbeb5df51af7bba417cb5f65851 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 16 May 2017 10:45:36 -0400 Subject: [PATCH 06/10] Call column_name_idx on index_columns --- python/pyarrow/parquet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 1a11a8598ae..f95c2cefd28 100644 --- 
a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -118,8 +118,8 @@ def read_pandas(self, columns=None, nthreads=1): else: index_columns = [] - if column_indices and index_columns: - column_indices += index_columns + if column_indices is not None and index_columns: + column_indices += map(self.reader.column_name_idx, index_columns) if nthreads is not None: self.reader.set_num_threads(nthreads) From a42a0845e3253b275b86bf98744104afd12a92c9 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 16 May 2017 10:48:30 -0400 Subject: [PATCH 07/10] Decode metadata to utf8 because JSON --- python/pyarrow/tests/test_parquet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 857c2e63f42..5641cbb77de 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -108,13 +108,13 @@ def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tmpdir): arrow_table = pa.Table.from_pandas( df, timestamps_to_ms=True, preserve_index=False ) - js = json.loads(arrow_table.schema.metadata[b'pandas']) + js = json.loads(arrow_table.schema.metadata[b'pandas'].decode('utf8')) assert not js['index_columns'] pq.write_table(arrow_table, filename.strpath, version="2.0") table_read = pq.read_pandas(filename.strpath) - js = json.loads(table_read.schema.metadata[b'pandas']) + js = json.loads(table_read.schema.metadata[b'pandas'].decode('utf8')) assert not js['index_columns'] assert arrow_table.schema.metadata == table_read.schema.metadata From de616e89fadda72ea785ea873e97f0bc343ede6a Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 16 May 2017 10:54:44 -0400 Subject: [PATCH 08/10] Add doc --- python/pyarrow/parquet.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index f95c2cefd28..e69d85eac4e 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -647,8 +647,8 @@ def read_pandas(source, columns=None, nthreads=1, metadata=None): ---------- source: str or pyarrow.io.NativeFile Location of Parquet dataset. If a string passed, can be a single file - name or directory name. For passing Python file objects or byte - buffers, see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader. + name. For passing Python file objects or byte buffers, + see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader. columns: list If not None, only these columns will be read from the file. nthreads : int, default 1 @@ -660,14 +660,15 @@ def read_pandas(source, columns=None, nthreads=1, metadata=None): Returns ------- pyarrow.Table - Content of the file as a table (of columns) + Content of the file as a Table of Columns, including DataFrame indexes + as Columns. 
""" if is_string(source): fs = LocalFilesystem.get_instance() if fs.isdir(source): raise NotImplementedError( - 'Reading Parquet files with DataFrame index metadata is not ' - 'yet supported' + 'Reading a directory of Parquet files with DataFrame index ' + 'metadata is not yet supported' ) pf = ParquetFile(source, metadata=metadata) From 60f71aacffa674fd1a25a7e42771caac96d2c06a Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 16 May 2017 10:57:04 -0400 Subject: [PATCH 09/10] More doc --- python/pyarrow/ipc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index a09e264cb4f..a61d746d652 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -149,6 +149,8 @@ def deserialize_pandas(buf, nthreads=1): ---------- buf : buffer An object compatible with the buffer protocol + nthreads : int, optional + The number of threads to use to convert the buffer to a DataFrame. Returns ------- From 4fa679da86ce11de8237789b9ed618a2e92d86e8 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 16 May 2017 11:02:08 -0400 Subject: [PATCH 10/10] Add metadata test --- python/pyarrow/tests/test_parquet.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 5641cbb77de..db446d3c4e7 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -100,6 +100,24 @@ def test_pandas_parquet_2_0_rountrip(tmpdir): tm.assert_frame_equal(df, df_read) +@parquet +def test_pandas_parquet_custom_metadata(tmpdir): + df = alltypes_sample(size=10000) + + filename = tmpdir.join('pandas_rountrip.parquet') + arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True) + assert b'pandas' in arrow_table.schema.metadata + + pq.write_table(arrow_table, filename.strpath, version="2.0") + pf = pq.ParquetFile(filename.strpath) + + md = pf.metadata.metadata + assert b'pandas' in md + + js = json.loads(md[b'pandas'].decode('utf8')) + assert js['index_columns'] == ['__index_level_0__'] + + @parquet def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tmpdir): df = alltypes_sample(size=10000)