Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ struct ResolvedChunk<Array> {
bool IsNull() const { return array->IsNull(index); }
};

// ResolvedChunk specialization for StructArray
template <>
struct ResolvedChunk<StructArray> {
// The target structarray in chunked array.
const StructArray* array;
// The index in the target array.
const int64_t index;

ResolvedChunk(const StructArray* array, int64_t index) : array(array), index(index) {}

bool IsNull() const { return array->field(static_cast<int>(index)) == nullptr; }
};

struct ChunkedArrayResolver : protected ::arrow::internal::ChunkResolver {
ChunkedArrayResolver(const ChunkedArrayResolver& other)
: ::arrow::internal::ChunkResolver(other.chunks_), chunks_(other.chunks_) {}
Expand Down
56 changes: 56 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_array_sort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,49 @@ class ArrayCompareSorter {
}
};

template <typename ArrowType>
class StructArrayCompareSorter {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;

public:
// `offset` is used when this is called on a chunk of a chunked array
NullPartitionResult operator()(uint64_t* indices_begin, uint64_t* indices_end,
const Array& array, int64_t offset,
const ArraySortOptions& options) {
const auto& values = checked_cast<const ArrayType&>(array);
nested_value_comparator_ = std::make_shared<NestedValuesComparator>();

if (nested_value_comparator_->Prepare(values) != Status::OK()) {
// TODO: Improve error handling
return NullPartitionResult();
}

const auto p = PartitionNulls<ArrayType, StablePartitioner>(
indices_begin, indices_end, values, offset, options.null_placement);

bool asc_order = options.order == SortOrder::Ascending;
std::stable_sort(p.non_nulls_begin, p.non_nulls_end,
[&offset, &values, asc_order, this](uint64_t left, uint64_t right) {
// is better to do values.fields.size() or
// values.schema().num_fields() ?
for (ArrayVector::size_type fieldidx = 0;
fieldidx < values.fields().size(); ++fieldidx) {
int result = nested_value_comparator_->Compare(
values, fieldidx, offset, asc_order ? left : right,
asc_order ? right : left);
if (result == -1)
return true;
else if (result == 1)
return false;
}
return false;
});
return p;
}

std::shared_ptr<NestedValuesComparator> nested_value_comparator_;
};

template <typename ArrowType>
class ArrayCountSorter {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
Expand Down Expand Up @@ -409,6 +452,11 @@ struct ArraySorter<
ArrayCompareSorter<Type> impl;
};

template <typename Type>
struct ArraySorter<Type, enable_if_t<is_struct_type<Type>::value>> {
StructArrayCompareSorter<Type> impl;
};

