Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/arrow/array/array_dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class ARROW_EXPORT DictionaryArray : public Array {

const DictionaryType* dict_type() const { return dict_type_; }

bool IsNull(int64_t i) const {
return indices_->IsNull(i) || dictionary()->IsNull(GetValueIndex(i));
}

private:
void SetData(const std::shared_ptr<ArrayData>& data);
const DictionaryType* dict_type_;
Expand Down
83 changes: 82 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_array_sort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,78 @@ class ArrayCompareSorter {
}
};

// Sorter specialization for dictionary arrays.
//
// Instead of decoding the dictionary and sorting the (potentially expensive)
// decoded values, we rank the dictionary values once and then sort the
// cheap uint64 ranks.
template <>
class ArrayCompareSorter<DictionaryType> {
 public:
  NullPartitionResult operator()(uint64_t* indices_begin, uint64_t* indices_end,
                                 const Array& array, int64_t offset,
                                 const ArraySortOptions& options) {
    const auto& dict_array = checked_cast<const DictionaryArray&>(array);
    const auto& dict_values = dict_array.dictionary();
    const auto& dict_indices = dict_array.indices();

    // Algorithm:
    // 1) Use the Rank function to get an exactly-equivalent-order array
    //    of the dictionary values, but with a datatype that's friendlier to
    //    sorting (uint64).
    // 2) Act as if we were sorting a dictionary array with the same indices,
    //    but with the ranks as dictionary values.
    // 2a) Dictionary-decode the ranks by calling Take.
    // 2b) Sort the decoded ranks. Not only those are uint64, they are dense
    //     in a [0, k) range where k is the number of unique dictionary values.
    //     Therefore, unless the dictionary is very large, a fast counting sort
    //     will be used.
    //
    // The bottom line is that performance will usually be much better
    // (potentially an order of magnitude faster) than by naively decoding
    // the original dictionary and sorting the decoded version.

    // TODO special-case all-nulls arrays to avoid ranking and decoding them?

    // FIXME Should be able to use the caller's KernelContext for rank() and take()

    // FIXME Propagate errors instead of aborting
    auto ranks = *RanksWithNulls(dict_values);

    auto decoded_ranks = *Take(*ranks, *dict_indices);

    auto rank_sorter = *GetArraySorter(*decoded_ranks->type() /* should be uint64 */);
    return rank_sorter(indices_begin, indices_end, *decoded_ranks, offset, options);
  }

 private:
  // Compute dense ascending ranks of `array`'s values, then re-apply the
  // input's validity bitmap so that entries that were null in the input are
  // also null in the returned rank array.
  Result<std::shared_ptr<Array>> RanksWithNulls(const std::shared_ptr<Array>& array) {
    // Notes:
    // * The order is always ascending here, since the goal is to produce
    //   an exactly-equivalent-order of the dictionary values.
    // * We're going to re-emit nulls in the output, so we can just always consider
    //   them "at the end". Note that choosing AtStart would merely shift other
    //   ranks by 1 if there are any nulls...
    RankOptions rank_options(SortOrder::Ascending, NullPlacement::AtEnd,
                             RankOptions::Dense);

    // XXX Should this support Type::NA?
    std::shared_ptr<Buffer> null_bitmap;
    if (array->null_count() > 0) {
      // Save the validity bitmap so it can be pasted onto the rank output below.
      // (A previous revision also built a null-stripped copy of the data here,
      //  but it was never passed to "rank"; that dead code has been removed.)
      null_bitmap = array->null_bitmap();
      // Pasting an unsliced bitmap onto the output below would misalign if the
      // input were sliced.
      DCHECK_EQ(array->data()->offset, 0);  // FIXME
    }
    ARROW_ASSIGN_OR_RAISE(auto rank_datum, CallFunction("rank", {array}, &rank_options));
    auto rank_data = rank_datum.array();
    DCHECK_EQ(rank_data->GetNullCount(), 0);
    // If there were nulls in the input, paste them in the output
    if (null_bitmap) {
      rank_data->buffers[0] = std::move(null_bitmap);
      rank_data->null_count = array->null_count();
    }
    return MakeArray(rank_data);
  }
};

