9 changes: 5 additions & 4 deletions cpp/src/arrow/python/inference.cc
@@ -379,12 +379,13 @@ class TypeInferrer {
// Infer value type from a sequence of values
Status VisitSequence(PyObject* obj, PyObject* mask = nullptr) {
if (mask == nullptr || mask == Py_None) {
return internal::VisitSequence(obj, [this](PyObject* value, bool* keep_going) {
return Visit(value, keep_going);
});
return internal::VisitSequence(
obj, /*offset=*/0,
[this](PyObject* value, bool* keep_going) { return Visit(value, keep_going); });
} else {
return internal::VisitSequenceMasked(
obj, mask, [this](PyObject* value, uint8_t masked, bool* keep_going) {
obj, mask, /*offset=*/0,
[this](PyObject* value, uint8_t masked, bool* keep_going) {
if (!masked) {
return Visit(value, keep_going);
} else {
23 changes: 12 additions & 11 deletions cpp/src/arrow/python/iterators.h
@@ -36,7 +36,7 @@ namespace internal {
//
// If keep_going is set to false, the iteration terminates
template <class VisitorFunc>
inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
inline Status VisitSequenceGeneric(PyObject* obj, int64_t offset, VisitorFunc&& func) {
// VisitorFunc may set keep_going to false to terminate iteration
bool keep_going = true;

@@ -49,7 +49,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
if (PyArray_DESCR(arr_obj)->type_num == NPY_OBJECT) {
// It's an array object, we can fetch object pointers directly
const Ndarray1DIndexer<PyObject*> objects(arr_obj);
for (int64_t i = 0; keep_going && i < objects.size(); ++i) {
for (int64_t i = offset; keep_going && i < objects.size(); ++i) {
RETURN_NOT_OK(func(objects[i], i, &keep_going));
}
return Status::OK();
@@ -64,15 +64,15 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
if (PyList_Check(obj) || PyTuple_Check(obj)) {
// Use fast item access
const Py_ssize_t size = PySequence_Fast_GET_SIZE(obj);
for (Py_ssize_t i = 0; keep_going && i < size; ++i) {
for (Py_ssize_t i = offset; keep_going && i < size; ++i) {
PyObject* value = PySequence_Fast_GET_ITEM(obj, i);
RETURN_NOT_OK(func(value, static_cast<int64_t>(i), &keep_going));
}
} else {
// Regular sequence: avoid making a potentially large copy
const Py_ssize_t size = PySequence_Size(obj);
RETURN_IF_PYERROR();
for (Py_ssize_t i = 0; keep_going && i < size; ++i) {
for (Py_ssize_t i = offset; keep_going && i < size; ++i) {
OwnedRef value_ref(PySequence_ITEM(obj, i));
RETURN_IF_PYERROR();
RETURN_NOT_OK(func(value_ref.obj(), static_cast<int64_t>(i), &keep_going));
@@ -84,18 +84,19 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& fun
return Status::OK();
}

// Visit sequence with no null mask
// Visit sequence with offset but no null mask
template <class VisitorFunc>
inline Status VisitSequence(PyObject* obj, VisitorFunc&& func) {
inline Status VisitSequence(PyObject* obj, int64_t offset, VisitorFunc&& func) {
return VisitSequenceGeneric(
obj, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) {
obj, offset, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) {
return func(value, keep_going);
});
}

/// Visit sequence with null mask
/// Visit sequence with null mask and offset
template <class VisitorFunc>
inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& func) {
inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, int64_t offset,
VisitorFunc&& func) {
if (mo == nullptr || !PyArray_Check(mo)) {
return Status::Invalid("Null mask must be NumPy array");
}
@@ -115,7 +116,7 @@ inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& fun
Ndarray1DIndexer<uint8_t> mask_values(mask);

return VisitSequenceGeneric(
obj, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) {
obj, offset, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) {
return func(value, mask_values[i], keep_going);
});
} else {
@@ -132,7 +133,7 @@ template <class VisitorFunc>
inline Status VisitIterable(PyObject* obj, VisitorFunc&& func) {
if (PySequence_Check(obj)) {
// Numpy arrays fall here as well
return VisitSequence(obj, std::forward<VisitorFunc>(func));
return VisitSequence(obj, /*offset=*/0, std::forward<VisitorFunc>(func));
}
// Fall back on the iterator protocol
OwnedRef iter_ref(PyObject_GetIter(obj));
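
The new offset parameter threads through all of the sequence visitors so a caller can resume iteration partway through a sequence instead of always starting at element zero. A minimal sketch of calling the updated VisitSequence (the include path, namespace, and helper function here are assumptions for illustration, not part of this change):

#include "arrow/python/iterators.h"  // assumed include path
#include "arrow/status.h"

#include <cstdint>

// Count the elements of a Python sequence, skipping the first two.
arrow::Status CountTail(PyObject* seq, int64_t* n_visited) {
  *n_visited = 0;
  return arrow::py::internal::VisitSequence(
      seq, /*offset=*/2,
      [n_visited](PyObject* /*value*/, bool* /*keep_going*/) {
        // keep_going starts out true; a visitor sets it to false to stop early.
        ++(*n_visited);
        return arrow::Status::OK();
      });
}
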
89 changes: 50 additions & 39 deletions cpp/src/arrow/python/python_to_arrow.cc
@@ -392,22 +392,25 @@ class PyValue {
class PyConverter : public Converter<PyObject*, PyConversionOptions> {
public:
// Iterate over the input values and defer the conversion to the Append method
Status Extend(PyObject* values, int64_t size) override {
/// Ensure we've allocated enough space
RETURN_NOT_OK(this->Reserve(size));
Status Extend(PyObject* values, int64_t size, int64_t offset = 0) override {
DCHECK_GE(size, offset);
// Ensure we've allocated enough space
RETURN_NOT_OK(this->Reserve(size - offset));
// Iterate over the items adding each one
return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) {
return this->Append(item);
});
return internal::VisitSequence(
values, offset,
[this](PyObject* item, bool* /* unused */) { return this->Append(item); });
}

// Convert and append a sequence of values masked with a numpy array
Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override {
/// Ensure we've allocated enough space
RETURN_NOT_OK(this->Reserve(size));
Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size,
int64_t offset = 0) override {
DCHECK_GE(size, offset);
// Ensure we've allocated enough space
RETURN_NOT_OK(this->Reserve(size - offset));
// Iterate over the items adding each one
return internal::VisitSequenceMasked(
values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) {
values, mask, offset, [this](PyObject* item, bool is_masked, bool* /* unused */) {
if (is_masked) {
return this->AppendNull();
} else {
@@ -514,34 +517,6 @@ class PyPrimitiveConverter<
}
};

template <typename T>
class PyPrimitiveConverter<T, enable_if_binary<T>>
: public PrimitiveConverter<T, PyConverter> {
public:
using OffsetType = typename T::offset_type;

Status Append(PyObject* value) override {
if (PyValue::IsNull(this->options_, value)) {
this->primitive_builder_->UnsafeAppendNull();
} else {
ARROW_RETURN_NOT_OK(
PyValue::Convert(this->primitive_type_, this->options_, value, view_));
// Since we don't know the varying length input size in advance, we need to
// reserve space in the value builder one by one. ReserveData raises CapacityError
// if the value would not fit into the array.
ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size));
this->primitive_builder_->UnsafeAppend(view_.bytes,
static_cast<OffsetType>(view_.size));
}
return Status::OK();
}

protected:
// Create a single instance of PyBytesView here to prevent unnecessary object
// creation/destruction. This significantly improves the conversion performance.
PyBytesView view_;
};

template <typename T>
class PyPrimitiveConverter<T, enable_if_t<std::is_same<T, FixedSizeBinaryType>::value>>
: public PrimitiveConverter<T, PyConverter> {
@@ -563,7 +538,7 @@ class PyPrimitiveConverter<T, enable_if_t<std::is_same<T, FixedSizeBinaryType>::
};

template <typename T>
class PyPrimitiveConverter<T, enable_if_string_like<T>>
class PyPrimitiveConverter<T, enable_if_base_binary<T>>
: public PrimitiveConverter<T, PyConverter> {
public:
using OffsetType = typename T::offset_type;
@@ -578,13 +553,47 @@ class PyPrimitiveConverter<T, enable_if_string_like<T>>
// observed binary value
observed_binary_ = true;
}
// Since we don't know the varying length input size in advance, we need to
// reserve space in the value builder one by one. ReserveData raises CapacityError
// if the value would not fit into the array.
ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size));
this->primitive_builder_->UnsafeAppend(view_.bytes,
static_cast<OffsetType>(view_.size));
}
return Status::OK();
}

Result<int64_t> ExtendAsMuchAsPossible(PyObject* values, int64_t size,
[Review comment from a project member]: Can you try to reconcile the PyPrimitiveConverter for binary and string types? They really look very similar.
int64_t offset) override {
DCHECK_GE(size, offset);
// See BaseBinaryBuilder::Resize - avoid error in Reserve()
if (size - offset > BaseBinaryBuilder<T>::memory_limit()) {
size = offset + BaseBinaryBuilder<T>::memory_limit() - 1;
}
const auto status = this->Extend(values, size, offset);
const auto num_converted = this->builder()->length();
if (ARROW_PREDICT_TRUE(status.ok() ||
(status.IsCapacityError() && num_converted > 0))) {
return num_converted;
}
return status;
}

Result<int64_t> ExtendMaskedAsMuchAsPossible(PyObject* values, PyObject* mask,
int64_t size, int64_t offset) override {
DCHECK_GE(size, offset);
if (size - offset >= BaseBinaryBuilder<T>::memory_limit()) {
size = offset + BaseBinaryBuilder<T>::memory_limit() - 1;
}
const auto status = this->ExtendMasked(values, mask, size, offset);
const auto num_converted = this->builder()->length();
if (ARROW_PREDICT_TRUE(status.ok() ||
(status.IsCapacityError() && num_converted > 0))) {
return num_converted;
}
return status;
}

Result<std::shared_ptr<Array>> ToArray() override {
ARROW_ASSIGN_OR_RAISE(auto array, (PrimitiveConverter<T, PyConverter>::ToArray()));
if (observed_binary_) {
@@ -597,6 +606,8 @@ class PyPrimitiveConverter<T, enable_if_string_like<T>>
}

protected:
// Create a single instance of PyBytesView here to prevent unnecessary object
// creation/destruction. This significantly improves the conversion performance.
PyBytesView view_;
bool observed_binary_ = false;
};
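
For variable-width (string/binary) values the converter cannot know up front how many items will fit, so ExtendAsMuchAsPossible first clamps the requested slice so that Reserve() itself cannot fail, lets the per-value ReserveData() calls raise CapacityError once the data buffer fills up, and then reports how many values actually made it into the builder. A small standalone sketch of the resulting arithmetic, using the sizes from the regression test added in test_array.py below (the ~2 GiB figure is an assumption about the data-buffer limit for 32-bit offset types, not a value taken from Arrow):

#include <cstdint>
#include <iostream>

int main() {
  // test_array_from_pylist_data_overflow: 2^19 values of 4096 bytes each.
  const int64_t num_values = int64_t(1) << 19;
  const int64_t value_size = 4096;
  const int64_t total_bytes = num_values * value_size;  // 2^31 bytes
  const int64_t approx_limit = (int64_t(1) << 31) - 1;  // assumed ~2 GiB cap

  // More character data than one array's data buffer can hold, so the
  // chunked conversion has to produce at least this many chunks.
  const int64_t min_chunks = (total_bytes + approx_limit - 1) / approx_limit;
  std::cout << total_bytes << " bytes needed, ~" << approx_limit
            << " bytes per chunk -> at least " << min_chunks << " chunks\n";
  return 0;
}
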
70 changes: 50 additions & 20 deletions cpp/src/arrow/util/converter.h
@@ -25,6 +25,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/visitor_inline.h"

@@ -54,14 +55,40 @@ class Converter {

virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }

virtual Status Extend(InputType values, int64_t size) {
/// \brief Extend the array with values from the input sequence starting at the offset.
/// Fails if the array cannot fit all values.
/// \param[in] values The input sequence.
/// \param[in] size The total length of the input sequence, independent of offset.
/// \param[in] offset The offset to start converting from.
virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) {
return Status::NotImplemented("Extend");
}

virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) {
/// \brief Extend the array with as many values as possible from the
/// input sequence starting at the offset.
///
/// \param[in] values The input sequence.
/// \param[in] size The total length of the input sequence (independent of offset).
/// \param[in] offset The offset to start at.
virtual Result<int64_t> ExtendAsMuchAsPossible(InputType values, int64_t size,
int64_t offset) {
ARROW_DCHECK_GE(size, offset);
ARROW_RETURN_NOT_OK(Extend(values, size, offset));
return builder()->length();
}

virtual Status ExtendMasked(InputType values, InputType mask, int64_t size,
int64_t offset = 0) {
return Status::NotImplemented("ExtendMasked");
}

virtual Result<int64_t> ExtendMaskedAsMuchAsPossible(InputType values, InputType mask,
int64_t size, int64_t offset) {
ARROW_DCHECK_GE(size, offset);
ARROW_RETURN_NOT_OK(ExtendMasked(values, mask, size, offset));
return builder()->length();
}

const std::shared_ptr<ArrayBuilder>& builder() const { return builder_; }

const std::shared_ptr<DataType>& type() const { return type_; }
@@ -302,32 +329,35 @@ class Chunker {
return status;
}

// we could get bit smarter here since the whole batch of appendable values
// will be rejected if a capacity error is raised
Status Extend(InputType values, int64_t size) {
auto status = converter_->Extend(values, size);
if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
if (converter_->builder()->length() == 0) {
return status;
int64_t offset = 0;
while (offset < size) {
ARROW_ASSIGN_OR_RAISE(const int64_t num_converted,
converter_->ExtendAsMuchAsPossible(values, size, offset));
offset += num_converted;
length_ += num_converted;
if (offset < size) {
// Need another chunk, finish up the current one
ARROW_RETURN_NOT_OK(FinishChunk());
}
ARROW_RETURN_NOT_OK(FinishChunk());
return Extend(values, size);
}
length_ += size;
return status;
return Status::OK();
}

Status ExtendMasked(InputType values, InputType mask, int64_t size) {
auto status = converter_->ExtendMasked(values, mask, size);
if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
if (converter_->builder()->length() == 0) {
return status;
int64_t offset = 0;
while (offset < size) {
ARROW_ASSIGN_OR_RAISE(
const int64_t num_converted,
converter_->ExtendMaskedAsMuchAsPossible(values, mask, size, offset));
offset += num_converted;
length_ += num_converted;
if (offset < size) {
// Need another chunk, finish up the current one
ARROW_RETURN_NOT_OK(FinishChunk());
}
ARROW_RETURN_NOT_OK(FinishChunk());
return ExtendMasked(values, mask, size);
}
length_ += size;
return status;
return Status::OK();
}

Status FinishChunk() {
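
The rewritten Chunker::Extend above no longer restarts the whole conversion on a capacity error; it asks the converter how far it got, finishes the current chunk, and resumes from that offset. A self-contained sketch of the same loop with a toy builder whose capacity is deliberately tiny so the behavior is visible (all names and limits are illustrative stand-ins, not the real Arrow classes):

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

namespace {

// Toy builder: holds at most kLimit values before reporting a capacity error.
struct ToyBuilder {
  static constexpr int64_t kLimit = 4;
  std::vector<int> values;
  bool Append(int v) {
    if (static_cast<int64_t>(values.size()) >= kLimit) return false;
    values.push_back(v);
    return true;
  }
};

// Mirrors ExtendAsMuchAsPossible(): append input[offset, size) until the
// builder is full and report how many values the freshly reset builder holds.
// (The real code propagates the capacity error when no progress was made,
// which also keeps the loop below from spinning forever.)
int64_t ExtendAsMuchAsPossible(const std::vector<int>& input, int64_t size,
                               int64_t offset, ToyBuilder* builder) {
  for (int64_t i = offset; i < size; ++i) {
    if (!builder->Append(input[i])) break;
  }
  return static_cast<int64_t>(builder->values.size());
}

}  // namespace

int main() {
  const std::vector<int> input{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
  const int64_t size = static_cast<int64_t>(input.size());

  std::vector<std::vector<int>> chunks;
  ToyBuilder builder;
  int64_t offset = 0;
  // Same shape as Chunker::Extend(): convert as much as possible, finish a
  // chunk whenever the input is not yet exhausted, then resume at the offset.
  while (offset < size) {
    const int64_t num_converted =
        ExtendAsMuchAsPossible(input, size, offset, &builder);
    offset += num_converted;
    if (offset < size) {
      chunks.push_back(std::move(builder.values));  // stand-in for FinishChunk()
      builder = ToyBuilder{};
    }
  }
  chunks.push_back(std::move(builder.values));  // final FinishChunk()

  std::cout << "chunks: " << chunks.size() << "\n";  // prints "chunks: 3"
  return 0;
}

Ten values pushed through a capacity-of-four builder come out as three chunks instead of a CapacityError, which is the same caller-visible behavior the regression tests below check for pa.array() on oversized inputs.
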
1 change: 1 addition & 0 deletions python/.gitignore
@@ -17,6 +17,7 @@ Testing/
*.cpp
pyarrow/*_api.h
pyarrow/_generated_version.py
cython_debug/

# Bundled headers
pyarrow/include
47 changes: 47 additions & 0 deletions python/pyarrow/tests/test_array.py
@@ -2875,3 +2875,50 @@ def test_to_pandas_timezone():
arr = pa.chunked_array([arr])
s = arr.to_pandas()
assert s.dt.tz is not None


@pytest.mark.slow
@pytest.mark.large_memory
def test_array_from_pylist_data_overflow():
# Regression test for ARROW-12983
# Data buffer overflow - should result in chunked array
items = [b'a' * 4096] * (2 ** 19)
arr = pa.array(items, type=pa.string())
assert isinstance(arr, pa.ChunkedArray)
assert len(arr) == 2**19
assert len(arr.chunks) > 1

mask = np.zeros(2**19, bool)
arr = pa.array(items, mask=mask, type=pa.string())
assert isinstance(arr, pa.ChunkedArray)
assert len(arr) == 2**19
assert len(arr.chunks) > 1

arr = pa.array(items, type=pa.binary())
assert isinstance(arr, pa.ChunkedArray)
assert len(arr) == 2**19
assert len(arr.chunks) > 1


@pytest.mark.slow
@pytest.mark.large_memory
def test_array_from_pylist_offset_overflow():
# Regression test for ARROW-12983
# Offset buffer overflow - should result in chunked array
# Note this doesn't apply to primitive arrays
items = [b'a'] * (2 ** 31)
arr = pa.array(items, type=pa.string())
assert isinstance(arr, pa.ChunkedArray)
assert len(arr) == 2**31
assert len(arr.chunks) > 1

mask = np.zeros(2**31, bool)
arr = pa.array(items, mask=mask, type=pa.string())
assert isinstance(arr, pa.ChunkedArray)
assert len(arr) == 2**31
assert len(arr.chunks) > 1

arr = pa.array(items, type=pa.binary())
assert isinstance(arr, pa.ChunkedArray)
assert len(arr) == 2**31
assert len(arr.chunks) > 1