Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 41 additions & 41 deletions cpp/src/arrow/python/python_to_arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,36 +388,36 @@ class PyValue {
}
};

// Convert and append a Python sequence of values using the given converter.
// Reserves capacity up front, then visits each item and defers per-item
// conversion (including null checking) to the converter's Append method.
template <typename T>
Status Extend(T* converter, PyObject* values, int64_t size) {
// Ensure we've allocated enough space
RETURN_NOT_OK(converter->Reserve(size));
// Iterate over the items adding each one
return internal::VisitSequence(values, [converter](PyObject* item, bool* /* unused */) {
return converter->Append(item);
});
}

// Convert and append a sequence of values masked with a numpy array.
// A masked (true) entry appends a null; an unmasked entry goes through the
// converter's normal Append path, which applies its own null-checking.
template <typename T>
Status ExtendMasked(T* converter, PyObject* values, PyObject* mask, int64_t size) {
// Ensure we've allocated enough space
RETURN_NOT_OK(converter->Reserve(size));
// Iterate over the items adding each one
return internal::VisitSequenceMasked(
values, mask, [converter](PyObject* item, bool is_masked, bool* /* unused */) {
if (is_masked) {
return converter->AppendNull();
} else {
// This will also apply the null-checking convention in the event
// that the value is not masked
return converter->Append(item); // perhaps use AppendValue instead?
}
});
}

// The base Converter class is a mixin with predefined behavior and constructors.
using PyConverter = Converter<PyObject*, PyConversionOptions>;
// Converter specialized for CPython inputs: implements the sequence-level
// Extend/ExtendMasked entry points in terms of the per-item Append method,
// so subclasses only need to provide Append.
class PyConverter : public Converter<PyObject*, PyConversionOptions> {
public:
// Iterate over the input values and defer the conversion to the Append method
Status Extend(PyObject* values, int64_t size) override {
// Ensure we've allocated enough space so Append can assume capacity
RETURN_NOT_OK(this->Reserve(size));
// Iterate over the items adding each one
return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) {
return this->Append(item);
});
}

// Convert and append a sequence of values masked with a numpy array;
// masked (true) entries become nulls, the rest go through Append.
Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override {
// Ensure we've allocated enough space
RETURN_NOT_OK(this->Reserve(size));
// Iterate over the items adding each one
return internal::VisitSequenceMasked(
values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) {
if (is_masked) {
return this->AppendNull();
} else {
// This will also apply the null-checking convention in the event
// that the value is not masked
return this->Append(item); // perhaps use AppendValue instead?
}
});
}
};

