Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,9 @@ Result<Datum> TDigest(const Datum& value, const TDigestOptions& options,
return CallFunction("tdigest", {value}, &options, ctx);
}

// Convenience entry point for the "index" compute function: forwards the input
// datum, the caller-supplied options and the execution context to CallFunction.
// `options` is taken by value, so the pointer passed below refers to this
// function's own copy and stays valid for the duration of the call.
Result<Datum> Index(const Datum& value, IndexOptions options, ExecContext* ctx) {
return CallFunction("index", {value}, &options, ctx);
}

} // namespace compute
} // namespace arrow
19 changes: 19 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ struct ARROW_EXPORT TDigestOptions : public FunctionOptions {
uint32_t buffer_size;
};

/// \brief Control Index kernel behavior
///
/// Carries the scalar that the "index" function searches for. The Index kernel
/// in aggregate_basic.cc treats an invalid (null) scalar as never matching.
struct ARROW_EXPORT IndexOptions : public FunctionOptions {
/// \param[in] value the scalar to search for; ownership is moved into `value`.
explicit IndexOptions(std::shared_ptr<Scalar> value) : value{std::move(value)} {}

/// The scalar whose first occurrence is looked up.
std::shared_ptr<Scalar> value;
};

/// @}

/// \brief Count non-null (or null) values in an array.
Expand Down Expand Up @@ -293,6 +300,18 @@ Result<Datum> TDigest(const Datum& value,
const TDigestOptions& options = TDigestOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Find the first index of a value in an array.
///
/// \param[in] value The array to search.
/// \param[in] options Holds the value to search for. See IndexOptions.
/// \param[in] ctx the function execution context, optional
/// \return out a Scalar containing the index (or -1 if not found).
///
/// \since 5.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> Index(const Datum& value, IndexOptions options, ExecContext* ctx = NULLPTR);

namespace internal {

/// Internal use only: streaming group identifier.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct ARROW_EXPORT KernelState {
/// \brief Context/state for the execution of a particular kernel.
class ARROW_EXPORT KernelContext {
public:
explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx) {}
explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx), state_() {}

/// \brief Allocate buffer from the context's memory pool. The contents are
/// not initialized.
Expand Down
141 changes: 141 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,131 @@ Result<std::unique_ptr<KernelState>> AllInit(KernelContext*, const KernelInitArg
return ::arrow::internal::make_unique<BooleanAllImpl>();
}

// ----------------------------------------------------------------------
// Index implementation

/// Aggregation state for the "index" kernel.
///
/// Tracks how many elements have been accounted for (`seen`) and the position
/// of the first element equal to `options.value` (`index`, -1 while no match
/// has been recorded).
template <typename ArgType>
struct IndexImpl : public ScalarAggregator {
  using ArgValue = typename internal::GetViewType<ArgType>::T;

  /// \param options   search options; stored by value.
  /// \param raw_state optional earlier state; when non-null its `seen` and
  ///                  `index` are copied so a recorded match carries over.
  explicit IndexImpl(IndexOptions options, KernelState* raw_state)
      : options(std::move(options)), seen(0), index(-1) {
    auto* previous = static_cast<IndexImpl<ArgType>*>(raw_state);
    if (previous == nullptr) return;
    seen = previous->seen;
    index = previous->index;
  }

  /// Scans the first column of `batch` for the desired value and records the
  /// batch-relative position of the first match. Always returns OK.
  Status Consume(KernelContext* ctx, const ExecBatch& batch) override {
    // A match is already recorded: later batches cannot change the answer.
    if (index >= 0) return Status::OK();
    // A null search value never matches, so there is nothing to scan for.
    if (!options.value->is_valid) return Status::OK();

    auto input = batch[0].array();
    seen = input->length;
    const ArgValue desired = internal::UnboxScalar<ArgType>::Unbox(*options.value);
    int64_t position = 0;

    // The visitor's status is intentionally discarded: Cancelled is returned
    // only to signal "found" (presumably ending the visitation early -- confirm
    // against VisitArrayValuesInline), not to report a failure.
    ARROW_UNUSED(internal::VisitArrayValuesInline<ArgType>(
        *input,
        [&](ArgValue candidate) -> Status {
          if (candidate == desired) {
            index = position;
            return Status::Cancelled("Found");
          }
          ++position;
          return Status::OK();
        },
        [&]() -> Status {
          // Null slots never match but still occupy a position.
          ++position;
          return Status::OK();
        }));

    return Status::OK();
  }

  /// Folds `src` into this state: if this state has no match but `src` does,
  /// the match position is offset by the elements already seen here.
  Status MergeFrom(KernelContext*, KernelState&& src) override {
    const auto& other = checked_cast<const IndexImpl&>(src);
    const bool adopt_other_match = index < 0 && other.index >= 0;
    if (adopt_other_match) {
      index = seen + other.index;
    }
    seen += other.seen;
    return Status::OK();
  }

  /// Emits the recorded index as an Int64Scalar, or -1 when nothing matched.
  Status Finalize(KernelContext*, Datum* out) override {
    out->value = std::make_shared<Int64Scalar>(index < 0 ? -1 : index);
    return Status::OK();
  }

  const IndexOptions options;
  int64_t seen = 0;
  int64_t index = -1;
};

/// Type visitor that builds the IndexImpl kernel state matching the input type.
///
/// `Init` is the kernel-init entry point: it validates the options, then lets
/// VisitTypeInline pick the Visit overload for the input's data type. Types
/// without a dedicated overload yield Status::NotImplemented.
struct IndexInit {
  std::unique_ptr<KernelState> state;
  KernelContext* ctx;
  const IndexOptions& options;
  const DataType& type;

  IndexInit(KernelContext* ctx, const IndexOptions& options, const DataType& type)
      : ctx(ctx), options(options), type(type) {}

  /// Fallback for every type that has no specialised overload below.
  Status Visit(const DataType& unsupported) {
    return Status::NotImplemented("Index kernel not implemented for ",
                                  unsupported.ToString());
  }

  Status Visit(const BooleanType&) { return MakeState<BooleanType>(); }

  template <typename Type>
  enable_if_number<Type, Status> Visit(const Type&) {
    return MakeState<Type>();
  }

  template <typename Type>
  enable_if_base_binary<Type, Status> Visit(const Type&) {
    return MakeState<Type>();
  }

  template <typename Type>
  enable_if_date<Type, Status> Visit(const Type&) {
    return MakeState<Type>();
  }

  template <typename Type>
  enable_if_time<Type, Status> Visit(const Type&) {
    return MakeState<Type>();
  }

  template <typename Type>
  enable_if_timestamp<Type, Status> Visit(const Type&) {
    return MakeState<Type>();
  }

  /// Dispatches on `type` and hands back the state created by the matching
  /// Visit overload, or the error that overload reported.
  Result<std::unique_ptr<KernelState>> Create() {
    RETURN_NOT_OK(VisitTypeInline(type, this));
    return std::move(state);
  }

  /// Kernel init function for "index".
  ///
  /// \return the new kernel state, Status::Invalid when no IndexOptions (or no
  ///         search value) was supplied, or Status::NotImplemented for an
  ///         unsupported input type.
  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
                                                   const KernelInitArgs& args) {
    // The function has no usable default options, so a call without options
    // would otherwise dereference a null pointer here.
    if (args.options == nullptr) {
      return Status::Invalid("Must provide IndexOptions for index kernel");
    }
    const auto& index_options = static_cast<const IndexOptions&>(*args.options);
    // IndexImpl::Consume dereferences the scalar unconditionally.
    if (index_options.value == nullptr) {
      return Status::Invalid("Must provide IndexOptions.value for index kernel");
    }
    IndexInit visitor(ctx, index_options, *args.inputs[0].type);
    return visitor.Create();
  }

 private:
  /// Creates the IndexImpl state for `Type`, seeded from the context's
  /// current state (which may be null).
  template <typename Type>
  Status MakeState() {
    state.reset(new IndexImpl<Type>(options, ctx->state()));
    return Status::OK();
  }
};

void AddBasicAggKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func,
Expand Down Expand Up @@ -290,6 +415,12 @@ const FunctionDoc all_doc{"Test whether all elements in a boolean array evaluate
("Null values are ignored."),
{"array"}};

// Documentation attached to the "index" aggregate function when it is
// registered in RegisterScalarAggregateBasic. Declares a single "array"
// argument and names IndexOptions as the options class.
const FunctionDoc index_doc{"Find the index of the first occurrence of a given value",
("The result is always computed as an int64_t, regardless\n"
"of the offset type of the input array."),
{"array"},
"IndexOptions"};

} // namespace

void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
Expand Down Expand Up @@ -374,6 +505,16 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
func = std::make_shared<ScalarAggregateFunction>("all", Arity::Unary(), &all_doc);
aggregate::AddBasicAggKernels(aggregate::AllInit, {boolean()}, boolean(), func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

// index
func = std::make_shared<ScalarAggregateFunction>("index", Arity::Unary(), &index_doc);
aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, BaseBinaryTypes(), int64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, PrimitiveTypes(), int64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::IndexInit::Init, TemporalTypes(), int64(),
func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
}

} // namespace internal
Expand Down
156 changes: 156 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,162 @@ TEST_F(TestAllKernel, Basics) {
this->AssertAllIs(chunked_input5, false);
}