struct ArraySorterFactory {
ArraySortFunc sorter;

Expand Down Expand Up @@ -507,6 +555,13 @@ void AddArraySortingKernels(VectorKernel base, VectorFunction* func) {
DCHECK_OK(func->AddKernel(base));
}

template <template <typename...> class ExecTemplate>
void AddArraySortingNestedKernels(VectorKernel base, VectorFunction* func) {
base.signature = KernelSignature::Make({Type::STRUCT}, uint64());
base.exec = ExecTemplate<UInt64Type, StructType>::Exec;
DCHECK_OK(func->AddKernel(base));
}

const ArraySortOptions* GetDefaultArraySortOptions() {
static const auto kDefaultArraySortOptions = ArraySortOptions::Defaults();
return &kDefaultArraySortOptions;
Expand Down Expand Up @@ -561,6 +616,7 @@ void RegisterVectorArraySort(FunctionRegistry* registry) {
base.init = ArraySortIndicesState::Init;
base.exec_chunked = ArraySortIndicesChunked;
AddArraySortingKernels<ArraySortIndices>(base, array_sort_indices.get());
AddArraySortingNestedKernels<ArraySortIndices>(base, array_sort_indices.get());
DCHECK_OK(registry->AddFunction(std::move(array_sort_indices)));

// partition_nth_indices has a parameter so needs its init function
Expand Down
17 changes: 6 additions & 11 deletions cpp/src/arrow/compute/kernels/vector_sort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ class ChunkedArraySorter : public TypeVisitor {
return Status::OK();
}

Status Visit(const StructType&) override { return SortInternal<StructType>(); }

private:
template <typename Type>
Status SortInternal() {
Expand Down Expand Up @@ -269,25 +271,18 @@ class ChunkedArraySorter : public TypeVisitor {
template <typename ArrayType>
void MergeNonNulls(uint64_t* range_begin, uint64_t* range_middle, uint64_t* range_end,
const std::vector<const Array*>& arrays, uint64_t* temp_indices) {
const ChunkedArrayResolver left_resolver(arrays);
const ChunkedArrayResolver right_resolver(arrays);
const ChunkedArrayResolver chunk_resolver(arrays);
ResolvedChunkComparator<ArrayType> resolved_chunk_comparator;

if (order_ == SortOrder::Ascending) {
std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
[&](uint64_t left, uint64_t right) {
const auto chunk_left = left_resolver.Resolve<ArrayType>(left);
const auto chunk_right = right_resolver.Resolve<ArrayType>(right);
return chunk_left.Value() < chunk_right.Value();
return resolved_chunk_comparator.Compare(chunk_resolver, left, right);
});
} else {
std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
[&](uint64_t left, uint64_t right) {
const auto chunk_left = left_resolver.Resolve<ArrayType>(left);
const auto chunk_right = right_resolver.Resolve<ArrayType>(right);
// We don't use 'left > right' here to reduce required
// operator. If we use 'right < left' here, '<' is only
// required.
return chunk_right.Value() < chunk_left.Value();
return resolved_chunk_comparator.Compare(chunk_resolver, right, left);
});
}
// Copy back temp area into main buffer
Expand Down
143 changes: 143 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_sort_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,149 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t* indices_begin, uint64_t* ind
const ChunkedArray& values, SortOrder sort_order,
NullPlacement null_placement);

class NestedValuesComparator {
public:
// StructArray Compare overload
int Compare(StructArray const& array, uint64_t field_index, uint64_t offset,
uint64_t leftrowidx, uint64_t rightrowidx) {
std::shared_ptr<Array> field = array.field(static_cast<int>(field_index));
return comparators_[field_index]->Compare(*field, *field, offset, leftrowidx,
rightrowidx);
}

// StructArray Compare overload for different struct arrays
int Compare(StructArray const& array_left, StructArray const& array_right,
uint64_t field_index, uint64_t offset, uint64_t leftrowidx,
uint64_t rightrowidx) {
std::shared_ptr<Array> field_left = array_left.field(static_cast<int>(field_index));
std::shared_ptr<Array> field_right = array_right.field(static_cast<int>(field_index));
return comparators_[field_index]->Compare(*field_left, *field_right, offset,
leftrowidx, rightrowidx);
}

// StructArray Prepare overload
Status Prepare(StructArray const& array) {
auto fields = array.fields();
for (auto field = fields.begin(); field != fields.end(); field++) {
std::shared_ptr<DataType> physical_type = GetPhysicalType((*field)->type());
NestedComparatorFactory comparator_factory = NestedComparatorFactory();
std::shared_ptr<NestedValueComparator> current_field_comparator;
ARROW_ASSIGN_OR_RAISE(
current_field_comparator,
NestedComparatorFactory().MakeFieldComparator(*physical_type));
comparators_.push_back(current_field_comparator);
}
return Status::OK();
}

private:
struct NestedValueComparator {
virtual int Compare(Array const& left_array, Array const& right_array,
uint64_t offset, uint64_t leftidx, uint64_t rightidx) = 0;

virtual ~NestedValueComparator() = default;
};

template <typename Type>
struct ConcreteNestedValueComparator : NestedValueComparator {
using FieldArrayType = typename TypeTraits<Type>::ArrayType;
using GetView = GetViewType<Type>;

virtual int Compare(Array const& left_array, Array const& right_array,
uint64_t offset, uint64_t leftidx, uint64_t rightidx) {
const FieldArrayType& left_values = checked_cast<const FieldArrayType&>(left_array);
const FieldArrayType& right_values =
checked_cast<const FieldArrayType&>(right_array);
auto left_value = GetView::LogicalValue(left_values.GetView(leftidx - offset));
auto right_value = GetView::LogicalValue(right_values.GetView(rightidx - offset));

if (left_value == right_value)
return 0;
else if (left_value < right_value)
return -1;
else
return 1;
}
};

/**
* Internal use factory whose purpose is to detect the right type
* of comparator that should be built to be able to compare two
* nested values in the same field or column
*/
struct NestedComparatorFactory {
Result<std::shared_ptr<NestedValueComparator>> MakeFieldComparator(
DataType const& physical_type) {
RETURN_NOT_OK(VisitTypeInline(physical_type, this));
DCHECK_NE(current_field_comparator_, nullptr);
return std::move(current_field_comparator_);
}

#define VISIT(TYPE) \
Status Visit(const TYPE& type) { return VisitGeneric(type); }

VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
#undef VISIT

Status Visit(const DataType& type) {
return Status::TypeError("Unsupported type for NestedComparatorFactory: ",
type.ToString());
}

template <typename Type>
Status VisitGeneric(const Type&) {
current_field_comparator_ = std::shared_ptr<NestedValueComparator>(
new ConcreteNestedValueComparator<Type>());
return Status::OK();
}

std::shared_ptr<NestedValueComparator> current_field_comparator_;
};

std::vector<std::shared_ptr<NestedValueComparator>> comparators_;
};

template <typename ArrayType>
struct ResolvedChunkComparator {
bool Compare(const ChunkedArrayResolver& chunk_resolver, uint64_t left,
uint64_t right) {
const auto chunk_left = chunk_resolver.Resolve<ArrayType>(left);
const auto chunk_right = chunk_resolver.Resolve<ArrayType>(right);
return chunk_left.Value() < chunk_right.Value();
}
};

template <>
struct ResolvedChunkComparator<StructArray> {
bool Compare(const ChunkedArrayResolver& chunk_resolver, uint64_t left,
uint64_t right) {
const auto chunk_left = chunk_resolver.Resolve<StructArray>(left);
const auto chunk_right = chunk_resolver.Resolve<StructArray>(right);
NestedValuesComparator nested_values_comparator;
auto status = nested_values_comparator.Prepare(*chunk_left.array);

if (!status.ok()) {
return false;
}

for (int field_index = 0; field_index < chunk_left.array->num_fields();
field_index++) {
int val = nested_values_comparator.Compare(*(chunk_left.array),
*(chunk_right.array), field_index, 0,
chunk_left.index, chunk_right.index);

if (val == 0) {
// if field values are equal, check next field
continue;
}

return val == -1;
}

return false;
}
};

} // namespace internal
} // namespace compute
} // namespace arrow
39 changes: 39 additions & 0 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,22 @@ cdef class Array(_PandasConvertible):
"""
return _pc().index(self, value, start, end, memory_pool=memory_pool)

def sort(self, order="ascending"):
"""
Sort the Array

