From 0b5ac64acc33da9d4ffa1cb4f11c9da15cb24c4d Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 16 Jun 2020 19:29:33 -0500 Subject: [PATCH 01/11] Benchmark enhancements --- .../kernels/scalar_compare_benchmark.cc | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_compare_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_compare_benchmark.cc index f7d74b4843e..8af575f5874 100644 --- a/cpp/src/arrow/compute/kernels/scalar_compare_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/scalar_compare_benchmark.cc @@ -30,37 +30,51 @@ namespace compute { constexpr auto kSeed = 0x94378165; -static void CompareArrayScalarKernel(benchmark::State& state) { - RegressionArgs args(state); - const int64_t array_size = args.size / sizeof(int64_t); +template +static void CompareArrayScalar(benchmark::State& state) { + RegressionArgs args(state, /*size_is_bytes=*/false); + auto ty = TypeTraits::type_singleton(); auto rand = random::RandomArrayGenerator(kSeed); - auto array = std::static_pointer_cast>( - rand.Int64(array_size, -100, 100, args.null_proportion)); - - CompareOptions ge{GREATER_EQUAL}; - + auto array = rand.ArrayOf(ty, args.size, args.null_proportion); + auto scalar = *rand.ArrayOf(ty, 1, 0)->GetScalar(0); for (auto _ : state) { - ABORT_NOT_OK(Compare(array, Datum(int64_t(0)), ge).status()); + ABORT_NOT_OK(Compare(array, Datum(scalar), CompareOptions(op)).status()); } } -static void CompareArrayArrayKernel(benchmark::State& state) { - RegressionArgs args(state); - const int64_t array_size = args.size / sizeof(int64_t); +template +static void CompareArrayArray(benchmark::State& state) { + RegressionArgs args(state, /*size_is_bytes=*/false); + auto ty = TypeTraits::type_singleton(); auto rand = random::RandomArrayGenerator(kSeed); - auto lhs = std::static_pointer_cast>( - rand.Int64(array_size, -100, 100, args.null_proportion)); - auto rhs = std::static_pointer_cast>( - rand.Int64(array_size, -100, 100, args.null_proportion)); - - CompareOptions ge(GREATER_EQUAL); + auto lhs = rand.ArrayOf(ty, args.size, args.null_proportion); + auto rhs = rand.ArrayOf(ty, args.size, args.null_proportion); for (auto _ : state) { - ABORT_NOT_OK(Compare(lhs, rhs, ge).status()); + ABORT_NOT_OK(Compare(lhs, rhs, CompareOptions(op)).status()); } } -BENCHMARK(CompareArrayScalarKernel)->Apply(RegressionSetArgs); -BENCHMARK(CompareArrayArrayKernel)->Apply(RegressionSetArgs); +static void GreaterArrayArrayInt64(benchmark::State& state) { + CompareArrayArray(state); +} + +static void GreaterArrayScalarInt64(benchmark::State& state) { + CompareArrayScalar(state); +} + +static void GreaterArrayArrayString(benchmark::State& state) { + CompareArrayArray(state); +} + +static void GreaterArrayScalarString(benchmark::State& state) { + CompareArrayScalar(state); +} + +BENCHMARK(GreaterArrayArrayInt64)->Apply(RegressionSetArgs); +BENCHMARK(GreaterArrayScalarInt64)->Apply(RegressionSetArgs); + +BENCHMARK(GreaterArrayArrayString)->Apply(RegressionSetArgs); +BENCHMARK(GreaterArrayScalarString)->Apply(RegressionSetArgs); } // namespace compute } // namespace arrow From 0d5ed54f209cc351360e4e66141080580d23e05b Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 16 Jun 2020 10:48:22 -0500 Subject: [PATCH 02/11] Work on paring down comparisons Finish paring down compare, implement needed simplification of kernel generators Benchmark improvements --- .../arrow/compute/kernels/codegen_internal.cc | 11 +- .../arrow/compute/kernels/codegen_internal.h | 125 +++++++++++++++--- .../arrow/compute/kernels/scalar_compare.cc | 98 ++++++++------ cpp/src/arrow/compute/kernels/vector_sort.cc | 2 +- cpp/src/arrow/scalar.h | 19 +-- 5 files changed, 177 insertions(+), 78 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.cc b/cpp/src/arrow/compute/kernels/codegen_internal.cc index 427ad2b0c54..2606c978e39 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.cc +++ b/cpp/src/arrow/compute/kernels/codegen_internal.cc @@ -31,11 +31,12 @@ void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out) { ctx->SetStatus(Status::NotImplemented("This kernel is malformed")); } -void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec, const ExecBatch& batch, - Datum* out) { - ExecBatch flipped_batch = batch; - std::swap(flipped_batch.values[0], flipped_batch.values[1]); - exec(ctx, flipped_batch, out); +ArrayKernelExec MakeFlippedBinaryExec(ArrayKernelExec exec) { + return [exec](KernelContext* ctx, const ExecBatch& batch, Datum* out) { + ExecBatch flipped_batch = batch; + std::swap(flipped_batch.values[0], flipped_batch.values[1]); + exec(ctx, flipped_batch, out); + }; } std::vector> g_signed_int_types; diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index c547c807757..87d2366f428 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -23,8 +23,10 @@ #include #include +#include "arrow/array/builder_binary.h" #include "arrow/array/data.h" #include "arrow/buffer.h" +#include "arrow/buffer_builder.h" #include "arrow/compute/exec.h" #include "arrow/compute/kernel.h" #include "arrow/datum.h" @@ -121,10 +123,26 @@ struct ArrayIterator> { template struct ArrayIterator> { - int64_t position = 0; - typename TypeTraits::ArrayType arr; - explicit ArrayIterator(const ArrayData& data) : arr(data.Copy()) {} - util::string_view operator()() { return arr.GetView(position++); } + using offset_type = typename Type::offset_type; + const ArrayData& arr; + const offset_type* offsets; + offset_type cur_offset; + const char* data; + int64_t position; + explicit ArrayIterator(const ArrayData& arr) + : arr(arr), + offsets(reinterpret_cast(arr.buffers[1]->data()) + + arr.offset), + cur_offset(offsets[0]), + data(reinterpret_cast(arr.buffers[2]->data())), + position(0) {} + + util::string_view operator()() { + offset_type next_offset = offsets[position++ + 1]; + auto result = util::string_view(data + cur_offset, next_offset - cur_offset); + cur_offset = next_offset; + return result; + } }; template @@ -229,8 +247,7 @@ Result FirstType(KernelContext*, const std::vector& desc void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out); -void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec, const ExecBatch& batch, - Datum* out); +ArrayKernelExec MakeFlippedBinaryExec(ArrayKernelExec exec); // ---------------------------------------------------------------------- // Helpers for iterating over common DataType instances for adding kernels to @@ -485,18 +502,38 @@ struct ScalarUnaryNotNullStateful { struct ArrayExec> { static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch, Datum* out) { - typename TypeTraits::BuilderType builder; - VisitArrayValuesInline(*batch[0].array(), [&](util::optional v) { - if (v.has_value()) { - KERNEL_RETURN_IF_ERROR(ctx, builder.Append(functor.op.Call(ctx, *v))); - } else { - KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull()); + using offset_type = typename Type::offset_type; + const ArrayData& arg0 = *batch[0].array(); + typename TypeTraits::ArrayType arg0_boxed(batch[0].array()); + + ArrayData* out_arr = out->mutable_array(); + TypedBufferBuilder offset_builder; + TypedBufferBuilder data_builder; + + KERNEL_RETURN_IF_ERROR(ctx, offset_builder.Reserve(arg0.length + 1)); + offset_type offset = 0; + + const offset_type* in_offsets = arg0_boxed.raw_value_offsets(); + const uint8_t* in_data = arg0.buffers[2]->data(); + offset_type cur_offset = in_offsets[0]; + for (int64_t i = 0; i < arg0.length; ++i) { + offset_type next_offset = in_offsets[i + 1]; + if (arg0_boxed.IsValid(i)) { + auto val_size = next_offset - cur_offset; + auto val = + functor.op.Call(ctx, util::string_view(in_data + cur_offset, val_size)); + if (std::is_same::value && + (static_cast(offset) + val_size > kBinaryMemoryLimit)) { + ctx->SetStatus(Status::OutOfMemory("Overflowed 32-bit binary builder")); + return; + } + KERNEL_RETURN_IF_ERROR(ctx, data_builder.Append(val.data(), val_size)); } - }); + cur_offset = next_offset; + } if (!ctx->HasError()) { - std::shared_ptr result; - ctx->SetStatus(builder.FinishInternal(&result)); - out->value = std::move(result); + KERNEL_RETURN_IF_ERROR(ctx, offset_builder.Finish(&out_arr->buffers[1])); + KERNEL_RETURN_IF_ERROR(ctx, data_builder.Finish(&out_arr->buffers[2])); } } }; @@ -787,6 +824,41 @@ ArrayKernelExec Integer(detail::GetTypeId get_id) { } } +template