//
// Index
//

/// Shared fixture for the "index" kernel tests, parameterised on the Arrow
/// data type of the searched array.
template <typename ArrowType>
class TestIndexKernel : public ::testing::Test {
 public:
  using ScalarType = typename TypeTraits<ArrowType>::ScalarType;

  /// Runs Index on `array` looking for `value` and checks the returned
  /// Int64Scalar equals `expected`.
  void AssertIndexIs(const Datum& array, const std::shared_ptr<ScalarType>& value,
                     int64_t expected) {
    ASSERT_OK_AND_ASSIGN(Datum result, Index(array, IndexOptions(value)));
    ASSERT_EQ(result.scalar_as<Int64Scalar>().value, expected);
  }

  /// Same check for a single array given as JSON text.
  void AssertIndexIs(const std::string& json, const std::shared_ptr<ScalarType>& value,
                     int64_t expected) {
    SCOPED_TRACE("Value: " + value->ToString());
    SCOPED_TRACE("Input: " + json);
    auto parsed = ArrayFromJSON(type_singleton(), json);
    AssertIndexIs(parsed, value, expected);
  }

  /// Same check for a chunked array given as one JSON string per chunk.
  void AssertIndexIs(const std::vector<std::string>& json,
                     const std::shared_ptr<ScalarType>& value, int64_t expected) {
    SCOPED_TRACE("Value: " + value->ToString());
    auto parsed = ChunkedArrayFromJSON(type_singleton(), json);
    SCOPED_TRACE("Input: " + parsed->ToString());
    AssertIndexIs(parsed, value, expected);
  }

  /// Default-constructed instance of the data type under test.
  std::shared_ptr<DataType> type_singleton() { return std::make_shared<ArrowType>(); }
};

