diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 2a4748d0a40..dc41f238d2c 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -401,8 +401,6 @@ list(APPEND compute/exec/hash_join.cc compute/exec/hash_join_dict.cc compute/exec/hash_join_node.cc - compute/exec/key_hash.cc - compute/exec/key_map.cc compute/exec/map_node.cc compute/exec/options.cc compute/exec/order_by_impl.cc @@ -420,6 +418,8 @@ list(APPEND compute/function.cc compute/function_internal.cc compute/kernel.cc + compute/key_hash.cc + compute/key_map.cc compute/light_array.cc compute/ordering.cc compute/registry.cc @@ -439,11 +439,12 @@ list(APPEND compute/row/encode_internal.cc compute/row/compare_internal.cc compute/row/grouper.cc - compute/row/row_internal.cc) + compute/row/row_internal.cc + compute/util.cc) append_avx2_src(compute/exec/bloom_filter_avx2.cc) -append_avx2_src(compute/exec/key_hash_avx2.cc) -append_avx2_src(compute/exec/key_map_avx2.cc) +append_avx2_src(compute/key_hash_avx2.cc) +append_avx2_src(compute/key_map_avx2.cc) append_avx2_src(compute/exec/swiss_join_avx2.cc) append_avx2_src(compute/exec/util_avx2.cc) append_avx2_src(compute/row/compare_internal_avx2.cc) diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt index cdf019b798b..4accec15d9c 100644 --- a/cpp/src/arrow/compute/CMakeLists.txt +++ b/cpp/src/arrow/compute/CMakeLists.txt @@ -82,7 +82,8 @@ add_arrow_compute_test(internals_test exec_test.cc kernel_test.cc light_array_test.cc - registry_test.cc) + registry_test.cc + key_hash_test.cc) add_arrow_benchmark(function_benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 9f3eedb63de..914840b0396 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -46,15 +46,13 @@ add_arrow_compute_test(hash_join_node_test "arrow-compute" SOURCES hash_join_node_test.cc - bloom_filter_test.cc - key_hash_test.cc) + bloom_filter_test.cc) add_arrow_compute_test(pivot_longer_node_test PREFIX "arrow-compute" SOURCES pivot_longer_node_test.cc test_nodes.cc) - add_arrow_compute_test(asof_join_node_test REQUIRE_ALL_KERNELS PREFIX @@ -71,6 +69,11 @@ add_arrow_compute_test(util_test SOURCES util_test.cc task_util_test.cc) +add_arrow_compute_test(light_array_exec_test + PREFIX + "arrow-compute" + SOURCES + light_array_exec_test.cc) add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/exec/asof_join_node.cc b/cpp/src/arrow/compute/exec/asof_join_node.cc index 00a733be25e..47acc41e889 100644 --- a/cpp/src/arrow/compute/exec/asof_join_node.cc +++ b/cpp/src/arrow/compute/exec/asof_join_node.cc @@ -30,11 +30,11 @@ #include "arrow/array/builder_binary.h" #include "arrow/array/builder_primitive.h" #include "arrow/compute/exec/exec_plan.h" -#include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/query_context.h" #include "arrow/compute/exec/schema_util.h" #include "arrow/compute/exec/util.h" +#include "arrow/compute/key_hash.h" #include "arrow/compute/light_array.h" #include "arrow/record_batch.h" #include "arrow/result.h" diff --git a/cpp/src/arrow/compute/exec/bloom_filter_test.cc b/cpp/src/arrow/compute/exec/bloom_filter_test.cc index 50993b4cb10..3a79c10be2a 100644 --- a/cpp/src/arrow/compute/exec/bloom_filter_test.cc +++ b/cpp/src/arrow/compute/exec/bloom_filter_test.cc @@ -23,10 +23,10 @@ #include #include #include "arrow/compute/exec/bloom_filter.h" -#include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/task_util.h" #include "arrow/compute/exec/test_util.h" #include "arrow/compute/exec/util.h" +#include "arrow/compute/key_hash.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/cpu_info.h" diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 6155ebd603f..6da58330e22 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -24,10 +24,10 @@ #include "arrow/compute/exec/hash_join.h" #include "arrow/compute/exec/hash_join_dict.h" #include "arrow/compute/exec/hash_join_node.h" -#include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/schema_util.h" #include "arrow/compute/exec/util.h" +#include "arrow/compute/key_hash.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/thread_pool.h" diff --git a/cpp/src/arrow/compute/exec/light_array_exec_test.cc b/cpp/src/arrow/compute/exec/light_array_exec_test.cc new file mode 100644 index 00000000000..cbc93299a5b --- /dev/null +++ b/cpp/src/arrow/compute/exec/light_array_exec_test.cc @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/light_array.h" + +#include + +#include "arrow/compute/exec/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" + +namespace arrow { +namespace compute { + +TEST(KeyColumnArray, FromExecBatch) { + ExecBatch batch = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + std::vector arrays; + ASSERT_OK(ColumnArraysFromExecBatch(batch, &arrays)); + + ASSERT_EQ(2, arrays.size()); + ASSERT_EQ(8, arrays[0].metadata().fixed_length); + ASSERT_EQ(0, arrays[1].metadata().fixed_length); + ASSERT_EQ(3, arrays[0].length()); + ASSERT_EQ(3, arrays[1].length()); + + ASSERT_OK(ColumnArraysFromExecBatch(batch, 1, 1, &arrays)); + + ASSERT_EQ(2, arrays.size()); + ASSERT_EQ(8, arrays[0].metadata().fixed_length); + ASSERT_EQ(0, arrays[1].metadata().fixed_length); + ASSERT_EQ(1, arrays[0].length()); + ASSERT_EQ(1, arrays[1].length()); +} + +TEST(ExecBatchBuilder, AppendBatches) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + ExecBatch batch_one = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + ExecBatch batch_two = + ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); + ExecBatch combined = ExecBatchFromJSON( + {int64(), boolean()}, + "[[1, true], [2, false], [null, null], [null, true], [5, true], [6, false]]"); + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/2)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/2)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(combined, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +TEST(ExecBatchBuilder, AppendBatchesSomeRows) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + ExecBatch batch_one = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + ExecBatch batch_two = + ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); + ExecBatch combined = ExecBatchFromJSON( + {int64(), boolean()}, "[[1, true], [2, false], [null, true], [5, true]]"); + { + ExecBatchBuilder builder; + uint16_t row_ids[2] = {0, 1}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 2, row_ids, /*num_cols=*/2)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 2, row_ids, /*num_cols=*/2)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(combined, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +TEST(ExecBatchBuilder, AppendBatchesSomeCols) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + ExecBatch batch_one = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + ExecBatch batch_two = + ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); + ExecBatch first_col_only = + ExecBatchFromJSON({int64()}, "[[1], [2], [null], [null], [5], [6]]"); + ExecBatch last_col_only = ExecBatchFromJSON( + {boolean()}, "[[true], [false], [null], [true], [true], [false]]"); + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + int first_col_ids[1] = {0}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1, + first_col_ids)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1, + first_col_ids)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(first_col_only, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + // If we don't specify col_ids and num_cols is 1 it is implicitly the first col + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(first_col_only, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + int last_col_ids[1] = {1}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1, + last_col_ids)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1, + last_col_ids)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(last_col_only, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +TEST(ExecBatchBuilder, AppendNulls) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + ExecBatch batch_one = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + ExecBatch combined = ExecBatchFromJSON( + {int64(), boolean()}, + "[[1, true], [2, false], [null, null], [null, null], [null, null]]"); + ExecBatch just_nulls = + ExecBatchFromJSON({int64(), boolean()}, "[[null, null], [null, null]]"); + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/2)); + ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(combined, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + { + ExecBatchBuilder builder; + ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(just_nulls, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/swiss_join.cc b/cpp/src/arrow/compute/exec/swiss_join.cc index de9b720c480..8bf2ee1df47 100644 --- a/cpp/src/arrow/compute/exec/swiss_join.cc +++ b/cpp/src/arrow/compute/exec/swiss_join.cc @@ -22,10 +22,10 @@ #include #include "arrow/array/util.h" // MakeArrayFromScalar #include "arrow/compute/exec/hash_join.h" -#include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/swiss_join_internal.h" #include "arrow/compute/exec/util.h" #include "arrow/compute/kernels/row_encoder_internal.h" +#include "arrow/compute/key_hash.h" #include "arrow/compute/row/compare_internal.h" #include "arrow/compute/row/encode_internal.h" #include "arrow/util/bit_util.h" diff --git a/cpp/src/arrow/compute/exec/swiss_join_internal.h b/cpp/src/arrow/compute/exec/swiss_join_internal.h index 355aff70944..766f40e131c 100644 --- a/cpp/src/arrow/compute/exec/swiss_join_internal.h +++ b/cpp/src/arrow/compute/exec/swiss_join_internal.h @@ -18,12 +18,12 @@ #pragma once #include -#include "arrow/compute/exec/key_map.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/partition_util.h" #include "arrow/compute/exec/schema_util.h" #include "arrow/compute/exec/task_util.h" #include "arrow/compute/kernels/row_encoder_internal.h" +#include "arrow/compute/key_map.h" #include "arrow/compute/light_array.h" #include "arrow/compute/row/encode_internal.h" diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc index 752f8cac764..6a1fd37aa19 100644 --- a/cpp/src/arrow/compute/exec/util.cc +++ b/cpp/src/arrow/compute/exec/util.cc @@ -25,315 +25,6 @@ #include "arrow/util/ubsan.h" namespace arrow { - -using bit_util::CountTrailingZeros; - -namespace util { - -inline uint64_t bit_util::SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) { - // This will not be correct on big-endian architectures. -#if !ARROW_LITTLE_ENDIAN - ARROW_DCHECK(false); -#endif - ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8); - if (num_bytes == 8) { - return util::SafeLoad(reinterpret_cast(bytes)); - } else { - uint64_t word = 0; - for (int i = 0; i < num_bytes; ++i) { - word |= static_cast(bytes[i]) << (8 * i); - } - return word; - } -} - -inline void bit_util::SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value) { - // This will not be correct on big-endian architectures. -#if !ARROW_LITTLE_ENDIAN - ARROW_DCHECK(false); -#endif - ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8); - if (num_bytes == 8) { - util::SafeStore(reinterpret_cast(bytes), value); - } else { - for (int i = 0; i < num_bytes; ++i) { - bytes[i] = static_cast(value >> (8 * i)); - } - } -} - -inline void bit_util::bits_to_indexes_helper(uint64_t word, uint16_t base_index, - int* num_indexes, uint16_t* indexes) { - int n = *num_indexes; - while (word) { - indexes[n++] = base_index + static_cast(CountTrailingZeros(word)); - word &= word - 1; - } - *num_indexes = n; -} - -inline void bit_util::bits_filter_indexes_helper(uint64_t word, - const uint16_t* input_indexes, - int* num_indexes, uint16_t* indexes) { - int n = *num_indexes; - while (word) { - indexes[n++] = input_indexes[CountTrailingZeros(word)]; - word &= word - 1; - } - *num_indexes = n; -} - -template -void bit_util::bits_to_indexes_internal(int64_t hardware_flags, const int num_bits, - const uint8_t* bits, - const uint16_t* input_indexes, int* num_indexes, - uint16_t* indexes, uint16_t base_index) { - // 64 bits at a time - constexpr int unroll = 64; - int tail = num_bits % unroll; -#if defined(ARROW_HAVE_AVX2) - if (hardware_flags & arrow::internal::CpuInfo::AVX2) { - if (filter_input_indexes) { - bits_filter_indexes_avx2(bit_to_search, num_bits - tail, bits, input_indexes, - num_indexes, indexes); - } else { - bits_to_indexes_avx2(bit_to_search, num_bits - tail, bits, num_indexes, indexes, - base_index); - } - } else { -#endif - *num_indexes = 0; - for (int i = 0; i < num_bits / unroll; ++i) { - uint64_t word = util::SafeLoad(&reinterpret_cast(bits)[i]); - if (bit_to_search == 0) { - word = ~word; - } - if (filter_input_indexes) { - bits_filter_indexes_helper(word, input_indexes + i * 64, num_indexes, indexes); - } else { - bits_to_indexes_helper(word, i * 64 + base_index, num_indexes, indexes); - } - } -#if defined(ARROW_HAVE_AVX2) - } -#endif - // Optionally process the last partial word with masking out bits outside range - if (tail) { - const uint8_t* bits_tail = bits + (num_bits - tail) / 8; - uint64_t word = SafeLoadUpTo8Bytes(bits_tail, (tail + 7) / 8); - if (bit_to_search == 0) { - word = ~word; - } - word &= ~0ULL >> (64 - tail); - if (filter_input_indexes) { - bits_filter_indexes_helper(word, input_indexes + num_bits - tail, num_indexes, - indexes); - } else { - bits_to_indexes_helper(word, num_bits - tail + base_index, num_indexes, indexes); - } - } -} - -void bit_util::bits_to_indexes(int bit_to_search, int64_t hardware_flags, int num_bits, - const uint8_t* bits, int* num_indexes, uint16_t* indexes, - int bit_offset) { - bits += bit_offset / 8; - bit_offset %= 8; - *num_indexes = 0; - uint16_t base_index = 0; - if (bit_offset != 0) { - uint64_t bits_head = bits[0] >> bit_offset; - int bits_in_first_byte = std::min(num_bits, 8 - bit_offset); - bits_to_indexes(bit_to_search, hardware_flags, bits_in_first_byte, - reinterpret_cast(&bits_head), num_indexes, indexes); - if (num_bits <= bits_in_first_byte) { - return; - } - num_bits -= bits_in_first_byte; - indexes += *num_indexes; - bits += 1; - base_index = bits_in_first_byte; - } - - int num_indexes_new = 0; - if (bit_to_search == 0) { - bits_to_indexes_internal<0, false>(hardware_flags, num_bits, bits, nullptr, - &num_indexes_new, indexes, base_index); - } else { - ARROW_DCHECK(bit_to_search == 1); - bits_to_indexes_internal<1, false>(hardware_flags, num_bits, bits, nullptr, - &num_indexes_new, indexes, base_index); - } - *num_indexes += num_indexes_new; -} - -void bit_util::bits_filter_indexes(int bit_to_search, int64_t hardware_flags, - const int num_bits, const uint8_t* bits, - const uint16_t* input_indexes, int* num_indexes, - uint16_t* indexes, int bit_offset) { - bits += bit_offset / 8; - bit_offset %= 8; - if (bit_offset != 0) { - int num_indexes_head = 0; - uint64_t bits_head = bits[0] >> bit_offset; - int bits_in_first_byte = std::min(num_bits, 8 - bit_offset); - bits_filter_indexes(bit_to_search, hardware_flags, bits_in_first_byte, - reinterpret_cast(&bits_head), input_indexes, - &num_indexes_head, indexes); - int num_indexes_tail = 0; - if (num_bits > bits_in_first_byte) { - bits_filter_indexes(bit_to_search, hardware_flags, num_bits - bits_in_first_byte, - bits + 1, input_indexes + bits_in_first_byte, &num_indexes_tail, - indexes + num_indexes_head); - } - *num_indexes = num_indexes_head + num_indexes_tail; - return; - } - - if (bit_to_search == 0) { - bits_to_indexes_internal<0, true>(hardware_flags, num_bits, bits, input_indexes, - num_indexes, indexes); - } else { - ARROW_DCHECK(bit_to_search == 1); - bits_to_indexes_internal<1, true>(hardware_flags, num_bits, bits, input_indexes, - num_indexes, indexes); - } -} - -void bit_util::bits_split_indexes(int64_t hardware_flags, const int num_bits, - const uint8_t* bits, int* num_indexes_bit0, - uint16_t* indexes_bit0, uint16_t* indexes_bit1, - int bit_offset) { - bits_to_indexes(0, hardware_flags, num_bits, bits, num_indexes_bit0, indexes_bit0, - bit_offset); - int num_indexes_bit1; - bits_to_indexes(1, hardware_flags, num_bits, bits, &num_indexes_bit1, indexes_bit1, - bit_offset); -} - -void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits, - const uint8_t* bits, uint8_t* bytes, int bit_offset) { - bits += bit_offset / 8; - bit_offset %= 8; - if (bit_offset != 0) { - uint64_t bits_head = bits[0] >> bit_offset; - int bits_in_first_byte = std::min(num_bits, 8 - bit_offset); - bits_to_bytes(hardware_flags, bits_in_first_byte, - reinterpret_cast(&bits_head), bytes); - if (num_bits > bits_in_first_byte) { - bits_to_bytes(hardware_flags, num_bits - bits_in_first_byte, bits + 1, - bytes + bits_in_first_byte); - } - return; - } - - int num_processed = 0; -#if defined(ARROW_HAVE_AVX2) - if (hardware_flags & arrow::internal::CpuInfo::AVX2) { - // The function call below processes whole 32 bit chunks together. - num_processed = num_bits - (num_bits % 32); - bits_to_bytes_avx2(num_processed, bits, bytes); - } -#endif - // Processing 8 bits at a time - constexpr int unroll = 8; - for (int i = num_processed / unroll; i < num_bits / unroll; ++i) { - uint8_t bits_next = bits[i]; - // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart - // from the previous. - uint64_t unpacked = static_cast(bits_next & 0xfe) * - ((1ULL << 7) | (1ULL << 14) | (1ULL << 21) | (1ULL << 28) | - (1ULL << 35) | (1ULL << 42) | (1ULL << 49)); - unpacked |= (bits_next & 1); - unpacked &= 0x0101010101010101ULL; - unpacked *= 255; - util::SafeStore(&reinterpret_cast(bytes)[i], unpacked); - } - int tail = num_bits % unroll; - if (tail) { - uint8_t bits_next = bits[(num_bits - tail) / unroll]; - // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart - // from the previous. - uint64_t unpacked = static_cast(bits_next & 0xfe) * - ((1ULL << 7) | (1ULL << 14) | (1ULL << 21) | (1ULL << 28) | - (1ULL << 35) | (1ULL << 42) | (1ULL << 49)); - unpacked |= (bits_next & 1); - unpacked &= 0x0101010101010101ULL; - unpacked *= 255; - SafeStoreUpTo8Bytes(bytes + num_bits - tail, tail, unpacked); - } -} - -void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits, - const uint8_t* bytes, uint8_t* bits, int bit_offset) { - bits += bit_offset / 8; - bit_offset %= 8; - if (bit_offset != 0) { - uint64_t bits_head; - int bits_in_first_byte = std::min(num_bits, 8 - bit_offset); - bytes_to_bits(hardware_flags, bits_in_first_byte, bytes, - reinterpret_cast(&bits_head)); - uint8_t mask = (1 << bit_offset) - 1; - *bits = static_cast((*bits & mask) | (bits_head << bit_offset)); - - if (num_bits > bits_in_first_byte) { - bytes_to_bits(hardware_flags, num_bits - bits_in_first_byte, - bytes + bits_in_first_byte, bits + 1); - } - return; - } - - int num_processed = 0; -#if defined(ARROW_HAVE_AVX2) - if (hardware_flags & arrow::internal::CpuInfo::AVX2) { - // The function call below processes whole 32 bit chunks together. - num_processed = num_bits - (num_bits % 32); - bytes_to_bits_avx2(num_processed, bytes, bits); - } -#endif - // Process 8 bits at a time - constexpr int unroll = 8; - for (int i = num_processed / unroll; i < num_bits / unroll; ++i) { - uint64_t bytes_next = util::SafeLoad(&reinterpret_cast(bytes)[i]); - bytes_next &= 0x0101010101010101ULL; - bytes_next |= (bytes_next >> 7); // Pairs of adjacent output bits in individual bytes - bytes_next |= (bytes_next >> 14); // 4 adjacent output bits in individual bytes - bytes_next |= (bytes_next >> 28); // All 8 output bits in the lowest byte - bits[i] = static_cast(bytes_next & 0xff); - } - int tail = num_bits % unroll; - if (tail) { - uint64_t bytes_next = SafeLoadUpTo8Bytes(bytes + num_bits - tail, tail); - bytes_next &= 0x0101010101010101ULL; - bytes_next |= (bytes_next >> 7); // Pairs of adjacent output bits in individual bytes - bytes_next |= (bytes_next >> 14); // 4 adjacent output bits in individual bytes - bytes_next |= (bytes_next >> 28); // All 8 output bits in the lowest byte - bits[num_bits / 8] = static_cast(bytes_next & 0xff); - } -} - -bool bit_util::are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes, - uint32_t num_bytes) { -#if defined(ARROW_HAVE_AVX2) - if (hardware_flags & arrow::internal::CpuInfo::AVX2) { - return are_all_bytes_zero_avx2(bytes, num_bytes); - } -#endif - uint64_t result_or = 0; - uint32_t i; - for (i = 0; i < num_bytes / 8; ++i) { - uint64_t x = util::SafeLoad(&reinterpret_cast(bytes)[i]); - result_or |= x; - } - if (num_bytes % 8 > 0) { - uint64_t tail = 0; - result_or |= memcmp(bytes + i * 8, &tail, num_bytes % 8); - } - return result_or == 0; -} - -} // namespace util - namespace compute { Status ValidateExecNodeInputs(ExecPlan* plan, const std::vector& inputs, diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h index a2018277cdc..8e5001d99c1 100644 --- a/cpp/src/arrow/compute/exec/util.h +++ b/cpp/src/arrow/compute/exec/util.h @@ -28,6 +28,7 @@ #include "arrow/compute/exec/expression.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/type_fwd.h" +#include "arrow/compute/util.h" #include "arrow/memory_pool.h" #include "arrow/result.h" #include "arrow/status.h" @@ -38,200 +39,7 @@ #include "arrow/util/thread_pool.h" #include "arrow/util/type_fwd.h" -#if defined(__clang__) || defined(__GNUC__) -#define BYTESWAP(x) __builtin_bswap64(x) -#define ROTL(x, n) (((x) << (n)) | ((x) >> ((-n) & 31))) -#define ROTL64(x, n) (((x) << (n)) | ((x) >> ((-n) & 63))) -#define PREFETCH(ptr) __builtin_prefetch((ptr), 0 /* rw==read */, 3 /* locality */) -#elif defined(_MSC_VER) -#include -#define BYTESWAP(x) _byteswap_uint64(x) -#define ROTL(x, n) _rotl((x), (n)) -#define ROTL64(x, n) _rotl64((x), (n)) -#if defined(_M_X64) || defined(_M_I86) -#include // https://msdn.microsoft.com/fr-fr/library/84szxsww(v=vs.90).aspx -#define PREFETCH(ptr) _mm_prefetch((const char*)(ptr), _MM_HINT_T0) -#else -#define PREFETCH(ptr) (void)(ptr) /* disabled */ -#endif -#endif - namespace arrow { -namespace util { - -template -inline void CheckAlignment(const void* ptr) { - ARROW_DCHECK(reinterpret_cast(ptr) % sizeof(T) == 0); -} - -// Some platforms typedef int64_t as long int instead of long long int, -// which breaks the _mm256_i64gather_epi64 and _mm256_i32gather_epi64 intrinsics -// which need long long. -// We use the cast to the type below in these intrinsics to make the code -// compile in all cases. -// -using int64_for_gather_t = const long long int; // NOLINT runtime-int - -// All MiniBatch... classes use TempVectorStack for vector allocations and can -// only work with vectors up to 1024 elements. -// -// They should only be allocated on the stack to guarantee the right sequence -// of allocation and deallocation of vectors from TempVectorStack. -// -class MiniBatch { - public: - static constexpr int kLogMiniBatchLength = 10; - static constexpr int kMiniBatchLength = 1 << kLogMiniBatchLength; -}; - -/// Storage used to allocate temporary vectors of a batch size. -/// Temporary vectors should resemble allocating temporary variables on the stack -/// but in the context of vectorized processing where we need to store a vector of -/// temporaries instead of a single value. -class TempVectorStack { - template - friend class TempVectorHolder; - - public: - Status Init(MemoryPool* pool, int64_t size) { - num_vectors_ = 0; - top_ = 0; - buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * sizeof(uint64_t); - ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool)); - // Ensure later operations don't accidentally read uninitialized memory. - std::memset(buffer->mutable_data(), 0xFF, size); - buffer_ = std::move(buffer); - return Status::OK(); - } - - private: - int64_t PaddedAllocationSize(int64_t num_bytes) { - // Round up allocation size to multiple of 8 bytes - // to avoid returning temp vectors with unaligned address. - // - // Also add padding at the end to facilitate loads and stores - // using SIMD when number of vector elements is not divisible - // by the number of SIMD lanes. - // - return ::arrow::bit_util::RoundUp(num_bytes, sizeof(int64_t)) + kPadding; - } - void alloc(uint32_t num_bytes, uint8_t** data, int* id) { - int64_t old_top = top_; - top_ += PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t); - // Stack overflow check - ARROW_DCHECK(top_ <= buffer_size_); - *data = buffer_->mutable_data() + old_top + sizeof(uint64_t); - // We set 8 bytes before the beginning of the allocated range and - // 8 bytes after the end to check for stack overflow (which would - // result in those known bytes being corrupted). - reinterpret_cast(buffer_->mutable_data() + old_top)[0] = kGuard1; - reinterpret_cast(buffer_->mutable_data() + top_)[-1] = kGuard2; - *id = num_vectors_++; - } - void release(int id, uint32_t num_bytes) { - ARROW_DCHECK(num_vectors_ == id + 1); - int64_t size = PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t); - ARROW_DCHECK(reinterpret_cast(buffer_->mutable_data() + top_)[-1] == - kGuard2); - ARROW_DCHECK(top_ >= size); - top_ -= size; - ARROW_DCHECK(reinterpret_cast(buffer_->mutable_data() + top_)[0] == - kGuard1); - --num_vectors_; - } - static constexpr uint64_t kGuard1 = 0x3141592653589793ULL; - static constexpr uint64_t kGuard2 = 0x0577215664901532ULL; - static constexpr int64_t kPadding = 64; - int num_vectors_; - int64_t top_; - std::unique_ptr buffer_; - int64_t buffer_size_; -}; - -template -class TempVectorHolder { - friend class TempVectorStack; - - public: - ~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); } - T* mutable_data() { return reinterpret_cast(data_); } - TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) { - stack_ = stack; - num_elements_ = num_elements; - stack_->alloc(num_elements * sizeof(T), &data_, &id_); - } - - private: - TempVectorStack* stack_; - uint8_t* data_; - int id_; - uint32_t num_elements_; -}; - -class bit_util { - public: - static void bits_to_indexes(int bit_to_search, int64_t hardware_flags, - const int num_bits, const uint8_t* bits, int* num_indexes, - uint16_t* indexes, int bit_offset = 0); - - static void bits_filter_indexes(int bit_to_search, int64_t hardware_flags, - const int num_bits, const uint8_t* bits, - const uint16_t* input_indexes, int* num_indexes, - uint16_t* indexes, int bit_offset = 0); - - // Input and output indexes may be pointing to the same data (in-place filtering). - static void bits_split_indexes(int64_t hardware_flags, const int num_bits, - const uint8_t* bits, int* num_indexes_bit0, - uint16_t* indexes_bit0, uint16_t* indexes_bit1, - int bit_offset = 0); - - // Bit 1 is replaced with byte 0xFF. - static void bits_to_bytes(int64_t hardware_flags, const int num_bits, - const uint8_t* bits, uint8_t* bytes, int bit_offset = 0); - - // Return highest bit of each byte. - static void bytes_to_bits(int64_t hardware_flags, const int num_bits, - const uint8_t* bytes, uint8_t* bits, int bit_offset = 0); - - static bool are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes, - uint32_t num_bytes); - - private: - inline static uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes); - inline static void SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value); - inline static void bits_to_indexes_helper(uint64_t word, uint16_t base_index, - int* num_indexes, uint16_t* indexes); - inline static void bits_filter_indexes_helper(uint64_t word, - const uint16_t* input_indexes, - int* num_indexes, uint16_t* indexes); - template - static void bits_to_indexes_internal(int64_t hardware_flags, const int num_bits, - const uint8_t* bits, const uint16_t* input_indexes, - int* num_indexes, uint16_t* indexes, - uint16_t base_index = 0); - -#if defined(ARROW_HAVE_AVX2) - static void bits_to_indexes_avx2(int bit_to_search, const int num_bits, - const uint8_t* bits, int* num_indexes, - uint16_t* indexes, uint16_t base_index = 0); - static void bits_filter_indexes_avx2(int bit_to_search, const int num_bits, - const uint8_t* bits, const uint16_t* input_indexes, - int* num_indexes, uint16_t* indexes); - template - static void bits_to_indexes_imp_avx2(const int num_bits, const uint8_t* bits, - int* num_indexes, uint16_t* indexes, - uint16_t base_index = 0); - template - static void bits_filter_indexes_imp_avx2(const int num_bits, const uint8_t* bits, - const uint16_t* input_indexes, - int* num_indexes, uint16_t* indexes); - static void bits_to_bytes_avx2(const int num_bits, const uint8_t* bits, uint8_t* bytes); - static void bytes_to_bits_avx2(const int num_bits, const uint8_t* bytes, uint8_t* bits); - static bool are_all_bytes_zero_avx2(const uint8_t* bytes, uint32_t num_bytes); -#endif -}; - -} // namespace util namespace compute { ARROW_EXPORT diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index b05a47b600c..3dd1f2b8112 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -30,7 +30,6 @@ #include "arrow/array/data.h" #include "arrow/buffer.h" #include "arrow/buffer_builder.h" -#include "arrow/compute/exec.h" #include "arrow/compute/kernel.h" #include "arrow/datum.h" #include "arrow/result.h" diff --git a/cpp/src/arrow/compute/kernels/common_internal.h b/cpp/src/arrow/compute/kernels/common_internal.h index bf90d114512..744aee23795 100644 --- a/cpp/src/arrow/compute/kernels/common_internal.h +++ b/cpp/src/arrow/compute/kernels/common_internal.h @@ -30,7 +30,7 @@ #include "arrow/array/data.h" #include "arrow/buffer.h" #include "arrow/chunked_array.h" -#include "arrow/compute/exec.h" + #include "arrow/compute/function.h" #include "arrow/compute/kernel.h" #include "arrow/compute/kernels/codegen_internal.h" diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index c0459a14859..380dde016ef 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -29,10 +29,6 @@ #include "arrow/buffer_builder.h" #include "arrow/compute/api_aggregate.h" #include "arrow/compute/api_vector.h" -#include "arrow/compute/exec/key_hash.h" -#include "arrow/compute/exec/key_map.h" -#include "arrow/compute/exec/util.h" -#include "arrow/compute/exec_internal.h" #include "arrow/compute/kernel.h" #include "arrow/compute/kernels/aggregate_internal.h" #include "arrow/compute/kernels/aggregate_var_std_internal.h" diff --git a/cpp/src/arrow/compute/kernels/row_encoder_internal.h b/cpp/src/arrow/compute/kernels/row_encoder_internal.h index 5fe80e0f506..9bf7c1d1c4f 100644 --- a/cpp/src/arrow/compute/kernels/row_encoder_internal.h +++ b/cpp/src/arrow/compute/kernels/row_encoder_internal.h @@ -19,7 +19,6 @@ #include -#include "arrow/compute/exec.h" #include "arrow/compute/kernels/codegen_internal.h" #include "arrow/visit_data_inline.h" diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/key_hash.cc similarity index 99% rename from cpp/src/arrow/compute/exec/key_hash.cc rename to cpp/src/arrow/compute/key_hash.cc index 5ff0d4cf1e5..3fcfbf3d831 100644 --- a/cpp/src/arrow/compute/exec/key_hash.cc +++ b/cpp/src/arrow/compute/key_hash.cc @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "arrow/compute/exec/key_hash.h" +#include "arrow/compute/key_hash.h" #include diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/key_hash.h similarity index 99% rename from cpp/src/arrow/compute/exec/key_hash.h rename to cpp/src/arrow/compute/key_hash.h index 68197973e02..ddf86dfcdc0 100644 --- a/cpp/src/arrow/compute/exec/key_hash.h +++ b/cpp/src/arrow/compute/key_hash.h @@ -23,8 +23,8 @@ #include -#include "arrow/compute/exec/util.h" #include "arrow/compute/light_array.h" +#include "arrow/compute/util.h" namespace arrow { namespace compute { diff --git a/cpp/src/arrow/compute/exec/key_hash_avx2.cc b/cpp/src/arrow/compute/key_hash_avx2.cc similarity index 99% rename from cpp/src/arrow/compute/exec/key_hash_avx2.cc rename to cpp/src/arrow/compute/key_hash_avx2.cc index d36df9fc9f3..f30c3460bda 100644 --- a/cpp/src/arrow/compute/exec/key_hash_avx2.cc +++ b/cpp/src/arrow/compute/key_hash_avx2.cc @@ -17,7 +17,7 @@ #include -#include "arrow/compute/exec/key_hash.h" +#include "arrow/compute/key_hash.h" #include "arrow/util/bit_util.h" namespace arrow { diff --git a/cpp/src/arrow/compute/exec/key_hash_test.cc b/cpp/src/arrow/compute/key_hash_test.cc similarity index 99% rename from cpp/src/arrow/compute/exec/key_hash_test.cc rename to cpp/src/arrow/compute/key_hash_test.cc index 47f0f34560e..d030e622641 100644 --- a/cpp/src/arrow/compute/exec/key_hash_test.cc +++ b/cpp/src/arrow/compute/key_hash_test.cc @@ -21,9 +21,9 @@ #include #include #include "arrow/array/builder_binary.h" -#include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/test_util.h" #include "arrow/compute/exec/util.h" +#include "arrow/compute/key_hash.h" #include "arrow/util/cpu_info.h" #include "arrow/util/pcg_random.h" diff --git a/cpp/src/arrow/compute/exec/key_map.cc b/cpp/src/arrow/compute/key_map.cc similarity index 99% rename from cpp/src/arrow/compute/exec/key_map.cc rename to cpp/src/arrow/compute/key_map.cc index a61184e4ca9..ebbf8a7b828 100644 --- a/cpp/src/arrow/compute/exec/key_map.cc +++ b/cpp/src/arrow/compute/key_map.cc @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "arrow/compute/exec/key_map.h" - -#include +#include "arrow/compute/key_map.h" #include #include diff --git a/cpp/src/arrow/compute/exec/key_map.h b/cpp/src/arrow/compute/key_map.h similarity index 99% rename from cpp/src/arrow/compute/exec/key_map.h rename to cpp/src/arrow/compute/key_map.h index cc630e0b1c3..4702c5ecc8b 100644 --- a/cpp/src/arrow/compute/exec/key_map.h +++ b/cpp/src/arrow/compute/key_map.h @@ -19,7 +19,8 @@ #include -#include "arrow/compute/exec/util.h" +#include "arrow/compute/util.h" +#include "arrow/compute/util_internal.h" #include "arrow/memory_pool.h" #include "arrow/result.h" #include "arrow/status.h" diff --git a/cpp/src/arrow/compute/exec/key_map_avx2.cc b/cpp/src/arrow/compute/key_map_avx2.cc similarity index 99% rename from cpp/src/arrow/compute/exec/key_map_avx2.cc rename to cpp/src/arrow/compute/key_map_avx2.cc index 4c77f3af237..102e28eca41 100644 --- a/cpp/src/arrow/compute/exec/key_map_avx2.cc +++ b/cpp/src/arrow/compute/key_map_avx2.cc @@ -17,7 +17,7 @@ #include -#include "arrow/compute/exec/key_map.h" +#include "arrow/compute/key_map.h" namespace arrow { namespace compute { diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index 389b63cca41..d617b0aa064 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -21,7 +21,7 @@ #include "arrow/array.h" #include "arrow/compute/exec.h" -#include "arrow/compute/exec/util.h" +#include "arrow/compute/util.h" #include "arrow/type.h" #include "arrow/util/cpu_info.h" #include "arrow/util/logging.h" diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index dcc7841a091..c5e83b546b6 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -20,7 +20,6 @@ #include #include -#include "arrow/compute/exec/test_util.h" #include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" #include "arrow/type.h" @@ -216,27 +215,6 @@ TEST(KeyColumnArray, SliceBool) { } } -TEST(KeyColumnArray, FromExecBatch) { - ExecBatch batch = - ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); - std::vector arrays; - ASSERT_OK(ColumnArraysFromExecBatch(batch, &arrays)); - - ASSERT_EQ(2, arrays.size()); - ASSERT_EQ(8, arrays[0].metadata().fixed_length); - ASSERT_EQ(0, arrays[1].metadata().fixed_length); - ASSERT_EQ(3, arrays[0].length()); - ASSERT_EQ(3, arrays[1].length()); - - ASSERT_OK(ColumnArraysFromExecBatch(batch, 1, 1, &arrays)); - - ASSERT_EQ(2, arrays.size()); - ASSERT_EQ(8, arrays[0].metadata().fixed_length); - ASSERT_EQ(0, arrays[1].metadata().fixed_length); - ASSERT_EQ(1, arrays[0].length()); - ASSERT_EQ(1, arrays[1].length()); -} - TEST(ResizableArrayData, Basic) { std::unique_ptr pool = MemoryPool::CreateDefault(); for (const auto& type : kSampleFixedDataTypes) { @@ -315,126 +293,6 @@ TEST(ResizableArrayData, Binary) { } } -TEST(ExecBatchBuilder, AppendBatches) { - std::unique_ptr owned_pool = MemoryPool::CreateDefault(); - MemoryPool* pool = owned_pool.get(); - ExecBatch batch_one = - ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); - ExecBatch batch_two = - ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); - ExecBatch combined = ExecBatchFromJSON( - {int64(), boolean()}, - "[[1, true], [2, false], [null, null], [null, true], [5, true], [6, false]]"); - { - ExecBatchBuilder builder; - uint16_t row_ids[3] = {0, 1, 2}; - ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/2)); - ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/2)); - ExecBatch built = builder.Flush(); - ASSERT_EQ(combined, built); - ASSERT_NE(0, pool->bytes_allocated()); - } - ASSERT_EQ(0, pool->bytes_allocated()); -} - -TEST(ExecBatchBuilder, AppendBatchesSomeRows) { - std::unique_ptr owned_pool = MemoryPool::CreateDefault(); - MemoryPool* pool = owned_pool.get(); - ExecBatch batch_one = - ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); - ExecBatch batch_two = - ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); - ExecBatch combined = ExecBatchFromJSON( - {int64(), boolean()}, "[[1, true], [2, false], [null, true], [5, true]]"); - { - ExecBatchBuilder builder; - uint16_t row_ids[2] = {0, 1}; - ASSERT_OK(builder.AppendSelected(pool, batch_one, 2, row_ids, /*num_cols=*/2)); - ASSERT_OK(builder.AppendSelected(pool, batch_two, 2, row_ids, /*num_cols=*/2)); - ExecBatch built = builder.Flush(); - ASSERT_EQ(combined, built); - ASSERT_NE(0, pool->bytes_allocated()); - } - ASSERT_EQ(0, pool->bytes_allocated()); -} - -TEST(ExecBatchBuilder, AppendBatchesSomeCols) { - std::unique_ptr owned_pool = MemoryPool::CreateDefault(); - MemoryPool* pool = owned_pool.get(); - ExecBatch batch_one = - ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); - ExecBatch batch_two = - ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); - ExecBatch first_col_only = - ExecBatchFromJSON({int64()}, "[[1], [2], [null], [null], [5], [6]]"); - ExecBatch last_col_only = ExecBatchFromJSON( - {boolean()}, "[[true], [false], [null], [true], [true], [false]]"); - { - ExecBatchBuilder builder; - uint16_t row_ids[3] = {0, 1, 2}; - int first_col_ids[1] = {0}; - ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1, - first_col_ids)); - ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1, - first_col_ids)); - ExecBatch built = builder.Flush(); - ASSERT_EQ(first_col_only, built); - ASSERT_NE(0, pool->bytes_allocated()); - } - { - ExecBatchBuilder builder; - uint16_t row_ids[3] = {0, 1, 2}; - // If we don't specify col_ids and num_cols is 1 it is implicitly the first col - ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1)); - ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1)); - ExecBatch built = builder.Flush(); - ASSERT_EQ(first_col_only, built); - ASSERT_NE(0, pool->bytes_allocated()); - } - { - ExecBatchBuilder builder; - uint16_t row_ids[3] = {0, 1, 2}; - int last_col_ids[1] = {1}; - ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1, - last_col_ids)); - ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1, - last_col_ids)); - ExecBatch built = builder.Flush(); - ASSERT_EQ(last_col_only, built); - ASSERT_NE(0, pool->bytes_allocated()); - } - ASSERT_EQ(0, pool->bytes_allocated()); -} - -TEST(ExecBatchBuilder, AppendNulls) { - std::unique_ptr owned_pool = MemoryPool::CreateDefault(); - MemoryPool* pool = owned_pool.get(); - ExecBatch batch_one = - ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); - ExecBatch combined = ExecBatchFromJSON( - {int64(), boolean()}, - "[[1, true], [2, false], [null, null], [null, null], [null, null]]"); - ExecBatch just_nulls = - ExecBatchFromJSON({int64(), boolean()}, "[[null, null], [null, null]]"); - { - ExecBatchBuilder builder; - uint16_t row_ids[3] = {0, 1, 2}; - ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/2)); - ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2)); - ExecBatch built = builder.Flush(); - ASSERT_EQ(combined, built); - ASSERT_NE(0, pool->bytes_allocated()); - } - { - ExecBatchBuilder builder; - ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2)); - ExecBatch built = builder.Flush(); - ASSERT_EQ(just_nulls, built); - ASSERT_NE(0, pool->bytes_allocated()); - } - ASSERT_EQ(0, pool->bytes_allocated()); -} - TEST(ExecBatchBuilder, AppendNullsBeyondLimit) { std::unique_ptr owned_pool = MemoryPool::CreateDefault(); int num_rows_max = ExecBatchBuilder::num_rows_max(); diff --git a/cpp/src/arrow/compute/row/compare_internal.cc b/cpp/src/arrow/compute/row/compare_internal.cc index 750012e60e2..39ac33932b5 100644 --- a/cpp/src/arrow/compute/row/compare_internal.cc +++ b/cpp/src/arrow/compute/row/compare_internal.cc @@ -22,7 +22,8 @@ #include #include -#include "arrow/compute/exec/util.h" +#include "arrow/compute/util.h" +#include "arrow/compute/util_internal.h" #include "arrow/util/bit_util.h" #include "arrow/util/ubsan.h" diff --git a/cpp/src/arrow/compute/row/compare_internal.h b/cpp/src/arrow/compute/row/compare_internal.h index f9ec1e7f535..778485e5c46 100644 --- a/cpp/src/arrow/compute/row/compare_internal.h +++ b/cpp/src/arrow/compute/row/compare_internal.h @@ -19,10 +19,10 @@ #include -#include "arrow/compute/exec/util.h" #include "arrow/compute/light_array.h" #include "arrow/compute/row/encode_internal.h" #include "arrow/compute/row/row_internal.h" +#include "arrow/compute/util.h" #include "arrow/memory_pool.h" #include "arrow/result.h" #include "arrow/status.h" diff --git a/cpp/src/arrow/compute/row/encode_internal.cc b/cpp/src/arrow/compute/row/encode_internal.cc index 9d138258d66..3a6a85b0272 100644 --- a/cpp/src/arrow/compute/row/encode_internal.cc +++ b/cpp/src/arrow/compute/row/encode_internal.cc @@ -16,7 +16,6 @@ // under the License. #include "arrow/compute/row/encode_internal.h" -#include "arrow/compute/exec.h" #include "arrow/util/checked_cast.h" namespace arrow { diff --git a/cpp/src/arrow/compute/row/encode_internal.h b/cpp/src/arrow/compute/row/encode_internal.h index 970537a3067..bdf38df4fc3 100644 --- a/cpp/src/arrow/compute/row/encode_internal.h +++ b/cpp/src/arrow/compute/row/encode_internal.h @@ -22,10 +22,10 @@ #include #include "arrow/array/data.h" -#include "arrow/compute/exec.h" -#include "arrow/compute/exec/util.h" +#include "arrow/compute/key_map.h" #include "arrow/compute/light_array.h" #include "arrow/compute/row/row_internal.h" +#include "arrow/compute/util.h" #include "arrow/memory_pool.h" #include "arrow/result.h" #include "arrow/status.h" diff --git a/cpp/src/arrow/compute/row/grouper.cc b/cpp/src/arrow/compute/row/grouper.cc index d003137d3e5..ca26600c98b 100644 --- a/cpp/src/arrow/compute/row/grouper.cc +++ b/cpp/src/arrow/compute/row/grouper.cc @@ -20,12 +20,10 @@ #include #include -#include "arrow/compute/exec/key_hash.h" -#include "arrow/compute/exec/key_map.h" -#include "arrow/compute/exec/options.h" -#include "arrow/compute/exec_internal.h" +#include "arrow/compute/api_vector.h" #include "arrow/compute/function.h" #include "arrow/compute/kernels/row_encoder_internal.h" +#include "arrow/compute/key_hash.h" #include "arrow/compute/light_array.h" #include "arrow/compute/registry.h" #include "arrow/compute/row/compare_internal.h" diff --git a/cpp/src/arrow/compute/row/grouper.h b/cpp/src/arrow/compute/row/grouper.h index ce09adf09b3..94c591687da 100644 --- a/cpp/src/arrow/compute/row/grouper.h +++ b/cpp/src/arrow/compute/row/grouper.h @@ -20,8 +20,6 @@ #include #include -#include "arrow/compute/exec.h" -#include "arrow/compute/exec/options.h" #include "arrow/compute/kernel.h" #include "arrow/datum.h" #include "arrow/result.h" diff --git a/cpp/src/arrow/compute/row/row_internal.cc b/cpp/src/arrow/compute/row/row_internal.cc index 11a8a0bc436..f6a62c09fcf 100644 --- a/cpp/src/arrow/compute/row/row_internal.cc +++ b/cpp/src/arrow/compute/row/row_internal.cc @@ -17,7 +17,7 @@ #include "arrow/compute/row/row_internal.h" -#include "arrow/compute/exec/util.h" +#include "arrow/compute/util.h" namespace arrow { namespace compute { diff --git a/cpp/src/arrow/compute/util.cc b/cpp/src/arrow/compute/util.cc new file mode 100644 index 00000000000..78f90ea37f7 --- /dev/null +++ b/cpp/src/arrow/compute/util.cc @@ -0,0 +1,363 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/util.h" + +#include "arrow/table.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/bitmap_ops.h" +#include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" +#include "arrow/util/ubsan.h" + +namespace arrow { + +using bit_util::CountTrailingZeros; + +namespace util { + +void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) { + int64_t old_top = top_; + top_ += PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t); + // Stack overflow check + ARROW_DCHECK(top_ <= buffer_size_); + *data = buffer_->mutable_data() + old_top + sizeof(uint64_t); + // We set 8 bytes before the beginning of the allocated range and + // 8 bytes after the end to check for stack overflow (which would + // result in those known bytes being corrupted). + reinterpret_cast(buffer_->mutable_data() + old_top)[0] = kGuard1; + reinterpret_cast(buffer_->mutable_data() + top_)[-1] = kGuard2; + *id = num_vectors_++; +} + +void TempVectorStack::release(int id, uint32_t num_bytes) { + ARROW_DCHECK(num_vectors_ == id + 1); + int64_t size = PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t); + ARROW_DCHECK(reinterpret_cast(buffer_->mutable_data() + top_)[-1] == + kGuard2); + ARROW_DCHECK(top_ >= size); + top_ -= size; + ARROW_DCHECK(reinterpret_cast(buffer_->mutable_data() + top_)[0] == + kGuard1); + --num_vectors_; +} + +inline uint64_t bit_util::SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) { + // This will not be correct on big-endian architectures. +#if !ARROW_LITTLE_ENDIAN + ARROW_DCHECK(false); +#endif + ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8); + if (num_bytes == 8) { + return util::SafeLoad(reinterpret_cast(bytes)); + } else { + uint64_t word = 0; + for (int i = 0; i < num_bytes; ++i) { + word |= static_cast(bytes[i]) << (8 * i); + } + return word; + } +} + +inline void bit_util::SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value) { + // This will not be correct on big-endian architectures. +#if !ARROW_LITTLE_ENDIAN + ARROW_DCHECK(false); +#endif + ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8); + if (num_bytes == 8) { + util::SafeStore(reinterpret_cast(bytes), value); + } else { + for (int i = 0; i < num_bytes; ++i) { + bytes[i] = static_cast(value >> (8 * i)); + } + } +} + +inline void bit_util::bits_to_indexes_helper(uint64_t word, uint16_t base_index, + int* num_indexes, uint16_t* indexes) { + int n = *num_indexes; + while (word) { + indexes[n++] = base_index + static_cast(CountTrailingZeros(word)); + word &= word - 1; + } + *num_indexes = n; +} + +inline void bit_util::bits_filter_indexes_helper(uint64_t word, + const uint16_t* input_indexes, + int* num_indexes, uint16_t* indexes) { + int n = *num_indexes; + while (word) { + indexes[n++] = input_indexes[CountTrailingZeros(word)]; + word &= word - 1; + } + *num_indexes = n; +} + +template +void bit_util::bits_to_indexes_internal(int64_t hardware_flags, const int num_bits, + const uint8_t* bits, + const uint16_t* input_indexes, int* num_indexes, + uint16_t* indexes, uint16_t base_index) { + // 64 bits at a time + constexpr int unroll = 64; + int tail = num_bits % unroll; +#if defined(ARROW_HAVE_AVX2) + if (hardware_flags & arrow::internal::CpuInfo::AVX2) { + if (filter_input_indexes) { + bits_filter_indexes_avx2(bit_to_search, num_bits - tail, bits, input_indexes, + num_indexes, indexes); + } else { + bits_to_indexes_avx2(bit_to_search, num_bits - tail, bits, num_indexes, indexes, + base_index); + } + } else { +#endif + *num_indexes = 0; + for (int i = 0; i < num_bits / unroll; ++i) { + uint64_t word = util::SafeLoad(&reinterpret_cast(bits)[i]); + if (bit_to_search == 0) { + word = ~word; + } + if (filter_input_indexes) { + bits_filter_indexes_helper(word, input_indexes + i * 64, num_indexes, indexes); + } else { + bits_to_indexes_helper(word, i * 64 + base_index, num_indexes, indexes); + } + } +#if defined(ARROW_HAVE_AVX2) + } +#endif + // Optionally process the last partial word with masking out bits outside range + if (tail) { + const uint8_t* bits_tail = bits + (num_bits - tail) / 8; + uint64_t word = SafeLoadUpTo8Bytes(bits_tail, (tail + 7) / 8); + if (bit_to_search == 0) { + word = ~word; + } + word &= ~0ULL >> (64 - tail); + if (filter_input_indexes) { + bits_filter_indexes_helper(word, input_indexes + num_bits - tail, num_indexes, + indexes); + } else { + bits_to_indexes_helper(word, num_bits - tail + base_index, num_indexes, indexes); + } + } +} + +void bit_util::bits_to_indexes(int bit_to_search, int64_t hardware_flags, int num_bits, + const uint8_t* bits, int* num_indexes, uint16_t* indexes, + int bit_offset) { + bits += bit_offset / 8; + bit_offset %= 8; + *num_indexes = 0; + uint16_t base_index = 0; + if (bit_offset != 0) { + uint64_t bits_head = bits[0] >> bit_offset; + int bits_in_first_byte = std::min(num_bits, 8 - bit_offset); + bits_to_indexes(bit_to_search, hardware_flags, bits_in_first_byte, + reinterpret_cast(&bits_head), num_indexes, indexes); + if (num_bits <= bits_in_first_byte) { + return; + } + num_bits -= bits_in_first_byte; + indexes += *num_indexes; + bits += 1; + base_index = bits_in_first_byte; + } + + int num_indexes_new = 0; + if (bit_to_search == 0) { + bits_to_indexes_internal<0, false>(hardware_flags, num_bits, bits, nullptr, + &num_indexes_new, indexes, base_index); + } else { + ARROW_DCHECK(bit_to_search == 1); + bits_to_indexes_internal<1, false>(hardware_flags, num_bits, bits, nullptr, + &num_indexes_new, indexes, base_index); + } + *num_indexes += num_indexes_new; +} + +void bit_util::bits_filter_indexes(int bit_to_search, int64_t hardware_flags, + const int num_bits, const uint8_t* bits, + const uint16_t* input_indexes, int* num_indexes, + uint16_t* indexes, int bit_offset) { + bits += bit_offset / 8; + bit_offset %= 8; + if (bit_offset != 0) { + int num_indexes_head = 0; + uint64_t bits_head = bits[0] >> bit_offset; + int bits_in_first_byte = std::min(num_bits, 8 - bit_offset); + bits_filter_indexes(bit_to_search, hardware_flags, bits_in_first_byte, + reinterpret_cast(&bits_head), input_indexes, + &num_indexes_head, indexes); + int num_indexes_tail = 0; + if (num_bits > bits_in_first_byte) { + bits_filter_indexes(bit_to_search, hardware_flags, num_bits - bits_in_first_byte, + bits + 1, input_indexes + bits_in_first_byte, &num_indexes_tail, + indexes + num_indexes_head); + } + *num_indexes = num_indexes_head + num_indexes_tail; + return; + } + + if (bit_to_search == 0) { + bits_to_indexes_internal<0, true>(hardware_flags, num_bits, bits, input_indexes, + num_indexes, indexes); + } else { + ARROW_DCHECK(bit_to_search == 1); + bits_to_indexes_internal<1, true>(hardware_flags, num_bits, bits, input_indexes, + num_indexes, indexes); + } +} + +void bit_util::bits_split_indexes(int64_t hardware_flags, const int num_bits, + const uint8_t* bits, int* num_indexes_bit0, + uint16_t* indexes_bit0, uint16_t* indexes_bit1, + int bit_offset) { + bits_to_indexes(0, hardware_flags, num_bits, bits, num_indexes_bit0, indexes_bit0, + bit_offset); + int num_indexes_bit1; + bits_to_indexes(1, hardware_flags, num_bits, bits, &num_indexes_bit1, indexes_bit1, + bit_offset); +} + +void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits, + const uint8_t* bits, uint8_t* bytes, int bit_offset) { + bits += bit_offset / 8; + bit_offset %= 8; + if (bit_offset != 0) { + uint64_t bits_head = bits[0] >> bit_offset; + int bits_in_first_byte = std::min(num_bits, 8 - bit_offset); + bits_to_bytes(hardware_flags, bits_in_first_byte, + reinterpret_cast(&bits_head), bytes); + if (num_bits > bits_in_first_byte) { + bits_to_bytes(hardware_flags, num_bits - bits_in_first_byte, bits + 1, + bytes + bits_in_first_byte); + } + return; + } + + int num_processed = 0; +#if defined(ARROW_HAVE_AVX2) + if (hardware_flags & arrow::internal::CpuInfo::AVX2) { + // The function call below processes whole 32 bit chunks together. + num_processed = num_bits - (num_bits % 32); + bits_to_bytes_avx2(num_processed, bits, bytes); + } +#endif + // Processing 8 bits at a time + constexpr int unroll = 8; + for (int i = num_processed / unroll; i < num_bits / unroll; ++i) { + uint8_t bits_next = bits[i]; + // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart + // from the previous. + uint64_t unpacked = static_cast(bits_next & 0xfe) * + ((1ULL << 7) | (1ULL << 14) | (1ULL << 21) | (1ULL << 28) | + (1ULL << 35) | (1ULL << 42) | (1ULL << 49)); + unpacked |= (bits_next & 1); + unpacked &= 0x0101010101010101ULL; + unpacked *= 255; + util::SafeStore(&reinterpret_cast(bytes)[i], unpacked); + } + int tail = num_bits % unroll; + if (tail) { + uint8_t bits_next = bits[(num_bits - tail) / unroll]; + // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart + // from the previous. + uint64_t unpacked = static_cast(bits_next & 0xfe) * + ((1ULL << 7) | (1ULL << 14) | (1ULL << 21) | (1ULL << 28) | + (1ULL << 35) | (1ULL << 42) | (1ULL << 49)); + unpacked |= (bits_next & 1); + unpacked &= 0x0101010101010101ULL; + unpacked *= 255; + SafeStoreUpTo8Bytes(bytes + num_bits - tail, tail, unpacked); + } +} + +void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits, + const uint8_t* bytes, uint8_t* bits, int bit_offset) { + bits += bit_offset / 8; + bit_offset %= 8; + if (bit_offset != 0) { + uint64_t bits_head; + int bits_in_first_byte = std::min(num_bits, 8 - bit_offset); + bytes_to_bits(hardware_flags, bits_in_first_byte, bytes, + reinterpret_cast(&bits_head)); + uint8_t mask = (1 << bit_offset) - 1; + *bits = static_cast((*bits & mask) | (bits_head << bit_offset)); + + if (num_bits > bits_in_first_byte) { + bytes_to_bits(hardware_flags, num_bits - bits_in_first_byte, + bytes + bits_in_first_byte, bits + 1); + } + return; + } + + int num_processed = 0; +#if defined(ARROW_HAVE_AVX2) + if (hardware_flags & arrow::internal::CpuInfo::AVX2) { + // The function call below processes whole 32 bit chunks together. + num_processed = num_bits - (num_bits % 32); + bytes_to_bits_avx2(num_processed, bytes, bits); + } +#endif + // Process 8 bits at a time + constexpr int unroll = 8; + for (int i = num_processed / unroll; i < num_bits / unroll; ++i) { + uint64_t bytes_next = util::SafeLoad(&reinterpret_cast(bytes)[i]); + bytes_next &= 0x0101010101010101ULL; + bytes_next |= (bytes_next >> 7); // Pairs of adjacent output bits in individual bytes + bytes_next |= (bytes_next >> 14); // 4 adjacent output bits in individual bytes + bytes_next |= (bytes_next >> 28); // All 8 output bits in the lowest byte + bits[i] = static_cast(bytes_next & 0xff); + } + int tail = num_bits % unroll; + if (tail) { + uint64_t bytes_next = SafeLoadUpTo8Bytes(bytes + num_bits - tail, tail); + bytes_next &= 0x0101010101010101ULL; + bytes_next |= (bytes_next >> 7); // Pairs of adjacent output bits in individual bytes + bytes_next |= (bytes_next >> 14); // 4 adjacent output bits in individual bytes + bytes_next |= (bytes_next >> 28); // All 8 output bits in the lowest byte + bits[num_bits / 8] = static_cast(bytes_next & 0xff); + } +} + +bool bit_util::are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes, + uint32_t num_bytes) { +#if defined(ARROW_HAVE_AVX2) + if (hardware_flags & arrow::internal::CpuInfo::AVX2) { + return are_all_bytes_zero_avx2(bytes, num_bytes); + } +#endif + uint64_t result_or = 0; + uint32_t i; + for (i = 0; i < num_bytes / 8; ++i) { + uint64_t x = util::SafeLoad(&reinterpret_cast(bytes)[i]); + result_or |= x; + } + if (num_bytes % 8 > 0) { + uint64_t tail = 0; + result_or |= memcmp(bytes + i * 8, &tail, num_bytes % 8); + } + return result_or == 0; +} + +} // namespace util + +} // namespace arrow diff --git a/cpp/src/arrow/compute/util.h b/cpp/src/arrow/compute/util.h new file mode 100644 index 00000000000..60c20137c8c --- /dev/null +++ b/cpp/src/arrow/compute/util.h @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/compute/type_fwd.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/cpu_info.h" +#include "arrow/util/mutex.h" +#include "arrow/util/thread_pool.h" +#include "arrow/util/type_fwd.h" + +#if defined(__clang__) || defined(__GNUC__) +#define BYTESWAP(x) __builtin_bswap64(x) +#define ROTL(x, n) (((x) << (n)) | ((x) >> ((-n) & 31))) +#define ROTL64(x, n) (((x) << (n)) | ((x) >> ((-n) & 63))) +#define PREFETCH(ptr) __builtin_prefetch((ptr), 0 /* rw==read */, 3 /* locality */) +#elif defined(_MSC_VER) +#include +#define BYTESWAP(x) _byteswap_uint64(x) +#define ROTL(x, n) _rotl((x), (n)) +#define ROTL64(x, n) _rotl64((x), (n)) +#if defined(_M_X64) || defined(_M_I86) +#include // https://msdn.microsoft.com/fr-fr/library/84szxsww(v=vs.90).aspx +#define PREFETCH(ptr) _mm_prefetch((const char*)(ptr), _MM_HINT_T0) +#else +#define PREFETCH(ptr) (void)(ptr) /* disabled */ +#endif +#endif + +namespace arrow { +namespace util { + +// Some platforms typedef int64_t as long int instead of long long int, +// which breaks the _mm256_i64gather_epi64 and _mm256_i32gather_epi64 intrinsics +// which need long long. +// We use the cast to the type below in these intrinsics to make the code +// compile in all cases. +// +using int64_for_gather_t = const long long int; // NOLINT runtime-int + +// All MiniBatch... classes use TempVectorStack for vector allocations and can +// only work with vectors up to 1024 elements. +// +// They should only be allocated on the stack to guarantee the right sequence +// of allocation and deallocation of vectors from TempVectorStack. +// +class MiniBatch { + public: + static constexpr int kLogMiniBatchLength = 10; + static constexpr int kMiniBatchLength = 1 << kLogMiniBatchLength; +}; + +/// Storage used to allocate temporary vectors of a batch size. +/// Temporary vectors should resemble allocating temporary variables on the stack +/// but in the context of vectorized processing where we need to store a vector of +/// temporaries instead of a single value. +class TempVectorStack { + template + friend class TempVectorHolder; + + public: + Status Init(MemoryPool* pool, int64_t size) { + num_vectors_ = 0; + top_ = 0; + buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * sizeof(uint64_t); + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool)); + // Ensure later operations don't accidentally read uninitialized memory. + std::memset(buffer->mutable_data(), 0xFF, size); + buffer_ = std::move(buffer); + return Status::OK(); + } + + private: + int64_t PaddedAllocationSize(int64_t num_bytes) { + // Round up allocation size to multiple of 8 bytes + // to avoid returning temp vectors with unaligned address. + // + // Also add padding at the end to facilitate loads and stores + // using SIMD when number of vector elements is not divisible + // by the number of SIMD lanes. + // + return ::arrow::bit_util::RoundUp(num_bytes, sizeof(int64_t)) + kPadding; + } + void alloc(uint32_t num_bytes, uint8_t** data, int* id); + void release(int id, uint32_t num_bytes); + static constexpr uint64_t kGuard1 = 0x3141592653589793ULL; + static constexpr uint64_t kGuard2 = 0x0577215664901532ULL; + static constexpr int64_t kPadding = 64; + int num_vectors_; + int64_t top_; + std::unique_ptr buffer_; + int64_t buffer_size_; +}; + +template +class TempVectorHolder { + friend class TempVectorStack; + + public: + ~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); } + T* mutable_data() { return reinterpret_cast(data_); } + TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) { + stack_ = stack; + num_elements_ = num_elements; + stack_->alloc(num_elements * sizeof(T), &data_, &id_); + } + + private: + TempVectorStack* stack_; + uint8_t* data_; + int id_; + uint32_t num_elements_; +}; + +class bit_util { + public: + static void bits_to_indexes(int bit_to_search, int64_t hardware_flags, + const int num_bits, const uint8_t* bits, int* num_indexes, + uint16_t* indexes, int bit_offset = 0); + + static void bits_filter_indexes(int bit_to_search, int64_t hardware_flags, + const int num_bits, const uint8_t* bits, + const uint16_t* input_indexes, int* num_indexes, + uint16_t* indexes, int bit_offset = 0); + + // Input and output indexes may be pointing to the same data (in-place filtering). + static void bits_split_indexes(int64_t hardware_flags, const int num_bits, + const uint8_t* bits, int* num_indexes_bit0, + uint16_t* indexes_bit0, uint16_t* indexes_bit1, + int bit_offset = 0); + + // Bit 1 is replaced with byte 0xFF. + static void bits_to_bytes(int64_t hardware_flags, const int num_bits, + const uint8_t* bits, uint8_t* bytes, int bit_offset = 0); + + // Return highest bit of each byte. + static void bytes_to_bits(int64_t hardware_flags, const int num_bits, + const uint8_t* bytes, uint8_t* bits, int bit_offset = 0); + + static bool are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes, + uint32_t num_bytes); + + private: + inline static uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes); + inline static void SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value); + inline static void bits_to_indexes_helper(uint64_t word, uint16_t base_index, + int* num_indexes, uint16_t* indexes); + inline static void bits_filter_indexes_helper(uint64_t word, + const uint16_t* input_indexes, + int* num_indexes, uint16_t* indexes); + template + static void bits_to_indexes_internal(int64_t hardware_flags, const int num_bits, + const uint8_t* bits, const uint16_t* input_indexes, + int* num_indexes, uint16_t* indexes, + uint16_t base_index = 0); + +#if defined(ARROW_HAVE_AVX2) + static void bits_to_indexes_avx2(int bit_to_search, const int num_bits, + const uint8_t* bits, int* num_indexes, + uint16_t* indexes, uint16_t base_index = 0); + static void bits_filter_indexes_avx2(int bit_to_search, const int num_bits, + const uint8_t* bits, const uint16_t* input_indexes, + int* num_indexes, uint16_t* indexes); + template + static void bits_to_indexes_imp_avx2(const int num_bits, const uint8_t* bits, + int* num_indexes, uint16_t* indexes, + uint16_t base_index = 0); + template + static void bits_filter_indexes_imp_avx2(const int num_bits, const uint8_t* bits, + const uint16_t* input_indexes, + int* num_indexes, uint16_t* indexes); + static void bits_to_bytes_avx2(const int num_bits, const uint8_t* bits, uint8_t* bytes); + static void bytes_to_bits_avx2(const int num_bits, const uint8_t* bytes, uint8_t* bits); + static bool are_all_bytes_zero_avx2(const uint8_t* bytes, uint32_t num_bytes); +#endif +}; + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/compute/util_internal.h b/cpp/src/arrow/compute/util_internal.h new file mode 100644 index 00000000000..87e89a33507 --- /dev/null +++ b/cpp/src/arrow/compute/util_internal.h @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/logging.h" + +namespace arrow { +namespace util { + +template +void CheckAlignment(const void* ptr) { + ARROW_DCHECK(reinterpret_cast(ptr) % sizeof(T) == 0); +} + +} // namespace util +} // namespace arrow