template <typename T, typename Enable = void>
class PyPrimitiveConverter;
Expand Down Expand Up @@ -669,7 +669,7 @@ class PyListConverter : public ListConverter<T, PyConverter, PyConverterTrait> {
Status AppendSequence(PyObject* value) {
int64_t size = static_cast<int64_t>(PySequence_Size(value));
RETURN_NOT_OK(this->list_builder_->ValidateOverflow(size));
return Extend(this->value_converter_.get(), value, size);
return this->value_converter_->Extend(value, size);
}

Status AppendNdarray(PyObject* value) {
Expand All @@ -684,12 +684,12 @@ class PyListConverter : public ListConverter<T, PyConverter, PyConverterTrait> {
switch (value_type->id()) {
// If the value type does not match the expected NumPy dtype, then fall through
// to a slower PySequence-based path
#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE) \
case Type::TYPE_ID: { \
if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \
return Extend(this->value_converter_.get(), value, size); \
} \
return AppendNdarrayTyped<TYPE, NUMPY_TYPE>(ndarray); \
#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE) \
case Type::TYPE_ID: { \
if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \
return this->value_converter_->Extend(value, size); \
} \
return AppendNdarrayTyped<TYPE, NUMPY_TYPE>(ndarray); \
}
LIST_FAST_CASE(BOOL, BooleanType, NPY_BOOL)
LIST_FAST_CASE(UINT8, UInt8Type, NPY_UINT8)
Expand All @@ -707,7 +707,7 @@ class PyListConverter : public ListConverter<T, PyConverter, PyConverterTrait> {
LIST_FAST_CASE(DURATION, DurationType, NPY_TIMEDELTA)
#undef LIST_FAST_CASE
default: {
return Extend(this->value_converter_.get(), value, size);
return this->value_converter_->Extend(value, size);
}
}
}
Expand Down Expand Up @@ -1041,18 +1041,18 @@ Result<std::shared_ptr<ChunkedArray>> ConvertPySequence(PyObject* obj, PyObject*
// the overflow and automatically creates new chunks.
ARROW_ASSIGN_OR_RAISE(auto chunked_converter, MakeChunker(std::move(converter)));
if (mask != nullptr && mask != Py_None) {
RETURN_NOT_OK(ExtendMasked(chunked_converter.get(), seq, mask, size));
RETURN_NOT_OK(chunked_converter->ExtendMasked(seq, mask, size));
} else {
RETURN_NOT_OK(Extend(chunked_converter.get(), seq, size));
RETURN_NOT_OK(chunked_converter->Extend(seq, size));
}
return chunked_converter->ToChunkedArray();
} else {
// If the converter can't overflow spare the capacity error checking on the hot-path,
// this improves the performance roughly by ~10% for primitive types.
if (mask != nullptr && mask != Py_None) {
RETURN_NOT_OK(ExtendMasked(converter.get(), seq, mask, size));
RETURN_NOT_OK(converter->ExtendMasked(seq, mask, size));
} else {
RETURN_NOT_OK(Extend(converter.get(), seq, size));
RETURN_NOT_OK(converter->Extend(seq, size));
}
return converter->ToChunkedArray();
}
Expand Down
38 changes: 37 additions & 1 deletion cpp/src/arrow/util/converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ class Converter {
return Init(pool);
}

virtual Status Append(InputType value) = 0;
virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bkietz I think we should keep both Append and Extend since we wrap the converter object with the Chunker, so if one implementation (like the Python one) chooses to use Append then we don't need to subclass the Chunker class.


// Bulk-append a sequence of values. Optional override: converters that can
// iterate the input themselves implement this; the default reports
// NotImplemented so Append-only converters remain valid.
virtual Status Extend(InputType values, int64_t size) {
return Status::NotImplemented("Extend");
}

// Bulk-append a sequence of values with an accompanying validity mask.
// Optional override, mirroring Extend; defaults to NotImplemented.
virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) {
return Status::NotImplemented("ExtendMasked");
}

const std::shared_ptr<ArrayBuilder>& builder() const { return builder_; }

Expand Down Expand Up @@ -294,6 +302,34 @@ class Chunker {
return status;
}

// we could get a bit smarter here, since the whole batch of appendable values
// will be rejected if a capacity error is raised
Status Extend(InputType values, int64_t size) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The chunker's extend wrapper is untested since we use Append from the python code. Theoretically we can wrap the converter's Extend method just like in the case of Append though we reject the whole batch rather than a single item.

We could improve this logic but would require details about the iteration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding a int64_t start in addition to size as in #8650 would help. but I haven't yet reviewed how the chunker code works, e.g. can it "know" that it can handle n extra values in the current chunk ?

// Delegate the whole batch to the wrapped converter.
auto status = converter_->Extend(values, size);
if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
if (converter_->builder()->length() == 0) {
// The chunk is already empty, so the batch can never fit; surface the error.
return status;
}
// Seal the current chunk and retry the entire batch in a fresh one.
ARROW_RETURN_NOT_OK(FinishChunk());
return Extend(values, size);
}
// NOTE(review): length_ is bumped even when status is a non-capacity error;
// callers bail on error, so the stale length_ appears unobservable — confirm.
length_ += size;
return status;
}

// Masked counterpart of Extend: delegate the batch (values + validity mask)
// to the wrapped converter, starting a new chunk on capacity overflow and
// retrying the whole batch.
Status ExtendMasked(InputType values, InputType mask, int64_t size) {
auto status = converter_->ExtendMasked(values, mask, size);
if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
if (converter_->builder()->length() == 0) {
// Empty chunk already cannot hold the batch; propagate the error.
return status;
}
// Seal the current chunk and retry the entire batch in a fresh one.
ARROW_RETURN_NOT_OK(FinishChunk());
return ExtendMasked(values, mask, size);
}
length_ += size;
return status;
}

Status FinishChunk() {
ARROW_ASSIGN_OR_RAISE(auto chunk, converter_->ToArray(length_));
chunks_.push_back(chunk);
Expand Down