Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ jobs:
ARROW_WITH_BROTLI: ON
ARROW_BUILD_TESTS: OFF
CMAKE_ARGS: "-DPython3_EXECUTABLE=/usr/local/bin/python3"
PYARROW_TEST_LARGE_MEMORY: ON
steps:
- name: Checkout Arrow
uses: actions/checkout@v2
Expand All @@ -139,6 +140,12 @@ jobs:
- name: Fetch Submodules and Tags
shell: bash
run: ci/scripts/util_checkout.sh
- name: Show available RAM size
shell: bash
run: |
hwmemsize=$(sysctl -n hw.memsize)
ramsize=$(expr $hwmemsize / $((1024**3)))
echo "System Memory: ${ramsize} GB"
- name: Install Dependencies
shell: bash
run: |
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/python_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ export LD_LIBRARY_PATH=${ARROW_HOME}/lib:${LD_LIBRARY_PATH}
# Enable some checks inside Python itself
export PYTHONDEVMODE=1

pytest -r s ${PYTEST_ARGS} --pyargs pyarrow
pytest -r s -v ${PYTEST_ARGS} --pyargs pyarrow
7 changes: 0 additions & 7 deletions cpp/src/arrow/array/builder_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,7 @@ class BaseBinaryBuilder : public ArrayBuilder {
}

