diff --git a/cpp/src/arrow/python/inference.cc b/cpp/src/arrow/python/inference.cc index 9d6707aa11d..5086815f84f 100644 --- a/cpp/src/arrow/python/inference.cc +++ b/cpp/src/arrow/python/inference.cc @@ -379,12 +379,13 @@ class TypeInferrer { // Infer value type from a sequence of values Status VisitSequence(PyObject* obj, PyObject* mask = nullptr) { if (mask == nullptr || mask == Py_None) { - return internal::VisitSequence(obj, [this](PyObject* value, bool* keep_going) { - return Visit(value, keep_going); - }); + return internal::VisitSequence( + obj, /*offset=*/0, + [this](PyObject* value, bool* keep_going) { return Visit(value, keep_going); }); } else { return internal::VisitSequenceMasked( - obj, mask, [this](PyObject* value, uint8_t masked, bool* keep_going) { + obj, mask, /*offset=*/0, + [this](PyObject* value, uint8_t masked, bool* keep_going) { if (!masked) { return Visit(value, keep_going); } else { diff --git a/cpp/src/arrow/python/iterators.h b/cpp/src/arrow/python/iterators.h index 6b0b55342a5..5ecc9d6dc0c 100644 --- a/cpp/src/arrow/python/iterators.h +++ b/cpp/src/arrow/python/iterators.h @@ -36,7 +36,7 @@ namespace internal { // // If keep_going is set to false, the iteration terminates template -inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { +inline Status VisitSequenceGeneric(PyObject* obj, int64_t offset, VisitorFunc&& func) { // VisitorFunc may set to false to terminate iteration bool keep_going = true; @@ -49,7 +49,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { if (PyArray_DESCR(arr_obj)->type_num == NPY_OBJECT) { // It's an array object, we can fetch object pointers directly const Ndarray1DIndexer objects(arr_obj); - for (int64_t i = 0; keep_going && i < objects.size(); ++i) { + for (int64_t i = offset; keep_going && i < objects.size(); ++i) { RETURN_NOT_OK(func(objects[i], i, &keep_going)); } return Status::OK(); @@ -64,7 +64,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { if (PyList_Check(obj) || PyTuple_Check(obj)) { // Use fast item access const Py_ssize_t size = PySequence_Fast_GET_SIZE(obj); - for (Py_ssize_t i = 0; keep_going && i < size; ++i) { + for (Py_ssize_t i = offset; keep_going && i < size; ++i) { PyObject* value = PySequence_Fast_GET_ITEM(obj, i); RETURN_NOT_OK(func(value, static_cast(i), &keep_going)); } @@ -72,7 +72,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { // Regular sequence: avoid making a potentially large copy const Py_ssize_t size = PySequence_Size(obj); RETURN_IF_PYERROR(); - for (Py_ssize_t i = 0; keep_going && i < size; ++i) { + for (Py_ssize_t i = offset; keep_going && i < size; ++i) { OwnedRef value_ref(PySequence_ITEM(obj, i)); RETURN_IF_PYERROR(); RETURN_NOT_OK(func(value_ref.obj(), static_cast(i), &keep_going)); @@ -84,18 +84,19 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { return Status::OK(); } -// Visit sequence with no null mask +// Visit sequence with offset but no null mask template -inline Status VisitSequence(PyObject* obj, VisitorFunc&& func) { +inline Status VisitSequence(PyObject* obj, int64_t offset, VisitorFunc&& func) { return VisitSequenceGeneric( - obj, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) { + obj, offset, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) { return func(value, keep_going); }); } -/// Visit sequence with null mask +/// Visit sequence with null mask and offset template -inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& func) { +inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, int64_t offset, + VisitorFunc&& func) { if (mo == nullptr || !PyArray_Check(mo)) { return Status::Invalid("Null mask must be NumPy array"); } @@ -115,7 +116,7 @@ inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& fun Ndarray1DIndexer mask_values(mask); return VisitSequenceGeneric( - obj, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) { + obj, offset, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) { return func(value, mask_values[i], keep_going); }); } else { @@ -132,7 +133,7 @@ template inline Status VisitIterable(PyObject* obj, VisitorFunc&& func) { if (PySequence_Check(obj)) { // Numpy arrays fall here as well - return VisitSequence(obj, std::forward(func)); + return VisitSequence(obj, /*offset=*/0, std::forward(func)); } // Fall back on the iterator protocol OwnedRef iter_ref(PyObject_GetIter(obj)); diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index b2d9f1cb5a3..69f0de730d2 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -392,22 +392,25 @@ class PyValue { class PyConverter : public Converter { public: // Iterate over the input values and defer the conversion to the Append method - Status Extend(PyObject* values, int64_t size) override { - /// Ensure we've allocated enough space - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(PyObject* values, int64_t size, int64_t offset = 0) override { + DCHECK_GE(size, offset); + // Ensure we've allocated enough space + RETURN_NOT_OK(this->Reserve(size - offset)); // Iterate over the items adding each one - return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) { - return this->Append(item); - }); + return internal::VisitSequence( + values, offset, + [this](PyObject* item, bool* /* unused */) { return this->Append(item); }); } // Convert and append a sequence of values masked with a numpy array - Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override { - /// Ensure we've allocated enough space - RETURN_NOT_OK(this->Reserve(size)); + Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size, + int64_t offset = 0) override { + DCHECK_GE(size, offset); + // Ensure we've allocated enough space + RETURN_NOT_OK(this->Reserve(size - offset)); // Iterate over the items adding each one return internal::VisitSequenceMasked( - values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) { + values, mask, offset, [this](PyObject* item, bool is_masked, bool* /* unused */) { if (is_masked) { return this->AppendNull(); } else { @@ -514,34 +517,6 @@ class PyPrimitiveConverter< } }; -template -class PyPrimitiveConverter> - : public PrimitiveConverter { - public: - using OffsetType = typename T::offset_type; - - Status Append(PyObject* value) override { - if (PyValue::IsNull(this->options_, value)) { - this->primitive_builder_->UnsafeAppendNull(); - } else { - ARROW_RETURN_NOT_OK( - PyValue::Convert(this->primitive_type_, this->options_, value, view_)); - // Since we don't know the varying length input size in advance, we need to - // reserve space in the value builder one by one. ReserveData raises CapacityError - // if the value would not fit into the array. - ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size)); - this->primitive_builder_->UnsafeAppend(view_.bytes, - static_cast(view_.size)); - } - return Status::OK(); - } - - protected: - // Create a single instance of PyBytesView here to prevent unnecessary object - // creation/destruction. This significantly improves the conversion performance. - PyBytesView view_; -}; - template class PyPrimitiveConverter::value>> : public PrimitiveConverter { @@ -563,7 +538,7 @@ class PyPrimitiveConverter:: }; template -class PyPrimitiveConverter> +class PyPrimitiveConverter> : public PrimitiveConverter { public: using OffsetType = typename T::offset_type; @@ -578,6 +553,9 @@ class PyPrimitiveConverter> // observed binary value observed_binary_ = true; } + // Since we don't know the varying length input size in advance, we need to + // reserve space in the value builder one by one. ReserveData raises CapacityError + // if the value would not fit into the array. ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size)); this->primitive_builder_->UnsafeAppend(view_.bytes, static_cast(view_.size)); @@ -585,6 +563,37 @@ class PyPrimitiveConverter> return Status::OK(); } + Result ExtendAsMuchAsPossible(PyObject* values, int64_t size, + int64_t offset) override { + DCHECK_GE(size, offset); + // See BaseBinaryBuilder::Resize - avoid error in Reserve() + if (size - offset > BaseBinaryBuilder::memory_limit()) { + size = offset + BaseBinaryBuilder::memory_limit() - 1; + } + const auto status = this->Extend(values, size, offset); + const auto num_converted = this->builder()->length(); + if (ARROW_PREDICT_TRUE(status.ok() || + (status.IsCapacityError() && num_converted > 0))) { + return num_converted; + } + return status; + } + + Result ExtendMaskedAsMuchAsPossible(PyObject* values, PyObject* mask, + int64_t size, int64_t offset) override { + DCHECK_GE(size, offset); + if (size - offset >= BaseBinaryBuilder::memory_limit()) { + size = offset + BaseBinaryBuilder::memory_limit() - 1; + } + const auto status = this->ExtendMasked(values, mask, size, offset); + const auto num_converted = this->builder()->length(); + if (ARROW_PREDICT_TRUE(status.ok() || + (status.IsCapacityError() && num_converted > 0))) { + return num_converted; + } + return status; + } + Result> ToArray() override { ARROW_ASSIGN_OR_RAISE(auto array, (PrimitiveConverter::ToArray())); if (observed_binary_) { @@ -597,6 +606,8 @@ class PyPrimitiveConverter> } protected: + // Create a single instance of PyBytesView here to prevent unnecessary object + // creation/destruction. This significantly improves the conversion performance. PyBytesView view_; bool observed_binary_ = false; }; diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h index 2c40a48726b..7a45efc9d33 100644 --- a/cpp/src/arrow/util/converter.h +++ b/cpp/src/arrow/util/converter.h @@ -25,6 +25,7 @@ #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" #include "arrow/util/make_unique.h" #include "arrow/visitor_inline.h" @@ -54,14 +55,40 @@ class Converter { virtual Status Append(InputType value) { return Status::NotImplemented("Append"); } - virtual Status Extend(InputType values, int64_t size) { + /// \brief Extend the array with values from the input sequence starting at the offset. + /// Fails if the array cannot fit all values. + /// \param[in] values The input sequence. + /// \param[in] size The total length of the input sequence, independent of offset. + /// \param[in] offset The offset to start converting from. + virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) { return Status::NotImplemented("Extend"); } - virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) { + /// \brief Extend the array with as many values as possible from the + /// input sequence starting at the offset. + /// + /// \param[in] values The input sequence. + /// \param[in] size The total length of the input sequence (independent of offset). + /// \param[in] offset The offset to start at. + virtual Result ExtendAsMuchAsPossible(InputType values, int64_t size, + int64_t offset) { + ARROW_DCHECK_GE(size, offset); + ARROW_RETURN_NOT_OK(Extend(values, size, offset)); + return builder()->length(); + } + + virtual Status ExtendMasked(InputType values, InputType mask, int64_t size, + int64_t offset = 0) { return Status::NotImplemented("ExtendMasked"); } + virtual Result ExtendMaskedAsMuchAsPossible(InputType values, InputType mask, + int64_t size, int64_t offset) { + ARROW_DCHECK_GE(size, offset); + ARROW_RETURN_NOT_OK(ExtendMasked(values, mask, size, offset)); + return builder()->length(); + } + const std::shared_ptr& builder() const { return builder_; } const std::shared_ptr& type() const { return type_; } @@ -302,32 +329,35 @@ class Chunker { return status; } - // we could get bit smarter here since the whole batch of appendable values - // will be rejected if a capacity error is raised Status Extend(InputType values, int64_t size) { - auto status = converter_->Extend(values, size); - if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { - if (converter_->builder()->length() == 0) { - return status; + int64_t offset = 0; + while (offset < size) { + ARROW_ASSIGN_OR_RAISE(const int64_t num_converted, + converter_->ExtendAsMuchAsPossible(values, size, offset)); + offset += num_converted; + length_ += num_converted; + if (offset < size) { + // Need another chunk, finish up the current one + ARROW_RETURN_NOT_OK(FinishChunk()); } - ARROW_RETURN_NOT_OK(FinishChunk()); - return Extend(values, size); } - length_ += size; - return status; + return Status::OK(); } Status ExtendMasked(InputType values, InputType mask, int64_t size) { - auto status = converter_->ExtendMasked(values, mask, size); - if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { - if (converter_->builder()->length() == 0) { - return status; + int64_t offset = 0; + while (offset < size) { + ARROW_ASSIGN_OR_RAISE( + const int64_t num_converted, + converter_->ExtendMaskedAsMuchAsPossible(values, mask, size, offset)); + offset += num_converted; + length_ += num_converted; + if (offset < size) { + // Need another chunk, finish up the current one + ARROW_RETURN_NOT_OK(FinishChunk()); } - ARROW_RETURN_NOT_OK(FinishChunk()); - return ExtendMasked(values, mask, size); } - length_ += size; - return status; + return Status::OK(); } Status FinishChunk() { diff --git a/python/.gitignore b/python/.gitignore index ef1237a2d10..af83a453fe8 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -17,6 +17,7 @@ Testing/ *.cpp pyarrow/*_api.h pyarrow/_generated_version.py +cython_debug/ # Bundled headers pyarrow/include diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 30500bc3c5b..5e565214b16 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -2875,3 +2875,50 @@ def test_to_pandas_timezone(): arr = pa.chunked_array([arr]) s = arr.to_pandas() assert s.dt.tz is not None + + +@pytest.mark.slow +@pytest.mark.large_memory +def test_array_from_pylist_data_overflow(): + # Regression test for ARROW-12983 + # Data buffer overflow - should result in chunked array + items = [b'a' * 4096] * (2 ** 19) + arr = pa.array(items, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + mask = np.zeros(2**19, bool) + arr = pa.array(items, mask=mask, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + arr = pa.array(items, type=pa.binary()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + +@pytest.mark.slow +@pytest.mark.large_memory +def test_array_from_pylist_offset_overflow(): + # Regression test for ARROW-12983 + # Offset buffer overflow - should result in chunked array + # Note this doesn't apply to primitive arrays + items = [b'a'] * (2 ** 31) + arr = pa.array(items, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 + + mask = np.zeros(2**31, bool) + arr = pa.array(items, mask=mask, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 + + arr = pa.array(items, type=pa.binary()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index d0f4f3a6def..683e8f278e8 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -326,7 +326,7 @@ class RConverter : public Converter { public: virtual Status Append(SEXP) { return Status::NotImplemented("Append"); } - virtual Status Extend(SEXP values, int64_t size) { + virtual Status Extend(SEXP values, int64_t size, int64_t offset = 0) { return Status::NotImplemented("Extend"); } @@ -337,7 +337,7 @@ class RConverter : public Converter { tasks.Append(false, task); } - virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { + virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size, int64_t offset = 0) { return Status::NotImplemented("ExtendMasked"); } }; @@ -434,8 +434,8 @@ template class RPrimitiveConverter> : public PrimitiveConverter { public: - Status Extend(SEXP, int64_t size) override { - return this->primitive_builder_->AppendNulls(size); + Status Extend(SEXP, int64_t size, int64_t offset = 0) override { + return this->primitive_builder_->AppendNulls(size - offset); } }; @@ -445,17 +445,17 @@ class RPrimitiveConverter< T, enable_if_t::value || is_floating_type::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { auto rtype = GetVectorType(x); switch (rtype) { case UINT8: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case INT32: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case FLOAT64: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case INT64: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); default: break; @@ -471,14 +471,14 @@ class RPrimitiveConverter< private: template - Status ExtendDispatch(SEXP x, int64_t size) { + Status ExtendDispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { // `x` is an ALTREP R vector storing `r_value_type` // and that type matches exactly the type of the array this is building - return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + return Extend_impl(RVectorIterator_ALTREP(x, offset), size); } else { // `x` is not an ALTREP vector so we have direct access to a range of values - return Extend_impl(RVectorIterator(x, 0), size); + return Extend_impl(RVectorIterator(x, offset), size); } } @@ -514,16 +514,16 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { auto rtype = GetVectorType(x); if (rtype != BOOLEAN) { return Status::Invalid("Expecting a logical vector"); } if (ALTREP(x)) { - return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + return Extend_impl(RVectorIterator_ALTREP(x, offset), size); } else { - return Extend_impl(RVectorIterator(x, 0), size); + return Extend_impl(RVectorIterator(x, offset), size); } } @@ -553,16 +553,16 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { switch (GetVectorType(x)) { case DATE_INT: - return AppendRange_Date_dispatch(x, size); + return AppendRange_Date_dispatch(x, size, offset); case DATE_DBL: - return AppendRange_Date_dispatch(x, size); + return AppendRange_Date_dispatch(x, size, offset); case POSIXCT: - return AppendRange_Posixct_dispatch(x, size); + return AppendRange_Posixct_dispatch(x, size, offset); default: break; @@ -578,11 +578,12 @@ class RPrimitiveConverter::value>> private: template - Status AppendRange_Date_dispatch(SEXP x, int64_t size) { + Status AppendRange_Date_dispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { - return AppendRange_Date(RVectorIterator_ALTREP(x, 0), size); + return AppendRange_Date(RVectorIterator_ALTREP(x, offset), + size - offset); } else { - return AppendRange_Date(RVectorIterator(x, 0), size); + return AppendRange_Date(RVectorIterator(x, offset), size - offset); } } @@ -602,11 +603,12 @@ class RPrimitiveConverter::value>> return VisitVector(it, size, append_null, append_value); } - Status AppendRange_Posixct_dispatch(SEXP x, int64_t size) { + Status AppendRange_Posixct_dispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { - return AppendRange_Posixct(RVectorIterator_ALTREP(x, 0), size); + return AppendRange_Posixct(RVectorIterator_ALTREP(x, offset), + size - offset); } else { - return AppendRange_Posixct(RVectorIterator(x, 0), size); + return AppendRange_Posixct(RVectorIterator(x, offset), size - offset); } } @@ -660,8 +662,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); auto rtype = GetVectorType(x); if (rtype != TIME) { return Status::Invalid("Invalid conversion to time"); @@ -699,10 +701,11 @@ class RPrimitiveConverter::value>> }; if (ALTREP(x)) { - return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + return VisitVector(RVectorIterator_ALTREP(x, offset), size, append_null, append_value); } else { - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, + append_value); } } @@ -716,8 +719,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RVectorType rtype = GetVectorType(x); if (rtype != POSIXCT) { @@ -737,10 +740,11 @@ class RPrimitiveConverter::value>> }; if (ALTREP(x)) { - return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + return VisitVector(RVectorIterator_ALTREP(x, offset), size, append_null, append_value); } else { - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, + append_value); } } @@ -754,7 +758,7 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { return Status::NotImplemented("Extend"); } }; @@ -786,8 +790,8 @@ class RPrimitiveConverter> public: using OffsetType = typename T::offset_type; - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RETURN_NOT_OK(check_binary(x, size)); auto append_null = [this]() { @@ -801,7 +805,7 @@ class RPrimitiveConverter> this->primitive_builder_->UnsafeAppend(RAW_RO(raw), static_cast(n)); return Status::OK(); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -814,8 +818,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RETURN_NOT_OK(check_binary(x, size)); auto append_null = [this]() { @@ -833,7 +837,7 @@ class RPrimitiveConverter::v this->primitive_builder_->UnsafeAppend(RAW_RO(raw)); return Status::OK(); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -848,12 +852,12 @@ class RPrimitiveConverter> public: using OffsetType = typename T::offset_type; - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { RVectorType rtype = GetVectorType(x); if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } - return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size); + return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size, offset); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -863,14 +867,14 @@ class RPrimitiveConverter> } private: - Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size) { + Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size, int64_t offset) { RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s)); // we know all the R strings are utf8 already, so we can get // a definite size and then use UnsafeAppend*() int64_t total_length = 0; - for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + for (R_xlen_t i = offset; i < size; i++, ++p_strings) { SEXP si = *p_strings; total_length += si == NA_STRING ? 0 : LENGTH(si); } @@ -878,7 +882,7 @@ class RPrimitiveConverter> // append p_strings = reinterpret_cast(DATAPTR_RO(s)); - for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + for (R_xlen_t i = offset; i < size; i++, ++p_strings) { SEXP si = *p_strings; if (si == NA_STRING) { this->primitive_builder_->UnsafeAppendNull(); @@ -895,7 +899,7 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { // TODO: look in lubridate return Status::NotImplemented("Extend"); } @@ -911,7 +915,7 @@ template class RDictionaryConverter> : public DictionaryConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { return Status::NotImplemented("Extend"); } }; @@ -922,14 +926,14 @@ class RDictionaryConverter> public: using BuilderType = DictionaryBuilder; - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(ExtendSetup(x, size)); - return ExtendImpl(x, size, GetCharLevels(x)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(ExtendSetup(x, size, offset)); + return ExtendImpl(x, size, offset, GetCharLevels(x)); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { // the setup runs synchronously first - Status setup = ExtendSetup(values, size); + Status setup = ExtendSetup(values, size, /*offset=*/0); if (!setup.ok()) { // if that fails, propagate the error @@ -938,7 +942,7 @@ class RDictionaryConverter> auto char_levels = GetCharLevels(values); tasks.Append(true, [this, values, size, char_levels]() { - return this->ExtendImpl(values, size, char_levels); + return this->ExtendImpl(values, size, /*offset=*/0, char_levels); }); } } @@ -970,7 +974,7 @@ class RDictionaryConverter> return char_levels; } - Status ExtendSetup(SEXP x, int64_t size) { + Status ExtendSetup(SEXP x, int64_t size, int64_t offset) { RVectorType rtype = GetVectorType(x); if (rtype != FACTOR) { return Status::Invalid("invalid R type to convert to dictionary"); @@ -982,17 +986,18 @@ class RDictionaryConverter> RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array)); // then we can proceed - return this->Reserve(size); + return this->Reserve(size - offset); } - Status ExtendImpl(SEXP values, int64_t size, + Status ExtendImpl(SEXP values, int64_t size, int64_t offset, const std::vector& char_levels) { auto append_null = [this]() { return this->value_builder_->AppendNull(); }; auto append_value = [this, &char_levels](int value) { return this->value_builder_->Append(char_levels[value - 1]); }; - return VisitVector(RVectorIterator(values, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(values, offset), size, append_null, + append_value); } }; @@ -1014,7 +1019,7 @@ struct RConverterTrait> { template class RListConverter : public ListConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { RETURN_NOT_OK(this->Reserve(size)); RVectorType rtype = GetVectorType(x); @@ -1034,7 +1039,7 @@ class RListConverter : public ListConverter { return this->value_converter_.get()->Extend(value, n); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -1056,12 +1061,12 @@ struct RConverterTrait { class RStructConverter : public StructConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(ExtendSetup(x, size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(ExtendSetup(x, size, offset)); auto fields = this->struct_type_->fields(); R_xlen_t n_columns = XLENGTH(x); - for (R_xlen_t i = 0; i < n_columns; i++) { + for (R_xlen_t i = offset; i < n_columns; i++) { auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); if (!status.ok()) { return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), @@ -1074,7 +1079,7 @@ class RStructConverter : public StructConverter { void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { // the setup runs synchronously first - Status setup = ExtendSetup(values, size); + Status setup = ExtendSetup(values, size, /*offset=*/0); if (!setup.ok()) { // if that fails, propagate the error @@ -1095,7 +1100,7 @@ class RStructConverter : public StructConverter { return StructConverter::Init(pool); } - Status ExtendSetup(SEXP x, int64_t size) { + Status ExtendSetup(SEXP x, int64_t size, int64_t offset) { // check that x is compatible R_xlen_t n_columns = XLENGTH(x); @@ -1133,7 +1138,7 @@ class RStructConverter : public StructConverter { } } - RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(this->Reserve(size - offset)); for (R_xlen_t i = 0; i < size; i++) { RETURN_NOT_OK(struct_builder_->Append());