// Fixture for numeric types: adds the C value type so tests can build scalars
// from plain literals. Instantiated for every type in NumericArrowTypes.
template <typename ArrowType>
class TestNumericIndexKernel : public TestIndexKernel<ArrowType> {
public:
using CType = typename TypeTraits<ArrowType>::CType;
};
TYPED_TEST_SUITE(TestNumericIndexKernel, NumericArrowTypes);
// Hand-written cases for numeric arrays: empty input, no match, match at the
// end, nulls in the input, a null search value, and chunked inputs.
TYPED_TEST(TestNumericIndexKernel, Basics) {
  using CType = typename TestFixture::CType;
  using ScalarType = typename TestFixture::ScalarType;

  // A valid zero to search for, and a null scalar that must never match.
  const auto needle = std::make_shared<ScalarType>(static_cast<CType>(0));
  const auto null_needle = std::make_shared<ScalarType>(static_cast<CType>(0));
  null_needle->is_valid = false;

  const std::vector<std::string> empty_then_hit = {"[]", "[0]"};
  const std::vector<std::string> hit_in_first_chunk = {"[1, 0, null]", "[0, 0]"};
  const std::vector<std::string> hit_in_second_chunk = {"[1, 1, 1]", "[1, 0]", "[0, 1]"};
  const std::vector<std::string> no_hit = {"[1, 1, 1]", "[1, 1]"};
  const std::vector<std::string> hit_in_last_chunk = {"[1, 1, 1]", "[1, 1]", "[0]"};

  // Single-array inputs.
  this->AssertIndexIs("[]", needle, -1);
  this->AssertIndexIs("[0]", needle, 0);
  this->AssertIndexIs("[1, 2, 3, 4]", needle, -1);
  this->AssertIndexIs("[1, 2, 3, 4, 0]", needle, 4);
  this->AssertIndexIs("[null, null, null]", needle, -1);
  this->AssertIndexIs("[null, null, null]", null_needle, -1);
  this->AssertIndexIs("[0, null, null]", null_needle, -1);

  // Chunked inputs: the expected index counts across chunk boundaries.
  this->AssertIndexIs(empty_then_hit, needle, 0);
  this->AssertIndexIs(hit_in_first_chunk, needle, 1);
  this->AssertIndexIs(hit_in_second_chunk, needle, 4);
  this->AssertIndexIs(no_hit, needle, -1);
  this->AssertIndexIs(hit_in_last_chunk, needle, 5);
}
// Compares the kernel against a straightforward linear scan over randomly
// generated chunked arrays (fixed seed, 10% nulls).
TYPED_TEST(TestNumericIndexKernel, Random) {
  using CType = typename TestFixture::CType;
  using ArrayType = typename TypeTraits<TypeParam>::ArrayType;

  constexpr auto kChunks = 4;
  auto rand = random::RandomArrayGenerator(0x5487655);
  auto value =
      std::make_shared<typename TestFixture::ScalarType>(static_cast<CType>(0));

  // Chunk lengths of 8, 32, 128 and 512; with kChunks chunks each, the chunked
  // arrays hold from 32 to 2048 elements in total.
  for (size_t shift = 3; shift <= 9; shift += 2) {
    const int64_t chunk_length = static_cast<int64_t>(1) << shift;
    ArrayVector chunks;
    chunks.reserve(kChunks);
    for (int chunk_number = 0; chunk_number < kChunks; ++chunk_number) {
      chunks.push_back(
          rand.ArrayOf(this->type_singleton(), chunk_length, /*null_probability=*/0.1));
    }
    ChunkedArray chunked_array(std::move(chunks));

    // Reference answer: position of the first non-null zero across all chunks.
    int64_t expected = -1;
    int64_t index = 0;
    for (const auto& chunk : chunked_array.chunks()) {
      auto typed_chunk = arrow::internal::checked_pointer_cast<ArrayType>(chunk);
      for (auto element : *typed_chunk) {
        if (element.has_value() && element.value() == static_cast<CType>(0)) {
          expected = index;
          break;
        }
        index++;
      }
      if (expected >= 0) break;
    }

    this->AssertIndexIs(Datum(chunked_array), value, expected);
  }
}

