diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 2cae7a731f9..0aee6e30cd6 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -408,6 +408,9 @@ if(ARROW_COMPUTE) compute/exec/tpch_node.cc compute/exec/union_node.cc compute/exec/util.cc + compute/exec/window_functions/bit_vector_navigator.cc + compute/exec/window_functions/merge_tree.cc + compute/exec/window_functions/window_quantiles.cc compute/function.cc compute/function_internal.cc compute/kernel.cc diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h index 7e716808fa0..210bfae7e8c 100644 --- a/cpp/src/arrow/compute/exec/util.h +++ b/cpp/src/arrow/compute/exec/util.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -361,5 +362,22 @@ struct ARROW_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer { util::Mutex consume_mutex_; }; +class Random64BitCopy { + public: + Random64BitCopy() : rs{0, 0, 0, 0, 0, 0, 0, 0}, re(rs) {} + uint64_t next() { return rdist(re); } + template + inline T from_range(const T& min_val, const T& max_val) { + return static_cast(min_val + (next() % (max_val - min_val + 1))); + } + std::mt19937& get_engine() { return re; } + + private: + std::random_device rd; + std::seed_seq rs; + std::mt19937 re; + std::uniform_int_distribution rdist; +}; + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc new file mode 100644 index 00000000000..6a7b6da16ec --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/window_functions/bit_vector_navigator.h" +#include +#include "arrow/compute/exec/util.h" + +namespace arrow { +namespace compute { + +void BitVectorNavigator::SelectsForRangeOfRanks( + int64_t rank_begin, int64_t rank_end, int64_t num_bits, const uint64_t* bitvec, + const uint64_t* popcounts, int64_t* outputs, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + ARROW_DCHECK(rank_begin <= rank_end); + if (rank_begin == rank_end) { + return; + } + int64_t popcount_all = PopCount(num_bits, bitvec, popcounts); + if (rank_end <= 0LL) { + for (int64_t i = 0LL; i < rank_end - rank_begin; ++i) { + outputs[i] = -1LL; + } + return; + } + if (rank_begin >= popcount_all) { + for (int64_t i = 0LL; i < rank_end - rank_begin; ++i) { + outputs[i] = num_bits; + } + return; + } + if (rank_begin < 0LL) { + for (int64_t i = 0LL; i < -rank_begin; ++i) { + outputs[i] = -1LL; + } + outputs += -rank_begin; + rank_begin = 0LL; + } + if (rank_end > popcount_all) { + for (int64_t i = popcount_all - rank_begin; i < rank_end - rank_begin; ++i) { + outputs[i] = num_bits; + } + rank_end = popcount_all; + } + + int64_t minibatch_length_max = util::MiniBatch::kMiniBatchLength; + auto indexes = util::TempVectorHolder( + temp_vector_stack, static_cast(minibatch_length_max)); + int num_indexes; + + int64_t first_select = + BitVectorNavigator::Select(rank_begin, num_bits, bitvec, popcounts); + int64_t last_select = + BitVectorNavigator::Select(rank_begin, num_bits, bitvec, popcounts); + + for (int64_t minibatch_begin = first_select; minibatch_begin < last_select + 1; + minibatch_begin += minibatch_length_max) { + int64_t minibatch_end = + std::min(last_select + 1, minibatch_begin + minibatch_length_max); + util::bit_util::bits_to_indexes( + /*bit_to_search=*/1, hardware_flags, minibatch_end - minibatch_begin, + reinterpret_cast(bitvec), &num_indexes, indexes.mutable_data(), + minibatch_begin); + for (int i = 0; i < num_indexes; ++i) { + outputs[i] = minibatch_begin + indexes.mutable_data()[i]; + } + outputs += num_indexes; + } +} + +void BitVectorNavigator::SelectsForRelativeRanksForRangeOfRows( + int64_t batch_begin, int64_t batch_end, int64_t rank_delta, int64_t num_rows, + const uint64_t* ties_bitvec, const uint64_t* ties_popcounts, int64_t* outputs, + int64_t hardware_flags, util::TempVectorStack* temp_vector_stack) { + // Break into mini-batches + int64_t minibatch_length_max = util::MiniBatch::kMiniBatchLength; + auto selects_for_ranks_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(minibatch_length_max)); + auto selects_for_ranks = selects_for_ranks_buf.mutable_data(); + for (int64_t minibatch_begin = batch_begin; minibatch_begin < batch_end; + minibatch_begin += minibatch_length_max) { + int64_t minibatch_end = std::min(batch_end, minibatch_begin + minibatch_length_max); + + // First and last rank that we are interested in + int64_t first_rank = + BitVectorNavigator::RankNext(minibatch_begin, ties_bitvec, ties_popcounts) - 1LL; + int64_t last_rank = + BitVectorNavigator::RankNext(minibatch_end - 1, ties_bitvec, ties_popcounts) - + 1LL; + + // Do select for each rank in the calculated range. + // + BitVectorNavigator::SelectsForRangeOfRanks( + first_rank + rank_delta, last_rank + rank_delta + 1, num_rows, ties_bitvec, + ties_popcounts, selects_for_ranks, hardware_flags, temp_vector_stack); + + int irank = 0; + outputs[minibatch_begin - batch_begin] = selects_for_ranks[irank]; + for (int64_t i = minibatch_begin + 1; i < minibatch_end; ++i) { + irank += bit_util::GetBit(reinterpret_cast(ties_bitvec), i) ? 1 : 0; + outputs[minibatch_begin - batch_begin] = selects_for_ranks[irank]; + } + } +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h new file mode 100644 index 00000000000..ba62e21ea12 --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include "arrow/compute/exec/util.h" +#include "arrow/util/bit_util.h" + +namespace arrow { +namespace compute { + +// Bit-vector allocated size must be multiple of 64-bits. +// There is exactly ceil(num_bits / 64) 64-bit population counters. +// +class BitVectorNavigator { + public: + static uint64_t GenPopCounts(int64_t num_bits, const uint64_t* bits, + uint64_t* pop_counts) { + int64_t num_pop_counts = (num_bits + 63) / 64; + uint64_t sum = 0; + for (int64_t i = 0; i < num_pop_counts; ++i) { + pop_counts[i] = sum; + sum += ARROW_POPCOUNT64(bits[i]); + } + return sum; + } + + // O(1) + static inline int64_t PopCount(int64_t num_bits, const uint64_t* bitvec, + const uint64_t* popcounts) { + int64_t last_word = (num_bits - 1) / 64; + return popcounts[last_word] + ARROW_POPCOUNT64(bitvec[last_word]); + } + + // O(log(N)) + // The output is set to -1 if rank is below zero and to num_bits if + // rank is above the maximum rank of any row in the represented range. + static inline int64_t Select(int64_t rank, int64_t num_bits, const uint64_t* bits, + const uint64_t* pop_counts) { + if (rank < 0) { + return -1LL; + } + int64_t max_rank = PopCount(num_bits, bits, pop_counts) - 1LL; + if (rank > max_rank) { + return num_bits; + } + + int64_t num_pop_counts = (num_bits + 63) / 64; + // Find index of 64-bit block that contains the nth set bit. + int64_t block_id = (std::upper_bound(pop_counts, pop_counts + num_pop_counts, + static_cast(rank)) - + pop_counts) - + 1; + // Binary search position of (n - pop_count + 1)th bit set in the 64-bit + // block. + uint64_t block = bits[block_id]; + int64_t bit_rank = rank - pop_counts[block_id]; + int bit_id = 0; + for (int half_bits = 32; half_bits >= 1; half_bits /= 2) { + uint64_t mask = ((1ULL << half_bits) - 1ULL); + int64_t lower_half_pop_count = ARROW_POPCOUNT64(block & mask); + if (bit_rank >= lower_half_pop_count) { + block >>= half_bits; + bit_rank -= lower_half_pop_count; + bit_id += half_bits; + } + } + return block_id * 64 + bit_id; + } + + // TODO: We could implement BitVectorNavigator::Select that works on batches + // instead of single rows. Then it could use precomputed static B-tree to + // speed up binary search. + // + + // O(1) + // Input row number must be valid (between 0 and number of rows less 1). + static inline int64_t Rank(int64_t pos, const uint64_t* bits, + const uint64_t* pop_counts) { + int64_t block = pos >> 6; + int offset = static_cast(pos & 63LL); + uint64_t mask = (1ULL << offset) - 1ULL; + int64_t rank1 = + static_cast(pop_counts[block]) + ARROW_POPCOUNT64(bits[block] & mask); + return rank1; + } + + // O(1) + // Rank of the next row (also valid for the last row when the next row would + // be outside of the range of rows). + static inline int64_t RankNext(int64_t pos, const uint64_t* bits, + const uint64_t* pop_counts) { + int64_t block = pos >> 6; + int offset = static_cast(pos & 63LL); + uint64_t mask = ~0ULL >> (63 - offset); + int64_t rank1 = + static_cast(pop_counts[block]) + ARROW_POPCOUNT64(bits[block] & mask); + return rank1; + } + + // Input ranks may be outside of range of ranks present in the input bit + // vector. + // + static void SelectsForRangeOfRanks(int64_t rank_begin, int64_t rank_end, + int64_t num_bits, const uint64_t* bitvec, + const uint64_t* popcounts, int64_t* outputs, + int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack); + + static void SelectsForRelativeRanksForRangeOfRows( + int64_t batch_begin, int64_t batch_end, int64_t rank_delta, int64_t num_rows, + const uint64_t* ties_bitvec, const uint64_t* ties_popcounts, int64_t* outputs, + int64_t hardware_flags, util::TempVectorStack* temp_vector_stack); + + template + static void GenSelectedIds(int64_t num_rows, const uint64_t* bitvec, INDEX_T* ids, + int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + // Break into mini-batches. + // + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + auto batch_ids_buf = + util::TempVectorHolder(temp_vector_stack, batch_length_max); + auto batch_ids = batch_ids_buf.mutable_data(); + int batch_num_ids; + int64_t num_ids = 0; + for (int64_t batch_begin = 0; batch_begin < num_rows; + batch_begin += batch_length_max) { + int64_t batch_length = std::min(num_rows - batch_begin, batch_length_max); + util::bit_util::bits_to_indexes( + /*bit_to_search=*/1, hardware_flags, batch_length, + reinterpret_cast(bitvec + (batch_begin / 64)), &batch_num_ids, + batch_ids); + for (int i = 0; i < batch_num_ids; ++i) { + ids[num_ids + i] = static_cast(batch_begin + batch_ids[i]); + } + num_ids += batch_num_ids; + } + } +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/merge_path.h b/cpp/src/arrow/compute/exec/window_functions/merge_path.h new file mode 100644 index 00000000000..802bf7129d7 --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/merge_path.h @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace arrow { +namespace compute { + +class MergeUtil { + public: + // Merge Path + // https://birk.net.technion.ac.il/files/2021/01/OdehGreenMwassiShmueliBirk-MergePath-ParallelMergingMadeSimple-IPDPS_Workshops2012.pdf + // + // Given the desired size N of prefix of merged sequence, + // Find the sizes A and B of prefixes of two sorted input sequences that + // will generate these N elements. Return the size of the first sequence + // A. + // + template + static int64_t MergePath(int64_t n, int64_t cardA, int64_t cardB, + /* takes as an input int64_t index of left element + and index of right element */ + T_IS_LESS is_less) { + // Binary search for A + // in [AMIN = max(0, N - cardB); AMAX = min(cardA - 1, N - 1)] range for + // the largest value such that: + // inA[A] < inB[N - 1 - A] + // If inA[AMIN] >= inB[N - 1 - AMIN] then return AMIN - 1. + // A will be the largest index to use from A. + // + int64_t l = std::max(static_cast(0), n - cardB) - 1; + int64_t r = std::min(cardA, n); + while (l + 1 < r) { + int64_t m = (l + r) / 2; + if (is_less(m, n - 1 - m)) { + l = m; + } else { + r = m; + } + } + return l + 1; + } + + template + static bool IsNextFromA(int64_t offsetA, int64_t offsetB, int64_t cardA, int64_t cardB, + /* takes as an input int64_t index of left element + and index of right element */ + T_IS_LESS is_less) { + if (offsetA == cardA) { + ARROW_DCHECK(offsetB < cardB); + return false; + } else if (offsetB == cardB) { + ARROW_DCHECK(offsetA < cardA); + return true; + } else { + if (is_less(offsetA, offsetB)) { + return true; + } else { + return false; + } + } + } + + template + static void NthElement(int64_t n, int64_t cardA, int64_t cardB, bool* fromA, + int64_t* offset, + /* takes as an input int64_t index of left element + and index of right element */ + T_IS_LESS is_less) { + int64_t offsetA = MergePath(n, cardA, cardB, is_less); + int64_t offsetB = n - offsetA; + *fromA = IsNextFromA(offsetA, offsetB, cardA, cardB, is_less); + *offset = *fromA ? offsetA : offsetB; + } + + template + static void NthPair(int64_t n, int64_t cardA, int64_t cardB, bool* first_fromA, + int64_t* first_offset, bool* second_fromA, int64_t* second_offset, + /* takes as an input int64_t index of left element + and index of right element */ + T_IS_LESS is_less) { + int64_t offsetA = MergePath(n, cardA, cardB, is_less); + int64_t offsetB = n - offsetA; + *first_fromA = IsNextFromA(offsetA, offsetB, cardA, cardB, is_less); + *first_offset = *first_fromA ? offsetA : offsetB; + if (*first_fromA) { + ++offsetA; + } else { + ++offsetB; + } + *second_fromA = IsNextFromA(offsetA, offsetB, cardA, cardB, is_less); + *second_offset = *second_fromA ? offsetA : offsetB; + } +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/merge_tree.cc b/cpp/src/arrow/compute/exec/window_functions/merge_tree.cc new file mode 100644 index 00000000000..aa374717298 --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/merge_tree.cc @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/window_functions/merge_tree.h" + +namespace arrow { +namespace compute { + +void MergeTree::Build(int64_t num_rows, const int64_t* permutation, + int num_levels_to_skip, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + num_rows_ = num_rows; + if (num_rows == 0) { + return; + } + + int height = 1 + arrow::bit_util::Log2(num_rows); + level_bitvecs_.resize(height); + level_popcounts_.resize(height); + + int64_t num_bit_words = arrow::bit_util::CeilDiv(num_rows, 64); + + // We skip level 0 on purpose - it is not used. + // We also skip num_levels_to_skip from the top. + // + for (int level = 1; level < height - num_levels_to_skip; ++level) { + level_bitvecs_[level].resize(num_bit_words); + level_popcounts_[level].resize(num_bit_words); + } + + std::vector permutation_temp[2]; + permutation_temp[0].resize(num_rows); + permutation_temp[1].resize(num_rows); + int64_t* permutation_pingpong[2]; + permutation_pingpong[0] = permutation_temp[0].data(); + permutation_pingpong[1] = permutation_temp[1].data(); + + // Generate tree layers top-down + // + int top_level = height - num_levels_to_skip - 1; + for (int target_level = top_level; target_level > 0; --target_level) { + int flip = target_level % 2; + const int64_t* permutation_up = + (target_level == top_level - 1) ? permutation : permutation_pingpong[flip]; + if (target_level < top_level) { + int64_t* permutation_this = permutation_pingpong[1 - flip]; + Split(target_level + 1, permutation_up, permutation_this, hardware_flags, + temp_vector_stack); + } + const int64_t* permutation_this = + (target_level == top_level) ? permutation : permutation_pingpong[1 - flip]; + GenBitvec(target_level, permutation_this); + } +} + +void MergeTree::RangeQueryStep(int level, int64_t num_queries, const int64_t* begins, + const int64_t* ends, RangeQueryState* query_states, + RangeQueryState* query_outputs) const { + for (int64_t iquery = 0; iquery < num_queries; ++iquery) { + int64_t begin = begins[iquery]; + int64_t end = ends[iquery]; + RangeQueryState& state = query_states[iquery]; + RangeQueryState& output = query_outputs[iquery]; + ARROW_DCHECK(begin <= end && begin >= 0 && end <= num_rows_); + + RangeQueryState parent_state; + parent_state.pos[0] = state.pos[0]; + parent_state.pos[1] = state.pos[1]; + state.pos[0] = state.pos[1] = output.pos[0] = output.pos[1] = RangeQueryState::kEmpty; + + for (int iparent_pos = 0; iparent_pos < 2; ++iparent_pos) { + int64_t parent_pos = parent_state.pos[iparent_pos]; + if (parent_pos != RangeQueryState::kEmpty) { + RangeQueryState child_state; + Cascade(level, parent_pos, &child_state); + for (int ichild_pos = 0; ichild_pos < 2; ++ichild_pos) { + int64_t child_pos = child_state.pos[ichild_pos]; + if (child_pos != RangeQueryState::kEmpty) { + int64_t child_node; + int64_t child_length; + RangeQueryState::NodeAndLengthFromPos(level - 1, child_pos, &child_node, + &child_length); + if (NodeFullyInsideRange(level - 1, child_node, begin, end)) { + output.AppendPos(child_pos); + } else if (NodePartiallyInsideRange(level - 1, child_node, begin, end)) { + state.AppendPos(child_pos); + } + } + } + } + } + } +} + +void MergeTree::NthElement(int64_t num_queries, const uint16_t* opt_ids, + const int64_t* begins, const int64_t* ends, + /* ns[i] must be in the range [0; ends[i] - begins[i]) */ + const int64_t* ns, int64_t* row_numbers, + util::TempVectorStack* temp_vector_stack) const { + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + + // Allocate temporary buffers + // + auto temp_begins_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + int64_t* temp_begins = temp_begins_buf.mutable_data(); + + auto temp_ends_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + int64_t* temp_ends = temp_ends_buf.mutable_data(); + + auto temp_ns_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + int64_t* temp_ns = temp_ns_buf.mutable_data(); + + for (int64_t batch_begin = 0; batch_begin < num_queries; + batch_begin += batch_length_max) { + int64_t batch_length = std::min(num_queries - batch_begin, batch_length_max); + + // Initialize tree cursors (begin and end of a range of some top level + // node for each query/frame). + // + if (opt_ids) { + for (int64_t i = 0; i < batch_length; ++i) { + uint16_t id = opt_ids[batch_begin + i]; + temp_begins[i] = begins[id]; + temp_ends[i] = ends[id]; + temp_ns[i] = ns[id]; + ARROW_DCHECK(temp_ns[i] >= 0 && temp_ns[i] < temp_ends[i] - temp_begins[i]); + } + } else { + memcpy(temp_begins, begins + batch_begin, batch_length * sizeof(temp_begins[0])); + memcpy(temp_ends, ends + batch_begin, batch_length * sizeof(temp_ends[0])); + memcpy(temp_ns, ns + batch_begin, batch_length * sizeof(temp_ns[0])); + } + + // Traverse the tree top-down + // + int top_level = static_cast(level_bitvecs_.size()) - 1; + for (int level = top_level; level > 0; --level) { + for (int64_t i = 0; i < batch_length; ++i) { + NthElementStep(level, temp_begins + i, temp_ends + i, temp_ns + i); + } + } + + // Output results + // + if (opt_ids) { + for (int64_t i = 0; i < batch_length; ++i) { + uint16_t id = opt_ids[batch_begin + i]; + row_numbers[id] = temp_begins[i]; + } + } else { + for (int64_t i = 0; i < batch_length; ++i) { + row_numbers[batch_begin + i] = temp_begins[i]; + } + } + } +} + +void MergeTree::GenBitvec( + /* level to generate for */ int level, const int64_t* permutation) { + uint64_t result = 0ULL; + for (int64_t base = 0; base < num_rows_; base += 64) { + for (int64_t i = base; i < std::min(base + 64, num_rows_); ++i) { + int64_t bit = (permutation[i] >> (level - 1)) & 1; + result |= static_cast(bit) << (i & 63); + } + level_bitvecs_[level][base / 64] = result; + result = 0ULL; + } + + BitVectorNavigator::GenPopCounts(num_rows_, level_bitvecs_[level].data(), + level_popcounts_[level].data()); +} + +void MergeTree::Cascade(int level, int64_t pos, RangeQueryState* result) const { + ARROW_DCHECK(level > 0); + + int64_t node; + int64_t length; + RangeQueryState::NodeAndLengthFromPos(level, pos, &node, &length); + + int64_t node_begin = node << level; + // We use RankNext for node_begin + length - 1 instead of Rank for node_begin + // + length, because the latter one may be equal to num_rows_ which is an + // index out of range for bitvector. + // + int64_t rank = + BitVectorNavigator::RankNext(node_begin + length - 1, level_bitvecs_[level].data(), + level_popcounts_[level].data()); + int64_t local_rank = rank - (node_begin / 2); + result->pos[0] = + RangeQueryState::PosFromNodeAndLength(level - 1, node * 2, length - local_rank); + bool has_right_child = + (node_begin + (static_cast(1) << (level - 1))) < num_rows_; + result->pos[1] = has_right_child ? RangeQueryState::PosFromNodeAndLength( + level - 1, node * 2 + 1, local_rank) + : RangeQueryState::kEmpty; +} + +bool MergeTree::NodeFullyInsideRange(int level, int64_t node, int64_t begin, + int64_t end) const { + int64_t node_begin = node << level; + int64_t node_end = std::min(num_rows_, node_begin + (static_cast(1) << level)); + return node_begin >= begin && node_end <= end; +} + +bool MergeTree::NodePartiallyInsideRange(int level, int64_t node, int64_t begin, + int64_t end) const { + int64_t node_begin = node << level; + int64_t node_end = std::min(num_rows_, node_begin + (static_cast(1) << level)); + return node_begin < end && node_end > begin; +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/merge_tree.h b/cpp/src/arrow/compute/exec/window_functions/merge_tree.h new file mode 100644 index 00000000000..fbab9c6886d --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/merge_tree.h @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include "arrow/compute/exec/util.h" +#include "arrow/util/bit_util.h" +#include "bit_vector_navigator.h" + +namespace arrow { +namespace compute { + +// TODO: Support multiple [begin, end) ranges in range and nth_element queries. +// + +// One way to think about MergeTree is that, when we traverse top down, we +// switch to sortedness on X axis, and when we traverse bottom up, we switch to +// sortedness on Y axis. At the lowest level of MergeTree rows are sorted on X +// and the highest level they are sorted on Y. +// +class MergeTree { + public: + MergeTree() : num_rows_(0) {} + + void Build(int64_t num_rows, const int64_t* permutation, int num_levels_to_skip, + int64_t hardware_flags, util::TempVectorStack* temp_vector_stack); + + int get_height() const { return num_rows_ ? 1 + arrow::bit_util::Log2(num_rows_) : 0; } + + template + void Split( + /* upper level */ int level, const S* in, S* out, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) const { + int64_t lower_node_length = 1LL << (level - 1); + int64_t lower_node_mask = lower_node_length - 1LL; + + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + int num_ids; + auto ids_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length_max)); + uint16_t* ids = ids_buf.mutable_data(); + + // Break into mini-batches + int64_t rank_batch_begin[2]; + rank_batch_begin[0] = 0; + rank_batch_begin[1] = 0; + for (int64_t batch_begin = 0; batch_begin < num_rows_; + batch_begin += batch_length_max) { + int64_t batch_length = std::min(num_rows_ - batch_begin, batch_length_max); + + for (int child = 0; child <= 1; ++child) { + // Get parent node positions (relative to the batch) for all elements + // coming from left child + util::bit_util::bits_to_indexes( + child, hardware_flags, static_cast(batch_length), + reinterpret_cast(level_bitvecs_[level].data() + + batch_begin / 64), + &num_ids, ids); + + for (int i = 0; i < num_ids; ++i) { + int64_t upper_pos = batch_begin + ids[i]; + int64_t rank = rank_batch_begin[child] + i; + int64_t lower_pos = (rank & ~lower_node_mask) * 2 + child * lower_node_length + + (rank & lower_node_mask); + out[lower_pos] = in[upper_pos]; + } + rank_batch_begin[child] += num_ids; + } + } + } + + // State or output for range query. + // + // Represents between zero and two different nodes from a single level of the + // tree. + // + // For each node remembers the length of its prefix, which represents a + // subrange of selected elements of that node. + // + // Length is between 1 and the number of node elements at this level (both + // bounds inclusive), because empty set of selected elements is represented by + // a special constant kEmpty. + // + struct RangeQueryState { + static constexpr int64_t kEmpty = -1LL; + + static int64_t PosFromNodeAndLength(int level, int64_t node, int64_t length) { + if (length == 0) { + return kEmpty; + } + return (node << level) + length - 1; + } + + static void NodeAndLengthFromPos(int level, int64_t pos, int64_t* node, + int64_t* length) { + ARROW_DCHECK(pos != kEmpty); + *node = pos >> level; + *length = 1 + pos - (*node << level); + } + + void AppendPos(int64_t new_pos) { + // One of the two positions must be set to null + // + if (pos[0] == kEmpty) { + pos[0] = new_pos; + } else { + ARROW_DCHECK(pos[1] == kEmpty); + pos[1] = new_pos; + } + } + + int64_t pos[2]; + }; + + // Visiting each level updates state cursor pair and outputs state cursor + // pair. + // + void RangeQueryStep(int level, int64_t num_queries, const int64_t* begins, + const int64_t* ends, RangeQueryState* query_states, + RangeQueryState* query_outputs) const; + + int64_t NthElement(int64_t begin, int64_t end, int64_t n) const { + ARROW_DCHECK(n >= 0 && n < end - begin); + int64_t temp_begin = begin; + int64_t temp_end = end; + int64_t temp_n = n; + + // Traverse the tree top-down + // + int top_level = static_cast(level_bitvecs_.size()) - 1; + for (int level = top_level; level > 0; --level) { + NthElementStep(level, &temp_begin, &temp_end, &temp_n); + } + + return temp_begin; + } + + void NthElement(int64_t num_queries, const uint16_t* opt_ids, const int64_t* begins, + const int64_t* ends, + /* ns[i] must be in the range [0; ends[i] - begins[i]) */ + const int64_t* ns, int64_t* row_numbers, + util::TempVectorStack* temp_vector_stack) const; + + private: + /* output 0 if value comes from left child and 1 otherwise */ + void GenBitvec( + /* level to generate for */ int level, + /* source permutation of rows for elements in this level */ + const int64_t* permutation); + + void Cascade(int level, int64_t pos, RangeQueryState* result) const; + + bool NodeFullyInsideRange(int level, int64_t node, int64_t begin, int64_t end) const; + + bool NodePartiallyInsideRange(int level, int64_t node, int64_t begin, + int64_t end) const; + + void NthElementStep(int level, int64_t* begin, int64_t* end, int64_t* n) const { + int64_t node_length = 1LL << level; + uint64_t node_mask = node_length - 1; + int64_t node_begin = (*begin & ~node_mask); + + int64_t rank_begin = BitVectorNavigator::Rank(*begin, level_bitvecs_[level].data(), + level_popcounts_[level].data()); + int64_t rank_end = BitVectorNavigator::RankNext( + *end - 1, level_bitvecs_[level].data(), level_popcounts_[level].data()); + int64_t length_left = (*end - *begin) - (rank_end - rank_begin); + int64_t child_mask = (length_left <= *n ? ~0LL : 0LL); + + *begin = node_begin + ((node_length / 2 + rank_begin - node_begin / 2) & child_mask) + + (((*begin - node_begin) - (rank_begin - node_begin / 2)) & ~child_mask); + *end = *begin + ((rank_end - rank_begin) & child_mask) + (length_left & ~child_mask); + *n -= (length_left & child_mask); + } + + int64_t num_rows_; + std::vector> level_bitvecs_; + std::vector> level_popcounts_; +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/window_frame.h b/cpp/src/arrow/compute/exec/window_functions/window_frame.h new file mode 100644 index 00000000000..4762f040099 --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/window_frame.h @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include "arrow/compute/exec/util.h" + +namespace arrow { +namespace compute { + +struct WindowFrames { + static constexpr int kMaxRangesInFrame = 3; + + int num_ranges_in_frame; + int64_t num_frames; + + // Range can be empty, in that case begin == end. Otherwise begin < end. + // + // Ranges in a single frame must be disjoint but begin of next range can be + // equal to the end of the previous one. + // + const int64_t* begins[kMaxRangesInFrame]; + const int64_t* ends[kMaxRangesInFrame]; + + // Row filter has bits set to 0 for rows that should not be included in the + // range. + // + // Null row filter means that all rows are qualified. + // + const uint8_t* row_filter; + + bool FramesProgressing() const { + for (int64_t i = 1; i < num_frames; ++i) { + if (!(begins[i] >= begins[i - 1] && ends[i] >= ends[i - 1])) { + return false; + } + } + return true; + } + + bool FramesExpanding() const { + for (int64_t i = 1; i < num_frames; ++i) { + if (!((begins[i] >= ends[i - 1] || begins[i] == begins[i - 1]) && + (ends[i] >= ends[i - 1]))) { + return false; + } + } + return true; + } +}; + +inline void GenerateTestFrames(Random64BitCopy& rand, int64_t num_rows, + std::vector& begins, std::vector& ends, + bool progressive, bool expansive) { + begins.resize(num_rows); + ends.resize(num_rows); + + if (!progressive && !expansive) { + constexpr int64_t max_frame_length = 100; + for (int64_t i = 0; i < num_rows; ++i) { + int64_t length = + rand.from_range(static_cast(0), std::min(num_rows, max_frame_length)); + int64_t begin = rand.from_range(static_cast(0), num_rows - length); + begins[i] = begin; + ends[i] = begin + length; + } + } else if (progressive && !expansive) { + int64_t dist = rand.from_range(static_cast(1), + std::max(static_cast(1), num_rows / 4)); + std::vector pos; + for (int64_t i = 0; i < num_rows + dist; ++i) { + pos.push_back(rand.from_range(static_cast(0), num_rows)); + } + std::sort(pos.begin(), pos.end()); + for (int64_t i = 0; i < num_rows; ++i) { + begins[i] = pos[i]; + ends[i] = pos[i + dist]; + } + } else { + int64_t num_partitions = + rand.from_range(static_cast(1), bit_util::CeilDiv(num_rows, 128LL)); + std::set partition_ends_set; + std::vector partition_ends; + partition_ends_set.insert(num_rows); + partition_ends.push_back(num_rows); + for (int64_t i = 1; i < num_partitions; ++i) { + int64_t partition_end; + for (;;) { + partition_end = rand.from_range(static_cast(1), num_rows - 1); + if (partition_ends_set.find(partition_end) == partition_ends_set.end()) { + break; + } + } + partition_ends.push_back(partition_end); + partition_ends_set.insert(partition_end); + } + std::sort(partition_ends.begin(), partition_ends.end()); + for (int64_t ipartition = 0; ipartition < num_partitions; ++ipartition) { + int64_t partition_begin = ipartition == 0 ? 0LL : partition_ends[ipartition - 1]; + int64_t partition_end = partition_ends[ipartition]; + int64_t partition_length = partition_end - partition_begin; + int64_t begin = rand.from_range(0LL, 2LL); + + if (begin >= partition_length) { + begin = partition_length - 1; + } + int64_t end = begin + rand.from_range(0LL, 2LL); + if (end > partition_length) { + end = partition_length; + } + begins[partition_begin + 0] = partition_begin + begin; + ends[partition_begin + 0] = partition_begin + end; + for (int64_t i = 1; i < partition_length; ++i) { + int64_t end_step = rand.from_range(0LL, 2LL); + end += end_step; + if (end > partition_length) { + end = partition_length; + } + begins[partition_begin + i] = partition_begin + begin; + ends[partition_begin + i] = partition_begin + end; + } + } + } +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/window_quantiles.cc b/cpp/src/arrow/compute/exec/window_functions/window_quantiles.cc new file mode 100644 index 00000000000..24a50b2c601 --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/window_quantiles.cc @@ -0,0 +1,1085 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/window_functions/window_quantiles.h" + +namespace arrow { +namespace compute { + +void WindowQuantiles::ForFramesWithSeparateOrder( + int64_t num_rows, const int64_t* permutation, const Quantiles& quantiles, + const WindowFrames& frames, + // TODO: output a pair of vectors instead of vector of pairs + std::vector>& output_l, + std::vector>& output_r, + std::vector>& output_lerp, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + const int64_t* frame_begins = frames.begins[0]; + const int64_t* frame_ends = frames.ends[0]; + + // Build pure merge tree - with no data in the nodes + // + MergeTree tree; + BuildMergeTree(num_rows, permutation, &tree, hardware_flags, temp_vector_stack); + + int num_quantiles = quantiles.num_quantiles; + + // Reserve space for output + // + output_l.resize(num_quantiles); + output_r.resize(num_quantiles); + output_lerp.resize(num_quantiles); + for (int i = 0; i < num_quantiles; ++i) { + output_l[i].resize(num_rows); + output_r[i].resize(num_rows); + output_lerp[i].resize(num_rows); + } + + // Run queries one quantile at a time, one mini-batch at a time. + // + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + + for (int quantile = 0; quantile < num_quantiles; ++quantile) { + for (int64_t batch_begin = 0; batch_begin < num_rows; + batch_begin += batch_length_max / 2) { + int64_t batch_length = std::min(num_rows - batch_begin, batch_length_max / 2); + + QuantileMulti(batch_begin, batch_length, frames, quantiles.numerators[quantile], + quantiles.denominators[quantile], tree, output_l[quantile], + output_r[quantile], output_lerp[quantile], temp_vector_stack); + + // At this point we have row numbers relative to the sorted order + // represented by permutation array. To get original row numbers we have + // to map using permutation array. + // + for (int64_t i = batch_begin; i < batch_begin + batch_length; ++i) { + if (frame_begins[i] < frame_ends[i]) { + output_l[quantile][i] = permutation[output_l[quantile][i]]; + output_r[quantile][i] = permutation[output_r[quantile][i]]; + } + } + } + } +} + +// The result is the average of *output_value_l and *output_value_r. +// +bool WindowQuantiles::MADForEntirePartition(int64_t num_rows, const int64_t* vals, + const uint8_t* validity, + uint64_t* output_value_l, + uint64_t* output_value_r, + bool use_merge_path) { + ARROW_DCHECK(num_rows > 0); + + int64_t num_nulls = + validity ? num_rows - arrow::internal::CountSetBits(validity, /*offset=*/0, + static_cast(num_rows)) + : 0LL; + if (num_nulls == num_rows) { + return false; + } + + // Filter out nulls + std::vector vals_reordered; + if (validity && num_nulls > 0) { + for (int64_t i = 0; i < num_rows; ++i) { + if (bit_util::GetBit(validity, i)) { + vals_reordered.push_back(vals[i]); + } + } + } else { + vals_reordered.resize(num_rows); + memcpy(vals_reordered.data(), vals, num_rows * sizeof(int64_t)); + } + + Quantiles median; + int numerator = 1; + int denominator = 2; + median.num_quantiles = 1; + median.numerators = &numerator; + median.denominators = &denominator; + int64_t median_l; + int64_t median_r; + int ignored; + ForEntirePartitionNoNullsInPlace(num_rows - num_nulls, vals_reordered.data(), median, + &median_l, &median_r, &ignored); + + std::vector absolute_differences(vals_reordered.size()); + for (size_t i = 0; i < vals_reordered.size(); ++i) { + int64_t value = vals_reordered[i]; + if (i < (vals_reordered.size() + 1) / 2) { + ARROW_DCHECK(value <= median_l); + absolute_differences[i] = median_r - value; + } else { + ARROW_DCHECK(value >= median_r); + absolute_differences[i] = value - median_l; + } + // absolute_differences[i] = + // value <= median_l ? (median_r - value) : (value - median_l); + } + + uint64_t mad_l; + uint64_t mad_r; + + if (use_merge_path) { + int64_t length_l = (vals_reordered.size() + 1) / 2; + int64_t length_r = vals_reordered.size() - length_l; + bool first_from_l, second_from_l; + int64_t first_offset, second_offset; + if (vals_reordered.size() % 2 == 1) { + MergeUtil::NthElement( + length_l - 1, length_l, length_r, &first_from_l, &first_offset, + [&](int64_t offset_l, int64_t offset_r) { + int64_t row_number_l = length_l - 1 - offset_l; + int64_t row_number_r = length_l + offset_r; + std::nth_element(vals_reordered.data(), vals_reordered.data() + row_number_l, + vals_reordered.data() + vals_reordered.size()); + std::nth_element(vals_reordered.data(), vals_reordered.data() + row_number_r, + vals_reordered.data() + vals_reordered.size()); + uint64_t left_value = median_r - vals_reordered[row_number_l]; + uint64_t right_value = vals_reordered[row_number_r] - median_l; + return left_value < right_value; + }); + second_from_l = first_from_l; + second_offset = first_offset; + } else { + MergeUtil::NthPair( + length_l - 1, length_l, length_r, &first_from_l, &first_offset, &second_from_l, + &second_offset, [&](int64_t offset_l, int64_t offset_r) { + int64_t row_number_l = length_l - 1 - offset_l; + int64_t row_number_r = length_l + offset_r; + std::nth_element(vals_reordered.data(), vals_reordered.data() + row_number_l, + vals_reordered.data() + vals_reordered.size()); + std::nth_element(vals_reordered.data(), vals_reordered.data() + row_number_r, + vals_reordered.data() + vals_reordered.size()); + uint64_t left_value = median_r - vals_reordered[row_number_l]; + uint64_t right_value = vals_reordered[row_number_r] - median_l; + return left_value < right_value; + }); + } + mad_l = (first_from_l ? median_r - vals_reordered[length_l - 1 - first_offset] + : vals_reordered[length_l + first_offset] - median_l); + mad_r = (second_from_l ? median_r - vals_reordered[length_l - 1 - second_offset] + : vals_reordered[length_l + second_offset] - median_l); + } else { + ForEntirePartitionNoNullsInPlace(num_rows - num_nulls, absolute_differences.data(), + median, &mad_l, &mad_r, &ignored); + } + + *output_value_l = mad_l - (median_r - median_l); + *output_value_r = mad_r; + + return true; +} + +void WindowQuantiles::MADForFramesWithSeparateOrder( + int64_t num_rows, const int64_t* vals, const int64_t* permutation, + const WindowFrames& frames, uint64_t* mad_l, uint64_t* mad_r, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + // Build pure merge tree - with no data in the nodes + // + MergeTree tree; + BuildMergeTree(num_rows, permutation, &tree, hardware_flags, temp_vector_stack); + + auto nth_value_in_frame = [&](int64_t frame_begin, int64_t frame_end, int64_t n) { + return vals[permutation[tree.NthElement(frame_begin, frame_end, n)]]; + }; + + for (int64_t i = 0; i < num_rows; ++i) { + int64_t frame_begin = frames.begins[0][i]; + int64_t frame_end = frames.ends[0][i]; + int64_t length = frame_end - frame_begin; + ARROW_DCHECK(length > 0); + + int64_t median_l = nth_value_in_frame(frame_begin, frame_end, (length - 1) / 2); + int64_t median_r = (length % 2 == 1) + ? median_l + : nth_value_in_frame(frame_begin, frame_end, (length + 1) / 2); + + int64_t length_l = (length + 1) / 2; + int64_t length_r = length - length_l; + bool first_from_l, second_from_l; + int64_t first_offset, second_offset; + if (length % 2 == 1) { + MergeUtil::NthElement(length_l - 1, length_l, length_r, &first_from_l, + &first_offset, [&](int64_t offset_l, int64_t offset_r) { + int64_t val_l = nth_value_in_frame(frame_begin, frame_end, + length_l - 1 - offset_l); + int64_t val_r = nth_value_in_frame(frame_begin, frame_end, + length_l + offset_r); + uint64_t diff_l = median_r - val_l; + uint64_t diff_r = val_r - median_l; + return diff_l < diff_r; + }); + second_from_l = first_from_l; + second_offset = first_offset; + } else { + MergeUtil::NthPair( + length_l - 1, length_l, length_r, &first_from_l, &first_offset, &second_from_l, + &second_offset, [&](int64_t offset_l, int64_t offset_r) { + int64_t val_l = + nth_value_in_frame(frame_begin, frame_end, length_l - 1 - offset_l); + int64_t val_r = + nth_value_in_frame(frame_begin, frame_end, length_l + offset_r); + uint64_t diff_l = median_r - val_l; + uint64_t diff_r = val_r - median_l; + return diff_l < diff_r; + }); + } + mad_l[i] = + (first_from_l + ? median_r - + nth_value_in_frame(frame_begin, frame_end, length_l - 1 - first_offset) + : nth_value_in_frame(frame_begin, frame_end, length_l + first_offset) - + median_l); + mad_r[i] = + (second_from_l + ? median_r - nth_value_in_frame(frame_begin, frame_end, + length_l - 1 - second_offset) + : nth_value_in_frame(frame_begin, frame_end, length_l + second_offset) - + median_l); + + mad_l[i] -= (median_r - median_l); + } +} + +void WindowQuantiles::ForFramesWithSameOrder( + int64_t num_rows, const int64_t* frame_begins, const int64_t* frame_ends, + const Quantiles& quantiles, + /* pair of left row_number and lerp_nominator, indexed + by quantile id and row number */ + std::vector>>& output) { + int num_quantiles = quantiles.num_quantiles; + output.resize(num_quantiles); + for (int i = 0; i < num_quantiles; ++i) { + output[i].resize(num_rows); + } + for (int64_t row = 0; row < num_rows; ++row) { + int64_t frame_begin = frame_begins[row]; + int64_t frame_end = frame_ends[row]; + if (frame_end == frame_begin) { + for (int quantile = 0; quantile < num_quantiles; ++quantile) { + output[quantile][row] = std::make_pair(-1, -1); + } + } + for (int quantile = 0; quantile < num_quantiles; ++quantile) { + int64_t output_row_number; + int lerp_nominator; + RowNumberFromQuantile(quantiles.numerators[quantile], + quantiles.denominators[quantile], frame_end - frame_begin, + &output_row_number, &lerp_nominator); + output_row_number += frame_begin; + output[quantile][row] = std::make_pair(output_row_number, lerp_nominator); + } + } +} + +void WindowQuantiles::RowNumberFromQuantile(int numerator, int denominator, + int64_t num_rows, int64_t* row_number, + int* lerp_nominator) { + ARROW_DCHECK(numerator >= 0 && numerator <= denominator && denominator > 0); + *row_number = (num_rows - 1) * numerator / denominator; + *lerp_nominator = denominator - (((num_rows - 1) * numerator) % denominator); +} + +bool WindowQuantiles::IsInterpolationNeeded(int interpolation_numerator, + int interpolation_denominator) { + return interpolation_numerator == interpolation_denominator; +} + +bool WindowQuantiles::IsInterpolationNeeded(int64_t num_rows, int quantile_numerator, + int quantile_denominator) { + return (num_rows - 1) * quantile_numerator % quantile_denominator == 0; +} + +void WindowQuantiles::BuildMergeTree(int64_t num_rows, const int64_t* permutation, + MergeTree* tree, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + // Build inverse permutation + // + std::vector inverse_permutation(num_rows); + for (int64_t i = 0; i < num_rows; ++i) { + inverse_permutation[permutation[i]] = i; + } + + // Build merge bit vectors with counters (bitvec and popcounts) for each + // level of the tree above 0. + // + tree->Build(num_rows, inverse_permutation.data(), 0, hardware_flags, temp_vector_stack); +} + +void WindowQuantiles::QuantileMulti(int64_t batch_begin, int64_t batch_length, + const WindowFrames& frames, int quantile_numerator, + int quantile_denominator, const MergeTree& merge_tree, + std::vector& output_l, + std::vector& output_r, + std::vector& output_lerp, + util::TempVectorStack* temp_vector_stack) { + const int64_t* frame_begins = frames.begins[0]; + const int64_t* frame_ends = frames.ends[0]; + + // Allocate temporary buffers + // + auto ids_buf = util::TempVectorHolder(temp_vector_stack, + static_cast(batch_length)); + uint16_t* ids = ids_buf.mutable_data(); + int num_ids = 0; + auto frames_ns_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(batch_length)); + int64_t* frames_ns = frames_ns_buf.mutable_data(); + + // Filter out empty frames + // + for (int64_t i = 0; i < batch_length; ++i) { + int64_t frame_begin = frame_begins[batch_begin + i]; + int64_t frame_end = frame_ends[batch_begin + i]; + if (frame_end > frame_begin) { + ids[num_ids++] = static_cast(i); + } + } + + // Compute n for nth element based on quantile and frames. + // + for (int64_t i = 0; i < num_ids; ++i) { + uint16_t id = ids[i]; + int64_t frame_number = batch_begin + id; + int64_t frame_begin = frame_begins[frame_number]; + int64_t frame_end = frame_ends[frame_number]; + int64_t row_number; + int lerp_nominator; + RowNumberFromQuantile(quantile_numerator, quantile_denominator, + frame_end - frame_begin, &row_number, &lerp_nominator); + output_lerp[frame_number] = lerp_nominator; + frames_ns[id] = row_number; + } + + // For each frame we may run up to two queries: for finding nth row + // and (n+1)th row in the merge tree sort order within the given frame. + // Depending on the number of rows in the frame and the quantile the + // second row may not be needed. + // + merge_tree.NthElement(num_ids, ids, frame_begins + batch_begin, + frame_ends + batch_begin, frames_ns, + output_l.data() + batch_begin, temp_vector_stack); + + // For the second pass filter out frames for which the quantile is + // determined by a value of a single row (for which the interpolation + // between a pair of rows is not needed). + // + int num_ids_new = 0; + for (int64_t i = 0; i < num_ids; ++i) { + uint16_t id = ids[i]; + int64_t frame_number = batch_begin + id; + if (output_lerp[frame_number] < quantile_denominator) { + ids[num_ids_new++] = id; + ++frames_ns[id]; + } else { + output_r[batch_begin + id] = output_l[batch_begin + id]; + } + } + num_ids = num_ids_new; + + merge_tree.NthElement(num_ids, ids, frame_begins + batch_begin, + frame_ends + batch_begin, frames_ns, + output_r.data() + batch_begin, temp_vector_stack); +} + +// Organize requested list of quantiles in order to minimize the combined +// cost of the sequence of std::nth_element() calls. +// +void WindowQuantiles::SortQuantiles( + const Quantiles& quantiles, int64_t num_rows, + /* Range of rows for which std::nth_element() will be called for + the kth quantile in the given original order */ + int64_t* begins, int64_t* ends, + /* Permutation of given quantiles representing order in which to + call std::nth_element() */ + int* permutation) { + int num_quantiles = quantiles.num_quantiles; + + // Find row numbers corresponding to quantiles and sort quantile ids on + // them. + // + std::vector> row_numbers; + for (int i = 0; i < num_quantiles; ++i) { + int64_t row_number; + int lerp_nominator; + RowNumberFromQuantile(quantiles.numerators[i], quantiles.denominators[i], num_rows, + &row_number, &lerp_nominator); + row_numbers.push_back(std::make_pair(row_number, i)); + } + std::sort(row_numbers.begin(), row_numbers.end()); + + const int left_boundary = num_quantiles; + const int right_boundary = num_quantiles + 1; + row_numbers.push_back(std::make_pair(0LL, left_boundary)); + row_numbers.push_back(std::make_pair(num_rows, right_boundary)); + + // Connect into doubly linked list + // + std::vector prev(num_quantiles + 2); + std::vector next(num_quantiles + 2); + for (int i = 0; i < num_quantiles; ++i) { + prev[i] = i == 0 ? left_boundary : i - 1; + next[i] = i == num_quantiles - 1 ? right_boundary : i + 1; + } + + prev[left_boundary] = left_boundary; + next[left_boundary] = 0; + prev[right_boundary] = num_quantiles - 1; + next[right_boundary] = right_boundary; + + // We look for the quantile with the smallest sum of distances to its + // direct neighbours (or range boundaries if it is the first or the + // last). We remove it and repeat the process for the remaining list of + // quantiles. + // + std::set> ranges; + auto add_range = [&](int i) { + if (i != left_boundary && i != right_boundary) { + ranges.insert( + std::make_pair(row_numbers[next[i]].first - row_numbers[prev[i]].first, i)); + } + }; + auto del_range = [&](int i) { + if (i != left_boundary && i != right_boundary) { + ranges.erase(ranges.find( + std::make_pair(row_numbers[next[i]].first - row_numbers[prev[i]].first, i))); + } + }; + for (int i = 0; i < num_quantiles; ++i) { + add_range(i); + } + for (int output_pos = 0; output_pos < num_quantiles; ++output_pos) { + int i = ranges.begin()->second; + int id = row_numbers[i].second; + + permutation[output_pos] = id; + begins[id] = row_numbers[prev[i]].first; + ends[id] = row_numbers[next[i]].first; + + ranges.erase(ranges.begin()); + del_range(prev[i]); + del_range(next[i]); + next[prev[i]] = next[i]; + prev[next[i]] = prev[i]; + add_range(prev[i]); + add_range(next[i]); + } + std::reverse(permutation, permutation + num_quantiles); +} + +bool WindowQuantilesBasic::ForEntirePartition(int64_t num_rows, const int64_t* vals, + const uint8_t* validity, + const WindowQuantiles::Quantiles& quantiles, + int64_t* output_l, int64_t* output_r, + int* output_lerp_nominator) { + std::vector vals_sorted; + for (int64_t i = 0; i < num_rows; ++i) { + if (bit_util::GetBit(validity, i)) { + vals_sorted.push_back(vals[i]); + } + } + if (vals_sorted.empty()) { + return false; + } + + std::sort(vals_sorted.begin(), vals_sorted.end()); + + int64_t num_vals_sorted = static_cast(vals_sorted.size()); + for (int quantile = 0; quantile < quantiles.num_quantiles; ++quantile) { + int numerator = quantiles.numerators[quantile]; + int denominator = quantiles.denominators[quantile]; + int64_t id_l = (num_vals_sorted - 1) * numerator / denominator; + int64_t id_r = id_l + 1; + + output_l[quantile] = vals_sorted[id_l]; + output_lerp_nominator[quantile] = static_cast( + denominator - ((num_vals_sorted - 1) * numerator - id_l * denominator)); + output_r[quantile] = output_lerp_nominator[quantile] == denominator + ? vals_sorted[id_l] + : vals_sorted[id_r]; + } + + return true; +} + +bool WindowQuantilesBasic::MADForEntirePartition(int64_t num_rows, const int64_t* vals, + const uint8_t* optional_validity, + uint64_t* output_value_l, + uint64_t* output_value_r) { + std::vector vals_sorted; + if (optional_validity) { + for (int64_t i = 0; i < num_rows; ++i) { + if (bit_util::GetBit(optional_validity, i)) { + vals_sorted.push_back(vals[i]); + } + } + } else { + vals_sorted.resize(num_rows); + memcpy(vals_sorted.data(), vals, num_rows * sizeof(int64_t)); + } + if (vals_sorted.empty()) { + return false; + } + + std::sort(vals_sorted.begin(), vals_sorted.end()); + + int64_t num_vals_sorted = static_cast(vals_sorted.size()); + int64_t median_l, median_r; + median_l = vals_sorted[(num_vals_sorted - 1) / 2]; + median_r = vals_sorted[num_vals_sorted / 2]; + + std::vector absolute_differences; + for (int64_t i = 0; i < num_vals_sorted; ++i) { + int64_t value = vals_sorted[i]; + if (value <= median_l) { + absolute_differences.push_back(median_r - value); + } else { + absolute_differences.push_back(value - median_l); + } + } + + std::sort(absolute_differences.begin(), absolute_differences.end()); + + *output_value_l = + absolute_differences[(num_vals_sorted - 1) / 2] - (median_r - median_l); + *output_value_r = absolute_differences[num_vals_sorted / 2]; + + return true; +} + +void WindowQuantilesBasic::MADForFramesWithSeparateOrder( + int64_t num_rows, const int64_t* vals, const int64_t* permutation, + const WindowFrames& frames, uint64_t* mad_l, uint64_t* mad_r) { + const int64_t* frame_begins = frames.begins[0]; + const int64_t* frame_ends = frames.ends[0]; + for (int64_t i = 0; i < num_rows; ++i) { + ARROW_DCHECK(frame_ends[i] > frame_begins[i]); + MADForEntirePartition(frame_ends[i] - frame_begins[i], vals + frame_begins[i], + nullptr, &mad_l[i], &mad_r[i]); + } +} + +// For empty frames the result is undefined. +// +void WindowQuantilesBasic::ForFramesWithSeparateOrder( + int64_t num_rows, const int64_t* permutation, + const WindowQuantiles::Quantiles& quantiles, const WindowFrames& frames, + std::vector>& output_l, + std::vector>& output_r, + std::vector>& output_lerp) { + const int64_t* frame_begins = frames.begins[0]; + const int64_t* frame_ends = frames.ends[0]; + + std::vector rank_of_row(num_rows); + for (int64_t i = 0; i < num_rows; ++i) { + rank_of_row[permutation[i]] = i; + } + for (int quantile = 0; quantile < quantiles.num_quantiles; ++quantile) { + for (int64_t frame = 0; frame < num_rows; ++frame) { + int64_t begin = frame_begins[frame]; + int64_t end = frame_ends[frame]; + int64_t num_values_frame = end - begin; + if (num_values_frame == 0) { + // The case of an empty frame + continue; + } + // Each pair is: {rank, row_number}. + std::vector> frame_values; + for (int64_t i = begin; i < end; ++i) { + frame_values.push_back(std::make_pair(rank_of_row[i], i)); + } + std::sort(frame_values.begin(), frame_values.end()); + + int numerator = quantiles.numerators[quantile]; + int denominator = quantiles.denominators[quantile]; + int64_t id_l = (end - begin - 1) * numerator / denominator; + + output_l[quantile][frame] = output_r[quantile][frame] = frame_values[id_l].second; + output_lerp[quantile][frame] = static_cast( + denominator - ((num_values_frame - 1) * numerator - id_l * denominator)); + if (output_lerp[quantile][frame] < denominator) { + output_r[quantile][frame] = frame_values[id_l + 1].second; + } + } + } +} + +void WindowQuantilesTests::TestQuantilesForEntirePartition() { + Random64BitCopy rand; + MemoryPool* pool = default_memory_pool(); + util::TempVectorStack temp_vector_stack; + Status status = temp_vector_stack.Init(pool, 128 * util::MiniBatch::kMiniBatchLength); + ARROW_DCHECK(status.ok()); + + constexpr int num_tests = 100; + const int num_tests_to_skip = 0; + for (int test = 0; test < num_tests; ++test) { + // Generate random quantile queries + // + constexpr int max_quantiles = 10; + WindowQuantiles::Quantiles quantiles; + quantiles.num_quantiles = rand.from_range(1, max_quantiles); + std::vector numerators(quantiles.num_quantiles); + std::vector denominators(quantiles.num_quantiles); + quantiles.numerators = numerators.data(); + quantiles.denominators = denominators.data(); + for (int i = 0; i < quantiles.num_quantiles; ++i) { + denominators[i] = rand.from_range(1, 100); + numerators[i] = rand.from_range(0, denominators[i]); + } + + // Generate random values + // + constexpr int64_t max_rows = 1100; + int64_t num_rows = rand.from_range(static_cast(1), max_rows); + std::vector validity(bit_util::BytesForBits(num_rows)); + std::vector vals(num_rows); + constexpr int64_t max_val = 65535; + // TODO: test with nulls + int null_probability = 0; // rand.from_range(0, 256); + int tie_probability = rand.from_range(0, 256); + for (int64_t i = 0; i < num_rows; ++i) { + bool null = rand.from_range(0, 255) < null_probability; + bool tie = rand.from_range(0, 255) < tie_probability; + if (null) { + bit_util::ClearBit(validity.data(), i); + } else { + bit_util::SetBit(validity.data(), i); + if (tie && i > 0) { + vals[i] = vals[rand.from_range(static_cast(0), i - 1)]; + } else { + vals[i] = rand.from_range(static_cast(0), max_val); + } + } + } + std::vector vals_sorted(num_rows); + for (int64_t i = 0; i < num_rows; ++i) { + vals_sorted[i] = vals[i]; + } + std::sort(vals_sorted.begin(), vals_sorted.end()); + + printf("num_quantiles %d num_rows %d ", quantiles.num_quantiles, + static_cast(num_rows)); + + if (test < num_tests_to_skip) { + continue; + } + + bool out_flag[2]; + std::vector out_l[2]; + std::vector out_r[2]; + std::vector out_lerp[2]; + for (int i = 0; i < 2; ++i) { + out_l[i].resize(quantiles.num_quantiles); + out_r[i].resize(quantiles.num_quantiles); + out_lerp[i].resize(quantiles.num_quantiles); + } + int64_t num_repeats; +#ifndef NDEBUG + num_repeats = 1; +#else + num_repeats = std::max(1LL, 1024 * 1024LL / num_rows); +#endif + printf("num_repeats %d ", static_cast(num_repeats)); + // int64_t start = __rdtsc(); + for (int repeat = 0; repeat < num_repeats; ++repeat) { + out_flag[0] = WindowQuantilesBasic::ForEntirePartition( + num_rows, vals.data(), validity.data(), quantiles, out_l[0].data(), + out_r[0].data(), out_lerp[0].data()); + } + // int64_t end = __rdtsc(); + // printf("cpr basic %.1f ", + // static_cast(end - start) / static_cast(num_rows * + // num_repeats)); + // start = __rdtsc(); + for (int repeat = 0; repeat < num_repeats; ++repeat) { + out_flag[1] = WindowQuantiles::ForEntirePartition( + num_rows, vals.data(), validity.data(), quantiles, out_l[1].data(), + out_r[1].data(), out_lerp[1].data()); + } + // end = __rdtsc(); + // printf("cpr normal %.1f ", + // static_cast(end - start) / static_cast(num_rows * + // num_repeats)); + + bool ok = true; + if (out_flag[0] != out_flag[1]) { + ARROW_DCHECK(false); + ok = false; + } + for (int i = 0; i < quantiles.num_quantiles; ++i) { + if (out_l[0] != out_l[1]) { + ARROW_DCHECK(false); + ok = false; + } + if (out_r[0] != out_r[1]) { + ARROW_DCHECK(false); + ok = false; + } + if (out_lerp[0] != out_lerp[1]) { + ARROW_DCHECK(false); + ok = false; + } + } + printf("%s\n", ok ? "correct" : "wrong"); + } +} + +void WindowQuantilesTests::TestQuantilesForFramesWithSeparateOrder() { + Random64BitCopy rand; + MemoryPool* pool = default_memory_pool(); + util::TempVectorStack temp_vector_stack; + Status status = temp_vector_stack.Init(pool, 128 * util::MiniBatch::kMiniBatchLength); + ARROW_DCHECK(status.ok()); + int64_t hardware_flags = 0LL; + + constexpr int num_tests = 100; + const int num_tests_to_skip = 1; + for (int test = 0; test < num_tests; ++test) { + // Generate random quantile queries + // + constexpr int max_quantiles = 10; + WindowQuantiles::Quantiles quantiles; + quantiles.num_quantiles = rand.from_range(1, max_quantiles); + std::vector numerators(quantiles.num_quantiles); + std::vector denominators(quantiles.num_quantiles); + quantiles.numerators = numerators.data(); + quantiles.denominators = denominators.data(); + for (int i = 0; i < quantiles.num_quantiles; ++i) { + denominators[i] = rand.from_range(1, 100); + numerators[i] = rand.from_range(0, denominators[i]); + } + + // Generate random values + // + constexpr int64_t max_rows = 1100; + int64_t num_rows = rand.from_range(static_cast(1), max_rows); + std::vector vals(num_rows); + constexpr int64_t max_val = 65535; + int tie_probability = rand.from_range(0, 256); + for (int64_t i = 0; i < num_rows; ++i) { + bool tie = rand.from_range(0, 255) < tie_probability; + if (tie && i > 0) { + vals[i] = vals[rand.from_range(static_cast(0), i - 1)]; + } else { + vals[i] = rand.from_range(static_cast(0), max_val); + } + } + + // Sort on values + // + std::vector> vals_sorted(num_rows); + for (int64_t i = 0; i < num_rows; ++i) { + vals_sorted[i] = std::make_pair(vals[i], i); + } + std::sort(vals_sorted.begin(), vals_sorted.end()); + std::vector permutation(num_rows); + for (int64_t i = 0; i < num_rows; ++i) { + permutation[i] = vals_sorted[i].second; + } + + // Generate random frames + // + constexpr int64_t max_frame_length = 100; + std::vector begins(num_rows); + std::vector ends(num_rows); + WindowFrames frames; + frames.num_frames = num_rows; + frames.begins[0] = begins.data(); + frames.ends[0] = ends.data(); + int64_t sum_frame_length = 0LL; + for (int64_t i = 0; i < num_rows; ++i) { + int64_t frame_length = + rand.from_range(static_cast(0), std::min(num_rows, max_frame_length)); + begins[i] = rand.from_range(static_cast(0), num_rows - frame_length); + ends[i] = begins[i] + frame_length; + sum_frame_length += frame_length; + } + + printf("num_quantiles %d num_rows %d avg_frame_length %.1f ", quantiles.num_quantiles, + static_cast(num_rows), + static_cast(sum_frame_length) / static_cast(num_rows)); + + if (test < num_tests_to_skip) { + continue; + } + + std::vector> out_l[2]; + std::vector> out_r[2]; + std::vector> out_lerp[2]; + for (int i = 0; i < 2; ++i) { + out_l[i].resize(quantiles.num_quantiles); + out_r[i].resize(quantiles.num_quantiles); + out_lerp[i].resize(quantiles.num_quantiles); + for (int j = 0; j < quantiles.num_quantiles; ++j) { + out_l[i][j].resize(num_rows); + out_r[i][j].resize(num_rows); + out_lerp[i][j].resize(num_rows); + } + } + + int64_t num_repeats; +#ifndef NDEBUG + num_repeats = 1; +#else + num_repeats = std::max(1LL, 1024 * 1024LL / num_rows); +#endif + printf("num_repeats %d ", static_cast(num_repeats)); + + // int64_t start = __rdtsc(); + for (int repeat = 0; repeat < num_repeats; ++repeat) { + WindowQuantilesBasic::ForFramesWithSeparateOrder(num_rows, permutation.data(), + quantiles, frames, out_l[0], + out_r[0], out_lerp[0]); + } + // int64_t end = __rdtsc(); + // printf("cpr basic %.1f ", + // static_cast(end - start) / static_cast(num_rows * + // num_repeats)); + // start = __rdtsc(); + for (int repeat = 0; repeat < num_repeats; ++repeat) { + WindowQuantiles::ForFramesWithSeparateOrder(num_rows, permutation.data(), quantiles, + frames, out_l[1], out_r[1], out_lerp[1], + hardware_flags, &temp_vector_stack); + } + // end = __rdtsc(); + // printf("cpr normal %.1f ", + // static_cast(end - start) / static_cast(num_rows * + // num_repeats)); + + bool ok = true; + for (int i = 0; i < quantiles.num_quantiles; ++i) { + for (int64_t j = 0; j < num_rows; ++j) { + if (frames.begins[0][j] == frames.ends[0][j]) { + continue; + } + if (out_l[0][i][j] != out_l[1][i][j] || out_r[0][i][j] != out_r[1][i][j]) { + ARROW_DCHECK(false); + ok = false; + } + if (out_lerp[0][i][j] != out_lerp[1][i][j]) { + ARROW_DCHECK(false); + ok = false; + } + } + } + printf("%s\n", ok ? "correct" : "wrong"); + } +} + +void WindowQuantilesTests::TestMADForEntirePartition() { + Random64BitCopy rand; + MemoryPool* pool = default_memory_pool(); + util::TempVectorStack temp_vector_stack; + Status status = temp_vector_stack.Init(pool, 128 * util::MiniBatch::kMiniBatchLength); + ARROW_DCHECK(status.ok()); + + constexpr int num_tests = 100; + const int num_tests_to_skip = 1; + for (int test = 0; test < num_tests; ++test) { + // Generate random values + // + constexpr int64_t max_rows = 1100; + int64_t num_rows = rand.from_range(static_cast(1), max_rows); + std::vector vals(num_rows); + constexpr int64_t max_val = 65535; + int tie_probability = rand.from_range(0, 256); + for (int64_t i = 0; i < num_rows; ++i) { + bool tie = rand.from_range(0, 255) < tie_probability; + if (tie && i > 0) { + vals[i] = vals[rand.from_range(static_cast(0), i - 1)]; + } else { + vals[i] = rand.from_range(static_cast(0), max_val); + } + } + std::vector vals_sorted(num_rows); + for (int64_t i = 0; i < num_rows; ++i) { + vals_sorted[i] = vals[i]; + } + std::sort(vals_sorted.begin(), vals_sorted.end()); + + printf("num_rows %d ", static_cast(num_rows)); + + if (test < num_tests_to_skip) { + continue; + } + + uint64_t mad_l[3]; + uint64_t mad_r[3]; + + int64_t num_repeats; +#ifndef NDEBUG + num_repeats = 1; +#else + num_repeats = std::max(1LL, 1024 * 1024LL / num_rows); +#endif + printf("num_repeats %d ", static_cast(num_repeats)); + + // int64_t start = __rdtsc(); + for (int repeat = 0; repeat < num_repeats; ++repeat) { + WindowQuantilesBasic::MADForEntirePartition(num_rows, vals.data(), nullptr, + &mad_l[0], &mad_r[0]); + } + // int64_t end = __rdtsc(); + // printf("cpr basic %.1f ", + // static_cast(end - start) / static_cast(num_rows * + // num_repeats)); + + for (bool use_merge_path : {false, true}) { + // start = __rdtsc(); + for (int repeat = 0; repeat < num_repeats; ++repeat) { + WindowQuantiles::MADForEntirePartition( + num_rows, vals.data(), nullptr, &mad_l[use_merge_path ? 2 : 1], + &mad_r[use_merge_path ? 2 : 1], use_merge_path); + } + // end = __rdtsc(); + // printf( + // "cpr normal %s %.1f ", use_merge_path ? "merge_path" : "", + // static_cast(end - start) / static_cast(num_rows * + // num_repeats)); + } + + bool ok = true; + for (int i = 1; i < 3; ++i) { + if (mad_l[0] != mad_l[i]) { + ARROW_DCHECK(false); + ok = false; + } + if (mad_r[0] != mad_r[i]) { + ARROW_DCHECK(false); + ok = false; + } + } + printf("%s\n", ok ? "correct" : "wrong"); + } +} + +void WindowQuantilesTests::TestMADForFramesWithSeparateOrder() { + Random64BitCopy rand; + MemoryPool* pool = default_memory_pool(); + util::TempVectorStack temp_vector_stack; + Status status = temp_vector_stack.Init(pool, 128 * util::MiniBatch::kMiniBatchLength); + ARROW_DCHECK(status.ok()); + int64_t hardware_flags = 0LL; + + constexpr int num_tests = 100; + const int num_tests_to_skip = 1; + for (int test = 0; test < num_tests; ++test) { + // Generate random values + // + constexpr int64_t max_rows = 1100; + int64_t num_rows = rand.from_range(static_cast(1), max_rows); + std::vector vals(num_rows); + constexpr int64_t max_val = 65535; + int tie_probability = rand.from_range(0, 256); + for (int64_t i = 0; i < num_rows; ++i) { + bool tie = rand.from_range(0, 255) < tie_probability; + if (tie && i > 0) { + vals[i] = vals[rand.from_range(static_cast(0), i - 1)]; + } else { + vals[i] = rand.from_range(static_cast(0), max_val); + } + } + + // Sort on values + // + std::vector> vals_sorted(num_rows); + for (int64_t i = 0; i < num_rows; ++i) { + vals_sorted[i] = std::make_pair(vals[i], i); + } + std::sort(vals_sorted.begin(), vals_sorted.end()); + std::vector permutation(num_rows); + for (int64_t i = 0; i < num_rows; ++i) { + permutation[i] = vals_sorted[i].second; + } + + // Generate random frames + // + constexpr int64_t max_frame_length = 100; + std::vector begins(num_rows); + std::vector ends(num_rows); + WindowFrames frames; + frames.num_frames = num_rows; + frames.begins[0] = begins.data(); + frames.ends[0] = ends.data(); + int64_t sum_frame_length = 0LL; + for (int64_t i = 0; i < num_rows; ++i) { + int64_t frame_length = + rand.from_range(static_cast(1), std::min(num_rows, max_frame_length)); + begins[i] = rand.from_range(static_cast(0), num_rows - frame_length); + ends[i] = begins[i] + frame_length; + sum_frame_length += frame_length; + } + + printf("num_rows %d avg_frame_length %.1f ", static_cast(num_rows), + static_cast(sum_frame_length) / static_cast(num_rows)); + + if (test < num_tests_to_skip) { + continue; + } + + std::vector mad_l[2]; + std::vector mad_r[2]; + for (int i = 0; i < 2; ++i) { + mad_l[i].resize(num_rows); + mad_r[i].resize(num_rows); + } + + int64_t num_repeats; +#ifndef NDEBUG + num_repeats = 1; +#else + num_repeats = std::max(1LL, 1024 * 1024LL / num_rows); +#endif + printf("num_repeats %d ", static_cast(num_repeats)); + + // int64_t start = __rdtsc(); + for (int repeat = 0; repeat < num_repeats; ++repeat) { + WindowQuantilesBasic::MADForFramesWithSeparateOrder( + num_rows, vals.data(), permutation.data(), frames, mad_l[0].data(), + mad_r[0].data()); + } + // int64_t end = __rdtsc(); + // printf("cpr basic %.1f ", + // static_cast(end - start) / static_cast(num_rows * + // num_repeats)); + // start = __rdtsc(); + for (int repeat = 0; repeat < num_repeats; ++repeat) { + WindowQuantiles::MADForFramesWithSeparateOrder( + num_rows, vals.data(), permutation.data(), frames, mad_l[1].data(), + mad_r[1].data(), hardware_flags, &temp_vector_stack); + } + // end = __rdtsc(); + // printf("cpr normal %.1f ", + // static_cast(end - start) / static_cast(num_rows * + // num_repeats)); + + bool ok = true; + for (int64_t i = 0; i < num_rows; ++i) { + if (mad_l[0][i] != mad_l[1][i] || mad_r[0][i] != mad_r[1][i]) { + ARROW_DCHECK(false); + ok = false; + } + } + printf("%s\n", ok ? "correct" : "wrong"); + } +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/window_quantiles.h b/cpp/src/arrow/compute/exec/window_functions/window_quantiles.h new file mode 100644 index 00000000000..5bf12bb099a --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/window_quantiles.h @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include "arrow/compute/exec/util.h" +#include "arrow/compute/exec/window_functions/bit_vector_navigator.h" +#include "arrow/compute/exec/window_functions/merge_path.h" +#include "arrow/compute/exec/window_functions/merge_tree.h" +#include "arrow/compute/exec/window_functions/window_frame.h" +#include "arrow/util/bitmap_ops.h" + +namespace arrow { +namespace compute { + +class WindowQuantiles { + public: + struct Quantiles { + int num_quantiles; + const int* numerators; + const int* denominators; + }; + + static void ForFramesWithSeparateOrder( + int64_t num_rows, const int64_t* permutation, const Quantiles& quantiles, + const WindowFrames& frames, + // TODO: output a pair of vectors instead of vector of pairs + std::vector>& output_l, + std::vector>& output_r, + std::vector>& output_lerp, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack); + + // Nulls are filtered out before computation. + template + static bool ForEntirePartition(int64_t num_rows, const T* vals, const uint8_t* validity, + const Quantiles& quantiles, T* output_l, T* output_r, + int* output_lerp_nominator) { + ARROW_DCHECK(num_rows > 0); + + int64_t num_nulls = + validity ? num_rows - arrow::internal::CountSetBits(validity, /*offset=*/0, + static_cast(num_rows)) + : 0LL; + if (num_nulls == num_rows) { + return false; + } + + // Filter out nulls + std::vector vals_reordered; + if (validity && num_nulls > 0) { + for (int64_t i = 0; i < num_rows; ++i) { + if (bit_util::GetBit(validity, i)) { + vals_reordered.push_back(vals[i]); + } + } + } else { + vals_reordered.resize(num_rows); + memcpy(vals_reordered.data(), vals, num_rows * sizeof(T)); + } + + ForEntirePartitionNoNullsInPlace(num_rows - num_nulls, vals_reordered.data(), + quantiles, output_l, output_r, + output_lerp_nominator); + + return true; + } + + template + static void ForEntirePartitionNoNullsInPlace(int64_t num_rows, T* vals, + const Quantiles& quantiles, T* output_l, + T* output_r, int* output_lerp_nominator) { + ARROW_DCHECK(num_rows > 0); + + int num_quantiles = quantiles.num_quantiles; + std::vector begins(num_quantiles); + std::vector ends(num_quantiles); + std::vector permutation(num_quantiles); + SortQuantiles(quantiles, num_rows, begins.data(), ends.data(), permutation.data()); + + for (int i = 0; i < num_quantiles; ++i) { + int id = permutation[i]; + int64_t begin = begins[id]; + int64_t end = ends[id]; + int64_t row_number; + int lerp_nominator; + RowNumberFromQuantile(quantiles.numerators[id], quantiles.denominators[id], + num_rows, &row_number, &lerp_nominator); + + if (end - begin > 1 && row_number >= begin && row_number < end) { + std::nth_element(vals + begin, vals + row_number, vals + end); + if (lerp_nominator < quantiles.denominators[id] && row_number + 1 < end) { + int64_t min_row_number = + (std::min_element(vals + row_number + 1, vals + num_rows) - vals); + if (min_row_number != row_number + 1) { + std::swap(vals[row_number + 1], vals[min_row_number]); + } + } + } + + output_l[id] = vals[row_number]; + output_r[id] = (lerp_nominator < quantiles.denominators[id] ? vals[row_number + 1] + : vals[row_number]); + output_lerp_nominator[id] = lerp_nominator; + } + } + + // The result is the average of *output_value_l and *output_value_r. + // + static bool MADForEntirePartition(int64_t num_rows, const int64_t* vals, + const uint8_t* validity, uint64_t* output_value_l, + uint64_t* output_value_r, + bool use_merge_path = false); + + static void MADForFramesWithSeparateOrder(int64_t num_rows, const int64_t* vals, + const int64_t* permutation, + const WindowFrames& frames, uint64_t* mad_l, + uint64_t* mad_r, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack); + + static void ForFramesWithSameOrder( + int64_t num_rows, const int64_t* frame_begins, const int64_t* frame_ends, + const Quantiles& quantiles, + /* pair of left row_number and lerp_nominator, indexed + by quantile id and row number */ + std::vector>>& output); + + private: + static void RowNumberFromQuantile(int numerator, int denominator, int64_t num_rows, + int64_t* row_number, int* lerp_nominator); + + static bool IsInterpolationNeeded(int interpolation_numerator, + int interpolation_denominator); + + static bool IsInterpolationNeeded(int64_t num_rows, int quantile_numerator, + int quantile_denominator); + + static void BuildMergeTree(int64_t num_rows, const int64_t* permutation, + MergeTree* tree, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack); + + static void QuantileMulti(int64_t batch_begin, int64_t batch_length, + const WindowFrames& frames, int quantile_numerator, + int quantile_denominator, const MergeTree& merge_tree, + std::vector& output_l, + std::vector& output_r, std::vector& output_lerp, + util::TempVectorStack* temp_vector_stack); + + // Organize requested list of quantiles in order to minimize the combined + // cost of the sequence of std::nth_element() calls. + // + static void SortQuantiles(const Quantiles& quantiles, int64_t num_rows, + /* Range of rows for which std::nth_element() will be called + for the kth quantile in the given original order */ + int64_t* begins, int64_t* ends, + /* Permutation of given quantiles representing order in which + to call std::nth_element() */ + int* permutation); +}; + +class WindowQuantilesBasic { + public: + static bool ForEntirePartition(int64_t num_rows, const int64_t* vals, + const uint8_t* validity, + const WindowQuantiles::Quantiles& quantiles, + int64_t* output_l, int64_t* output_r, + int* output_lerp_nominator); + + static bool MADForEntirePartition(int64_t num_rows, const int64_t* vals, + const uint8_t* optional_validity, + uint64_t* output_value_l, uint64_t* output_value_r); + + static void MADForFramesWithSeparateOrder(int64_t num_rows, const int64_t* vals, + const int64_t* permutation, + const WindowFrames& frames, uint64_t* mad_l, + uint64_t* mad_r); + + // For empty frames the result is undefined. + // + static void ForFramesWithSeparateOrder(int64_t num_rows, const int64_t* permutation, + const WindowQuantiles::Quantiles& quantiles, + const WindowFrames& frames, + std::vector>& output_l, + std::vector>& output_r, + std::vector>& output_lerp); +}; + +// TODO: For small number of rows (e.g. less than 100) and entire partition +// quantiles sorting can be faster than nth_element. +// + +class WindowQuantilesTests { + public: + static void TestQuantilesForEntirePartition(); + + static void TestQuantilesForFramesWithSeparateOrder(); + + static void TestMADForEntirePartition(); + + static void TestMADForFramesWithSeparateOrder(); +}; + +} // namespace compute +} // namespace arrow