Status Resize(int64_t capacity) override {
// XXX Why is this check necessary? There is no reason to disallow, say,
// binary arrays with more than 2**31 empty or null values.
if (capacity > memory_limit()) {
return Status::CapacityError("BinaryBuilder cannot reserve space for more than ",
memory_limit(), " child elements, got ", capacity);
}
ARROW_RETURN_NOT_OK(CheckCapacity(capacity));

// One more than requested for offsets
ARROW_RETURN_NOT_OK(offsets_builder_.Resize(capacity + 1));
return ArrayBuilder::Resize(capacity);
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/arrow/python/inference.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,13 @@ class TypeInferrer {
// Infer value type from a sequence of values
Status VisitSequence(PyObject* obj, PyObject* mask = nullptr) {
if (mask == nullptr || mask == Py_None) {
return internal::VisitSequence(obj, [this](PyObject* value, bool* keep_going) {
return Visit(value, keep_going);
});
return internal::VisitSequence(
obj, /*offset=*/0,
[this](PyObject* value, bool* keep_going) { return Visit(value, keep_going); });
} else {
return internal::VisitSequenceMasked(
obj, mask, [this](PyObject* value, uint8_t masked, bool* keep_going) {
obj, mask, /*offset=*/0,
[this](PyObject* value, uint8_t masked, bool* keep_going) {
if (!masked) {
return Visit(value, keep_going);
} else {
Expand Down
19 changes: 10 additions & 9 deletions cpp/src/arrow/python/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace internal {
//
// If keep_going is set to false, the iteration terminates
template <class VisitorFunc>
inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
inline Status VisitSequenceGeneric(PyObject* obj, int64_t offset, VisitorFunc&& func) {
// VisitorFunc may set to false to terminate iteration
bool keep_going = true;

Expand All @@ -49,7 +49,7 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
if (PyArray_DESCR(arr_obj)->type_num == NPY_OBJECT) {
// It's an array object, we can fetch object pointers directly
const Ndarray1DIndexer<PyObject*> objects(arr_obj);
for (int64_t i = 0; keep_going && i < objects.size(); ++i) {
for (int64_t i = offset; keep_going && i < objects.size(); ++i) {
RETURN_NOT_OK(func(objects[i], i, &keep_going));
}
return Status::OK();
Expand All @@ -64,15 +64,15 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {
if (PyList_Check(obj) || PyTuple_Check(obj)) {
// Use fast item access
const Py_ssize_t size = PySequence_Fast_GET_SIZE(obj);
for (Py_ssize_t i = 0; keep_going && i < size; ++i) {
for (Py_ssize_t i = offset; keep_going && i < size; ++i) {
PyObject* value = PySequence_Fast_GET_ITEM(obj, i);
RETURN_NOT_OK(func(value, static_cast<int64_t>(i), &keep_going));
}
} else {
// Regular sequence: avoid making a potentially large copy
const Py_ssize_t size = PySequence_Size(obj);
RETURN_IF_PYERROR();
for (Py_ssize_t i = 0; keep_going && i < size; ++i) {
for (Py_ssize_t i = offset; keep_going && i < size; ++i) {
OwnedRef value_ref(PySequence_ITEM(obj, i));
RETURN_IF_PYERROR();
RETURN_NOT_OK(func(value_ref.obj(), static_cast<int64_t>(i), &keep_going));
Expand All @@ -86,16 +86,17 @@ inline Status VisitSequenceGeneric(PyObject* obj, VisitorFunc&& func) {

// Visit sequence with no null mask
template <class VisitorFunc>
inline Status VisitSequence(PyObject* obj, VisitorFunc&& func) {
inline Status VisitSequence(PyObject* obj, int64_t offset, VisitorFunc&& func) {
return VisitSequenceGeneric(
obj, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) {
obj, offset, [&func](PyObject* value, int64_t i /* unused */, bool* keep_going) {
return func(value, keep_going);
});
}

/// Visit sequence with null mask
template <class VisitorFunc>
inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& func) {
inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, int64_t offset,
VisitorFunc&& func) {
if (mo == nullptr || !PyArray_Check(mo)) {
return Status::Invalid("Null mask must be NumPy array");
}
Expand All @@ -115,7 +116,7 @@ inline Status VisitSequenceMasked(PyObject* obj, PyObject* mo, VisitorFunc&& fun
Ndarray1DIndexer<uint8_t> mask_values(mask);

return VisitSequenceGeneric(
obj, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) {
obj, offset, [&func, &mask_values](PyObject* value, int64_t i, bool* keep_going) {
return func(value, mask_values[i], keep_going);
});
} else {
Expand All @@ -132,7 +133,7 @@ template <class VisitorFunc>
inline Status VisitIterable(PyObject* obj, VisitorFunc&& func) {
if (PySequence_Check(obj)) {
// Numpy arrays fall here as well
return VisitSequence(obj, std::forward<VisitorFunc>(func));
return VisitSequence(obj, /*offset=*/0, std::forward<VisitorFunc>(func));
}
// Fall back on the iterator protocol
OwnedRef iter_ref(PyObject_GetIter(obj));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/python/numpy_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Ndarray1DIndexer {

int64_t size() const { return PyArray_SIZE(arr_); }

T* data() const { return data_; }
const T* data() const { return reinterpret_cast<const T*>(data_); }

bool is_strided() const { return stride_ != sizeof(T); }

Expand Down
55 changes: 17 additions & 38 deletions cpp/src/arrow/python/python_to_arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,22 +392,25 @@ class PyValue {
class PyConverter : public Converter<PyObject*, PyConversionOptions> {
public:
// Iterate over the input values and defer the conversion to the Append method
Status Extend(PyObject* values, int64_t size) override {
Status Extend(PyObject* values, int64_t size, int64_t offset = 0) override {
DCHECK_GE(size, offset);
/// Ensure we've allocated enough space
RETURN_NOT_OK(this->Reserve(size));
RETURN_NOT_OK(this->Reserve(size - offset));
// Iterate over the items adding each one
return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) {
return this->Append(item);
});
return internal::VisitSequence(
values, offset,
[this](PyObject* item, bool* /* unused */) { return this->Append(item); });
}

// Convert and append a sequence of values masked with a numpy array
Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override {
Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size,
int64_t offset = 0) override {
DCHECK_GE(size, offset);
/// Ensure we've allocated enough space
RETURN_NOT_OK(this->Reserve(size));
RETURN_NOT_OK(this->Reserve(size - offset));
// Iterate over the items adding each one
return internal::VisitSequenceMasked(
values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) {
values, mask, offset, [this](PyObject* item, bool is_masked, bool* /* unused */) {
if (is_masked) {
return this->AppendNull();
} else {
Expand Down Expand Up @@ -514,34 +517,6 @@ class PyPrimitiveConverter<
}
};

template <typename T>
class PyPrimitiveConverter<T, enable_if_binary<T>>
: public PrimitiveConverter<T, PyConverter> {
public:
using OffsetType = typename T::offset_type;

Status Append(PyObject* value) override {
if (PyValue::IsNull(this->options_, value)) {
this->primitive_builder_->UnsafeAppendNull();
} else {
ARROW_RETURN_NOT_OK(
PyValue::Convert(this->primitive_type_, this->options_, value, view_));
// Since we don't know the varying length input size in advance, we need to
// reserve space in the value builder one by one. ReserveData raises CapacityError
// if the value would not fit into the array.
ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size));
this->primitive_builder_->UnsafeAppend(view_.bytes,
static_cast<OffsetType>(view_.size));
}
return Status::OK();
}

protected:
// Create a single instance of PyBytesView here to prevent unnecessary object
// creation/destruction. This significantly improves the conversion performance.
PyBytesView view_;
};

template <typename T>
class PyPrimitiveConverter<T, enable_if_t<std::is_same<T, FixedSizeBinaryType>::value>>
: public PrimitiveConverter<T, PyConverter> {
Expand All @@ -563,7 +538,7 @@ class PyPrimitiveConverter<T, enable_if_t<std::is_same<T, FixedSizeBinaryType>::
};

template <typename T>
class PyPrimitiveConverter<T, enable_if_string_like<T>>
class PyPrimitiveConverter<T, enable_if_base_binary<T>>
: public PrimitiveConverter<T, PyConverter> {
public:
using OffsetType = typename T::offset_type;
Expand All @@ -578,6 +553,9 @@ class PyPrimitiveConverter<T, enable_if_string_like<T>>
// observed binary value
observed_binary_ = true;
}
// Since we don't know the varying length input size in advance, we need to
// reserve space in the value builder one by one. ReserveData raises CapacityError
// if the value would not fit into the array.
ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(view_.size));
this->primitive_builder_->UnsafeAppend(view_.bytes,
static_cast<OffsetType>(view_.size));
Expand Down Expand Up @@ -728,7 +706,6 @@ class PyListConverter : public ListConverter<T, PyConverter, PyConverterTrait> {
auto value_builder =
checked_cast<ValueBuilderType*>(this->value_converter_->builder().get());

// TODO(wesm): Vector append when not strided
Ndarray1DIndexer<NumpyType> values(ndarray);
if (null_sentinels_possible) {
for (int64_t i = 0; i < values.size(); ++i) {
Expand All @@ -738,6 +715,8 @@ class PyListConverter : public ListConverter<T, PyConverter, PyConverterTrait> {
RETURN_NOT_OK(value_builder->Append(values[i]));
}
}
} else if (!values.is_strided()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated to the fix, but quality of life improvement regarding the testing speed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a jira for the changelog https://issues.apache.org/jira/browse/ARROW-13142

RETURN_NOT_OK(value_builder->AppendValues(values.data(), values.size()));
} else {
for (int64_t i = 0; i < values.size(); ++i) {
RETURN_NOT_OK(value_builder->Append(values[i]));
Expand Down
85 changes: 64 additions & 21 deletions cpp/src/arrow/util/converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ class Converter {

virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }

virtual Status Extend(InputType values, int64_t size) {
virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) {
return Status::NotImplemented("Extend");
}

virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) {
virtual Status ExtendMasked(InputType values, InputType mask, int64_t size,
int64_t offset = 0) {
return Status::NotImplemented("ExtendMasked");
}

Expand All @@ -70,6 +71,8 @@ class Converter {

bool may_overflow() const { return may_overflow_; }

bool rewind_on_overflow() const { return rewind_on_overflow_; }

virtual Status Reserve(int64_t additional_capacity) {
return builder_->Reserve(additional_capacity);
}
Expand All @@ -96,6 +99,7 @@ class Converter {
std::shared_ptr<ArrayBuilder> builder_;
OptionsType options_;
bool may_overflow_ = false;
bool rewind_on_overflow_ = false;
};

template <typename ArrowType, typename BaseConverter>
Expand Down Expand Up @@ -134,7 +138,8 @@ class ListConverter : public BaseConverter {
std::make_shared<BuilderType>(pool, value_converter_->builder(), this->type_);
list_builder_ = checked_cast<BuilderType*>(this->builder_.get());
// Narrow list types may overflow
this->may_overflow_ = sizeof(typename ArrowType::offset_type) < sizeof(int64_t);
this->may_overflow_ = this->rewind_on_overflow_ =
sizeof(typename ArrowType::offset_type) < sizeof(int64_t);
return Status::OK();
}

Expand Down Expand Up @@ -167,6 +172,7 @@ class StructConverter : public BaseConverter {
(MakeConverter<BaseConverter, ConverterTrait>(
field->type(), this->options_, pool)));
this->may_overflow_ |= child_converter->may_overflow();
this->rewind_on_overflow_ = this->may_overflow_;
child_builders.push_back(child_converter->builder());
children_.push_back(std::move(child_converter));
}
Expand Down Expand Up @@ -302,32 +308,69 @@ class Chunker {
return status;
}

// we could get bit smarter here since the whole batch of appendable values
// will be rejected if a capacity error is raised
Status Extend(InputType values, int64_t size) {
auto status = converter_->Extend(values, size);
if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
if (converter_->builder()->length() == 0) {
Status Extend(InputType values, int64_t size, int64_t offset = 0) {
while (offset < size) {
auto length_before = converter_->builder()->length();
auto status = converter_->Extend(values, size, offset);
auto length_after = converter_->builder()->length();
auto num_converted = length_after - length_before;

offset += num_converted;
length_ += num_converted;

if (status.IsCapacityError()) {
if (converter_->builder()->length() == 0) {
// Builder length == 0 means the individual element is too large to append.
// In this case, no need to try again.
return status;
} else if (converter_->rewind_on_overflow()) {
// The list-like and binary-like conversion paths may raise a capacity error,
// we need to handle them differently. While the binary-like converters check
// the capacity before append/extend the list-like converters just check after
// append/extend. Thus depending on the implementation semantics we may need
// to rewind (slice) the output chunk by one.
length_ -= 1;
offset -= 1;
}
ARROW_RETURN_NOT_OK(FinishChunk());
} else if (!status.ok()) {
return status;
}
ARROW_RETURN_NOT_OK(FinishChunk());
return Extend(values, size);
}
length_ += size;
return status;
return Status::OK();
}

Status ExtendMasked(InputType values, InputType mask, int64_t size) {
auto status = converter_->ExtendMasked(values, mask, size);
if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
if (converter_->builder()->length() == 0) {
Status ExtendMasked(InputType values, InputType mask, int64_t size,
int64_t offset = 0) {
while (offset < size) {
auto length_before = converter_->builder()->length();
auto status = converter_->ExtendMasked(values, mask, size, offset);
auto length_after = converter_->builder()->length();
auto num_converted = length_after - length_before;

offset += num_converted;
length_ += num_converted;

if (status.IsCapacityError()) {
if (converter_->builder()->length() == 0) {
// Builder length == 0 means the individual element is too large to append.
// In this case, no need to try again.
return status;
} else if (converter_->rewind_on_overflow()) {
// The list-like and binary-like conversion paths may raise a capacity error,
// we need to handle them differently. While the binary-like converters check
// the capacity before append/extend the list-like converters just check after
// append/extend. Thus depending on the implementation semantics we may need
// to rewind (slice) the output chunk by one.
length_ -= 1;
offset -= 1;
}
ARROW_RETURN_NOT_OK(FinishChunk());
} else if (!status.ok()) {
return status;
}
ARROW_RETURN_NOT_OK(FinishChunk());
return ExtendMasked(values, mask, size);
}
length_ += size;
return status;
return Status::OK();
}

Status FinishChunk() {
Expand Down
Loading