diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index dca54a0faba..967c8179da7 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -77,5 +77,9 @@ Result TDigest(const Datum& value, const TDigestOptions& options, return CallFunction("tdigest", {value}, &options, ctx); } +Result Index(const Datum& value, IndexOptions options, ExecContext* ctx) { + return CallFunction("index", {value}, &options, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index a7ceb2ac2fd..d781bbb6205 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -126,6 +126,13 @@ struct ARROW_EXPORT TDigestOptions : public FunctionOptions { uint32_t buffer_size; }; +/// \brief Control Index kernel behavior +struct ARROW_EXPORT IndexOptions : public FunctionOptions { + explicit IndexOptions(std::shared_ptr value) : value{std::move(value)} {} + + std::shared_ptr value; +}; + /// @} /// \brief Count non-null (or null) values in an array. @@ -293,6 +300,18 @@ Result TDigest(const Datum& value, const TDigestOptions& options = TDigestOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Find the first index of a value in an array. +/// +/// \param[in] value The array to search. +/// \param[in] options The value to search for. See IndexOptions. +/// \param[in] ctx the function execution context, optional +/// \return out a Scalar containing the index (or -1 if not found). +/// +/// \since 5.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result Index(const Datum& value, IndexOptions options, ExecContext* ctx = NULLPTR); + namespace internal { /// Internal use only: streaming group identifier. 
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 0fecea080d8..0d5fa147727 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -52,7 +52,7 @@ struct ARROW_EXPORT KernelState { /// \brief Context/state for the execution of a particular kernel. class ARROW_EXPORT KernelContext { public: - explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx) {} + explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx), state_() {} /// \brief Allocate buffer from the context's memory pool. The contents are /// not initialized. diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index 446c1b9fc62..1ea63cdc4a0 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -225,6 +225,131 @@ Result> AllInit(KernelContext*, const KernelInitArg return ::arrow::internal::make_unique(); } +// ---------------------------------------------------------------------- +// Index implementation + +template +struct IndexImpl : public ScalarAggregator { + using ArgValue = typename internal::GetViewType::T; + + explicit IndexImpl(IndexOptions options, KernelState* raw_state) + : options(std::move(options)), seen(0), index(-1) { + if (auto state = static_cast*>(raw_state)) { + seen = state->seen; + index = state->index; + } + } + + Status Consume(KernelContext* ctx, const ExecBatch& batch) override { + // short-circuit + if (index >= 0 || !options.value->is_valid) { + return Status::OK(); + } + + auto input = batch[0].array(); + seen = input->length; + const ArgValue desired = internal::UnboxScalar::Unbox(*options.value); + int64_t i = 0; + + ARROW_UNUSED(internal::VisitArrayValuesInline( + *input, + [&](ArgValue v) -> Status { + if (v == desired) { + index = i; + return Status::Cancelled("Found"); + } else { + ++i; + return Status::OK(); + } + }, + [&]() -> Status { + ++i; + return Status::OK(); + 
})); + + return Status::OK(); + } + + Status MergeFrom(KernelContext*, KernelState&& src) override { + const auto& other = checked_cast(src); + if (index < 0 && other.index >= 0) { + index = seen + other.index; + } + seen += other.seen; + return Status::OK(); + } + + Status Finalize(KernelContext*, Datum* out) override { + out->value = std::make_shared(index >= 0 ? index : -1); + return Status::OK(); + } + + const IndexOptions options; + int64_t seen = 0; + int64_t index = -1; +}; + +struct IndexInit { + std::unique_ptr state; + KernelContext* ctx; + const IndexOptions& options; + const DataType& type; + + IndexInit(KernelContext* ctx, const IndexOptions& options, const DataType& type) + : ctx(ctx), options(options), type(type) {} + + Status Visit(const DataType& type) { + return Status::NotImplemented("Index kernel not implemented for ", type.ToString()); + } + + Status Visit(const BooleanType&) { + state.reset(new IndexImpl(options, ctx->state())); + return Status::OK(); + } + + template + enable_if_number Visit(const Type&) { + state.reset(new IndexImpl(options, ctx->state())); + return Status::OK(); + } + + template + enable_if_base_binary Visit(const Type&) { + state.reset(new IndexImpl(options, ctx->state())); + return Status::OK(); + } + + template + enable_if_date Visit(const Type&) { + state.reset(new IndexImpl(options, ctx->state())); + return Status::OK(); + } + + template + enable_if_time Visit(const Type&) { + state.reset(new IndexImpl(options, ctx->state())); + return Status::OK(); + } + + template + enable_if_timestamp Visit(const Type&) { + state.reset(new IndexImpl(options, ctx->state())); + return Status::OK(); + } + + Result> Create() { + RETURN_NOT_OK(VisitTypeInline(type, this)); + return std::move(state); + } + + static Result> Init(KernelContext* ctx, + const KernelInitArgs& args) { + IndexInit visitor(ctx, static_cast(*args.options), + *args.inputs[0].type); + return visitor.Create(); + } +}; + void AddBasicAggKernels(KernelInit init, const 
std::vector>& types, std::shared_ptr out_ty, ScalarAggregateFunction* func, @@ -290,6 +415,12 @@ const FunctionDoc all_doc{"Test whether all elements in a boolean array evaluate ("Null values are ignored."), {"array"}}; +const FunctionDoc index_doc{"Find the index of the first occurrence of a given value", + ("The result is always computed as an int64_t, regardless\n" + "of the offset type of the input array."), + {"array"}, + "IndexOptions"}; + } // namespace void RegisterScalarAggregateBasic(FunctionRegistry* registry) { @@ -374,6 +505,16 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { func = std::make_shared("all", Arity::Unary(), &all_doc); aggregate::AddBasicAggKernels(aggregate::AllInit, {boolean()}, boolean(), func.get()); DCHECK_OK(registry->AddFunction(std::move(func))); + + // index + func = std::make_shared("index", Arity::Unary(), &index_doc); + aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, BaseBinaryTypes(), int64(), + func.get()); + aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, PrimitiveTypes(), int64(), + func.get()); + aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, TemporalTypes(), int64(), + func.get()); + DCHECK_OK(registry->AddFunction(std::move(func))); } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index f1a2863e97d..476caab03d5 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -1044,6 +1044,162 @@ TEST_F(TestAllKernel, Basics) { this->AssertAllIs(chunked_input5, false); } +// +// Index +// + +template +class TestIndexKernel : public ::testing::Test { + public: + using ScalarType = typename TypeTraits::ScalarType; + void AssertIndexIs(const Datum& array, const std::shared_ptr& value, + int64_t expected) { + IndexOptions options(value); + ASSERT_OK_AND_ASSIGN(Datum out, Index(array, options)); + const Int64Scalar& out_index = 
out.scalar_as(); + ASSERT_EQ(out_index.value, expected); + } + + void AssertIndexIs(const std::string& json, const std::shared_ptr& value, + int64_t expected) { + SCOPED_TRACE("Value: " + value->ToString()); + SCOPED_TRACE("Input: " + json); + auto array = ArrayFromJSON(type_singleton(), json); + AssertIndexIs(array, value, expected); + } + + void AssertIndexIs(const std::vector& json, + const std::shared_ptr& value, int64_t expected) { + SCOPED_TRACE("Value: " + value->ToString()); + auto array = ChunkedArrayFromJSON(type_singleton(), json); + SCOPED_TRACE("Input: " + array->ToString()); + AssertIndexIs(array, value, expected); + } + + std::shared_ptr type_singleton() { return std::make_shared(); } +}; + +template +class TestNumericIndexKernel : public TestIndexKernel { + public: + using CType = typename TypeTraits::CType; +}; +TYPED_TEST_SUITE(TestNumericIndexKernel, NumericArrowTypes); +TYPED_TEST(TestNumericIndexKernel, Basics) { + std::vector chunked_input0 = {"[]", "[0]"}; + std::vector chunked_input1 = {"[1, 0, null]", "[0, 0]"}; + std::vector chunked_input2 = {"[1, 1, 1]", "[1, 0]", "[0, 1]"}; + std::vector chunked_input3 = {"[1, 1, 1]", "[1, 1]"}; + std::vector chunked_input4 = {"[1, 1, 1]", "[1, 1]", "[0]"}; + + auto value = std::make_shared( + static_cast(0)); + auto null_value = std::make_shared( + static_cast(0)); + null_value->is_valid = false; + + this->AssertIndexIs("[]", value, -1); + this->AssertIndexIs("[0]", value, 0); + this->AssertIndexIs("[1, 2, 3, 4]", value, -1); + this->AssertIndexIs("[1, 2, 3, 4, 0]", value, 4); + this->AssertIndexIs("[null, null, null]", value, -1); + this->AssertIndexIs("[null, null, null]", null_value, -1); + this->AssertIndexIs("[0, null, null]", null_value, -1); + this->AssertIndexIs(chunked_input0, value, 0); + this->AssertIndexIs(chunked_input1, value, 1); + this->AssertIndexIs(chunked_input2, value, 4); + this->AssertIndexIs(chunked_input3, value, -1); + this->AssertIndexIs(chunked_input4, value, 5); +} 
+TYPED_TEST(TestNumericIndexKernel, Random) { + constexpr auto kChunks = 4; + auto rand = random::RandomArrayGenerator(0x5487655); + auto value = std::make_shared( + static_cast(0)); + + // Test chunked array sizes from 32 to 2048 + for (size_t i = 3; i <= 9; i += 2) { + const int64_t chunk_length = static_cast(1) << i; + ArrayVector chunks; + for (int i = 0; i < kChunks; i++) { + chunks.push_back( + rand.ArrayOf(this->type_singleton(), chunk_length, /*null_probability=*/0.1)); + } + ChunkedArray chunked_array(std::move(chunks)); + + int64_t expected = -1; + int64_t index = 0; + for (auto chunk : chunked_array.chunks()) { + auto typed_chunk = arrow::internal::checked_pointer_cast< + typename TypeTraits::ArrayType>(chunk); + for (auto value : *typed_chunk) { + if (value.has_value() && + value.value() == static_cast(0)) { + expected = index; + break; + } + index++; + } + if (expected >= 0) break; + } + + this->AssertIndexIs(Datum(chunked_array), value, expected); + } +} + +template +class TestDateTimeIndexKernel : public TestIndexKernel {}; +TYPED_TEST_SUITE(TestDateTimeIndexKernel, TemporalArrowTypes); +TYPED_TEST(TestDateTimeIndexKernel, Basics) { + auto type = this->type_singleton(); + auto value = std::make_shared(42, type); + auto null_value = std::make_shared(42, type); + null_value->is_valid = false; + + this->AssertIndexIs("[]", value, -1); + this->AssertIndexIs("[42]", value, 0); + this->AssertIndexIs("[84, 84, 84, 84]", value, -1); + this->AssertIndexIs("[84, 84, 84, 84, 42]", value, 4); + this->AssertIndexIs("[null, null, null]", value, -1); + this->AssertIndexIs("[null, null, null]", null_value, -1); + this->AssertIndexIs("[42, null, null]", null_value, -1); +} + +template +class TestBooleanIndexKernel : public TestIndexKernel {}; +TYPED_TEST_SUITE(TestBooleanIndexKernel, ::testing::Types); +TYPED_TEST(TestBooleanIndexKernel, Basics) { + auto value = std::make_shared(true); + auto null_value = std::make_shared(true); + null_value->is_valid = false; + + 
this->AssertIndexIs("[]", value, -1); + this->AssertIndexIs("[true]", value, 0); + this->AssertIndexIs("[false, false, false, false]", value, -1); + this->AssertIndexIs("[false, false, false, false, true]", value, 4); + this->AssertIndexIs("[null, null, null]", value, -1); + this->AssertIndexIs("[null, null, null]", null_value, -1); + this->AssertIndexIs("[true, null, null]", null_value, -1); +} + +template +class TestStringIndexKernel : public TestIndexKernel {}; +TYPED_TEST_SUITE(TestStringIndexKernel, BinaryTypes); +TYPED_TEST(TestStringIndexKernel, Basics) { + auto buffer = Buffer::FromString("foo"); + auto value = std::make_shared(buffer); + auto null_value = std::make_shared(buffer); + null_value->is_valid = false; + + this->AssertIndexIs(R"([])", value, -1); + this->AssertIndexIs(R"(["foo"])", value, 0); + this->AssertIndexIs(R"(["bar", "bar", "bar", "bar"])", value, -1); + this->AssertIndexIs(R"(["bar", "bar", "bar", "bar", "foo"])", value, 4); + this->AssertIndexIs(R"([null, null, null])", value, -1); + this->AssertIndexIs(R"([null, null, null])", null_value, -1); + this->AssertIndexIs(R"(["foo", null, null])", null_value, -1); +} + // // Mode // diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index 7b394565f7d..e31771a89ca 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -349,7 +349,7 @@ template static typename arrow::internal::call_traits::enable_if_return::type VisitArrayValuesInline(const ArrayData& arr, VisitFunc&& valid_func, NullFunc&& null_func) { - VisitArrayDataInline( + return VisitArrayDataInline( arr, [&](typename GetViewType::PhysicalType v) { return valid_func(GetViewType::LogicalValue(std::move(v))); diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index b20af43ac37..cb48e084d2e 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -192,6 +192,8 @@ Aggregations 
+--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ | count | Unary | Any | Scalar Int64 | :struct:`ScalarAggregateOptions` | +--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ +| index | Unary | Any | Scalar Int64 | :struct:`IndexOptions` | ++--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ | mean | Unary | Numeric | Scalar Float64 | :struct:`ScalarAggregateOptions` | +--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ | min_max | Unary | Numeric | Scalar Struct (1) | :struct:`ScalarAggregateOptions` | diff --git a/docs/source/python/api/compute.rst b/docs/source/python/api/compute.rst index 2e841c54886..3d1a4eec5f0 100644 --- a/docs/source/python/api/compute.rst +++ b/docs/source/python/api/compute.rst @@ -28,6 +28,7 @@ Aggregations :toctree: ../generated/ count + index mean min_max mode diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index aea72c457e0..debea53b17b 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -841,6 +841,32 @@ class ScalarAggregateOptions(_ScalarAggregateOptions): self._set_options(skip_nulls, min_count) +cdef class _IndexOptions(FunctionOptions): + cdef: + unique_ptr[CIndexOptions] index_options + + cdef const CFunctionOptions* get_options(self) except NULL: + return self.index_options.get() + + def _set_options(self, Scalar scalar): + self.index_options.reset( + new CIndexOptions(pyarrow_unwrap_scalar(scalar))) + + +class IndexOptions(_IndexOptions): + """ + Options for the index kernel. + + Parameters + ---------- + value : Scalar + The value to search for. 
+ """ + + def __init__(self, value): + self._set_options(value) + + cdef class _ModeOptions(FunctionOptions): cdef: unique_ptr[CModeOptions] mode_options diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 3da5033ac47..278b29000f6 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1122,6 +1122,14 @@ cdef class Array(_PandasConvertible): """ return _pc().filter(self, mask, null_selection_behavior) + def index(self, value, start=None, end=None, *, memory_pool=None): + """ + Find the first index of a value. + + See pyarrow.compute.index for full usage. + """ + return _pc().index(self, value, start, end, memory_pool=memory_pool) + def _to_pandas(self, options, **kwargs): return _array_like_to_pandas(self, options) diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index cb6ba475b5f..03a099d6abf 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -34,6 +34,7 @@ DictionaryEncodeOptions, ExtractRegexOptions, FilterOptions, + IndexOptions, MatchSubstringOptions, ModeOptions, ScalarAggregateOptions, @@ -435,6 +436,40 @@ def filter(data, mask, null_selection_behavior='drop'): return call_function('filter', [data, mask], options) +def index(data, value, start=None, end=None, *, memory_pool=None): + """ + Find the index of the first occurrence of a given value. 
+ + Parameters + ---------- + data : Array or ChunkedArray + value : Scalar-like object + start : int, optional + end : int, optional + + Returns + ------- + index : the index, or -1 if not found + """ + if start is not None: + if end is not None: + data = data.slice(start, end - start) + else: + data = data.slice(start) + elif end is not None: + data = data.slice(0, end) + + if not isinstance(value, pa.Scalar): + value = pa.scalar(value, type=data.type) + elif data.type != value.type: + value = pa.scalar(value.as_py(), type=data.type) + options = IndexOptions(value=value) + result = call_function('index', [data], options, memory_pool) + if start is not None and result.as_py() >= 0: + result = pa.scalar(result.as_py() + start, type=pa.int64()) + return result + + def take(data, indices, *, boundscheck=True, memory_pool=None): """ Select values (or records) from array- or table-like data given integer diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9b05359bdf4..a8306b47798 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1883,6 +1883,10 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: CModeOptions(int64_t n) int64_t n + cdef cppclass CIndexOptions \ + "arrow::compute::IndexOptions"(CFunctionOptions): + CIndexOptions(shared_ptr[CScalar] value) + cdef cppclass CPartitionNthOptions \ "arrow::compute::PartitionNthOptions"(CFunctionOptions): CPartitionNthOptions(int64_t pivot) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 662c0e39fd9..f9dcb2aa60b 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -382,6 +382,14 @@ cdef class ChunkedArray(_PandasConvertible): """ return _pc().filter(self, mask, null_selection_behavior) + def index(self, value, start=None, end=None, *, memory_pool=None): + """ + Find the first index of a value. + + See pyarrow.compute.index for full usage. 
+ """ + return _pc().index(self, value, start, end, memory_pool=memory_pool) + def take(self, object indices): """ Select values from a chunked array. See pyarrow.compute.take for full diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index db4c6ba1fe5..b5312e3e11b 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -1114,6 +1114,20 @@ def test_count(): pc.count(arr, min_count='zzz') +def test_index(): + arr = pa.array([0, 1, None, 3, 4], type=pa.int64()) + assert pc.index(arr, pa.scalar(0)).as_py() == 0 + assert pc.index(arr, pa.scalar(2, type=pa.int8())).as_py() == -1 + assert pc.index(arr, 4).as_py() == 4 + assert arr.index(3, start=2).as_py() == 3 + assert arr.index(None).as_py() == -1 + + arr = pa.chunked_array([[1, 2], [1, 3]], type=pa.int64()) + assert arr.index(1).as_py() == 0 + assert arr.index(1, start=2).as_py() == 2 + assert arr.index(1, start=1, end=2).as_py() == -1 + + def test_partition_nth(): data = list(range(100, 140)) random.shuffle(data)