diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index 95114d8d8a5..a5cb61d6b55 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -220,6 +220,14 @@ Result ReplaceWithMask(const Datum& values, const Datum& mask, return CallFunction("replace_with_mask", {values, mask, replacements}, ctx); } +Result FillNullForward(const Datum& values, ExecContext* ctx) { + return CallFunction("fill_null_forward", {values}, ctx); +} + +Result FillNullBackward(const Datum& values, ExecContext* ctx) { + return CallFunction("fill_null_backward", {values}, ctx); +} + Result> SortIndices(const Array& values, const ArraySortOptions& options, ExecContext* ctx) { diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 8788d5d160e..152005093dc 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -246,6 +246,34 @@ ARROW_EXPORT Result ReplaceWithMask(const Datum& values, const Datum& mask, const Datum& replacements, ExecContext* ctx = NULLPTR); +/// \brief FillNullForward fills null values in the forward direction +/// +/// The output array will be of the same type as the input values +/// array, with each null replaced by the closest preceding valid value. +/// +/// For example given values = ["a", "b", "c", null, null, "f"], +/// the output will be = ["a", "b", "c", "c", "c", "f"] +/// +/// \param[in] values datum whose null values are filled +/// \param[in] ctx the function execution context, optional +/// \return the resulting datum +ARROW_EXPORT +Result FillNullForward(const Datum& values, ExecContext* ctx = NULLPTR); + +/// \brief FillNullBackward fills null values in the backward direction +/// +/// The output array will be of the same type as the input values +/// array, with each null replaced by the closest following valid value. 
+/// +/// For example given values = ["a", "b", "c", null, null, "f"], +/// the output will be = ["a", "b", "c", "f", "f", "f"] +/// +/// \param[in] values datum whose null values are filled +/// \param[in] ctx the function execution context, optional +/// \return the resulting datum +ARROW_EXPORT +Result FillNullBackward(const Datum& values, ExecContext* ctx = NULLPTR); + /// \brief Take from an array of values at indices in another array /// /// The output array will be of the same type as the input values diff --git a/cpp/src/arrow/compute/kernels/vector_replace.cc b/cpp/src/arrow/compute/kernels/vector_replace.cc index 63d0ed94b56..35790e96af4 100644 --- a/cpp/src/arrow/compute/kernels/vector_replace.cc +++ b/cpp/src/arrow/compute/kernels/vector_replace.cc @@ -442,23 +442,404 @@ struct ReplaceWithMaskFunctor { } return ReplaceWithMask::ExecArrayMask(ctx, array, mask, replacements, output); } + + static std::shared_ptr GetSignature(detail::GetTypeId get_id) { + return KernelSignature::Make( + {InputType::Array(get_id.id), InputType(boolean()), InputType(get_id.id)}, + OutputType(FirstType)); + } }; -} // namespace +// Fill nulls of a fixed-size-type chunk; direction is 1 (forward) or -1 (backward) +template +void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null_bitmap, + ArrayData* output, int8_t direction, + const ArrayData& last_valid_value_chunk, + int64_t* last_valid_value_offset) { + uint8_t* out_bitmap = output->buffers[0]->mutable_data(); + uint8_t* out_values = output->buffers[1]->mutable_data(); + arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), current_chunk.offset, + current_chunk.length, out_bitmap, output->offset); + ReplaceWithMask::CopyData(*current_chunk.type, out_values, + /*out_offset=*/output->offset, current_chunk, + /*in_offset=*/0, current_chunk.length); + + bool has_fill_value = *last_valid_value_offset != -1; /* -1: no valid value seen so far */ + int64_t write_offset = direction == 1 ? 
0 : current_chunk.length - 1; + int64_t bitmap_offset = 0; + + arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset, + current_chunk.length); + bool use_current_chunk = false; + while (bitmap_offset < current_chunk.length) { + BitBlockCount block = counter.NextBlock(); + if (block.AllSet()) { /* whole block valid: remember its last position as the fill source */ + *last_valid_value_offset = + write_offset + direction * (block.length - 1 + bitmap_offset); + has_fill_value = true; + use_current_chunk = true; + } else { + uint64_t block_start_offset = write_offset + direction * bitmap_offset; + uint64_t write_value_offset = block_start_offset; /* NOTE(review): unsigned offsets are stepped by `direction`, which may be -1; this relies on wraparound - confirm intended */ + if (block.popcount) { + for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) { + auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i); + if (!current_bit) { + if (has_fill_value) { + ReplaceWithMask::CopyData( + *current_chunk.type, out_values, write_value_offset, + use_current_chunk ? current_chunk : last_valid_value_chunk, + *last_valid_value_offset, + /*length=*/1); + bit_util::SetBitTo(out_bitmap, write_value_offset, true); + } + } else { + has_fill_value = true; + use_current_chunk = true; + *last_valid_value_offset = write_value_offset; + } + } + } else { /* popcount == 0: every slot in this block is null */ + for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) { + if (has_fill_value) { + ReplaceWithMask::CopyData( + *current_chunk.type, out_values, write_value_offset, + use_current_chunk ? 
current_chunk : last_valid_value_chunk, + *last_valid_value_offset, + /*length=*/1); + bit_util::SetBitTo(out_bitmap, write_value_offset, true); + } + } + } + } + bitmap_offset += block.length; + } + output->null_count = -1; /* presumably marks the count as unknown so GetNullCount() below recomputes it - TODO confirm */ + output->GetNullCount(); +} -const FunctionDoc replace_with_mask_doc( - "Replace items selected with a mask", - ("Given an array and a boolean mask (either scalar or of equal length),\n" - "along with replacement values (either scalar or array),\n" - "each element of the array for which the corresponding mask element is\n" - "true will be replaced by the next value from the replacements,\n" - "or with null if the mask is null.\n" - "Hence, for replacement arrays, len(replacements) == sum(mask == true)."), - {"values", "mask", "replacements"}); +template +struct FillNullExecutor {}; -void RegisterVectorReplace(FunctionRegistry* registry) { - auto func = std::make_shared("replace_with_mask", Arity::Ternary(), - &replace_with_mask_doc); +template +struct FillNullExecutor> { + static Status ExecFillNull(KernelContext* ctx, const ArrayData& array, + const uint8_t* reversed_bitmap, ArrayData* output, + int8_t direction, const ArrayData& last_valid_value_chunk, + int64_t* last_valid_value_offset) { + FillNullInDirectionImpl(array, reversed_bitmap, output, direction, + last_valid_value_chunk, last_valid_value_offset); + return Status::OK(); + } +}; + +template +struct FillNullExecutor< + Type, enable_if_t::value || + std::is_same::value>> { + static Status ExecFillNull(KernelContext* ctx, const ArrayData& array, + const uint8_t* reversed_bitmap, ArrayData* output, + int8_t direction, const ArrayData& last_valid_value_chunk, + int64_t* last_valid_value_offset) { + FillNullInDirectionImpl(array, reversed_bitmap, output, direction, + last_valid_value_chunk, last_valid_value_offset); + return Status::OK(); + } +}; + +template +struct FillNullExecutor> { + static Status 
ExecFillNull(KernelContext* ctx, const ArrayData& array, + const uint8_t* 
reversed_bitmap, ArrayData* output, + int8_t direction, const ArrayData& last_valid_value_chunk, + int64_t* last_valid_value_offset) { + FillNullInDirectionImpl(array, reversed_bitmap, output, direction, + last_valid_value_chunk, last_valid_value_offset); + return Status::OK(); + } +}; + +template +struct FillNullExecutor> { + using offset_type = typename Type::offset_type; + using BuilderType = typename TypeTraits::BuilderType; + + static Status ExecFillNull(KernelContext* ctx, const ArrayData& current_chunk, + const uint8_t* reversed_bitmap, ArrayData* output, + int8_t direction, const ArrayData& last_valid_value_chunk, + int64_t* last_valid_value_offset) { + BuilderType builder(current_chunk.type, ctx->memory_pool()); + RETURN_NOT_OK(builder.Reserve(current_chunk.length)); + RETURN_NOT_OK(builder.ReserveData(current_chunk.buffers[2]->size())); + int64_t array_value_index = direction == 1 ? 0 : current_chunk.length - 1; + const uint8_t* data = current_chunk.buffers[2]->data(); + const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data(); + const offset_type* offsets = current_chunk.GetValues(1); + const offset_type* offsets_prev = last_valid_value_chunk.GetValues(1); + + bool has_fill_value_last_chunk = *last_valid_value_offset != -1; + bool has_fill_value_current_chunk = false; + /*tuple for store: (value lives in current chunk, byte offset into the data buffer, byte length) */ + std::vector> offsets_reversed; + RETURN_NOT_OK(VisitNullBitmapInline<>( + reversed_bitmap, output->offset, current_chunk.length, + current_chunk.GetNullCount(), + [&]() { /* valid slot: record its own value and make it the current fill source */ + const offset_type offset0 = offsets[array_value_index]; + const offset_type offset1 = offsets[array_value_index + 1]; + offsets_reversed.push_back( + std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0)); + *last_valid_value_offset = array_value_index; + has_fill_value_current_chunk = true; + has_fill_value_last_chunk = false; + 
array_value_index += direction; + return Status::OK(); + }, + [&]() { /* null slot: reuse the most recent valid value, if there is one */ + if (has_fill_value_current_chunk) { + const offset_type offset0 = 
offsets[*last_valid_value_offset]; + const offset_type offset1 = offsets[*last_valid_value_offset + 1]; + offsets_reversed.push_back( + std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0)); + } else if (has_fill_value_last_chunk) { + const offset_type offset0 = offsets_prev[*last_valid_value_offset]; + const offset_type offset1 = offsets_prev[*last_valid_value_offset + 1]; + offsets_reversed.push_back( + std::make_tuple(/*current_chunk=*/false, offset0, offset1 - offset0)); + } else { + offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/false, -1, -1)); /* (-1, -1): no fill value available, the slot stays null */ + } + array_value_index += direction; + return Status::OK(); + })); + + if (direction == 1) { + for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); ++it) { + if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) { + RETURN_NOT_OK(builder.AppendNull()); + } else if (std::get<0>(*it)) { + RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it))); + } else { + RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it))); + } + } + } else { /* direction != 1: entries were recorded from the last index down, so emit them in reverse */ + for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); ++it) { + if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) { + RETURN_NOT_OK(builder.AppendNull()); + } else if (std::get<0>(*it)) { + RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it))); + } else { + RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it))); + } + } + } + + std::shared_ptr temp_output; + RETURN_NOT_OK(builder.Finish(&temp_output)); + *output = *temp_output->data(); + // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase + output->type = current_chunk.type; + return Status::OK(); + } +}; + +template +struct FillNullExecutor> { + static Status ExecFillNull(KernelContext* ctx, const ArrayData& array, + const 
uint8_t* reversed_bitmap, ArrayData* output, + int8_t direction, const ArrayData& last_valid_value_chunk, + int64_t* last_valid_value_offset) { + 
*output = array; + return Status::OK(); + } +}; + +template +struct FillNullForwardFunctor { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + switch (batch[0].kind()) { + case Datum::ARRAY: { + auto array_input = *batch[0].array(); + int64_t last_valid_value_offset = -1; /* -1: no fill value yet */ + return FillNullForwardArray(ctx, array_input, out, array_input, + &last_valid_value_offset); + } + case Datum::CHUNKED_ARRAY: { + return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out); + } + default: + break; + } + return Status::NotImplemented("Unsupported type for fill_null_forward: ", + batch[0].ToString()); + } + + static Status FillNullForwardArray(KernelContext* ctx, const ArrayData& array, + Datum* out, const ArrayData& last_valid_value_chunk, + int64_t* last_valid_value_offset) { + ArrayData* output = out->array().get(); + output->length = array.length; + int8_t direction = 1; /* forward */ + + if (array.MayHaveNulls()) { + ARROW_ASSIGN_OR_RAISE( + auto null_bitmap, + arrow::internal::CopyBitmap(ctx->memory_pool(), array.buffers[0]->data(), + array.offset, array.length)); + return FillNullExecutor::ExecFillNull(ctx, array, null_bitmap->data(), output, + direction, last_valid_value_chunk, + last_valid_value_offset); + } else { + if (array.length > 0) { + *last_valid_value_offset = array.length - 1; /* MayHaveNulls() is false here: the chunk's last element becomes the fill source */ + } + *output = array; + } + return Status::OK(); + } + + static Status FillNullForwardChunkedArray(KernelContext* ctx, + const std::shared_ptr& values, + Datum* out) { + if (values->null_count() == 0) { + *out = Datum(values); + return Status::OK(); + } + if (values->null_count() == values->length()) { /* every value is null: nothing to fill from, return the input */ + *out = Datum(values); + return Status::OK(); + } + + ArrayVector new_chunks; + if (values->length() > 0) { + ArrayData* array_with_current = 
values->chunk(/*first_chunk=*/0)->data().get(); + int64_t last_valid_value_offset = -1; + + for (const auto& chunk : values->chunks()) { + if (is_fixed_width(out->type()->id())) { + auto* output = out->mutable_array(); + 
auto bit_width = checked_cast(*output->type).bit_width(); + auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length()); + ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length())); + ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes)); + } + RETURN_NOT_OK(FillNullForwardArray(ctx, *chunk->data(), out, *array_with_current, + &last_valid_value_offset)); + if (chunk->null_count() != chunk->length()) { /* chunk holds a valid value: chunks processed after it take their fill value from it */ + array_with_current = &*chunk->data(); + } + new_chunks.push_back(MakeArray(out->make_array()->data()->Copy())); + } + } + + auto output = std::make_shared(std::move(new_chunks), values->type()); + *out = Datum(output); + return Status::OK(); + } + + static std::shared_ptr GetSignature(detail::GetTypeId get_id) { + return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType)); + } +}; + +template +struct FillNullBackwardFunctor { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + switch (batch[0].kind()) { + case Datum::ARRAY: { + auto array_input = *batch[0].array(); + int64_t last_valid_value_offset = -1; + return FillNullBackwardArray(ctx, array_input, out, array_input, + &last_valid_value_offset); + } + case Datum::CHUNKED_ARRAY: { + return FillNullBackwardChunkedArray(ctx, batch[0].chunked_array(), out); + } + default: + break; + } + return Status::NotImplemented("Unsupported type for fill_null_backward operation: ", + batch[0].ToString()); + } + + static Status FillNullBackwardArray(KernelContext* ctx, const ArrayData& array, + Datum* out, const ArrayData& last_valid_value_chunk, + int64_t* last_valid_value_offset) { + ArrayData* output = out->array().get(); + output->length = array.length; + int8_t direction = -1; /* backward */ + + if (array.MayHaveNulls()) { + ARROW_ASSIGN_OR_RAISE( + auto reversed_bitmap, + arrow::internal::ReverseBitmap(ctx->memory_pool(), 
array.buffers[0]->data(), + array.offset, array.length)); + return FillNullExecutor::ExecFillNull( + ctx, array, 
reversed_bitmap->data(), output, direction, last_valid_value_chunk, + last_valid_value_offset); + } else { + if (array.length > 0) { + *last_valid_value_offset = 0; /* MayHaveNulls() is false here: the chunk's first element becomes the fill source */ + } + *output = array; + } + return Status::OK(); + } + + static Status FillNullBackwardChunkedArray(KernelContext* ctx, + const std::shared_ptr& values, + Datum* out) { + if (values->null_count() == 0) { + *out = Datum(values); + return Status::OK(); + } + if (values->null_count() == values->length()) { /* every value is null: nothing to fill from, return the input */ + *out = Datum(values); + return Status::OK(); + } + std::vector> new_chunks; + + if (values->length() > 0) { + auto chunks_length = static_cast(values->chunks().size()); + ArrayData* array_with_current = + values->chunk(/*first_chunk=*/chunks_length - 1)->data().get(); + int64_t last_valid_value_offset = -1; + auto chunks = values->chunks(); + for (int i = chunks_length - 1; i >= 0; --i) { /* walk the chunks last-to-first */ + const auto& chunk = chunks[i]; + if (is_fixed_width(out->type()->id())) { + auto* output = out->mutable_array(); + auto bit_width = checked_cast(*output->type).bit_width(); + auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length()); + ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length())); + ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes)); + } + RETURN_NOT_OK(FillNullBackwardArray(ctx, *chunk->data(), out, *array_with_current, + &last_valid_value_offset)); + if (chunk->null_count() != chunk->length()) { + array_with_current = &*chunk->data(); + } + new_chunks.push_back(MakeArray(out->make_array()->data()->Copy())); + } + } + + std::reverse(new_chunks.begin(), new_chunks.end()); /* chunks were produced last-to-first; restore the input order */ + auto output = std::make_shared(std::move(new_chunks), values->type()); + *out = Datum(output); + return Status::OK(); + } + + static 
std::shared_ptr GetSignature(detail::GetTypeId get_id) { + return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType)); + } +}; +} // namespace + +template