From abef73eb2e62f1b5eaca67888b4b4dca38752baf Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 7 Jun 2021 14:15:23 -0400
Subject: [PATCH 1/6] ARROW-12983: [C++][Python] Properly convert large Python
sequences to chunked arrays
---
cpp/src/arrow/python/iterators.h | 35 +++++++---
cpp/src/arrow/python/python_to_arrow.cc | 89 ++++++++++++++++++++++---
cpp/src/arrow/util/converter.h | 69 +++++++++++++------
python/.gitignore | 1 +
python/pyarrow/tests/test_array.py | 47 +++++++++++++
5 files changed, 202 insertions(+), 39 deletions(-)
diff --git a/cpp/src/arrow/python/iterators.h b/cpp/src/arrow/python/iterators.h
index 6b0b55342a5..6ea527119e4 100644
--- a/cpp/src/arrow/python/iterators.h
+++ b/cpp/src/arrow/python/iterators.h
@@ -36,7 +36,7 @@ namespace internal {
//
// If keep_going is set to false, the iteration terminates
template
-inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
+inline Status VisitSequenceGeneric(PyObject* obj, int64_t offset, VisitorFunc&& func) {
// VisitorFunc may set to false to terminate iteration
bool keep_going = true;
@@ -49,7 +49,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
if (PyArray_DESCR(arr_obj)->type_num == NPY_OBJECT) {
// It's an array object, we can fetch object pointers directly
const Ndarray1DIndexer objects(arr_obj);
- for (int64_t i = 0; keep_going && i < objects.size(); ++i) {
+ for (int64_t i = offset; keep_going && i < objects.size(); ++i) {
RETURN_NOT_OK(func(objects[i], i, &keep_going));
}
return Status::OK();
@@ -64,7 +64,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
if (PyList_Check(obj) || PyTuple_Check(obj)) {
// Use fast item access
const Py_ssize_t size = PySequence_Fast_GET_SIZE(obj);
- for (Py_ssize_t i = 0; keep_going && i < size; ++i) {
+ for (Py_ssize_t i = offset; keep_going && i < size; ++i) {
PyObject* value = PySequence_Fast_GET_ITEM(obj, i);
RETURN_NOT_OK(func(value, static_cast(i), &keep_going));
}
@@ -72,7 +72,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
// Regular sequence: avoid making a potentially large copy
const Py_ssize_t size = PySequence_Size(obj);
RETURN_IF_PYERROR();
- for (Py_ssize_t i = 0; keep_going && i < size; ++i) {
+ for (Py_ssize_t i = offset; keep_going && i < size; ++i) {
OwnedRef value_ref(PySequence_ITEM(obj, i));
RETURN_IF_PYERROR();
RETURN_NOT_OK(func(value_ref.obj(), static_cast(i), &keep_going));
@@ -84,18 +84,29 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
return Status::OK();
}
-// Visit sequence with no null mask
+// Visit sequence with no null mask or offset
template
inline Status VisitSequence(PyObject* obj, VisitorFunc&& func) {
return VisitSequenceGeneric(
- obj, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) {
+ obj, /*offset=*/0,
+ [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) {
return func(value, keep_going);
});
}
-/// Visit sequence with null mask
+// Visit sequence with offset but no null mask
template
-inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& func) {
+inline Status VisitSequence(PyObject* obj, int64_t offset, VisitorFunc&& func) {
+ return VisitSequenceGeneric(
+ obj, offset, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) {
+ return func(value, keep_going);
+ });
+}
+
+/// Visit sequence with null mask and offset
+template
+inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, int64_t offset,
+ VisitorFunc&& func) {
if (mo == nullptr || !PyArray_Check(mo)) {
return Status::Invalid("Null mask must be NumPy array");
}
@@ -115,7 +126,7 @@ inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& fun
Ndarray1DIndexer mask_values(mask);
return VisitSequenceGeneric(
- obj, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) {
+ obj, offset, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) {
return func(value, mask_values[i], keep_going);
});
} else {
@@ -123,6 +134,12 @@ inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& fun
}
}
+/// Visit sequence with null mask but no offset
+template
+inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& func) {
+ return VisitSequenceMasked(obj, mo, /*offset=*/0, func);
+}
+
// Like IterateSequence, but accepts any generic iterable (including
// non-restartable iterators, e.g. generators).
//
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index b2d9f1cb5a3..7076ec35363 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -392,22 +392,25 @@ class PyValue {
class PyConverter : public Converter {
public:
// Iterate over the input values and defer the conversion to the Append method
- Status Extend(PyObject* values, int64_t size) override {
- /// Ensure we've allocated enough space
- RETURN_NOT_OK(this->Reserve(size));
+ Status Extend(PyObject* values, int64_t size, int64_t offset = 0) override {
+ DCHECK_GE(size, offset);
+ // Ensure we've allocated enough space
+ RETURN_NOT_OK(this->Reserve(size - offset));
// Iterate over the items adding each one
- return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) {
- return this->Append(item);
- });
+ return internal::VisitSequence(
+ values, offset,
+ [this](PyObject* item, bool* /* unused */) { return this->Append(item); });
}
// Convert and append a sequence of values masked with a numpy array
- Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override {
- /// Ensure we've allocated enough space
- RETURN_NOT_OK(this->Reserve(size));
+ Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size,
+ int64_t offset = 0) override {
+ DCHECK_GE(size, offset);
+ // Ensure we've allocated enough space
+ RETURN_NOT_OK(this->Reserve(size - offset));
// Iterate over the items adding each one
return internal::VisitSequenceMasked(
- values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) {
+ values, mask, offset, [this](PyObject* item, bool is_masked, bool* /* unused */) {
if (is_masked) {
return this->AppendNull();
} else {
@@ -536,6 +539,41 @@ class PyPrimitiveConverter>
return Status::OK();
}
+ Result ExtendAsMuchAsPossible(PyObject* values, int64_t size,
+ int64_t offset) override {
+ DCHECK_GE(size, offset);
+ // See BaseBinaryBuilder::Resize - avoid error in Reserve()
+ ARROW_LOG(WARNING) << "size=" << size << " offset=" << offset
+ << " limit=" << BaseBinaryBuilder::memory_limit();
+ if (size - offset >= BaseBinaryBuilder::memory_limit()) {
+ size = offset + BaseBinaryBuilder::memory_limit() - 1;
+ }
+ ARROW_LOG(WARNING) << "size=" << size << " offset=" << offset
+ << " limit=" << BaseBinaryBuilder::memory_limit();
+ const auto status = this->Extend(values, size, offset);
+ const auto num_converted = this->builder()->length();
+ if (ARROW_PREDICT_TRUE(status.ok() ||
+ (status.IsCapacityError() && num_converted > 0))) {
+ return num_converted;
+ }
+ return status;
+ }
+
+ Result ExtendMaskedAsMuchAsPossible(PyObject* values, PyObject* mask,
+ int64_t size, int64_t offset) override {
+ DCHECK_GE(size, offset);
+ if (size - offset >= BaseBinaryBuilder::memory_limit()) {
+ size = offset + BaseBinaryBuilder::memory_limit() - 1;
+ }
+ const auto status = this->ExtendMasked(values, mask, size, offset);
+ const auto num_converted = this->builder()->length();
+ if (ARROW_PREDICT_TRUE(status.ok() ||
+ (status.IsCapacityError() && num_converted > 0))) {
+ return num_converted;
+ }
+ return status;
+ }
+
protected:
// Create a single instance of PyBytesView here to prevent unnecessary object
// creation/destruction. This significantly improves the conversion performance.
@@ -585,6 +623,37 @@ class PyPrimitiveConverter>
return Status::OK();
}
+ Result ExtendAsMuchAsPossible(PyObject* values, int64_t size,
+ int64_t offset) override {
+ DCHECK_GE(size, offset);
+ // See BaseBinaryBuilder::Resize - avoid error in Reserve()
+ if (size - offset > BaseBinaryBuilder::memory_limit()) {
+ size = offset + BaseBinaryBuilder::memory_limit();
+ }
+ const auto status = this->Extend(values, size, offset);
+ const auto num_converted = this->builder()->length();
+ if (ARROW_PREDICT_TRUE(status.ok() ||
+ (status.IsCapacityError() && num_converted > 0))) {
+ return num_converted;
+ }
+ return status;
+ }
+
+ Result ExtendMaskedAsMuchAsPossible(PyObject* values, PyObject* mask,
+ int64_t size, int64_t offset) override {
+ DCHECK_GE(size, offset);
+ if (size - offset >= BaseBinaryBuilder::memory_limit()) {
+ size = offset + BaseBinaryBuilder::memory_limit() - 1;
+ }
+ const auto status = this->ExtendMasked(values, mask, size, offset);
+ const auto num_converted = this->builder()->length();
+ if (ARROW_PREDICT_TRUE(status.ok() ||
+ (status.IsCapacityError() && num_converted > 0))) {
+ return num_converted;
+ }
+ return status;
+ }
+
Result> ToArray() override {
ARROW_ASSIGN_OR_RAISE(auto array, (PrimitiveConverter::ToArray()));
if (observed_binary_) {
diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h
index 2c40a48726b..f9724d44d6d 100644
--- a/cpp/src/arrow/util/converter.h
+++ b/cpp/src/arrow/util/converter.h
@@ -54,14 +54,40 @@ class Converter {
virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }
- virtual Status Extend(InputType values, int64_t size) {
+ /// \brief Extend the array with values from the input sequence starting at the offset.
+ /// Fails if the array cannot fit all values.
+ /// \param[in] values The input sequence.
+ /// \param[in] size The total length of the input sequence, independent of offset.
+ /// \param[in] offset The offset to start converting from.
+ virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) {
return Status::NotImplemented("Extend");
}
- virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) {
+ /// \brief Extend the array with as many values as possible from the
+ /// input sequence starting at the offset.
+ ///
+ /// \param[in] values The input sequence.
+ /// \param[in] size The total length of the input sequence (independent of offset).
+ /// \param[in] offset The offset to start at.
+ virtual Result ExtendAsMuchAsPossible(InputType values, int64_t size,
+ int64_t offset) {
+ DCHECK_GE(size, offset);
+ RETURN_NOT_OK(Extend(values, size, offset));
+ return builder()->length();
+ }
+
+ virtual Status ExtendMasked(InputType values, InputType mask, int64_t size,
+ int64_t offset = 0) {
return Status::NotImplemented("ExtendMasked");
}
+ virtual Result ExtendMaskedAsMuchAsPossible(InputType values, InputType mask,
+ int64_t size, int64_t offset) {
+ DCHECK_GE(size, offset);
+ RETURN_NOT_OK(ExtendMasked(values, mask, size, offset));
+ return builder()->length();
+ }
+
const std::shared_ptr& builder() const { return builder_; }
const std::shared_ptr& type() const { return type_; }
@@ -302,32 +328,35 @@ class Chunker {
return status;
}
- // we could get bit smarter here since the whole batch of appendable values
- // will be rejected if a capacity error is raised
Status Extend(InputType values, int64_t size) {
- auto status = converter_->Extend(values, size);
- if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
- if (converter_->builder()->length() == 0) {
- return status;
+ int64_t offset = 0;
+ while (offset < size) {
+ ARROW_ASSIGN_OR_RAISE(const int64_t num_converted,
+ converter_->ExtendAsMuchAsPossible(values, size, offset));
+ offset += num_converted;
+ length_ += num_converted;
+ if (offset < size) {
+ // Need another chunk, finish up the current one
+ ARROW_RETURN_NOT_OK(FinishChunk());
}
- ARROW_RETURN_NOT_OK(FinishChunk());
- return Extend(values, size);
}
- length_ += size;
- return status;
+ return Status::OK();
}
Status ExtendMasked(InputType values, InputType mask, int64_t size) {
- auto status = converter_->ExtendMasked(values, mask, size);
- if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
- if (converter_->builder()->length() == 0) {
- return status;
+ int64_t offset = 0;
+ while (offset < size) {
+ ARROW_ASSIGN_OR_RAISE(
+ const int64_t num_converted,
+ converter_->ExtendMaskedAsMuchAsPossible(values, mask, size, offset));
+ offset += num_converted;
+ length_ += num_converted;
+ if (offset < size) {
+ // Need another chunk, finish up the current one
+ ARROW_RETURN_NOT_OK(FinishChunk());
}
- ARROW_RETURN_NOT_OK(FinishChunk());
- return ExtendMasked(values, mask, size);
}
- length_ += size;
- return status;
+ return Status::OK();
}
Status FinishChunk() {
diff --git a/python/.gitignore b/python/.gitignore
index ef1237a2d10..af83a453fe8 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -17,6 +17,7 @@ Testing/
*.cpp
pyarrow/*_api.h
pyarrow/_generated_version.py
+cython_debug/
# Bundled headers
pyarrow/include
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index 30500bc3c5b..5e565214b16 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -2875,3 +2875,50 @@ def test_to_pandas_timezone():
arr = pa.chunked_array([arr])
s = arr.to_pandas()
assert s.dt.tz is not None
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+def test_array_from_pylist_data_overflow():
+ # Regression test for ARROW-12983
+ # Data buffer overflow - should result in chunked array
+ items = [b'a' * 4096] * (2 ** 19)
+ arr = pa.array(items, type=pa.string())
+ assert isinstance(arr, pa.ChunkedArray)
+ assert len(arr) == 2**19
+ assert len(arr.chunks) > 1
+
+ mask = np.zeros(2**19, bool)
+ arr = pa.array(items, mask=mask, type=pa.string())
+ assert isinstance(arr, pa.ChunkedArray)
+ assert len(arr) == 2**19
+ assert len(arr.chunks) > 1
+
+ arr = pa.array(items, type=pa.binary())
+ assert isinstance(arr, pa.ChunkedArray)
+ assert len(arr) == 2**19
+ assert len(arr.chunks) > 1
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+def test_array_from_pylist_offset_overflow():
+ # Regression test for ARROW-12983
+ # Offset buffer overflow - should result in chunked array
+ # Note this doesn't apply to primitive arrays
+ items = [b'a'] * (2 ** 31)
+ arr = pa.array(items, type=pa.string())
+ assert isinstance(arr, pa.ChunkedArray)
+ assert len(arr) == 2**31
+ assert len(arr.chunks) > 1
+
+ mask = np.zeros(2**31, bool)
+ arr = pa.array(items, mask=mask, type=pa.string())
+ assert isinstance(arr, pa.ChunkedArray)
+ assert len(arr) == 2**31
+ assert len(arr.chunks) > 1
+
+ arr = pa.array(items, type=pa.binary())
+ assert isinstance(arr, pa.ChunkedArray)
+ assert len(arr) == 2**31
+ assert len(arr.chunks) > 1
From 07fe1dade9f7219a6ddde9807dbfd66370b20911 Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 7 Jun 2021 15:17:34 -0400
Subject: [PATCH 2/6] ARROW-12983: [R] Update r_to_arrow.cpp to new interface
---
r/src/r_to_arrow.cpp | 133 ++++++++++++++++++++++---------------------
1 file changed, 69 insertions(+), 64 deletions(-)
diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp
index d0f4f3a6def..683e8f278e8 100644
--- a/r/src/r_to_arrow.cpp
+++ b/r/src/r_to_arrow.cpp
@@ -326,7 +326,7 @@ class RConverter : public Converter {
public:
virtual Status Append(SEXP) { return Status::NotImplemented("Append"); }
- virtual Status Extend(SEXP values, int64_t size) {
+ virtual Status Extend(SEXP values, int64_t size, int64_t offset = 0) {
return Status::NotImplemented("Extend");
}
@@ -337,7 +337,7 @@ class RConverter : public Converter {
tasks.Append(false, task);
}
- virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) {
+ virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size, int64_t offset = 0) {
return Status::NotImplemented("ExtendMasked");
}
};
@@ -434,8 +434,8 @@ template
class RPrimitiveConverter>
: public PrimitiveConverter {
public:
- Status Extend(SEXP, int64_t size) override {
- return this->primitive_builder_->AppendNulls(size);
+ Status Extend(SEXP, int64_t size, int64_t offset = 0) override {
+ return this->primitive_builder_->AppendNulls(size - offset);
}
};
@@ -445,17 +445,17 @@ class RPrimitiveConverter<
T, enable_if_t::value || is_floating_type::value>>
: public PrimitiveConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
auto rtype = GetVectorType(x);
switch (rtype) {
case UINT8:
- return ExtendDispatch(x, size);
+ return ExtendDispatch(x, size, offset);
case INT32:
- return ExtendDispatch(x, size);
+ return ExtendDispatch(x, size, offset);
case FLOAT64:
- return ExtendDispatch(x, size);
+ return ExtendDispatch(x, size, offset);
case INT64:
- return ExtendDispatch(x, size);
+ return ExtendDispatch(x, size, offset);
default:
break;
@@ -471,14 +471,14 @@ class RPrimitiveConverter<
private:
template
- Status ExtendDispatch(SEXP x, int64_t size) {
+ Status ExtendDispatch(SEXP x, int64_t size, int64_t offset) {
if (ALTREP(x)) {
// `x` is an ALTREP R vector storing `r_value_type`
// and that type matches exactly the type of the array this is building
- return Extend_impl(RVectorIterator_ALTREP(x, 0), size);
+ return Extend_impl(RVectorIterator_ALTREP(x, offset), size);
} else {
// `x` is not an ALTREP vector so we have direct access to a range of values
- return Extend_impl(RVectorIterator(x, 0), size);
+ return Extend_impl(RVectorIterator(x, offset), size);
}
}
@@ -514,16 +514,16 @@ template
class RPrimitiveConverter::value>>
: public PrimitiveConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
auto rtype = GetVectorType(x);
if (rtype != BOOLEAN) {
return Status::Invalid("Expecting a logical vector");
}
if (ALTREP(x)) {
- return Extend_impl(RVectorIterator_ALTREP(x, 0), size);
+ return Extend_impl(RVectorIterator_ALTREP(x, offset), size);
} else {
- return Extend_impl(RVectorIterator(x, 0), size);
+ return Extend_impl(RVectorIterator(x, offset), size);
}
}
@@ -553,16 +553,16 @@ template
class RPrimitiveConverter::value>>
: public PrimitiveConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
switch (GetVectorType(x)) {
case DATE_INT:
- return AppendRange_Date_dispatch(x, size);
+ return AppendRange_Date_dispatch(x, size, offset);
case DATE_DBL:
- return AppendRange_Date_dispatch(x, size);
+ return AppendRange_Date_dispatch(x, size, offset);
case POSIXCT:
- return AppendRange_Posixct_dispatch(x, size);
+ return AppendRange_Posixct_dispatch(x, size, offset);
default:
break;
@@ -578,11 +578,12 @@ class RPrimitiveConverter::value>>
private:
template
- Status AppendRange_Date_dispatch(SEXP x, int64_t size) {
+ Status AppendRange_Date_dispatch(SEXP x, int64_t size, int64_t offset) {
if (ALTREP(x)) {
- return AppendRange_Date(RVectorIterator_ALTREP(x, 0), size);
+ return AppendRange_Date(RVectorIterator_ALTREP(x, offset),
+ size - offset);
} else {
- return AppendRange_Date(RVectorIterator(x, 0), size);
+ return AppendRange_Date(RVectorIterator(x, offset), size - offset);
}
}
@@ -602,11 +603,12 @@ class RPrimitiveConverter::value>>
return VisitVector(it, size, append_null, append_value);
}
- Status AppendRange_Posixct_dispatch(SEXP x, int64_t size) {
+ Status AppendRange_Posixct_dispatch(SEXP x, int64_t size, int64_t offset) {
if (ALTREP(x)) {
- return AppendRange_Posixct(RVectorIterator_ALTREP(x, 0), size);
+ return AppendRange_Posixct(RVectorIterator_ALTREP(x, offset),
+ size - offset);
} else {
- return AppendRange_Posixct(RVectorIterator(x, 0), size);
+ return AppendRange_Posixct(RVectorIterator(x, offset), size - offset);
}
}
@@ -660,8 +662,8 @@ template
class RPrimitiveConverter::value>>
: public PrimitiveConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
- RETURN_NOT_OK(this->Reserve(size));
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
+ RETURN_NOT_OK(this->Reserve(size - offset));
auto rtype = GetVectorType(x);
if (rtype != TIME) {
return Status::Invalid("Invalid conversion to time");
@@ -699,10 +701,11 @@ class RPrimitiveConverter::value>>
};
if (ALTREP(x)) {
- return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null,
+ return VisitVector(RVectorIterator_ALTREP(x, offset), size, append_null,
append_value);
} else {
- return VisitVector(RVectorIterator(x, 0), size, append_null, append_value);
+ return VisitVector(RVectorIterator(x, offset), size, append_null,
+ append_value);
}
}
@@ -716,8 +719,8 @@ template
class RPrimitiveConverter::value>>
: public PrimitiveConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
- RETURN_NOT_OK(this->Reserve(size));
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
+ RETURN_NOT_OK(this->Reserve(size - offset));
RVectorType rtype = GetVectorType(x);
if (rtype != POSIXCT) {
@@ -737,10 +740,11 @@ class RPrimitiveConverter::value>>
};
if (ALTREP(x)) {
- return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null,
+ return VisitVector(RVectorIterator_ALTREP(x, offset), size, append_null,
append_value);
} else {
- return VisitVector(RVectorIterator(x, 0), size, append_null, append_value);
+ return VisitVector(RVectorIterator(x, offset), size, append_null,
+ append_value);
}
}
@@ -754,7 +758,7 @@ template
class RPrimitiveConverter::value>>
: public PrimitiveConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
return Status::NotImplemented("Extend");
}
};
@@ -786,8 +790,8 @@ class RPrimitiveConverter>
public:
using OffsetType = typename T::offset_type;
- Status Extend(SEXP x, int64_t size) override {
- RETURN_NOT_OK(this->Reserve(size));
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
+ RETURN_NOT_OK(this->Reserve(size - offset));
RETURN_NOT_OK(check_binary(x, size));
auto append_null = [this]() {
@@ -801,7 +805,7 @@ class RPrimitiveConverter>
this->primitive_builder_->UnsafeAppend(RAW_RO(raw), static_cast(n));
return Status::OK();
};
- return VisitVector(RVectorIterator(x, 0), size, append_null, append_value);
+ return VisitVector(RVectorIterator(x, offset), size, append_null, append_value);
}
void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override {
@@ -814,8 +818,8 @@ template
class RPrimitiveConverter::value>>
: public PrimitiveConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
- RETURN_NOT_OK(this->Reserve(size));
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
+ RETURN_NOT_OK(this->Reserve(size - offset));
RETURN_NOT_OK(check_binary(x, size));
auto append_null = [this]() {
@@ -833,7 +837,7 @@ class RPrimitiveConverter::v
this->primitive_builder_->UnsafeAppend(RAW_RO(raw));
return Status::OK();
};
- return VisitVector(RVectorIterator(x, 0), size, append_null, append_value);
+ return VisitVector(RVectorIterator(x, offset), size, append_null, append_value);
}
void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override {
@@ -848,12 +852,12 @@ class RPrimitiveConverter>
public:
using OffsetType = typename T::offset_type;
- Status Extend(SEXP x, int64_t size) override {
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
RVectorType rtype = GetVectorType(x);
if (rtype != STRING) {
return Status::Invalid("Expecting a character vector");
}
- return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size);
+ return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size, offset);
}
void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override {
@@ -863,14 +867,14 @@ class RPrimitiveConverter>
}
private:
- Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size) {
+ Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size, int64_t offset) {
RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size()));
const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s));
// we know all the R strings are utf8 already, so we can get
// a definite size and then use UnsafeAppend*()
int64_t total_length = 0;
- for (R_xlen_t i = 0; i < size; i++, ++p_strings) {
+ for (R_xlen_t i = offset; i < size; i++, ++p_strings) {
SEXP si = *p_strings;
total_length += si == NA_STRING ? 0 : LENGTH(si);
}
@@ -878,7 +882,7 @@ class RPrimitiveConverter>
// append
p_strings = reinterpret_cast(DATAPTR_RO(s));
- for (R_xlen_t i = 0; i < size; i++, ++p_strings) {
+ for (R_xlen_t i = offset; i < size; i++, ++p_strings) {
SEXP si = *p_strings;
if (si == NA_STRING) {
this->primitive_builder_->UnsafeAppendNull();
@@ -895,7 +899,7 @@ template
class RPrimitiveConverter::value>>
: public PrimitiveConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
// TODO: look in lubridate
return Status::NotImplemented("Extend");
}
@@ -911,7 +915,7 @@ template
class RDictionaryConverter>
: public DictionaryConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
return Status::NotImplemented("Extend");
}
};
@@ -922,14 +926,14 @@ class RDictionaryConverter>
public:
using BuilderType = DictionaryBuilder;
- Status Extend(SEXP x, int64_t size) override {
- RETURN_NOT_OK(ExtendSetup(x, size));
- return ExtendImpl(x, size, GetCharLevels(x));
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
+ RETURN_NOT_OK(ExtendSetup(x, size, offset));
+ return ExtendImpl(x, size, offset, GetCharLevels(x));
}
void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override {
// the setup runs synchronously first
- Status setup = ExtendSetup(values, size);
+ Status setup = ExtendSetup(values, size, /*offset=*/0);
if (!setup.ok()) {
// if that fails, propagate the error
@@ -938,7 +942,7 @@ class RDictionaryConverter>
auto char_levels = GetCharLevels(values);
tasks.Append(true, [this, values, size, char_levels]() {
- return this->ExtendImpl(values, size, char_levels);
+ return this->ExtendImpl(values, size, /*offset=*/0, char_levels);
});
}
}
@@ -970,7 +974,7 @@ class RDictionaryConverter>
return char_levels;
}
- Status ExtendSetup(SEXP x, int64_t size) {
+ Status ExtendSetup(SEXP x, int64_t size, int64_t offset) {
RVectorType rtype = GetVectorType(x);
if (rtype != FACTOR) {
return Status::Invalid("invalid R type to convert to dictionary");
@@ -982,17 +986,18 @@ class RDictionaryConverter>
RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array));
// then we can proceed
- return this->Reserve(size);
+ return this->Reserve(size - offset);
}
- Status ExtendImpl(SEXP values, int64_t size,
+ Status ExtendImpl(SEXP values, int64_t size, int64_t offset,
const std::vector& char_levels) {
auto append_null = [this]() { return this->value_builder_->AppendNull(); };
auto append_value = [this, &char_levels](int value) {
return this->value_builder_->Append(char_levels[value - 1]);
};
- return VisitVector(RVectorIterator(values, 0), size, append_null, append_value);
+ return VisitVector(RVectorIterator(values, offset), size, append_null,
+ append_value);
}
};
@@ -1014,7 +1019,7 @@ struct RConverterTrait> {
template
class RListConverter : public ListConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
RETURN_NOT_OK(this->Reserve(size));
RVectorType rtype = GetVectorType(x);
@@ -1034,7 +1039,7 @@ class RListConverter : public ListConverter {
return this->value_converter_.get()->Extend(value, n);
};
- return VisitVector(RVectorIterator(x, 0), size, append_null, append_value);
+ return VisitVector(RVectorIterator(x, offset), size, append_null, append_value);
}
void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override {
@@ -1056,12 +1061,12 @@ struct RConverterTrait {
class RStructConverter : public StructConverter {
public:
- Status Extend(SEXP x, int64_t size) override {
- RETURN_NOT_OK(ExtendSetup(x, size));
+ Status Extend(SEXP x, int64_t size, int64_t offset = 0) override {
+ RETURN_NOT_OK(ExtendSetup(x, size, offset));
auto fields = this->struct_type_->fields();
R_xlen_t n_columns = XLENGTH(x);
- for (R_xlen_t i = 0; i < n_columns; i++) {
+ for (R_xlen_t i = offset; i < n_columns; i++) {
auto status = children_[i]->Extend(VECTOR_ELT(x, i), size);
if (!status.ok()) {
return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(),
@@ -1074,7 +1079,7 @@ class RStructConverter : public StructConverter {
void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override {
// the setup runs synchronously first
- Status setup = ExtendSetup(values, size);
+ Status setup = ExtendSetup(values, size, /*offset=*/0);
if (!setup.ok()) {
// if that fails, propagate the error
@@ -1095,7 +1100,7 @@ class RStructConverter : public StructConverter {
return StructConverter::Init(pool);
}
- Status ExtendSetup(SEXP x, int64_t size) {
+ Status ExtendSetup(SEXP x, int64_t size, int64_t offset) {
// check that x is compatible
R_xlen_t n_columns = XLENGTH(x);
@@ -1133,7 +1138,7 @@ class RStructConverter : public StructConverter {
}
}
- RETURN_NOT_OK(this->Reserve(size));
+ RETURN_NOT_OK(this->Reserve(size - offset));
for (R_xlen_t i = 0; i < size; i++) {
RETURN_NOT_OK(struct_builder_->Append());
From 62756c7671469f2d9316e6b6bd347f1e5fdca6bb Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 7 Jun 2021 15:31:29 -0400
Subject: [PATCH 3/6] ARROW-12983: [C++] Fix lint error
---
cpp/src/arrow/util/converter.h | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h
index f9724d44d6d..33237907f30 100644
--- a/cpp/src/arrow/util/converter.h
+++ b/cpp/src/arrow/util/converter.h
@@ -72,7 +72,7 @@ class Converter {
virtual Result ExtendAsMuchAsPossible(InputType values, int64_t size,
int64_t offset) {
DCHECK_GE(size, offset);
- RETURN_NOT_OK(Extend(values, size, offset));
+ ARROW_RETURN_NOT_OK(Extend(values, size, offset));
return builder()->length();
}
@@ -84,7 +84,7 @@ class Converter {
virtual Result ExtendMaskedAsMuchAsPossible(InputType values, InputType mask,
int64_t size, int64_t offset) {
DCHECK_GE(size, offset);
- RETURN_NOT_OK(ExtendMasked(values, mask, size, offset));
+ ARROW_RETURN_NOT_OK(ExtendMasked(values, mask, size, offset));
return builder()->length();
}
From c7e1198041b9132111f3b19d572b8550000900e7 Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 7 Jun 2021 15:36:41 -0400
Subject: [PATCH 4/6] ARROW-12983: [C++] Fix DCHECKs
---
cpp/src/arrow/util/converter.h | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h
index 33237907f30..7a45efc9d33 100644
--- a/cpp/src/arrow/util/converter.h
+++ b/cpp/src/arrow/util/converter.h
@@ -25,6 +25,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/visitor_inline.h"
@@ -71,7 +72,7 @@ class Converter {
/// \param[in] offset The offset to start at.
virtual Result ExtendAsMuchAsPossible(InputType values, int64_t size,
int64_t offset) {
- DCHECK_GE(size, offset);
+ ARROW_DCHECK_GE(size, offset);
ARROW_RETURN_NOT_OK(Extend(values, size, offset));
return builder()->length();
}
@@ -83,7 +84,7 @@ class Converter {
virtual Result ExtendMaskedAsMuchAsPossible(InputType values, InputType mask,
int64_t size, int64_t offset) {
- DCHECK_GE(size, offset);
+ ARROW_DCHECK_GE(size, offset);
ARROW_RETURN_NOT_OK(ExtendMasked(values, mask, size, offset));
return builder()->length();
}
From 6d951b351569a81124d98dd5680c26aaa76e8d9e Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 7 Jun 2021 15:41:08 -0400
Subject: [PATCH 5/6] ARROW-12983: [C++] Remove debug logs
---
cpp/src/arrow/python/python_to_arrow.cc | 4 ----
1 file changed, 4 deletions(-)
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index 7076ec35363..73d0e5fcd57 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -543,13 +543,9 @@ class PyPrimitiveConverter>
int64_t offset) override {
DCHECK_GE(size, offset);
// See BaseBinaryBuilder::Resize - avoid error in Reserve()
- ARROW_LOG(WARNING) << "size=" << size << " offset=" << offset
- << " limit=" << BaseBinaryBuilder::memory_limit();
if (size - offset >= BaseBinaryBuilder::memory_limit()) {
size = offset + BaseBinaryBuilder::memory_limit() - 1;
}
- ARROW_LOG(WARNING) << "size=" << size << " offset=" << offset
- << " limit=" << BaseBinaryBuilder::memory_limit();
const auto status = this->Extend(values, size, offset);
const auto num_converted = this->builder()->length();
if (ARROW_PREDICT_TRUE(status.ok() ||
From 8516d402b13590e3cd99dc2005d03e274b4cfd2f Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 17 Jun 2021 14:48:34 -0400
Subject: [PATCH 6/6] ARROW-12983: [C++] Consolidate binary/string converters
---
cpp/src/arrow/python/inference.cc | 9 ++--
cpp/src/arrow/python/iterators.h | 18 +------
cpp/src/arrow/python/python_to_arrow.cc | 68 +++----------------------
3 files changed, 13 insertions(+), 82 deletions(-)
diff --git a/cpp/src/arrow/python/inference.cc b/cpp/src/arrow/python/inference.cc
index 9d6707aa11d..5086815f84f 100644
--- a/cpp/src/arrow/python/inference.cc
+++ b/cpp/src/arrow/python/inference.cc
@@ -379,12 +379,13 @@ class TypeInferrer {
// Infer value type from a sequence of values
Status VisitSequence(PyObject* obj, PyObject* mask = nullptr) {
if (mask == nullptr || mask == Py_None) {
- return internal::VisitSequence(obj, [this](PyObject* value, bool* keep_going) {
- return Visit(value, keep_going);
- });
+ return internal::VisitSequence(
+ obj, /*offset=*/0,
+ [this](PyObject* value, bool* keep_going) { return Visit(value, keep_going); });
} else {
return internal::VisitSequenceMasked(
- obj, mask, [this](PyObject* value, uint8_t masked, bool* keep_going) {
+ obj, mask, /*offset=*/0,
+ [this](PyObject* value, uint8_t masked, bool* keep_going) {
if (!masked) {
return Visit(value, keep_going);
} else {
diff --git a/cpp/src/arrow/python/iterators.h b/cpp/src/arrow/python/iterators.h
index 6ea527119e4..5ecc9d6dc0c 100644
--- a/cpp/src/arrow/python/iterators.h
+++ b/cpp/src/arrow/python/iterators.h
@@ -84,16 +84,6 @@ inline Status VisitSequenceGeneric(PyObject* obj, int64_t offset, VisitorFunc&&
return Status::OK();
}
-// Visit sequence with no null mask or offset
-template
-inline Status VisitSequence(PyObject* obj, VisitorFunc&& func) {
- return VisitSequenceGeneric(
- obj, /*offset=*/0,
- [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) {
- return func(value, keep_going);
- });
-}
-
// Visit sequence with offset but no null mask
template
inline Status VisitSequence(PyObject* obj, int64_t offset, VisitorFunc&& func) {
@@ -134,12 +124,6 @@ inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, int64_t offset,
}
}
-/// Visit sequence with null mask but no offset
-template
-inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& func) {
- return VisitSequenceMasked(obj, mo, /*offset=*/0, func);
-}
-
// Like IterateSequence, but accepts any generic iterable (including
// non-restartable iterators, e.g. generators).
//
@@ -149,7 +133,7 @@ template
inline Status VisitIterable(PyObject* obj, VisitorFunc&& func) {
if (PySequence_Check(obj)) {
// Numpy arrays fall here as well
- return VisitSequence(obj, std::forward(func));
+ return VisitSequence(obj, /*offset=*/0, std::forward(func));
}
// Fall back on the iterator protocol
OwnedRef iter_ref(PyObject_GetIter(obj));
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index 73d0e5fcd57..69f0de730d2 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -517,65 +517,6 @@ class PyPrimitiveConverter<
}
};
-template
-class PyPrimitiveConverter>
- : public PrimitiveConverter {
- public:
- using OffsetType = typename T::offset_type;
-
- Status Append(PyObject* value) override {
- if (PyValue::IsNull(this->options_, value)) {
- this->primitive_builder_->UnsafeAppendNull();
- } else {
- ARROW_RETURN_NOT_OK(
- PyValue::Convert(this->primitive_type_, this->options_, value, view_));
- // Since we don't know the varying length input size in advance, we need to
- // reserve space in the value builder one by one. ReserveData raises CapacityError
- // if the value would not fit into the array.
- ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size));
- this->primitive_builder_->UnsafeAppend(view_.bytes,
- static_cast(view_.size));
- }
- return Status::OK();
- }
-
- Result ExtendAsMuchAsPossible(PyObject* values, int64_t size,
- int64_t offset) override {
- DCHECK_GE(size, offset);
- // See BaseBinaryBuilder::Resize - avoid error in Reserve()
- if (size - offset >= BaseBinaryBuilder::memory_limit()) {
- size = offset + BaseBinaryBuilder::memory_limit() - 1;
- }
- const auto status = this->Extend(values, size, offset);
- const auto num_converted = this->builder()->length();
- if (ARROW_PREDICT_TRUE(status.ok() ||
- (status.IsCapacityError() && num_converted > 0))) {
- return num_converted;
- }
- return status;
- }
-
- Result ExtendMaskedAsMuchAsPossible(PyObject* values, PyObject* mask,
- int64_t size, int64_t offset) override {
- DCHECK_GE(size, offset);
- if (size - offset >= BaseBinaryBuilder::memory_limit()) {
- size = offset + BaseBinaryBuilder::memory_limit() - 1;
- }
- const auto status = this->ExtendMasked(values, mask, size, offset);
- const auto num_converted = this->builder()->length();
- if (ARROW_PREDICT_TRUE(status.ok() ||
- (status.IsCapacityError() && num_converted > 0))) {
- return num_converted;
- }
- return status;
- }
-
- protected:
- // Create a single instance of PyBytesView here to prevent unnecessary object
- // creation/destruction. This significantly improves the conversion performance.
- PyBytesView view_;
-};
-
template
class PyPrimitiveConverter::value>>
: public PrimitiveConverter {
@@ -597,7 +538,7 @@ class PyPrimitiveConverter::
};
template
-class PyPrimitiveConverter>
+class PyPrimitiveConverter>
: public PrimitiveConverter {
public:
using OffsetType = typename T::offset_type;
@@ -612,6 +553,9 @@ class PyPrimitiveConverter>
// observed binary value
observed_binary_ = true;
}
+ // Since we don't know the varying length input size in advance, we need to
+ // reserve space in the value builder one by one. ReserveData raises CapacityError
+ // if the value would not fit into the array.
ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size));
this->primitive_builder_->UnsafeAppend(view_.bytes,
static_cast(view_.size));
@@ -624,7 +568,7 @@ class PyPrimitiveConverter>
DCHECK_GE(size, offset);
// See BaseBinaryBuilder::Resize - avoid error in Reserve()
if (size - offset > BaseBinaryBuilder::memory_limit()) {
- size = offset + BaseBinaryBuilder::memory_limit();
+ size = offset + BaseBinaryBuilder::memory_limit() - 1;
}
const auto status = this->Extend(values, size, offset);
const auto num_converted = this->builder()->length();
@@ -662,6 +606,8 @@ class PyPrimitiveConverter>
}
protected:
+ // Create a single instance of PyBytesView here to prevent unnecessary object
+ // creation/destruction. This significantly improves the conversion performance.
PyBytesView view_;
bool observed_binary_ = false;
};