From 6d05e34afddccf451dfc7b36fcd5c153d100ae68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 18 Jun 2021 15:45:51 +0200 Subject: [PATCH 01/14] Fix auto chunking --- cpp/src/arrow/python/inference.cc | 9 +-- cpp/src/arrow/python/iterators.h | 19 +++--- cpp/src/arrow/python/numpy_internal.h | 2 +- cpp/src/arrow/python/python_to_arrow.cc | 46 ++++++++----- cpp/src/arrow/util/converter.h | 70 ++++++++++++++------ python/pyarrow/tests/test_convert_builtin.py | 6 +- python/pyarrow/tests/test_pandas.py | 6 +- 7 files changed, 103 insertions(+), 55 deletions(-) diff --git a/cpp/src/arrow/python/inference.cc b/cpp/src/arrow/python/inference.cc index 9d6707aa11d..5086815f84f 100644 --- a/cpp/src/arrow/python/inference.cc +++ b/cpp/src/arrow/python/inference.cc @@ -379,12 +379,13 @@ class TypeInferrer { // Infer value type from a sequence of values Status VisitSequence(PyObject* obj, PyObject* mask = nullptr) { if (mask == nullptr || mask == Py_None) { - return internal::VisitSequence(obj, [this](PyObject* value, bool* keep_going) { - return Visit(value, keep_going); - }); + return internal::VisitSequence( + obj, /*offset=*/0, + [this](PyObject* value, bool* keep_going) { return Visit(value, keep_going); }); } else { return internal::VisitSequenceMasked( - obj, mask, [this](PyObject* value, uint8_t masked, bool* keep_going) { + obj, mask, /*offset=*/0, + [this](PyObject* value, uint8_t masked, bool* keep_going) { if (!masked) { return Visit(value, keep_going); } else { diff --git a/cpp/src/arrow/python/iterators.h b/cpp/src/arrow/python/iterators.h index 6b0b55342a5..58213ee2dbc 100644 --- a/cpp/src/arrow/python/iterators.h +++ b/cpp/src/arrow/python/iterators.h @@ -36,7 +36,7 @@ namespace internal { // // If keep_going is set to false, the iteration terminates template -inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { +inline Status VisitSequenceGeneric(PyObject* obj, int64_t offset, VisitorFunc&& func) { // VisitorFunc may set to false to terminate iteration bool keep_going = true; @@ -49,7 +49,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { if (PyArray_DESCR(arr_obj)->type_num == NPY_OBJECT) { // It's an array object, we can fetch object pointers directly const Ndarray1DIndexer objects(arr_obj); - for (int64_t i = 0; keep_going && i < objects.size(); ++i) { + for (int64_t i = offset; keep_going && i < objects.size(); ++i) { RETURN_NOT_OK(func(objects[i], i, &keep_going)); } return Status::OK(); @@ -64,7 +64,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { if (PyList_Check(obj) || PyTuple_Check(obj)) { // Use fast item access const Py_ssize_t size = PySequence_Fast_GET_SIZE(obj); - for (Py_ssize_t i = 0; keep_going && i < size; ++i) { + for (Py_ssize_t i = offset; keep_going && i < size; ++i) { PyObject* value = PySequence_Fast_GET_ITEM(obj, i); RETURN_NOT_OK(func(value, static_cast(i), &keep_going)); } @@ -72,7 +72,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { // Regular sequence: avoid making a potentially large copy const Py_ssize_t size = PySequence_Size(obj); RETURN_IF_PYERROR(); - for (Py_ssize_t i = 0; keep_going && i < size; ++i) { + for (Py_ssize_t i = offset; keep_going && i < size; ++i) { OwnedRef value_ref(PySequence_ITEM(obj, i)); RETURN_IF_PYERROR(); RETURN_NOT_OK(func(value_ref.obj(), static_cast(i), &keep_going)); @@ -86,16 +86,17 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { // Visit sequence with no null mask template -inline Status VisitSequence(PyObject* obj, VisitorFunc&& func) { +inline Status VisitSequence(PyObject* obj, int64_t offset, VisitorFunc&& func) { return VisitSequenceGeneric( - obj, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) { + obj, offset, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) { return func(value, keep_going); }); } /// Visit sequence with null mask template -inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& func) { +inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, int64_t offset, + VisitorFunc&& func) { if (mo == nullptr || !PyArray_Check(mo)) { return Status::Invalid("Null mask must be NumPy array"); } @@ -115,7 +116,7 @@ inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& fun Ndarray1DIndexer mask_values(mask); return VisitSequenceGeneric( - obj, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) { + obj, offset, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) { return func(value, mask_values[i], keep_going); }); } else { @@ -132,7 +133,7 @@ template inline Status VisitIterable(PyObject* obj, VisitorFunc&& func) { if (PySequence_Check(obj)) { // Numpy arrays fall here as well - return VisitSequence(obj, std::forward(func)); + return VisitSequence(obj, /*offset=*/0, std::forward(func)); } // Fall back on the iterator protocol OwnedRef iter_ref(PyObject_GetIter(obj)); diff --git a/cpp/src/arrow/python/numpy_internal.h b/cpp/src/arrow/python/numpy_internal.h index f43599eb3eb..973f577cb13 100644 --- a/cpp/src/arrow/python/numpy_internal.h +++ b/cpp/src/arrow/python/numpy_internal.h @@ -52,7 +52,7 @@ class Ndarray1DIndexer { int64_t size() const { return PyArray_SIZE(arr_); } - T* data() const { return data_; } + const T* data() const { return reinterpret_cast(data_); } bool is_strided() const { return stride_ != sizeof(T); } diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index b2d9f1cb5a3..e8ac9b97d8f 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -392,22 +392,25 @@ class PyValue { class PyConverter : public Converter { public: // Iterate over the input values and defer the conversion to the Append method - Status Extend(PyObject* values, int64_t size) override { + Status Extend(PyObject* values, int64_t size, int64_t offset = 0) override { + DCHECK_GE(size, offset); /// Ensure we've allocated enough space - RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(this->Reserve(size - offset)); // Iterate over the items adding each one - return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) { - return this->Append(item); - }); + return internal::VisitSequence( + values, offset, + [this](PyObject* item, bool* /* unused */) { return this->Append(item); }); } // Convert and append a sequence of values masked with a numpy array - Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override { + Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size, + int64_t offset = 0) override { + DCHECK_GE(size, offset); /// Ensure we've allocated enough space - RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(this->Reserve(size - offset)); // Iterate over the items adding each one return internal::VisitSequenceMasked( - values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) { + values, mask, offset, [this](PyObject* item, bool is_masked, bool* /* unused */) { if (is_masked) { return this->AppendNull(); } else { @@ -578,6 +581,9 @@ class PyPrimitiveConverter> // observed binary value observed_binary_ = true; } + // Since we don't know the varying length input size in advance, we need to + // reserve space in the value builder one by one. ReserveData raises CapacityError + // if the value would not fit into the array. ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size)); this->primitive_builder_->UnsafeAppend(view_.bytes, static_cast(view_.size)); @@ -655,6 +661,8 @@ class PyListConverter : public ListConverter { return ValidateBuilder(this->list_type_); } + bool rewind_on_capacity_error() const override { return true; } + protected: Status ValidateBuilder(const MapType*) { if (this->list_builder_->key_builder()->null_count() > 0) { @@ -669,7 +677,8 @@ class PyListConverter : public ListConverter { Status AppendSequence(PyObject* value) { int64_t size = static_cast(PySequence_Size(value)); RETURN_NOT_OK(this->list_builder_->ValidateOverflow(size)); - return this->value_converter_->Extend(value, size); + RETURN_NOT_OK(this->value_converter_->Extend(value, size)); + return Status::OK(); } Status AppendNdarray(PyObject* value) { @@ -684,12 +693,13 @@ class PyListConverter : public ListConverter { switch (value_type->id()) { // If the value type does not match the expected NumPy dtype, then fall through // to a slower PySequence-based path -#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE) \ - case Type::TYPE_ID: { \ - if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \ - return this->value_converter_->Extend(value, size); \ - } \ - return AppendNdarrayTyped(ndarray); \ +#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE) \ + case Type::TYPE_ID: { \ + if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \ + RETURN_NOT_OK(this->value_converter_->Extend(value, size)); \ + return Status::OK(); \ + } \ + return AppendNdarrayTyped(ndarray); \ } LIST_FAST_CASE(BOOL, BooleanType, NPY_BOOL) LIST_FAST_CASE(UINT8, UInt8Type, NPY_UINT8) @@ -707,7 +717,8 @@ class PyListConverter : public ListConverter { LIST_FAST_CASE(DURATION, DurationType, NPY_TIMEDELTA) #undef LIST_FAST_CASE default: { - return this->value_converter_->Extend(value, size); + RETURN_NOT_OK(this->value_converter_->Extend(value, size)); + return Status::OK(); } } } @@ -728,7 +739,6 @@ class PyListConverter : public ListConverter { auto value_builder = checked_cast(this->value_converter_->builder().get()); - // TODO(wesm): Vector append when not strided Ndarray1DIndexer values(ndarray); if (null_sentinels_possible) { for (int64_t i = 0; i < values.size(); ++i) { @@ -738,6 +748,8 @@ class PyListConverter : public ListConverter { RETURN_NOT_OK(value_builder->Append(values[i])); } } + } else if (!values.is_strided()) { + RETURN_NOT_OK(value_builder->AppendValues(values.data(), values.size())); } else { for (int64_t i = 0; i < values.size(); ++i) { RETURN_NOT_OK(value_builder->Append(values[i])); diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h index 2c40a48726b..6b5ba62eef0 100644 --- a/cpp/src/arrow/util/converter.h +++ b/cpp/src/arrow/util/converter.h @@ -54,11 +54,12 @@ class Converter { virtual Status Append(InputType value) { return Status::NotImplemented("Append"); } - virtual Status Extend(InputType values, int64_t size) { + virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) { return Status::NotImplemented("Extend"); } - virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) { + virtual Status ExtendMasked(InputType values, InputType mask, int64_t size, + int64_t offset = 0) { return Status::NotImplemented("ExtendMasked"); } @@ -70,6 +71,8 @@ class Converter { bool may_overflow() const { return may_overflow_; } + virtual bool rewind_on_capacity_error() const { return false; } + virtual Status Reserve(int64_t additional_capacity) { return builder_->Reserve(additional_capacity); } @@ -302,32 +305,59 @@ class Chunker { return status; } - // we could get bit smarter here since the whole batch of appendable values - // will be rejected if a capacity error is raised - Status Extend(InputType values, int64_t size) { - auto status = converter_->Extend(values, size); - if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { - if (converter_->builder()->length() == 0) { + Status Extend(InputType values, int64_t size, int64_t offset = 0) { + while (offset < size) { + auto length_before = converter_->builder()->length(); + auto status = converter_->Extend(values, size, offset); + auto length_after = converter_->builder()->length(); + auto num_converted = length_after - length_before; + + offset += num_converted; + length_ += num_converted; + + if (status.IsCapacityError()) { + if (converter_->builder()->length() == 0) { + // Builder length == 0 means the individual element is too large to append. + // In this case, no need to try again. + return status; + } else if (converter_->rewind_on_capacity_error()) { + length_ -= 1; + offset -= 1; + } + ARROW_RETURN_NOT_OK(FinishChunk()); + } else if (!status.ok()) { return status; } - ARROW_RETURN_NOT_OK(FinishChunk()); - return Extend(values, size); } - length_ += size; - return status; + return Status::OK(); } - Status ExtendMasked(InputType values, InputType mask, int64_t size) { - auto status = converter_->ExtendMasked(values, mask, size); - if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { - if (converter_->builder()->length() == 0) { + Status ExtendMasked(InputType values, InputType mask, int64_t size, + int64_t offset = 0) { + while (offset < size) { + auto length_before = converter_->builder()->length(); + auto status = converter_->ExtendMasked(values, mask, size, offset); + auto length_after = converter_->builder()->length(); + auto num_converted = length_after - length_before; + + offset += num_converted; + length_ += num_converted; + + if (status.IsCapacityError()) { + if (converter_->builder()->length() == 0) { + // Builder length == 0 means the individual element is too large to append. + // In this case, no need to try again. + return status; + } else if (converter_->rewind_on_capacity_error()) { + length_ -= 1; + offset -= 1; + } + ARROW_RETURN_NOT_OK(FinishChunk()); + } else if (!status.ok()) { return status; } - ARROW_RETURN_NOT_OK(FinishChunk()); - return ExtendMasked(values, mask, size); } - length_ += size; - return status; + return Status::OK(); } Status FinishChunk() { diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index ba94b340bd3..55e30626cbc 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -2185,7 +2185,11 @@ def test_auto_chunking_list_like(): assert arr.num_chunks == 2 assert len(arr.chunk(0)) == 7 assert len(arr.chunk(1)) == 1 - assert arr.chunk(1)[0].as_py() == list(item) + chunk = arr.chunk(1) + scalar = chunk[0] + assert isinstance(scalar, pa.ListScalar) + expected = pa.array(item, type=pa.uint8()) + assert scalar.values == expected @pytest.mark.slow diff --git a/python/pyarrow/tests/test_pandas.py b/python/pyarrow/tests/test_pandas.py index 7f904433fa2..47363a90836 100644 --- a/python/pyarrow/tests/test_pandas.py +++ b/python/pyarrow/tests/test_pandas.py @@ -2171,16 +2171,16 @@ def test_list_of_dictionary(self): @pytest.mark.large_memory def test_auto_chunking_on_list_overflow(self): # ARROW-9976 - n = 2**24 + n = 2**21 df = pd.DataFrame.from_dict({ - "a": list(np.zeros((n, 2**7), dtype='uint8')), + "a": list(np.zeros((n, 2**10), dtype='uint8')), "b": range(n) }) table = pa.Table.from_pandas(df) column_a = table[0] assert column_a.num_chunks == 2 - assert len(column_a.chunk(0)) == 2**24 - 1 + assert len(column_a.chunk(0)) == 2**21 - 1 assert len(column_a.chunk(1)) == 1 def test_map_array_roundtrip(self): From 4b1e7717ea5bac1f033d2fb43deb5fa818902437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 18 Jun 2021 16:03:54 +0200 Subject: [PATCH 02/14] Consolidate binary-like converters --- cpp/src/arrow/python/python_to_arrow.cc | 43 ++++--------------------- 1 file changed, 7 insertions(+), 36 deletions(-) diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index e8ac9b97d8f..f3b82113520 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -517,34 +517,6 @@ class PyPrimitiveConverter< } }; -template -class PyPrimitiveConverter> - : public PrimitiveConverter { - public: - using OffsetType = typename T::offset_type; - - Status Append(PyObject* value) override { - if (PyValue::IsNull(this->options_, value)) { - this->primitive_builder_->UnsafeAppendNull(); - } else { - ARROW_RETURN_NOT_OK( - PyValue::Convert(this->primitive_type_, this->options_, value, view_)); - // Since we don't know the varying length input size in advance, we need to - // reserve space in the value builder one by one. ReserveData raises CapacityError - // if the value would not fit into the array. - ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size)); - this->primitive_builder_->UnsafeAppend(view_.bytes, - static_cast(view_.size)); - } - return Status::OK(); - } - - protected: - // Create a single instance of PyBytesView here to prevent unnecessary object - // creation/destruction. This significantly improves the conversion performance. - PyBytesView view_; -}; - template class PyPrimitiveConverter::value>> : public PrimitiveConverter { @@ -566,7 +538,7 @@ class PyPrimitiveConverter:: }; template -class PyPrimitiveConverter> +class PyPrimitiveConverter> : public PrimitiveConverter { public: using OffsetType = typename T::offset_type; @@ -693,13 +665,12 @@ class PyListConverter : public ListConverter { switch (value_type->id()) { // If the value type does not match the expected NumPy dtype, then fall through // to a slower PySequence-based path -#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE) \ - case Type::TYPE_ID: { \ - if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \ - RETURN_NOT_OK(this->value_converter_->Extend(value, size)); \ - return Status::OK(); \ - } \ - return AppendNdarrayTyped(ndarray); \ +#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE) \ + case Type::TYPE_ID: { \ + if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \ + return this->value_converter_->Extend(value, size); \ + } \ + return AppendNdarrayTyped(ndarray); \ } LIST_FAST_CASE(BOOL, BooleanType, NPY_BOOL) LIST_FAST_CASE(UINT8, UInt8Type, NPY_UINT8) From 7839fc7fd063ef8cb07016f2ed6c21b1fe0f21cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 18 Jun 2021 17:06:20 +0200 Subject: [PATCH 03/14] Copy new tests cases from the original PR --- cpp/src/arrow/array/builder_binary.h | 7 --- python/pyarrow/tests/test_convert_builtin.py | 47 ++++++++++++++++++++ 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h index a60031258ad..c1c664a1249 100644 --- a/cpp/src/arrow/array/builder_binary.h +++ b/cpp/src/arrow/array/builder_binary.h @@ -291,14 +291,7 @@ class BaseBinaryBuilder : public ArrayBuilder { } Status Resize(int64_t capacity) override { - // XXX Why is this check necessary? There is no reason to disallow, say, - // binary arrays with more than 2**31 empty or null values. - if (capacity > memory_limit()) { - return Status::CapacityError("BinaryBuilder cannot reserve space for more than ", - memory_limit(), " child elements, got ", capacity); - } ARROW_RETURN_NOT_OK(CheckCapacity(capacity)); - // One more than requested for offsets ARROW_RETURN_NOT_OK(offsets_builder_.Resize(capacity + 1)); return ArrayBuilder::Resize(capacity); diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index 55e30626cbc..23bb4d2a0a8 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -2236,3 +2236,50 @@ def test_nested_auto_chunking(ty, char): 'integer': 1, 'string-like': char } + + +@pytest.mark.slow +@pytest.mark.large_memory +def test_array_from_pylist_data_overflow(): + # Regression test for ARROW-12983 + # Data buffer overflow - should result in chunked array + items = [b'a' * 4096] * (2 ** 19) + arr = pa.array(items, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + mask = np.zeros(2**19, bool) + arr = pa.array(items, mask=mask, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + arr = pa.array(items, type=pa.binary()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + +@pytest.mark.slow +@pytest.mark.large_memory +def test_array_from_pylist_offset_overflow(): + # Regression test for ARROW-12983 + # Offset buffer overflow - should result in chunked array + # Note this doesn't apply to primitive arrays + items = [b'a'] * (2 ** 31) + arr = pa.array(items, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 + + mask = np.zeros(2**31, bool) + arr = pa.array(items, mask=mask, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 + + arr = pa.array(items, type=pa.binary()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 From 04fd121b65b163595f70f8c41ee399bae949f3f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 18 Jun 2021 18:03:26 +0200 Subject: [PATCH 04/14] Fix chunking for the struct types --- cpp/src/arrow/python/python_to_arrow.cc | 2 -- cpp/src/arrow/util/converter.h | 11 +++++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index f3b82113520..e6c8d306ccd 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -633,8 +633,6 @@ class PyListConverter : public ListConverter { return ValidateBuilder(this->list_type_); } - bool rewind_on_capacity_error() const override { return true; } - protected: Status ValidateBuilder(const MapType*) { if (this->list_builder_->key_builder()->null_count() > 0) { diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h index 6b5ba62eef0..1861476d585 100644 --- a/cpp/src/arrow/util/converter.h +++ b/cpp/src/arrow/util/converter.h @@ -71,7 +71,7 @@ class Converter { bool may_overflow() const { return may_overflow_; } - virtual bool rewind_on_capacity_error() const { return false; } + bool rewind_on_error() const { return rewind_on_error_; } virtual Status Reserve(int64_t additional_capacity) { return builder_->Reserve(additional_capacity); @@ -99,6 +99,7 @@ class Converter { std::shared_ptr builder_; OptionsType options_; bool may_overflow_ = false; + bool rewind_on_error_ = false; }; template @@ -137,7 +138,8 @@ class ListConverter : public BaseConverter { std::make_shared(pool, value_converter_->builder(), this->type_); list_builder_ = checked_cast(this->builder_.get()); // Narrow list types may overflow - this->may_overflow_ = sizeof(typename ArrowType::offset_type) < sizeof(int64_t); + this->may_overflow_ = this->rewind_on_error_ = + sizeof(typename ArrowType::offset_type) < sizeof(int64_t); return Status::OK(); } @@ -170,6 +172,7 @@ class StructConverter : public BaseConverter { (MakeConverter( field->type(), this->options_, pool))); this->may_overflow_ |= child_converter->may_overflow(); + this->rewind_on_error_ = this->may_overflow_; child_builders.push_back(child_converter->builder()); children_.push_back(std::move(child_converter)); } @@ -320,7 +323,7 @@ class Chunker { // Builder length == 0 means the individual element is too large to append. // In this case, no need to try again. return status; - } else if (converter_->rewind_on_capacity_error()) { + } else if (converter_->rewind_on_error()) { length_ -= 1; offset -= 1; } @@ -348,7 +351,7 @@ class Chunker { // Builder length == 0 means the individual element is too large to append. // In this case, no need to try again. return status; - } else if (converter_->rewind_on_capacity_error()) { + } else if (converter_->rewind_on_error()) { length_ -= 1; offset -= 1; } From 27a766090b53006f86cf9bc80eeeb590a10a6735 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 18 Jun 2021 18:05:37 +0200 Subject: [PATCH 05/14] Temporarily enable large memory tests to see whether GHA tolerates it or not --- .github/workflows/python.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 981fd61c029..e89329d8948 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -36,6 +36,8 @@ concurrency: cancel-in-progress: true env: + PYARROW_TEST_SLOW: ON + PYARROW_TEST_LARGE_MEMORY: ON DOCKER_VOLUME_PREFIX: ".docker/" ARCHERY_DOCKER_USER: ${{ secrets.DOCKERHUB_USER }} ARCHERY_DOCKER_PASSWORD: ${{ secrets.DOCKERHUB_TOKEN }} From 6acc8b033abb201a2f7106302bab8edf4a45198c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 18 Jun 2021 18:08:35 +0200 Subject: [PATCH 06/14] Better name for the rewind flag --- cpp/src/arrow/util/converter.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h index 1861476d585..4a233a5cecf 100644 --- a/cpp/src/arrow/util/converter.h +++ b/cpp/src/arrow/util/converter.h @@ -71,7 +71,7 @@ class Converter { bool may_overflow() const { return may_overflow_; } - bool rewind_on_error() const { return rewind_on_error_; } + bool rewind_on_overflow() const { return rewind_on_overflow_; } virtual Status Reserve(int64_t additional_capacity) { return builder_->Reserve(additional_capacity); @@ -99,7 +99,7 @@ class Converter { std::shared_ptr builder_; OptionsType options_; bool may_overflow_ = false; - bool rewind_on_error_ = false; + bool rewind_on_overflow_ = false; }; template @@ -138,7 +138,7 @@ class ListConverter : public BaseConverter { std::make_shared(pool, value_converter_->builder(), this->type_); list_builder_ = checked_cast(this->builder_.get()); // Narrow list types may overflow - this->may_overflow_ = this->rewind_on_error_ = + this->may_overflow_ = this->rewind_on_overflow_ = sizeof(typename ArrowType::offset_type) < sizeof(int64_t); return Status::OK(); } @@ -172,7 +172,7 @@ class StructConverter : public BaseConverter { (MakeConverter( field->type(), this->options_, pool))); this->may_overflow_ |= child_converter->may_overflow(); - this->rewind_on_error_ = this->may_overflow_; + this->rewind_on_overflow_ = this->may_overflow_; child_builders.push_back(child_converter->builder()); children_.push_back(std::move(child_converter)); } @@ -323,7 +323,7 @@ class Chunker { // Builder length == 0 means the individual element is too large to append. // In this case, no need to try again. return status; - } else if (converter_->rewind_on_error()) { + } else if (converter_->rewind_on_overflow()) { length_ -= 1; offset -= 1; } @@ -351,7 +351,7 @@ class Chunker { // Builder length == 0 means the individual element is too large to append. // In this case, no need to try again. return status; - } else if (converter_->rewind_on_error()) { + } else if (converter_->rewind_on_overflow()) { length_ -= 1; offset -= 1; } From f81657ad95d9796f63018bc861526d00c58fbaa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 18 Jun 2021 18:11:53 +0200 Subject: [PATCH 07/14] Update R bindings --- r/src/r_to_arrow.cpp | 133 ++++++++++++++++++++++--------------------- 1 file changed, 69 insertions(+), 64 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index d0f4f3a6def..683e8f278e8 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -326,7 +326,7 @@ class RConverter : public Converter { public: virtual Status Append(SEXP) { return Status::NotImplemented("Append"); } - virtual Status Extend(SEXP values, int64_t size) { + virtual Status Extend(SEXP values, int64_t size, int64_t offset = 0) { return Status::NotImplemented("Extend"); } @@ -337,7 +337,7 @@ class RConverter : public Converter { tasks.Append(false, task); } - virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { + virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size, int64_t offset = 0) { return Status::NotImplemented("ExtendMasked"); } }; @@ -434,8 +434,8 @@ template class RPrimitiveConverter> : public PrimitiveConverter { public: - Status Extend(SEXP, int64_t size) override { - return this->primitive_builder_->AppendNulls(size); + Status Extend(SEXP, int64_t size, int64_t offset = 0) override { + return this->primitive_builder_->AppendNulls(size - offset); } }; @@ -445,17 +445,17 @@ class RPrimitiveConverter< T, enable_if_t::value || is_floating_type::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { auto rtype = GetVectorType(x); switch (rtype) { case UINT8: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case INT32: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case FLOAT64: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case INT64: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); default: break; @@ -471,14 +471,14 @@ class RPrimitiveConverter< private: template - Status ExtendDispatch(SEXP x, int64_t size) { + Status ExtendDispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { // `x` is an ALTREP R vector storing `r_value_type` // and that type matches exactly the type of the array this is building - return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + return Extend_impl(RVectorIterator_ALTREP(x, offset), size); } else { // `x` is not an ALTREP vector so we have direct access to a range of values - return Extend_impl(RVectorIterator(x, 0), size); + return Extend_impl(RVectorIterator(x, offset), size); } } @@ -514,16 +514,16 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { auto rtype = GetVectorType(x); if (rtype != BOOLEAN) { return Status::Invalid("Expecting a logical vector"); } if (ALTREP(x)) { - return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + return Extend_impl(RVectorIterator_ALTREP(x, offset), size); } else { - return Extend_impl(RVectorIterator(x, 0), size); + return Extend_impl(RVectorIterator(x, offset), size); } } @@ -553,16 +553,16 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { switch (GetVectorType(x)) { case DATE_INT: - return AppendRange_Date_dispatch(x, size); + return AppendRange_Date_dispatch(x, size, offset); case DATE_DBL: - return AppendRange_Date_dispatch(x, size); + return AppendRange_Date_dispatch(x, size, offset); case POSIXCT: - return AppendRange_Posixct_dispatch(x, size); + return AppendRange_Posixct_dispatch(x, size, offset); default: break; @@ -578,11 +578,12 @@ class RPrimitiveConverter::value>> private: template - Status AppendRange_Date_dispatch(SEXP x, int64_t size) { + Status AppendRange_Date_dispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { - return AppendRange_Date(RVectorIterator_ALTREP(x, 0), size); + return AppendRange_Date(RVectorIterator_ALTREP(x, offset), + size - offset); } else { - return AppendRange_Date(RVectorIterator(x, 0), size); + return AppendRange_Date(RVectorIterator(x, offset), size - offset); } } @@ -602,11 +603,12 @@ class RPrimitiveConverter::value>> return VisitVector(it, size, append_null, append_value); } - Status AppendRange_Posixct_dispatch(SEXP x, int64_t size) { + Status AppendRange_Posixct_dispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { - return AppendRange_Posixct(RVectorIterator_ALTREP(x, 0), size); + return AppendRange_Posixct(RVectorIterator_ALTREP(x, offset), + size - offset); } else { - return AppendRange_Posixct(RVectorIterator(x, 0), size); + return AppendRange_Posixct(RVectorIterator(x, offset), size - offset); } } @@ -660,8 +662,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); auto rtype = GetVectorType(x); if (rtype != TIME) { return Status::Invalid("Invalid conversion to time"); @@ -699,10 +701,11 @@ class RPrimitiveConverter::value>> }; if (ALTREP(x)) { - return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + return VisitVector(RVectorIterator_ALTREP(x, offset), size, append_null, append_value); } else { - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, + append_value); } } @@ -716,8 +719,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RVectorType rtype = GetVectorType(x); if (rtype != POSIXCT) { @@ -737,10 +740,11 @@ class RPrimitiveConverter::value>> }; if (ALTREP(x)) { - return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + return VisitVector(RVectorIterator_ALTREP(x, offset), size, append_null, append_value); } else { - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, + append_value); } } @@ -754,7 +758,7 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { return Status::NotImplemented("Extend"); } }; @@ -786,8 +790,8 @@ class RPrimitiveConverter> public: using OffsetType = typename T::offset_type; - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RETURN_NOT_OK(check_binary(x, size)); auto append_null = [this]() { @@ -801,7 +805,7 @@ class RPrimitiveConverter> this->primitive_builder_->UnsafeAppend(RAW_RO(raw), static_cast(n)); return Status::OK(); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -814,8 +818,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RETURN_NOT_OK(check_binary(x, size)); auto append_null = [this]() { @@ -833,7 +837,7 @@ class RPrimitiveConverter::v this->primitive_builder_->UnsafeAppend(RAW_RO(raw)); return Status::OK(); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -848,12 +852,12 @@ class RPrimitiveConverter> public: using OffsetType = typename T::offset_type; - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { RVectorType rtype = GetVectorType(x); if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } - return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size); + return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size, offset); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -863,14 +867,14 @@ class RPrimitiveConverter> } private: - Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size) { + Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size, int64_t offset) { RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s)); // we know all the R strings are utf8 already, so we can get // a definite size and then use UnsafeAppend*() int64_t total_length = 0; - for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + for (R_xlen_t i = offset; i < size; i++, ++p_strings) { SEXP si = *p_strings; total_length += si == NA_STRING ? 0 : LENGTH(si); } @@ -878,7 +882,7 @@ class RPrimitiveConverter> // append p_strings = reinterpret_cast(DATAPTR_RO(s)); - for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + for (R_xlen_t i = offset; i < size; i++, ++p_strings) { SEXP si = *p_strings; if (si == NA_STRING) { this->primitive_builder_->UnsafeAppendNull(); @@ -895,7 +899,7 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { // TODO: look in lubridate return Status::NotImplemented("Extend"); } @@ -911,7 +915,7 @@ template class RDictionaryConverter> : public DictionaryConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { return Status::NotImplemented("Extend"); } }; @@ -922,14 +926,14 @@ class RDictionaryConverter> public: using BuilderType = DictionaryBuilder; - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(ExtendSetup(x, size)); - return ExtendImpl(x, size, GetCharLevels(x)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(ExtendSetup(x, size, offset)); + return ExtendImpl(x, size, offset, GetCharLevels(x)); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { // the setup runs synchronously first - Status setup = ExtendSetup(values, size); + Status setup = ExtendSetup(values, size, /*offset=*/0); if (!setup.ok()) { // if that fails, propagate the error @@ -938,7 +942,7 @@ class RDictionaryConverter> auto char_levels = GetCharLevels(values); tasks.Append(true, [this, values, size, char_levels]() { - return this->ExtendImpl(values, size, char_levels); + return this->ExtendImpl(values, size, /*offset=*/0, char_levels); }); } } @@ -970,7 +974,7 @@ class RDictionaryConverter> return char_levels; } - Status ExtendSetup(SEXP x, int64_t size) { + Status ExtendSetup(SEXP x, int64_t size, int64_t offset) { RVectorType rtype = GetVectorType(x); if (rtype != FACTOR) { return Status::Invalid("invalid R type to convert to dictionary"); @@ -982,17 +986,18 @@ class RDictionaryConverter> RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array)); // then we can proceed - return this->Reserve(size); + return this->Reserve(size - offset); } - Status ExtendImpl(SEXP values, int64_t size, + Status ExtendImpl(SEXP values, int64_t size, int64_t offset, const std::vector& char_levels) { auto append_null = [this]() { return this->value_builder_->AppendNull(); }; auto append_value = [this, &char_levels](int value) { return this->value_builder_->Append(char_levels[value - 1]); }; - return VisitVector(RVectorIterator(values, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(values, offset), size, append_null, + append_value); } }; @@ -1014,7 +1019,7 @@ struct RConverterTrait> { template class RListConverter : public ListConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { RETURN_NOT_OK(this->Reserve(size)); RVectorType rtype = GetVectorType(x); @@ -1034,7 +1039,7 @@ class RListConverter : public ListConverter { return this->value_converter_.get()->Extend(value, n); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -1056,12 +1061,12 @@ struct RConverterTrait { class RStructConverter : public StructConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(ExtendSetup(x, size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(ExtendSetup(x, size, offset)); auto fields = this->struct_type_->fields(); R_xlen_t n_columns = XLENGTH(x); - for (R_xlen_t i = 0; i < n_columns; i++) { + for (R_xlen_t i = offset; i < n_columns; i++) { auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); if (!status.ok()) { return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), @@ -1074,7 +1079,7 @@ class RStructConverter : public StructConverter { void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { // the setup runs synchronously first - Status setup = ExtendSetup(values, size); + Status setup = ExtendSetup(values, size, /*offset=*/0); if (!setup.ok()) { // if that fails, propagate the error @@ -1095,7 +1100,7 @@ class RStructConverter : public StructConverter { return StructConverter::Init(pool); } - Status ExtendSetup(SEXP x, int64_t size) { + Status ExtendSetup(SEXP x, int64_t size, int64_t offset) { // check that x is compatible R_xlen_t n_columns = XLENGTH(x); @@ -1133,7 +1138,7 @@ class RStructConverter : public StructConverter { } } - RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(this->Reserve(size - offset)); for (R_xlen_t i = 0; i < size; i++) { RETURN_NOT_OK(struct_builder_->Append()); From 5cca163c61146d876da1d8bb5a3ae688da9934a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Mon, 21 Jun 2021 14:58:27 +0200 Subject: [PATCH 08/14] Show system memory on macOS hosted runner --- .github/workflows/python.yml | 12 ++++++++--- python/pyarrow/tests/test_convert_builtin.py | 22 ++++++++++---------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index e89329d8948..8183bae8d68 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -36,8 +36,6 @@ concurrency: cancel-in-progress: true env: - PYARROW_TEST_SLOW: ON - PYARROW_TEST_LARGE_MEMORY: ON DOCKER_VOLUME_PREFIX: ".docker/" ARCHERY_DOCKER_USER: ${{ secrets.DOCKERHUB_USER }} ARCHERY_DOCKER_PASSWORD: ${{ secrets.DOCKERHUB_TOKEN }} @@ -114,7 +112,7 @@ jobs: macos: name: AMD64 MacOS 10.15 Python 3 runs-on: macos-latest - if: ${{ !contains(github.event.pull_request.title, 'WIP') }} + # if: ${{ !contains(github.event.pull_request.title, 'WIP') }} timeout-minutes: 60 env: ARROW_HOME: /usr/local @@ -133,6 +131,8 @@ jobs: ARROW_WITH_BROTLI: ON ARROW_BUILD_TESTS: OFF CMAKE_ARGS: "-DPython3_EXECUTABLE=/usr/local/bin/python3" + # PYARROW_TEST_SLOW: ON + PYARROW_TEST_LARGE_MEMORY: ON steps: - name: Checkout Arrow uses: actions/checkout@v2 @@ -141,6 +141,12 @@ jobs: - name: Fetch Submodules and Tags shell: bash run: ci/scripts/util_checkout.sh + - name: Show available RAM size + shell: bash + run: | + hwmemsize=$(sysctl -n hw.memsize) + ramsize=$(expr $hwmemsize / $((1024**3))) + echo "System Memory: ${ramsize} GB" - name: Install Dependencies shell: bash run: | diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index 23bb4d2a0a8..e5903e6b68a 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -2192,17 +2192,17 @@ def test_auto_chunking_list_like(): assert scalar.values == expected -@pytest.mark.slow -@pytest.mark.large_memory -def test_auto_chunking_map_type(): - # takes ~20 minutes locally - ty = pa.map_(pa.int8(), pa.int8()) - item = [(1, 1)] * 2**28 - data = [item] * 2**3 - arr = pa.array(data, type=ty) - assert isinstance(arr, pa.ChunkedArray) - assert len(arr.chunk(0)) == 7 - assert len(arr.chunk(1)) == 1 +# @pytest.mark.slow +# @pytest.mark.large_memory +# def test_auto_chunking_map_type(): +# # takes ~20 minutes locally +# ty = pa.map_(pa.int8(), pa.int8()) +# item = [(1, 1)] * 2**28 +# data = [item] * 2**3 +# arr = pa.array(data, type=ty) +# assert isinstance(arr, pa.ChunkedArray) +# assert len(arr.chunk(0)) == 7 +# assert len(arr.chunk(1)) == 1 @pytest.mark.large_memory From 319a58f63897d25c4d8bd84ee427c5843ac43d52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Mon, 21 Jun 2021 15:49:17 +0200 Subject: [PATCH 09/14] Re-annotate slow test cases --- ci/scripts/python_test.sh | 2 +- python/pyarrow/tests/test_array.py | 1 + python/pyarrow/tests/test_convert_builtin.py | 25 ++++++++++---------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/ci/scripts/python_test.sh b/ci/scripts/python_test.sh index 80a9cdef4a3..6e05af89a19 100755 --- a/ci/scripts/python_test.sh +++ b/ci/scripts/python_test.sh @@ -29,4 +29,4 @@ export LD_LIBRARY_PATH=${ARROW_HOME}/lib:${LD_LIBRARY_PATH} # Enable some checks inside Python itself export PYTHONDEVMODE=1 -pytest -r s ${PYTEST_ARGS} --pyargs pyarrow +pytest -r s -v ${PYTEST_ARGS} --pyargs pyarrow diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 30500bc3c5b..9f6ab678a95 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -2630,6 +2630,7 @@ def test_array_from_numpy_str_utf8(): pa.array(vec, pa.string(), mask=np.array([False])) +@pytest.mark.slow @pytest.mark.large_memory def test_numpy_binary_overflow_to_chunked(): # ARROW-3762, ARROW-5966 diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index e5903e6b68a..1a500b8523f 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -756,6 +756,7 @@ def test_large_binary_array(ty): assert len(arr) == nrepeats +@pytest.mark.slow @pytest.mark.large_memory @pytest.mark.parametrize("ty", [pa.large_binary(), pa.large_string()]) def test_large_binary_value(ty): @@ -2169,7 +2170,6 @@ def test_auto_chunking_list_of_binary(): assert arr.chunk(1).to_pylist() == [['x' * 1024]] * 2 -@pytest.mark.slow @pytest.mark.large_memory def test_auto_chunking_list_like(): item = np.ones((2**28,), dtype='uint8') @@ -2192,17 +2192,17 @@ def test_auto_chunking_list_like(): assert scalar.values == expected -# @pytest.mark.slow -# @pytest.mark.large_memory -# def test_auto_chunking_map_type(): -# # takes ~20 minutes locally -# ty = pa.map_(pa.int8(), pa.int8()) -# item = [(1, 1)] * 2**28 -# data = [item] * 2**3 -# arr = pa.array(data, type=ty) -# assert isinstance(arr, pa.ChunkedArray) -# assert len(arr.chunk(0)) == 7 -# assert len(arr.chunk(1)) == 1 +@pytest.mark.slow +@pytest.mark.large_memory +def test_auto_chunking_map_type(): + # takes ~20 minutes locally + ty = pa.map_(pa.int8(), pa.int8()) + item = [(1, 1)] * 2**28 + data = [item] * 2**3 + arr = pa.array(data, type=ty) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr.chunk(0)) == 7 + assert len(arr.chunk(1)) == 1 @pytest.mark.large_memory @@ -2238,7 +2238,6 @@ def test_nested_auto_chunking(ty, char): } -@pytest.mark.slow @pytest.mark.large_memory def test_array_from_pylist_data_overflow(): # Regression test for ARROW-12983 From 0a20b7db14a4278eaa58c4968d43cfd6194a60b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Mon, 21 Jun 2021 15:54:37 +0200 Subject: [PATCH 10/14] Explain rewind_on_overflow logic --- cpp/src/arrow/util/converter.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h index 4a233a5cecf..0b29e0f5bc7 100644 --- a/cpp/src/arrow/util/converter.h +++ b/cpp/src/arrow/util/converter.h @@ -324,6 +324,11 @@ class Chunker { // In this case, no need to try again. return status; } else if (converter_->rewind_on_overflow()) { + // The list-like and binary-like conversion paths may raise a capacity error, + // we need to handle them differently. While the binary-like converters check + // the capacity before append/extend the list-like converters just check after + // append/extend. Thus depending on the implementation semantics we may need + // to rewind (slice) the output chunk by one. length_ -= 1; offset -= 1; } @@ -352,6 +357,11 @@ class Chunker { // In this case, no need to try again. return status; } else if (converter_->rewind_on_overflow()) { + // The list-like and binary-like conversion paths may raise a capacity error, + // we need to handle them differently. While the binary-like converters check + // the capacity before append/extend the list-like converters just check after + // append/extend. Thus depending on the implementation semantics we may need + // to rewind (slice) the output chunk by one. length_ -= 1; offset -= 1; } From 010dcae0bc122f19f55b91ccb91e02781f064b9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Mon, 21 Jun 2021 15:57:16 +0200 Subject: [PATCH 11/14] Trigger CI --- .github/workflows/python.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 8183bae8d68..474efc35d00 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -131,7 +131,6 @@ jobs: ARROW_WITH_BROTLI: ON ARROW_BUILD_TESTS: OFF CMAKE_ARGS: "-DPython3_EXECUTABLE=/usr/local/bin/python3" - # PYARROW_TEST_SLOW: ON PYARROW_TEST_LARGE_MEMORY: ON steps: - name: Checkout Arrow From 161aa1e432ce1423930ba762868f651a94257dbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Mon, 21 Jun 2021 16:48:50 +0200 Subject: [PATCH 12/14] Re-annotate more slow test cases --- python/pyarrow/tests/parquet/test_data_types.py | 5 +++++ python/pyarrow/tests/test_pandas.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/tests/parquet/test_data_types.py b/python/pyarrow/tests/parquet/test_data_types.py index 850dff94df4..bdbc6b7b5a5 100644 --- a/python/pyarrow/tests/parquet/test_data_types.py +++ b/python/pyarrow/tests/parquet/test_data_types.py @@ -404,6 +404,7 @@ def test_fixed_size_binary(): # ----------------------------------------------------------------------------- +@pytest.mark.slow @pytest.mark.large_memory def test_large_table_int32_overflow(): size = np.iinfo('int32').max + 1 @@ -424,6 +425,7 @@ def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs): return _read_table(buf, use_legacy_dataset=use_legacy_dataset) +@pytest.mark.slow @pytest.mark.large_memory @parametrize_legacy_dataset def test_byte_array_exactly_2gb(use_legacy_dataset): @@ -444,6 +446,7 @@ def test_byte_array_exactly_2gb(use_legacy_dataset): assert t.equals(result) +@pytest.mark.slow @pytest.mark.pandas @pytest.mark.large_memory @parametrize_legacy_dataset @@ -469,6 +472,7 @@ def test_binary_array_overflow_to_chunked(use_legacy_dataset): assert tbl.equals(read_tbl) +@pytest.mark.slow @pytest.mark.pandas @pytest.mark.large_memory @parametrize_legacy_dataset @@ -499,6 +503,7 @@ def test_large_binary(): _check_roundtrip(table, use_dictionary=use_dictionary) +@pytest.mark.slow @pytest.mark.large_memory def test_large_binary_huge(): s = b'xy' * 997 diff --git a/python/pyarrow/tests/test_pandas.py b/python/pyarrow/tests/test_pandas.py index 47363a90836..b6557875c2c 100644 --- a/python/pyarrow/tests/test_pandas.py +++ b/python/pyarrow/tests/test_pandas.py @@ -2167,7 +2167,6 @@ def test_list_of_dictionary(self): expected[2] = None tm.assert_series_equal(arr.to_pandas(), expected) - @pytest.mark.slow @pytest.mark.large_memory def test_auto_chunking_on_list_overflow(self): # ARROW-9976 @@ -2356,6 +2355,7 @@ def test_from_numpy_nested(self): {'x': {'xx': 1, 'yy': True}, 'y': 2, 'z': 'foo'}, {'x': {'xx': 3, 'yy': False}, 'y': 4, 'z': 'bar'}] + @pytest.mark.slow @pytest.mark.large_memory def test_from_numpy_large(self): # Exercise rechunking + nulls From e45079b9406e30e12082a4f0f165379e4c14e703 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Mon, 21 Jun 2021 18:38:45 +0200 Subject: [PATCH 13/14] Trigger all of the builds --- .github/workflows/python.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 474efc35d00..c6781a4b149 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -112,7 +112,7 @@ jobs: macos: name: AMD64 MacOS 10.15 Python 3 runs-on: macos-latest - # if: ${{ !contains(github.event.pull_request.title, 'WIP') }} + if: ${{ !contains(github.event.pull_request.title, 'WIP') }} timeout-minutes: 60 env: ARROW_HOME: /usr/local From 7f8b74ad629794aa72881a8b7c8a14a3d88bfaf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 22 Jun 2021 11:24:55 +0200 Subject: [PATCH 14/14] Directly return status --- cpp/src/arrow/python/python_to_arrow.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index e6c8d306ccd..521249fd542 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -647,8 +647,7 @@ class PyListConverter : public ListConverter { Status AppendSequence(PyObject* value) { int64_t size = static_cast(PySequence_Size(value)); RETURN_NOT_OK(this->list_builder_->ValidateOverflow(size)); - RETURN_NOT_OK(this->value_converter_->Extend(value, size)); - return Status::OK(); + return this->value_converter_->Extend(value, size); } Status AppendNdarray(PyObject* value) { @@ -686,8 +685,7 @@ class PyListConverter : public ListConverter { LIST_FAST_CASE(DURATION, DurationType, NPY_TIMEDELTA) #undef LIST_FAST_CASE default: { - RETURN_NOT_OK(this->value_converter_->Extend(value, size)); - return Status::OK(); + return this->value_converter_->Extend(value, size); } } }