template <typename ArrowType>
class ArrayCountSorter {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
Expand Down Expand Up @@ -405,7 +477,8 @@ struct ArraySorter<Type, enable_if_t<is_integer_type<Type>::value &&
// Dispatch comparison-based sorting for types without a faster specialized
// sorter: floating-point (needs NaN/null-like handling), binary-like, and
// dictionary types.
// NOTE: the diff rendering had left the pre-change condition line
// (`is_fixed_size_binary_type<Type>::value>> {`) interleaved with the new
// lines; this resolves to the post-change condition.
template <typename Type>
struct ArraySorter<
    Type, enable_if_t<is_floating_type<Type>::value || is_base_binary_type<Type>::value ||
                      is_fixed_size_binary_type<Type>::value ||
                      is_dictionary_type<Type>::value>> {
  ArrayCompareSorter<Type> impl;
};

Expand Down Expand Up @@ -507,6 +580,13 @@ void AddArraySortingKernels(VectorKernel base, VectorFunction* func) {
DCHECK_OK(func->AddKernel(base));
}

// Register the dictionary-array sorting kernel on `func`.
// The kernel consumes a DICTIONARY input and produces uint64 sort indices.
template <template <typename...> class ExecTemplate>
void AddDictArraySortingKernels(VectorKernel base, VectorFunction* func) {
  auto signature = KernelSignature::Make({Type::DICTIONARY}, uint64());
  base.signature = std::move(signature);
  base.exec = ExecTemplate<UInt64Type, DictionaryType>::Exec;
  DCHECK_OK(func->AddKernel(base));
}

const ArraySortOptions* GetDefaultArraySortOptions() {
static const auto kDefaultArraySortOptions = ArraySortOptions::Defaults();
return &kDefaultArraySortOptions;
Expand Down Expand Up @@ -561,6 +641,7 @@ void RegisterVectorArraySort(FunctionRegistry* registry) {
base.init = ArraySortIndicesState::Init;
base.exec_chunked = ArraySortIndicesChunked;
AddArraySortingKernels<ArraySortIndices>(base, array_sort_indices.get());
AddDictArraySortingKernels<ArraySortIndices>(base, array_sort_indices.get());
DCHECK_OK(registry->AddFunction(std::move(array_sort_indices)));

// partition_nth_indices has a parameter so needs its init function
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/kernels/vector_sort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ class ConcreteRecordBatchColumnSorter : public RecordBatchColumnSorter {
} else {
// NOTE that null_count_ is merely an upper bound on the number of nulls
// in this particular range.
p = PartitionNullsOnly<StablePartitioner>(indices_begin, indices_end, array_,
offset, null_placement_);
p = PartitionNullsOnly<ArrayType, StablePartitioner>(
indices_begin, indices_end, array_, offset, null_placement_);
DCHECK_LE(p.nulls_end - p.nulls_begin, null_count_);
}
const NullPartitionResult q = PartitionNullLikes<ArrayType, StablePartitioner>(
Expand Down Expand Up @@ -797,8 +797,8 @@ class MultipleKeyRecordBatchSorter : public TypeVisitor {
using ArrayType = typename TypeTraits<Type>::ArrayType;
const ArrayType& array = checked_cast<const ArrayType&>(first_sort_key.array);

const auto p = PartitionNullsOnly<StablePartitioner>(indices_begin_, indices_end_,
array, 0, null_placement_);
const auto p = PartitionNullsOnly<ArrayType, StablePartitioner>(
indices_begin_, indices_end_, array, 0, null_placement_);
const auto q = PartitionNullLikes<ArrayType, StablePartitioner>(
p.non_nulls_begin, p.non_nulls_end, array, 0, null_placement_);

Expand Down
81 changes: 81 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_sort_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@

namespace arrow {
namespace compute {

constexpr auto kSeed = 0x0ff1ce;
constexpr int32_t kDictionarySize = 20; // a typical dictionary size

// Common driver: repeatedly run SortIndices on `values` and report throughput
// in items (array elements) per second.
static void ArraySortIndicesBenchmark(benchmark::State& state,
                                      const std::shared_ptr<Array>& values) {
  for (auto _ : state) {
    ABORT_NOT_OK(SortIndices(*values).status());
  }
  // This may be redundant with the SetItemsProcessed() call in RegressionArgs,
  // if size_is_bytes was false.
  state.SetItemsProcessed(state.iterations() * values->length());
}

Expand All @@ -56,6 +60,51 @@ static void ArraySortIndicesInt64Benchmark(benchmark::State& state, int64_t min,
ArraySortIndicesBenchmark(state, values);
}

// Benchmark sorting a dictionary array of int64 values in [min, max].
// Nulls are split evenly between the dictionary values and the indices.
static void ArraySortIndicesInt64DictBenchmark(benchmark::State& state, int64_t min,
                                               int64_t max) {
  RegressionArgs args(state);

  const double null_proportion = args.null_proportion / 2;
  const int64_t length = args.size / sizeof(int64_t);

  random::RandomArrayGenerator rng(kSeed);
  auto values = rng.Int64(kDictionarySize, min, max, null_proportion);
  auto indices = rng.Int64(length, 0, kDictionarySize - 1, null_proportion);

  ArraySortIndicesBenchmark(state, *DictionaryArray::FromArrays(indices, values));
}

// Benchmark sorting a plain array of random strings.
static void ArraySortIndicesStringsBenchmark(benchmark::State& state) {
  RegressionArgs args(state, /*size_is_bytes=*/false);
  // XXX Divide so numbers stay comparable with ArraySortIndicesInt64Benchmark
  // (including the SetItemsProcessed() call in the RegressionArgs destructor).
  args.size /= sizeof(int64_t);

  random::RandomArrayGenerator rng(kSeed);
  const int64_t length = args.size;
  auto values =
      rng.String(length, /*min_length=*/3, /*max_length=*/25, args.null_proportion);

  ArraySortIndicesBenchmark(state, values);
}

// Benchmark sorting a dictionary array of random strings.
// Nulls are split evenly between the dictionary values and the indices.
static void ArraySortIndicesStringsDictBenchmark(benchmark::State& state) {
  RegressionArgs args(state, /*size_is_bytes=*/false);
  // XXX Same size scaling as in ArraySortIndicesStringsBenchmark above.
  args.size /= sizeof(int64_t);

  const double null_proportion = args.null_proportion / 2;
  const int64_t length = args.size;

  random::RandomArrayGenerator rng(kSeed);
  auto values = rng.String(kDictionarySize, /*min_length=*/3, /*max_length=*/25,
                           null_proportion);
  auto indices = rng.Int64(length, 0, kDictionarySize - 1, null_proportion);

  ArraySortIndicesBenchmark(state, *DictionaryArray::FromArrays(indices, values));
}

static void ChunkedArraySortIndicesInt64Benchmark(benchmark::State& state, int64_t min,
int64_t max) {
RegressionArgs args(state);
Expand All @@ -81,6 +130,20 @@ static void ArraySortIndicesInt64Wide(benchmark::State& state) {
ArraySortIndicesInt64Benchmark(state, min, max);
}

// Dictionary-of-int64 benchmark over the full int64 value range.
static void ArraySortIndicesInt64Dict(benchmark::State& state) {
  using Limits = std::numeric_limits<int64_t>;
  ArraySortIndicesInt64DictBenchmark(state, Limits::min(), Limits::max());
}

// Plain-string sorting; baseline for the Dict variant below.
static void ArraySortIndicesStrings(benchmark::State& state) {
  ArraySortIndicesStringsBenchmark(state);
}

// Dictionary-encoded string sorting; compare against ArraySortIndicesStrings.
static void ArraySortIndicesStringsDict(benchmark::State& state) {
  ArraySortIndicesStringsDictBenchmark(state);
}

static void ArraySortIndicesBool(benchmark::State& state) {
RegressionArgs args(state);

Expand Down Expand Up @@ -249,6 +312,24 @@ BENCHMARK(ArraySortIndicesInt64Wide)
->Args({1 << 23, 100})
->Unit(benchmark::TimeUnit::kNanosecond);

// Dictionary-encoded int64 sorting (compare against the plain int64 benchmarks).
BENCHMARK(ArraySortIndicesInt64Dict)
    ->Apply(RegressionSetArgs)
    ->Args({1 << 20, 100})
    ->Args({1 << 23, 100})
    ->Unit(benchmark::TimeUnit::kNanosecond);

// Plain string sorting; baseline for the dictionary-encoded variant below.
BENCHMARK(ArraySortIndicesStrings)
    ->Apply(RegressionSetArgs)
    ->Args({1 << 20, 100})
    ->Args({1 << 23, 100})
    ->Unit(benchmark::TimeUnit::kNanosecond);

// Dictionary-encoded string sorting.
BENCHMARK(ArraySortIndicesStringsDict)
    ->Apply(RegressionSetArgs)
    ->Args({1 << 20, 100})
    ->Args({1 << 23, 100})
    ->Unit(benchmark::TimeUnit::kNanosecond);

BENCHMARK(ArraySortIndicesBool)
->Apply(RegressionSetArgs)
->Args({1 << 20, 100})
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/kernels/vector_sort_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ struct NullPartitionResult {
// Move nulls (not null-like values) to end of array.
//
// `offset` is used when this is called on a chunk of a chunked array
template <typename Partitioner>
template <typename ArrayType, typename Partitioner>
NullPartitionResult PartitionNullsOnly(uint64_t* indices_begin, uint64_t* indices_end,
const Array& values, int64_t offset,
const ArrayType& values, int64_t offset,
NullPlacement null_placement) {
if (values.null_count() == 0) {
return NullPartitionResult::NoNulls(indices_begin, indices_end, null_placement);
Expand Down Expand Up @@ -254,8 +254,8 @@ NullPartitionResult PartitionNulls(uint64_t* indices_begin, uint64_t* indices_en
const ArrayType& values, int64_t offset,
NullPlacement null_placement) {
// Partition nulls at start (resp. end), and null-like values just before (resp. after)
NullPartitionResult p = PartitionNullsOnly<Partitioner>(indices_begin, indices_end,
values, offset, null_placement);
NullPartitionResult p = PartitionNullsOnly<ArrayType, Partitioner>(
indices_begin, indices_end, values, offset, null_placement);
NullPartitionResult q = PartitionNullLikes<ArrayType, Partitioner>(
p.non_nulls_begin, p.non_nulls_end, values, offset, null_placement);
return NullPartitionResult{q.non_nulls_begin, q.non_nulls_end,
Expand Down
71 changes: 71 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_sort_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,77 @@ TEST(ArraySortIndicesFunction, Array) {
AssertDatumsEqual(expected, actual, /*verbose=*/true);
}

// Exercise array_sort_indices on a dictionary array containing nulls both in
// the indices and in the dictionary values, for every dictionary index type.
TEST(ArraySortIndicesFunction, DictionaryArray) {
  // Decoded dictionary array:
  // (["b", "c", null, null, null, "b", "c", "c", "a"])

  // One expected result per (order, null_placement) combination; indexed by
  // `i` below, so this order must match the AllOrders() x AllNullPlacements()
  // iteration order.
  std::vector<std::string> expected_str = {
      "[8, 0, 5, 1, 6, 7, 2, 3, 4]",  // SortOrder::Ascending NullPlacement::AtEnd
      "[2, 3, 4, 8, 0, 5, 1, 6, 7]",  // SortOrder::Ascending NullPlacement::AtStart
      "[1, 6, 7, 0, 5, 8, 2, 3, 4]",  // SortOrder::Descending NullPlacement::AtEnd
      "[2, 3, 4, 1, 6, 7, 0, 5, 8]"   // SortOrder::Descending NullPlacement::AtStart
  };

  for (auto index_type : all_dictionary_index_types()) {
    ARROW_SCOPED_TRACE("index_type = ", index_type->ToString());
    int i = 0;
    // Indices have nulls (positions 2, 4) and the dictionary has a null
    // value (dictionary slot 4 referenced by indices 1 and 6... slot 1 is null).
    auto dict_arr = DictArrayFromJSON(dictionary(index_type, utf8()),
                                      "[0, 4, null, 1, null, 0, 4, 2, 3]",
                                      "[ \"b\", null, \"c\", \"a\", \"c\"]");
    for (auto order : AllOrders()) {
      for (auto null_placement : AllNullPlacements()) {
        ARROW_SCOPED_TRACE("i = ", i);
        ArraySortOptions options{order, null_placement};
        auto expected = ArrayFromJSON(uint64(), expected_str[i++]);
        ASSERT_OK_AND_ASSIGN(auto actual,
                             CallFunction("array_sort_indices", {dict_arr}, &options));
        ValidateOutput(actual);
        AssertDatumsEqual(expected, actual, /*verbose=*/true);
      }
    }
  }
}

// Materialize a dictionary array into a plain array holding the same
// logical values (dictionary[indices[i]] for each position i).
Result<std::shared_ptr<Array>> DecodeDictionary(const Array& array) {
  const auto& dict_array = checked_cast<const DictionaryArray&>(array);
  auto values = dict_array.dictionary();
  auto indices = dict_array.indices();
  ARROW_ASSIGN_OR_RAISE(Datum decoded, Take(values, indices));
  return decoded.make_array();
}

// Property test: sorting a random dictionary array must produce exactly the
// same indices as sorting its decoded (non-dictionary) equivalent, for every
// order / null-placement combination.
TEST(ArraySortIndicesFunction, RandomDictionaryArray) {
  ::arrow::random::RandomArrayGenerator rng(/*seed=*/1234);
  constexpr int64_t kLength = 200;
  constexpr int64_t kDictLength = 20;

  // Ensure there are duplicates in the dictionary and the indices,
  // and nulls in both as well.
  auto dict_values = rng.StringWithRepeats(kDictLength, /*unique=*/kDictLength / 2,
                                           /*min_length=*/1, /*max_length=*/10,
                                           /*null_probability=*/0.2);
  auto dict_indices = rng.Int64(kLength, 0, kDictLength - 1, /*null_probability = */ 0.2);
  ASSERT_OK_AND_ASSIGN(auto dict_array,
                       DictionaryArray::FromArrays(dict_indices, dict_values));
  ASSERT_OK_AND_ASSIGN(auto decoded, DecodeDictionary(*dict_array));

  for (auto order : AllOrders()) {
    for (auto null_placement : AllNullPlacements()) {
      ArraySortOptions options{order, null_placement};
      // Sorting the dictionary array...
      ASSERT_OK_AND_ASSIGN(auto actual,
                           CallFunction("array_sort_indices", {dict_array}, &options));
      ValidateOutput(actual);

      // should give identical results to sorting the decoded array
      ASSERT_OK_AND_ASSIGN(auto expected,
                           CallFunction("array_sort_indices", {decoded}, &options));
      AssertDatumsEqual(expected, actual, /*verbose=*/true);
    }
  }

  // TODO test with sliced dict indices and values...
}

TEST(ArraySortIndicesFunction, ChunkedArray) {
auto arr = ChunkedArrayFromJSON(int16(), {"[0, 1]", "[null, -3, null, -42, 5]"});
auto expected = ChunkedArrayFromJSON(uint64(), {"[5, 3, 0, 1, 6, 2, 4]"});
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/benchmark_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void RegressionSetArgs(benchmark::internal::Benchmark* bench) {
// RAII struct to handle some of the boilerplate in regression benchmarks
struct RegressionArgs {
// size of memory tested (per iteration) in bytes
const int64_t size;
int64_t size;

// proportion of nulls in generated arrays
double null_proportion;
Expand Down