From 133ad649f5c430b1e1feedc86bb0674af6933a8a Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 27 Jun 2020 10:29:17 -0500 Subject: [PATCH 1/2] Start drafting binary filter --- .../arrow/compute/kernels/vector_selection.cc | 243 +++++++++++++++++- 1 file changed, 241 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index 71ad39d15ab..a9d7222ac42 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -841,6 +841,245 @@ void PrimitiveFilter(KernelContext* ctx, const ExecBatch& batch, Datum* out) { } } +// ---------------------------------------------------------------------- +// Optimized filter for base binary types (32-bit and 64-bit) + +template +struct BinaryFilterImpl { + using offset_type = typename Type::offset_type; + static constexpr int64_t kOffsetLimit = std::numeric_limits::max() - 1; + + std::shared_ptr values_as_binary; + TypedBufferBuilder offset_builder; + TypedBufferBuilder data_builder; + + KernelContext* ctx; + std::shared_ptr values; + std::shared_ptr selection; + int64_t output_length; + ArrayData* out; + TypedBufferBuilder validity_builder; + + VarBinaryImpl(KernelContext* ctx, const ExecBatch& batch, int64_t output_length, + Datum* out) + : Base(ctx, batch, output_length, out), + offset_builder(ctx->memory_pool()), + data_builder(ctx->memory_pool()) {} + + Selection(KernelContext* ctx, const ExecBatch& batch, int64_t output_length, Datum* out) + : ctx(ctx), + values(batch[0].array()), + selection(batch[1].array()), + output_length(output_length), + out(out->mutable_array()), + validity_builder(ctx->memory_pool()) {} + + + Status FinishCommon() { + out->buffers.resize(values->buffers.size()); + out->length = validity_builder.length(); + out->null_count = validity_builder.false_count(); + return validity_builder.Finish(&out->buffers[0]); + } + + // We use the NullVisitor both for "selected" nulls as well as "emitted" + // nulls coming from the filter when using FilterOptions::EMIT_NULL + template + Status VisitFilter(ValidVisitor&& visit_valid, NullVisitor&& visit_null) { + auto null_selection = FilterState::Get(ctx).null_selection_behavior; + + const auto filter_data = selection->buffers[1]->data(); + + const uint8_t* filter_is_valid = GetValidityBitmap(*selection); + const int64_t filter_offset = selection->offset; + OptionalBitIndexer values_is_valid(values->buffers[0], values->offset); + + // We use 3 block counters for fast scanning of the filter + // + // * values_valid_counter: for values null/not-null + // * filter_valid_counter: for filter null/not-null + // * filter_counter: for filter true/false + OptionalBitBlockCounter values_valid_counter(GetValidityBitmap(*values), + values->offset, values->length); + OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset, + selection->length); + BitBlockCounter filter_counter(filter_data, filter_offset, selection->length); + int64_t in_position = 0; + + auto AppendNotNull = [&](int64_t index) -> Status { + validity_builder.UnsafeAppend(true); + return visit_valid(index); + }; + + auto AppendNull = [&]() -> Status { + validity_builder.UnsafeAppend(false); + return visit_null(); + }; + + auto AppendMaybeNull = [&](int64_t index) -> Status { + if (values_is_valid[index]) { + return AppendNotNull(index); + } else { + return AppendNull(); + } + }; + + while (in_position < selection->length) { + BitBlockCount filter_valid_block = filter_valid_counter.NextWord(); + BitBlockCount values_valid_block = values_valid_counter.NextWord(); + BitBlockCount filter_block = filter_counter.NextWord(); + if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) { + // For this exceedingly common case in low-selectivity filters we can + // skip further analysis of the data and move on to the next block. + in_position += filter_block.length; + } else if (filter_valid_block.AllSet()) { + // Simpler path: no filter values are null + if (filter_block.AllSet()) { + // Fastest path: filter values are all true and not null + if (values_valid_block.AllSet()) { + // The values aren't null either + validity_builder.UnsafeAppend(filter_block.length, true); + for (int64_t i = 0; i < filter_block.length; ++i) { + RETURN_NOT_OK(visit_valid(in_position++)); + } + } else { + // Some of the values in this block are null + for (int64_t i = 0; i < filter_block.length; ++i) { + RETURN_NOT_OK(AppendMaybeNull(in_position++)); + } + } + } else { // !filter_block.AllSet() + // Some of the filter values are false, but all not null + if (values_valid_block.AllSet()) { + // All the values are not-null, so we can skip null checking for + // them + for (int64_t i = 0; i < filter_block.length; ++i) { + if (BitUtil::GetBit(filter_data, filter_offset + in_position)) { + RETURN_NOT_OK(AppendNotNull(in_position)); + } + ++in_position; + } + } else { + // Some of the values in the block are null, so we have to check + // each one + for (int64_t i = 0; i < filter_block.length; ++i) { + if (BitUtil::GetBit(filter_data, filter_offset + in_position)) { + RETURN_NOT_OK(AppendMaybeNull(in_position)); + } + ++in_position; + } + } + } + } else { // !filter_valid_block.AllSet() + // Some of the filter values are null, so we have to handle the DROP + // versus EMIT_NULL null selection behavior. + if (null_selection == FilterOptions::DROP) { + // Filter null values are treated as false. + for (int64_t i = 0; i < filter_block.length; ++i) { + if (BitUtil::GetBit(filter_is_valid, filter_offset + in_position) && + BitUtil::GetBit(filter_data, filter_offset + in_position)) { + RETURN_NOT_OK(AppendMaybeNull(in_position)); + } + ++in_position; + } + } else { + // Filter null values are appended to output as null whether the + // value in the corresponding slot is valid or not + for (int64_t i = 0; i < filter_block.length; ++i) { + const bool filter_not_null = + BitUtil::GetBit(filter_is_valid, filter_offset + in_position); + if (filter_not_null && + BitUtil::GetBit(filter_data, filter_offset + in_position)) { + RETURN_NOT_OK(AppendMaybeNull(in_position)); + } else if (!filter_not_null) { + // EMIT_NULL case + RETURN_NOT_OK(AppendNull()); + } + ++in_position; + } + } + } + } + return Status::OK(); + } + + virtual Status Init() { return Status::OK(); } + + // Implementation specific finish logic + virtual Status Finish() = 0; + + Status ExecFilter() { + RETURN_NOT_OK(this->validity_builder.Reserve(output_length)); + RETURN_NOT_OK(Init()); + // CRTP dispatch + Status s = static_cast(this)->template GenerateOutput(); + RETURN_NOT_OK(s); + RETURN_NOT_OK(this->FinishCommon()); + return Finish(); + + template + Status GenerateOutput() { + ValuesArrayType typed_values(this->values_as_binary); + + // Presize the data builder with a rough estimate of the required data size + if (values->length > 0) { + const double mean_value_length = + (typed_values.total_values_length() / static_cast(values->length)); + + // TODO: See if possible to reduce output_length for take/filter cases + // where there are nulls in the selection array + RETURN_NOT_OK( + data_builder.Reserve(static_cast(mean_value_length * output_length))); + } + int64_t space_available = data_builder.capacity(); + + const offset_type* raw_offsets = typed_values.raw_value_offsets(); + const uint8_t* raw_data = typed_values.raw_data(); + + offset_type offset = 0; + Adapter adapter(this); + RETURN_NOT_OK(adapter.Generate( + [&](int64_t index) { + offset_builder.UnsafeAppend(offset); + offset_type val_offset = raw_offsets[index]; + offset_type val_size = raw_offsets[index + 1] - val_offset; + + // Use static property to prune this code from the filter path in + // optimized builds + if (Adapter::is_take && + ARROW_PREDICT_FALSE(static_cast(offset) + + static_cast(val_size)) > kOffsetLimit) { + return Status::Invalid("Take operation overflowed binary array capacity"); + } + offset += val_size; + if (ARROW_PREDICT_FALSE(val_size > space_available)) { + RETURN_NOT_OK(data_builder.Reserve(val_size)); + space_available = data_builder.capacity() - data_builder.length(); + } + data_builder.UnsafeAppend(raw_data + val_offset, val_size); + space_available -= val_size; + return Status::OK(); + }, + [&]() { + offset_builder.UnsafeAppend(offset); + return Status::OK(); + })); + offset_builder.UnsafeAppend(offset); + return Status::OK(); + } + + Status Init() override { + ARROW_ASSIGN_OR_RAISE(this->values_as_binary, + GetArrayView(this->values, TypeTraits::type_singleton())); + return offset_builder.Reserve(output_length + 1); + } + + Status Finish() override { + RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1])); + return data_builder.Finish(&out->buffers[2]); + } +}; + // ---------------------------------------------------------------------- // Null take and filter @@ -1195,8 +1434,8 @@ struct Selection { static inline Status VisitNoop() { return Status::OK(); } -// A take implementation for 32-bit and 64-bit variable binary types. Common -// generated kernels are shared between Binary/String and +// A selection implementation for 32-bit and 64-bit variable binary +// types. Common generated kernels are shared between Binary/String and // LargeBinary/LargeString template struct VarBinaryImpl : public Selection, Type> { From de06c49424c7dc89e82d057d07e527fd87e7ac71 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 28 Jun 2020 13:14:41 -0500 Subject: [PATCH 2/2] Complete implementation, tests passing --- .../arrow/compute/kernels/vector_selection.cc | 547 ++++++++++-------- 1 file changed, 313 insertions(+), 234 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index a9d7222ac42..8d57ad880f7 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -233,19 +233,18 @@ using FilterState = OptionsWrapper; using TakeState = OptionsWrapper; Status PreallocateData(KernelContext* ctx, int64_t length, int bit_width, - bool allocate_validity, Datum* out) { + bool allocate_validity, ArrayData* out) { // Preallocate memory - ArrayData* out_arr = out->mutable_array(); - out_arr->length = length; - out_arr->buffers.resize(2); + out->length = length; + out->buffers.resize(2); if (allocate_validity) { - ARROW_ASSIGN_OR_RAISE(out_arr->buffers[0], ctx->AllocateBitmap(length)); + ARROW_ASSIGN_OR_RAISE(out->buffers[0], ctx->AllocateBitmap(length)); } if (bit_width == 1) { - ARROW_ASSIGN_OR_RAISE(out_arr->buffers[1], ctx->AllocateBitmap(length)); + ARROW_ASSIGN_OR_RAISE(out->buffers[1], ctx->AllocateBitmap(length)); } else { - ARROW_ASSIGN_OR_RAISE(out_arr->buffers[1], ctx->Allocate(length * bit_width / 8)); + ARROW_ASSIGN_OR_RAISE(out->buffers[1], ctx->Allocate(length * bit_width / 8)); } return Status::OK(); } @@ -265,7 +264,7 @@ Status PreallocateData(KernelContext* ctx, int64_t length, int bit_width, template struct PrimitiveTakeImpl { static void Exec(const PrimitiveArg& values, const PrimitiveArg& indices, - Datum* out_datum) { + ArrayData* out_arr) { auto values_data = reinterpret_cast(values.data); auto values_is_valid = values.is_valid; auto values_offset = values.offset; @@ -274,7 +273,6 @@ struct PrimitiveTakeImpl { auto indices_is_valid = indices.is_valid; auto indices_offset = indices.offset; - ArrayData* out_arr = out_datum->mutable_array(); auto out = out_arr->GetMutableValues(1); auto out_is_valid = out_arr->buffers[0]->mutable_data(); auto out_offset = out_arr->offset; @@ -364,7 +362,7 @@ struct PrimitiveTakeImpl { template struct BooleanTakeImpl { static void Exec(const PrimitiveArg& values, const PrimitiveArg& indices, - Datum* out_datum) { + ArrayData* out_arr) { const uint8_t* values_data = values.data; auto values_is_valid = values.is_valid; auto values_offset = values.offset; @@ -373,7 +371,6 @@ struct BooleanTakeImpl { auto indices_is_valid = indices.is_valid; auto indices_offset = indices.offset; - ArrayData* out_arr = out_datum->mutable_array(); auto out = out_arr->buffers[1]->mutable_data(); auto out_is_valid = out_arr->buffers[0]->mutable_data(); auto out_offset = out_arr->offset; @@ -463,7 +460,7 @@ struct BooleanTakeImpl { template