diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 981fd61c029..c6781a4b149 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -131,6 +131,7 @@ jobs: ARROW_WITH_BROTLI: ON ARROW_BUILD_TESTS: OFF CMAKE_ARGS: "-DPython3_EXECUTABLE=/usr/local/bin/python3" + PYARROW_TEST_LARGE_MEMORY: ON steps: - name: Checkout Arrow uses: actions/checkout@v2 @@ -139,6 +140,12 @@ jobs: - name: Fetch Submodules and Tags shell: bash run: ci/scripts/util_checkout.sh + - name: Show available RAM size + shell: bash + run: | + hwmemsize=$(sysctl -n hw.memsize) + ramsize=$(expr $hwmemsize / $((1024**3))) + echo "System Memory: ${ramsize} GB" - name: Install Dependencies shell: bash run: | diff --git a/ci/scripts/python_test.sh b/ci/scripts/python_test.sh index 80a9cdef4a3..6e05af89a19 100755 --- a/ci/scripts/python_test.sh +++ b/ci/scripts/python_test.sh @@ -29,4 +29,4 @@ export LD_LIBRARY_PATH=${ARROW_HOME}/lib:${LD_LIBRARY_PATH} # Enable some checks inside Python itself export PYTHONDEVMODE=1 -pytest -r s ${PYTEST_ARGS} --pyargs pyarrow +pytest -r s -v ${PYTEST_ARGS} --pyargs pyarrow diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h index a60031258ad..c1c664a1249 100644 --- a/cpp/src/arrow/array/builder_binary.h +++ b/cpp/src/arrow/array/builder_binary.h @@ -291,14 +291,7 @@ class BaseBinaryBuilder : public ArrayBuilder { } Status Resize(int64_t capacity) override { - // XXX Why is this check necessary? There is no reason to disallow, say, - // binary arrays with more than 2**31 empty or null values. - if (capacity > memory_limit()) { - return Status::CapacityError("BinaryBuilder cannot reserve space for more than ", - memory_limit(), " child elements, got ", capacity); - } ARROW_RETURN_NOT_OK(CheckCapacity(capacity)); - // One more than requested for offsets ARROW_RETURN_NOT_OK(offsets_builder_.Resize(capacity + 1)); return ArrayBuilder::Resize(capacity); diff --git a/cpp/src/arrow/python/inference.cc b/cpp/src/arrow/python/inference.cc index 9d6707aa11d..5086815f84f 100644 --- a/cpp/src/arrow/python/inference.cc +++ b/cpp/src/arrow/python/inference.cc @@ -379,12 +379,13 @@ class TypeInferrer { // Infer value type from a sequence of values Status VisitSequence(PyObject* obj, PyObject* mask = nullptr) { if (mask == nullptr || mask == Py_None) { - return internal::VisitSequence(obj, [this](PyObject* value, bool* keep_going) { - return Visit(value, keep_going); - }); + return internal::VisitSequence( + obj, /*offset=*/0, + [this](PyObject* value, bool* keep_going) { return Visit(value, keep_going); }); } else { return internal::VisitSequenceMasked( - obj, mask, [this](PyObject* value, uint8_t masked, bool* keep_going) { + obj, mask, /*offset=*/0, + [this](PyObject* value, uint8_t masked, bool* keep_going) { if (!masked) { return Visit(value, keep_going); } else { diff --git a/cpp/src/arrow/python/iterators.h b/cpp/src/arrow/python/iterators.h index 6b0b55342a5..58213ee2dbc 100644 --- a/cpp/src/arrow/python/iterators.h +++ b/cpp/src/arrow/python/iterators.h @@ -36,7 +36,7 @@ namespace internal { // // If keep_going is set to false, the iteration terminates template -inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { +inline Status VisitSequenceGeneric(PyObject* obj, int64_t offset, VisitorFunc&& func) { // VisitorFunc may set to false to terminate iteration bool keep_going = true; @@ -49,7 +49,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { if (PyArray_DESCR(arr_obj)->type_num == NPY_OBJECT) { // It's an array object, we can fetch object pointers directly const Ndarray1DIndexer objects(arr_obj); - for (int64_t i = 0; keep_going && i < objects.size(); ++i) { + for (int64_t i = offset; keep_going && i < objects.size(); ++i) { RETURN_NOT_OK(func(objects[i], i, &keep_going)); } return Status::OK(); @@ -64,7 +64,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { if (PyList_Check(obj) || PyTuple_Check(obj)) { // Use fast item access const Py_ssize_t size = PySequence_Fast_GET_SIZE(obj); - for (Py_ssize_t i = 0; keep_going && i < size; ++i) { + for (Py_ssize_t i = offset; keep_going && i < size; ++i) { PyObject* value = PySequence_Fast_GET_ITEM(obj, i); RETURN_NOT_OK(func(value, static_cast(i), &keep_going)); } @@ -72,7 +72,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { // Regular sequence: avoid making a potentially large copy const Py_ssize_t size = PySequence_Size(obj); RETURN_IF_PYERROR(); - for (Py_ssize_t i = 0; keep_going && i < size; ++i) { + for (Py_ssize_t i = offset; keep_going && i < size; ++i) { OwnedRef value_ref(PySequence_ITEM(obj, i)); RETURN_IF_PYERROR(); RETURN_NOT_OK(func(value_ref.obj(), static_cast(i), &keep_going)); @@ -86,16 +86,17 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) { // Visit sequence with no null mask template -inline Status VisitSequence(PyObject* obj, VisitorFunc&& func) { +inline Status VisitSequence(PyObject* obj, int64_t offset, VisitorFunc&& func) { return VisitSequenceGeneric( - obj, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) { + obj, offset, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) { return func(value, keep_going); }); } /// Visit sequence with null mask template -inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& func) { +inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, int64_t offset, + VisitorFunc&& func) { if (mo == nullptr || !PyArray_Check(mo)) { return Status::Invalid("Null mask must be NumPy array"); } @@ -115,7 +116,7 @@ inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& fun Ndarray1DIndexer mask_values(mask); return VisitSequenceGeneric( - obj, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) { + obj, offset, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) { return func(value, mask_values[i], keep_going); }); } else { @@ -132,7 +133,7 @@ template inline Status VisitIterable(PyObject* obj, VisitorFunc&& func) { if (PySequence_Check(obj)) { // Numpy arrays fall here as well - return VisitSequence(obj, std::forward(func)); + return VisitSequence(obj, /*offset=*/0, std::forward(func)); } // Fall back on the iterator protocol OwnedRef iter_ref(PyObject_GetIter(obj)); diff --git a/cpp/src/arrow/python/numpy_internal.h b/cpp/src/arrow/python/numpy_internal.h index f43599eb3eb..973f577cb13 100644 --- a/cpp/src/arrow/python/numpy_internal.h +++ b/cpp/src/arrow/python/numpy_internal.h @@ -52,7 +52,7 @@ class Ndarray1DIndexer { int64_t size() const { return PyArray_SIZE(arr_); } - T* data() const { return data_; } + const T* data() const { return reinterpret_cast(data_); } bool is_strided() const { return stride_ != sizeof(T); } diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index b2d9f1cb5a3..521249fd542 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -392,22 +392,25 @@ class PyValue { class PyConverter : public Converter { public: // Iterate over the input values and defer the conversion to the Append method - Status Extend(PyObject* values, int64_t size) override { + Status Extend(PyObject* values, int64_t size, int64_t offset = 0) override { + DCHECK_GE(size, offset); /// Ensure we've allocated enough space - RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(this->Reserve(size - offset)); // Iterate over the items adding each one - return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) { - return this->Append(item); - }); + return internal::VisitSequence( + values, offset, + [this](PyObject* item, bool* /* unused */) { return this->Append(item); }); } // Convert and append a sequence of values masked with a numpy array - Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override { + Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size, + int64_t offset = 0) override { + DCHECK_GE(size, offset); /// Ensure we've allocated enough space - RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(this->Reserve(size - offset)); // Iterate over the items adding each one return internal::VisitSequenceMasked( - values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) { + values, mask, offset, [this](PyObject* item, bool is_masked, bool* /* unused */) { if (is_masked) { return this->AppendNull(); } else { @@ -514,34 +517,6 @@ class PyPrimitiveConverter< } }; -template -class PyPrimitiveConverter> - : public PrimitiveConverter { - public: - using OffsetType = typename T::offset_type; - - Status Append(PyObject* value) override { - if (PyValue::IsNull(this->options_, value)) { - this->primitive_builder_->UnsafeAppendNull(); - } else { - ARROW_RETURN_NOT_OK( - PyValue::Convert(this->primitive_type_, this->options_, value, view_)); - // Since we don't know the varying length input size in advance, we need to - // reserve space in the value builder one by one. ReserveData raises CapacityError - // if the value would not fit into the array. - ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size)); - this->primitive_builder_->UnsafeAppend(view_.bytes, - static_cast(view_.size)); - } - return Status::OK(); - } - - protected: - // Create a single instance of PyBytesView here to prevent unnecessary object - // creation/destruction. This significantly improves the conversion performance. - PyBytesView view_; -}; - template class PyPrimitiveConverter::value>> : public PrimitiveConverter { @@ -563,7 +538,7 @@ class PyPrimitiveConverter:: }; template -class PyPrimitiveConverter> +class PyPrimitiveConverter> : public PrimitiveConverter { public: using OffsetType = typename T::offset_type; @@ -578,6 +553,9 @@ class PyPrimitiveConverter> // observed binary value observed_binary_ = true; } + // Since we don't know the varying length input size in advance, we need to + // reserve space in the value builder one by one. ReserveData raises CapacityError + // if the value would not fit into the array. ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size)); this->primitive_builder_->UnsafeAppend(view_.bytes, static_cast(view_.size)); @@ -728,7 +706,6 @@ class PyListConverter : public ListConverter { auto value_builder = checked_cast(this->value_converter_->builder().get()); - // TODO(wesm): Vector append when not strided Ndarray1DIndexer values(ndarray); if (null_sentinels_possible) { for (int64_t i = 0; i < values.size(); ++i) { @@ -738,6 +715,8 @@ class PyListConverter : public ListConverter { RETURN_NOT_OK(value_builder->Append(values[i])); } } + } else if (!values.is_strided()) { + RETURN_NOT_OK(value_builder->AppendValues(values.data(), values.size())); } else { for (int64_t i = 0; i < values.size(); ++i) { RETURN_NOT_OK(value_builder->Append(values[i])); diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h index 2c40a48726b..0b29e0f5bc7 100644 --- a/cpp/src/arrow/util/converter.h +++ b/cpp/src/arrow/util/converter.h @@ -54,11 +54,12 @@ class Converter { virtual Status Append(InputType value) { return Status::NotImplemented("Append"); } - virtual Status Extend(InputType values, int64_t size) { + virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) { return Status::NotImplemented("Extend"); } - virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) { + virtual Status ExtendMasked(InputType values, InputType mask, int64_t size, + int64_t offset = 0) { return Status::NotImplemented("ExtendMasked"); } @@ -70,6 +71,8 @@ class Converter { bool may_overflow() const { return may_overflow_; } + bool rewind_on_overflow() const { return rewind_on_overflow_; } + virtual Status Reserve(int64_t additional_capacity) { return builder_->Reserve(additional_capacity); } @@ -96,6 +99,7 @@ class Converter { std::shared_ptr builder_; OptionsType options_; bool may_overflow_ = false; + bool rewind_on_overflow_ = false; }; template @@ -134,7 +138,8 @@ class ListConverter : public BaseConverter { std::make_shared(pool, value_converter_->builder(), this->type_); list_builder_ = checked_cast(this->builder_.get()); // Narrow list types may overflow - this->may_overflow_ = sizeof(typename ArrowType::offset_type) < sizeof(int64_t); + this->may_overflow_ = this->rewind_on_overflow_ = + sizeof(typename ArrowType::offset_type) < sizeof(int64_t); return Status::OK(); } @@ -167,6 +172,7 @@ class StructConverter : public BaseConverter { (MakeConverter( field->type(), this->options_, pool))); this->may_overflow_ |= child_converter->may_overflow(); + this->rewind_on_overflow_ = this->may_overflow_; child_builders.push_back(child_converter->builder()); children_.push_back(std::move(child_converter)); } @@ -302,32 +308,69 @@ class Chunker { return status; } - // we could get bit smarter here since the whole batch of appendable values - // will be rejected if a capacity error is raised - Status Extend(InputType values, int64_t size) { - auto status = converter_->Extend(values, size); - if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { - if (converter_->builder()->length() == 0) { + Status Extend(InputType values, int64_t size, int64_t offset = 0) { + while (offset < size) { + auto length_before = converter_->builder()->length(); + auto status = converter_->Extend(values, size, offset); + auto length_after = converter_->builder()->length(); + auto num_converted = length_after - length_before; + + offset += num_converted; + length_ += num_converted; + + if (status.IsCapacityError()) { + if (converter_->builder()->length() == 0) { + // Builder length == 0 means the individual element is too large to append. + // In this case, no need to try again. + return status; + } else if (converter_->rewind_on_overflow()) { + // The list-like and binary-like conversion paths may raise a capacity error, + // we need to handle them differently. While the binary-like converters check + // the capacity before append/extend the list-like converters just check after + // append/extend. Thus depending on the implementation semantics we may need + // to rewind (slice) the output chunk by one. + length_ -= 1; + offset -= 1; + } + ARROW_RETURN_NOT_OK(FinishChunk()); + } else if (!status.ok()) { return status; } - ARROW_RETURN_NOT_OK(FinishChunk()); - return Extend(values, size); } - length_ += size; - return status; + return Status::OK(); } - Status ExtendMasked(InputType values, InputType mask, int64_t size) { - auto status = converter_->ExtendMasked(values, mask, size); - if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { - if (converter_->builder()->length() == 0) { + Status ExtendMasked(InputType values, InputType mask, int64_t size, + int64_t offset = 0) { + while (offset < size) { + auto length_before = converter_->builder()->length(); + auto status = converter_->ExtendMasked(values, mask, size, offset); + auto length_after = converter_->builder()->length(); + auto num_converted = length_after - length_before; + + offset += num_converted; + length_ += num_converted; + + if (status.IsCapacityError()) { + if (converter_->builder()->length() == 0) { + // Builder length == 0 means the individual element is too large to append. + // In this case, no need to try again. + return status; + } else if (converter_->rewind_on_overflow()) { + // The list-like and binary-like conversion paths may raise a capacity error, + // we need to handle them differently. While the binary-like converters check + // the capacity before append/extend the list-like converters just check after + // append/extend. Thus depending on the implementation semantics we may need + // to rewind (slice) the output chunk by one. + length_ -= 1; + offset -= 1; + } + ARROW_RETURN_NOT_OK(FinishChunk()); + } else if (!status.ok()) { return status; } - ARROW_RETURN_NOT_OK(FinishChunk()); - return ExtendMasked(values, mask, size); } - length_ += size; - return status; + return Status::OK(); } Status FinishChunk() { diff --git a/python/pyarrow/tests/parquet/test_data_types.py b/python/pyarrow/tests/parquet/test_data_types.py index 850dff94df4..bdbc6b7b5a5 100644 --- a/python/pyarrow/tests/parquet/test_data_types.py +++ b/python/pyarrow/tests/parquet/test_data_types.py @@ -404,6 +404,7 @@ def test_fixed_size_binary(): # ----------------------------------------------------------------------------- +@pytest.mark.slow @pytest.mark.large_memory def test_large_table_int32_overflow(): size = np.iinfo('int32').max + 1 @@ -424,6 +425,7 @@ def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs): return _read_table(buf, use_legacy_dataset=use_legacy_dataset) +@pytest.mark.slow @pytest.mark.large_memory @parametrize_legacy_dataset def test_byte_array_exactly_2gb(use_legacy_dataset): @@ -444,6 +446,7 @@ def test_byte_array_exactly_2gb(use_legacy_dataset): assert t.equals(result) +@pytest.mark.slow @pytest.mark.pandas @pytest.mark.large_memory @parametrize_legacy_dataset @@ -469,6 +472,7 @@ def test_binary_array_overflow_to_chunked(use_legacy_dataset): assert tbl.equals(read_tbl) +@pytest.mark.slow @pytest.mark.pandas @pytest.mark.large_memory @parametrize_legacy_dataset @@ -499,6 +503,7 @@ def test_large_binary(): _check_roundtrip(table, use_dictionary=use_dictionary) +@pytest.mark.slow @pytest.mark.large_memory def test_large_binary_huge(): s = b'xy' * 997 diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 30500bc3c5b..9f6ab678a95 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -2630,6 +2630,7 @@ def test_array_from_numpy_str_utf8(): pa.array(vec, pa.string(), mask=np.array([False])) +@pytest.mark.slow @pytest.mark.large_memory def test_numpy_binary_overflow_to_chunked(): # ARROW-3762, ARROW-5966 diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index ba94b340bd3..1a500b8523f 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -756,6 +756,7 @@ def test_large_binary_array(ty): assert len(arr) == nrepeats +@pytest.mark.slow @pytest.mark.large_memory @pytest.mark.parametrize("ty", [pa.large_binary(), pa.large_string()]) def test_large_binary_value(ty): @@ -2169,7 +2170,6 @@ def test_auto_chunking_list_of_binary(): assert arr.chunk(1).to_pylist() == [['x' * 1024]] * 2 -@pytest.mark.slow @pytest.mark.large_memory def test_auto_chunking_list_like(): item = np.ones((2**28,), dtype='uint8') @@ -2185,7 +2185,11 @@ def test_auto_chunking_list_like(): assert arr.num_chunks == 2 assert len(arr.chunk(0)) == 7 assert len(arr.chunk(1)) == 1 - assert arr.chunk(1)[0].as_py() == list(item) + chunk = arr.chunk(1) + scalar = chunk[0] + assert isinstance(scalar, pa.ListScalar) + expected = pa.array(item, type=pa.uint8()) + assert scalar.values == expected @pytest.mark.slow @@ -2232,3 +2236,49 @@ def test_nested_auto_chunking(ty, char): 'integer': 1, 'string-like': char } + + +@pytest.mark.large_memory +def test_array_from_pylist_data_overflow(): + # Regression test for ARROW-12983 + # Data buffer overflow - should result in chunked array + items = [b'a' * 4096] * (2 ** 19) + arr = pa.array(items, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + mask = np.zeros(2**19, bool) + arr = pa.array(items, mask=mask, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + arr = pa.array(items, type=pa.binary()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**19 + assert len(arr.chunks) > 1 + + +@pytest.mark.slow +@pytest.mark.large_memory +def test_array_from_pylist_offset_overflow(): + # Regression test for ARROW-12983 + # Offset buffer overflow - should result in chunked array + # Note this doesn't apply to primitive arrays + items = [b'a'] * (2 ** 31) + arr = pa.array(items, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 + + mask = np.zeros(2**31, bool) + arr = pa.array(items, mask=mask, type=pa.string()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 + + arr = pa.array(items, type=pa.binary()) + assert isinstance(arr, pa.ChunkedArray) + assert len(arr) == 2**31 + assert len(arr.chunks) > 1 diff --git a/python/pyarrow/tests/test_pandas.py b/python/pyarrow/tests/test_pandas.py index 7f904433fa2..b6557875c2c 100644 --- a/python/pyarrow/tests/test_pandas.py +++ b/python/pyarrow/tests/test_pandas.py @@ -2167,20 +2167,19 @@ def test_list_of_dictionary(self): expected[2] = None tm.assert_series_equal(arr.to_pandas(), expected) - @pytest.mark.slow @pytest.mark.large_memory def test_auto_chunking_on_list_overflow(self): # ARROW-9976 - n = 2**24 + n = 2**21 df = pd.DataFrame.from_dict({ - "a": list(np.zeros((n, 2**7), dtype='uint8')), + "a": list(np.zeros((n, 2**10), dtype='uint8')), "b": range(n) }) table = pa.Table.from_pandas(df) column_a = table[0] assert column_a.num_chunks == 2 - assert len(column_a.chunk(0)) == 2**24 - 1 + assert len(column_a.chunk(0)) == 2**21 - 1 assert len(column_a.chunk(1)) == 1 def test_map_array_roundtrip(self): @@ -2356,6 +2355,7 @@ def test_from_numpy_nested(self): {'x': {'xx': 1, 'yy': True}, 'y': 2, 'z': 'foo'}, {'x': {'xx': 3, 'yy': False}, 'y': 4, 'z': 'bar'}] + @pytest.mark.slow @pytest.mark.large_memory def test_from_numpy_large(self): # Exercise rechunking + nulls diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index d0f4f3a6def..683e8f278e8 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -326,7 +326,7 @@ class RConverter : public Converter { public: virtual Status Append(SEXP) { return Status::NotImplemented("Append"); } - virtual Status Extend(SEXP values, int64_t size) { + virtual Status Extend(SEXP values, int64_t size, int64_t offset = 0) { return Status::NotImplemented("Extend"); } @@ -337,7 +337,7 @@ class RConverter : public Converter { tasks.Append(false, task); } - virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { + virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size, int64_t offset = 0) { return Status::NotImplemented("ExtendMasked"); } }; @@ -434,8 +434,8 @@ template class RPrimitiveConverter> : public PrimitiveConverter { public: - Status Extend(SEXP, int64_t size) override { - return this->primitive_builder_->AppendNulls(size); + Status Extend(SEXP, int64_t size, int64_t offset = 0) override { + return this->primitive_builder_->AppendNulls(size - offset); } }; @@ -445,17 +445,17 @@ class RPrimitiveConverter< T, enable_if_t::value || is_floating_type::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { auto rtype = GetVectorType(x); switch (rtype) { case UINT8: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case INT32: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case FLOAT64: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); case INT64: - return ExtendDispatch(x, size); + return ExtendDispatch(x, size, offset); default: break; @@ -471,14 +471,14 @@ class RPrimitiveConverter< private: template - Status ExtendDispatch(SEXP x, int64_t size) { + Status ExtendDispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { // `x` is an ALTREP R vector storing `r_value_type` // and that type matches exactly the type of the array this is building - return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + return Extend_impl(RVectorIterator_ALTREP(x, offset), size); } else { // `x` is not an ALTREP vector so we have direct access to a range of values - return Extend_impl(RVectorIterator(x, 0), size); + return Extend_impl(RVectorIterator(x, offset), size); } } @@ -514,16 +514,16 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { auto rtype = GetVectorType(x); if (rtype != BOOLEAN) { return Status::Invalid("Expecting a logical vector"); } if (ALTREP(x)) { - return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + return Extend_impl(RVectorIterator_ALTREP(x, offset), size); } else { - return Extend_impl(RVectorIterator(x, 0), size); + return Extend_impl(RVectorIterator(x, offset), size); } } @@ -553,16 +553,16 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { switch (GetVectorType(x)) { case DATE_INT: - return AppendRange_Date_dispatch(x, size); + return AppendRange_Date_dispatch(x, size, offset); case DATE_DBL: - return AppendRange_Date_dispatch(x, size); + return AppendRange_Date_dispatch(x, size, offset); case POSIXCT: - return AppendRange_Posixct_dispatch(x, size); + return AppendRange_Posixct_dispatch(x, size, offset); default: break; @@ -578,11 +578,12 @@ class RPrimitiveConverter::value>> private: template - Status AppendRange_Date_dispatch(SEXP x, int64_t size) { + Status AppendRange_Date_dispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { - return AppendRange_Date(RVectorIterator_ALTREP(x, 0), size); + return AppendRange_Date(RVectorIterator_ALTREP(x, offset), + size - offset); } else { - return AppendRange_Date(RVectorIterator(x, 0), size); + return AppendRange_Date(RVectorIterator(x, offset), size - offset); } } @@ -602,11 +603,12 @@ class RPrimitiveConverter::value>> return VisitVector(it, size, append_null, append_value); } - Status AppendRange_Posixct_dispatch(SEXP x, int64_t size) { + Status AppendRange_Posixct_dispatch(SEXP x, int64_t size, int64_t offset) { if (ALTREP(x)) { - return AppendRange_Posixct(RVectorIterator_ALTREP(x, 0), size); + return AppendRange_Posixct(RVectorIterator_ALTREP(x, offset), + size - offset); } else { - return AppendRange_Posixct(RVectorIterator(x, 0), size); + return AppendRange_Posixct(RVectorIterator(x, offset), size - offset); } } @@ -660,8 +662,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); auto rtype = GetVectorType(x); if (rtype != TIME) { return Status::Invalid("Invalid conversion to time"); @@ -699,10 +701,11 @@ class RPrimitiveConverter::value>> }; if (ALTREP(x)) { - return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + return VisitVector(RVectorIterator_ALTREP(x, offset), size, append_null, append_value); } else { - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, + append_value); } } @@ -716,8 +719,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RVectorType rtype = GetVectorType(x); if (rtype != POSIXCT) { @@ -737,10 +740,11 @@ class RPrimitiveConverter::value>> }; if (ALTREP(x)) { - return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + return VisitVector(RVectorIterator_ALTREP(x, offset), size, append_null, append_value); } else { - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, + append_value); } } @@ -754,7 +758,7 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { return Status::NotImplemented("Extend"); } }; @@ -786,8 +790,8 @@ class RPrimitiveConverter> public: using OffsetType = typename T::offset_type; - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RETURN_NOT_OK(check_binary(x, size)); auto append_null = [this]() { @@ -801,7 +805,7 @@ class RPrimitiveConverter> this->primitive_builder_->UnsafeAppend(RAW_RO(raw), static_cast(n)); return Status::OK(); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -814,8 +818,8 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(this->Reserve(size - offset)); RETURN_NOT_OK(check_binary(x, size)); auto append_null = [this]() { @@ -833,7 +837,7 @@ class RPrimitiveConverter::v this->primitive_builder_->UnsafeAppend(RAW_RO(raw)); return Status::OK(); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -848,12 +852,12 @@ class RPrimitiveConverter> public: using OffsetType = typename T::offset_type; - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { RVectorType rtype = GetVectorType(x); if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } - return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size); + return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size, offset); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -863,14 +867,14 @@ class RPrimitiveConverter> } private: - Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size) { + Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size, int64_t offset) { RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s)); // we know all the R strings are utf8 already, so we can get // a definite size and then use UnsafeAppend*() int64_t total_length = 0; - for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + for (R_xlen_t i = offset; i < size; i++, ++p_strings) { SEXP si = *p_strings; total_length += si == NA_STRING ? 0 : LENGTH(si); } @@ -878,7 +882,7 @@ class RPrimitiveConverter> // append p_strings = reinterpret_cast(DATAPTR_RO(s)); - for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + for (R_xlen_t i = offset; i < size; i++, ++p_strings) { SEXP si = *p_strings; if (si == NA_STRING) { this->primitive_builder_->UnsafeAppendNull(); @@ -895,7 +899,7 @@ template class RPrimitiveConverter::value>> : public PrimitiveConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { // TODO: look in lubridate return Status::NotImplemented("Extend"); } @@ -911,7 +915,7 @@ template class RDictionaryConverter> : public DictionaryConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { return Status::NotImplemented("Extend"); } }; @@ -922,14 +926,14 @@ class RDictionaryConverter> public: using BuilderType = DictionaryBuilder; - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(ExtendSetup(x, size)); - return ExtendImpl(x, size, GetCharLevels(x)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(ExtendSetup(x, size, offset)); + return ExtendImpl(x, size, offset, GetCharLevels(x)); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { // the setup runs synchronously first - Status setup = ExtendSetup(values, size); + Status setup = ExtendSetup(values, size, /*offset=*/0); if (!setup.ok()) { // if that fails, propagate the error @@ -938,7 +942,7 @@ class RDictionaryConverter> auto char_levels = GetCharLevels(values); tasks.Append(true, [this, values, size, char_levels]() { - return this->ExtendImpl(values, size, char_levels); + return this->ExtendImpl(values, size, /*offset=*/0, char_levels); }); } } @@ -970,7 +974,7 @@ class RDictionaryConverter> return char_levels; } - Status ExtendSetup(SEXP x, int64_t size) { + Status ExtendSetup(SEXP x, int64_t size, int64_t offset) { RVectorType rtype = GetVectorType(x); if (rtype != FACTOR) { return Status::Invalid("invalid R type to convert to dictionary"); @@ -982,17 +986,18 @@ class RDictionaryConverter> RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array)); // then we can proceed - return this->Reserve(size); + return this->Reserve(size - offset); } - Status ExtendImpl(SEXP values, int64_t size, + Status ExtendImpl(SEXP values, int64_t size, int64_t offset, const std::vector& char_levels) { auto append_null = [this]() { return this->value_builder_->AppendNull(); }; auto append_value = [this, &char_levels](int value) { return this->value_builder_->Append(char_levels[value - 1]); }; - return VisitVector(RVectorIterator(values, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(values, offset), size, append_null, + append_value); } }; @@ -1014,7 +1019,7 @@ struct RConverterTrait> { template class RListConverter : public ListConverter { public: - Status Extend(SEXP x, int64_t size) override { + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { RETURN_NOT_OK(this->Reserve(size)); RVectorType rtype = GetVectorType(x); @@ -1034,7 +1039,7 @@ class RListConverter : public ListConverter { return this->value_converter_.get()->Extend(value, n); }; - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + return VisitVector(RVectorIterator(x, offset), size, append_null, append_value); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { @@ -1056,12 +1061,12 @@ struct RConverterTrait { class RStructConverter : public StructConverter { public: - Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(ExtendSetup(x, size)); + Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { + RETURN_NOT_OK(ExtendSetup(x, size, offset)); auto fields = this->struct_type_->fields(); R_xlen_t n_columns = XLENGTH(x); - for (R_xlen_t i = 0; i < n_columns; i++) { + for (R_xlen_t i = offset; i < n_columns; i++) { auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); if (!status.ok()) { return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), @@ -1074,7 +1079,7 @@ class RStructConverter : public StructConverter { void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { // the setup runs synchronously first - Status setup = ExtendSetup(values, size); + Status setup = ExtendSetup(values, size, /*offset=*/0); if (!setup.ok()) { // if that fails, propagate the error @@ -1095,7 +1100,7 @@ class RStructConverter : public StructConverter { return StructConverter::Init(pool); } - Status ExtendSetup(SEXP x, int64_t size) { + Status ExtendSetup(SEXP x, int64_t size, int64_t offset) { // check that x is compatible R_xlen_t n_columns = XLENGTH(x); @@ -1133,7 +1138,7 @@ class RStructConverter : public StructConverter { } } - RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(this->Reserve(size - offset)); for (R_xlen_t i = 0; i < size; i++) { RETURN_NOT_OK(struct_builder_->Append());