template <typename ArrowType>
class TestDateTimeIndexKernel : public TestIndexKernel<ArrowType> {};
TYPED_TEST_SUITE(TestDateTimeIndexKernel, TemporalArrowTypes);
TYPED_TEST(TestDateTimeIndexKernel, Basics) {
auto type = this->type_singleton();
auto value = std::make_shared<typename TestFixture::ScalarType>(42, type);
auto null_value = std::make_shared<typename TestFixture::ScalarType>(42, type);
null_value->is_valid = false;

this->AssertIndexIs("[]", value, -1);
this->AssertIndexIs("[42]", value, 0);
this->AssertIndexIs("[84, 84, 84, 84]", value, -1);
this->AssertIndexIs("[84, 84, 84, 84, 42]", value, 4);
this->AssertIndexIs("[null, null, null]", value, -1);
this->AssertIndexIs("[null, null, null]", null_value, -1);
this->AssertIndexIs("[42, null, null]", null_value, -1);
}

template <typename ArrowType>
class TestBooleanIndexKernel : public TestIndexKernel<ArrowType> {};
TYPED_TEST_SUITE(TestBooleanIndexKernel, ::testing::Types<BooleanType>);
TYPED_TEST(TestBooleanIndexKernel, Basics) {
auto value = std::make_shared<typename TestFixture::ScalarType>(true);
auto null_value = std::make_shared<typename TestFixture::ScalarType>(true);
null_value->is_valid = false;

this->AssertIndexIs("[]", value, -1);
this->AssertIndexIs("[true]", value, 0);
this->AssertIndexIs("[false, false, false, false]", value, -1);
this->AssertIndexIs("[false, false, false, false, true]", value, 4);
this->AssertIndexIs("[null, null, null]", value, -1);
this->AssertIndexIs("[null, null, null]", null_value, -1);
this->AssertIndexIs("[true, null, null]", null_value, -1);
}

template <typename ArrowType>
class TestStringIndexKernel : public TestIndexKernel<ArrowType> {};
TYPED_TEST_SUITE(TestStringIndexKernel, BinaryTypes);
TYPED_TEST(TestStringIndexKernel, Basics) {
auto buffer = Buffer::FromString("foo");
auto value = std::make_shared<typename TestFixture::ScalarType>(buffer);
auto null_value = std::make_shared<typename TestFixture::ScalarType>(buffer);
null_value->is_valid = false;

this->AssertIndexIs(R"([])", value, -1);
this->AssertIndexIs(R"(["foo"])", value, 0);
this->AssertIndexIs(R"(["bar", "bar", "bar", "bar"])", value, -1);
this->AssertIndexIs(R"(["bar", "bar", "bar", "bar", "foo"])", value, 4);
this->AssertIndexIs(R"([null, null, null])", value, -1);
this->AssertIndexIs(R"([null, null, null])", null_value, -1);
this->AssertIndexIs(R"(["foo", null, null])", null_value, -1);
}

//
// Mode
//
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernels/codegen_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ template <typename T, typename VisitFunc, typename NullFunc>
static typename arrow::internal::call_traits::enable_if_return<VisitFunc, Status>::type
VisitArrayValuesInline(const ArrayData& arr, VisitFunc&& valid_func,
NullFunc&& null_func) {
VisitArrayDataInline<T>(
return VisitArrayDataInline<T>(
arr,
[&](typename GetViewType<T>::PhysicalType v) {
return valid_func(GetViewType<T>::LogicalValue(std::move(v)));
Expand Down
2 changes: 2 additions & 0 deletions docs/source/cpp/compute.rst
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ Aggregations
+--------------------------+------------+--------------------+-----------------------+--------------------------------------------+
| count | Unary | Any | Scalar Int64 | :struct:`ScalarAggregateOptions` |
+--------------------------+------------+--------------------+-----------------------+--------------------------------------------+
| index | Unary | Any | Scalar Int64 | :struct:`IndexOptions` |
+--------------------------+------------+--------------------+-----------------------+--------------------------------------------+
| mean | Unary | Numeric | Scalar Float64 | :struct:`ScalarAggregateOptions` |
+--------------------------+------------+--------------------+-----------------------+--------------------------------------------+
| min_max | Unary | Numeric | Scalar Struct (1) | :struct:`ScalarAggregateOptions` |
Expand Down
Loading