Parameters
----------
order : "ascending" or "descending"
The order of the sorting.

Returns
-------
result : Array
"""
indices = _pc().sort_indices(self, sort_keys=[("", order)])
return self.take(indices)

def _to_pandas(self, options, types_mapper=None, **kwargs):
return _array_like_to_pandas(self, options, types_mapper=types_mapper)

Expand Down Expand Up @@ -2746,6 +2762,29 @@ cdef class StructArray(Array):
result.validate()
return result

def sort(self, order="ascending", fieldname=None):
"""
Sort the StructArray

Parameters
----------
order : "ascending" or "descending"
The order of the sorting.
fieldname : str or None, default None
If to sort the array by one of its fields
or by the whole array.

Returns
-------
result : StructArray
"""
if fieldname is not None:
tosort = self.field(fieldname)
Comment on lines +2781 to +2782
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently don't use "fieldname" anywhere. Maybe just field as the keyword argument?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

field implied a Field instance to me, that's why I went for fieldname.

else:
tosort = self
indices = _pc().sort_indices(tosort, sort_keys=[("", order)])
return self.take(indices)


cdef class ExtensionArray(Array):
"""
Expand Down
16 changes: 16 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,22 @@ cdef class ChunkedArray(_PandasConvertible):

return result

def sort(self, order="ascending"):
"""
Sort the ChunkedArray
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support the fieldname keyword here as well? (and raise an error in case the type of the ChunkedArray is not a struct)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ChunkedArray doesn't provide support for accessing fields directly (IE: no ChunkedArray.field(X) method) so we would probably have to implement that too if we want to add support for sorting an individual field. Which seems to expand the scope of this change. I propose we make a separate ticket for improving support for Struct data in ChunkedArray

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's indeed a general issue that ChunkedArray can be annoying when working with a type that type-specific methods on the array (that's not only for StructArray)


Parameters
----------
order : "ascending" or "descending"
The order of the sorting.

Returns
-------
result : ChunkedArray
"""
indices = _pc().sort_indices(self, sort_keys=[("", order)])
return self.take(indices)

def _to_pandas(self, options, types_mapper=None, **kwargs):
return _array_like_to_pandas(self, options, types_mapper=types_mapper)

Expand Down
Loading