From 70918b0a146a6b9b7ec545a109b9df595a067abe Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 24 Aug 2021 10:30:49 -0400 Subject: [PATCH 01/14] ARROW-7179: [C++] Somewhat optimize binary coalesce on primitives --- .../arrow/compute/kernels/scalar_if_else.cc | 121 ++++++++++++++++++ .../kernels/scalar_if_else_benchmark.cc | 91 +++++++++++++ 2 files changed, 212 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else.cc b/cpp/src/arrow/compute/kernels/scalar_if_else.cc index affe9267942..f4c6fa0589e 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else.cc @@ -1827,9 +1827,130 @@ Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) return Status::OK(); } +template +Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right, + int64_t length, Datum* out) { + ArrayData* output = out->mutable_array(); + const int64_t out_offset = output->offset; + uint8_t* out_valid = output->buffers[0]->mutable_data(); + uint8_t* out_values = output->buffers[1]->mutable_data(); + + const ArrayData& left_arr = *left.array(); + const uint8_t* left_valid = + left_arr.MayHaveNulls() ? left_arr.buffers[0]->data() : nullptr; + arrow::internal::OptionalBitBlockCounter bit_counter(left_valid, left_arr.offset, + left_arr.length); + int64_t offset = 0; + + const uint8_t* left_values = left_arr.buffers[1]->data(); + const Scalar& right_scalar = *right.scalar(); + while (offset < length) { + const auto block = bit_counter.NextWord(); + if (block.AllSet()) { + // All from left + CopyFixedWidth::CopyArray(*left_arr.type, left_values, + left_arr.offset + offset, block.length, out_values, + out_offset + offset); + } else if (block.NoneSet()) { + // All from right + CopyFixedWidth::CopyScalar(right_scalar, block.length, out_values, + out_offset + offset); + } else if (block.popcount) { + // One by one + for (int64_t j = 0; j < block.length; ++j) { + if (BitUtil::GetBit(left_valid, left_arr.offset + offset + j)) { + CopyFixedWidth::CopyArray( + *left_arr.type, left_values, left_arr.offset + offset + j, + /*length=*/1, out_values, out_offset + offset + j); + } else { + CopyFixedWidth::CopyScalar(right_scalar, /*length=*/1, out_values, + out_offset + offset + j); + } + } + } + offset += block.length; + } + + if (right_scalar.is_valid || !left_valid) { + BitUtil::SetBitsTo(out_valid, out_offset, length, true); + } else { + arrow::internal::CopyBitmap(left_valid, left_arr.offset, length, out_valid, + out_offset); + } + return Status::OK(); +} + +// Special case: implement 'coalesce' for any 2 arguments for any fixed-width +// type (a 'fill_null' operation) +template +Status ExecBinaryCoalesce(KernelContext* ctx, Datum left, Datum right, int64_t length, + Datum* out) { + if (left.is_scalar() && right.is_scalar()) { + // Both scalar + *out = left.scalar()->is_valid ? left : right; + return Status::OK(); + } + + ArrayData* output = out->mutable_array(); + const int64_t out_offset = output->offset; + uint8_t* out_valid = output->buffers[0]->mutable_data(); + uint8_t* out_values = output->buffers[1]->mutable_data(); + + if (left.is_scalar()) { + // LHS is scalar + CopyValues(left.scalar()->is_valid ? left : right, /*in_offset=*/0, length, + out_valid, out_values, out_offset); + return Status::OK(); + } + + // Array with nulls + const ArrayData& left_arr = *left.array(); + const uint8_t* left_valid = + left_arr.MayHaveNulls() ? left_arr.buffers[0]->data() : nullptr; + arrow::internal::OptionalBitBlockCounter bit_counter(left_valid, left_arr.offset, + left_arr.length); + int64_t offset = 0; + + if (right.is_scalar()) { + return ExecArrayScalarCoalesce(ctx, left, right, length, out); + } + + // RHS is array + // TODO: benchmark this + while (offset < length) { + const auto block = bit_counter.NextWord(); + if (block.AllSet()) { + // All from left + CopyValues(left, offset, block.length, out_valid, out_values, + out_offset + offset); + } else if (block.NoneSet()) { + // All from right + CopyValues(right, offset, block.length, out_valid, out_values, + out_offset + offset); + } else if (block.popcount) { + // One by one + for (int64_t j = 0; j < block.length; ++j) { + if (BitUtil::GetBit(left_valid, left_arr.offset + offset + j)) { + CopyOneValue(left, offset + j, out_valid, out_values, + out_offset + offset + j); + } else { + CopyOneValue(right, offset + j, out_valid, out_values, + out_offset + offset + j); + } + } + } + offset += block.length; + } + return Status::OK(); +} + template struct CoalesceFunctor { static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // Special case for two arguments (since "fill_null" is a common operation) + if (batch.num_values() == 2) { + return ExecBinaryCoalesce(ctx, batch[0], batch[1], batch.length, out); + } for (const auto& datum : batch.values) { if (datum.is_array()) { return ExecArrayCoalesce(ctx, batch, out); diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc index a8041f9086e..a9273d6ab49 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc @@ -291,6 +291,84 @@ static void CoalesceBench(benchmark::State& state) { sizeof(CType)); } +template +static void CoalesceScalarBench(benchmark::State& state) { + using CType = typename Type::c_type; + auto type = TypeTraits::type_singleton(); + + int64_t len = state.range(0); + int64_t offset = state.range(1); + + random::RandomArrayGenerator rand(/*seed=*/0); + + std::vector arguments = { + rand.ArrayOf(type, len, /*null_probability=*/0.25)->Slice(offset), + Datum(CType(42)), + }; + + for (auto _ : state) { + ABORT_NOT_OK(CallFunction("coalesce", arguments)); + } + + state.SetBytesProcessed(state.iterations() * (len - offset) * sizeof(CType)); +} + +static void CoalesceScalarStringBench(benchmark::State& state) { + int64_t len = state.range(0); + int64_t offset = state.range(1); + + random::RandomArrayGenerator rand(/*seed=*/0); + + auto arr = rand.ArrayOf(utf8(), len, /*null_probability=*/0.25)->Slice(offset); + std::vector arguments = {arr, Datum("foobar")}; + + for (auto _ : state) { + ABORT_NOT_OK(CallFunction("coalesce", arguments)); + } + + state.SetBytesProcessed(state.iterations() * + static_cast(*arr).total_values_length()); +} + +template +static void FillNullScalarBench(benchmark::State& state) { + using CType = typename Type::c_type; + auto type = TypeTraits::type_singleton(); + + int64_t len = state.range(0); + int64_t offset = state.range(1); + + random::RandomArrayGenerator rand(/*seed=*/0); + + std::vector arguments = { + rand.ArrayOf(type, len, /*null_probability=*/0.25)->Slice(offset), + Datum(CType(42)), + }; + + for (auto _ : state) { + ABORT_NOT_OK(CallFunction("fill_null", arguments)); + } + + state.SetBytesProcessed(state.iterations() * (len - offset) * sizeof(CType)); +} + +static void FillNullScalarStringBench(benchmark::State& state) { + int64_t len = state.range(0); + int64_t offset = state.range(1); + + random::RandomArrayGenerator rand(/*seed=*/0); + + auto arr = rand.ArrayOf(utf8(), len, /*null_probability=*/0.25)->Slice(offset); + std::vector arguments = {arr, Datum("foobar")}; + + for (auto _ : state) { + ABORT_NOT_OK(CallFunction("fill_null", arguments)); + } + + state.SetBytesProcessed(state.iterations() * + static_cast(*arr).total_values_length()); +} + template static void CoalesceNonNullBench(benchmark::State& state) { using CType = typename Type::c_type; @@ -318,6 +396,14 @@ static void CoalesceBench64(benchmark::State& state) { return CoalesceBench(state); } +static void CoalesceScalarBench64(benchmark::State& state) { + return CoalesceScalarBench(state); +} + +static void FillNullScalarBench64(benchmark::State& state) { + return FillNullScalarBench(state); +} + static void CoalesceNonNullBench64(benchmark::State& state) { return CoalesceBench(state); } @@ -389,6 +475,11 @@ BENCHMARK(CaseWhenBenchStringContiguous)->Args({kFewItems, 99}); BENCHMARK(CoalesceBench64)->Args({kNumItems, 0}); BENCHMARK(CoalesceBench64)->Args({kNumItems, 99}); +BENCHMARK(CoalesceScalarBench64)->Args({kNumItems, 0}); +BENCHMARK(FillNullScalarBench64)->Args({kNumItems, 0}); +BENCHMARK(CoalesceScalarStringBench)->Args({kNumItems, 0}); +BENCHMARK(FillNullScalarStringBench)->Args({kNumItems, 0}); + BENCHMARK(CoalesceNonNullBench64)->Args({kNumItems, 0}); BENCHMARK(CoalesceNonNullBench64)->Args({kNumItems, 99}); From 2ad5f7199c05f1158dc0799e2d1af32beb931cee Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 24 Aug 2021 11:01:24 -0400 Subject: [PATCH 02/14] ARROW-7179: [C++] Optimize binary coalesce on strings --- .../arrow/compute/kernels/scalar_if_else.cc | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else.cc b/cpp/src/arrow/compute/kernels/scalar_if_else.cc index f4c6fa0589e..35495de6ed0 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else.cc @@ -1970,8 +1970,15 @@ struct CoalesceFunctor { template struct CoalesceFunctor> { using offset_type = typename Type::offset_type; + using ArrayType = typename TypeTraits::ArrayType; using BuilderType = typename TypeTraits::BuilderType; static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + if (batch.num_values() == 2 && batch.values[0].is_array() && + batch.values[1].is_scalar()) { + // Specialized implementation for common case ('fill_null' operation) + return ExecArrayScalar(ctx, *batch.values[0].array(), *batch.values[1].scalar(), + out); + } for (const auto& datum : batch.values) { if (datum.is_array()) { return ExecArray(ctx, batch, out); @@ -1980,6 +1987,37 @@ struct CoalesceFunctor> { return ExecScalarCoalesce(ctx, batch, out); } + static Status ExecArrayScalar(KernelContext* ctx, const ArrayData& left, + const Scalar& right, Datum* out) { + const int64_t null_count = left.GetNullCount(); + if (null_count == 0 || !right.is_valid) { + *out = left; + return Status::OK(); + } + ArrayData* output = out->mutable_array(); + BuilderType builder(left.type, ctx->memory_pool()); + RETURN_NOT_OK(builder.Reserve(left.length)); + const auto& scalar = checked_cast(right); + const offset_type* offsets = left.GetValues(1); + const int64_t data_reserve = static_cast(offsets[left.length] - offsets[0]) + + null_count * scalar.value->size(); + if (data_reserve > std::numeric_limits::max()) { + return Status::CapacityError( + "Result will not fit in a 32-bit binary-like array, convert to large type"); + } + RETURN_NOT_OK(builder.ReserveData(static_cast(data_reserve))); + + util::string_view fill_value(*scalar.value); + VisitArrayDataInline( + left, [&](util::string_view s) { builder.UnsafeAppend(s); }, + [&]() { builder.UnsafeAppend(fill_value); }); + + ARROW_ASSIGN_OR_RAISE(auto temp_output, builder.Finish()); + *output = *temp_output->data(); + output->type = left.type; + return Status::OK(); + } + static Status ExecArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) { // Special case: grab any leading non-null scalar or array arguments for (const auto& datum : batch.values) { From 3022002228bda111588a82ba517f2cc08234cfa2 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 24 Aug 2021 11:09:33 -0400 Subject: [PATCH 03/14] ARROW-7179: [C++] Remove most references to fill_null --- cpp/src/arrow/CMakeLists.txt | 1 - cpp/src/arrow/compute/api_scalar.cc | 4 - cpp/src/arrow/compute/api_scalar.h | 15 -- cpp/src/arrow/compute/kernels/CMakeLists.txt | 1 - .../arrow/compute/kernels/scalar_fill_null.cc | 244 ------------------ .../compute/kernels/scalar_fill_null_test.cc | 184 ------------- .../kernels/scalar_if_else_benchmark.cc | 45 ---- cpp/src/arrow/compute/registry.cc | 1 - cpp/src/arrow/compute/registry_internal.h | 1 - docs/source/cpp/compute.rst | 15 +- docs/source/python/compute.rst | 14 +- python/pyarrow/compute.py | 16 +- r/R/compute.R | 2 +- r/man/call_function.Rd | 2 +- 14 files changed, 21 insertions(+), 524 deletions(-) delete mode 100644 cpp/src/arrow/compute/kernels/scalar_fill_null.cc delete mode 100644 cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index d003e29999d..637f3d1a54f 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -403,7 +403,6 @@ if(ARROW_COMPUTE) compute/kernels/scalar_string.cc compute/kernels/scalar_temporal.cc compute/kernels/scalar_validity.cc - compute/kernels/scalar_fill_null.cc compute/kernels/scalar_if_else.cc compute/kernels/util_internal.cc compute/kernels/vector_hash.cc diff --git a/cpp/src/arrow/compute/api_scalar.cc b/cpp/src/arrow/compute/api_scalar.cc index 599ce75bece..b7287129cbc 100644 --- a/cpp/src/arrow/compute/api_scalar.cc +++ b/cpp/src/arrow/compute/api_scalar.cc @@ -472,10 +472,6 @@ Result Compare(const Datum& left, const Datum& right, CompareOptions opti SCALAR_EAGER_UNARY(IsValid, "is_valid") SCALAR_EAGER_UNARY(IsNan, "is_nan") -Result FillNull(const Datum& values, const Datum& fill_value, ExecContext* ctx) { - return CallFunction("fill_null", {values, fill_value}, ctx); -} - Result IfElse(const Datum& cond, const Datum& if_true, const Datum& if_false, ExecContext* ctx) { return CallFunction("if_else", {cond, if_true, if_false}, ctx); diff --git a/cpp/src/arrow/compute/api_scalar.h b/cpp/src/arrow/compute/api_scalar.h index 769ab0e7874..2cbc0fde2b2 100644 --- a/cpp/src/arrow/compute/api_scalar.h +++ b/cpp/src/arrow/compute/api_scalar.h @@ -787,21 +787,6 @@ Result IsNull(const Datum& values, NullOptions options = NullOptions::Def ARROW_EXPORT Result IsNan(const Datum& values, ExecContext* ctx = NULLPTR); -/// \brief FillNull replaces each null element in `values` -/// with `fill_value` -/// -/// \param[in] values input to examine for nullity -/// \param[in] fill_value scalar -/// \param[in] ctx the function execution context, optional -/// -/// \return the resulting datum -/// -/// \since 1.0.0 -/// \note API not yet finalized -ARROW_EXPORT -Result FillNull(const Datum& values, const Datum& fill_value, - ExecContext* ctx = NULLPTR); - /// \brief IfElse returns elements chosen from `left` or `right` /// depending on `cond`. `null` values in `cond` will be promoted to the result /// diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 474ce1418fd..4096e497c0a 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -29,7 +29,6 @@ add_arrow_compute_test(scalar_test scalar_string_test.cc scalar_temporal_test.cc scalar_validity_test.cc - scalar_fill_null_test.cc scalar_if_else_test.cc test_util.cc) diff --git a/cpp/src/arrow/compute/kernels/scalar_fill_null.cc b/cpp/src/arrow/compute/kernels/scalar_fill_null.cc deleted file mode 100644 index cf22b0de3dc..00000000000 --- a/cpp/src/arrow/compute/kernels/scalar_fill_null.cc +++ /dev/null @@ -1,244 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include - -#include "arrow/compute/kernels/common.h" -#include "arrow/scalar.h" -#include "arrow/util/bit_block_counter.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/bitmap_ops.h" - -namespace arrow { - -using internal::BitBlockCount; -using internal::BitBlockCounter; - -namespace compute { -namespace internal { - -namespace { - -template -struct FillNullFunctor {}; - -// Numeric inputs - -template -struct FillNullFunctor::value>> { - using T = typename TypeTraits::CType; - - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const ArrayData& data = *batch[0].array(); - const Scalar& fill_value = *batch[1].scalar(); - ArrayData* output = out->mutable_array(); - - // Ensure the kernel is configured properly to have no validity bitmap / - // null count 0 unless we explicitly propagate it below. - DCHECK(output->buffers[0] == nullptr); - - T value = UnboxScalar::Unbox(fill_value); - if (data.MayHaveNulls() != 0 && fill_value.is_valid) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr out_buf, - ctx->Allocate(data.length * sizeof(T))); - - const uint8_t* is_valid = data.buffers[0]->data(); - const T* in_values = data.GetValues(1); - T* out_values = reinterpret_cast(out_buf->mutable_data()); - int64_t offset = data.offset; - BitBlockCounter bit_counter(is_valid, data.offset, data.length); - while (offset < data.offset + data.length) { - BitBlockCount block = bit_counter.NextWord(); - if (block.AllSet()) { - // Block all not null - std::memcpy(out_values, in_values, block.length * sizeof(T)); - } else if (block.NoneSet()) { - // Block all null - std::fill(out_values, out_values + block.length, value); - } else { - for (int64_t i = 0; i < block.length; ++i) { - out_values[i] = BitUtil::GetBit(is_valid, offset + i) ? in_values[i] : value; - } - } - offset += block.length; - out_values += block.length; - in_values += block.length; - } - output->buffers[1] = out_buf; - output->null_count = 0; - } else { - *output = data; - } - - return Status::OK(); - } -}; - -// Boolean input - -template -struct FillNullFunctor::value>> { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const ArrayData& data = *batch[0].array(); - const Scalar& fill_value = *batch[1].scalar(); - ArrayData* output = out->mutable_array(); - - bool value = UnboxScalar::Unbox(fill_value); - if (data.MayHaveNulls() != 0 && fill_value.is_valid) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr out_buf, - ctx->AllocateBitmap(data.length)); - - const uint8_t* is_valid = data.buffers[0]->data(); - const uint8_t* data_bitmap = data.buffers[1]->data(); - uint8_t* out_bitmap = out_buf->mutable_data(); - - int64_t data_offset = data.offset; - BitBlockCounter bit_counter(is_valid, data.offset, data.length); - - int64_t out_offset = 0; - while (out_offset < data.length) { - BitBlockCount block = bit_counter.NextWord(); - if (block.AllSet()) { - // Block all not null - ::arrow::internal::CopyBitmap(data_bitmap, data_offset, block.length, - out_bitmap, out_offset); - } else if (block.NoneSet()) { - // Block all null - BitUtil::SetBitsTo(out_bitmap, out_offset, block.length, value); - } else { - for (int64_t i = 0; i < block.length; ++i) { - BitUtil::SetBitTo(out_bitmap, out_offset + i, - BitUtil::GetBit(is_valid, data_offset + i) - ? BitUtil::GetBit(data_bitmap, data_offset + i) - : value); - } - } - data_offset += block.length; - out_offset += block.length; - } - output->buffers[1] = out_buf; - output->null_count = 0; - } else { - *output = data; - } - - return Status::OK(); - } -}; - -// Null input - -template -struct FillNullFunctor::value>> { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // Nothing preallocated, so we assign into the output - *out->mutable_array() = *batch[0].array(); - return Status::OK(); - } -}; - -// Binary-like input - -template -struct FillNullFunctor::value>> { - using BuilderType = typename TypeTraits::BuilderType; - - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const ArrayData& input = *batch[0].array(); - const auto& fill_value_scalar = - checked_cast(*batch[1].scalar()); - ArrayData* output = out->mutable_array(); - - // Ensure the kernel is configured properly to have no validity bitmap / - // null count 0 unless we explicitly propagate it below. - DCHECK(output->buffers[0] == nullptr); - - const int64_t null_count = input.GetNullCount(); - - if (null_count > 0 && fill_value_scalar.is_valid) { - util::string_view fill_value(*fill_value_scalar.value); - BuilderType builder(input.type, ctx->memory_pool()); - RETURN_NOT_OK(builder.ReserveData(input.buffers[2]->size() + - fill_value.length() * null_count)); - RETURN_NOT_OK(builder.Resize(input.length)); - - VisitArrayDataInline( - input, [&](util::string_view s) { builder.UnsafeAppend(s); }, - [&]() { builder.UnsafeAppend(fill_value); }); - std::shared_ptr string_array; - RETURN_NOT_OK(builder.Finish(&string_array)); - *output = *string_array->data(); - // The builder does not match the logical type, due to - // GenerateTypeAgnosticVarBinaryBase - output->type = input.type; - } else { - *output = input; - } - - return Status::OK(); - } -}; - -void AddBasicFillNullKernels(ScalarKernel kernel, ScalarFunction* func) { - auto AddKernels = [&](const std::vector>& types) { - for (const std::shared_ptr& ty : types) { - kernel.signature = - KernelSignature::Make({InputType::Array(ty), InputType::Scalar(ty)}, ty); - kernel.exec = GenerateTypeAgnosticPrimitive(*ty); - DCHECK_OK(func->AddKernel(kernel)); - } - }; - AddKernels(NumericTypes()); - AddKernels(TemporalTypes()); - AddKernels({boolean(), null()}); -} - -void AddBinaryFillNullKernels(ScalarKernel kernel, ScalarFunction* func) { - for (const std::shared_ptr& ty : BaseBinaryTypes()) { - kernel.signature = - KernelSignature::Make({InputType::Array(ty), InputType::Scalar(ty)}, ty); - kernel.exec = GenerateTypeAgnosticVarBinaryBase(*ty); - DCHECK_OK(func->AddKernel(kernel)); - } -} - -const FunctionDoc fill_null_doc{ - "Replace null elements", - ("`fill_value` must be a scalar of the same type as `values`.\n" - "Each non-null value in `values` is emitted as-is.\n" - "Each null value in `values` is replaced with `fill_value`."), - {"values", "fill_value"}}; - -} // namespace - -void RegisterScalarFillNull(FunctionRegistry* registry) { - { - ScalarKernel fill_null_base; - fill_null_base.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE; - fill_null_base.mem_allocation = MemAllocation::NO_PREALLOCATE; - auto fill_null = - std::make_shared("fill_null", Arity::Binary(), &fill_null_doc); - AddBasicFillNullKernels(fill_null_base, fill_null.get()); - AddBinaryFillNullKernels(fill_null_base, fill_null.get()); - DCHECK_OK(registry->AddFunction(fill_null)); - } -} - -} // namespace internal -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc b/cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc deleted file mode 100644 index 70ce4d5ca7b..00000000000 --- a/cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc +++ /dev/null @@ -1,184 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include - -#include - -#include "arrow/array/array_base.h" -#include "arrow/compute/api.h" -#include "arrow/compute/kernels/test_util.h" -#include "arrow/result.h" -#include "arrow/scalar.h" -#include "arrow/testing/gtest_compat.h" -#include "arrow/testing/gtest_util.h" -#include "arrow/testing/random.h" -#include "arrow/type.h" -#include "arrow/type_traits.h" - -namespace arrow { -namespace compute { - -void CheckFillNull(const Array& input, const Datum& fill_value, const Array& expected, - bool all_valid = true) { - auto Check = [&](const Array& input, const Array& expected) { - ASSERT_OK_AND_ASSIGN(Datum datum_out, FillNull(input, fill_value)); - std::shared_ptr result = datum_out.make_array(); - ValidateOutput(*result); - AssertArraysEqual(expected, *result, /*verbose=*/true); - if (all_valid) { - // Check null count of ArrayData is set, not the computed Array.null_count - ASSERT_EQ(result->data()->null_count, 0); - } - }; - - Check(input, expected); - if (input.length() > 0) { - Check(*input.Slice(1), *expected.Slice(1)); - } -} - -void CheckFillNull(const std::shared_ptr& type, const std::string& in_values, - const Datum& fill_value, const std::string& out_values, - bool all_valid = true) { - std::shared_ptr input = ArrayFromJSON(type, in_values); - std::shared_ptr expected = ArrayFromJSON(type, out_values); - CheckFillNull(*input, fill_value, *expected, all_valid); -} - -class TestFillNullKernel : public ::testing::Test {}; - -template -class TestFillNullPrimitive : public ::testing::Test {}; - -typedef ::testing::Types - PrimitiveTypes; - -TEST_F(TestFillNullKernel, FillNullInvalidScalar) { - auto scalar = std::make_shared(3); - scalar->is_valid = false; - CheckFillNull(int8(), "[1, null, 3, 2]", Datum(scalar), "[1, null, 3, 2]", - /*all_valid=*/false); -} - -TYPED_TEST_SUITE(TestFillNullPrimitive, PrimitiveTypes); - -TYPED_TEST(TestFillNullPrimitive, FillNull) { - using T = typename TypeParam::c_type; - using ArrayType = typename TypeTraits::ArrayType; - using ScalarType = typename TypeTraits::ScalarType; - auto type = TypeTraits::type_singleton(); - auto scalar = std::make_shared(static_cast(5)); - // No Nulls - CheckFillNull(type, "[2, 4, 7, 9]", Datum(scalar), "[2, 4, 7, 9]"); - // Some Null - CheckFillNull(type, "[null, 4, null, 8]", Datum(scalar), "[5, 4, 5, 8]"); - // Empty Array - CheckFillNull(type, "[]", Datum(scalar), "[]"); - - random::RandomArrayGenerator rand(/*seed=*/0); - auto arr = std::static_pointer_cast( - rand.ArrayOf(type, 1000, /*null_probability=*/0.01)); - - std::shared_ptr expected_data = arr->data()->Copy(); - expected_data->null_count = 0; - expected_data->buffers[0] = nullptr; - expected_data->buffers[1] = *AllocateBuffer(arr->length() * sizeof(T)); - T* out_data = expected_data->GetMutableValues(1); - for (int64_t i = 0; i < arr->length(); ++i) { - if (arr->IsValid(i)) { - out_data[i] = arr->Value(i); - } else { - out_data[i] = scalar->value; - } - } - CheckFillNull(*arr, Datum(scalar), ArrayType(expected_data)); -} - -TEST_F(TestFillNullKernel, FillNullNull) { - auto datum = Datum(std::make_shared()); - CheckFillNull(null(), "[null, null, null, null]", datum, "[null, null, null, null]", - /*all_valid=*/false); -} - -TEST_F(TestFillNullKernel, FillNullBoolean) { - auto scalar1 = std::make_shared(false); - auto scalar2 = std::make_shared(true); - // no nulls - CheckFillNull(boolean(), "[true, false, true, false]", Datum(scalar1), - "[true, false, true, false]"); - // some nulls - CheckFillNull(boolean(), "[true, false, false, null]", Datum(scalar1), - "[true, false, false, false]"); - CheckFillNull(boolean(), "[true, null, false, null]", Datum(scalar2), - "[true, true, false, true]"); - - random::RandomArrayGenerator rand(/*seed=*/0); - auto arr = std::static_pointer_cast( - rand.Boolean(1000, /*true_probability=*/0.5, /*null_probability=*/0.01)); - - auto expected_data = arr->data()->Copy(); - expected_data->null_count = 0; - expected_data->buffers[0] = nullptr; - expected_data->buffers[1] = *AllocateEmptyBitmap(arr->length()); - uint8_t* out_data = expected_data->buffers[1]->mutable_data(); - for (int64_t i = 0; i < arr->length(); ++i) { - if (arr->IsValid(i)) { - BitUtil::SetBitTo(out_data, i, arr->Value(i)); - } else { - BitUtil::SetBitTo(out_data, i, true); - } - } - CheckFillNull(*arr, Datum(std::make_shared(true)), - BooleanArray(expected_data)); -} - -TEST_F(TestFillNullKernel, FillNullTimeStamp) { - auto time32_type = time32(TimeUnit::SECOND); - auto time64_type = time64(TimeUnit::NANO); - auto scalar1 = std::make_shared(5, time32_type); - auto scalar2 = std::make_shared(6, time64_type); - // no nulls - CheckFillNull(time32_type, "[2, 1, 6, 9]", Datum(scalar1), "[2, 1, 6, 9]"); - CheckFillNull(time64_type, "[2, 1, 6, 9]", Datum(scalar2), "[2, 1, 6, 9]"); - // some nulls - CheckFillNull(time32_type, "[2, 1, 6, null]", Datum(scalar1), "[2, 1, 6, 5]"); - CheckFillNull(time64_type, "[2, 1, 6, null]", Datum(scalar2), "[2, 1, 6, 6]"); -} - -TEST_F(TestFillNullKernel, FillNullString) { - auto type = large_utf8(); - auto scalar = std::make_shared("arrow"); - // no nulls - CheckFillNull(type, R"(["foo", "bar"])", Datum(scalar), R"(["foo", "bar"])"); - // some nulls - CheckFillNull(type, R"(["foo", "bar", null])", Datum(scalar), - R"(["foo", "bar", "arrow"])"); -} - -TEST_F(TestFillNullKernel, FillNullSetsZeroNullCount) { - auto arr = ArrayFromJSON(int32(), "[1, null, 3, 4]"); - auto fill_value = Datum(std::make_shared(2, int32())); - std::shared_ptr result = (*FillNull(arr, fill_value)).array(); - ASSERT_EQ(result->null_count, 0); -} - -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc index a9273d6ab49..38bd728c3dd 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc @@ -330,45 +330,6 @@ static void CoalesceScalarStringBench(benchmark::State& state) { static_cast(*arr).total_values_length()); } -template -static void FillNullScalarBench(benchmark::State& state) { - using CType = typename Type::c_type; - auto type = TypeTraits::type_singleton(); - - int64_t len = state.range(0); - int64_t offset = state.range(1); - - random::RandomArrayGenerator rand(/*seed=*/0); - - std::vector arguments = { - rand.ArrayOf(type, len, /*null_probability=*/0.25)->Slice(offset), - Datum(CType(42)), - }; - - for (auto _ : state) { - ABORT_NOT_OK(CallFunction("fill_null", arguments)); - } - - state.SetBytesProcessed(state.iterations() * (len - offset) * sizeof(CType)); -} - -static void FillNullScalarStringBench(benchmark::State& state) { - int64_t len = state.range(0); - int64_t offset = state.range(1); - - random::RandomArrayGenerator rand(/*seed=*/0); - - auto arr = rand.ArrayOf(utf8(), len, /*null_probability=*/0.25)->Slice(offset); - std::vector arguments = {arr, Datum("foobar")}; - - for (auto _ : state) { - ABORT_NOT_OK(CallFunction("fill_null", arguments)); - } - - state.SetBytesProcessed(state.iterations() * - static_cast(*arr).total_values_length()); -} - template static void CoalesceNonNullBench(benchmark::State& state) { using CType = typename Type::c_type; @@ -400,10 +361,6 @@ static void CoalesceScalarBench64(benchmark::State& state) { return CoalesceScalarBench(state); } -static void FillNullScalarBench64(benchmark::State& state) { - return FillNullScalarBench(state); -} - static void CoalesceNonNullBench64(benchmark::State& state) { return CoalesceBench(state); } @@ -476,9 +433,7 @@ BENCHMARK(CoalesceBench64)->Args({kNumItems, 0}); BENCHMARK(CoalesceBench64)->Args({kNumItems, 99}); BENCHMARK(CoalesceScalarBench64)->Args({kNumItems, 0}); -BENCHMARK(FillNullScalarBench64)->Args({kNumItems, 0}); BENCHMARK(CoalesceScalarStringBench)->Args({kNumItems, 0}); -BENCHMARK(FillNullScalarStringBench)->Args({kNumItems, 0}); BENCHMARK(CoalesceNonNullBench64)->Args({kNumItems, 0}); BENCHMARK(CoalesceNonNullBench64)->Args({kNumItems, 99}); diff --git a/cpp/src/arrow/compute/registry.cc b/cpp/src/arrow/compute/registry.cc index ca7b6137306..894e65bbe0c 100644 --- a/cpp/src/arrow/compute/registry.cc +++ b/cpp/src/arrow/compute/registry.cc @@ -160,7 +160,6 @@ static std::unique_ptr CreateBuiltInRegistry() { RegisterScalarSetLookup(registry.get()); RegisterScalarStringAscii(registry.get()); RegisterScalarValidity(registry.get()); - RegisterScalarFillNull(registry.get()); RegisterScalarIfElse(registry.get()); RegisterScalarTemporal(registry.get()); diff --git a/cpp/src/arrow/compute/registry_internal.h b/cpp/src/arrow/compute/registry_internal.h index 892b54341da..b2004eb2530 100644 --- a/cpp/src/arrow/compute/registry_internal.h +++ b/cpp/src/arrow/compute/registry_internal.h @@ -33,7 +33,6 @@ void RegisterScalarNested(FunctionRegistry* registry); void RegisterScalarSetLookup(FunctionRegistry* registry); void RegisterScalarStringAscii(FunctionRegistry* registry); void RegisterScalarValidity(FunctionRegistry* registry); -void RegisterScalarFillNull(FunctionRegistry* registry); void RegisterScalarIfElse(FunctionRegistry* registry); void RegisterScalarTemporal(FunctionRegistry* registry); diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 653fb224e50..2b39e3ca33a 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -47,9 +47,8 @@ Computation inputs are represented as a general :class:`Datum` class, which is a tagged union of several shapes of data such as :class:`Scalar`, :class:`Array` and :class:`ChunkedArray`. Many compute functions support both array (chunked or not) and scalar inputs, however some will mandate -either. For example, the ``fill_null`` function requires its second input -to be a scalar, while ``sort_indices`` requires its first and only input to -be an array. +either. For example, while ``sort_indices`` requires its first and only +input to be an array. Invoking functions ------------------ @@ -1034,9 +1033,7 @@ depending on a condition. +------------------+------------+---------------------------------------------------+---------------------+---------+ | coalesce | Varargs | Any | Input type | \(3) | +------------------+------------+---------------------------------------------------+---------------------+---------+ -| fill_null | Binary | Boolean, Null, Numeric, Temporal, String-like | Input type | \(4) | -+------------------+------------+---------------------------------------------------+---------------------+---------+ -| if_else | Ternary | Boolean, Null, Numeric, Temporal | Input type | \(5) | +| if_else | Ternary | Boolean, Null, Numeric, Temporal | Input type | \(4) | +------------------+------------+---------------------------------------------------+---------------------+---------+ * \(1) This function acts like a SQL "case when" statement or switch-case. The @@ -1062,11 +1059,7 @@ depending on a condition. * \(3) Each row of the output will be the corresponding value of the first input which is non-null for that row, otherwise null. -* \(4) First input must be an array, second input a scalar of the same type. - Output is an array of the same type as the inputs, and with the same values - as the first input, except for nulls replaced with the second input value. - -* \(5) First input must be a Boolean scalar or array. Second and third inputs +* \(4) First input must be a Boolean scalar or array. Second and third inputs could be scalars or arrays and must be of the same type. Output is an array (or scalar if all inputs are scalar) of the same type as the second/ third input. If the nulls present on the first input, they will be promoted to the diff --git a/docs/source/python/compute.rst b/docs/source/python/compute.rst index bd58cb5ed9c..133520de970 100644 --- a/docs/source/python/compute.rst +++ b/docs/source/python/compute.rst @@ -22,12 +22,10 @@ Compute Functions ================= -Arrow supports logical compute operations over inputs of possibly -varying types. Many compute functions support both array (chunked or not) -and scalar inputs, but some will mandate either. For example, -the ``fill_null`` function requires its second input to be a scalar, -while ``sort_indices`` requires its first and only input to -be an array. +Arrow supports logical compute operations over inputs of possibly +varying types. Many compute functions support both array (chunked or not) +and scalar inputs, but some will mandate either. For example, +``sort_indices`` requires its first and only input to be an array. Below are a few simple examples: @@ -44,12 +42,12 @@ Below are a few simple examples: true, true, false - ] + ] >>> x, y = pa.scalar(7.8), pa.scalar(9.3) >>> pc.multiply(x, y) -These functions can do more than just element-by-element operations. +These functions can do more than just element-by-element operations. Here is an example of sorting a table: >>> import pyarrow as pa diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index 1a017ea2ef4..6f8b9fa3dae 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -599,13 +599,15 @@ def fill_null(values, fill_value): the same type as values or able to be implicitly casted to the array's type. + This is an alias for :func:`coalesce`. + Parameters ---------- - data : Array, ChunkedArray - replace each null element with fill_value - fill_value: Scalar-like object - Either a pyarrow.Scalar or any python object coercible to a - Scalar. If not same type as data will attempt to cast. + data : Array, ChunkedArray, or Scalar-like object + Each null element is replaced with the corresponding value + from fill_value. + fill_value: Array, ChunkedArray, or Scalar-like object + If not same type as data will attempt to cast. Returns ------- @@ -625,9 +627,9 @@ def fill_null(values, fill_value): 3 ] """ - if not isinstance(fill_value, pa.Scalar): + if not isinstance(fill_value, (pa.Array, pa.ChunkedArray, pa.Scalar)): fill_value = pa.scalar(fill_value, type=values.type) elif values.type != fill_value.type: fill_value = pa.scalar(fill_value.as_py(), type=values.type) - return call_function("fill_null", [values, fill_value]) + return call_function("coalesce", [values, fill_value]) diff --git a/r/R/compute.R b/r/R/compute.R index 3953eaa5dfc..39940eedc8c 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -35,7 +35,7 @@ #' @examplesIf arrow_available() #' a <- Array$create(c(1L, 2L, 3L, NA, 5L)) #' s <- Scalar$create(4L) -#' call_function("fill_null", a, s) +#' call_function("coalesce", a, s) #' #' a <- Array$create(rnorm(10000)) #' call_function("quantile", a, options = list(q = seq(0, 1, 0.25))) diff --git a/r/man/call_function.Rd b/r/man/call_function.Rd index bef89f10b18..c216af06f1f 100644 --- a/r/man/call_function.Rd +++ b/r/man/call_function.Rd @@ -39,7 +39,7 @@ When passing indices in \code{...}, \code{args}, or \code{options}, express them \dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} a <- Array$create(c(1L, 2L, 3L, NA, 5L)) s <- Scalar$create(4L) -call_function("fill_null", a, s) +call_function("coalesce", a, s) a <- Array$create(rnorm(10000)) call_function("quantile", a, options = list(q = seq(0, 1, 0.25))) From 567b4d42ec05a8c9659c82a93165220139c44a18 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 24 Aug 2021 11:41:21 -0400 Subject: [PATCH 04/14] ARROW-7179: [C++] Clean up benchmarks --- .../arrow/compute/kernels/scalar_if_else.cc | 3 +- .../kernels/scalar_if_else_benchmark.cc | 74 +++++++++++-------- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else.cc b/cpp/src/arrow/compute/kernels/scalar_if_else.cc index 35495de6ed0..33cce28bcdb 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else.cc @@ -1827,6 +1827,8 @@ Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) return Status::OK(); } +// Special case: implement 'coalesce' for an array and a scalar for any +// fixed-width type (a 'fill_null' operation) template Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right, int64_t length, Datum* out) { @@ -1916,7 +1918,6 @@ Status ExecBinaryCoalesce(KernelContext* ctx, Datum left, Datum right, int64_t l } // RHS is array - // TODO: benchmark this while (offset < length) { const auto block = bit_counter.NextWord(); if (block.AllSet()) { diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc index 38bd728c3dd..71eed3f4a4d 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc @@ -163,19 +163,22 @@ static void CaseWhenBench(benchmark::State& state) { field("cond", boolean(), key_value_metadata({{"null_probability", "0.01"}})); auto cond = rand.ArrayOf(*field("", struct_({cond_field, cond_field, cond_field}), key_value_metadata({{"null_probability", "0.0"}})), - len); + len) + ->Slice(offset); auto val1 = std::static_pointer_cast( - rand.ArrayOf(type, len, /*null_probability=*/0.01)); + rand.ArrayOf(type, len, /*null_probability=*/0.01)) + ->Slice(offset); auto val2 = std::static_pointer_cast( - rand.ArrayOf(type, len, /*null_probability=*/0.01)); + rand.ArrayOf(type, len, /*null_probability=*/0.01)) + ->Slice(offset); auto val3 = std::static_pointer_cast( - rand.ArrayOf(type, len, /*null_probability=*/0.01)); + rand.ArrayOf(type, len, /*null_probability=*/0.01)) + ->Slice(offset); auto val4 = std::static_pointer_cast( - rand.ArrayOf(type, len, /*null_probability=*/0.01)); + rand.ArrayOf(type, len, /*null_probability=*/0.01)) + ->Slice(offset); for (auto _ : state) { - ABORT_NOT_OK( - CaseWhen(cond->Slice(offset), {val1->Slice(offset), val2->Slice(offset), - val3->Slice(offset), val4->Slice(offset)})); + ABORT_NOT_OK(CaseWhen(cond, {val1, val2, val3, val4})); } // Set bytes processed to ~length of output @@ -232,18 +235,22 @@ static void CaseWhenBenchContiguous(benchmark::State& state) { auto cond2 = std::static_pointer_cast( rand.ArrayOf(boolean(), len, /*null_probability=*/0.01)); auto val1 = std::static_pointer_cast( - rand.ArrayOf(type, len, /*null_probability=*/0.01)); + rand.ArrayOf(type, len, /*null_probability=*/0.01)) + ->Slice(offset); auto val2 = std::static_pointer_cast( - rand.ArrayOf(type, len, /*null_probability=*/0.01)); + rand.ArrayOf(type, len, /*null_probability=*/0.01)) + ->Slice(offset); auto val3 = std::static_pointer_cast( - rand.ArrayOf(type, len, /*null_probability=*/0.01)); + rand.ArrayOf(type, len, /*null_probability=*/0.01)) + ->Slice(offset); ASSERT_OK_AND_ASSIGN( - auto cond, StructArray::Make({cond1, cond2}, std::vector{"a", "b"}, - nullptr, /*null_count=*/0)); + std::shared_ptr cond, + StructArray::Make({cond1, cond2}, std::vector{"a", "b"}, nullptr, + /*null_count=*/0)); + cond = cond->Slice(offset); for (auto _ : state) { - ABORT_NOT_OK(CaseWhen(cond->Slice(offset), {val1->Slice(offset), val2->Slice(offset), - val3->Slice(offset)})); + ABORT_NOT_OK(CaseWhen(cond, {val1, val2, val3})); } // Set bytes processed to ~length of output @@ -269,16 +276,16 @@ static void CaseWhenBenchStringContiguous(benchmark::State& state) { template static void CoalesceBench(benchmark::State& state) { - using CType = typename Type::c_type; auto type = TypeTraits::type_singleton(); int64_t len = state.range(0); int64_t offset = state.range(1); + int64_t num_arguments = state.range(2); random::RandomArrayGenerator rand(/*seed=*/0); std::vector arguments; - for (int i = 0; i < 4; i++) { + for (int i = 0; i < num_arguments; i++) { arguments.emplace_back( rand.ArrayOf(type, len, /*null_probability=*/0.25)->Slice(offset)); } @@ -287,8 +294,9 @@ static void CoalesceBench(benchmark::State& state) { ABORT_NOT_OK(CallFunction("coalesce", arguments)); } - state.SetBytesProcessed(state.iterations() * arguments.size() * (len - offset) * - sizeof(CType)); + state.SetBytesProcessed(state.iterations() * + GetBytesProcessed::Get(arguments.front().make_array())); + state.SetItemsProcessed(state.iterations() * (len - offset)); } template @@ -310,7 +318,9 @@ static void CoalesceScalarBench(benchmark::State& state) { ABORT_NOT_OK(CallFunction("coalesce", arguments)); } - state.SetBytesProcessed(state.iterations() * (len - offset) * sizeof(CType)); + state.SetBytesProcessed(state.iterations() * + GetBytesProcessed::Get(arguments.front().make_array())); + state.SetItemsProcessed(state.iterations() * (len - offset)); } static void CoalesceScalarStringBench(benchmark::State& state) { @@ -326,13 +336,13 @@ static void CoalesceScalarStringBench(benchmark::State& state) { ABORT_NOT_OK(CallFunction("coalesce", arguments)); } - state.SetBytesProcessed(state.iterations() * - static_cast(*arr).total_values_length()); + state.SetBytesProcessed(state.iterations() * GetBytesProcessed::Get( + arguments.front().make_array())); + state.SetItemsProcessed(state.iterations() * (len - offset)); } template static void CoalesceNonNullBench(benchmark::State& state) { - using CType = typename Type::c_type; auto type = TypeTraits::type_singleton(); int64_t len = state.range(0); @@ -349,8 +359,9 @@ static void CoalesceNonNullBench(benchmark::State& state) { ABORT_NOT_OK(CallFunction("coalesce", arguments)); } - state.SetBytesProcessed(state.iterations() * arguments.size() * (len - offset) * - sizeof(CType)); + state.SetBytesProcessed(state.iterations() * + GetBytesProcessed::Get(arguments.front().make_array())); + state.SetItemsProcessed(state.iterations() * (len - offset)); } static void CoalesceBench64(benchmark::State& state) { @@ -362,13 +373,12 @@ static void CoalesceScalarBench64(benchmark::State& state) { } static void CoalesceNonNullBench64(benchmark::State& state) { - return CoalesceBench(state); + return CoalesceNonNullBench(state); } template static void ChooseBench(benchmark::State& state) { constexpr int kNumChoices = 5; - using CType = typename Type::c_type; auto type = TypeTraits::type_singleton(); int64_t len = state.range(0); @@ -389,7 +399,9 @@ static void ChooseBench(benchmark::State& state) { ABORT_NOT_OK(CallFunction("choose", arguments)); } - state.SetBytesProcessed(state.iterations() * (len - offset) * sizeof(CType)); + state.SetBytesProcessed(state.iterations() * + GetBytesProcessed::Get(arguments[1].make_array())); + state.SetItemsProcessed(state.iterations() * (len - offset)); } static void ChooseBench64(benchmark::State& state) { @@ -429,8 +441,10 @@ BENCHMARK(CaseWhenBenchString)->Args({kFewItems, 99}); BENCHMARK(CaseWhenBenchStringContiguous)->Args({kFewItems, 0}); BENCHMARK(CaseWhenBenchStringContiguous)->Args({kFewItems, 99}); -BENCHMARK(CoalesceBench64)->Args({kNumItems, 0}); -BENCHMARK(CoalesceBench64)->Args({kNumItems, 99}); +BENCHMARK(CoalesceBench64)->Args({kNumItems, 0, 2}); +BENCHMARK(CoalesceBench64)->Args({kNumItems, 0, 4}); +BENCHMARK(CoalesceBench64)->Args({kNumItems, 99, 2}); +BENCHMARK(CoalesceBench64)->Args({kNumItems, 99, 4}); BENCHMARK(CoalesceScalarBench64)->Args({kNumItems, 0}); BENCHMARK(CoalesceScalarStringBench)->Args({kNumItems, 0}); From 66c210c37ddb70db930ceae683e8e7281de98fe5 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 24 Aug 2021 11:49:58 -0400 Subject: [PATCH 05/14] ARROW-7179: [C++] Clean up Python tests --- python/pyarrow/tests/test_compute.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index 3c05a3071e3..8713830400a 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -1339,7 +1339,8 @@ def test_is_null(): def test_fill_null(): arr = pa.array([1, 2, None, 4], type=pa.int8()) fill_value = pa.array([5], type=pa.int8()) - with pytest.raises(pa.ArrowInvalid, match="tried to convert to int"): + with pytest.raises(pa.ArrowInvalid, + match="Array arguments must all be the same length"): arr.fill_null(fill_value) arr = pa.array([None, None, None, None], type=pa.null()) From 7a72ed8709535d29c3ee312ca2143a73ce943bfe Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 25 Aug 2021 12:31:56 -0400 Subject: [PATCH 06/14] ARROW-7179: [C++] Simplify some cases --- .../arrow/compute/kernels/scalar_if_else.cc | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else.cc b/cpp/src/arrow/compute/kernels/scalar_if_else.cc index 33cce28bcdb..b1dc257dca8 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else.cc @@ -1838,10 +1838,8 @@ Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right, uint8_t* out_values = output->buffers[1]->mutable_data(); const ArrayData& left_arr = *left.array(); - const uint8_t* left_valid = - left_arr.MayHaveNulls() ? left_arr.buffers[0]->data() : nullptr; - arrow::internal::OptionalBitBlockCounter bit_counter(left_valid, left_arr.offset, - left_arr.length); + const uint8_t* left_valid = left_arr.buffers[0]->data(); + BitBlockCounter bit_counter(left_valid, left_arr.offset, left_arr.length); int64_t offset = 0; const uint8_t* left_values = left_arr.buffers[1]->data(); @@ -1897,27 +1895,25 @@ Status ExecBinaryCoalesce(KernelContext* ctx, Datum left, Datum right, int64_t l const int64_t out_offset = output->offset; uint8_t* out_valid = output->buffers[0]->mutable_data(); uint8_t* out_values = output->buffers[1]->mutable_data(); - if (left.is_scalar()) { - // LHS is scalar + // (Scalar, Any) CopyValues(left.scalar()->is_valid ? left : right, /*in_offset=*/0, length, out_valid, out_values, out_offset); return Status::OK(); + } else if (left.null_count() == 0) { + // LHS is array without nulls. Must copy (since we preallocate) + CopyValues(left, /*in_offset=*/0, length, out_valid, out_values, out_offset); + return Status::OK(); + } else if (right.is_scalar()) { + // (Array, Scalar) + return ExecArrayScalarCoalesce(ctx, left, right, length, out); } - // Array with nulls + // (Array, Array) const ArrayData& left_arr = *left.array(); - const uint8_t* left_valid = - left_arr.MayHaveNulls() ? left_arr.buffers[0]->data() : nullptr; - arrow::internal::OptionalBitBlockCounter bit_counter(left_valid, left_arr.offset, - left_arr.length); + const uint8_t* left_valid = left_arr.buffers[0]->data(); + BitBlockCounter bit_counter(left_valid, left_arr.offset, left_arr.length); int64_t offset = 0; - - if (right.is_scalar()) { - return ExecArrayScalarCoalesce(ctx, left, right, length, out); - } - - // RHS is array while (offset < length) { const auto block = bit_counter.NextWord(); if (block.AllSet()) { From 7e4d9a5db7214341ede5770ac92da2b63d465245 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 25 Aug 2021 13:18:41 -0400 Subject: [PATCH 07/14] ARROW-7179: [C++] Better parameterize coalesce benchmarks --- .../kernels/scalar_if_else_benchmark.cc | 106 ++++++++---------- 1 file changed, 49 insertions(+), 57 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc index 71eed3f4a4d..f36920f3694 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc @@ -274,20 +274,46 @@ static void CaseWhenBenchStringContiguous(benchmark::State& state) { return CaseWhenBenchContiguous(state); } +struct CoalesceParams { + int64_t length; + int64_t num_arguments; + double null_probability; +}; + +std::vector g_coalesce_params = { + {kNumItems, 2, 0.01}, + {kNumItems, 4, 0.01}, + {kNumItems, 2, 0.25}, + {kNumItems, 4, 0.25}, +}; + +struct CoalesceArgs : public CoalesceParams { + explicit CoalesceArgs(benchmark::State& state) : state_(state) { + const auto& params = g_coalesce_params[state.range(0)]; + length = params.length; + num_arguments = params.num_arguments; + null_probability = params.null_probability; + } + + ~CoalesceArgs() { + state_.counters["length"] = length; + state_.counters["null%"] = null_probability * 100; + state_.counters["num_args"] = num_arguments; + } + + private: + benchmark::State& state_; +}; + template static void CoalesceBench(benchmark::State& state) { auto type = TypeTraits::type_singleton(); - - int64_t len = state.range(0); - int64_t offset = state.range(1); - int64_t num_arguments = state.range(2); - + CoalesceArgs params(state); random::RandomArrayGenerator rand(/*seed=*/0); std::vector arguments; - for (int i = 0; i < num_arguments; i++) { - arguments.emplace_back( - rand.ArrayOf(type, len, /*null_probability=*/0.25)->Slice(offset)); + for (int i = 0; i < params.num_arguments; i++) { + arguments.emplace_back(rand.ArrayOf(type, params.length, params.null_probability)); } for (auto _ : state) { @@ -296,21 +322,18 @@ static void CoalesceBench(benchmark::State& state) { state.SetBytesProcessed(state.iterations() * GetBytesProcessed::Get(arguments.front().make_array())); - state.SetItemsProcessed(state.iterations() * (len - offset)); + state.SetItemsProcessed(state.iterations() * params.length); } template static void CoalesceScalarBench(benchmark::State& state) { using CType = typename Type::c_type; auto type = TypeTraits::type_singleton(); - - int64_t len = state.range(0); - int64_t offset = state.range(1); - + CoalesceArgs params(state); random::RandomArrayGenerator rand(/*seed=*/0); std::vector arguments = { - rand.ArrayOf(type, len, /*null_probability=*/0.25)->Slice(offset), + rand.ArrayOf(type, params.length, params.null_probability), Datum(CType(42)), }; @@ -320,16 +343,14 @@ static void CoalesceScalarBench(benchmark::State& state) { state.SetBytesProcessed(state.iterations() * GetBytesProcessed::Get(arguments.front().make_array())); - state.SetItemsProcessed(state.iterations() * (len - offset)); + state.SetItemsProcessed(state.iterations() * params.length); } static void CoalesceScalarStringBench(benchmark::State& state) { - int64_t len = state.range(0); - int64_t offset = state.range(1); - + CoalesceArgs params(state); random::RandomArrayGenerator rand(/*seed=*/0); - auto arr = rand.ArrayOf(utf8(), len, /*null_probability=*/0.25)->Slice(offset); + auto arr = rand.ArrayOf(utf8(), params.length, params.null_probability); std::vector arguments = {arr, Datum("foobar")}; for (auto _ : state) { @@ -338,30 +359,7 @@ static void CoalesceScalarStringBench(benchmark::State& state) { state.SetBytesProcessed(state.iterations() * GetBytesProcessed::Get( arguments.front().make_array())); - state.SetItemsProcessed(state.iterations() * (len - offset)); -} - -template -static void CoalesceNonNullBench(benchmark::State& state) { - auto type = TypeTraits::type_singleton(); - - int64_t len = state.range(0); - int64_t offset = state.range(1); - - random::RandomArrayGenerator rand(/*seed=*/0); - - std::vector arguments; - arguments.emplace_back( - rand.ArrayOf(type, len, /*null_probability=*/0.25)->Slice(offset)); - arguments.emplace_back(rand.ArrayOf(type, len, /*null_probability=*/0)->Slice(offset)); - - for (auto _ : state) { - ABORT_NOT_OK(CallFunction("coalesce", arguments)); - } - - state.SetBytesProcessed(state.iterations() * - GetBytesProcessed::Get(arguments.front().make_array())); - state.SetItemsProcessed(state.iterations() * (len - offset)); + state.SetItemsProcessed(state.iterations() * params.length); } static void CoalesceBench64(benchmark::State& state) { @@ -372,10 +370,6 @@ static void CoalesceScalarBench64(benchmark::State& state) { return CoalesceScalarBench(state); } -static void CoalesceNonNullBench64(benchmark::State& state) { - return CoalesceNonNullBench(state); -} - template static void ChooseBench(benchmark::State& state) { constexpr int kNumChoices = 5; @@ -441,16 +435,14 @@ BENCHMARK(CaseWhenBenchString)->Args({kFewItems, 99}); BENCHMARK(CaseWhenBenchStringContiguous)->Args({kFewItems, 0}); BENCHMARK(CaseWhenBenchStringContiguous)->Args({kFewItems, 99}); -BENCHMARK(CoalesceBench64)->Args({kNumItems, 0, 2}); -BENCHMARK(CoalesceBench64)->Args({kNumItems, 0, 4}); -BENCHMARK(CoalesceBench64)->Args({kNumItems, 99, 2}); -BENCHMARK(CoalesceBench64)->Args({kNumItems, 99, 4}); - -BENCHMARK(CoalesceScalarBench64)->Args({kNumItems, 0}); -BENCHMARK(CoalesceScalarStringBench)->Args({kNumItems, 0}); - -BENCHMARK(CoalesceNonNullBench64)->Args({kNumItems, 0}); -BENCHMARK(CoalesceNonNullBench64)->Args({kNumItems, 99}); +void CoalesceSetArgs(benchmark::internal::Benchmark* bench) { + for (size_t i = 0; i < g_coalesce_params.size(); i++) { + bench->Args({static_cast(i)}); + } +} +BENCHMARK(CoalesceBench64)->Apply(CoalesceSetArgs); +BENCHMARK(CoalesceScalarBench64)->Apply(CoalesceSetArgs); +BENCHMARK(CoalesceScalarStringBench)->Apply(CoalesceSetArgs); BENCHMARK(ChooseBench64)->Args({kNumItems, 0}); BENCHMARK(ChooseBench64)->Args({kNumItems, 99}); From c3a1f8596197f413ab7d218958482afab0aa7abc Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 25 Aug 2021 13:23:13 -0400 Subject: [PATCH 08/14] ARROW-7179: [C++] Minor tweak --- cpp/src/arrow/compute/kernels/scalar_if_else.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else.cc b/cpp/src/arrow/compute/kernels/scalar_if_else.cc index b1dc257dca8..844037ab13f 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else.cc @@ -1839,11 +1839,11 @@ Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right, const ArrayData& left_arr = *left.array(); const uint8_t* left_valid = left_arr.buffers[0]->data(); - BitBlockCounter bit_counter(left_valid, left_arr.offset, left_arr.length); - int64_t offset = 0; - const uint8_t* left_values = left_arr.buffers[1]->data(); const Scalar& right_scalar = *right.scalar(); + + BitBlockCounter bit_counter(left_valid, left_arr.offset, left_arr.length); + int64_t offset = 0; while (offset < length) { const auto block = bit_counter.NextWord(); if (block.AllSet()) { From 34748de8bf272596ce809bb728ca7e963b3211ed Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 25 Aug 2021 13:33:43 -0400 Subject: [PATCH 09/14] ARROW-7179: [C++] Don't benchmark redundant cases --- .../arrow/compute/kernels/scalar_if_else_benchmark.cc | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc index f36920f3694..7fc42f65030 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc @@ -440,9 +440,16 @@ void CoalesceSetArgs(benchmark::internal::Benchmark* bench) { bench->Args({static_cast(i)}); } } +void CoalesceSetBinaryArgs(benchmark::internal::Benchmark* bench) { + for (size_t i = 0; i < g_coalesce_params.size(); i++) { + if (g_coalesce_params[i].num_arguments == 2) { + bench->Args({static_cast(i)}); + } + } +} BENCHMARK(CoalesceBench64)->Apply(CoalesceSetArgs); -BENCHMARK(CoalesceScalarBench64)->Apply(CoalesceSetArgs); -BENCHMARK(CoalesceScalarStringBench)->Apply(CoalesceSetArgs); +BENCHMARK(CoalesceScalarBench64)->Apply(CoalesceSetBinaryArgs); +BENCHMARK(CoalesceScalarStringBench)->Apply(CoalesceSetBinaryArgs); BENCHMARK(ChooseBench64)->Args({kNumItems, 0}); BENCHMARK(ChooseBench64)->Args({kNumItems, 99}); From 73d366a5c0eeafc02204e01b03c07dc48737d2e7 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 25 Aug 2021 14:14:07 -0400 Subject: [PATCH 10/14] ARROW-7179: [C++] Add casts for MSVC --- cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc index 7fc42f65030..5e03d9be80b 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc @@ -296,9 +296,9 @@ struct CoalesceArgs : public CoalesceParams { } ~CoalesceArgs() { - state_.counters["length"] = length; + state_.counters["length"] = static_cast(length); state_.counters["null%"] = null_probability * 100; - state_.counters["num_args"] = num_arguments; + state_.counters["num_args"] = static_cast(num_arguments); } private: From cfcf2f44417c0a96dd37e96f9f5648d7124d8084 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 25 Aug 2021 13:29:44 -0400 Subject: [PATCH 11/14] ARROW-7179: [C++] Use BitRunReader --- .../arrow/compute/kernels/scalar_if_else.cc | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else.cc b/cpp/src/arrow/compute/kernels/scalar_if_else.cc index 844037ab13f..844073151be 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -1842,33 +1843,22 @@ Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right, const uint8_t* left_values = left_arr.buffers[1]->data(); const Scalar& right_scalar = *right.scalar(); - BitBlockCounter bit_counter(left_valid, left_arr.offset, left_arr.length); + arrow::internal::BitRunReader reader(left_valid, left_arr.offset, left_arr.length); int64_t offset = 0; - while (offset < length) { - const auto block = bit_counter.NextWord(); - if (block.AllSet()) { + while (true) { + const auto run = reader.NextRun(); + if (run.length == 0) break; + if (run.set) { // All from left CopyFixedWidth::CopyArray(*left_arr.type, left_values, - left_arr.offset + offset, block.length, out_values, + left_arr.offset + offset, run.length, out_values, out_offset + offset); - } else if (block.NoneSet()) { + } else { // All from right - CopyFixedWidth::CopyScalar(right_scalar, block.length, out_values, + CopyFixedWidth::CopyScalar(right_scalar, run.length, out_values, out_offset + offset); - } else if (block.popcount) { - // One by one - for (int64_t j = 0; j < block.length; ++j) { - if (BitUtil::GetBit(left_valid, left_arr.offset + offset + j)) { - CopyFixedWidth::CopyArray( - *left_arr.type, left_values, left_arr.offset + offset + j, - /*length=*/1, out_values, out_offset + offset + j); - } else { - CopyFixedWidth::CopyScalar(right_scalar, /*length=*/1, out_values, - out_offset + offset + j); - } - } } - offset += block.length; + offset += run.length; } if (right_scalar.is_valid || !left_valid) { From 40b079bee6bc70fd3d7e5bc4334a449d99da866c Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 26 Aug 2021 09:07:25 -0400 Subject: [PATCH 12/14] ARROW-7179: [C++] Test 50% and 99% nulls --- cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc index 5e03d9be80b..feadccf84e0 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc @@ -285,6 +285,10 @@ std::vector g_coalesce_params = { {kNumItems, 4, 0.01}, {kNumItems, 2, 0.25}, {kNumItems, 4, 0.25}, + {kNumItems, 2, 0.50}, + {kNumItems, 4, 0.50}, + {kNumItems, 2, 0.99}, + {kNumItems, 4, 0.99}, }; struct CoalesceArgs : public CoalesceParams { From 8fa64f1fefcceebecf17e9e0ddbdf034bb8e0ffb Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 26 Aug 2021 09:40:42 -0400 Subject: [PATCH 13/14] ARROW-7179: [C++] Lint --- .../arrow/compute/kernels/scalar_if_else_benchmark.cc | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc index feadccf84e0..b6d6bf6e44f 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc @@ -281,14 +281,9 @@ struct CoalesceParams { }; std::vector g_coalesce_params = { - {kNumItems, 2, 0.01}, - {kNumItems, 4, 0.01}, - {kNumItems, 2, 0.25}, - {kNumItems, 4, 0.25}, - {kNumItems, 2, 0.50}, - {kNumItems, 4, 0.50}, - {kNumItems, 2, 0.99}, - {kNumItems, 4, 0.99}, + {kNumItems, 2, 0.01}, {kNumItems, 4, 0.01}, {kNumItems, 2, 0.25}, + {kNumItems, 4, 0.25}, {kNumItems, 2, 0.50}, {kNumItems, 4, 0.50}, + {kNumItems, 2, 0.99}, {kNumItems, 4, 0.99}, }; struct CoalesceArgs : public CoalesceParams { From 43a44500991e2193a99e582d50f669ab1f6beeea Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 26 Aug 2021 18:53:47 +0200 Subject: [PATCH 14/14] Improve coalesce performance Before: ``` -------------------------------------------------------------------------------------- Benchmark Time CPU Iterations UserCounters... -------------------------------------------------------------------------------------- CoalesceBench64/0 3366683 ns 3364669 ns 213 bytes_per_second=2.32192G/s items_per_second=311.643M/s length=1048.58k null%=1 num_args=2 CoalesceBench64/1 2851663 ns 2849846 ns 245 bytes_per_second=2.74138G/s items_per_second=367.941M/s length=1048.58k null%=1 num_args=4 CoalesceBench64/2 7499865 ns 7495813 ns 95 bytes_per_second=1067.26M/s items_per_second=139.888M/s length=1048.58k null%=25 num_args=2 CoalesceBench64/3 11773437 ns 11766272 ns 60 bytes_per_second=679.909M/s items_per_second=89.1171M/s length=1048.58k null%=25 num_args=4 CoalesceBench64/4 9636207 ns 9631169 ns 73 bytes_per_second=830.636M/s items_per_second=108.873M/s length=1048.58k null%=50 num_args=2 CoalesceBench64/5 19456855 ns 19445858 ns 36 bytes_per_second=411.399M/s items_per_second=53.9228M/s length=1048.58k null%=50 num_args=4 CoalesceBench64/6 3288217 ns 3286426 ns 214 bytes_per_second=2.3772G/s items_per_second=319.063M/s length=1048.58k null%=99 num_args=2 CoalesceBench64/7 7603232 ns 7599720 ns 92 bytes_per_second=1052.67M/s items_per_second=137.976M/s length=1048.58k null%=99 num_args=4 CoalesceScalarBench64/0 775260 ns 774797 ns 904 bytes_per_second=10.0833G/s items_per_second=1.35336G/s length=1048.58k null%=1 num_args=2 CoalesceScalarBench64/2 3500267 ns 3498388 ns 201 bytes_per_second=2.23317G/s items_per_second=299.731M/s length=1048.58k null%=25 num_args=2 CoalesceScalarBench64/4 4815186 ns 4812821 ns 146 bytes_per_second=1.62327G/s items_per_second=217.871M/s length=1048.58k null%=50 num_args=2 CoalesceScalarBench64/6 446897 ns 446783 ns 1541 bytes_per_second=17.4861G/s items_per_second=2.34695G/s length=1048.58k null%=99 num_args=2 CoalesceScalarStringBench/0 74138532 ns 74089097 ns 10 bytes_per_second=6.72872G/s items_per_second=14.1529M/s length=1048.58k null%=1 num_args=2 CoalesceScalarStringBench/2 58106933 ns 58064020 ns 9 bytes_per_second=6.52407G/s items_per_second=18.059M/s length=1048.58k null%=25 num_args=2 CoalesceScalarStringBench/4 52094990 ns 52064312 ns 10 bytes_per_second=4.88432G/s items_per_second=20.14M/s length=1048.58k null%=50 num_args=2 CoalesceScalarStringBench/6 5136540 ns 5133121 ns 138 bytes_per_second=1.7244G/s items_per_second=204.276M/s length=1048.58k null%=99 num_args=2 ``` After: ``` -------------------------------------------------------------------------------------- Benchmark Time CPU Iterations UserCounters... -------------------------------------------------------------------------------------- CoalesceBench64/0 1047061 ns 1046399 ns 661 bytes_per_second=7.46608G/s items_per_second=1002.08M/s length=1048.58k null%=1 num_args=2 CoalesceBench64/1 1377282 ns 1376405 ns 511 bytes_per_second=5.67602G/s items_per_second=761.822M/s length=1048.58k null%=1 num_args=4 CoalesceBench64/2 2804061 ns 2802178 ns 251 bytes_per_second=2.78801G/s items_per_second=374.2M/s length=1048.58k null%=25 num_args=2 CoalesceBench64/3 5234633 ns 5230898 ns 134 bytes_per_second=1.49353G/s items_per_second=200.458M/s length=1048.58k null%=25 num_args=4 CoalesceBench64/4 3700820 ns 3698116 ns 190 bytes_per_second=2.11256G/s items_per_second=283.543M/s length=1048.58k null%=50 num_args=2 CoalesceBench64/5 7731316 ns 7726379 ns 90 bytes_per_second=1035.41M/s items_per_second=135.714M/s length=1048.58k null%=50 num_args=4 CoalesceBench64/6 1004359 ns 1003745 ns 693 bytes_per_second=7.78335G/s items_per_second=1044.66M/s length=1048.58k null%=99 num_args=2 CoalesceBench64/7 4660379 ns 4658001 ns 151 bytes_per_second=1.67722G/s items_per_second=225.113M/s length=1048.58k null%=99 num_args=4 CoalesceScalarBench64/0 656265 ns 655870 ns 1067 bytes_per_second=11.9117G/s items_per_second=1.59876G/s length=1048.58k null%=1 num_args=2 CoalesceScalarBench64/2 2889294 ns 2887898 ns 242 bytes_per_second=2.70525G/s items_per_second=363.093M/s length=1048.58k null%=25 num_args=2 CoalesceScalarBench64/4 4015990 ns 4014054 ns 175 bytes_per_second=1.94629G/s items_per_second=261.226M/s length=1048.58k null%=50 num_args=2 CoalesceScalarBench64/6 390245 ns 390138 ns 1800 bytes_per_second=20.025G/s items_per_second=2.68771G/s length=1048.58k null%=99 num_args=2 CoalesceScalarStringBench/0 82277097 ns 82223643 ns 9 bytes_per_second=6.06303G/s items_per_second=12.7527M/s length=1048.58k null%=1 num_args=2 CoalesceScalarStringBench/2 70821126 ns 70771323 ns 10 bytes_per_second=5.35265G/s items_per_second=14.8164M/s length=1048.58k null%=25 num_args=2 CoalesceScalarStringBench/4 47119447 ns 47087724 ns 13 bytes_per_second=5.40053G/s items_per_second=22.2686M/s length=1048.58k null%=50 num_args=2 CoalesceScalarStringBench/6 4579486 ns 4576728 ns 150 bytes_per_second=1.93403G/s items_per_second=229.11M/s length=1048.58k null%=99 num_args=2 ``` --- .../arrow/compute/kernels/scalar_if_else.cc | 263 +++++++++++------- 1 file changed, 160 insertions(+), 103 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else.cc b/cpp/src/arrow/compute/kernels/scalar_if_else.cc index 844073151be..4de04da7a81 100644 --- a/cpp/src/arrow/compute/kernels/scalar_if_else.cc +++ b/cpp/src/arrow/compute/kernels/scalar_if_else.cc @@ -15,24 +15,26 @@ // specific language governing permissions and limitations // under the License. -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "arrow/array/builder_nested.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/array/builder_time.h" +#include "arrow/array/builder_union.h" +#include "arrow/compute/api.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/compute/util_internal.h" +#include "arrow/util/bit_block_counter.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/bitmap.h" +#include "arrow/util/bitmap_ops.h" +#include "arrow/util/bitmap_reader.h" namespace arrow { + using internal::BitBlockCount; using internal::BitBlockCounter; using internal::Bitmap; using internal::BitmapWordReader; +using internal::BitRunReader; namespace compute { namespace internal { @@ -1072,6 +1074,7 @@ struct CopyFixedWidth { arrow::internal::CopyBitmap(in_values, in_offset, length, raw_out_values, out_offset); } }; + template struct CopyFixedWidth> { using CType = typename TypeTraits::CType; @@ -1088,6 +1091,7 @@ struct CopyFixedWidth> { in_values + in_offset * sizeof(CType), length * sizeof(CType)); } }; + template struct CopyFixedWidth> { static void CopyScalar(const Scalar& values, const int64_t length, @@ -1115,6 +1119,7 @@ struct CopyFixedWidth> { std::memcpy(next, in_values + in_offset * width, length * width); } }; + template struct CopyFixedWidth> { using ScalarType = typename TypeTraits::ScalarType; @@ -1138,6 +1143,7 @@ struct CopyFixedWidth> { std::memcpy(next, in_values + in_offset * width, length * width); } }; + // Copy fixed-width values from a scalar/array datum into an output values buffer template void CopyValues(const Datum& in_values, const int64_t in_offset, const int64_t length, @@ -1726,54 +1732,45 @@ Status ExecScalarCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out template void CopyValuesAllValid(Datum source, uint8_t* out_valid, uint8_t* out_values, const int64_t out_offset, const int64_t length) { - BitBlockCounter counter(out_valid, out_offset, length); + BitRunReader bit_reader(out_valid, out_offset, length); int64_t offset = 0; - while (offset < length) { - const auto block = counter.NextWord(); - if (block.NoneSet()) { - CopyValues(source, offset, block.length, out_valid, out_values, + while (true) { + const auto run = bit_reader.NextRun(); + if (run.length == 0) { + break; + } + if (!run.set) { + CopyValues(source, offset, run.length, out_valid, out_values, out_offset + offset); - } else if (!block.AllSet()) { - for (int64_t j = 0; j < block.length; ++j) { - if (!BitUtil::GetBit(out_valid, out_offset + offset + j)) { - CopyValues(source, offset + j, 1, out_valid, out_values, - out_offset + offset + j); - } - } } - offset += block.length; + offset += run.length; } + DCHECK_EQ(offset, length); } // Helper: zero the values buffer of the output wherever the slot is null void InitializeNullSlots(const DataType& type, uint8_t* out_valid, uint8_t* out_values, const int64_t out_offset, const int64_t length) { - BitBlockCounter counter(out_valid, out_offset, length); + BitRunReader bit_reader(out_valid, out_offset, length); int64_t offset = 0; - auto bit_width = checked_cast(type).bit_width(); - auto byte_width = BitUtil::BytesForBits(bit_width); - while (offset < length) { - const auto block = counter.NextWord(); - if (block.NoneSet()) { + const auto bit_width = checked_cast(type).bit_width(); + const auto byte_width = BitUtil::BytesForBits(bit_width); + while (true) { + const auto run = bit_reader.NextRun(); + if (run.length == 0) { + break; + } + if (!run.set) { if (bit_width == 1) { - BitUtil::SetBitsTo(out_values, out_offset + offset, block.length, false); + BitUtil::SetBitsTo(out_values, out_offset + offset, run.length, false); } else { - std::memset(out_values + (out_offset + offset) * byte_width, 0x00, - byte_width * block.length); - } - } else if (!block.AllSet()) { - for (int64_t j = 0; j < block.length; ++j) { - if (BitUtil::GetBit(out_valid, out_offset + offset + j)) continue; - if (bit_width == 1) { - BitUtil::ClearBit(out_values, out_offset + offset + j); - } else { - std::memset(out_values + (out_offset + offset + j) * byte_width, 0x00, - byte_width); - } + std::memset(out_values + (out_offset + offset) * byte_width, 0, + byte_width * run.length); } } - offset += block.length; + offset += run.length; } + DCHECK_EQ(offset, length); } // Implement 'coalesce' for any mix of scalar/array arguments for any fixed-width type @@ -1783,42 +1780,68 @@ Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) const int64_t out_offset = output->offset; // Use output validity buffer as mask to decide what values to copy uint8_t* out_valid = output->buffers[0]->mutable_data(); - // Clear output buffer - no values are set initially + + // Clear output validity buffer - no values are set initially BitUtil::SetBitsTo(out_valid, out_offset, batch.length, false); uint8_t* out_values = output->buffers[1]->mutable_data(); for (const auto& datum : batch.values) { - if ((datum.is_scalar() && datum.scalar()->is_valid) || - (datum.is_array() && !datum.array()->MayHaveNulls())) { + if (datum.null_count() == 0) { // Valid scalar, or all-valid array CopyValuesAllValid(datum, out_valid, out_values, out_offset, batch.length); break; } else if (datum.is_array()) { // Array with nulls const ArrayData& arr = *datum.array(); - const DataType& type = *datum.type(); + const int64_t in_offset = arr.offset; + const int64_t in_null_count = arr.null_count; + DCHECK_GT(in_null_count, 0); // computed in datum.null_count() + const DataType& type = *arr.type; const uint8_t* in_valid = arr.buffers[0]->data(); const uint8_t* in_values = arr.buffers[1]->data(); - BinaryBitBlockCounter counter(in_valid, arr.offset, out_valid, out_offset, - batch.length); - int64_t offset = 0; - while (offset < batch.length) { - const auto block = counter.NextAndNotWord(); - if (block.AllSet()) { - CopyValues(datum, offset, block.length, out_valid, out_values, - out_offset + offset); - } else if (block.popcount) { - for (int64_t j = 0; j < block.length; ++j) { - if (!BitUtil::GetBit(out_valid, out_offset + offset + j) && - BitUtil::GetBit(in_valid, arr.offset + offset + j)) { - // This version lets us avoid calling MayHaveNulls() on every iteration - // (which does an atomic load and can add up) - CopyOneArrayValue(type, in_valid, in_values, arr.offset + offset + j, - out_valid, out_values, out_offset + offset + j); + + if (in_null_count < 0.8 * batch.length) { + // The input is not mostly null, we deem it more efficient to + // copy values even underlying null slots instead of the more + // expensive bitmasking using BinaryBitBlockCounter. + BitRunReader bit_reader(out_valid, out_offset, batch.length); + int64_t offset = 0; + while (true) { + const auto run = bit_reader.NextRun(); + if (run.length == 0) { + break; + } + if (!run.set) { + // Copy from input + CopyFixedWidth::CopyArray(type, in_values, in_offset + offset, + run.length, out_values, out_offset + offset); + } + offset += run.length; + } + arrow::internal::BitmapOr(out_valid, out_offset, in_valid, in_offset, + batch.length, out_offset, out_valid); + } else { + BinaryBitBlockCounter counter(in_valid, in_offset, out_valid, out_offset, + batch.length); + int64_t offset = 0; + while (offset < batch.length) { + const auto block = counter.NextAndNotWord(); + if (block.AllSet()) { + CopyValues(datum, offset, block.length, out_valid, out_values, + out_offset + offset); + } else if (block.popcount) { + for (int64_t j = 0; j < block.length; ++j) { + if (!BitUtil::GetBit(out_valid, out_offset + offset + j) && + BitUtil::GetBit(in_valid, in_offset + offset + j)) { + // This version lets us avoid calling MayHaveNulls() on every iteration + // (which does an atomic load and can add up) + CopyOneArrayValue(type, in_valid, in_values, in_offset + offset + j, + out_valid, out_values, out_offset + offset + j); + } } } + offset += block.length; } - offset += block.length; } } } @@ -1843,22 +1866,44 @@ Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right, const uint8_t* left_values = left_arr.buffers[1]->data(); const Scalar& right_scalar = *right.scalar(); - arrow::internal::BitRunReader reader(left_valid, left_arr.offset, left_arr.length); - int64_t offset = 0; - while (true) { - const auto run = reader.NextRun(); - if (run.length == 0) break; - if (run.set) { - // All from left - CopyFixedWidth::CopyArray(*left_arr.type, left_values, - left_arr.offset + offset, run.length, out_values, - out_offset + offset); - } else { - // All from right - CopyFixedWidth::CopyScalar(right_scalar, run.length, out_values, - out_offset + offset); + if (left.null_count() < length * 0.2) { + // There are less than 20% nulls in the left array, so first copy + // the left values, then fill any nulls with the right value + CopyFixedWidth::CopyArray(*left_arr.type, left_values, left_arr.offset, length, + out_values, out_offset); + + BitRunReader reader(left_valid, left_arr.offset, left_arr.length); + int64_t offset = 0; + while (true) { + const auto run = reader.NextRun(); + if (run.length == 0) break; + if (!run.set) { + // All from right + CopyFixedWidth::CopyScalar(right_scalar, run.length, out_values, + out_offset + offset); + } + offset += run.length; } - offset += run.length; + DCHECK_EQ(offset, length); + } else { + BitRunReader reader(left_valid, left_arr.offset, left_arr.length); + int64_t offset = 0; + while (true) { + const auto run = reader.NextRun(); + if (run.length == 0) break; + if (run.set) { + // All from left + CopyFixedWidth::CopyArray(*left_arr.type, left_values, + left_arr.offset + offset, run.length, out_values, + out_offset + offset); + } else { + // All from right + CopyFixedWidth::CopyScalar(right_scalar, run.length, out_values, + out_offset + offset); + } + offset += run.length; + } + DCHECK_EQ(offset, length); } if (right_scalar.is_valid || !left_valid) { @@ -1885,12 +1930,16 @@ Status ExecBinaryCoalesce(KernelContext* ctx, Datum left, Datum right, int64_t l const int64_t out_offset = output->offset; uint8_t* out_valid = output->buffers[0]->mutable_data(); uint8_t* out_values = output->buffers[1]->mutable_data(); + + const int64_t left_null_count = left.null_count(); + const int64_t right_null_count = right.null_count(); + if (left.is_scalar()) { // (Scalar, Any) CopyValues(left.scalar()->is_valid ? left : right, /*in_offset=*/0, length, out_valid, out_values, out_offset); return Status::OK(); - } else if (left.null_count() == 0) { + } else if (left_null_count == 0) { // LHS is array without nulls. Must copy (since we preallocate) CopyValues(left, /*in_offset=*/0, length, out_valid, out_values, out_offset); return Status::OK(); @@ -1901,32 +1950,40 @@ Status ExecBinaryCoalesce(KernelContext* ctx, Datum left, Datum right, int64_t l // (Array, Array) const ArrayData& left_arr = *left.array(); + const ArrayData& right_arr = *right.array(); const uint8_t* left_valid = left_arr.buffers[0]->data(); - BitBlockCounter bit_counter(left_valid, left_arr.offset, left_arr.length); + const uint8_t* left_values = left_arr.buffers[1]->data(); + const uint8_t* right_valid = + right_null_count > 0 ? right_arr.buffers[0]->data() : nullptr; + const uint8_t* right_values = right_arr.buffers[1]->data(); + + BitRunReader bit_reader(left_valid, left_arr.offset, left_arr.length); int64_t offset = 0; - while (offset < length) { - const auto block = bit_counter.NextWord(); - if (block.AllSet()) { + while (true) { + const auto run = bit_reader.NextRun(); + if (run.length == 0) { + break; + } + if (run.set) { // All from left - CopyValues(left, offset, block.length, out_valid, out_values, - out_offset + offset); - } else if (block.NoneSet()) { + CopyFixedWidth::CopyArray(*left_arr.type, left_values, + left_arr.offset + offset, run.length, out_values, + out_offset + offset); + } else { // All from right - CopyValues(right, offset, block.length, out_valid, out_values, - out_offset + offset); - } else if (block.popcount) { - // One by one - for (int64_t j = 0; j < block.length; ++j) { - if (BitUtil::GetBit(left_valid, left_arr.offset + offset + j)) { - CopyOneValue(left, offset + j, out_valid, out_values, - out_offset + offset + j); - } else { - CopyOneValue(right, offset + j, out_valid, out_values, - out_offset + offset + j); - } - } + CopyFixedWidth::CopyArray(*right_arr.type, right_values, + right_arr.offset + offset, run.length, out_values, + out_offset + offset); } - offset += block.length; + offset += run.length; + } + DCHECK_EQ(offset, length); + + if (right_null_count == 0) { + BitUtil::SetBitsTo(out_valid, out_offset, length, true); + } else { + arrow::internal::BitmapOr(left_valid, left_arr.offset, right_valid, right_arr.offset, + length, out_offset, out_valid); } return Status::OK(); }