def dataframe_from_batches(batches):
    """
    Convert a list of Arrow RecordBatches to a pandas.DataFrame

    Parameters
    ----------
    batches : list of RecordBatch
        RecordBatch list to be converted, schemas must be equal. Any
        iterable is accepted; it is materialized into a list first.

    Returns
    -------
    The result of ``Table.to_pandas()`` on a Table whose i-th column holds
    the i-th array of every batch as one chunk per batch, in input order.

    Raises
    ------
    ValueError
        If ``batches`` is empty.
    ArrowException
        If any batch has a schema that differs from the first batch's.
    """
    cdef:
        vector[shared_ptr[CArray]] c_array_chunks
        vector[shared_ptr[CColumn]] c_columns
        shared_ptr[CTable] c_table
        Array arr
        Schema schema
        int K
        int i

    # ``pd`` is not referenced below; the import only makes a missing pandas
    # installation surface before any Arrow objects are built.
    import pandas as pd

    # Indexing and slicing below need a real sequence, so generators and
    # other one-shot iterables are materialized up front.
    batches = list(batches)
    if not batches:
        # Previously this surfaced as a bare IndexError from batches[0].
        raise ValueError("Must pass at least one RecordBatch")

    schema = batches[0].schema

    # The first batch's schema is the reference; every other batch must match
    if any(not schema.equals(other.schema) for other in batches[1:]):
        raise ArrowException("Error converting list of RecordBatches to "
                             "DataFrame, not all schemas are equal")

    K = batches[0].num_columns

    # Build one chunked column per field: chunk j of column i is batch j's
    # i-th array.
    c_columns.resize(K)
    for i in range(K):
        for batch in batches:
            arr = batch[i]
            c_array_chunks.push_back(arr.sp_array)
        c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i),
                                       c_array_chunks))
        # The scratch vector is reused for the next column
        c_array_chunks.clear()

    # Wrap the columns in an (unnamed) Table and convert to DataFrame
    c_table.reset(new CTable('', schema.sp_schema, c_columns))
    table = Table()
    table.init(c_table)
    return table.to_pandas()
