From 844e9aa974e7abff6f5822dbfb60c3841d62f0df Mon Sep 17 00:00:00 2001 From: michalursa Date: Mon, 5 Sep 2022 23:55:33 -0700 Subject: [PATCH] Window Functions adding helper class for computing window frames --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/compute/exec/util.h | 18 + .../window_functions/bit_vector_navigator.cc | 122 +++++ .../window_functions/bit_vector_navigator.h | 158 +++++++ .../exec/window_functions/window_frame.cc | 438 ++++++++++++++++++ .../exec/window_functions/window_frame.h | 244 ++++++++++ 6 files changed, 982 insertions(+) create mode 100644 cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc create mode 100644 cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h create mode 100644 cpp/src/arrow/compute/exec/window_functions/window_frame.cc create mode 100644 cpp/src/arrow/compute/exec/window_functions/window_frame.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 2cae7a731f9..dceab311025 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -408,6 +408,8 @@ if(ARROW_COMPUTE) compute/exec/tpch_node.cc compute/exec/union_node.cc compute/exec/util.cc + compute/exec/window_functions/bit_vector_navigator.cc + compute/exec/window_functions/window_frame.cc compute/function.cc compute/function_internal.cc compute/kernel.cc diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h index 7e716808fa0..210bfae7e8c 100644 --- a/cpp/src/arrow/compute/exec/util.h +++ b/cpp/src/arrow/compute/exec/util.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -361,5 +362,22 @@ struct ARROW_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer { util::Mutex consume_mutex_; }; +class Random64BitCopy { + public: + Random64BitCopy() : rs{0, 0, 0, 0, 0, 0, 0, 0}, re(rs) {} + uint64_t next() { return rdist(re); } + template + inline T from_range(const T& min_val, const T& max_val) { + return static_cast(min_val + (next() % (max_val - min_val + 1))); + } + std::mt19937& get_engine() { return re; } + + private: + std::random_device rd; + std::seed_seq rs; + std::mt19937 re; + std::uniform_int_distribution rdist; +}; + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc new file mode 100644 index 00000000000..104d952d6fc --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.cc @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/window_functions/bit_vector_navigator.h" +#include +#include "arrow/compute/exec/util.h" + +namespace arrow { +namespace compute { + +void BitVectorNavigator::SelectsForRangeOfRanks( + int64_t rank_begin, int64_t rank_end, int64_t num_bits, const uint64_t* bitvec, + const uint64_t* popcounts, int64_t* outputs, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + ARROW_DCHECK(rank_begin <= rank_end); + if (rank_begin == rank_end) { + return; + } + int64_t popcount_all = PopCount(num_bits, bitvec, popcounts); + if (rank_end <= 0LL) { + for (int64_t i = 0LL; i < rank_end - rank_begin; ++i) { + outputs[i] = -1LL; + } + return; + } + if (rank_begin >= popcount_all) { + for (int64_t i = 0LL; i < rank_end - rank_begin; ++i) { + outputs[i] = num_bits; + } + return; + } + if (rank_begin < 0LL) { + for (int64_t i = 0LL; i < -rank_begin; ++i) { + outputs[i] = -1LL; + } + outputs += -rank_begin; + rank_begin = 0LL; + } + if (rank_end > popcount_all) { + for (int64_t i = popcount_all - rank_begin; i < rank_end - rank_begin; ++i) { + outputs[i] = num_bits; + } + rank_end = popcount_all; + } + + int64_t minibatch_length_max = util::MiniBatch::kMiniBatchLength; + auto indexes = util::TempVectorHolder( + temp_vector_stack, static_cast(minibatch_length_max)); + int num_indexes; + + int64_t first_select = + BitVectorNavigator::Select(rank_begin, num_bits, bitvec, popcounts); + int64_t last_select = + BitVectorNavigator::Select(rank_begin, num_bits, bitvec, popcounts); + + for (int64_t minibatch_begin = first_select; minibatch_begin < last_select + 1; + minibatch_begin += minibatch_length_max) { + int64_t minibatch_end = + std::min(last_select + 1, minibatch_begin + minibatch_length_max); + util::bit_util::bits_to_indexes( + /*bit_to_search=*/1, hardware_flags, + static_cast(minibatch_end - minibatch_begin), + reinterpret_cast(bitvec), &num_indexes, indexes.mutable_data(), + static_cast(minibatch_begin)); + for (int i = 0; i < num_indexes; ++i) { + outputs[i] = minibatch_begin + indexes.mutable_data()[i]; + } + outputs += num_indexes; + } +} + +void BitVectorNavigator::SelectsForRelativeRanksForRangeOfRows( + int64_t batch_begin, int64_t batch_end, int64_t rank_delta, int64_t num_rows, + const uint64_t* ties_bitvec, const uint64_t* ties_popcounts, int64_t* outputs, + int64_t hardware_flags, util::TempVectorStack* temp_vector_stack) { + // Break into mini-batches + int64_t minibatch_length_max = util::MiniBatch::kMiniBatchLength; + auto selects_for_ranks_buf = util::TempVectorHolder( + temp_vector_stack, static_cast(minibatch_length_max)); + auto selects_for_ranks = selects_for_ranks_buf.mutable_data(); + for (int64_t minibatch_begin = batch_begin; minibatch_begin < batch_end; + minibatch_begin += minibatch_length_max) { + int64_t minibatch_end = std::min(batch_end, minibatch_begin + minibatch_length_max); + + // First and last rank that we are interested in + int64_t first_rank = + BitVectorNavigator::RankNext(minibatch_begin, ties_bitvec, ties_popcounts) - 1LL; + int64_t last_rank = + BitVectorNavigator::RankNext(minibatch_end - 1, ties_bitvec, ties_popcounts) - + 1LL; + + // Do select for each rank in the calculated range. + // + BitVectorNavigator::SelectsForRangeOfRanks( + first_rank + rank_delta, last_rank + rank_delta + 1, num_rows, ties_bitvec, + ties_popcounts, selects_for_ranks, hardware_flags, temp_vector_stack); + + int irank = 0; + outputs[minibatch_begin - batch_begin] = selects_for_ranks[irank]; + for (int64_t i = minibatch_begin + 1; i < minibatch_end; ++i) { + irank += bit_util::GetBit(reinterpret_cast(ties_bitvec), i) ? 1 : 0; + outputs[minibatch_begin - batch_begin] = selects_for_ranks[irank]; + } + } +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h new file mode 100644 index 00000000000..ba62e21ea12 --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include "arrow/compute/exec/util.h" +#include "arrow/util/bit_util.h" + +namespace arrow { +namespace compute { + +// Bit-vector allocated size must be multiple of 64-bits. +// There is exactly ceil(num_bits / 64) 64-bit population counters. +// +class BitVectorNavigator { + public: + static uint64_t GenPopCounts(int64_t num_bits, const uint64_t* bits, + uint64_t* pop_counts) { + int64_t num_pop_counts = (num_bits + 63) / 64; + uint64_t sum = 0; + for (int64_t i = 0; i < num_pop_counts; ++i) { + pop_counts[i] = sum; + sum += ARROW_POPCOUNT64(bits[i]); + } + return sum; + } + + // O(1) + static inline int64_t PopCount(int64_t num_bits, const uint64_t* bitvec, + const uint64_t* popcounts) { + int64_t last_word = (num_bits - 1) / 64; + return popcounts[last_word] + ARROW_POPCOUNT64(bitvec[last_word]); + } + + // O(log(N)) + // The output is set to -1 if rank is below zero and to num_bits if + // rank is above the maximum rank of any row in the represented range. + static inline int64_t Select(int64_t rank, int64_t num_bits, const uint64_t* bits, + const uint64_t* pop_counts) { + if (rank < 0) { + return -1LL; + } + int64_t max_rank = PopCount(num_bits, bits, pop_counts) - 1LL; + if (rank > max_rank) { + return num_bits; + } + + int64_t num_pop_counts = (num_bits + 63) / 64; + // Find index of 64-bit block that contains the nth set bit. + int64_t block_id = (std::upper_bound(pop_counts, pop_counts + num_pop_counts, + static_cast(rank)) - + pop_counts) - + 1; + // Binary search position of (n - pop_count + 1)th bit set in the 64-bit + // block. + uint64_t block = bits[block_id]; + int64_t bit_rank = rank - pop_counts[block_id]; + int bit_id = 0; + for (int half_bits = 32; half_bits >= 1; half_bits /= 2) { + uint64_t mask = ((1ULL << half_bits) - 1ULL); + int64_t lower_half_pop_count = ARROW_POPCOUNT64(block & mask); + if (bit_rank >= lower_half_pop_count) { + block >>= half_bits; + bit_rank -= lower_half_pop_count; + bit_id += half_bits; + } + } + return block_id * 64 + bit_id; + } + + // TODO: We could implement BitVectorNavigator::Select that works on batches + // instead of single rows. Then it could use precomputed static B-tree to + // speed up binary search. + // + + // O(1) + // Input row number must be valid (between 0 and number of rows less 1). + static inline int64_t Rank(int64_t pos, const uint64_t* bits, + const uint64_t* pop_counts) { + int64_t block = pos >> 6; + int offset = static_cast(pos & 63LL); + uint64_t mask = (1ULL << offset) - 1ULL; + int64_t rank1 = + static_cast(pop_counts[block]) + ARROW_POPCOUNT64(bits[block] & mask); + return rank1; + } + + // O(1) + // Rank of the next row (also valid for the last row when the next row would + // be outside of the range of rows). + static inline int64_t RankNext(int64_t pos, const uint64_t* bits, + const uint64_t* pop_counts) { + int64_t block = pos >> 6; + int offset = static_cast(pos & 63LL); + uint64_t mask = ~0ULL >> (63 - offset); + int64_t rank1 = + static_cast(pop_counts[block]) + ARROW_POPCOUNT64(bits[block] & mask); + return rank1; + } + + // Input ranks may be outside of range of ranks present in the input bit + // vector. + // + static void SelectsForRangeOfRanks(int64_t rank_begin, int64_t rank_end, + int64_t num_bits, const uint64_t* bitvec, + const uint64_t* popcounts, int64_t* outputs, + int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack); + + static void SelectsForRelativeRanksForRangeOfRows( + int64_t batch_begin, int64_t batch_end, int64_t rank_delta, int64_t num_rows, + const uint64_t* ties_bitvec, const uint64_t* ties_popcounts, int64_t* outputs, + int64_t hardware_flags, util::TempVectorStack* temp_vector_stack); + + template + static void GenSelectedIds(int64_t num_rows, const uint64_t* bitvec, INDEX_T* ids, + int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + // Break into mini-batches. + // + int64_t batch_length_max = util::MiniBatch::kMiniBatchLength; + auto batch_ids_buf = + util::TempVectorHolder(temp_vector_stack, batch_length_max); + auto batch_ids = batch_ids_buf.mutable_data(); + int batch_num_ids; + int64_t num_ids = 0; + for (int64_t batch_begin = 0; batch_begin < num_rows; + batch_begin += batch_length_max) { + int64_t batch_length = std::min(num_rows - batch_begin, batch_length_max); + util::bit_util::bits_to_indexes( + /*bit_to_search=*/1, hardware_flags, batch_length, + reinterpret_cast(bitvec + (batch_begin / 64)), &batch_num_ids, + batch_ids); + for (int i = 0; i < batch_num_ids; ++i) { + ids[num_ids + i] = static_cast(batch_begin + batch_ids[i]); + } + num_ids += batch_num_ids; + } + } +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/window_frame.cc b/cpp/src/arrow/compute/exec/window_functions/window_frame.cc new file mode 100644 index 00000000000..bdd5cbae39a --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/window_frame.cc @@ -0,0 +1,438 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/window_functions/window_frame.h" +#include +#include +#include "arrow/compute/exec/window_functions/bit_vector_navigator.h" + +namespace arrow { +namespace compute { + +void WindowFrameGenerator::GenTies(InputValues vals, InputTies* ties) { + ties->num_rows = vals.num_rows; + + uint64_t bits = 0ULL; + int64_t i = 0; + + auto push_bit = [&bits, &i, ties](uint64_t bit) { + bits |= bit << (i & 63); + if ((i & 63) == 63) { + ties->bitvec[i / 64] = bits; + bits = 0ULL; + } + ++i; + }; + + if (vals.nulls_before && vals.num_nulls > 0) { + push_bit(1); + for (int64_t j = 1; j < vals.num_nulls; ++j) { + push_bit(0); + } + } + + if (vals.num_rows > vals.num_nulls) { + push_bit(1); + for (int64_t j = 1; j < vals.num_rows - vals.num_nulls; ++j) { + push_bit(vals.vals[i] == vals.vals[i - 1] ? 0 : 1); + } + } + + if (!vals.nulls_before && vals.num_nulls > 0) { + push_bit(1); + for (int64_t j = 1; j < vals.num_nulls; ++j) { + push_bit(0); + } + } + + if (vals.num_rows % 64 > 0) { + ties->bitvec[vals.num_rows / 64] = bits; + } +} + +void WindowFrameGenerator::UnboundedRanges(InputBatch* batch, bool unbounded_left, + bool unbounded_right, int64_t num_rows) { + if (unbounded_left) { + memset(batch->frame_begins, 0, + (batch->end - batch->begin) * sizeof(batch->frame_begins[0])); + } + if (unbounded_right) { + for (int64_t irow = batch->begin; irow < batch->end; ++irow) { + int64_t ibatch = irow - batch->begin; + batch->frame_ends[ibatch] = num_rows; + } + } +} + +void WindowFrameGenerator::Groups(InputBatch* batch, bool constant_deltas, + const int64_t* left_delta, const int64_t* right_delta, + InputTies ties, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack) { + // The case of empty batch + if (batch->end == batch->begin) { + return; + } + + // The case of unbounded ranges + UnboundedRanges(batch, !left_delta, !right_delta, ties.num_rows); + + // The case of constant deltas + if (constant_deltas) { + for (int side = 0; side < 2; ++side) { + const int64_t* delta = side == 0 ? left_delta : right_delta; + int64_t incr = side == 0 ? 0 : 1; + if (delta) { + BitVectorNavigator::SelectsForRelativeRanksForRangeOfRows( + batch->begin, batch->end, delta[0] + incr, ties.num_rows, ties.bitvec, + ties.popcounts, batch->frame_begins, hardware_flags, temp_vector_stack); + } + } + } + // The case of varying deltas + else { + for (int side = 0; side < 2; ++side) { + const int64_t* delta = side == 0 ? left_delta : right_delta; + int64_t incr = side == 0 ? 0 : 1; + if (delta) { + for (int64_t irow = batch->begin; irow < batch->end; ++irow) { + int64_t ibatch = irow - batch->begin; + int64_t rank = + BitVectorNavigator::RankNext(irow, ties.bitvec, ties.popcounts) - 1LL; + int64_t select = BitVectorNavigator::Select( + rank + delta[ibatch] + incr, ties.num_rows, ties.bitvec, ties.popcounts); + batch->frame_begins[ibatch] = std::max(static_cast(0LL), select); + } + } + } + } +} + +void WindowFrameGenerator::Rows(InputBatch* batch, bool constant_deltas, + const int64_t* left_delta, const int64_t* right_delta, + int64_t num_rows) { + // The case of empty batch + if (batch->end == batch->begin) { + return; + } + + // The case of unbounded ranges + UnboundedRanges(batch, !left_delta, !right_delta, num_rows); + + int64_t mask = constant_deltas ? 0LL : ~0LL; + for (int side = 0; side < 2; ++side) { + const int64_t* delta = side == 0 ? left_delta : right_delta; + int64_t incr = side == 0 ? 0 : 1; + if (delta) { + for (int64_t irow = batch->begin; irow < batch->end; ++irow) { + int64_t ibatch = irow - batch->begin; + batch->frame_begins[ibatch] = std::min( + num_rows, + std::max(static_cast(0LL), irow + delta[ibatch & mask] + incr)); + } + } + } +} + +void WindowFrameGenerator::Range(InputBatch* batch, bool constant_deltas, + const int64_t* left_delta, const int64_t* right_delta, + InputValues vals) { + // The case of empty batch + if (batch->end == batch->begin) { + return; + } + + // The case of unbounded ranges + UnboundedRanges(batch, !left_delta, !right_delta, vals.num_rows); + + // Handle nulls. Every null row in a batch will be assigned a frame that + // consist of all the nulls. + int64_t rows_begin = 0; + int64_t rows_end = vals.num_rows; + int64_t batch_begin = batch->begin; + int64_t batch_end = batch->end; + if (vals.num_nulls > 0) { + if (vals.nulls_before) { + rows_begin += vals.num_nulls; + if (batch->begin < vals.num_nulls) { + for (int64_t irow = batch->begin; irow < std::min(batch->end, vals.num_nulls); + ++irow) { + int64_t ibatch = irow - batch->begin; + batch->frame_begins[ibatch] = 0; + batch->frame_ends[ibatch] = vals.num_nulls; + } + batch_begin = std::min(batch->end, vals.num_nulls); + } + } else { + rows_end -= vals.num_nulls; + if (batch->end > vals.num_rows - vals.num_nulls) { + for (int64_t irow = std::max(batch->begin, vals.num_rows - vals.num_nulls); + irow < batch->end; ++irow) { + int64_t ibatch = irow - batch->begin; + batch->frame_begins[ibatch] = vals.num_rows - vals.num_nulls; + batch->frame_ends[ibatch] = vals.num_rows; + } + batch_end = std::max(batch->begin, vals.num_rows - vals.num_nulls); + } + } + if (rows_begin == rows_end) { + for (int64_t ibatch = batch_begin; ibatch < batch_end; ++ibatch) { + batch->frame_begins[ibatch] = vals.nulls_before ? vals.num_rows : 0; + batch->frame_ends[ibatch] = batch->frame_begins[ibatch]; + } + return; + } + if (batch_begin == batch_end) { + return; + } + } + + // The case of constant deltas + if (constant_deltas) { + if (left_delta) { + int64_t current_frame_begin = + rows_begin + (std::lower_bound(vals.vals + rows_begin, vals.vals + rows_end, + vals.vals[batch_begin] + left_delta[0]) - + vals.vals); + + batch->frame_begins[0] = current_frame_begin; + + for (int64_t irow = batch_begin + 1; irow < batch_end; ++irow) { + int64_t ibatch = irow - batch->begin; + int64_t current_frame_left_bound = vals.vals[irow] + left_delta[0]; + while (current_frame_begin < vals.num_rows && + vals.vals[current_frame_begin] < current_frame_left_bound) { + ++current_frame_begin; + } + batch->frame_begins[ibatch] = current_frame_begin; + } + } + if (right_delta) { + int64_t current_frame_end = + rows_begin + (std::upper_bound(vals.vals + rows_begin, vals.vals + rows_end, + vals.vals[batch->begin] + right_delta[0]) - + vals.vals); + + batch->frame_ends[0] = current_frame_end; + + for (int64_t irow = batch_begin + 1; irow < batch_end; ++irow) { + int64_t ibatch = irow - batch->begin; + int64_t current_frame_left_bound = vals.vals[irow] + left_delta[0]; + while (current_frame_end < vals.num_rows && + vals.vals[current_frame_end] <= current_frame_left_bound) { + ++current_frame_end; + } + batch->frame_ends[ibatch] = current_frame_end; + } + } + } + // The case of varying deltas + else { + if (left_delta) { + for (int64_t irow = batch_begin; irow < batch_end; ++irow) { + int64_t ibatch = irow - batch->begin; + + int64_t frame_left_bound = vals.vals[irow] + left_delta[ibatch]; + int64_t frame_begin = + rows_begin + (std::lower_bound(vals.vals + rows_begin, vals.vals + rows_end, + frame_left_bound) - + vals.vals); + + batch->frame_begins[ibatch] = frame_begin; + } + } + if (right_delta) { + for (int64_t irow = batch_begin; irow < batch_end; ++irow) { + int64_t ibatch = irow - batch->begin; + + int64_t frame_right_bound = vals.vals[irow] + right_delta[ibatch]; + int64_t frame_end = + rows_begin + (std::upper_bound(vals.vals + rows_begin, vals.vals + rows_end, + frame_right_bound) - + vals.vals); + + batch->frame_ends[ibatch] = frame_end; + } + } + } +} + +void WindowFrameGeneratorBasic::Groups(WindowFrameGenerator::InputBatch* batch, + bool constant_deltas, const int64_t* left_delta, + const int64_t* right_delta, + WindowFrameGenerator::InputValues vals) { + const int64_t mask = constant_deltas ? 0LL : ~0LL; + for (int64_t i = batch->begin; i < batch->end; ++i) { + if (left_delta) { + batch->frame_begins[i - batch->begin] = + GroupBegin(i, left_delta[(i - batch->begin) & mask], vals); + } else { + batch->frame_begins[i - batch->begin] = 0; + } + if (right_delta) { + batch->frame_ends[i - batch->begin] = + GroupBegin(i, right_delta[(i - batch->begin) & mask] + 1, vals); + } else { + batch->frame_ends[i - batch->begin] = vals.num_rows; + } + } +} + +void WindowFrameGeneratorBasic::Rows(WindowFrameGenerator::InputBatch* batch, + bool constant_deltas, const int64_t* left_delta, + const int64_t* right_delta, int64_t num_rows) { + const int64_t mask = constant_deltas ? 0LL : ~0LL; + if (left_delta) { + for (int64_t i = batch->begin; i < batch->end; ++i) { + batch->frame_begins[i - batch->begin] = std::min( + num_rows, + std::max(static_cast(0LL), i + left_delta[(i - batch->begin) & mask])); + } + } else { + for (int64_t i = batch->begin; i < batch->end; ++i) { + batch->frame_begins[i - batch->begin] = 0LL; + } + } + if (right_delta) { + for (int64_t i = batch->begin; i < batch->end; ++i) { + batch->frame_ends[i - batch->begin] = + std::min(num_rows, std::max(static_cast(0LL), + i + right_delta[(i - batch->begin) & mask] + 1)); + } + } else { + for (int64_t i = batch->begin; i < batch->end; ++i) { + batch->frame_ends[i - batch->begin] = num_rows; + } + } +} + +void WindowFrameGeneratorBasic::Range(WindowFrameGenerator::InputBatch* batch, + bool constant_deltas, const int64_t* left_delta, + const int64_t* right_delta, + WindowFrameGenerator::InputValues vals) { + auto equals = [vals](int64_t l, int64_t r) { + bool is_null_l = + vals.nulls_before ? l < vals.num_nulls : l >= vals.num_rows - vals.num_nulls; + bool is_null_r = + vals.nulls_before ? r < vals.num_nulls : r >= vals.num_rows - vals.num_nulls; + return (is_null_l && is_null_r) || + (!is_null_l && !is_null_r && (vals.vals[l] == vals.vals[r])); + }; + + const int64_t mask = constant_deltas ? 0LL : ~0LL; + + if (left_delta) { + for (int64_t i = batch->begin; i < batch->end; ++i) { + batch->frame_begins[i - batch->begin] = + RangeBegin(vals.vals[i] + left_delta[(i - batch->begin) & mask], vals); + } + } else { + for (int64_t i = batch->begin; i < batch->end; ++i) { + batch->frame_begins[i - batch->begin] = 0LL; + } + } + + if (right_delta) { + for (int64_t i = batch->begin; i < batch->end; ++i) { + int64_t last = + RangeBegin(vals.vals[i] + right_delta[(i - batch->begin) & mask], vals); + while (last + 1 < vals.num_rows && equals(last + 1, last)) { + ++last; + } + batch->frame_ends[i - batch->begin] = last + 1; + } + } else { + for (int64_t i = batch->begin; i < batch->end; ++i) { + batch->frame_ends[i - batch->begin] = vals.num_rows; + } + } +} + +int64_t WindowFrameGeneratorBasic::GroupBegin(int64_t row_number, + int64_t relative_group_number, + WindowFrameGenerator::InputValues vals) { + auto equals = [vals](int64_t l, int64_t r) { + bool is_null_l = + vals.nulls_before ? l < vals.num_nulls : l >= vals.num_rows - vals.num_nulls; + bool is_null_r = + vals.nulls_before ? r < vals.num_nulls : r >= vals.num_rows - vals.num_nulls; + return (is_null_l && is_null_r) || + (!is_null_l && !is_null_r && (vals.vals[l] == vals.vals[r])); + }; + + // Find first row in current row's group + int64_t begin = row_number; + while (begin > 0 && equals(begin - 1, begin)) { + --begin; + } + if (relative_group_number < 0) { + // Skip a given number of preceding groups + for (int64_t group = 0; group < -relative_group_number; ++group) { + if (begin > 0) { + --begin; + } + while (begin > 0 && equals(begin - 1, begin)) { + --begin; + } + } + } else { + // Skip a given number of following groups + for (int64_t group = 0; group < relative_group_number; ++group) { + while (begin < vals.num_rows && equals(begin + 1, begin)) { + ++begin; + } + if (begin < vals.num_rows) { + ++begin; + } + } + } + return begin; +} + +int64_t WindowFrameGeneratorBasic::RangeBegin(int64_t val, + WindowFrameGenerator::InputValues vals) { + auto less = [vals](int64_t index, int64_t value) { + bool is_null_l = vals.nulls_before ? index < vals.num_nulls + : index >= vals.num_rows - vals.num_nulls; + bool is_null_r = false; + if (vals.nulls_before) { + if (is_null_l && !is_null_r) { + return true; + } + if (is_null_r) { + return false; + } + } else { + if (!is_null_l && is_null_r) { + return true; + } + if (is_null_l) { + return false; + } + } + return (vals.vals[index] < value); + }; + + int64_t result = 0; + while (result < vals.num_rows && less(result, val)) { + ++result; + } + return result; + // return std::lower_bound(vals, vals + num_rows, val) - vals; +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/window_functions/window_frame.h b/cpp/src/arrow/compute/exec/window_functions/window_frame.h new file mode 100644 index 00000000000..e886497bc7c --- /dev/null +++ b/cpp/src/arrow/compute/exec/window_functions/window_frame.h @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include "arrow/compute/exec/util.h" + +namespace arrow { +namespace compute { + +struct WindowFrames { + static constexpr int kMaxRangesInFrame = 3; + + int num_ranges_in_frame; + int64_t num_frames; + + // Range can be empty, in that case begin == end. Otherwise begin < end. + // + // Ranges in a single frame must be disjoint but begin of next range can be + // equal to the end of the previous one. + // + const int64_t* begins[kMaxRangesInFrame]; + const int64_t* ends[kMaxRangesInFrame]; + + // Row filter has bits set to 0 for rows that should not be included in the + // range. + // + // Null row filter means that all rows are qualified. + // + const uint8_t* row_filter; + + bool FramesProgressing() const { + for (int64_t i = 1; i < num_frames; ++i) { + if (!(begins[i] >= begins[i - 1] && ends[i] >= ends[i - 1])) { + return false; + } + } + return true; + } + + bool FramesExpanding() const { + for (int64_t i = 1; i < num_frames; ++i) { + if (!((begins[i] >= ends[i - 1] || begins[i] == begins[i - 1]) && + (ends[i] >= ends[i - 1]))) { + return false; + } + } + return true; + } +}; + +inline void GenerateTestFrames(Random64BitCopy& rand, int64_t num_rows, + std::vector& begins, std::vector& ends, + bool progressive, bool expansive) { + begins.resize(num_rows); + ends.resize(num_rows); + + if (!progressive && !expansive) { + constexpr int64_t max_frame_length = 100; + for (int64_t i = 0; i < num_rows; ++i) { + int64_t length = rand.from_range(static_cast(0LL), + std::min(num_rows, max_frame_length)); + int64_t begin = rand.from_range(static_cast(0LL), num_rows - length); + begins[i] = begin; + ends[i] = begin + length; + } + } else if (progressive && !expansive) { + int64_t dist = rand.from_range(static_cast(1LL), + std::max(static_cast(1LL), num_rows / 4)); + std::vector pos; + for (int64_t i = 0; i < num_rows + dist; ++i) { + pos.push_back(rand.from_range(static_cast(0LL), num_rows)); + } + std::sort(pos.begin(), pos.end()); + for (int64_t i = 0; i < num_rows; ++i) { + begins[i] = pos[i]; + ends[i] = pos[i + dist]; + } + } else { + int64_t num_partitions = + rand.from_range(static_cast(1LL), bit_util::CeilDiv(num_rows, 128LL)); + std::set partition_ends_set; + std::vector partition_ends; + partition_ends_set.insert(num_rows); + partition_ends.push_back(num_rows); + for (int64_t i = 1; i < num_partitions; ++i) { + int64_t partition_end; + for (;;) { + partition_end = rand.from_range(static_cast(1LL), num_rows - 1); + if (partition_ends_set.find(partition_end) == partition_ends_set.end()) { + break; + } + } + partition_ends.push_back(partition_end); + partition_ends_set.insert(partition_end); + } + std::sort(partition_ends.begin(), partition_ends.end()); + for (int64_t ipartition = 0; ipartition < num_partitions; ++ipartition) { + int64_t partition_begin = ipartition == 0 ? 0LL : partition_ends[ipartition - 1]; + int64_t partition_end = partition_ends[ipartition]; + int64_t partition_length = partition_end - partition_begin; + int64_t begin = rand.from_range(0LL, 2LL); + + if (begin >= partition_length) { + begin = partition_length - 1; + } + int64_t end = begin + rand.from_range(0LL, 2LL); + if (end > partition_length) { + end = partition_length; + } + begins[partition_begin + 0] = partition_begin + begin; + ends[partition_begin + 0] = partition_begin + end; + for (int64_t i = 1; i < partition_length; ++i) { + int64_t end_step = rand.from_range(0LL, 2LL); + end += end_step; + if (end > partition_length) { + end = partition_length; + } + begins[partition_begin + i] = partition_begin + begin; + ends[partition_begin + i] = partition_begin + end; + } + } + } +} + +// TODO: Enable treating multiple partitions as a single one with window +// frames cropped to boundaries of these partitions. +// This should help amortize the costs of data structure allocations done per +// partition among a group of them, in case when there are many small +// partitions. +// + +class WindowFrameGenerator { + public: + // Sorted values for entire window + struct InputValues { + InputValues(int64_t num_rows, bool nulls_before, int64_t num_nulls, + const int64_t* vals) + : num_rows(num_rows), + nulls_before(nulls_before), + num_nulls(num_nulls), + vals(vals) {} + int64_t num_rows; + bool nulls_before; + int64_t num_nulls; + const int64_t* vals; + }; + + // Bit vector representing ties (first row in every tie group has its bit set + // to 1) + struct InputTies { + InputTies(int64_t num_rows, uint64_t* bitvec, uint64_t* popcounts) + : num_rows(num_rows), bitvec(bitvec), popcounts(popcounts) {} + int64_t num_rows; + uint64_t* bitvec; + uint64_t* popcounts; + }; + + struct InputBatch { + InputBatch(int64_t begin, int64_t end, int64_t* frame_begins, int64_t* frame_ends) + : begin(begin), end(end), frame_begins(frame_begins), frame_ends(frame_ends) {} + + // Range of row numbers that make input batch + int64_t begin; + int64_t end; + + // Output frames for input rows + int64_t* frame_begins; + int64_t* frame_ends; + }; + + // First row in every group of ties gets the corresponding bit set to 1. + // + // For the purpose of groups we consider null key being equal to null key. + // + static void GenTies(InputValues vals, InputTies* ties); + + // Deltas with respect to current row's tie group + // for range of groups to include in its frame. + // Range bounds specified by deltas are + // inclusive. Null pointer means that the + // corresponding side of the range is unbounded. + // + static void Groups(InputBatch* batch, bool constant_deltas, const int64_t* left_delta, + const int64_t* right_delta, InputTies ties, int64_t hardware_flags, + util::TempVectorStack* temp_vector_stack); + + // Constant value specifying delta between current row number and row number + // of the first row of its frame. Preceding can use kUnbounded constant. + static void Rows(InputBatch* batch, bool constant_deltas, const int64_t* left_delta, + const int64_t* right_delta, int64_t num_rows); + + // Constant value specifying delta between current row value and inclusive + // lower bound of its frame. Preceding can use kUnbounded constant. + // + // Only meaningful for a single column sort key. + // + static void Range(InputBatch* batch, bool constant_deltas, const int64_t* left_delta, + const int64_t* right_delta, InputValues vals); + + private: + static void UnboundedRanges(InputBatch* batch, bool unbounded_left, + bool unbounded_right, int64_t num_rows); +}; + +class WindowFrameGeneratorBasic { + public: + static void Groups(WindowFrameGenerator::InputBatch* batch, bool constant_deltas, + const int64_t* left_delta, const int64_t* right_delta, + WindowFrameGenerator::InputValues vals); + + static void Rows(WindowFrameGenerator::InputBatch* batch, bool constant_deltas, + const int64_t* left_delta, const int64_t* right_delta, + int64_t num_rows); + + static void Range(WindowFrameGenerator::InputBatch* batch, bool constant_deltas, + const int64_t* left_delta, const int64_t* right_delta, + WindowFrameGenerator::InputValues vals); + + private: + static int64_t GroupBegin(int64_t row_number, int64_t relative_group_number, + WindowFrameGenerator::InputValues vals); + + static int64_t RangeBegin(int64_t val, WindowFrameGenerator::InputValues vals); +}; + +} // namespace compute +} // namespace arrow