From 98673700168afac582c3e26abc3755443cfbe68f Mon Sep 17 00:00:00 2001
From: michalursa
Date: Tue, 30 Aug 2022 01:26:58 -0700
Subject: [PATCH] Adding helper classes for computing window aggregates and
 distinct aggregates

---
 cpp/src/arrow/CMakeLists.txt                  |   4 +
 cpp/src/arrow/compute/exec/util.h             |  18 +
 .../window_functions/bit_vector_navigator.cc  | 121 ++++
 .../window_functions/bit_vector_navigator.h   | 158 +++++
 .../exec/window_functions/merge_tree.cc       | 231 +++++++
 .../exec/window_functions/merge_tree.h        | 197 ++++++
 .../exec/window_functions/radix_sort.h        |  94 +++
 .../exec/window_functions/window_aggregate.cc | 376 +++++++++++
 .../exec/window_functions/window_aggregate.h  | 151 +++++
 .../exec/window_functions/window_distinct.cc  | 618 ++++++++++++++++++
 .../exec/window_functions/window_distinct.h   |  89 +++
 .../exec/window_functions/window_expr.h       | 143 ++++
 .../exec/window_functions/window_frame.h      | 143 ++++
 13 files changed, 2343 insertions(+)
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/merge_tree.cc
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/merge_tree.h
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/radix_sort.h
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/window_aggregate.cc
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/window_aggregate.h
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/window_distinct.cc
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/window_distinct.h
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/window_expr.h
 create mode 100644 cpp/src/arrow/compute/exec/window_functions/window_frame.h

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 2cae7a731f9..7ab81b18386 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -408,6 +408,10 @@ if(ARROW_COMPUTE)
       compute/exec/tpch_node.cc
       compute/exec/union_node.cc
       compute/exec/util.cc
+      compute/exec/window_functions/bit_vector_navigator.cc
+      compute/exec/window_functions/merge_tree.cc
+      compute/exec/window_functions/window_aggregate.cc
+      compute/exec/window_functions/window_distinct.cc
       compute/function.cc
       compute/function_internal.cc
       compute/kernel.cc
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 7e716808fa0..210bfae7e8c 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -19,6 +19,7 @@
 #include
 #include
+#include <random>
 #include
 #include
 #include
@@ -361,5 +362,22 @@ struct ARROW_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
   util::Mutex consume_mutex_;
 };

+class Random64BitCopy {
+ public:
+  Random64BitCopy() : rs{0, 0, 0, 0, 0, 0, 0, 0}, re(rs) {}
+  uint64_t next() { return rdist(re); }
+  template <typename T>
+  inline T from_range(const T& min_val, const T& max_val) {
+    return static_cast<T>(min_val + (next() % (max_val - min_val + 1)));
+  }
+  std::mt19937& get_engine() { return re; }
+
+ private:
+  std::random_device rd;
+  std::seed_seq rs;
+  std::mt19937 re;
+  std::uniform_int_distribution<uint64_t> rdist;
+};
+
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc
new file mode 100644
index 00000000000..6a7b6da16ec
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/window_functions/bit_vector_navigator.h"
+
+#include <algorithm>
+
+#include "arrow/compute/exec/util.h"
+
+namespace arrow {
+namespace compute {
+
+void BitVectorNavigator::SelectsForRangeOfRanks(
+    int64_t rank_begin, int64_t rank_end, int64_t num_bits, const uint64_t* bitvec,
+    const uint64_t* popcounts, int64_t* outputs, int64_t hardware_flags,
+    util::TempVectorStack* temp_vector_stack) {
+  ARROW_DCHECK(rank_begin <= rank_end);
+  if (rank_begin == rank_end) {
+    return;
+  }
+  int64_t popcount_all = PopCount(num_bits, bitvec, popcounts);
+  if (rank_end <= 0LL) {
+    for (int64_t i = 0LL; i < rank_end - rank_begin; ++i) {
+      outputs[i] = -1LL;
+    }
+    return;
+  }
+  if (rank_begin >= popcount_all) {
+    for (int64_t i = 0LL; i < rank_end - rank_begin; ++i) {
+      outputs[i] = num_bits;
+    }
+    return;
+  }
+  if (rank_begin < 0LL) {
+    for (int64_t i = 0LL; i < -rank_begin; ++i) {
+      outputs[i] = -1LL;
+    }
+    outputs += -rank_begin;
+    rank_begin = 0LL;
+  }
+  if (rank_end > popcount_all) {
+    for (int64_t i = popcount_all - rank_begin; i < rank_end - rank_begin; ++i) {
+      outputs[i] = num_bits;
+    }
+    rank_end = popcount_all;
+  }
+
+  int64_t minibatch_length_max = util::MiniBatch::kMiniBatchLength;
+  auto indexes = util::TempVectorHolder<uint16_t>(
+      temp_vector_stack, static_cast<uint32_t>(minibatch_length_max));
+  int num_indexes;
+
+  int64_t first_select =
+      BitVectorNavigator::Select(rank_begin, num_bits, bitvec, popcounts);
+  int64_t last_select =
+      BitVectorNavigator::Select(rank_end - 1, num_bits, bitvec, popcounts);
+
+  for (int64_t minibatch_begin = first_select; minibatch_begin < last_select + 1;
+       minibatch_begin += minibatch_length_max) {
+    int64_t minibatch_end =
+        std::min(last_select + 1, minibatch_begin + minibatch_length_max);
+    util::bit_util::bits_to_indexes(
+        /*bit_to_search=*/1, hardware_flags,
+        static_cast<int>(minibatch_end - minibatch_begin),
+        reinterpret_cast<const uint8_t*>(bitvec), &num_indexes, indexes.mutable_data(),
+        static_cast<int>(minibatch_begin));
+    for (int i = 0; i < num_indexes; ++i) {
+      outputs[i] = minibatch_begin + indexes.mutable_data()[i];
+    }
+    outputs += num_indexes;
+  }
+}
+
+void BitVectorNavigator::SelectsForRelativeRanksForRangeOfRows(
+    int64_t batch_begin, int64_t batch_end, int64_t rank_delta, int64_t num_rows,
+    const uint64_t* ties_bitvec, const uint64_t* ties_popcounts, int64_t* outputs,
+    int64_t hardware_flags, util::TempVectorStack* temp_vector_stack) {
+  // Break into mini-batches
+  int64_t minibatch_length_max = util::MiniBatch::kMiniBatchLength;
+  auto selects_for_ranks_buf = util::TempVectorHolder<int64_t>(
+      temp_vector_stack, static_cast<uint32_t>(minibatch_length_max));
+  auto selects_for_ranks = selects_for_ranks_buf.mutable_data();
+  for (int64_t minibatch_begin = batch_begin; minibatch_begin < batch_end;
+       minibatch_begin += minibatch_length_max) {
+    int64_t minibatch_end = std::min(batch_end, minibatch_begin + minibatch_length_max);
+
+    // First and last rank that we are interested in
+    int64_t first_rank =
+        BitVectorNavigator::RankNext(minibatch_begin, ties_bitvec, ties_popcounts) - 1LL;
+    int64_t last_rank =
+        BitVectorNavigator::RankNext(minibatch_end - 1, ties_bitvec, ties_popcounts) -
+        1LL;
+
+    // Do select for each rank in the calculated range.
+    //
+    BitVectorNavigator::SelectsForRangeOfRanks(
+        first_rank + rank_delta, last_rank + rank_delta + 1, num_rows, ties_bitvec,
+        ties_popcounts, selects_for_ranks, hardware_flags, temp_vector_stack);
+
+    int irank = 0;
+    outputs[minibatch_begin - batch_begin] = selects_for_ranks[irank];
+    for (int64_t i = minibatch_begin + 1; i < minibatch_end; ++i) {
+      irank +=
+          bit_util::GetBit(reinterpret_cast<const uint8_t*>(ties_bitvec), i) ? 1 : 0;
+      outputs[i - batch_begin] = selects_for_ranks[irank];
+    }
+  }
+}
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h
new file mode 100644
index 00000000000..ba62e21ea12
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// The allocated size of a bit vector must be a multiple of 64 bits.
+// There are exactly ceil(num_bits / 64) 64-bit population counters.
+//
+class BitVectorNavigator {
+ public:
+  static uint64_t GenPopCounts(int64_t num_bits, const uint64_t* bits,
+                               uint64_t* pop_counts) {
+    int64_t num_pop_counts = (num_bits + 63) / 64;
+    uint64_t sum = 0;
+    for (int64_t i = 0; i < num_pop_counts; ++i) {
+      pop_counts[i] = sum;
+      sum += ARROW_POPCOUNT64(bits[i]);
+    }
+    return sum;
+  }
+
+  // O(1)
+  static inline int64_t PopCount(int64_t num_bits, const uint64_t* bitvec,
+                                 const uint64_t* popcounts) {
+    int64_t last_word = (num_bits - 1) / 64;
+    return popcounts[last_word] + ARROW_POPCOUNT64(bitvec[last_word]);
+  }
+
+  // O(log(N))
+  // The output is set to -1 if the rank is below zero and to num_bits if the
+  // rank is above the maximum rank of any row in the represented range.
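+  // Illustrative example (added; not part of the original patch): for an
+  // 8-bit vector with bits 0, 2, 3 and 5 set:
+  //   PopCount = 4,
+  //   Select(0) = 0, Select(1) = 2, Select(2) = 3, Select(3) = 5,
+  //   Select(-1) = -1 and Select(4) = num_bits (out-of-range ranks).
+  // Rank(pos) counts set bits strictly before pos, so Rank(3) = 2, while
+  // RankNext(pos) counts bits up to and including pos, so RankNext(3) = 3.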
+  static inline int64_t Select(int64_t rank, int64_t num_bits, const uint64_t* bits,
+                               const uint64_t* pop_counts) {
+    if (rank < 0) {
+      return -1LL;
+    }
+    int64_t max_rank = PopCount(num_bits, bits, pop_counts) - 1LL;
+    if (rank > max_rank) {
+      return num_bits;
+    }
+
+    int64_t num_pop_counts = (num_bits + 63) / 64;
+    // Find the index of the 64-bit block that contains the nth set bit.
+    int64_t block_id = (std::upper_bound(pop_counts, pop_counts + num_pop_counts,
+                                         static_cast<uint64_t>(rank)) -
+                        pop_counts) -
+                       1;
+    // Binary search for the position of the (rank - pop_count + 1)th set bit
+    // within the 64-bit block.
+    uint64_t block = bits[block_id];
+    int64_t bit_rank = rank - pop_counts[block_id];
+    int bit_id = 0;
+    for (int half_bits = 32; half_bits >= 1; half_bits /= 2) {
+      uint64_t mask = ((1ULL << half_bits) - 1ULL);
+      int64_t lower_half_pop_count = ARROW_POPCOUNT64(block & mask);
+      if (bit_rank >= lower_half_pop_count) {
+        block >>= half_bits;
+        bit_rank -= lower_half_pop_count;
+        bit_id += half_bits;
+      }
+    }
+    return block_id * 64 + bit_id;
+  }
+
+  // TODO: We could implement a BitVectorNavigator::Select that works on
+  // batches instead of single rows. It could then use a precomputed static
+  // B-tree to speed up the binary search.
+  //
+
+  // O(1)
+  // The input row number must be valid (between 0 and the number of rows
+  // minus 1).
+  static inline int64_t Rank(int64_t pos, const uint64_t* bits,
+                             const uint64_t* pop_counts) {
+    int64_t block = pos >> 6;
+    int offset = static_cast<int>(pos & 63LL);
+    uint64_t mask = (1ULL << offset) - 1ULL;
+    int64_t rank1 =
+        static_cast<int64_t>(pop_counts[block]) + ARROW_POPCOUNT64(bits[block] & mask);
+    return rank1;
+  }
+
+  // O(1)
+  // Rank of the next row (also valid for the last row, when the next row
+  // would be outside of the range of rows).
+  static inline int64_t RankNext(int64_t pos, const uint64_t* bits,
+                                 const uint64_t* pop_counts) {
+    int64_t block = pos >> 6;
+    int offset = static_cast<int>(pos & 63LL);
+    uint64_t mask = ~0ULL >> (63 - offset);
+    int64_t rank1 =
+        static_cast<int64_t>(pop_counts[block]) + ARROW_POPCOUNT64(bits[block] & mask);
+    return rank1;
+  }
+
+  // Input ranks may be outside of the range of ranks present in the input
+  // bit vector.
+  //
+  static void SelectsForRangeOfRanks(int64_t rank_begin, int64_t rank_end,
+                                     int64_t num_bits, const uint64_t* bitvec,
+                                     const uint64_t* popcounts, int64_t* outputs,
+                                     int64_t hardware_flags,
+                                     util::TempVectorStack* temp_vector_stack);
+
+  static void SelectsForRelativeRanksForRangeOfRows(
+      int64_t batch_begin, int64_t batch_end, int64_t rank_delta, int64_t num_rows,
+      const uint64_t* ties_bitvec, const uint64_t* ties_popcounts, int64_t* outputs,
+      int64_t hardware_flags, util::TempVectorStack* temp_vector_stack);
+
+  template <typename INDEX_T>
+  static void GenSelectedIds(int64_t num_rows, const uint64_t* bitvec, INDEX_T* ids,
+                             int64_t hardware_flags,
+                             util::TempVectorStack* temp_vector_stack) {
+    // Break into mini-batches.
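+    // (Added note) Each mini-batch spans at most
+    // util::MiniBatch::kMiniBatchLength rows, so a position within a batch
+    // fits in uint16_t; bits_to_indexes emits these narrow local indexes and
+    // the loop below widens them to INDEX_T by adding the batch base. For
+    // example, a batch covering rows [1024, 2048) with local indexes {1, 5}
+    // yields ids {1025, 1029}.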
+ // + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + auto batch_ids_buf = + util::TempVectorHolder(temp_vector_stack, batch_length_max); + auto batch_ids = batch_ids_buf.mutable_data(); + int batch_num_ids; + int64_t num_ids = 0; + for (int64_t batch_begin = 0; batch_begin < num_rows; + batch_begin += batch_length_max) { + int64_t batch_length = std::min(num_rows - batch_begin, batch_length_max); + util::bit_util::bits_to_indexes( + /*bit_to_search=*/1, hardware_flags, batch_length, + reinterpret_cast(bitvec + (batch_begin / 64)), &batch_num_ids, + batch_ids); + for (int i = 0; i < batch_num_ids; ++i) { + ids[num_ids + i] = static_cast(batch_begin + batch_ids[i]); + } + num_ids += batch_num_ids; + } + } +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/merge_tree.cc b/cpp/src/arrow/compute/exec/window_functions/merge_tree.cc new file mode 100644 index 00000000000..aa374717298 --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/merge_tree.cc @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/window_functions/merge_tree.h" + +namespace arrow { +namespace compute { + +void MergeTree::Build(int64_t num_rows, const int64_t* permutation, + int num_levels_to_skip, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + num_rows_ = num_rows; + if (num_rows == 0) { + return; + } + + int height = 1 + arrow::bit_util::Log2(num_rows); + level_bitvecs_.resize(height); + level_popcounts_.resize(height); + + int64_t num_bit_words = arrow::bit_util::CeilDiv(num_rows, 64); + + // We skip level 0 on purpose - it is not used. + // We also skip num_levels_to_skip from the top. + // + for (int level = 1; level < height - num_levels_to_skip; ++level) { + level_bitvecs_[level].resize(num_bit_words); + level_popcounts_[level].resize(num_bit_words); + } + + std::vector permutation_temp[2]; + permutation_temp[0].resize(num_rows); + permutation_temp[1].resize(num_rows); + int64_t* permutation_pingpong[2]; + permutation_pingpong[0] = permutation_temp[0].data(); + permutation_pingpong[1] = permutation_temp[1].data(); + + // Generate tree layers top-down + // + int top_level = height - num_levels_to_skip - 1; + for (int target_level = top_level; target_level > 0; --target_level) { + int flip = target_level % 2; + const int64_t* permutation_up = + (target_level == top_level - 1) ? permutation : permutation_pingpong[flip]; + if (target_level < top_level) { + int64_t* permutation_this = permutation_pingpong[1 - flip]; + Split(target_level + 1, permutation_up, permutation_this, hardware_flags, + temp_vector_stack); + } + const int64_t* permutation_this = + (target_level == top_level) ? 
permutation : permutation_pingpong[1 - flip]; + GenBitvec(target_level, permutation_this); + } +} + +void MergeTree::RangeQueryStep(int level, int64_t num_queries, const int64_t* begins, + const int64_t* ends, RangeQueryState* query_states, + RangeQueryState* query_outputs) const { + for (int64_t iquery = 0; iquery < num_queries; ++iquery) { + int64_t begin = begins[iquery]; + int64_t end = ends[iquery]; + RangeQueryState& state = query_states[iquery]; + RangeQueryState& output = query_outputs[iquery]; + ARROW_DCHECK(begin <= end && begin >= 0 && end <= num_rows_); + + RangeQueryState parent_state; + parent_state.pos[0] = state.pos[0]; + parent_state.pos[1] = state.pos[1]; + state.pos[0] = state.pos[1] = output.pos[0] = output.pos[1] = RangeQueryState::kEmpty; + + for (int iparent_pos = 0; iparent_pos < 2; ++iparent_pos) { + int64_t parent_pos = parent_state.pos[iparent_pos]; + if (parent_pos != RangeQueryState::kEmpty) { + RangeQueryState child_state; + Cascade(level, parent_pos, &child_state); + for (int ichild_pos = 0; ichild_pos < 2; ++ichild_pos) { + int64_t child_pos = child_state.pos[ichild_pos]; + if (child_pos != RangeQueryState::kEmpty) { + int64_t child_node; + int64_t child_length; + RangeQueryState::NodeAndLengthFromPos(level - 1, child_pos, &child_node, + &child_length); + if (NodeFullyInsideRange(level - 1, child_node, begin, end)) { + output.AppendPos(child_pos); + } else if (NodePartiallyInsideRange(level - 1, child_node, begin, end)) { + state.AppendPos(child_pos); + } + } + } + } + } + } +} + +void MergeTree::NthElement(int64_t num_queries, const uint16_t* opt_ids, + const int64_t* begins, const int64_t* ends, + /* ns[i] must be in the range [0; ends[i] - begins[i]) */ + const int64_t* ns, int64_t* row_numbers, + util::TempVectorStack* temp_vector_stack) const { + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + + // Allocate temporary buffers + // + auto temp_begins_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + int64_t* temp_begins = temp_begins_buf.mutable_data(); + + auto temp_ends_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + int64_t* temp_ends = temp_ends_buf.mutable_data(); + + auto temp_ns_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + int64_t* temp_ns = temp_ns_buf.mutable_data(); + + for (int64_t batch_begin = 0; batch_begin < num_queries; + batch_begin += batch_length_max) { + int64_t batch_length = std::min(num_queries - batch_begin, batch_length_max); + + // Initialize tree cursors (begin and end of a range of some top level + // node for each query/frame). 
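+    // (Added note) The traversal below maintains, for each query, a window
+    // [temp_begins[i], temp_ends[i]) of positions at the current level
+    // together with temp_ns[i], the index of the requested element within
+    // that window. Each NthElementStep narrows the window to the child node
+    // holding the n-th element, so after the level-1 step the window has
+    // shrunk to the single position that is the answer.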
+ // + if (opt_ids) { + for (int64_t i = 0; i < batch_length; ++i) { + uint16_t id = opt_ids[batch_begin + i]; + temp_begins[i] = begins[id]; + temp_ends[i] = ends[id]; + temp_ns[i] = ns[id]; + ARROW_DCHECK(temp_ns[i] >= 0 && temp_ns[i] < temp_ends[i] - temp_begins[i]); + } + } else { + memcpy(temp_begins, begins + batch_begin, batch_length * sizeof(temp_begins[0])); + memcpy(temp_ends, ends + batch_begin, batch_length * sizeof(temp_ends[0])); + memcpy(temp_ns, ns + batch_begin, batch_length * sizeof(temp_ns[0])); + } + + // Traverse the tree top-down + // + int top_level = static_cast(level_bitvecs_.size()) - 1; + for (int level = top_level; level > 0; --level) { + for (int64_t i = 0; i < batch_length; ++i) { + NthElementStep(level, temp_begins + i, temp_ends + i, temp_ns + i); + } + } + + // Output results + // + if (opt_ids) { + for (int64_t i = 0; i < batch_length; ++i) { + uint16_t id = opt_ids[batch_begin + i]; + row_numbers[id] = temp_begins[i]; + } + } else { + for (int64_t i = 0; i < batch_length; ++i) { + row_numbers[batch_begin + i] = temp_begins[i]; + } + } + } +} + +void MergeTree::GenBitvec( + /* level to generate for */ int level, const int64_t* permutation) { + uint64_t result = 0ULL; + for (int64_t base = 0; base < num_rows_; base += 64) { + for (int64_t i = base; i < std::min(base + 64, num_rows_); ++i) { + int64_t bit = (permutation[i] >> (level - 1)) & 1; + result |= static_cast(bit) << (i & 63); + } + level_bitvecs_[level][base / 64] = result; + result = 0ULL; + } + + BitVectorNavigator::GenPopCounts(num_rows_, level_bitvecs_[level].data(), + level_popcounts_[level].data()); +} + +void MergeTree::Cascade(int level, int64_t pos, RangeQueryState* result) const { + ARROW_DCHECK(level > 0); + + int64_t node; + int64_t length; + RangeQueryState::NodeAndLengthFromPos(level, pos, &node, &length); + + int64_t node_begin = node << level; + // We use RankNext for node_begin + length - 1 instead of Rank for node_begin + // + length, because the latter one may be equal to num_rows_ which is an + // index out of range for bitvector. + // + int64_t rank = + BitVectorNavigator::RankNext(node_begin + length - 1, level_bitvecs_[level].data(), + level_popcounts_[level].data()); + int64_t local_rank = rank - (node_begin / 2); + result->pos[0] = + RangeQueryState::PosFromNodeAndLength(level - 1, node * 2, length - local_rank); + bool has_right_child = + (node_begin + (static_cast(1) << (level - 1))) < num_rows_; + result->pos[1] = has_right_child ? 
RangeQueryState::PosFromNodeAndLength( + level - 1, node * 2 + 1, local_rank) + : RangeQueryState::kEmpty; +} + +bool MergeTree::NodeFullyInsideRange(int level, int64_t node, int64_t begin, + int64_t end) const { + int64_t node_begin = node << level; + int64_t node_end = std::min(num_rows_, node_begin + (static_cast(1) << level)); + return node_begin >= begin && node_end <= end; +} + +bool MergeTree::NodePartiallyInsideRange(int level, int64_t node, int64_t begin, + int64_t end) const { + int64_t node_begin = node << level; + int64_t node_end = std::min(num_rows_, node_begin + (static_cast(1) << level)); + return node_begin < end && node_end > begin; +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/merge_tree.h b/cpp/src/arrow/compute/exec/window_functions/merge_tree.h new file mode 100644 index 00000000000..fbab9c6886d --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/merge_tree.h @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include "arrow/compute/exec/util.h" +#include "arrow/util/bit_util.h" +#include "bit_vector_navigator.h" + +namespace arrow { +namespace compute { + +// TODO: Support multiple [begin, end) ranges in range and nth_element queries. +// + +// One way to think about MergeTree is that, when we traverse top down, we +// switch to sortedness on X axis, and when we traverse bottom up, we switch to +// sortedness on Y axis. At the lowest level of MergeTree rows are sorted on X +// and the highest level they are sorted on Y. +// +class MergeTree { + public: + MergeTree() : num_rows_(0) {} + + void Build(int64_t num_rows, const int64_t* permutation, int num_levels_to_skip, + int64_t hardware_flags, util::TempVectorStack* temp_vector_stack); + + int get_height() const { return num_rows_ ? 
1 + arrow::bit_util::Log2(num_rows_) : 0; } + + template + void Split( + /* upper level */ int level, const S* in, S* out, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) const { + int64_t lower_node_length = 1LL << (level - 1); + int64_t lower_node_mask = lower_node_length - 1LL; + + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + int num_ids; + auto ids_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + uint16_t* ids = ids_buf.mutable_data(); + + // Break into mini-batches + int64_t rank_batch_begin[2]; + rank_batch_begin[0] = 0; + rank_batch_begin[1] = 0; + for (int64_t batch_begin = 0; batch_begin < num_rows_; + batch_begin += batch_length_max) { + int64_t batch_length = std::min(num_rows_ - batch_begin, batch_length_max); + + for (int child = 0; child <= 1; ++child) { + // Get parent node positions (relative to the batch) for all elements + // coming from left child + util::bit_util::bits_to_indexes( + child, hardware_flags, static_cast(batch_length), + reinterpret_cast(level_bitvecs_[level].data() + + batch_begin / 64), + &num_ids, ids); + + for (int i = 0; i < num_ids; ++i) { + int64_t upper_pos = batch_begin + ids[i]; + int64_t rank = rank_batch_begin[child] + i; + int64_t lower_pos = (rank & ~lower_node_mask) * 2 + child * lower_node_length + + (rank & lower_node_mask); + out[lower_pos] = in[upper_pos]; + } + rank_batch_begin[child] += num_ids; + } + } + } + + // State or output for range query. + // + // Represents between zero and two different nodes from a single level of the + // tree. + // + // For each node remembers the length of its prefix, which represents a + // subrange of selected elements of that node. + // + // Length is between 1 and the number of node elements at this level (both + // bounds inclusive), because empty set of selected elements is represented by + // a special constant kEmpty. + // + struct RangeQueryState { + static constexpr int64_t kEmpty = -1LL; + + static int64_t PosFromNodeAndLength(int level, int64_t node, int64_t length) { + if (length == 0) { + return kEmpty; + } + return (node << level) + length - 1; + } + + static void NodeAndLengthFromPos(int level, int64_t pos, int64_t* node, + int64_t* length) { + ARROW_DCHECK(pos != kEmpty); + *node = pos >> level; + *length = 1 + pos - (*node << level); + } + + void AppendPos(int64_t new_pos) { + // One of the two positions must be set to null + // + if (pos[0] == kEmpty) { + pos[0] = new_pos; + } else { + ARROW_DCHECK(pos[1] == kEmpty); + pos[1] = new_pos; + } + } + + int64_t pos[2]; + }; + + // Visiting each level updates state cursor pair and outputs state cursor + // pair. 
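+  // Illustrative example (added; not part of the original patch): at
+  // level = 2 every node spans 4 positions, so node 1 covers [4, 8). A
+  // prefix of length 3 of that node is encoded as
+  // pos = PosFromNodeAndLength(2, 1, 3) = (1 << 2) + 3 - 1 = 6, and
+  // NodeAndLengthFromPos(2, 6, ...) recovers node = 6 >> 2 = 1 and
+  // length = 1 + 6 - 4 = 3.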
+ // + void RangeQueryStep(int level, int64_t num_queries, const int64_t* begins, + const int64_t* ends, RangeQueryState* query_states, + RangeQueryState* query_outputs) const; + + int64_t NthElement(int64_t begin, int64_t end, int64_t n) const { + ARROW_DCHECK(n >= 0 && n < end - begin); + int64_t temp_begin = begin; + int64_t temp_end = end; + int64_t temp_n = n; + + // Traverse the tree top-down + // + int top_level = static_cast(level_bitvecs_.size()) - 1; + for (int level = top_level; level > 0; --level) { + NthElementStep(level, &temp_begin, &temp_end, &temp_n); + } + + return temp_begin; + } + + void NthElement(int64_t num_queries, const uint16_t* opt_ids, const int64_t* begins, + const int64_t* ends, + /* ns[i] must be in the range [0; ends[i] - begins[i]) */ + const int64_t* ns, int64_t* row_numbers, + util::TempVectorStack* temp_vector_stack) const; + + private: + /* output 0 if value comes from left child and 1 otherwise */ + void GenBitvec( + /* level to generate for */ int level, + /* source permutation of rows for elements in this level */ + const int64_t* permutation); + + void Cascade(int level, int64_t pos, RangeQueryState* result) const; + + bool NodeFullyInsideRange(int level, int64_t node, int64_t begin, int64_t end) const; + + bool NodePartiallyInsideRange(int level, int64_t node, int64_t begin, + int64_t end) const; + + void NthElementStep(int level, int64_t* begin, int64_t* end, int64_t* n) const { + int64_t node_length = 1LL << level; + uint64_t node_mask = node_length - 1; + int64_t node_begin = (*begin & ~node_mask); + + int64_t rank_begin = BitVectorNavigator::Rank(*begin, level_bitvecs_[level].data(), + level_popcounts_[level].data()); + int64_t rank_end = BitVectorNavigator::RankNext( + *end - 1, level_bitvecs_[level].data(), level_popcounts_[level].data()); + int64_t length_left = (*end - *begin) - (rank_end - rank_begin); + int64_t child_mask = (length_left <= *n ? ~0LL : 0LL); + + *begin = node_begin + ((node_length / 2 + rank_begin - node_begin / 2) & child_mask) + + (((*begin - node_begin) - (rank_begin - node_begin / 2)) & ~child_mask); + *end = *begin + ((rank_end - rank_begin) & child_mask) + (length_left & ~child_mask); + *n -= (length_left & child_mask); + } + + int64_t num_rows_; + std::vector> level_bitvecs_; + std::vector> level_popcounts_; +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/radix_sort.h b/cpp/src/arrow/compute/exec/window_functions/radix_sort.h new file mode 100644 index 00000000000..541d390cbde --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/radix_sort.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
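+
+// (Added usage sketch; not part of the original patch.) RadixSort::Sort is a
+// least-significant-digit radix sort over 8-bit digits that also produces
+// the sorting permutation, e.g.:
+//
+//   int64_t values[] = {3, 1, 3, 0};
+//   int64_t sorted[4];
+//   int64_t perm[4];
+//   RadixSort::Sort(/*num_rows=*/4, /*value_max=*/3, values, sorted, perm);
+//   // sorted = {0, 1, 3, 3}; perm = {3, 1, 0, 2} (ties keep their original
+//   // order, since each counting-sort pass is stable)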
+ +#pragma once + +#include +#include "arrow/util/bit_util.h" + +namespace arrow { +namespace compute { + +class RadixSort { + public: + static void Sort(int64_t num_rows, int64_t value_max, const int64_t* values, + int64_t* values_sorted, int64_t* permutation) { + int num_bits = std::max( + 1, 64 - arrow::bit_util::CountLeadingZeros(static_cast(value_max))); + constexpr int num_bits_per_group = 8; + constexpr int num_bit_groups_max = 8; + const int64_t bit_group_mask = ((1LL << num_bits_per_group) - 1LL); + int num_bit_groups = + static_cast(arrow::bit_util::CeilDiv(num_bits, num_bits_per_group)); + int64_t counters[num_bit_groups_max << num_bits_per_group]; + memset(counters, 0, sizeof(counters[0]) * (num_bit_groups << num_bits_per_group)); + for (int64_t i = 0; i < num_rows; ++i) { + int64_t value = values[i]; + for (int bit_group = 0; bit_group < num_bit_groups; ++bit_group) { + int64_t bucket_id = (value >> (num_bits_per_group * bit_group)) & bit_group_mask; + ++counters[(bit_group << num_bits_per_group) + bucket_id]; + } + } + for (int bit_group = 0; bit_group < num_bit_groups; ++bit_group) { + int64_t sum = 0; + int64_t* counters_base = counters + (bit_group << num_bits_per_group); + for (int i = 0; i < (1 << num_bits_per_group); ++i) { + int64_t sum_next = sum + counters_base[i]; + counters_base[i] = sum; + sum = sum_next; + } + } + std::vector values_sorted_2nd; + std::vector permutation_2nd; + if (num_bit_groups > 1) { + values_sorted_2nd.resize(num_rows); + permutation_2nd.resize(num_rows); + } + for (int bit_group = 0; bit_group < num_bit_groups; ++bit_group) { + int64_t* values_sorted_target = + ((num_bit_groups - bit_group) & 1) ? values_sorted : values_sorted_2nd.data(); + int64_t* permutation_target = + ((num_bit_groups - bit_group) & 1) ? permutation : permutation_2nd.data(); + int64_t* counters_base = counters + (bit_group << num_bits_per_group); + if (bit_group == 0) { + for (int64_t i = 0; i < num_rows; ++i) { + int64_t value = values[i]; + int64_t bucket = value & bit_group_mask; + int64_t pos = counters_base[bucket]++; + values_sorted_target[pos] = value; + permutation_target[pos] = i; + } + } else { + const int64_t* values_sorted_source = values_sorted_target == values_sorted + ? values_sorted_2nd.data() + : values_sorted; + const int64_t* permutation_source = + permutation_target == permutation ? permutation_2nd.data() : permutation; + for (int64_t i = 0; i < num_rows; ++i) { + int64_t value = values_sorted_source[i]; + int64_t bucket = ((value >> (num_bits_per_group * bit_group)) & bit_group_mask); + int64_t pos = counters_base[bucket]++; + values_sorted_target[pos] = value; + permutation_target[pos] = permutation_source[i]; + } + } + } + } +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/window_aggregate.cc b/cpp/src/arrow/compute/exec/window_functions/window_aggregate.cc new file mode 100644 index 00000000000..60cbdf7396d --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/window_aggregate.cc @@ -0,0 +1,376 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/window_functions/window_aggregate.h" +#include +#include // For test +#include "arrow/compute/exec/util.h" +#include "arrow/compute/exec/window_functions/bit_vector_navigator.h" + +namespace arrow { +namespace compute { + +IntervalTree::~IntervalTree() { + for (size_t i = 0; i < levels_.size(); ++i) { + if (levels_[i]) { + delete levels_[i]; + } + } + if (temp_vector_) { + delete temp_vector_; + } +} + +void IntervalTree::Build(int64_t num_rows, util::TempVectorStack* temp_vector_stack) { + ARROW_DCHECK(num_rows_ == 0); + num_rows_ = num_rows; + if (num_rows_ == 0) { + return; + } + + // Allocate vectors for levels of the tree (starting from level 1). + // + levels_.clear(); + level_sizes_.clear(); + levels_.push_back(nullptr); // Level 0 is ignored + level_sizes_.push_back(0LL); + int64_t last_level_size = num_rows_; + while (last_level_size >= kFanout) { + int64_t next_level_size = bit_util::CeilDiv(last_level_size, kFanout); + levels_.push_back(func_->Alloc(next_level_size)); + level_sizes_.push_back(next_level_size); + func_->Zero(level_sizes_.back(), levels_.back(), 0); + last_level_size = next_level_size; + } + + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + ARROW_DCHECK(batch_length_max % kFanout == 0); + + // Temp buffers + // + auto out_ids_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + int64_t* out_ids = out_ids_buf.mutable_data(); + auto in_ids_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + int64_t* in_ids = in_ids_buf.mutable_data(); + + int num_levels = static_cast(level_sizes_.size()); + for (int target_level = 1; target_level < num_levels; ++target_level) { + int64_t num_rows_in = + (target_level == 1) ? num_rows_ : level_sizes_[target_level - 1]; + for (int64_t batch_begin = 0; batch_begin < num_rows_in; + batch_begin += batch_length_max) { + int64_t batch_length = std::min(num_rows_in - batch_begin, batch_length_max); + std::iota(in_ids, in_ids + batch_length, batch_begin); + for (int64_t i = 0; i < batch_length; ++i) { + out_ids[i] = ((batch_begin + i) >> kLogFanout); + } + func_->Sum(batch_length, levels_[target_level], out_ids, + target_level == 1 ? nullptr : levels_[target_level - 1], in_ids); + } + } + + delete temp_vector_; + temp_vector_ = nullptr; +} + +void IntervalTree::SumOfRange(int64_t num, const int64_t* begins, const int64_t* ends, + WindowAggregateFunc::Vector* results, + uint8_t* results_validity, + util::TempVectorStack* temp_vector_stack) { + // Zero and mark all results as valid + // + memset(results_validity, 0xff, bit_util::CeilDiv(num, 8)); + func_->Zero(num, results, 0); + + // Bottom-up interval tree traversal + // + // Breaking queries into batches. + // We use smaller batches to have room in temporary arrays for multiple + // elements per query (up to kFanout * 2). 
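+  // Illustrative decomposition (added): with kFanout = 4, the query range
+  // [3, 17) is answered as leaf 3, the level-1 cells [4, 8), [8, 12) and
+  // [12, 16), and leaf 16. At each level the positions before the first
+  // kFanout-aligned boundary and after the last one are summed directly,
+  // and only the aligned middle part is pushed up to the next level.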
+ // + int64_t batch_length_max = (util::MiniBatch::kMiniBatchLength >> kLogFanout); + + // Ids of queries within a batch still participating in interval tree + // traversal + auto ids_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + uint16_t* ids = ids_buf.mutable_data(); + int num_ids = 0; + // Buffers for source and destination ids for requests to perform addition + // (room for up to 2 * (kFanout - 1) sums per query) + // + auto sum_dst_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max * (kFanout - 1) * 2)); + int64_t* sum_dst = sum_dst_buf.mutable_data(); + auto sum_src_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max * (kFanout - 1) * 2)); + int64_t* sum_src = sum_src_buf.mutable_data(); + + for (int64_t batch_begin = 0; batch_begin < num; batch_begin += batch_length_max) { + int64_t batch_length = std::min(num - batch_begin, batch_length_max); + + num_ids = 0; + for (int64_t i = batch_begin; i < batch_begin + batch_length; ++i) { + if (begins[i] < ends[i]) { + ids[num_ids++] = static_cast(i - batch_begin); + } else { + bit_util::ClearBit(results_validity, i); + } + } + + int level = 0; + while (num_ids > 0) { + ARROW_DCHECK(level < static_cast(levels_.size())); + int64_t num_sums = 0; + int num_ids_new = 0; + int bit_shift = kLogFanout * level; + int64_t cell_size = 1LL << bit_shift; + for (int i = 0; i < num_ids; ++i) { + uint16_t id = ids[i]; + int64_t begin = (begins[batch_begin + id] + cell_size - 1LL) >> bit_shift; + int64_t end = ends[batch_begin + id] >> bit_shift; + int64_t begin_round_up = + ((begin + kFanout - 1) & ~static_cast(kFanout - 1)); + int64_t end_round_down = (end & ~static_cast(kFanout - 1)); + if (begin_round_up > end_round_down) { + begin_round_up = end_round_down = end; + } + for (int64_t j = begin; j < begin_round_up; ++j) { + sum_dst[num_sums] = batch_begin + id; + sum_src[num_sums] = j; + ++num_sums; + } + for (int64_t j = end_round_down; j < end; ++j) { + sum_dst[num_sums] = batch_begin + id; + sum_src[num_sums] = j; + ++num_sums; + } + if (begin_round_up < end_round_down) { + ids[num_ids_new++] = id; + } + } + func_->Sum(num_sums, results, sum_dst, levels_[level], sum_src); + num_ids = num_ids_new; + ++level; + } + } +} + +void WindowFrameDeltaStream::Init(int64_t num_frames, const int64_t* begins, + const int64_t* ends) { + num_frames_ = num_frames; + begins_ = begins; + ends_ = ends; + frame_ = 0; + begin_ = 0; + end_ = 0; + reset_next_ = true; +} + +void WindowFrameDeltaStream::GetNextInputBatch( + uint16_t max_batch_size, uint16_t* num_inputs, uint16_t* num_output_frames, + uint8_t* reset_sum_bitvec, uint8_t* negate_input_bitvec, int64_t* input_row_number, + uint16_t* input_prefix_length, int64_t* output_frame) { + *num_inputs = 0; + *num_output_frames = 0; + + int64_t num_frames = num_frames_; + const int64_t* begins = begins_; + const int64_t* ends = ends_; + + for (;;) { + // Ignore empty frames + // + while (frame_ < num_frames && begins[frame_] == ends[frame_]) { + ++frame_; + } + + // Ignore frames that are the same as the previous one + // + while (frame_ < num_frames && begins[frame_] == begin_ && ends[frame_] == end_) { + ++frame_; + } + + if (frame_ == num_frames) { + return; + } + + int64_t target_begin = begins[frame_]; + int64_t target_end = ends[frame_]; + + if (begin_ < target_begin && end_ <= target_begin) { + reset_next_ = true; + begin_ = end_ = target_begin; + } + + while (begin_ < target_begin) { + if (*num_inputs == max_batch_size) { 
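+        // (Added note) Returning mid-frame is safe: frame_, begin_, end_ and
+        // reset_next_ persist in the object, so the next GetNextInputBatch
+        // call resumes producing deltas exactly where this one stopped.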
+        return;
+      }
+      bit_util::SetBitTo(reset_sum_bitvec, *num_inputs, reset_next_);
+      reset_next_ = false;
+      bit_util::SetBit(negate_input_bitvec, *num_inputs);
+      input_row_number[*num_inputs] = begin_;
+      ++*num_inputs;
+      ++begin_;
+    }
+
+    while (end_ < target_end) {
+      if (*num_inputs == max_batch_size) {
+        return;
+      }
+      bit_util::SetBitTo(reset_sum_bitvec, *num_inputs, reset_next_);
+      reset_next_ = false;
+      bit_util::ClearBit(negate_input_bitvec, *num_inputs);
+      input_row_number[*num_inputs] = end_;
+      ++*num_inputs;
+      ++end_;
+    }
+
+    input_prefix_length[*num_output_frames] = *num_inputs;
+    output_frame[*num_output_frames] = frame_;
+    ++*num_output_frames;
+    ++frame_;
+  }
+}
+
+void WindowAggregateBasic::Sum(int64_t num_rows, const int64_t* vals,
+                               const int64_t* begins, const int64_t* ends,
+                               std::vector<std::pair<uint64_t, uint64_t>>& results,
+                               uint8_t* results_validity) {
+  results.resize(num_rows);
+  for (int64_t i = 0; i < num_rows; ++i) {
+    int64_t begin = begins[i];
+    int64_t end = ends[i];
+    if (begin == end) {
+      bit_util::ClearBit(results_validity, i);
+      results[i] = std::make_pair(0ULL, 0ULL);
+      continue;
+    }
+    bit_util::SetBit(results_validity, i);
+
+    // 128-bit accumulation: the pair holds (low 64 bits, high 64 bits).
+    uint64_t overflow = 0ULL;
+    uint64_t result = 0ULL;
+    for (int64_t j = begin; j < end; ++j) {
+      result += static_cast<uint64_t>(vals[j]);
+      // Carry out of the unsigned addition
+      if (result < static_cast<uint64_t>(vals[j])) {
+        ++overflow;
+      }
+      // Sign extension of a negative 64-bit input into the upper word
+      if (vals[j] < 0LL) {
+        overflow += ~0ULL;
+      }
+    }
+    results[i] = std::make_pair(result, overflow);
+  }
+}
+
+void WindowAggregateTest::TestSum() {
+  Random64BitCopy rand;
+  MemoryPool* pool = default_memory_pool();
+  util::TempVectorStack temp_vector_stack;
+  Status status = temp_vector_stack.Init(pool, 128 * util::MiniBatch::kMiniBatchLength);
+  ARROW_DCHECK(status.ok());
+
+  constexpr int num_tests = 100;
+  const int num_tests_to_skip = 1;
+  for (int test = 0; test < num_tests; ++test) {
+    // Generate random values
+    //
+    constexpr int64_t max_rows = 1100;
+    int64_t num_rows = rand.from_range(static_cast<int64_t>(1), max_rows);
+    std::vector<int64_t> vals(num_rows);
+    constexpr int64_t max_val = 65535;
+    for (int64_t i = 0; i < num_rows; ++i) {
+      vals[i] = rand.from_range(static_cast<int64_t>(0), max_val);
+    }
+
+    // Generate random frames
+    //
+    std::vector<int64_t> begins;
+    std::vector<int64_t> ends;
+    GenerateTestFrames(rand, num_rows, begins, ends, /*progressive=*/false,
+                       /*expansive=*/false);
+
+    printf("num_rows %d ", static_cast<int>(num_rows));
+
+    if (test < num_tests_to_skip) {
+      continue;
+    }
+
+    WinAggFun_SumInt64 func;
+    func.Init(num_rows, vals.data());
+
+    std::vector<std::pair<uint64_t, uint64_t>> out0(num_rows);
+    WinAggFun_SumInt64::VectorUint128* out1;
+    out1 = func.Alloc(num_rows);
+    std::vector<uint8_t> out_validity[2];
+    out_validity[0].resize(bit_util::BytesForBits(num_rows));
+    out_validity[1].resize(bit_util::BytesForBits(num_rows));
+
+    int64_t num_repeats;
+#ifndef NDEBUG
+    num_repeats = 1;
+#else
+    num_repeats = std::max(1LL, 1024 * 1024LL / num_rows);
+#endif
+    printf("num_repeats %d ", static_cast<int>(num_repeats));
+
+    // int64_t start = __rdtsc();
+    for (int repeat = 0; repeat < num_repeats; ++repeat) {
+      WindowAggregateBasic::Sum(num_rows, vals.data(), begins.data(), ends.data(), out0,
+                                out_validity[1].data());
+    }
+    // int64_t end = __rdtsc();
+    // printf("cpr basic %.1f ",
+    //        static_cast<float>(end - start) / static_cast<float>(num_rows *
+    //        num_repeats));
+    // start = __rdtsc();
+    for (int repeat = 0; repeat < num_repeats; ++repeat) {
+      WindowAggregate::Sum(num_rows, &func, begins.data(), ends.data(), out1,
+                           out_validity[0].data(), &temp_vector_stack);
+    }
+    // end = __rdtsc();
+    // printf("cpr normal %.1f ",
+    //        static_cast<float>(end - start) /
static_cast(num_rows * + // num_repeats)); + + bool ok = true; + for (int64_t i = 0; i < num_rows; ++i) { + bool valid[2]; + for (int j = 0; j < 2; ++j) { + valid[j] = bit_util::GetBit(out_validity[j].data(), i); + } + if (valid[0] != valid[1]) { + ARROW_DCHECK(false); + ok = false; + } + if (out1->vals_[i].first != out0[i].first || + out1->vals_[i].second != out0[i].second) { + ARROW_DCHECK(false); + ok = false; + } + } + printf("%s\n", ok ? "correct" : "wrong"); + + delete out1; + } +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/window_aggregate.h b/cpp/src/arrow/compute/exec/window_functions/window_aggregate.h new file mode 100644 index 00000000000..1c8ef849e4a --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/window_aggregate.h @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include "arrow/compute/exec/util.h" +#include "arrow/compute/exec/window_functions/window_expr.h" +#include "arrow/compute/exec/window_functions/window_frame.h" + +namespace arrow { +namespace compute { + +class IntervalTree { + public: + IntervalTree() : func_(nullptr), num_rows_(0), temp_vector_(nullptr) {} + + ~IntervalTree(); + + void Init(WindowAggregateFunc* func) { func_ = func; } + + void Build(int64_t num_rows, util::TempVectorStack* temp_vector_stack); + + void SumOfRange(int64_t num, const int64_t* begins, const int64_t* ends, + WindowAggregateFunc::Vector* results, uint8_t* results_validity, + util::TempVectorStack* temp_vector_stack); + + private: + static constexpr int kLogFanout = 2; + static constexpr int64_t kFanout = 1LL << kLogFanout; + WindowAggregateFunc* func_; + int64_t num_rows_; + std::vector levels_; + std::vector level_sizes_; + WindowAggregateFunc::Vector* temp_vector_; +}; + +class WindowFrameDeltaStream { + public: + void Init(int64_t num_frames, const int64_t* begins, const int64_t* ends); + void GetNextInputBatch(uint16_t max_batch_size, uint16_t* num_inputs, + uint16_t* num_output_frames, uint8_t* reset_sum_bitvec, + uint8_t* negate_input_bitvec, int64_t* input_row_number, + uint16_t* input_prefix_length, int64_t* output_frame); + + private: + int64_t num_frames_; + const int64_t* begins_; + const int64_t* ends_; + int64_t frame_; + int64_t begin_, end_; + bool reset_next_; + + int64_t next_frame_to_output_; +}; + +class WindowFramePeersEnumerator {}; + +class WindowAggregate { + public: + static void Sum(int64_t num_rows, WindowAggregateFunc* func, const int64_t* begins, + const int64_t* ends, WindowAggregateFunc::Vector* results, + uint8_t* results_validity, util::TempVectorStack* temp_vector_stack) { + IntervalTree tree; + tree.Init(func); + tree.Build(num_rows, temp_vector_stack); + tree.SumOfRange(num_rows, 
begins, ends, results, results_validity, temp_vector_stack); + } + // static void SumProgressive(int64_t num_rows, WindowAggregateFunc* func, + // const int64_t* begins, const int64_t* ends, + // WindowAggregateFunc::Vector* results, + // uint8_t* results_validity, + // util::TempVectorStack* temp_vector_stack) { + // int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + + // // TODO: Zero output + // // + + // auto reset_sum_bitvec_buf = util::TempVectorHolder( + // temp_vector_stack, + // static_cast(bit_util::BytesForBits(batch_length_max))); + // uint8_t* reset_sum_bitvec = reset_sum_bitvec_buf.mutable_data(); + + // auto negate_input_bitvec_buf = util::TempVectorHolder( + // temp_vector_stack, + // static_cast(bit_util::BytesForBits(batch_length_max))); + // uint8_t* negate_input_bitvec = negate_input_bitvec_buf.mutable_data(); + + // auto input_row_number_buf = util::TempVectorHolder( + // temp_vector_stack, static_cast(batch_length_max)); + // int64_t* input_row_number = input_row_number_buf.mutable_data(); + + // auto input_prefix_length_buf = util::TempVectorHolder( + // temp_vector_stack, static_cast(batch_length_max)); + // uint16_t* input_prefix_length = input_prefix_length_buf.mutable_data(); + + // auto output_frame_buf = util::TempVectorHolder( + // temp_vector_stack, static_cast(batch_length_max)); + // int64_t* output_frame = output_frame_buf.mutable_data(); + + // WindowFrameDeltaStream frame_deltas; + // frame_deltas.Init(num_rows, begins, ends); + + // // TODO: Allocate temporary vector for prefix sums + + // // TODO: Allocate out_ids and in_ids for sums + + // int64_t num_frames_processed = 0; + // for (;;) { + // uint16_t num_inputs; + // uint16_t num_output_frames; + // frame_deltas.GetNextInputBatch(static_cast(batch_length_max), + // &num_inputs, + // &num_output_frames, reset_sum_bitvec, + // negate_input_bitvec, input_row_number, + // input_prefix_length, output_frame); + + // // TODO: Finish + // } + // } +}; + +class WindowAggregateBasic { + public: + static void Sum(int64_t num_rows, const int64_t* vals, const int64_t* begins, + const int64_t* ends, + std::vector>& results, + uint8_t* results_validity); +}; + +class WindowAggregateTest { + public: + static void TestSum(); +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/window_distinct.cc b/cpp/src/arrow/compute/exec/window_functions/window_distinct.cc new file mode 100644 index 00000000000..e6f5f5aaa7c --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/window_distinct.cc @@ -0,0 +1,618 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
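+
+// (Added overview; not part of the original patch.) Distinct aggregates are
+// reduced to range queries by linking rows with equal group ids: each row
+// stores prev = 0 if it is the first row of its group, otherwise
+// 1 + the row number of the previous row of the same group. A row in frame
+// [b, e) contributes a distinct value exactly when prev <= b, i.e. when its
+// previous occurrence falls before the frame. For group_ids = {5, 7, 5, 7}
+// this gives prev = {0, 0, 1, 2}; frame [1, 4) counts rows 1 (prev 0) and
+// 2 (prev 1) but not row 3 (prev 2 > 1), matching the two distinct groups
+// {7, 5} in that range. The merge tree below answers such
+// "rows in [b, e) with prev <= b" queries in bulk.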
+ +#include "arrow/compute/exec/window_functions/window_distinct.h" +#include +#include // For test +#include "arrow/compute/exec/util.h" +#include "arrow/compute/exec/window_functions/bit_vector_navigator.h" +#include "arrow/compute/exec/window_functions/merge_tree.h" +#include "arrow/compute/exec/window_functions/radix_sort.h" + +namespace arrow { +namespace compute { + +void WindowDistinct::Init(int64_t num_rows, int64_t num_groups, + const int64_t* group_ids, /*const WindowFrames &frames,*/ + int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + num_rows_ = num_rows; + num_groups_ = num_groups; + + std::vector group_ids_sorted(num_rows); + std::vector row_numbers_sorted_on_group_id(num_rows); + RadixSort::Sort(num_rows, num_groups - 1, group_ids, group_ids_sorted.data(), + row_numbers_sorted_on_group_id.data()); + + std::vector prev(num_rows); + GenListsForGroups(num_rows, num_groups, group_ids_sorted.data(), + row_numbers_sorted_on_group_id.data(), prev.data(), nullptr); + + int64_t num_bit_words = bit_util::CeilDiv(num_rows_, 64); + std::vector has_prev_bitvec(num_bit_words); + std::vector has_prev_popcounts(num_bit_words); + has_next_bitvec_.resize(num_bit_words); + has_next_popcounts_.resize(num_bit_words); + GenHasPrevNextBitvecs(num_rows_, group_ids_sorted.data(), + row_numbers_sorted_on_group_id.data(), has_prev_bitvec.data(), + has_next_bitvec_.data()); + BitVectorNavigator::GenPopCounts(num_rows_, has_prev_bitvec.data(), + has_prev_popcounts.data()); + BitVectorNavigator::GenPopCounts(num_rows_, has_next_bitvec_.data(), + has_next_popcounts_.data()); + + row_numbers_sorted_on_prev_.resize(num_rows); + GenRowNumbersSortedOnPrev(prev.data(), has_prev_bitvec.data(), + has_prev_popcounts.data(), + row_numbers_sorted_on_prev_.data()); + + // std::vector row_numbers_sorted_on_prev1(num_rows); + // count_prev_LE_.resize(num_rows); + // GenCountLE(num_rows, num_rows - 1, prev.data(), count_prev_LE_.data(), + // row_numbers_sorted_on_prev1.data()); + + // for (int64_t i = 0; i < num_rows_; ++i) { + // ARROW_DCHECK(row_numbers_sorted_on_prev[i] == + // row_numbers_sorted_on_prev1[i]); + // } + + merge_tree_.Build(num_rows_, row_numbers_sorted_on_prev_.data(), + /*num_levels_to_skip=*/0, hardware_flags, temp_vector_stack); +} + +void WindowDistinct::Count(const WindowFrames& frames, int64_t* output, + int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) const { + memset(output, 0, frames.num_frames * sizeof(int64_t)); + + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + auto query_states_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + MergeTree::RangeQueryState* query_states = query_states_buf.mutable_data(); + auto query_outputs_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + MergeTree::RangeQueryState* query_outputs = query_outputs_buf.mutable_data(); + + int64_t num_frames = frames.num_frames; + const int64_t* begins = frames.begins[0]; + const int64_t* ends = frames.ends[0]; + + int tree_height = merge_tree_.get_height(); + + if (tree_height == 1) { + // Special case when there is at most one row + // + ARROW_DCHECK(num_rows_ < 2); + if (num_rows_) { + output[0] = (frames.ends[0][0] > frames.begins[0][0]) ? 
1 : 0; + } + return; + } + + for (int64_t batch_begin = 0; batch_begin < num_frames; + batch_begin += batch_length_max) { + int64_t batch_length_next = std::min(num_frames - batch_begin, batch_length_max); + + for (int64_t i = batch_begin; i < batch_begin + batch_length_next; ++i) { + int64_t top_level_rank = CountPrevLE(begins[i]); + // int64_t top_level_rank1 = count_prev_LE_[begins[i]]; + // ARROW_DCHECK(top_level_rank == top_level_rank1); + query_states[i - batch_begin].pos[0] = + MergeTree::RangeQueryState::PosFromNodeAndLength(tree_height - 1, 0LL, + top_level_rank); + query_states[i - batch_begin].pos[1] = MergeTree::RangeQueryState::kEmpty; + } + + for (int level = tree_height - 1; level > 0; --level) { + merge_tree_.RangeQueryStep(level, batch_length_next, begins + batch_begin, + ends + batch_begin, query_states, query_outputs); + for (int64_t i = batch_begin; i < batch_begin + batch_length_next; ++i) { + for (int ioutput_pos = 0; ioutput_pos < 2; ++ioutput_pos) { + int64_t output_pos = query_outputs[i - batch_begin].pos[ioutput_pos]; + if (output_pos != MergeTree::RangeQueryState::kEmpty) { + int64_t output_node; + int64_t output_length; + MergeTree::RangeQueryState::NodeAndLengthFromPos( + level - 1, output_pos, &output_node, &output_length); + output[i] += output_length; + } + } + } + } + } +} + +void WindowDistinct::Sum(const WindowFrames& frames, WindowAggregateFunc* func, + WindowAggregateFunc::Vector* results, uint8_t* results_validity, + int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) const { + int64_t num_frames = frames.num_frames; + const int64_t* begins = frames.begins[0]; + const int64_t* ends = frames.ends[0]; + + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + + int tree_height = merge_tree_.get_height(); + + // Zero out the result + // + func->Zero(num_frames, results, 0); + memset(results_validity, 0xff, bit_util::BytesForBits(num_frames)); + + // Allocate temporary buffers + // + + WindowAggregateFunc::Vector* sum_buf[2]; + sum_buf[0] = func->Alloc(num_frames); + sum_buf[1] = func->Alloc(num_frames); + + if (tree_height == 1) { + // Special case when there is at most one row + // + ARROW_DCHECK(num_rows_ < 2); + if (num_rows_) { + if (ends[0] > begins[0]) { + func->Zero(1, results, 0); + int64_t out_id = 0, in_id = 0; + func->Sum(1, results, &out_id, nullptr, &in_id); + } else { + bit_util::ClearBit(results_validity, 0); + } + } + return; + } + + // Initialize query states + // + std::vector query_states(num_frames); + for (int64_t i = 0; i < num_frames; ++i) { + if (ends[i] == begins[i]) { + bit_util::ClearBit(results_validity, i); + } + int64_t top_level_rank = CountPrevLE(begins[i]); + query_states[i].pos[0] = MergeTree::RangeQueryState::PosFromNodeAndLength( + tree_height - 1, 0LL, top_level_rank); + query_states[i].pos[1] = MergeTree::RangeQueryState::kEmpty; + } + + auto query_outputs_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + MergeTree::RangeQueryState* query_outputs = query_outputs_buf.mutable_data(); + + // Two elements per batch row in these temp buffers + // + auto add_dst_ids_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(2 * batch_length_max)); + int64_t* add_dst_ids = add_dst_ids_buf.mutable_data(); + + auto add_src_ids_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(2 * batch_length_max)); + int64_t* add_src_ids = add_src_ids_buf.mutable_data(); + + int64_t num_adds; + + for (int level = tree_height - 1; level > 0; --level) { + 
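+    // (Added note) One pass per tree level, top-down. sum_buf[] is used in
+    // ping-pong fashion: Split scatters the previous level's values into the
+    // nodes one level below, PrefixSum turns each node of that lower level
+    // into running sums, and the range-query step then adds a single
+    // prefix-sum entry per emitted node prefix to each frame's result.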
+
+void WindowDistinct::Sum(const WindowFrames& frames, WindowAggregateFunc* func,
+                         WindowAggregateFunc::Vector* results, uint8_t* results_validity,
+                         int64_t hardware_flags,
+                         util::TempVectorStack* temp_vector_stack) const {
+  int64_t num_frames = frames.num_frames;
+  const int64_t* begins = frames.begins[0];
+  const int64_t* ends = frames.ends[0];
+
+  int64_t batch_length_max = util::MiniBatch::kMiniBatchLength;
+
+  int tree_height = merge_tree_.get_height();
+
+  // Zero out the result
+  //
+  func->Zero(num_frames, results, 0);
+  memset(results_validity, 0xff, bit_util::BytesForBits(num_frames));
+
+  if (tree_height == 1) {
+    // Special case when there is at most one row
+    //
+    ARROW_DCHECK(num_rows_ < 2);
+    if (num_rows_) {
+      if (ends[0] > begins[0]) {
+        func->Zero(1, results, 0);
+        int64_t out_id = 0, in_id = 0;
+        func->Sum(1, results, &out_id, nullptr, &in_id);
+      } else {
+        bit_util::ClearBit(results_validity, 0);
+      }
+    }
+    return;
+  }
+
+  // Allocate temporary buffers. This happens after the special case above so
+  // that the early return does not leak them.
+  //
+  WindowAggregateFunc::Vector* sum_buf[2];
+  sum_buf[0] = func->Alloc(num_frames);
+  sum_buf[1] = func->Alloc(num_frames);
+
+  // Initialize query states
+  //
+  std::vector<MergeTree::RangeQueryState> query_states(num_frames);
+  for (int64_t i = 0; i < num_frames; ++i) {
+    if (ends[i] == begins[i]) {
+      bit_util::ClearBit(results_validity, i);
+    }
+    int64_t top_level_rank = CountPrevLE(begins[i]);
+    query_states[i].pos[0] = MergeTree::RangeQueryState::PosFromNodeAndLength(
+        tree_height - 1, 0LL, top_level_rank);
+    query_states[i].pos[1] = MergeTree::RangeQueryState::kEmpty;
+  }
+
+  auto query_outputs_buf = util::TempVectorHolder<MergeTree::RangeQueryState>(
+      temp_vector_stack, static_cast<uint32_t>(batch_length_max));
+  MergeTree::RangeQueryState* query_outputs = query_outputs_buf.mutable_data();
+
+  // Two elements per batch row in these temp buffers
+  //
+  auto add_dst_ids_buf = util::TempVectorHolder<int64_t>(
+      temp_vector_stack, static_cast<uint32_t>(2 * batch_length_max));
+  int64_t* add_dst_ids = add_dst_ids_buf.mutable_data();
+
+  auto add_src_ids_buf = util::TempVectorHolder<int64_t>(
+      temp_vector_stack, static_cast<uint32_t>(2 * batch_length_max));
+  int64_t* add_src_ids = add_src_ids_buf.mutable_data();
+
+  int64_t num_adds;
+
+  for (int level = tree_height - 1; level > 0; --level) {
+    WindowAggregateFunc::Vector* sum_in = sum_buf[level & 1];
+    WindowAggregateFunc::Vector* sum_out = sum_buf[1 - (level & 1)];
+
+    if (level == tree_height - 1) {
+      // Reorder source values based on row_numbers_sorted_on_prev_
+      //
+      std::vector<int64_t> out_ids(num_frames);
+      std::iota(out_ids.begin(), out_ids.end(), 0);
+      func->Zero(num_frames, sum_in, 0);
+      func->Sum(num_frames, sum_in, out_ids.data(), nullptr,
+                row_numbers_sorted_on_prev_.data());
+    }
+
+    // Split values related to the current tree level
+    //
+    func->Split(num_frames, sum_out, sum_in, merge_tree_, level, hardware_flags,
+                temp_vector_stack);
+
+    // Compute prefix sum of values related to the tree level below
+    //
+    WindowAggregateFunc::Vector* segmented_prefix_sum = sum_in;
+    func->PrefixSum(num_frames, segmented_prefix_sum, sum_out, level - 1);
+
+    // Break into mini-batches
+    //
+    for (int64_t batch_begin = 0; batch_begin < num_frames;
+         batch_begin += batch_length_max) {
+      int64_t batch_length_next = std::min(num_frames - batch_begin, batch_length_max);
+
+      merge_tree_.RangeQueryStep(level, batch_length_next, begins + batch_begin,
+                                 ends + batch_begin, query_states.data() + batch_begin,
+                                 query_outputs);
+
+      num_adds = 0;
+      for (int64_t i = 0; i < batch_length_next; ++i) {
+        for (int ioutput_pos = 0; ioutput_pos < 2; ++ioutput_pos) {
+          int64_t output_pos = query_outputs[i].pos[ioutput_pos];
+          if (output_pos != MergeTree::RangeQueryState::kEmpty) {
+            int64_t output_node;
+            int64_t output_length;
+            MergeTree::RangeQueryState::NodeAndLengthFromPos(
+                level - 1, output_pos, &output_node, &output_length);
+
+            add_dst_ids[num_adds] = batch_begin + i;
+            add_src_ids[num_adds] = (output_node << (level - 1)) + output_length - 1;
+            ++num_adds;
+          }
+        }
+      }
+      func->Sum(num_adds, results, add_dst_ids, segmented_prefix_sum, add_src_ids);
+    }
+  }
+
+  delete sum_buf[0];
+  delete sum_buf[1];
+}
+
+// For each row output into opt_prev: 0 if it is the first row with its group
+// id, otherwise 1 plus the row number of the last preceding row with the
+// same group id.
+// Symmetrically for opt_next: 1 plus the row number of the next row with the
+// same group id, or 1 plus the number of rows when there is no following row
+// within the same group.
+//
+// Starting from 0 instead of -1 makes radix sorting easier.
+//
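+// Example (illustrative): for group_ids = [0, 0, 1, 0, 1], rows sorted on
+// group id are visited in the order [0, 1, 3, 2, 4], giving
+//   prev = [0, 1, 0, 2, 3]   next = [2, 4, 5, 6, 6]
+// e.g. prev[3] == 2 because row 1 is the last earlier row of group 0, and
+// next[3] == 6 == 1 + num_rows because row 3 is the last row of group 0.
+//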
+void WindowDistinct::GenListsForGroups(int64_t num_rows, int64_t num_groups,
+                                       const int64_t* group_ids_sorted,
+                                       const int64_t* row_numbers_sorted_on_group_id,
+                                       int64_t* opt_prev, int64_t* opt_next) {
+  for (int64_t i = 0; i < num_rows; ++i) {
+    int64_t row_number = row_numbers_sorted_on_group_id[i];
+    if (opt_prev) {
+      bool group_first = (i == 0 || group_ids_sorted[i] != group_ids_sorted[i - 1]);
+      int64_t prev = group_first ? 0LL : 1LL + row_numbers_sorted_on_group_id[i - 1];
+      opt_prev[row_number] = prev;
+    }
+    if (opt_next) {
+      bool group_last =
+          (i == num_rows - 1 || group_ids_sorted[i] != group_ids_sorted[i + 1]);
+      int64_t next =
+          group_last ? 1LL + num_rows : 1LL + row_numbers_sorted_on_group_id[i + 1];
+      opt_next[row_number] = next;
+    }
+  }
+}
+
+void WindowDistinct::GenHasPrevNextBitvecs(
+    int64_t num_rows, const int64_t* group_ids_sorted,
+    const int64_t* row_numbers_sorted_on_group_ids, uint64_t* has_prev_bitvec,
+    uint64_t* has_next_bitvec) {
+  if (has_prev_bitvec) {
+    memset(has_prev_bitvec, 0xff, bit_util::BytesForBits(num_rows));
+    for (int64_t i = 0; i < num_rows; ++i) {
+      bool group_first = (i == 0 || group_ids_sorted[i] != group_ids_sorted[i - 1]);
+      if (group_first) {
+        bit_util::ClearBit(reinterpret_cast<uint8_t*>(has_prev_bitvec),
+                           row_numbers_sorted_on_group_ids[i]);
+      }
+    }
+  }
+  if (has_next_bitvec) {
+    memset(has_next_bitvec, 0xff, bit_util::BytesForBits(num_rows));
+    for (int64_t i = 0; i < num_rows; ++i) {
+      bool group_last =
+          (i == num_rows - 1 || group_ids_sorted[i] != group_ids_sorted[i + 1]);
+      if (group_last) {
+        bit_util::ClearBit(reinterpret_cast<uint8_t*>(has_next_bitvec),
+                           row_numbers_sorted_on_group_ids[i]);
+      }
+    }
+  }
+}
+
+void WindowDistinct::GenRowNumbersSortedOnPrev(
+    const int64_t* prev, const uint64_t* has_prev_bitvec,
+    const uint64_t* has_prev_popcounts, int64_t* row_numbers_sorted_on_prev) const {
+  for (int64_t i = 0; i < num_rows_; ++i) {
+    if (!bit_util::GetBit(reinterpret_cast<const uint8_t*>(has_prev_bitvec), i)) {
+      int64_t rank = i - BitVectorNavigator::Rank(i, has_prev_bitvec, has_prev_popcounts);
+      row_numbers_sorted_on_prev[rank] = i;
+    } else {
+      int64_t rank = CountPrevLE(prev[i] - 1);
+      row_numbers_sorted_on_prev[rank] = i;
+    }
+  }
+}
+
+// Return the number of rows with a prev attribute that is less than or equal
+// to the given value. This is used to initialize the top-down merge tree
+// traversal.
+//
+int64_t WindowDistinct::CountPrevLE(int64_t upper_bound) const {
+  return num_groups_ + BitVectorNavigator::Rank(upper_bound, has_next_bitvec_.data(),
+                                                has_next_popcounts_.data());
+}
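+
+// Sketch of why the formula above holds (assuming Rank(pos, ...) counts set
+// bits at positions before pos): every group-first row has prev == 0 and
+// there are num_groups_ of them, while a row with prev == k > 0 exists iff
+// row k - 1 has a following row in its group, i.e. iff has_next bit k - 1 is
+// set. For the example above (prev = [0, 1, 0, 2, 3],
+// has_next = [1, 1, 1, 0, 0]): CountPrevLE(2) = 2 + 2 = 4, matching the four
+// rows with prev <= 2.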
+
+// void WindowDistinct::GenCountLE(int64_t num_rows, int64_t max_value,
+//                                 const int64_t* values,
+//                                 /* num_rows elements */
+//                                 int64_t* count_LE,
+//                                 int64_t* row_numbers_sorted_on_values) {
+//   std::vector<int64_t> values_sorted(num_rows);
+//   // Sort all the rows on values.
+//   // This is sorting of a dense set of integers. We will use radix sort.
+//   //
+//   RadixSort::Sort(num_rows, max_value, values, values_sorted.data(),
+//                   row_numbers_sorted_on_values);
+
+//   int64_t output_cursor = 0LL;
+//   int64_t last_output_value = 0LL;
+//   for (int64_t i = 0; i < num_rows; ++i) {
+//     int64_t value = values_sorted[i];
+//     bool last_in_tie_group =
+//         (i == (num_rows - 1)) || values_sorted[i + 1] != values_sorted[i];
+//     if (last_in_tie_group) {
+//       while (output_cursor < value) {
+//         count_LE[output_cursor++] = last_output_value;
+//       }
+//       last_output_value = i + 1;
+//       count_LE[output_cursor++] = last_output_value;
+//     }
+//   }
+//   while (output_cursor <= max_value) {
+//     count_LE[output_cursor++] = last_output_value;
+//   }
+// }
+
+void WindowDistinctBasic::Count(int64_t num_rows, const int64_t* group_ids,
+                                const WindowFrames& frames, int64_t* output) {
+  if (num_rows == 0) {
+    return;
+  }
+
+  const int64_t* begins = frames.begins[0];
+  const int64_t* ends = frames.ends[0];
+
+  int64_t max_group_id = *std::max_element(group_ids, group_ids + num_rows);
+  std::vector<bool> group_id_set(max_group_id + 1);
+
+  for (int64_t ioutput = 0; ioutput < num_rows; ++ioutput) {
+    int64_t begin = begins[ioutput];
+    int64_t end = ends[ioutput];
+
+    for (int64_t i = 0; i < max_group_id + 1; ++i) {
+      group_id_set[i] = false;
+    }
+    int64_t result = 0LL;
+    for (int64_t irow = begin; irow < end; ++irow) {
+      int64_t group_id = group_ids[irow];
+      if (!group_id_set[group_id]) {
+        group_id_set[group_id] = true;
+        ++result;
+      }
+    }
+    output[ioutput] = result;
+  }
+}
+
+void WindowDistinctBasic::Sum(int64_t num_rows, const WindowFrames& frames,
+                              const int64_t* group_ids, const int64_t* vals,
+                              std::vector<std::pair<uint64_t, uint64_t>>& results,
+                              uint8_t* results_validity) {
+  if (num_rows == 0) {
+    return;
+  }
+
+  int64_t num_frames = frames.num_frames;
+  const int64_t* begins = frames.begins[0];
+  const int64_t* ends = frames.ends[0];
+
+  int64_t max_group_id = *std::max_element(group_ids, group_ids + num_rows);
+  std::vector<bool> group_id_set(max_group_id + 1);
+
+  results.resize(num_frames);
+  for (int64_t i = 0; i < num_frames; ++i) {
+    int64_t begin = begins[i];
+    int64_t end = ends[i];
+
+    if (begin == end) {
+      bit_util::ClearBit(results_validity, i);
+      results[i] = std::make_pair(0ULL, 0ULL);
+      continue;
+    }
+    bit_util::SetBit(results_validity, i);
+
+    // Note: the reset loop uses its own index to avoid shadowing the frame
+    // index i.
+    for (int64_t g = 0; g < max_group_id + 1; ++g) {
+      group_id_set[g] = false;
+    }
+
+    uint64_t overflow = 0ULL;
+    uint64_t result = 0ULL;
+    for (int64_t j = begin; j < end; ++j) {
+      int64_t group_id = group_ids[j];
+      if (!group_id_set[group_id]) {
+        group_id_set[group_id] = true;
+        result += static_cast<uint64_t>(vals[j]);
+        if (result < static_cast<uint64_t>(vals[j])) {
+          ++overflow;
+        }
+        if (vals[j] < 0LL) {
+          overflow += ~0ULL;
+        }
+      }
+    }
+
+    results[i] = std::make_pair(result, overflow);
+  }
+}
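+
+// The (result, overflow) pair above is a 128-bit accumulator: "result" holds
+// the low 64 bits and "overflow" the high 64 bits. Adding 1 on unsigned
+// wrap-around and ~0ULL (i.e. -1) for a negative input amounts to
+// sign-extended 128-bit addition. For example, accumulating vals = {-1, 2}
+// starting from (0, 0) goes to (~0ULL, ~0ULL) and then to (1, 0), i.e. the
+// value 1.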
+
+void WindowDistinctTest::TestCountOrSum(bool test_sum) {
+  Random64BitCopy rand;
+  MemoryPool* pool = default_memory_pool();
+  int64_t hardware_flags = 0LL;
+  util::TempVectorStack temp_vector_stack;
+  Status status = temp_vector_stack.Init(pool, 128 * util::MiniBatch::kMiniBatchLength);
+  ARROW_DCHECK(status.ok());
+
+  constexpr int64_t num_groups_max = 32;
+  constexpr int64_t num_rows_max = 8 * 1024;  // 64;
+  constexpr int64_t num_tests = 30;
+
+  int num_tests_to_skip = 0;
+  for (int64_t itest = 0; itest < num_tests; ++itest) {
+    int64_t num_rows = rand.from_range(static_cast<int64_t>(1), num_rows_max);
+    int64_t num_groups =
+        std::min(num_rows, rand.from_range(static_cast<int64_t>(1), num_groups_max));
+    int64_t num_repeats = 1;
+#ifdef NDEBUG
+    num_repeats = bit_util::CeilDiv(128 * 1024, num_rows);
+#endif
+    printf("num_repeats = %d ", static_cast<int>(num_repeats));
+    std::vector<int64_t> values;
+    std::vector<int64_t> group_ids;
+    values.resize(num_rows);
+    group_ids.resize(num_rows);
+    std::unordered_set<int64_t> uniques;
+    std::vector<int64_t> group_keys;
+    for (int64_t i = 0; i < num_groups; ++i) {
+      for (;;) {
+        int64_t group_key = rand.next();
+        if (uniques.find(group_key) == uniques.end()) {
+          uniques.insert(group_key);
+          group_keys.push_back(group_key);
+          break;
+        }
+      }
+    }
+    for (int64_t i = 0; i < num_rows; ++i) {
+      int64_t group_id =
+          i < num_groups ? i : rand.from_range(static_cast<int64_t>(0), num_groups - 1);
+      values[i] = group_keys[group_id];
+      group_ids[i] = group_id;
+    }
+
+    // Generate random frames
+    //
+    std::vector<int64_t> begins;
+    std::vector<int64_t> ends;
+    GenerateTestFrames(rand, num_rows, begins, ends, /*progressive=*/false,
+                       /*expansive=*/false);
+    WindowFrames frames;
+    frames.num_ranges_in_frame = 1;
+    frames.num_frames = num_rows;
+    frames.begins[0] = begins.data();
+    frames.ends[0] = ends.data();
+
+    // Generate random values for sum if needed
+    //
+    std::vector<int64_t> vals;
+    WinAggFun_SumInt64 func;
+    if (test_sum) {
+      vals.resize(num_rows);
+      constexpr int64_t max_val = 65535;
+      for (int64_t i = 0; i < num_rows; ++i) {
+        vals[i] = rand.from_range(static_cast<int64_t>(0), max_val);
+      }
+      func.Init(num_rows, vals.data());
+    }
+
+    if (itest < num_tests_to_skip) {
+      continue;
+    }
+
+    std::vector<int64_t> output, output_basic;
+    if (!test_sum) {
+      output.resize(num_rows);
+      output_basic.resize(num_rows);
+    }
+
+    std::vector<std::pair<uint64_t, uint64_t>> output_sum_basic;
+    std::vector<uint8_t> output_sum_validity, output_sum_validity_basic;
+    WinAggFun_SumInt64::VectorUint128* output_sum = nullptr;
+
+    if (test_sum) {
+      output_sum = func.Alloc(num_rows);
+      output_sum_basic.resize(num_rows);
+      output_sum_validity.resize(bit_util::BytesForBits(num_rows));
+      output_sum_validity_basic.resize(bit_util::BytesForBits(num_rows));
+    }
+
+    // int64_t start = __rdtsc();
+    for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+      if (test_sum) {
+        WindowDistinctBasic::Sum(num_rows, frames, group_ids.data(), vals.data(),
+                                 output_sum_basic, output_sum_validity_basic.data());
+      } else {
+        WindowDistinctBasic::Count(num_rows, group_ids.data(), frames,
+                                   output_basic.data());
+      }
+    }
+    // int64_t end = __rdtsc();
+    // printf("CountDistinct::RunBasic cycles per frame %.1f\n",
+    //        static_cast<double>(end - start) /
+    //        static_cast<double>(num_rows * num_repeats));
+
+    // start = __rdtsc();
+    for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+      WindowDistinct eval;
+      eval.Init(num_rows, num_groups, group_ids.data(), hardware_flags,
+                &temp_vector_stack);
+      if (test_sum) {
+        eval.Sum(frames, &func, output_sum, output_sum_validity.data(), hardware_flags,
+                 &temp_vector_stack);
+      } else {
+        eval.Count(frames, output.data(), hardware_flags, &temp_vector_stack);
+      }
+    }
+    // end = __rdtsc();
+    // printf("CountDistinct::Run cycles per frame %.1f\n",
+    //        static_cast<double>(end - start) /
+    //        static_cast<double>(num_rows * num_repeats));
+
+    bool ok = true;
+    if (test_sum) {
+      for (int64_t i = 0; i < num_rows; ++i) {
+        bool l_valid = bit_util::GetBit(output_sum_validity_basic.data(), i);
+        bool r_valid = bit_util::GetBit(output_sum_validity.data(), i);
+        if (l_valid != r_valid) {
+          ARROW_DCHECK(false);
+          ok = false;
+          break;
+        }
+        if (l_valid && r_valid) {
+          if (output_sum_basic[i].first != output_sum->vals_[i].first ||
+              output_sum_basic[i].second != output_sum->vals_[i].second) {
+            ARROW_DCHECK(false);
+            ok = false;
+            break;
+          }
+        }
+      }
+    } else {
+      for (int64_t i = 0; i < num_rows; ++i) {
+        if (output[i] != output_basic[i]) {
+          ARROW_DCHECK(false);
+          ok = false;
+          break;
+        }
+      }
+    }
+    printf("%s\n", ok ? "correct" : "wrong");
+
+    if (output_sum) {
+      delete output_sum;
+    }
+  }
+}
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/window_functions/window_distinct.h b/cpp/src/arrow/compute/exec/window_functions/window_distinct.h
new file mode 100644
index 00000000000..2deb3828148
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/window_functions/window_distinct.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/exec/window_functions/merge_tree.h"
+#include "arrow/compute/exec/window_functions/window_expr.h"
+#include "arrow/compute/exec/window_functions/window_frame.h"
+
+namespace arrow {
+namespace compute {
+
+class WindowDistinct {
+ public:
+  void Init(int64_t num_rows, int64_t num_groups, const int64_t* group_ids,
+            /*const WindowFrames &frames,*/ int64_t hardware_flags,
+            util::TempVectorStack* stack);
+  void Count(const WindowFrames& frames, int64_t* output, int64_t hardware_flags,
+             util::TempVectorStack* temp_vector_stack) const;
+  void Sum(const WindowFrames& frames, WindowAggregateFunc* func,
+           WindowAggregateFunc::Vector* results, uint8_t* results_validity,
+           int64_t hardware_flags, util::TempVectorStack* temp_vector_stack) const;
+
+ private:
+  static void GenListsForGroups(int64_t num_rows, int64_t num_groups,
+                                const int64_t* group_ids_sorted,
+                                const int64_t* row_numbers_sorted_on_group_id,
+                                int64_t* opt_prev, int64_t* opt_next);
+
+  // static void GenCountLE(int64_t num_rows, int64_t max_value,
+  //                        const int64_t *values,
+  //                        /* num_rows elements */
+  //                        int64_t *count_LE,
+  //                        int64_t *row_numbers_sorted_on_values);
+
+  static void GenHasPrevNextBitvecs(int64_t num_rows, const int64_t* group_ids_sorted,
+                                    const int64_t* row_numbers_sorted_on_group_ids,
+                                    uint64_t* has_prev_bitvec, uint64_t* has_next_bitvec);
+
+  void GenRowNumbersSortedOnPrev(const int64_t* prev, const uint64_t* has_prev_bitvec,
+                                 const uint64_t* has_prev_popcounts,
+                                 int64_t* row_numbers_sorted_on_prev) const;
+
+  int64_t CountPrevLE(int64_t upper_bound) const;
+
+  int64_t num_rows_;
+  int64_t num_groups_;
+  MergeTree merge_tree_;
+  // std::vector<int64_t> prev_;
+  // std::vector<int64_t> next_;
+  std::vector<uint64_t> has_next_bitvec_;
+  std::vector<uint64_t> has_next_popcounts_;
+  // std::vector<int64_t> count_prev_LE_;
+  std::vector<int64_t> row_numbers_sorted_on_prev_;
+};
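+
+// Illustrative usage (assumes a WindowFrames instance filled in as described
+// in window_frame.h and an initialized TempVectorStack named stack):
+//
+//   WindowDistinct distinct;
+//   distinct.Init(num_rows, num_groups, group_ids, hardware_flags, &stack);
+//   std::vector<int64_t> counts(frames.num_frames);
+//   distinct.Count(frames, counts.data(), hardware_flags, &stack);
+//
+// Init sorts rows and builds a merge tree; Count and Sum then evaluate all
+// frames in mini-batches against that tree.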
+
+class WindowDistinctBasic {
+ public:
+  static void Count(int64_t num_rows, const int64_t* group_ids,
+                    const WindowFrames& frames, int64_t* output);
+  static void Sum(int64_t num_rows, const WindowFrames& frames, const int64_t* group_ids,
+                  const int64_t* vals,
+                  std::vector<std::pair<uint64_t, uint64_t>>& results,
+                  uint8_t* results_validity);
+};
+
+class WindowDistinctTest {
+ public:
+  static void TestCountOrSum(bool test_sum);
+};
+
+}  // namespace compute
+}  // namespace arrow
\ No newline at end of file
diff --git a/cpp/src/arrow/compute/exec/window_functions/window_expr.h b/cpp/src/arrow/compute/exec/window_functions/window_expr.h
new file mode 100644
index 00000000000..d34035c2739
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/window_functions/window_expr.h
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/exec/window_functions/bit_vector_navigator.h"
+#include "arrow/compute/exec/window_functions/merge_tree.h"
+
+// This file collects the interfaces used by the window function
+// implementation that need to be aware of concrete data types and of the
+// operators acting on them (addition, subtraction, comparison).
+//
+// The rest of the window function implementation never needs to access or
+// interpret input values directly.
+//
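+// For instance, a new aggregate would subclass WindowAggregateFunc below,
+// provide its own Vector holding per-row accumulator state, and implement
+// Alloc/Zero/Sum/Split/PrefixSum; WinAggFun_SumInt64 further down is the
+// example implementation for 64-bit integer sums with 128-bit accumulation.
+//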
+
+namespace arrow {
+namespace compute {
+
+class WindowAggregateFunc {
+ public:
+  class Vector {
+   public:
+    virtual ~Vector() {}
+  };
+  virtual ~WindowAggregateFunc() = default;
+  // virtual bool HasInverse() const;
+  virtual Vector* Alloc(int64_t num) = 0;
+  virtual void Zero(int64_t num, Vector* out_vec, int64_t out_begin) = 0;
+  virtual void Sum(int64_t num, Vector* out_vec, const int64_t* out_ids,
+                   // If in_vec is null then use source array values instead
+                   const Vector* in_vec, const int64_t* in_ids) = 0;
+  // virtual void Take(int64_t num, Vector *out_vec, int64_t out_begin,
+  //                   const Vector *in_vec, const int64_t *in_ids,
+  //                   const uint8_t *opt_in_invert_bitvec) = 0;
+  // virtual void PrefixSum(int64_t num, Vector *out_vec, int64_t out_begin,
+  //                        bool preceded_by_base_value,
+  //                        int log_reset_distance = 62) = 0;
+  virtual void Split(int64_t num, Vector* out_vec, const Vector* in_vec,
+                     const MergeTree& merge_tree, int level, int64_t hardware_flags,
+                     util::TempVectorStack* temp_vector_stack) = 0;
+  // Inclusive prefix sum
+  //
+  virtual void PrefixSum(int64_t num, Vector* out_vec, const Vector* in_vec,
+                         int log_segment_length) = 0;
+};
+
+class WinAggFun_SumInt64 : public WindowAggregateFunc {
+ public:
+  class VectorUint128 : public WindowAggregateFunc::Vector {
+   public:
+    explicit VectorUint128(int64_t num) { vals_.resize(num); }
+    ~VectorUint128() override {}
+    std::vector<std::pair<uint64_t, uint64_t>> vals_;
+  };
+  WinAggFun_SumInt64() : num_rows_(0), source_(nullptr) {}
+  void Init(int64_t num_rows, const int64_t* source) {
+    num_rows_ = num_rows;
+    source_ = source;
+  }
+  VectorUint128* Alloc(int64_t num) override { return new VectorUint128(num); }
+  void Zero(int64_t num, Vector* out_vec, int64_t out_begin) override {
+    VectorUint128* out_vec_cast = reinterpret_cast<VectorUint128*>(out_vec);
+    for (int64_t i = 0; i < num; ++i) {
+      out_vec_cast->vals_[out_begin + i] = std::make_pair(0ULL, 0ULL);
+    }
+  }
+  void Sum(int64_t num, Vector* out_vec, const int64_t* out_ids,
+           // If in_vec is null then use source array values instead
+           const Vector* in_vec, const int64_t* in_ids) override {
+    VectorUint128* out_vec_cast = reinterpret_cast<VectorUint128*>(out_vec);
+    if (in_vec) {
+      const VectorUint128* in_vec_cast = reinterpret_cast<const VectorUint128*>(in_vec);
+      for (int64_t i = 0; i < num; ++i) {
+        std::pair<uint64_t, uint64_t>& dst = out_vec_cast->vals_[out_ids[i]];
+        const std::pair<uint64_t, uint64_t>& src = in_vec_cast->vals_[in_ids[i]];
+        dst.first += src.first;
+        dst.second += src.second + (dst.first < src.first ? 1 : 0);
+      }
+    } else {
+      for (int64_t i = 0; i < num; ++i) {
+        std::pair<uint64_t, uint64_t>& dst = out_vec_cast->vals_[out_ids[i]];
+        uint64_t src = static_cast<uint64_t>(source_[in_ids[i]]);
+        dst.first += src;
+        dst.second += static_cast<uint64_t>(-static_cast<int64_t>(src >> 63)) +
+                      (dst.first < src ? 1 : 0);
+      }
+    }
+  }
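+
+  // Note on the arithmetic above: (src >> 63) is 1 for a negative input, so
+  // -static_cast<int64_t>(src >> 63) is 0 or -1 (all ones); adding it to the
+  // high word sign-extends the 64-bit input to 128 bits, while the
+  // (dst.first < src) term propagates the carry out of the low word. For
+  // example, starting from (0, 0) and adding -1 yields (~0ULL, ~0ULL), the
+  // 128-bit two's complement representation of -1.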
+
+  void Split(int64_t num, Vector* out_vec, const Vector* in_vec,
+             const MergeTree& merge_tree, int level, int64_t hardware_flags,
+             util::TempVectorStack* temp_vector_stack) override {
+    const VectorUint128* in_vec_cast = reinterpret_cast<const VectorUint128*>(in_vec);
+    VectorUint128* out_vec_cast = reinterpret_cast<VectorUint128*>(out_vec);
+    merge_tree.Split(level, in_vec_cast->vals_.data(), out_vec_cast->vals_.data(),
+                     hardware_flags, temp_vector_stack);
+  }
+
+  void PrefixSum(int64_t num, Vector* out_vec, const Vector* in_vec,
+                 int log_segment_length) override {
+    const VectorUint128* in_vec_cast = reinterpret_cast<const VectorUint128*>(in_vec);
+    VectorUint128* out_vec_cast = reinterpret_cast<VectorUint128*>(out_vec);
+
+    int64_t mask = (1LL << log_segment_length) - 1;
+
+    uint64_t sum_lo = 0, sum_hi = 0;
+    for (int64_t i = 0; i < num; ++i) {
+      if ((i & mask) == 0) {
+        sum_lo = sum_hi = 0;
+      }
+      const std::pair<uint64_t, uint64_t>& src = in_vec_cast->vals_[i];
+      std::pair<uint64_t, uint64_t>& dst = out_vec_cast->vals_[i];
+      sum_lo += src.first;
+      sum_hi += src.second + (sum_lo < src.first ? 1 : 0);
+      dst.first = sum_lo;
+      dst.second = sum_hi;
+    }
+  }
+
+ private:
+  int64_t num_rows_;
+  const int64_t* source_;
+};
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/window_functions/window_frame.h b/cpp/src/arrow/compute/exec/window_functions/window_frame.h
new file mode 100644
index 00000000000..4762f040099
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/window_functions/window_frame.h
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+#include <set>
+#include <vector>
+
+#include "arrow/compute/exec/util.h"
+
+namespace arrow {
+namespace compute {
+
+struct WindowFrames {
+  static constexpr int kMaxRangesInFrame = 3;
+
+  int num_ranges_in_frame;
+  int64_t num_frames;
+
+  // A range can be empty, in which case begin == end. Otherwise begin < end.
+  //
+  // Ranges in a single frame must be disjoint, but the begin of the next
+  // range may equal the end of the previous one.
+  //
+  const int64_t* begins[kMaxRangesInFrame];
+  const int64_t* ends[kMaxRangesInFrame];
+
+  // The row filter has bits set to 0 for rows that should not be included in
+  // the range.
+  //
+  // A null row filter means that all rows qualify.
+  //
+  const uint8_t* row_filter;
+
+  // Note: both checks below look at the first range of each frame only,
+  // matching how the rest of the code currently accesses frames.
+  //
+  bool FramesProgressing() const {
+    for (int64_t i = 1; i < num_frames; ++i) {
+      if (!(begins[0][i] >= begins[0][i - 1] && ends[0][i] >= ends[0][i - 1])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  bool FramesExpanding() const {
+    for (int64_t i = 1; i < num_frames; ++i) {
+      if (!((begins[0][i] >= ends[0][i - 1] || begins[0][i] == begins[0][i - 1]) &&
+            (ends[0][i] >= ends[0][i - 1]))) {
+        return false;
+      }
+    }
+    return true;
+  }
+};
+
+inline void GenerateTestFrames(Random64BitCopy& rand, int64_t num_rows,
+                               std::vector<int64_t>& begins, std::vector<int64_t>& ends,
+                               bool progressive, bool expansive) {
+  begins.resize(num_rows);
+  ends.resize(num_rows);
+
+  if (!progressive && !expansive) {
+    constexpr int64_t max_frame_length = 100;
+    for (int64_t i = 0; i < num_rows; ++i) {
+      int64_t length =
+          rand.from_range(static_cast<int64_t>(0), std::min(num_rows, max_frame_length));
+      int64_t begin = rand.from_range(static_cast<int64_t>(0), num_rows - length);
+      begins[i] = begin;
+      ends[i] = begin + length;
+    }
+  } else if (progressive && !expansive) {
+    int64_t dist = rand.from_range(static_cast<int64_t>(1),
+                                   std::max(static_cast<int64_t>(1), num_rows / 4));
+    std::vector<int64_t> pos;
+    for (int64_t i = 0; i < num_rows + dist; ++i) {
+      pos.push_back(rand.from_range(static_cast<int64_t>(0), num_rows));
+    }
+    std::sort(pos.begin(), pos.end());
+    for (int64_t i = 0; i < num_rows; ++i) {
+      begins[i] = pos[i];
+      ends[i] = pos[i + dist];
+    }
+  } else {
+    int64_t num_partitions =
+        rand.from_range(static_cast<int64_t>(1), bit_util::CeilDiv(num_rows, 128LL));
+    std::set<int64_t> partition_ends_set;
+    std::vector<int64_t> partition_ends;
+    partition_ends_set.insert(num_rows);
+    partition_ends.push_back(num_rows);
+    for (int64_t i = 1; i < num_partitions; ++i) {
+      int64_t partition_end;
+      for (;;) {
+        partition_end = rand.from_range(static_cast<int64_t>(1), num_rows - 1);
+        if (partition_ends_set.find(partition_end) == partition_ends_set.end()) {
+          break;
+        }
+      }
+      partition_ends.push_back(partition_end);
+      partition_ends_set.insert(partition_end);
+    }
+    std::sort(partition_ends.begin(), partition_ends.end());
+    for (int64_t ipartition = 0; ipartition < num_partitions; ++ipartition) {
+      int64_t partition_begin = ipartition == 0 ? 0LL : partition_ends[ipartition - 1];
+      int64_t partition_end = partition_ends[ipartition];
+      int64_t partition_length = partition_end - partition_begin;
+      int64_t begin = rand.from_range(0LL, 2LL);
+      if (begin >= partition_length) {
+        begin = partition_length - 1;
+      }
+      int64_t end = begin + rand.from_range(0LL, 2LL);
+      if (end > partition_length) {
+        end = partition_length;
+      }
+      begins[partition_begin + 0] = partition_begin + begin;
+      ends[partition_begin + 0] = partition_begin + end;
+      for (int64_t i = 1; i < partition_length; ++i) {
+        int64_t end_step = rand.from_range(0LL, 2LL);
+        end += end_step;
+        if (end > partition_length) {
+          end = partition_length;
+        }
+        begins[partition_begin + i] = partition_begin + begin;
+        ends[partition_begin + i] = partition_begin + end;
+      }
+    }
+  }
+}
+
+}  // namespace compute
+}  // namespace arrow