From 49950d6be9ed309de39ab2443aa50447e710abf3 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Fri, 24 May 2024 18:14:40 +0800 Subject: [PATCH 01/12] WIP --- cpp/src/arrow/compute/exec.h | 1 + cpp/src/arrow/compute/expression.h | 16 ++++ cpp/src/arrow/compute/kernel.h | 1 + .../arrow/compute/kernels/codegen_internal.h | 74 +++++++++++++++++++ cpp/src/arrow/util/bit_block_counter.h | 73 ++++++++++++++++++ 5 files changed, 165 insertions(+) diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 3fbefe4a1ab..61e7011421b 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -422,6 +422,7 @@ struct ARROW_EXPORT ExecSpan { int64_t length = 0; std::vector values; + SelectionVector *selection_vector = NULLPTR; }; /// \defgroup compute-call-function One-shot calls to compute functions diff --git a/cpp/src/arrow/compute/expression.h b/cpp/src/arrow/compute/expression.h index 9a36a6d3368..05a3f581c99 100644 --- a/cpp/src/arrow/compute/expression.h +++ b/cpp/src/arrow/compute/expression.h @@ -60,6 +60,22 @@ class ARROW_EXPORT Expression { void ComputeHash(); }; + struct SpecialCall { + std::string function_name; + std::vector arguments; + std::shared_ptr options; + // Cached hash value + size_t hash; + + // post-Bind properties: + std::shared_ptr function; + const Kernel* kernel = NULLPTR; + std::shared_ptr kernel_state; + TypeHolder type; + + void ComputeHash(); + }; + std::string ToString() const; bool Equals(const Expression& other) const; size_t hash() const; diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 1adb3e96c97..6485b99a31a 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -566,6 +566,7 @@ struct ARROW_EXPORT ScalarKernel : public Kernel { // bitmaps is a reasonable default NullHandling::type null_handling = NullHandling::INTERSECTION; MemAllocation::type mem_allocation = MemAllocation::PREALLOCATE; + bool selection_vector_aware = false; }; // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index 9e46a21887f..a75cc52b4aa 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -418,6 +418,27 @@ static void VisitTwoArrayValuesInline(const ArraySpan& arr0, const ArraySpan& ar std::move(visit_null)); } +template +static void VisitTwoArrayValuesWithSelInline(const ArraySpan& arr0, const ArraySpan& arr1, + VisitFunc&& valid_func, NullFunc&& null_func, + SelectionVector* sel) { + ArrayIterator arr0_it(arr0); + ArrayIterator arr1_it(arr1); + + auto visit_valid = [&](int64_t i) { + valid_func(GetViewType::LogicalValue(arr0_it()), + GetViewType::LogicalValue(arr1_it())); + }; + auto visit_null = [&]() { + arr0_it(); + arr1_it(); + null_func(); + }; + VisitTwoBitBlocksVoid(arr0.buffers[0].data, arr0.offset, arr1.buffers[0].data, + arr1.offset, arr0.length, std::move(visit_valid), + std::move(visit_null)); +} + // ---------------------------------------------------------------------- // Reusable type resolvers @@ -781,6 +802,19 @@ struct ScalarBinaryNotNullStateful { return st; } + Status ArrayArrayWithSel(KernelContext* ctx, const ArraySpan& arg0, + const ArraySpan& arg1, SelectionVector* sel, ExecResult* out) { + Status st = Status::OK(); + OutputArrayWriter writer(out->array_span_mutable()); + VisitTwoArrayValuesInline( + arg0, arg1, + [&](Arg0Value u, Arg1Value v) { + writer.Write(op.template Call(ctx, u, v, &st)); + }, + [&]() { writer.WriteNull(); }); + return st; + } + Status ArrayScalar(KernelContext* ctx, const ArraySpan& arg0, const Scalar& arg1, ExecResult* out) { Status st = Status::OK(); @@ -801,6 +835,26 @@ struct ScalarBinaryNotNullStateful { return st; } + Status ArrayScalarWithSel(KernelContext* ctx, const ArraySpan& arg0, const Scalar& arg1, + SelectionVector* sel, ExecResult* out) { + Status st = Status::OK(); + ArraySpan* out_span = out->array_span_mutable(); + OutputArrayWriter writer(out_span); + if (arg1.is_valid) { + const auto arg1_val = UnboxScalar::Unbox(arg1); + VisitArrayValuesInline( + arg0, + [&](Arg0Value u) { + writer.Write( + op.template Call(ctx, u, arg1_val, &st)); + }, + [&]() { writer.WriteNull(); }); + } else { + writer.WriteAllNull(out_span->length); + } + return st; + } + Status ScalarArray(KernelContext* ctx, const Scalar& arg0, const ArraySpan& arg1, ExecResult* out) { Status st = Status::OK(); @@ -821,6 +875,26 @@ struct ScalarBinaryNotNullStateful { return st; } + Status ScalarArrayWithSel(KernelContext* ctx, const Scalar& arg0, const ArraySpan& arg1, + SelectionVector* sel, ExecResult* out) { + Status st = Status::OK(); + ArraySpan* out_span = out->array_span_mutable(); + OutputArrayWriter writer(out_span); + if (arg0.is_valid) { + const auto arg0_val = UnboxScalar::Unbox(arg0); + VisitArrayValuesInline( + arg1, + [&](Arg1Value v) { + writer.Write( + op.template Call(ctx, arg0_val, v, &st)); + }, + [&]() { writer.WriteNull(); }); + } else { + writer.WriteAllNull(out_span->length); + } + return st; + } + Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { if (batch[0].is_array()) { if (batch[1].is_array()) { diff --git a/cpp/src/arrow/util/bit_block_counter.h b/cpp/src/arrow/util/bit_block_counter.h index 73a1ee8600f..a31161f3935 100644 --- a/cpp/src/arrow/util/bit_block_counter.h +++ b/cpp/src/arrow/util/bit_block_counter.h @@ -479,6 +479,35 @@ static void VisitBitBlocksVoid(const uint8_t* bitmap, int64_t offset, int64_t le } } +template +static void VisitBitBlocksWithSelVoid(const uint8_t* bitmap, int64_t offset, + int64_t length, const int32_t* sel, + int64_t sel_length, VisitNotNull&& visit_not_null, + VisitNull&& visit_null) { + internal::OptionalBitBlockCounter bit_counter(bitmap, offset, length); + int64_t position = 0; + while (position < length) { + internal::BitBlockCount block = bit_counter.NextBlock(); + if (block.AllSet()) { + for (int64_t i = 0; i < block.length; ++i, ++position) { + visit_not_null(position); + } + } else if (block.NoneSet()) { + for (int64_t i = 0; i < block.length; ++i, ++position) { + visit_null(); + } + } else { + for (int64_t i = 0; i < block.length; ++i, ++position) { + if (bit_util::GetBit(bitmap, offset + position)) { + visit_not_null(position); + } else { + visit_null(); + } + } + } + } +} + template static Status VisitTwoBitBlocks(const uint8_t* left_bitmap, int64_t left_offset, const uint8_t* right_bitmap, int64_t right_offset, @@ -566,5 +595,49 @@ static void VisitTwoBitBlocksVoid(const uint8_t* left_bitmap, int64_t left_offse } } +template +static void VisitTwoBitBlocksWithSelVoid(const uint8_t* left_bitmap, int64_t left_offset, + const uint8_t* right_bitmap, + int64_t right_offset, const int32_t* sel, + int64_t length, VisitNotNull&& visit_not_null, + VisitNull&& visit_null) { + if (left_bitmap == NULLPTR || right_bitmap == NULLPTR) { + // At most one bitmap is present + if (left_bitmap == NULLPTR) { + return VisitBitBlocksWithSelVoid(right_bitmap, right_offset, length, + std::forward(visit_not_null), + std::forward(visit_null)); + } else { + return VisitBitBlocksWithSelVoid(left_bitmap, left_offset, length, + std::forward(visit_not_null), + std::forward(visit_null)); + } + } + BinaryBitBlockCounter bit_counter(left_bitmap, left_offset, right_bitmap, right_offset, + length); + int64_t position = 0; + while (position < length) { + BitBlockCount block = bit_counter.NextAndWord(); + if (block.AllSet()) { + for (int64_t i = 0; i < block.length; ++i, ++position) { + visit_not_null(position); + } + } else if (block.NoneSet()) { + for (int64_t i = 0; i < block.length; ++i, ++position) { + visit_null(); + } + } else { + for (int64_t i = 0; i < block.length; ++i, ++position) { + if (bit_util::GetBit(left_bitmap, left_offset + position) && + bit_util::GetBit(right_bitmap, right_offset + position)) { + visit_not_null(position); + } else { + visit_null(); + } + } + } + } +} + } // namespace internal } // namespace arrow From fb7b936ac05aaf8ca860a2c7a9447913768c8a01 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 28 May 2024 00:06:45 +0800 Subject: [PATCH 02/12] Some framework change --- cpp/src/arrow/compute/exec.cc | 10 +++ cpp/src/arrow/compute/expression.cc | 97 ++++++++++++++++++++--------- cpp/src/arrow/compute/expression.h | 26 ++------ 3 files changed, 85 insertions(+), 48 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index f2e45783831..58c958985d8 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -781,6 +781,16 @@ class KernelExecutorImpl : public KernelExecutor { class ScalarExecutor : public KernelExecutorImpl { public: Status Execute(const ExecBatch& batch, ExecListener* listener) override { + if (batch.selection_vector && !kernel_->selection_vector_aware) { + // Slow path for selection vector. + ExecBatch selected_batch; + // Gather selected rows into new batch. + DatumAccumulator new_listener; + RETURN_NOT_OK(Execute(selected_batch, &new_listener)); + // Scatter result according to the original selection vector. + return EmitResult(new_listener.values()[0].array(), listener); + } + RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize())); if (batch.length == 0) { diff --git a/cpp/src/arrow/compute/expression.cc b/cpp/src/arrow/compute/expression.cc index 532869b3453..a9ff853fbde 100644 --- a/cpp/src/arrow/compute/expression.cc +++ b/cpp/src/arrow/compute/expression.cc @@ -72,11 +72,12 @@ Expression field_ref(FieldRef ref) { } Expression call(std::string function, std::vector arguments, - std::shared_ptr options) { + std::shared_ptr options, bool special_form) { Expression::Call call; call.function_name = std::move(function); call.arguments = std::move(arguments); call.options = std::move(options); + call.special_form = special_form; return Expression(std::move(call)); } @@ -759,42 +760,82 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i std::vector arguments(call->arguments.size()); - bool all_scalar = true; - for (size_t i = 0; i < arguments.size(); ++i) { - ARROW_ASSIGN_OR_RAISE( - arguments[i], ExecuteScalarExpression(call->arguments[i], input, exec_context)); - if (arguments[i].is_array()) { - all_scalar = false; + if (!call->special_form) { + bool all_scalar = true; + for (size_t i = 0; i < arguments.size(); ++i) { + ARROW_ASSIGN_OR_RAISE( + arguments[i], ExecuteScalarExpression(call->arguments[i], input, exec_context)); + if (arguments[i].is_array()) { + all_scalar = false; + } } - } - int64_t input_length; - if (!arguments.empty() && all_scalar) { - // all inputs are scalar, so use a 1-long batch to avoid - // computing input.length equivalent outputs - input_length = 1; + int64_t input_length; + if (!arguments.empty() && all_scalar) { + // all inputs are scalar, so use a 1-long batch to avoid + // computing input.length equivalent outputs + input_length = 1; + } else { + input_length = input.length; + } + + auto executor = compute::detail::KernelExecutor::MakeScalar(); + + compute::KernelContext kernel_context(exec_context, call->kernel); + kernel_context.SetState(call->kernel_state.get()); + + const Kernel* kernel = call->kernel; + std::vector types = GetTypes(arguments); + auto options = call->options.get(); + RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); + + compute::detail::DatumAccumulator listener; + RETURN_NOT_OK( + executor->Execute(ExecBatch(std::move(arguments), input_length), &listener)); + const auto out = executor->WrapResults(arguments, listener.values()); +#ifndef NDEBUG + DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str())); +#endif + return out; } else { - input_length = input.length; - } + ARROW_ASSIGN_OR_RAISE( + arguments[0], ExecuteScalarExpression(call->arguments[0], input, exec_context)); + // Obtain the selection vector from cond. + ExecBatch if_input = input; + ARROW_ASSIGN_OR_RAISE( + if_input.selection_vector, + SelectionVector::FromMask(*arguments[0].array_as())); + ARROW_ASSIGN_OR_RAISE(arguments[1], ExecuteScalarExpression(call->arguments[1], + if_input, exec_context)); + ExecBatch else_input = input; + // Else input must consider the original selection vector, instead of merely taking + // false rows from cond. + ARROW_ASSIGN_OR_RAISE( + else_input.selection_vector, + SelectionVector::FromMask(*arguments[0].array_as())); + ARROW_ASSIGN_OR_RAISE( + arguments[2], + ExecuteScalarExpression(call->arguments[2], else_input, exec_context)); - auto executor = compute::detail::KernelExecutor::MakeScalar(); + auto executor = compute::detail::KernelExecutor::MakeScalar(); - compute::KernelContext kernel_context(exec_context, call->kernel); - kernel_context.SetState(call->kernel_state.get()); + compute::KernelContext kernel_context(exec_context, call->kernel); + kernel_context.SetState(call->kernel_state.get()); - const Kernel* kernel = call->kernel; - std::vector types = GetTypes(arguments); - auto options = call->options.get(); - RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); + const Kernel* kernel = call->kernel; + std::vector types = GetTypes(arguments); + auto options = call->options.get(); + RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); - compute::detail::DatumAccumulator listener; - RETURN_NOT_OK( - executor->Execute(ExecBatch(std::move(arguments), input_length), &listener)); - const auto out = executor->WrapResults(arguments, listener.values()); + compute::detail::DatumAccumulator listener; + RETURN_NOT_OK( + executor->Execute(ExecBatch(std::move(arguments), input.length), &listener)); + const auto out = executor->WrapResults(arguments, listener.values()); #ifndef NDEBUG - DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str())); + DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str())); #endif - return out; + return out; + } } namespace { diff --git a/cpp/src/arrow/compute/expression.h b/cpp/src/arrow/compute/expression.h index 05a3f581c99..4061f572593 100644 --- a/cpp/src/arrow/compute/expression.h +++ b/cpp/src/arrow/compute/expression.h @@ -48,22 +48,7 @@ class ARROW_EXPORT Expression { std::string function_name; std::vector arguments; std::shared_ptr options; - // Cached hash value - size_t hash; - - // post-Bind properties: - std::shared_ptr function; - const Kernel* kernel = NULLPTR; - std::shared_ptr kernel_state; - TypeHolder type; - - void ComputeHash(); - }; - - struct SpecialCall { - std::string function_name; - std::vector arguments; - std::shared_ptr options; + bool special_form = false; // Cached hash value size_t hash; @@ -175,14 +160,15 @@ Expression field_ref(FieldRef ref); ARROW_EXPORT Expression call(std::string function, std::vector arguments, - std::shared_ptr options = NULLPTR); + std::shared_ptr options = NULLPTR, + bool special_form = false); template ::value>::type> -Expression call(std::string function, std::vector arguments, - Options options) { +Expression call(std::string function, std::vector arguments, Options options, + bool special_form = false) { return call(std::move(function), std::move(arguments), - std::make_shared(std::move(options))); + std::make_shared(std::move(options)), special_form); } /// Assemble a list of all fields referenced by an Expression at any depth. From 8047d0a40c09f4eaa837d93d971d0a9d97475474 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 28 May 2024 01:43:47 +0800 Subject: [PATCH 03/12] Use bit mask as selection vector --- cpp/src/arrow/compute/exec.cc | 10 +++------- cpp/src/arrow/compute/exec.h | 6 +----- cpp/src/arrow/compute/expression.cc | 18 ++++++++---------- 3 files changed, 12 insertions(+), 22 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 58c958985d8..80b968ae8a3 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -1355,19 +1355,15 @@ const CpuInfo* ExecContext::cpu_info() const { return CpuInfo::GetInstance(); } SelectionVector::SelectionVector(std::shared_ptr data) : data_(std::move(data)) { - DCHECK_EQ(Type::INT32, data_->type->id()); + DCHECK_EQ(Type::BOOL, data_->type->id()); DCHECK_EQ(0, data_->GetNullCount()); - indices_ = data_->GetValues(1); } SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {} -int32_t SelectionVector::length() const { return static_cast(data_->length); } +std::shared_ptr SelectionVector::data() const { return data_; } -Result> SelectionVector::FromMask( - const BooleanArray& arr) { - return Status::NotImplemented("FromMask"); -} +int32_t SelectionVector::length() const { return static_cast(data_->length); } Result CallFunction(const std::string& func_name, const std::vector& args, const FunctionOptions* options, ExecContext* ctx) { diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 61e7011421b..e54061bfcfa 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -140,15 +140,11 @@ class ARROW_EXPORT SelectionVector { explicit SelectionVector(const Array& arr); - /// \brief Create SelectionVector from boolean mask - static Result> FromMask(const BooleanArray& arr); - - const int32_t* indices() const { return indices_; } + std::shared_ptr data() const; int32_t length() const; private: std::shared_ptr data_; - const int32_t* indices_; }; /// An index to represent that a batch does not belong to an ordered stream diff --git a/cpp/src/arrow/compute/expression.cc b/cpp/src/arrow/compute/expression.cc index a9ff853fbde..510b4d16b5d 100644 --- a/cpp/src/arrow/compute/expression.cc +++ b/cpp/src/arrow/compute/expression.cc @@ -789,9 +789,10 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i auto options = call->options.get(); RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); + ExecBatch arguments_batch(std::move(arguments), input_length); + arguments_batch.selection_vector = input.selection_vector; compute::detail::DatumAccumulator listener; - RETURN_NOT_OK( - executor->Execute(ExecBatch(std::move(arguments), input_length), &listener)); + RETURN_NOT_OK(executor->Execute(std::move(arguments_batch), &listener)); const auto out = executor->WrapResults(arguments, listener.values()); #ifndef NDEBUG DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str())); @@ -802,17 +803,13 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i arguments[0], ExecuteScalarExpression(call->arguments[0], input, exec_context)); // Obtain the selection vector from cond. ExecBatch if_input = input; - ARROW_ASSIGN_OR_RAISE( - if_input.selection_vector, - SelectionVector::FromMask(*arguments[0].array_as())); + if_input.selection_vector = std::make_shared(arguments[0].array()); ARROW_ASSIGN_OR_RAISE(arguments[1], ExecuteScalarExpression(call->arguments[1], if_input, exec_context)); ExecBatch else_input = input; // Else input must consider the original selection vector, instead of merely taking // false rows from cond. - ARROW_ASSIGN_OR_RAISE( - else_input.selection_vector, - SelectionVector::FromMask(*arguments[0].array_as())); + else_input.selection_vector = std::make_shared(arguments[0].array()); ARROW_ASSIGN_OR_RAISE( arguments[2], ExecuteScalarExpression(call->arguments[2], else_input, exec_context)); @@ -827,9 +824,10 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i auto options = call->options.get(); RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); + ExecBatch arguments_batch(std::move(arguments), input.length); + arguments_batch.selection_vector = input.selection_vector; compute::detail::DatumAccumulator listener; - RETURN_NOT_OK( - executor->Execute(ExecBatch(std::move(arguments), input.length), &listener)); + RETURN_NOT_OK(executor->Execute(std::move(arguments_batch), &listener)); const auto out = executor->WrapResults(arguments, listener.values()); #ifndef NDEBUG DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str())); From d10d34a8acc76ce5a3763642c0143f87c1cc0c1a Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 28 May 2024 01:43:54 +0800 Subject: [PATCH 04/12] Add test --- cpp/src/arrow/compute/expression_test.cc | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/cpp/src/arrow/compute/expression_test.cc b/cpp/src/arrow/compute/expression_test.cc index 30bd882b2c0..af04e137014 100644 --- a/cpp/src/arrow/compute/expression_test.cc +++ b/cpp/src/arrow/compute/expression_test.cc @@ -32,6 +32,7 @@ #include "arrow/compute/registry.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" +#include "arrow/util/vector.h" using testing::Eq; using testing::HasSubstr; @@ -1775,5 +1776,31 @@ TEST(Projection, AugmentWithKnownValues) { })); } +TEST(Expression, IfElseSpecialForm) { + const std::shared_ptr input_schema = + schema({field("a", int64()), field("b", int64()), field("c", int64()), + field("d", int64())}); + auto input = RecordBatchFromJSON(input_schema, R"([ + {"a": 0, "b": -1, "c": 0, "d": -1}, + {"a": 1, "b": -1, "c": 0, "d": -1}, + {"a": 0, "b": -1, "c": 1, "d": -1}, + {"a": 1, "b": -1, "c": 1, "d": -1} + ])"); + auto a = field_ref("a"); + auto b = field_ref("b"); + auto c = field_ref("c"); + auto d = field_ref("d"); + auto zero = literal(0ll); + auto one = literal(1ll); + auto a_not_zero = not_equal(a, zero); + auto one_div_a = call("divide", {one, a}); + auto if_else_inner = call("if_else", {a_not_zero, one_div_a, zero}, nullptr, true); + auto if_else_inner_not_zero = not_equal(if_else_inner, zero); + auto if_else_outer = + call("if_else", {if_else_inner_not_zero, one_div_a, zero}, nullptr, true); + ASSERT_OK_AND_ASSIGN(auto expr, if_else_outer.Bind(*input_schema)); + ASSERT_OK_AND_ASSIGN(auto result, ExecuteScalarExpression(expr, *input_schema, input)); +} + } // namespace compute } // namespace arrow From 49fb971c532be1a3b23e0da80a5064671fe7159f Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 28 May 2024 20:24:33 +0800 Subject: [PATCH 05/12] Add basic selection vector operations --- cpp/src/arrow/compute/exec.cc | 33 ++++++++++++++++++++++++++++- cpp/src/arrow/compute/exec.h | 11 +++++++++- cpp/src/arrow/compute/expression.cc | 17 ++++++++++----- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 80b968ae8a3..a5061988e32 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -783,7 +783,13 @@ class ScalarExecutor : public KernelExecutorImpl { Status Execute(const ExecBatch& batch, ExecListener* listener) override { if (batch.selection_vector && !kernel_->selection_vector_aware) { // Slow path for selection vector. - ExecBatch selected_batch; + auto values = batch.values; + for (auto& value : values) { + if (value.is_scalar()) continue; + ARROW_ASSIGN_OR_RAISE(value, Filter(value, batch.selection_vector->data(), + FilterOptions::Defaults())); + } + ARROW_ASSIGN_OR_RAISE(ExecBatch selected_batch, ExecBatch::Make(std::move(values))); // Gather selected rows into new batch. DatumAccumulator new_listener; RETURN_NOT_OK(Execute(selected_batch, &new_listener)); @@ -1361,6 +1367,31 @@ SelectionVector::SelectionVector(std::shared_ptr data) SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {} +Result> SelectionVector::Copy( + const std::shared_ptr& mm) const { + ARROW_ASSIGN_OR_RAISE(auto copy, data_->CopyTo(mm)); + return std::make_unique(std::move(copy)); +} + +Status SelectionVector::Intersect(const ArrayData& other) { + DCHECK_EQ(Type::BOOL, other.type->id()); + DCHECK_EQ(data_->length, other.length); + ::arrow::internal::BitmapAnd(data_->buffers[1]->data(), data_->offset, + other.buffers[1]->data(), other.offset, data_->length, + data_->offset, data_->buffers[1]->mutable_data()); + return Status::OK(); +} + +Status SelectionVector::Intersect(const SelectionVector& other) { + return Intersect(*other.data()); +} + +Status SelectionVector::Invert() { + ::arrow::internal::InvertBitmap(data_->buffers[1]->data(), data_->offset, data_->length, + data_->buffers[1]->mutable_data(), data_->offset); + return Status::OK(); +} + std::shared_ptr SelectionVector::data() const { return data_; } int32_t SelectionVector::length() const { return static_cast(data_->length); } diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index e54061bfcfa..70d3d0a8ee2 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -140,6 +140,15 @@ class ARROW_EXPORT SelectionVector { explicit SelectionVector(const Array& arr); + Result> Copy( + const std::shared_ptr& mm) const; + + Status Intersect(const ArrayData& other); + + Status Intersect(const SelectionVector& other); + + Status Invert(); + std::shared_ptr data() const; int32_t length() const; @@ -418,7 +427,7 @@ struct ARROW_EXPORT ExecSpan { int64_t length = 0; std::vector values; - SelectionVector *selection_vector = NULLPTR; + SelectionVector* selection_vector = NULLPTR; }; /// \defgroup compute-call-function One-shot calls to compute functions diff --git a/cpp/src/arrow/compute/expression.cc b/cpp/src/arrow/compute/expression.cc index 510b4d16b5d..870bd374508 100644 --- a/cpp/src/arrow/compute/expression.cc +++ b/cpp/src/arrow/compute/expression.cc @@ -799,17 +799,24 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i #endif return out; } else { + auto overall_sel = input.selection_vector; ARROW_ASSIGN_OR_RAISE( arguments[0], ExecuteScalarExpression(call->arguments[0], input, exec_context)); // Obtain the selection vector from cond. + auto if_sel = std::make_shared(arguments[0].array()); + ARROW_ASSIGN_OR_RAISE(auto else_sel, if_sel->Copy(CPUDevice::memory_manager(exec_context->memory_pool()))); + RETURN_NOT_OK(else_sel->Invert()); + if (overall_sel) { + RETURN_NOT_OK(if_sel->Intersect(*overall_sel)); + RETURN_NOT_OK(else_sel->Intersect(*overall_sel)); + } + ExecBatch if_input = input; - if_input.selection_vector = std::make_shared(arguments[0].array()); + if_input.selection_vector = std::move(if_sel); ARROW_ASSIGN_OR_RAISE(arguments[1], ExecuteScalarExpression(call->arguments[1], if_input, exec_context)); ExecBatch else_input = input; - // Else input must consider the original selection vector, instead of merely taking - // false rows from cond. - else_input.selection_vector = std::make_shared(arguments[0].array()); + else_input.selection_vector = std::move(else_sel); ARROW_ASSIGN_OR_RAISE( arguments[2], ExecuteScalarExpression(call->arguments[2], else_input, exec_context)); @@ -825,7 +832,7 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); ExecBatch arguments_batch(std::move(arguments), input.length); - arguments_batch.selection_vector = input.selection_vector; + arguments_batch.selection_vector = std::move(overall_sel); compute::detail::DatumAccumulator listener; RETURN_NOT_OK(executor->Execute(std::move(arguments_batch), &listener)); const auto out = executor->WrapResults(arguments, listener.values()); From cf33b98c66b0f89f70f4bb73f274a89d116dc018 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 3 Jun 2024 13:04:08 +0800 Subject: [PATCH 06/12] Support expression level selection vector aware inquery --- cpp/src/arrow/compute/expression.cc | 19 ++++++++++++++++++- cpp/src/arrow/compute/expression.h | 3 +++ cpp/src/arrow/compute/kernel.h | 3 ++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/expression.cc b/cpp/src/arrow/compute/expression.cc index 870bd374508..2522669bafb 100644 --- a/cpp/src/arrow/compute/expression.cc +++ b/cpp/src/arrow/compute/expression.cc @@ -120,6 +120,16 @@ const DataType* Expression::type() const { return CallNotNull(*this)->type.type; } +const bool Expression::selection_vector_aware() const { + DCHECK(IsBound()); + + if (literal() || field_ref()) { + return true; + } + + return CallNotNull(*this)->selection_vector_aware; +} + namespace { std::string PrintDatum(const Datum& datum) { @@ -573,7 +583,12 @@ Result BindNonRecursive(Expression::Call call, bool insert_implicit_ types = GetTypesWithSmallestLiteralRepresentation(call.arguments); ARROW_ASSIGN_OR_RAISE(call.kernel, call.function->DispatchBest(&types)); + call.selection_vector_aware = call.kernel->selection_vector_aware; + for (size_t i = 0; i < types.size(); ++i) { + call.selection_vector_aware = + call.selection_vector_aware && call.arguments[i].selection_vector_aware(); + if (types[i] == call.arguments[i].type()) continue; if (const Datum* lit = call.arguments[i].literal()) { @@ -804,7 +819,9 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i arguments[0], ExecuteScalarExpression(call->arguments[0], input, exec_context)); // Obtain the selection vector from cond. auto if_sel = std::make_shared(arguments[0].array()); - ARROW_ASSIGN_OR_RAISE(auto else_sel, if_sel->Copy(CPUDevice::memory_manager(exec_context->memory_pool()))); + ARROW_ASSIGN_OR_RAISE( + auto else_sel, + if_sel->Copy(CPUDevice::memory_manager(exec_context->memory_pool()))); RETURN_NOT_OK(else_sel->Invert()); if (overall_sel) { RETURN_NOT_OK(if_sel->Intersect(*overall_sel)); diff --git a/cpp/src/arrow/compute/expression.h b/cpp/src/arrow/compute/expression.h index 4061f572593..b876470a1b7 100644 --- a/cpp/src/arrow/compute/expression.h +++ b/cpp/src/arrow/compute/expression.h @@ -57,6 +57,7 @@ class ARROW_EXPORT Expression { const Kernel* kernel = NULLPTR; std::shared_ptr kernel_state; TypeHolder type; + bool selection_vector_aware = false; void ComputeHash(); }; @@ -119,6 +120,8 @@ class ARROW_EXPORT Expression { // XXX someday // NullGeneralization::type nullable() const; + const bool selection_vector_aware() const; + struct Parameter { FieldRef ref; diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 6485b99a31a..97bc6bb09d9 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -524,6 +524,8 @@ struct ARROW_EXPORT Kernel { // Additional kernel-specific data std::shared_ptr data; + + bool selection_vector_aware = false; }; /// \brief The scalar kernel execution API that must be implemented for SCALAR @@ -566,7 +568,6 @@ struct ARROW_EXPORT ScalarKernel : public Kernel { // bitmaps is a reasonable default NullHandling::type null_handling = NullHandling::INTERSECTION; MemAllocation::type mem_allocation = MemAllocation::PREALLOCATE; - bool selection_vector_aware = false; }; // ---------------------------------------------------------------------- From 6c6ed3b014bad73983a0b6c9b7a7194f641ba92b Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 3 Jun 2024 20:15:57 +0800 Subject: [PATCH 07/12] Framework mostly done --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/compute/exec.cc | 16 --- cpp/src/arrow/compute/expression.cc | 126 ++++++------------ cpp/src/arrow/compute/expression.h | 11 +- cpp/src/arrow/compute/expression_internal.h | 36 +++++ cpp/src/arrow/compute/special_form.cc | 38 ++++++ cpp/src/arrow/compute/special_form.h | 42 ++++++ .../special_forms/if_else_special_form.cc | 59 ++++++++ .../special_forms/if_else_special_form.h | 35 +++++ cpp/src/arrow/compute/type_fwd.h | 1 + 10 files changed, 261 insertions(+), 105 deletions(-) create mode 100644 cpp/src/arrow/compute/special_form.cc create mode 100644 cpp/src/arrow/compute/special_form.h create mode 100644 cpp/src/arrow/compute/special_forms/if_else_special_form.cc create mode 100644 cpp/src/arrow/compute/special_forms/if_else_special_form.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 57a0b383a67..712db14ff1f 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -717,6 +717,8 @@ set(ARROW_COMPUTE_SRCS compute/row/compare_internal.cc compute/row/grouper.cc compute/row/row_internal.cc + compute/special_form.cc + compute/special_forms/if_else_special_form.cc compute/util.cc compute/util_internal.cc) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index a5061988e32..65b5dd14305 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -781,22 +781,6 @@ class KernelExecutorImpl : public KernelExecutor { class ScalarExecutor : public KernelExecutorImpl { public: Status Execute(const ExecBatch& batch, ExecListener* listener) override { - if (batch.selection_vector && !kernel_->selection_vector_aware) { - // Slow path for selection vector. - auto values = batch.values; - for (auto& value : values) { - if (value.is_scalar()) continue; - ARROW_ASSIGN_OR_RAISE(value, Filter(value, batch.selection_vector->data(), - FilterOptions::Defaults())); - } - ARROW_ASSIGN_OR_RAISE(ExecBatch selected_batch, ExecBatch::Make(std::move(values))); - // Gather selected rows into new batch. - DatumAccumulator new_listener; - RETURN_NOT_OK(Execute(selected_batch, &new_listener)); - // Scatter result according to the original selection vector. - return EmitResult(new_listener.values()[0].array(), listener); - } - RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize())); if (batch.length == 0) { diff --git a/cpp/src/arrow/compute/expression.cc b/cpp/src/arrow/compute/expression.cc index 2522669bafb..5a85fc33ccb 100644 --- a/cpp/src/arrow/compute/expression.cc +++ b/cpp/src/arrow/compute/expression.cc @@ -24,9 +24,9 @@ #include "arrow/chunked_array.h" #include "arrow/compute/api_vector.h" -#include "arrow/compute/exec_internal.h" #include "arrow/compute/expression_internal.h" #include "arrow/compute/function_internal.h" +#include "arrow/compute/special_form.h" #include "arrow/compute/util.h" #include "arrow/io/memory.h" #include "arrow/ipc/reader.h" @@ -72,12 +72,12 @@ Expression field_ref(FieldRef ref) { } Expression call(std::string function, std::vector arguments, - std::shared_ptr options, bool special_form) { + std::shared_ptr options, bool is_special_form) { Expression::Call call; call.function_name = std::move(function); call.arguments = std::move(arguments); call.options = std::move(options); - call.special_form = special_form; + call.is_special_form = is_special_form; return Expression(std::move(call)); } @@ -120,7 +120,7 @@ const DataType* Expression::type() const { return CallNotNull(*this)->type.type; } -const bool Expression::selection_vector_aware() const { +bool Expression::selection_vector_aware() const { DCHECK(IsBound()); if (literal() || field_ref()) { @@ -561,6 +561,11 @@ Result BindNonRecursive(Expression::Call call, bool insert_implicit_ ARROW_ASSIGN_OR_RAISE( call.type, call.kernel->signature->out_type().Resolve(&kernel_context, types)); + + if (call.is_special_form) { + ARROW_ASSIGN_OR_RAISE(call.special_form, SpecialForm::Make(call.function_name)); + } + return Status::OK(); }; @@ -732,6 +737,29 @@ Result ExecuteScalarExpression(const Expression& expr, const Schema& full return ExecuteScalarExpression(expr, input, exec_context); } +namespace { + +// Execute a selection-vector-equipped scalar expression that is not fully +// selection-vector-aware using slow path: gather/scatter. +Result ExecuteScalarExpressionWithSelSlowPath(const Expression& expr, + const ExecBatch& input, + compute::ExecContext* exec_context) { + DCHECK(!expr.selection_vector_aware() && input.selection_vector); + auto values = input.values; + for (auto& value : values) { + if (value.is_scalar()) continue; + ARROW_ASSIGN_OR_RAISE( + value, Filter(value, input.selection_vector->data(), FilterOptions::Defaults())); + } + ARROW_ASSIGN_OR_RAISE(ExecBatch selected_batch, ExecBatch::Make(std::move(values))); + ARROW_ASSIGN_OR_RAISE(auto partial_result, + ExecuteScalarExpression(expr, selected_batch, exec_context)); + // TODO: Scatter. + return partial_result; +} + +} // namespace + Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& input, compute::ExecContext* exec_context) { if (exec_context == nullptr) { @@ -748,6 +776,10 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i "ExecuteScalarExpression cannot Execute non-scalar expression ", expr.ToString()); } + if (!expr.selection_vector_aware() && input.selection_vector) { + return ExecuteScalarExpressionWithSelSlowPath(expr, input, exec_context); + } + if (auto lit = expr.literal()) return *lit; if (auto param = expr.parameter()) { @@ -773,90 +805,16 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i auto call = CallNotNull(expr); - std::vector arguments(call->arguments.size()); - - if (!call->special_form) { - bool all_scalar = true; + if (call->is_special_form) { + DCHECK(call->special_form); + return call->special_form->Execute(*call, input, exec_context); + } else { + std::vector arguments(call->arguments.size()); for (size_t i = 0; i < arguments.size(); ++i) { ARROW_ASSIGN_OR_RAISE( arguments[i], ExecuteScalarExpression(call->arguments[i], input, exec_context)); - if (arguments[i].is_array()) { - all_scalar = false; - } } - - int64_t input_length; - if (!arguments.empty() && all_scalar) { - // all inputs are scalar, so use a 1-long batch to avoid - // computing input.length equivalent outputs - input_length = 1; - } else { - input_length = input.length; - } - - auto executor = compute::detail::KernelExecutor::MakeScalar(); - - compute::KernelContext kernel_context(exec_context, call->kernel); - kernel_context.SetState(call->kernel_state.get()); - - const Kernel* kernel = call->kernel; - std::vector types = GetTypes(arguments); - auto options = call->options.get(); - RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); - - ExecBatch arguments_batch(std::move(arguments), input_length); - arguments_batch.selection_vector = input.selection_vector; - compute::detail::DatumAccumulator listener; - RETURN_NOT_OK(executor->Execute(std::move(arguments_batch), &listener)); - const auto out = executor->WrapResults(arguments, listener.values()); -#ifndef NDEBUG - DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str())); -#endif - return out; - } else { - auto overall_sel = input.selection_vector; - ARROW_ASSIGN_OR_RAISE( - arguments[0], ExecuteScalarExpression(call->arguments[0], input, exec_context)); - // Obtain the selection vector from cond. - auto if_sel = std::make_shared(arguments[0].array()); - ARROW_ASSIGN_OR_RAISE( - auto else_sel, - if_sel->Copy(CPUDevice::memory_manager(exec_context->memory_pool()))); - RETURN_NOT_OK(else_sel->Invert()); - if (overall_sel) { - RETURN_NOT_OK(if_sel->Intersect(*overall_sel)); - RETURN_NOT_OK(else_sel->Intersect(*overall_sel)); - } - - ExecBatch if_input = input; - if_input.selection_vector = std::move(if_sel); - ARROW_ASSIGN_OR_RAISE(arguments[1], ExecuteScalarExpression(call->arguments[1], - if_input, exec_context)); - ExecBatch else_input = input; - else_input.selection_vector = std::move(else_sel); - ARROW_ASSIGN_OR_RAISE( - arguments[2], - ExecuteScalarExpression(call->arguments[2], else_input, exec_context)); - - auto executor = compute::detail::KernelExecutor::MakeScalar(); - - compute::KernelContext kernel_context(exec_context, call->kernel); - kernel_context.SetState(call->kernel_state.get()); - - const Kernel* kernel = call->kernel; - std::vector types = GetTypes(arguments); - auto options = call->options.get(); - RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); - - ExecBatch arguments_batch(std::move(arguments), input.length); - arguments_batch.selection_vector = std::move(overall_sel); - compute::detail::DatumAccumulator listener; - RETURN_NOT_OK(executor->Execute(std::move(arguments_batch), &listener)); - const auto out = executor->WrapResults(arguments, listener.values()); -#ifndef NDEBUG - DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str())); -#endif - return out; + return ExecuteCallNonRecursive(*call, input, arguments, exec_context); } } diff --git a/cpp/src/arrow/compute/expression.h b/cpp/src/arrow/compute/expression.h index b876470a1b7..7c26d25a1d9 100644 --- a/cpp/src/arrow/compute/expression.h +++ b/cpp/src/arrow/compute/expression.h @@ -48,7 +48,7 @@ class ARROW_EXPORT Expression { std::string function_name; std::vector arguments; std::shared_ptr options; - bool special_form = false; + bool is_special_form = false; // Cached hash value size_t hash; @@ -58,6 +58,7 @@ class ARROW_EXPORT Expression { std::shared_ptr kernel_state; TypeHolder type; bool selection_vector_aware = false; + std::shared_ptr special_form{}; void ComputeHash(); }; @@ -120,7 +121,7 @@ class ARROW_EXPORT Expression { // XXX someday // NullGeneralization::type nullable() const; - const bool selection_vector_aware() const; + bool selection_vector_aware() const; struct Parameter { FieldRef ref; @@ -164,14 +165,14 @@ Expression field_ref(FieldRef ref); ARROW_EXPORT Expression call(std::string function, std::vector arguments, std::shared_ptr options = NULLPTR, - bool special_form = false); + bool is_special_form = false); template ::value>::type> Expression call(std::string function, std::vector arguments, Options options, - bool special_form = false) { + bool is_special_form = false) { return call(std::move(function), std::move(arguments), - std::make_shared(std::move(options)), special_form); + std::make_shared(std::move(options)), is_special_form); } /// Assemble a list of all fields referenced by an Expression at any depth. diff --git a/cpp/src/arrow/compute/expression_internal.h b/cpp/src/arrow/compute/expression_internal.h index 2048ef96651..5c7b5d4fa1a 100644 --- a/cpp/src/arrow/compute/expression_internal.h +++ b/cpp/src/arrow/compute/expression_internal.h @@ -24,6 +24,7 @@ #include "arrow/compute/api_scalar.h" #include "arrow/compute/cast.h" #include "arrow/compute/cast_internal.h" +#include "arrow/compute/exec_internal.h" #include "arrow/compute/registry.h" #include "arrow/record_batch.h" #include "arrow/table.h" @@ -290,5 +291,40 @@ inline Result> GetFunction( return GetCastFunction(*to_type); } +inline Result ExecuteCallNonRecursive(const Expression::Call& call, + const ExecBatch& input, + const std::vector& arguments, + ExecContext* exec_context) { + auto executor = compute::detail::KernelExecutor::MakeScalar(); + + compute::KernelContext kernel_context(exec_context, call.kernel); + kernel_context.SetState(call.kernel_state.get()); + + const Kernel* kernel = call.kernel; + std::vector types = GetTypes(arguments); + auto options = call.options.get(); + RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); + + bool all_scalar = std::all_of(arguments.begin(), arguments.end(), + [](const Datum& value) { return value.is_scalar(); }); + int64_t input_length; + if (!arguments.empty() && all_scalar) { + // all inputs are scalar, so use a 1-long batch to avoid + // computing input.length equivalent outputs + input_length = 1; + } else { + input_length = input.length; + } + ExecBatch arguments_batch(std::move(arguments), input_length); + arguments_batch.selection_vector = input.selection_vector; + compute::detail::DatumAccumulator listener; + RETURN_NOT_OK(executor->Execute(std::move(arguments_batch), &listener)); + const auto out = executor->WrapResults(arguments, listener.values()); +#ifndef NDEBUG + DCHECK_OK(executor->CheckResultType(out, call.function_name.c_str())); +#endif + return out; +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/special_form.cc b/cpp/src/arrow/compute/special_form.cc new file mode 100644 index 00000000000..488b6c76874 --- /dev/null +++ b/cpp/src/arrow/compute/special_form.cc @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// NOTE: API is EXPERIMENTAL and will change without going through a +// deprecation cycle. + +#include "arrow/compute/special_form.h" +#include "arrow/compute/api_vector.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/special_forms/if_else_special_form.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace compute { + +Result> SpecialForm::Make(const std::string& name) { + if (name == "if_else") { + return std::make_unique(); + } + return Status::Invalid("Unknown special form: ", name); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/special_form.h b/cpp/src/arrow/compute/special_form.h new file mode 100644 index 00000000000..c5028512879 --- /dev/null +++ b/cpp/src/arrow/compute/special_form.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// NOTE: API is EXPERIMENTAL and will change without going through a +// deprecation cycle. + +#pragma once + +#include "arrow/compute/expression.h" +#include "arrow/util/visibility.h" + +#include + +namespace arrow { +namespace compute { + +class ARROW_EXPORT SpecialForm { + public: + static Result> Make(const std::string& name); + + virtual ~SpecialForm() = default; + + virtual Result Execute(const Expression::Call& call, const ExecBatch& input, + ExecContext* exec_context) = 0; +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/special_forms/if_else_special_form.cc b/cpp/src/arrow/compute/special_forms/if_else_special_form.cc new file mode 100644 index 00000000000..4dfd7bbca98 --- /dev/null +++ b/cpp/src/arrow/compute/special_forms/if_else_special_form.cc @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// NOTE: API is EXPERIMENTAL and will change without going through a +// deprecation cycle. + +#include "arrow/compute/special_forms/if_else_special_form.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/expression.h" +#include "arrow/compute/expression_internal.h" + +namespace arrow { +namespace compute { + +Result IfElseSpecialForm::Execute(const Expression::Call& call, + const ExecBatch& input, + ExecContext* exec_context) { + DCHECK(!call.kernel->selection_vector_aware); + DCHECK(!input.selection_vector); + + std::vector arguments(call.arguments.size()); + ARROW_ASSIGN_OR_RAISE(arguments[0], + ExecuteScalarExpression(call.arguments[0], input, exec_context)); + // Use cond as selection vector for IF. + auto if_sel = std::make_shared(arguments[0].array()); + // Duplicate and invert cond as selection vector for ELSE. + ARROW_ASSIGN_OR_RAISE( + auto else_sel, + if_sel->Copy(CPUDevice::memory_manager(exec_context->memory_pool()))); + RETURN_NOT_OK(else_sel->Invert()); + + ExecBatch if_input = input; + if_input.selection_vector = std::move(if_sel); + ARROW_ASSIGN_OR_RAISE( + arguments[1], ExecuteScalarExpression(call.arguments[1], if_input, exec_context)); + ExecBatch else_input = input; + else_input.selection_vector = std::move(else_sel); + ARROW_ASSIGN_OR_RAISE( + arguments[2], ExecuteScalarExpression(call.arguments[2], else_input, exec_context)); + + return ExecuteCallNonRecursive(call, input, arguments, exec_context); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/special_forms/if_else_special_form.h b/cpp/src/arrow/compute/special_forms/if_else_special_form.h new file mode 100644 index 00000000000..f03042356b3 --- /dev/null +++ b/cpp/src/arrow/compute/special_forms/if_else_special_form.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// NOTE: API is EXPERIMENTAL and will change without going through a +// deprecation cycle. + +#pragma once + +#include "arrow/compute/special_form.h" + +namespace arrow { +namespace compute { + +class ARROW_EXPORT IfElseSpecialForm : public SpecialForm { + public: + Result Execute(const Expression::Call& call, const ExecBatch& input, + ExecContext* exec_context) override; +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/type_fwd.h b/cpp/src/arrow/compute/type_fwd.h index 89f32ceb0f9..6c9b4cc8a6f 100644 --- a/cpp/src/arrow/compute/type_fwd.h +++ b/cpp/src/arrow/compute/type_fwd.h @@ -50,6 +50,7 @@ struct VectorKernel; struct KernelState; class Expression; +class SpecialForm; ARROW_EXPORT ExecContext* default_exec_context(); ARROW_EXPORT ExecContext* threaded_exec_context(); From 0112889c5b6adf94d8413b9fa19e6718f54fc18e Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 11 Jun 2024 19:32:38 +0800 Subject: [PATCH 08/12] Revert WIP sel-aware kernel change --- .../arrow/compute/kernels/codegen_internal.h | 74 ------------------- cpp/src/arrow/util/bit_block_counter.h | 73 ------------------ 2 files changed, 147 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index a75cc52b4aa..9e46a21887f 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -418,27 +418,6 @@ static void VisitTwoArrayValuesInline(const ArraySpan& arr0, const ArraySpan& ar std::move(visit_null)); } -template -static void VisitTwoArrayValuesWithSelInline(const ArraySpan& arr0, const ArraySpan& arr1, - VisitFunc&& valid_func, NullFunc&& null_func, - SelectionVector* sel) { - ArrayIterator arr0_it(arr0); - ArrayIterator arr1_it(arr1); - - auto visit_valid = [&](int64_t i) { - valid_func(GetViewType::LogicalValue(arr0_it()), - GetViewType::LogicalValue(arr1_it())); - }; - auto visit_null = [&]() { - arr0_it(); - arr1_it(); - null_func(); - }; - VisitTwoBitBlocksVoid(arr0.buffers[0].data, arr0.offset, arr1.buffers[0].data, - arr1.offset, arr0.length, std::move(visit_valid), - std::move(visit_null)); -} - // ---------------------------------------------------------------------- // Reusable type resolvers @@ -802,19 +781,6 @@ struct ScalarBinaryNotNullStateful { return st; } - Status ArrayArrayWithSel(KernelContext* ctx, const ArraySpan& arg0, - const ArraySpan& arg1, SelectionVector* sel, ExecResult* out) { - Status st = Status::OK(); - OutputArrayWriter writer(out->array_span_mutable()); - VisitTwoArrayValuesInline( - arg0, arg1, - [&](Arg0Value u, Arg1Value v) { - writer.Write(op.template Call(ctx, u, v, &st)); - }, - [&]() { writer.WriteNull(); }); - return st; - } - Status ArrayScalar(KernelContext* ctx, const ArraySpan& arg0, const Scalar& arg1, ExecResult* out) { Status st = Status::OK(); @@ -835,26 +801,6 @@ struct ScalarBinaryNotNullStateful { return st; } - Status ArrayScalarWithSel(KernelContext* ctx, const ArraySpan& arg0, const Scalar& arg1, - SelectionVector* sel, ExecResult* out) { - Status st = Status::OK(); - ArraySpan* out_span = out->array_span_mutable(); - OutputArrayWriter writer(out_span); - if (arg1.is_valid) { - const auto arg1_val = UnboxScalar::Unbox(arg1); - VisitArrayValuesInline( - arg0, - [&](Arg0Value u) { - writer.Write( - op.template Call(ctx, u, arg1_val, &st)); - }, - [&]() { writer.WriteNull(); }); - } else { - writer.WriteAllNull(out_span->length); - } - return st; - } - Status ScalarArray(KernelContext* ctx, const Scalar& arg0, const ArraySpan& arg1, ExecResult* out) { Status st = Status::OK(); @@ -875,26 +821,6 @@ struct ScalarBinaryNotNullStateful { return st; } - Status ScalarArrayWithSel(KernelContext* ctx, const Scalar& arg0, const ArraySpan& arg1, - SelectionVector* sel, ExecResult* out) { - Status st = Status::OK(); - ArraySpan* out_span = out->array_span_mutable(); - OutputArrayWriter writer(out_span); - if (arg0.is_valid) { - const auto arg0_val = UnboxScalar::Unbox(arg0); - VisitArrayValuesInline( - arg1, - [&](Arg1Value v) { - writer.Write( - op.template Call(ctx, arg0_val, v, &st)); - }, - [&]() { writer.WriteNull(); }); - } else { - writer.WriteAllNull(out_span->length); - } - return st; - } - Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { if (batch[0].is_array()) { if (batch[1].is_array()) { diff --git a/cpp/src/arrow/util/bit_block_counter.h b/cpp/src/arrow/util/bit_block_counter.h index a31161f3935..73a1ee8600f 100644 --- a/cpp/src/arrow/util/bit_block_counter.h +++ b/cpp/src/arrow/util/bit_block_counter.h @@ -479,35 +479,6 @@ static void VisitBitBlocksVoid(const uint8_t* bitmap, int64_t offset, int64_t le } } -template -static void VisitBitBlocksWithSelVoid(const uint8_t* bitmap, int64_t offset, - int64_t length, const int32_t* sel, - int64_t sel_length, VisitNotNull&& visit_not_null, - VisitNull&& visit_null) { - internal::OptionalBitBlockCounter bit_counter(bitmap, offset, length); - int64_t position = 0; - while (position < length) { - internal::BitBlockCount block = bit_counter.NextBlock(); - if (block.AllSet()) { - for (int64_t i = 0; i < block.length; ++i, ++position) { - visit_not_null(position); - } - } else if (block.NoneSet()) { - for (int64_t i = 0; i < block.length; ++i, ++position) { - visit_null(); - } - } else { - for (int64_t i = 0; i < block.length; ++i, ++position) { - if (bit_util::GetBit(bitmap, offset + position)) { - visit_not_null(position); - } else { - visit_null(); - } - } - } - } -} - template static Status VisitTwoBitBlocks(const uint8_t* left_bitmap, int64_t left_offset, const uint8_t* right_bitmap, int64_t right_offset, @@ -595,49 +566,5 @@ static void VisitTwoBitBlocksVoid(const uint8_t* left_bitmap, int64_t left_offse } } -template -static void VisitTwoBitBlocksWithSelVoid(const uint8_t* left_bitmap, int64_t left_offset, - const uint8_t* right_bitmap, - int64_t right_offset, const int32_t* sel, - int64_t length, VisitNotNull&& visit_not_null, - VisitNull&& visit_null) { - if (left_bitmap == NULLPTR || right_bitmap == NULLPTR) { - // At most one bitmap is present - if (left_bitmap == NULLPTR) { - return VisitBitBlocksWithSelVoid(right_bitmap, right_offset, length, - std::forward(visit_not_null), - std::forward(visit_null)); - } else { - return VisitBitBlocksWithSelVoid(left_bitmap, left_offset, length, - std::forward(visit_not_null), - std::forward(visit_null)); - } - } - BinaryBitBlockCounter bit_counter(left_bitmap, left_offset, right_bitmap, right_offset, - length); - int64_t position = 0; - while (position < length) { - BitBlockCount block = bit_counter.NextAndWord(); - if (block.AllSet()) { - for (int64_t i = 0; i < block.length; ++i, ++position) { - visit_not_null(position); - } - } else if (block.NoneSet()) { - for (int64_t i = 0; i < block.length; ++i, ++position) { - visit_null(); - } - } else { - for (int64_t i = 0; i < block.length; ++i, ++position) { - if (bit_util::GetBit(left_bitmap, left_offset + position) && - bit_util::GetBit(right_bitmap, right_offset + position)) { - visit_not_null(position); - } else { - visit_null(); - } - } - } - } -} - } // namespace internal } // namespace arrow From 42d8c875b023bc43ae236254bd6839794a051e9a Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 11 Jun 2024 20:19:10 +0800 Subject: [PATCH 09/12] Doc --- cpp/src/arrow/compute/exec.h | 1 + cpp/src/arrow/compute/expression.cc | 11 +++++++---- cpp/src/arrow/compute/expression.h | 9 ++++++++- cpp/src/arrow/compute/kernel.h | 4 ++++ .../compute/special_forms/if_else_special_form.cc | 1 + 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 70d3d0a8ee2..be5ec7b4df7 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -427,6 +427,7 @@ struct ARROW_EXPORT ExecSpan { int64_t length = 0; std::vector values; + // A non-owning selection vector to be used by selection-vector-aware kernels. SelectionVector* selection_vector = NULLPTR; }; diff --git a/cpp/src/arrow/compute/expression.cc b/cpp/src/arrow/compute/expression.cc index 5a85fc33ccb..0ade611a7ab 100644 --- a/cpp/src/arrow/compute/expression.cc +++ b/cpp/src/arrow/compute/expression.cc @@ -591,8 +591,7 @@ Result BindNonRecursive(Expression::Call call, bool insert_implicit_ call.selection_vector_aware = call.kernel->selection_vector_aware; for (size_t i = 0; i < types.size(); ++i) { - call.selection_vector_aware = - call.selection_vector_aware && call.arguments[i].selection_vector_aware(); + call.selection_vector_aware &= call.arguments[i].selection_vector_aware(); if (types[i] == call.arguments[i].type()) continue; @@ -739,8 +738,9 @@ Result ExecuteScalarExpression(const Expression& expr, const Schema& full namespace { -// Execute a selection-vector-equipped scalar expression that is not fully -// selection-vector-aware using slow path: gather/scatter. +// Execute a scalar expression that is not fully selection-vector-aware on a batch +// carrying a valid selection vector using the slow path: gathering the input and +// scattering the output. Result ExecuteScalarExpressionWithSelSlowPath(const Expression& expr, const ExecBatch& input, compute::ExecContext* exec_context) { @@ -806,9 +806,12 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i auto call = CallNotNull(expr); if (call->is_special_form) { + // Let the special form take over the execution using its own logic of argument + // evaluation. DCHECK(call->special_form); return call->special_form->Execute(*call, input, exec_context); } else { + // Eagerly evaluate all the arguments. std::vector arguments(call->arguments.size()); for (size_t i = 0; i < arguments.size(); ++i) { ARROW_ASSIGN_OR_RAISE( diff --git a/cpp/src/arrow/compute/expression.h b/cpp/src/arrow/compute/expression.h index 7c26d25a1d9..d47e4a149c5 100644 --- a/cpp/src/arrow/compute/expression.h +++ b/cpp/src/arrow/compute/expression.h @@ -48,6 +48,8 @@ class ARROW_EXPORT Expression { std::string function_name; std::vector arguments; std::shared_ptr options; + // Whether this call is a special form (e.g. if-else). If true, the `special_form` + // field will be resolved in binding. bool is_special_form = false; // Cached hash value size_t hash; @@ -57,8 +59,9 @@ class ARROW_EXPORT Expression { const Kernel* kernel = NULLPTR; std::shared_ptr kernel_state; TypeHolder type; + // Whether the entire call (including all its arguments) is selection-vector-aware bool selection_vector_aware = false; - std::shared_ptr special_form{}; + std::shared_ptr special_form = nullptr; void ComputeHash(); }; @@ -121,6 +124,10 @@ class ARROW_EXPORT Expression { // XXX someday // NullGeneralization::type nullable() const; + /// Whether the entire expression (including all its subexpressions) is + /// selection-vector-aware. If true, then the expression can be executed using the "fast + /// path" - all kernels directly working on the selection vector. Otherwise the + /// execution takes the "slow path" - gathering the input and scattering the output. bool selection_vector_aware() const; struct Parameter { diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 97bc6bb09d9..4f0738aa0b3 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -525,6 +525,10 @@ struct ARROW_EXPORT Kernel { // Additional kernel-specific data std::shared_ptr data; + /// \brief Indicates whether the kernel is selection-vector-aware. A + /// selection-vector-aware kernel is able to emit a partial result based on a given + /// selection vector without having to gathering the input and scattering the output, + /// aka. the "slow path", in the caller. bool selection_vector_aware = false; }; diff --git a/cpp/src/arrow/compute/special_forms/if_else_special_form.cc b/cpp/src/arrow/compute/special_forms/if_else_special_form.cc index 4dfd7bbca98..7968ff9a78e 100644 --- a/cpp/src/arrow/compute/special_forms/if_else_special_form.cc +++ b/cpp/src/arrow/compute/special_forms/if_else_special_form.cc @@ -36,6 +36,7 @@ Result IfElseSpecialForm::Execute(const Expression::Call& call, ARROW_ASSIGN_OR_RAISE(arguments[0], ExecuteScalarExpression(call.arguments[0], input, exec_context)); // Use cond as selection vector for IF. + // TODO: Consider chunked array for arguments[0]. auto if_sel = std::make_shared(arguments[0].array()); // Duplicate and invert cond as selection vector for ELSE. ARROW_ASSIGN_OR_RAISE( From 8bb5af38bc88be3c4ed4391faa829a3efa0fa2aa Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 11 Jun 2024 20:21:10 +0800 Subject: [PATCH 10/12] Doc --- cpp/src/arrow/compute/expression_internal.h | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/compute/expression_internal.h b/cpp/src/arrow/compute/expression_internal.h index 5c7b5d4fa1a..919a5f0569b 100644 --- a/cpp/src/arrow/compute/expression_internal.h +++ b/cpp/src/arrow/compute/expression_internal.h @@ -291,6 +291,7 @@ inline Result> GetFunction( return GetCastFunction(*to_type); } +// Execute a call expression with all arguments already evaluated. inline Result ExecuteCallNonRecursive(const Expression::Call& call, const ExecBatch& input, const std::vector& arguments, From b382a67122d6f5d1a1e4f944cbc966fd16597560 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Wed, 12 Jun 2024 01:51:22 +0800 Subject: [PATCH 11/12] More doc --- cpp/src/arrow/compute/special_form.h | 20 +++++++++++++++++++ .../special_forms/if_else_special_form.cc | 10 ++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/special_form.h b/cpp/src/arrow/compute/special_form.h index c5028512879..b538151bb6e 100644 --- a/cpp/src/arrow/compute/special_form.h +++ b/cpp/src/arrow/compute/special_form.h @@ -28,8 +28,28 @@ namespace arrow { namespace compute { +/// The concept "special form" is borrowed from Lisp +/// (https://courses.cs.northwestern.edu/325/readings/special-forms.html). Velox also uses +/// the same term. A special form behaves like a function call except that it has special +/// evaluation rules, mostly for arguments. +/// For example, the `if_else(cond, expr1, expr2)` special form first evaluates the +/// argument `cond` and obtains a boolean array: +/// [true, false, true, false] +/// then the argument `expr1` should ONLY be evaluated for row: +/// [0, 2] +/// and the argument `expr2` should ONLY be evaluated for row: +/// [1, 3] +/// Consider, if `expr1`/`expr2` has some observable side-effects (e.g., division by zero +/// error) on row [1, 3]/[0, 2], these side-effects would be undesirably observed if +/// evaluated using a regular function call, which always evaluates all its arguments +/// eagerly. +/// Other special forms include `case_when`, `and`, and `or`, etc. +/// In a vectorized execution engine, a special form normally takes advantage of +/// "selection vector" to mask rows of arguments to be evaluated. class ARROW_EXPORT SpecialForm { public: + /// A poor man's factory method to create a special form by name. + /// TODO: More formal factory, a registry maybe? static Result> Make(const std::string& name); virtual ~SpecialForm() = default; diff --git a/cpp/src/arrow/compute/special_forms/if_else_special_form.cc b/cpp/src/arrow/compute/special_forms/if_else_special_form.cc index 7968ff9a78e..cade1a3ed79 100644 --- a/cpp/src/arrow/compute/special_forms/if_else_special_form.cc +++ b/cpp/src/arrow/compute/special_forms/if_else_special_form.cc @@ -29,16 +29,17 @@ namespace compute { Result IfElseSpecialForm::Execute(const Expression::Call& call, const ExecBatch& input, ExecContext* exec_context) { - DCHECK(!call.kernel->selection_vector_aware); - DCHECK(!input.selection_vector); + // The kernel (if_else) is not selection-vector-aware, so the input should not have a + // selection vector. + DCHECK(!call.kernel->selection_vector_aware && !input.selection_vector); std::vector arguments(call.arguments.size()); ARROW_ASSIGN_OR_RAISE(arguments[0], ExecuteScalarExpression(call.arguments[0], input, exec_context)); - // Use cond as selection vector for IF. + // Use cond as selection vector for IF branch. // TODO: Consider chunked array for arguments[0]. auto if_sel = std::make_shared(arguments[0].array()); - // Duplicate and invert cond as selection vector for ELSE. + // Duplicate and invert cond as selection vector for ELSE branch. ARROW_ASSIGN_OR_RAISE( auto else_sel, if_sel->Copy(CPUDevice::memory_manager(exec_context->memory_pool()))); @@ -53,6 +54,7 @@ Result IfElseSpecialForm::Execute(const Expression::Call& call, ARROW_ASSIGN_OR_RAISE( arguments[2], ExecuteScalarExpression(call.arguments[2], else_input, exec_context)); + // Leveraging if_else kernel with all arguments evaluated. return ExecuteCallNonRecursive(call, input, arguments, exec_context); } From 2bca08ca099692436a21f0a3045536a4b0f14f6e Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Wed, 12 Jun 2024 02:32:58 +0800 Subject: [PATCH 12/12] Lint --- cpp/src/arrow/compute/expression.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/expression.h b/cpp/src/arrow/compute/expression.h index d47e4a149c5..f0f78316676 100644 --- a/cpp/src/arrow/compute/expression.h +++ b/cpp/src/arrow/compute/expression.h @@ -61,7 +61,7 @@ class ARROW_EXPORT Expression { TypeHolder type; // Whether the entire call (including all its arguments) is selection-vector-aware bool selection_vector_aware = false; - std::shared_ptr special_form = nullptr; + std::shared_ptr special_form = NULLPTR; void ComputeHash(); };