From 97307954d0afcac54db19879aafca19311ae2fac Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 19 May 2021 14:01:30 -0400
Subject: [PATCH 1/5] ARROW-2665: [C++][Python] Add index() kernel
---
cpp/src/arrow/compute/api_aggregate.cc | 4 +
cpp/src/arrow/compute/api_aggregate.h | 19 +++
cpp/src/arrow/compute/kernel.h | 2 +-
.../arrow/compute/kernels/aggregate_basic.cc | 75 +++++++++
.../arrow/compute/kernels/aggregate_test.cc | 147 ++++++++++++++++++
docs/source/cpp/compute.rst | 2 +
docs/source/python/api/compute.rst | 1 +
python/pyarrow/_compute.pyx | 26 ++++
python/pyarrow/array.pxi | 8 +
python/pyarrow/compute.py | 35 +++++
python/pyarrow/includes/libarrow.pxd | 4 +
python/pyarrow/table.pxi | 8 +
python/pyarrow/tests/test_compute.py | 14 ++
13 files changed, 344 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc
index dca54a0faba..967c8179da7 100644
--- a/cpp/src/arrow/compute/api_aggregate.cc
+++ b/cpp/src/arrow/compute/api_aggregate.cc
@@ -77,5 +77,9 @@ Result TDigest(const Datum& value, const TDigestOptions& options,
return CallFunction("tdigest", {value}, &options, ctx);
}
+Result Index(const Datum& value, IndexOptions options, ExecContext* ctx) {
+ return CallFunction("index", {value}, &options, ctx);
+}
+
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h
index a7ceb2ac2fd..d781bbb6205 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -126,6 +126,13 @@ struct ARROW_EXPORT TDigestOptions : public FunctionOptions {
uint32_t buffer_size;
};
+/// \brief Control Index kernel behavior
+struct ARROW_EXPORT IndexOptions : public FunctionOptions {
+ explicit IndexOptions(std::shared_ptr value) : value{std::move(value)} {}
+
+ std::shared_ptr value;  ///< The scalar to search for
+};
+
/// @}
/// \brief Count non-null (or null) values in an array.
@@ -293,6 +300,18 @@ Result TDigest(const Datum& value,
const TDigestOptions& options = TDigestOptions::Defaults(),
ExecContext* ctx = NULLPTR);
+/// \brief Find the first index of a value in an array.
+///
+/// \param[in] value The array to search.
+/// \param[in] options The value to search for. See IndexOptions.
+/// \param[in] ctx the function execution context, optional
+/// \return out a Scalar containing the index (or -1 if not found).
+///
+/// \since 5.0.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Result Index(const Datum& value, IndexOptions options, ExecContext* ctx = NULLPTR);
+
namespace internal {
/// Internal use only: streaming group identifier.
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index 0fecea080d8..0d5fa147727 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -52,7 +52,7 @@ struct ARROW_EXPORT KernelState {
/// \brief Context/state for the execution of a particular kernel.
class ARROW_EXPORT KernelContext {
public:
- explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx) {}
+ explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx), state_() {}
/// \brief Allocate buffer from the context's memory pool. The contents are
/// not initialized.
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index 446c1b9fc62..e783c6f6d4d 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -225,6 +225,65 @@ Result> AllInit(KernelContext*, const KernelInitArg
return ::arrow::internal::make_unique();
}
+// ----------------------------------------------------------------------
+// Index implementation
+
+struct IndexImpl : public ScalarAggregator {
+ explicit IndexImpl(IndexOptions options, int64_t seen, int64_t index)
+ : options(std::move(options)), seen{seen}, index{index} {}
+
+ Status Consume(KernelContext* ctx, const ExecBatch& batch) override {
+ // short-circuit
+ if (index >= 0 || !options.value->is_valid) {
+ return Status::OK();
+ }
+
+ const auto& data = *batch[0].array();
+ seen = data.length;
+ ARROW_ASSIGN_OR_RAISE(
+ auto result, CallFunction("equal", {data, options.value}, ctx->exec_context()));
+ const auto found = result.array_as();
+ // TODO: could be done with CountLeadingZeros adjusting for alignment/null bitmap
+ for (int64_t i = 0; i < found->length(); i++) {
+ if (found->IsValid(i) && found->Value(i)) {
+ index = i;
+ break;
+ }
+ }
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other = checked_cast(src);
+ if (index < 0 && other.index >= 0) {
+ index = seen + other.index;
+ }
+ seen += other.seen;
+ return Status::OK();
+ }
+
+ Status Finalize(KernelContext*, Datum* out) override {
+ out->value = std::make_shared(index >= 0 ? index : -1);
+ return Status::OK();
+ }
+
+ const IndexOptions options;
+ int64_t seen = 0;
+ int64_t index = -1;
+};
+
+Result> IndexInit(KernelContext* ctx,
+ const KernelInitArgs& args) {
+ int64_t seen = 0;
+ int64_t index = -1;
+ if (auto state = static_cast(ctx->state())) {
+ seen = state->seen;
+ index = state->index;
+ }
+ return ::arrow::internal::make_unique(
+ static_cast(*args.options), seen, index);
+}
+
void AddBasicAggKernels(KernelInit init,
const std::vector>& types,
std::shared_ptr out_ty, ScalarAggregateFunction* func,
@@ -290,6 +349,12 @@ const FunctionDoc all_doc{"Test whether all elements in a boolean array evaluate
("Null values are ignored."),
{"array"}};
+const FunctionDoc index_doc{"Find the index of the first occurrence of a given value",
+ ("The result is always computed as an int64_t, regardless\n"
+ "of the offset type of the input array."),
+ {"array"},
+ "IndexOptions"};
+
} // namespace
void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
@@ -374,6 +439,16 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
func = std::make_shared("all", Arity::Unary(), &all_doc);
aggregate::AddBasicAggKernels(aggregate::AllInit, {boolean()}, boolean(), func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+
+ // index
+ func = std::make_shared("index", Arity::Unary(), &index_doc);
+ aggregate::AddBasicAggKernels(aggregate::IndexInit, BaseBinaryTypes(), int64(),
+ func.get());
+ aggregate::AddBasicAggKernels(aggregate::IndexInit, PrimitiveTypes(), int64(),
+ func.get());
+ aggregate::AddBasicAggKernels(aggregate::IndexInit, TemporalTypes(), int64(),
+ func.get());
+ DCHECK_OK(registry->AddFunction(std::move(func)));
}
} // namespace internal
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index f1a2863e97d..537a02f8913 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -1044,6 +1044,153 @@ TEST_F(TestAllKernel, Basics) {
this->AssertAllIs(chunked_input5, false);
}
+//
+// Index
+//
+
+template
+class TestIndexKernel : public ::testing::Test {
+ public:
+ using ScalarType = typename TypeTraits::ScalarType;
+ void AssertIndexIs(const Datum& array, const std::shared_ptr& value,
+ int64_t expected) {
+ IndexOptions options(value);
+ ASSERT_OK_AND_ASSIGN(Datum out, Index(array, options));
+ const Int64Scalar& out_index = out.scalar_as();
+ ASSERT_EQ(out_index.value, expected);
+ }
+
+ void AssertIndexIs(const std::string& json, const std::shared_ptr& value,
+ int64_t expected) {
+ SCOPED_TRACE("Value: " + value->ToString());
+ SCOPED_TRACE("Input: " + json);
+ auto array = ArrayFromJSON(type_singleton(), json);
+ AssertIndexIs(array, value, expected);
+ }
+
+ void AssertIndexIs(const std::vector& json,
+ const std::shared_ptr& value, int64_t expected) {
+ auto array = ChunkedArrayFromJSON(type_singleton(), json);
+ AssertIndexIs(array, value, expected);
+ }
+
+ std::shared_ptr type_singleton() { return std::make_shared(); }
+};
+
+template
+class TestNumericIndexKernel : public TestIndexKernel {};
+TYPED_TEST_SUITE(TestNumericIndexKernel, NumericArrowTypes);
+TYPED_TEST(TestNumericIndexKernel, Basics) {
+ std::vector chunked_input0 = {"[]", "[0]"};
+ std::vector chunked_input1 = {"[1, 0, null]", "[0, 0]"};
+ std::vector chunked_input2 = {"[1, 1, 1]", "[1, 0]", "[0, 1]"};
+ std::vector chunked_input3 = {"[1, 1, 1]", "[1, 1]"};
+ std::vector chunked_input4 = {"[1, 1, 1]", "[1, 1]", "[0]"};
+
+ auto value = std::make_shared(0);
+ auto null_value = std::make_shared(0);
+ null_value->is_valid = false;
+
+ this->AssertIndexIs("[]", value, -1);
+ this->AssertIndexIs("[0]", value, 0);
+ this->AssertIndexIs("[1, 2, 3, 4]", value, -1);
+ this->AssertIndexIs("[1, 2, 3, 4, 0]", value, 4);
+ this->AssertIndexIs("[null, null, null]", value, -1);
+ this->AssertIndexIs("[null, null, null]", null_value, -1);
+ this->AssertIndexIs("[0, null, null]", null_value, -1);
+ this->AssertIndexIs(chunked_input0, value, 0);
+ this->AssertIndexIs(chunked_input1, value, 1);
+ this->AssertIndexIs(chunked_input2, value, 4);
+ this->AssertIndexIs(chunked_input3, value, -1);
+ this->AssertIndexIs(chunked_input4, value, 5);
+}
+TYPED_TEST(TestNumericIndexKernel, Random) {
+ constexpr auto kChunks = 4;
+ auto rand = random::RandomArrayGenerator(0x5487655);
+ auto value = std::make_shared(0);
+
+ // Test chunked array sizes from 32 to 2048
+ for (size_t i = 3; i <= 9; i += 2) {
+ const int64_t chunk_length = 1UL << i;
+ ArrayVector chunks;
+ for (int i = 0; i < kChunks; i++) {
+ chunks.push_back(
+ rand.ArrayOf(this->type_singleton(), chunk_length, /*null_probability=*/0.1));
+ }
+ ChunkedArray chunked_array(std::move(chunks));
+
+ int64_t expected = -1;
+ int64_t index = 0;
+ for (auto chunk : chunked_array.chunks()) {
+ auto typed_chunk = arrow::internal::checked_pointer_cast<
+ typename TypeTraits::ArrayType>(chunk);
+ for (auto value : *typed_chunk) {
+ if (value.has_value() && value.value() == 0) {
+ expected = index;
+ break;
+ }
+ index++;
+ }
+ if (expected >= 0) break;
+ }
+
+ this->AssertIndexIs(Datum(chunked_array), value, expected);
+ }
+}
+
+template
+class TestDateTimeIndexKernel : public TestIndexKernel {};
+TYPED_TEST_SUITE(TestDateTimeIndexKernel, TemporalArrowTypes);
+TYPED_TEST(TestDateTimeIndexKernel, Basics) {
+ auto type = this->type_singleton();
+ auto value = std::make_shared(42, type);
+ auto null_value = std::make_shared(42, type);
+ null_value->is_valid = false;
+
+ this->AssertIndexIs("[]", value, -1);
+ this->AssertIndexIs("[42]", value, 0);
+ this->AssertIndexIs("[84, 84, 84, 84]", value, -1);
+ this->AssertIndexIs("[84, 84, 84, 84, 42]", value, 4);
+ this->AssertIndexIs("[null, null, null]", value, -1);
+ this->AssertIndexIs("[null, null, null]", null_value, -1);
+ this->AssertIndexIs("[42, null, null]", null_value, -1);
+}
+
+template
+class TestBooleanIndexKernel : public TestIndexKernel {};
+TYPED_TEST_SUITE(TestBooleanIndexKernel, ::testing::Types);
+TYPED_TEST(TestBooleanIndexKernel, Basics) {
+ auto value = std::make_shared(true);
+ auto null_value = std::make_shared(true);
+ null_value->is_valid = false;
+
+ this->AssertIndexIs("[]", value, -1);
+ this->AssertIndexIs("[true]", value, 0);
+ this->AssertIndexIs("[false, false, false, false]", value, -1);
+ this->AssertIndexIs("[false, false, false, false, true]", value, 4);
+ this->AssertIndexIs("[null, null, null]", value, -1);
+ this->AssertIndexIs("[null, null, null]", null_value, -1);
+ this->AssertIndexIs("[true, null, null]", null_value, -1);
+}
+
+template
+class TestStringIndexKernel : public TestIndexKernel {};
+TYPED_TEST_SUITE(TestStringIndexKernel, BinaryTypes);
+TYPED_TEST(TestStringIndexKernel, Basics) {
+ auto buffer = Buffer::FromString("foo");
+ auto value = std::make_shared(buffer);
+ auto null_value = std::make_shared(buffer);
+ null_value->is_valid = false;
+
+ this->AssertIndexIs(R"([])", value, -1);
+ this->AssertIndexIs(R"(["foo"])", value, 0);
+ this->AssertIndexIs(R"(["bar", "bar", "bar", "bar"])", value, -1);
+ this->AssertIndexIs(R"(["bar", "bar", "bar", "bar", "foo"])", value, 4);
+ this->AssertIndexIs(R"([null, null, null])", value, -1);
+ this->AssertIndexIs(R"([null, null, null])", null_value, -1);
+ this->AssertIndexIs(R"(["foo", null, null])", null_value, -1);
+}
+
//
// Mode
//
diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst
index b20af43ac37..cb48e084d2e 100644
--- a/docs/source/cpp/compute.rst
+++ b/docs/source/cpp/compute.rst
@@ -192,6 +192,8 @@ Aggregations
+--------------------------+------------+--------------------+-----------------------+--------------------------------------------+
| count | Unary | Any | Scalar Int64 | :struct:`ScalarAggregateOptions` |
+--------------------------+------------+--------------------+-----------------------+--------------------------------------------+
+| index | Unary | Any | Scalar Int64 | :struct:`IndexOptions` |
++--------------------------+------------+--------------------+-----------------------+--------------------------------------------+
| mean | Unary | Numeric | Scalar Float64 | :struct:`ScalarAggregateOptions` |
+--------------------------+------------+--------------------+-----------------------+--------------------------------------------+
| min_max | Unary | Numeric | Scalar Struct (1) | :struct:`ScalarAggregateOptions` |
diff --git a/docs/source/python/api/compute.rst b/docs/source/python/api/compute.rst
index 2e841c54886..3d1a4eec5f0 100644
--- a/docs/source/python/api/compute.rst
+++ b/docs/source/python/api/compute.rst
@@ -28,6 +28,7 @@ Aggregations
:toctree: ../generated/
count
+ index
mean
min_max
mode
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index aea72c457e0..debea53b17b 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -841,6 +841,32 @@ class ScalarAggregateOptions(_ScalarAggregateOptions):
self._set_options(skip_nulls, min_count)
+cdef class _IndexOptions(FunctionOptions):
+ cdef:
+ unique_ptr[CIndexOptions] index_options  # owns the C++ options object
+
+ cdef const CFunctionOptions* get_options(self) except NULL:
+ return self.index_options.get()  # borrowed; still owned by self
+
+ def _set_options(self, Scalar scalar):
+ self.index_options.reset(  # replaces any previously held options
+ new CIndexOptions(pyarrow_unwrap_scalar(scalar)))
+
+
+class IndexOptions(_IndexOptions):
+ """
+ Options for the index kernel.
+
+ Parameters
+ ----------
+ value : Scalar
+ The value to search for.
+ """
+
+ def __init__(self, value):
+ self._set_options(value)
+
+
cdef class _ModeOptions(FunctionOptions):
cdef:
unique_ptr[CModeOptions] mode_options
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 3da5033ac47..278b29000f6 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -1122,6 +1122,14 @@ cdef class Array(_PandasConvertible):
"""
return _pc().filter(self, mask, null_selection_behavior)
+ def index(self, value, start=None, end=None, *, memory_pool=None):
+ """
+ Find the first index of a value.
+
+ See pyarrow.compute.index for full usage.
+ """
+ return _pc().index(self, value, start, end, memory_pool=memory_pool)
+
def _to_pandas(self, options, **kwargs):
return _array_like_to_pandas(self, options)
diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py
index cb6ba475b5f..03a099d6abf 100644
--- a/python/pyarrow/compute.py
+++ b/python/pyarrow/compute.py
@@ -34,6 +34,7 @@
DictionaryEncodeOptions,
ExtractRegexOptions,
FilterOptions,
+ IndexOptions,
MatchSubstringOptions,
ModeOptions,
ScalarAggregateOptions,
@@ -435,6 +436,40 @@ def filter(data, mask, null_selection_behavior='drop'):
return call_function('filter', [data, mask], options)
+def index(data, value, start=None, end=None, *, memory_pool=None):
+ """
+ Find the index of the first occurrence of a given value.
+
+ Parameters
+ ----------
+ data : Array or ChunkedArray
+ value : Scalar-like object
+ start : int, optional
+ end : int, optional
+
+ Returns
+ -------
+ index : the index, or -1 if not found
+ """
+ if start is not None:  # narrow the search to [start, end)
+ if end is not None:
+ data = data.slice(start, end - start)  # (offset, length)
+ else:
+ data = data.slice(start)
+ elif end is not None:
+ data = data.slice(0, end)
+
+ if not isinstance(value, pa.Scalar):
+ value = pa.scalar(value, type=data.type)  # wrap Python value
+ elif data.type != value.type:
+ value = pa.scalar(value.as_py(), type=data.type)  # match data.type
+ options = IndexOptions(value=value)
+ result = call_function('index', [data], options, memory_pool)
+ if start is not None and result.as_py() >= 0:  # undo the slice offset
+ result = pa.scalar(result.as_py() + start, type=pa.int64())
+ return result
+
+
def take(data, indices, *, boundscheck=True, memory_pool=None):
"""
Select values (or records) from array- or table-like data given integer
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9b05359bdf4..a8306b47798 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1883,6 +1883,10 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
CModeOptions(int64_t n)
int64_t n
+ cdef cppclass CIndexOptions \
+ "arrow::compute::IndexOptions"(CFunctionOptions):
+ CIndexOptions(shared_ptr[CScalar] value)
+
cdef cppclass CPartitionNthOptions \
"arrow::compute::PartitionNthOptions"(CFunctionOptions):
CPartitionNthOptions(int64_t pivot)
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 662c0e39fd9..f9dcb2aa60b 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -382,6 +382,14 @@ cdef class ChunkedArray(_PandasConvertible):
"""
return _pc().filter(self, mask, null_selection_behavior)
+ def index(self, value, start=None, end=None, *, memory_pool=None):
+ """
+ Find the first index of a value.
+
+ See pyarrow.compute.index for full usage.
+ """
+ return _pc().index(self, value, start, end, memory_pool=memory_pool)
+
def take(self, object indices):
"""
Select values from a chunked array. See pyarrow.compute.take for full
diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py
index db4c6ba1fe5..b5312e3e11b 100644
--- a/python/pyarrow/tests/test_compute.py
+++ b/python/pyarrow/tests/test_compute.py
@@ -1114,6 +1114,20 @@ def test_count():
pc.count(arr, min_count='zzz')
+def test_index():
+ arr = pa.array([0, 1, None, 3, 4], type=pa.int64())
+ assert pc.index(arr, pa.scalar(0)).as_py() == 0
+ assert pc.index(arr, pa.scalar(2, type=pa.int8())).as_py() == -1
+ assert pc.index(arr, 4).as_py() == 4
+ assert arr.index(3, start=2).as_py() == 3
+ assert arr.index(None).as_py() == -1
+
+ arr = pa.chunked_array([[1, 2], [1, 3]], type=pa.int64())
+ assert arr.index(1).as_py() == 0
+ assert arr.index(1, start=2).as_py() == 2
+ assert arr.index(1, start=1, end=2).as_py() == -1
+
+
def test_partition_nth():
data = list(range(100, 140))
random.shuffle(data)
From 836115294089e4d17a5e113f0d70e1a9fff5e730 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 19 May 2021 16:18:59 -0400
Subject: [PATCH 2/5] ARROW-2665: [C++] Try to satisfy MSVC
---
.../arrow/compute/kernels/aggregate_test.cc | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index 537a02f8913..6b097859ca5 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -1078,7 +1078,10 @@ class TestIndexKernel : public ::testing::Test {
};
template
-class TestNumericIndexKernel : public TestIndexKernel {};
+class TestNumericIndexKernel : public TestIndexKernel {
+ public:
+ using CType = typename TypeTraits::CType;
+};
TYPED_TEST_SUITE(TestNumericIndexKernel, NumericArrowTypes);
TYPED_TEST(TestNumericIndexKernel, Basics) {
std::vector chunked_input0 = {"[]", "[0]"};
@@ -1087,8 +1090,10 @@ TYPED_TEST(TestNumericIndexKernel, Basics) {
std::vector chunked_input3 = {"[1, 1, 1]", "[1, 1]"};
std::vector chunked_input4 = {"[1, 1, 1]", "[1, 1]", "[0]"};
- auto value = std::make_shared(0);
- auto null_value = std::make_shared(0);
+ auto value = std::make_shared(
+ static_cast(0));
+ auto null_value = std::make_shared(
+ static_cast(0));
null_value->is_valid = false;
this->AssertIndexIs("[]", value, -1);
@@ -1107,11 +1112,12 @@ TYPED_TEST(TestNumericIndexKernel, Basics) {
TYPED_TEST(TestNumericIndexKernel, Random) {
constexpr auto kChunks = 4;
auto rand = random::RandomArrayGenerator(0x5487655);
- auto value = std::make_shared(0);
+ auto value = std::make_shared(
+ static_cast(0));
// Test chunked array sizes from 32 to 2048
for (size_t i = 3; i <= 9; i += 2) {
- const int64_t chunk_length = 1UL << i;
+ const int64_t chunk_length = static_cast(1) << i;
ArrayVector chunks;
for (int i = 0; i < kChunks; i++) {
chunks.push_back(
@@ -1125,7 +1131,8 @@ TYPED_TEST(TestNumericIndexKernel, Random) {
auto typed_chunk = arrow::internal::checked_pointer_cast<
typename TypeTraits::ArrayType>(chunk);
for (auto value : *typed_chunk) {
- if (value.has_value() && value.value() == 0) {
+ if (value.has_value() &&
+ value.value() == static_cast(0)) {
expected = index;
break;
}
From 4eb39f8df3ce2ad2d19f0cc3f3e62aaf1cc40ee2 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 20 May 2021 10:39:15 -0400
Subject: [PATCH 3/5] ARROW-2665: [C++] Add more context to tests
---
cpp/src/arrow/compute/kernels/aggregate_test.cc | 2 ++
1 file changed, 2 insertions(+)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index 6b097859ca5..476caab03d5 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -1070,7 +1070,9 @@ class TestIndexKernel : public ::testing::Test {
void AssertIndexIs(const std::vector& json,
const std::shared_ptr& value, int64_t expected) {
+ SCOPED_TRACE("Value: " + value->ToString());
auto array = ChunkedArrayFromJSON(type_singleton(), json);
+ SCOPED_TRACE("Input: " + array->ToString());
AssertIndexIs(array, value, expected);
}
From a8c57c18a4c12633d93c75f52c85cd5ee38e0e0b Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 20 May 2021 10:39:47 -0400
Subject: [PATCH 4/5] ARROW-2665: [C++] Make Index kernel standalone
---
.../arrow/compute/kernels/aggregate_basic.cc | 104 ++++++++++++++----
1 file changed, 81 insertions(+), 23 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index e783c6f6d4d..65fdc0c6549 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -228,9 +228,15 @@ Result> AllInit(KernelContext*, const KernelInitArg
// ----------------------------------------------------------------------
// Index implementation
+template
struct IndexImpl : public ScalarAggregator {
- explicit IndexImpl(IndexOptions options, int64_t seen, int64_t index)
- : options(std::move(options)), seen{seen}, index{index} {}
+ explicit IndexImpl(IndexOptions options, KernelState* raw_state)
+ : options(std::move(options)), seen(0), index(-1) {
+ if (auto state = static_cast*>(raw_state)) {
+ seen = state->seen;
+ index = state->index;
+ }
+ }
Status Consume(KernelContext* ctx, const ExecBatch& batch) override {
// short-circuit
@@ -238,14 +244,17 @@ struct IndexImpl : public ScalarAggregator {
return Status::OK();
}
- const auto& data = *batch[0].array();
- seen = data.length;
- ARROW_ASSIGN_OR_RAISE(
- auto result, CallFunction("equal", {data, options.value}, ctx->exec_context()));
- const auto found = result.array_as();
- // TODO: could be done with CountLeadingZeros adjusting for alignment/null bitmap
- for (int64_t i = 0; i < found->length(); i++) {
- if (found->IsValid(i) && found->Value(i)) {
+ auto input = batch[0].array();
+ seen = input->length;
+ internal::ArrayIterator it(*input);
+ auto desired = internal::UnboxScalar::Unbox(*options.value);
+ const bool is_valid = !input->MayHaveNulls();
+ const uint8_t* null_bitmap_data =
+ input->GetValuesSafe(0, /*absolute_offset=*/0);
+ for (int64_t i = 0; i < input->length; i++) {
+ auto val = it(); // Advances iterator
+ if ((is_valid || BitUtil::GetBit(null_bitmap_data, i + input->offset)) &&
+ val == desired) {
index = i;
break;
}
@@ -272,17 +281,66 @@ struct IndexImpl : public ScalarAggregator {
int64_t index = -1;
};
-Result> IndexInit(KernelContext* ctx,
- const KernelInitArgs& args) {
- int64_t seen = 0;
- int64_t index = -1;
- if (auto state = static_cast(ctx->state())) {
- seen = state->seen;
- index = state->index;
+struct IndexInit {
+ std::unique_ptr state;  // filled in by whichever Visit overload is selected
+ KernelContext* ctx;
+ const IndexOptions& options;  // held by reference, not copied
+ const DataType& type;  // input type that selects the Visit overload
+
+ IndexInit(KernelContext* ctx, const IndexOptions& options, const DataType& type)
+ : ctx(ctx), options(options), type(type) {}
+
+ Status Visit(const DataType& type) {  // fallback for types with no overload below
+ return Status::NotImplemented("Index kernel not implemented for ", type.ToString());
}
- return ::arrow::internal::make_unique(
- static_cast(*args.options), seen, index);
-}
+
+ Status Visit(const BooleanType&) {  // IndexImpl also receives ctx->state()
+ state.reset(new IndexImpl(options, ctx->state()));
+ return Status::OK();
+ }
+
+ template
+ enable_if_number Visit(const Type&) {  // numeric inputs
+ state.reset(new IndexImpl(options, ctx->state()));
+ return Status::OK();
+ }
+
+ template
+ enable_if_base_binary Visit(const Type&) {  // binary/string inputs
+ state.reset(new IndexImpl(options, ctx->state()));
+ return Status::OK();
+ }
+
+ template
+ enable_if_date Visit(const Type&) {  // date inputs
+ state.reset(new IndexImpl(options, ctx->state()));
+ return Status::OK();
+ }
+
+ template
+ enable_if_time Visit(const Type&) {  // time inputs
+ state.reset(new IndexImpl(options, ctx->state()));
+ return Status::OK();
+ }
+
+ template
+ enable_if_timestamp Visit(const Type&) {  // timestamp inputs
+ state.reset(new IndexImpl(options, ctx->state()));
+ return Status::OK();
+ }
+
+ Result> Create() {
+ RETURN_NOT_OK(VisitTypeInline(type, this));  // runs the matching Visit()
+ return std::move(state);  // state is a member, so the move is explicit
+ }
+
+ static Result> Init(KernelContext* ctx,
+ const KernelInitArgs& args) {
+ IndexInit visitor(ctx, static_cast(*args.options),
+ *args.inputs[0].type);
+ return visitor.Create();  // the created state, or the Visit() error
+ }
+};
void AddBasicAggKernels(KernelInit init,
const std::vector>& types,
@@ -442,11 +500,11 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
// index
func = std::make_shared("index", Arity::Unary(), &index_doc);
- aggregate::AddBasicAggKernels(aggregate::IndexInit, BaseBinaryTypes(), int64(),
+ aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, BaseBinaryTypes(), int64(),
func.get());
- aggregate::AddBasicAggKernels(aggregate::IndexInit, PrimitiveTypes(), int64(),
+ aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, PrimitiveTypes(), int64(),
func.get());
- aggregate::AddBasicAggKernels(aggregate::IndexInit, TemporalTypes(), int64(),
+ aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, TemporalTypes(), int64(),
func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
}
From 29604cd1b76de1ac7777f4a6fa4c8b272d40d213 Mon Sep 17 00:00:00 2001
From: Yibo Cai
Date: Fri, 21 May 2021 06:03:09 +0000
Subject: [PATCH 5/5] Fix merge conflicts and use VisitArrayValuesInline
---
.../arrow/compute/kernels/aggregate_basic.cc | 34 ++++++++++++-------
.../arrow/compute/kernels/codegen_internal.h | 2 +-
2 files changed, 22 insertions(+), 14 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index 65fdc0c6549..1ea63cdc4a0 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -230,6 +230,8 @@ Result> AllInit(KernelContext*, const KernelInitArg
template
struct IndexImpl : public ScalarAggregator {
+ using ArgValue = typename internal::GetViewType::T;
+
explicit IndexImpl(IndexOptions options, KernelState* raw_state)
: options(std::move(options)), seen(0), index(-1) {
if (auto state = static_cast*>(raw_state)) {
@@ -246,19 +248,25 @@ struct IndexImpl : public ScalarAggregator {
auto input = batch[0].array();
seen = input->length;
- internal::ArrayIterator it(*input);
- auto desired = internal::UnboxScalar::Unbox(*options.value);
- const bool is_valid = !input->MayHaveNulls();
- const uint8_t* null_bitmap_data =
- input->GetValuesSafe(0, /*absolute_offset=*/0);
- for (int64_t i = 0; i < input->length; i++) {
- auto val = it(); // Advances iterator
- if ((is_valid || BitUtil::GetBit(null_bitmap_data, i + input->offset)) &&
- val == desired) {
- index = i;
- break;
- }
- }
+ const ArgValue desired = internal::UnboxScalar::Unbox(*options.value);
+ int64_t i = 0;
+
+ ARROW_UNUSED(internal::VisitArrayValuesInline(
+ *input,
+ [&](ArgValue v) -> Status {
+ if (v == desired) {
+ index = i;
+ return Status::Cancelled("Found");
+ } else {
+ ++i;
+ return Status::OK();
+ }
+ },
+ [&]() -> Status {
+ ++i;
+ return Status::OK();
+ }));
+
return Status::OK();
}
diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h
index 7b394565f7d..e31771a89ca 100644
--- a/cpp/src/arrow/compute/kernels/codegen_internal.h
+++ b/cpp/src/arrow/compute/kernels/codegen_internal.h
@@ -349,7 +349,7 @@ template
static typename arrow::internal::call_traits::enable_if_return::type
VisitArrayValuesInline(const ArrayData& arr, VisitFunc&& valid_func,
NullFunc&& null_func) {
- VisitArrayDataInline(
+ return VisitArrayDataInline(
arr,
[&](typename GetViewType::PhysicalType v) {
return valid_func(GetViewType::LogicalValue(std::move(v)));