pa.RecordBatch.from_pandas(data1) + batch2 = pa.RecordBatch.from_pandas(data2) + + with pytest.raises(pa.ArrowException): + pa.dataframe_from_batches([batch1, batch2]) + + def test_table_basics(): data = [ pa.from_pylist(range(5)), diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 1f5b7009e6a..adb27e83ef1 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -597,14 +597,10 @@ class ArrowDeserializer { Status Convert(PyObject** out) { const std::shared_ptr data = col_->data(); - if (data->num_chunks() > 1) { - return Status::NotImplemented("Chunked column conversion NYI"); - } - - auto chunk = data->chunk(0); - RETURN_NOT_OK(ConvertValues(chunk)); + RETURN_NOT_OK(ConvertValues(data)); *out = reinterpret_cast(out_); + return Status::OK(); } @@ -653,28 +649,49 @@ class ArrowDeserializer { return Status::OK(); } + template + Status ConvertValuesZeroCopy(std::shared_ptr arr) { + typedef typename arrow_traits::T T; + + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + + // Zero-Copy. We can pass the data pointer directly to NumPy. 
+ void* data = const_cast(in_values); + int type = arrow_traits::npy_type; + RETURN_NOT_OK(OutputFromData(type, data)); + + return Status::OK(); + } + template inline typename std::enable_if< arrow_traits::is_pandas_numeric_nullable, Status>::type - ConvertValues(const std::shared_ptr& arr) { + ConvertValues(const std::shared_ptr& data) { typedef typename arrow_traits::T T; + size_t chunk_offset = 0; - arrow::PrimitiveArray* prim_arr = static_cast( - arr.get()); - const T* in_values = reinterpret_cast(prim_arr->data()->data()); + if (data->num_chunks() == 1 && data->null_count() == 0) { + return ConvertValuesZeroCopy(data->chunk(0)); + } + + RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); - if (arr->null_count() > 0) { - RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; - T* out_values = reinterpret_cast(PyArray_DATA(out_)); - for (int64_t i = 0; i < arr->length(); ++i) { - out_values[i] = arr->IsNull(i) ? arrow_traits::na_value : in_values[i]; + if (arr->null_count() > 0) { + for (int64_t i = 0; i < arr->length(); ++i) { + out_values[i] = arr->IsNull(i) ? arrow_traits::na_value : in_values[i]; + } + } else { + memcpy(out_values, in_values, sizeof(T) * arr->length()); } - } else { - // Zero-Copy. We can pass the data pointer directly to NumPy. 
- void* data = const_cast(in_values); - int type = arrow_traits::npy_type; - RETURN_NOT_OK(OutputFromData(type, data)); + + chunk_offset += arr->length(); } return Status::OK(); @@ -684,27 +701,43 @@ class ArrowDeserializer { template inline typename std::enable_if< arrow_traits::is_pandas_numeric_not_nullable, Status>::type - ConvertValues(const std::shared_ptr& arr) { + ConvertValues(const std::shared_ptr& data) { typedef typename arrow_traits::T T; + size_t chunk_offset = 0; - arrow::PrimitiveArray* prim_arr = static_cast( - arr.get()); - - const T* in_values = reinterpret_cast(prim_arr->data()->data()); + if (data->num_chunks() == 1 && data->null_count() == 0) { + return ConvertValuesZeroCopy(data->chunk(0)); + } - if (arr->null_count() > 0) { + if (data->null_count() > 0) { RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); - // Upcast to double, set NaN as appropriate - double* out_values = reinterpret_cast(PyArray_DATA(out_)); - for (int i = 0; i < arr->length(); ++i) { - out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i]; + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + // Upcast to double, set NaN as appropriate + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + for (int i = 0; i < arr->length(); ++i) { + out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i]; + } + + chunk_offset += arr->length(); } } else { - // Zero-Copy. We can pass the data pointer directly to NumPy. 
- void* data = const_cast(in_values); - int type = arrow_traits::npy_type; - RETURN_NOT_OK(OutputFromData(type, data)); + RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + memcpy(out_values, in_values, sizeof(T) * arr->length()); + + chunk_offset += arr->length(); + } } return Status::OK(); @@ -714,35 +747,48 @@ class ArrowDeserializer { template inline typename std::enable_if< arrow_traits::is_boolean, Status>::type - ConvertValues(const std::shared_ptr& arr) { + ConvertValues(const std::shared_ptr& data) { + size_t chunk_offset = 0; PyAcquireGIL lock; - arrow::BooleanArray* bool_arr = static_cast(arr.get()); - - if (arr->null_count() > 0) { + if (data->null_count() > 0) { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - PyObject** out_values = reinterpret_cast(PyArray_DATA(out_)); - for (int64_t i = 0; i < arr->length(); ++i) { - if (bool_arr->IsNull(i)) { - Py_INCREF(Py_None); - out_values[i] = Py_None; - } else if (bool_arr->Value(i)) { - // True - Py_INCREF(Py_True); - out_values[i] = Py_True; - } else { - // False - Py_INCREF(Py_False); - out_values[i] = Py_False; + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + auto bool_arr = static_cast(arr.get()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + for (int64_t i = 0; i < arr->length(); ++i) { + if (bool_arr->IsNull(i)) { + Py_INCREF(Py_None); + out_values[i] = Py_None; + } else if (bool_arr->Value(i)) { + // True + Py_INCREF(Py_True); + out_values[i] = Py_True; + } else { + // False + Py_INCREF(Py_False); + out_values[i] = Py_False; + } } + + chunk_offset += bool_arr->length(); } } else { RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); - uint8_t* 
out_values = reinterpret_cast(PyArray_DATA(out_)); - for (int64_t i = 0; i < arr->length(); ++i) { - out_values[i] = static_cast(bool_arr->Value(i)); + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + auto bool_arr = static_cast(arr.get()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + for (int64_t i = 0; i < arr->length(); ++i) { + out_values[i] = static_cast(bool_arr->Value(i)); + } + + chunk_offset += bool_arr->length(); } } @@ -753,42 +799,49 @@ class ArrowDeserializer { template inline typename std::enable_if< T2 == arrow::Type::STRING, Status>::type - ConvertValues(const std::shared_ptr& arr) { + ConvertValues(const std::shared_ptr& data) { + size_t chunk_offset = 0; PyAcquireGIL lock; RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - PyObject** out_values = reinterpret_cast(PyArray_DATA(out_)); - - arrow::StringArray* string_arr = static_cast(arr.get()); - - const uint8_t* data; - int32_t length; - if (arr->null_count() > 0) { - for (int64_t i = 0; i < arr->length(); ++i) { - if (string_arr->IsNull(i)) { - Py_INCREF(Py_None); - out_values[i] = Py_None; - } else { - data = string_arr->GetValue(i, &length); - - out_values[i] = make_pystring(data, length); + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + auto string_arr = static_cast(arr.get()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + const uint8_t* data_ptr; + int32_t length; + if (data->null_count() > 0) { + for (int64_t i = 0; i < arr->length(); ++i) { + if (string_arr->IsNull(i)) { + Py_INCREF(Py_None); + out_values[i] = Py_None; + } else { + data_ptr = string_arr->GetValue(i, &length); + + out_values[i] = make_pystring(data_ptr, length); + if (out_values[i] == nullptr) { + return Status::UnknownError("String initialization failed"); + } + } + } + } else { + for (int64_t i = 0; i < arr->length(); ++i) { + data_ptr = string_arr->GetValue(i, &length); + 
out_values[i] = make_pystring(data_ptr, length); if (out_values[i] == nullptr) { return Status::UnknownError("String initialization failed"); } } } - } else { - for (int64_t i = 0; i < arr->length(); ++i) { - data = string_arr->GetValue(i, &length); - out_values[i] = make_pystring(data, length); - if (out_values[i] == nullptr) { - return Status::UnknownError("String initialization failed"); - } - } + + chunk_offset += string_arr->length(); } + return Status::OK(); } + private: std::shared_ptr col_; PyObject* py_ref_;