Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,6 @@ if(ARROW_COMPUTE)
compute/exec/hash_join_dict.cc
compute/exec/hash_join_node.cc
compute/exec/ir_consumer.cc
compute/exec/key_compare.cc
compute/exec/key_encode.cc
compute/exec/key_hash.cc
compute/exec/key_map.cc
compute/exec/order_by_impl.cc
Expand Down Expand Up @@ -441,17 +439,22 @@ if(ARROW_COMPUTE)
compute/kernels/vector_nested.cc
compute/kernels/vector_replace.cc
compute/kernels/vector_selection.cc
compute/kernels/vector_sort.cc)
compute/kernels/vector_sort.cc
compute/row/encode.cc
compute/row/encode_internal.cc
compute/row/compare_internal.cc
compute/row/grouper.cc
compute/row/row_internal.cc)

append_avx2_src(compute/kernels/aggregate_basic_avx2.cc)
append_avx512_src(compute/kernels/aggregate_basic_avx512.cc)

append_avx2_src(compute/exec/bloom_filter_avx2.cc)
append_avx2_src(compute/exec/key_compare_avx2.cc)
append_avx2_src(compute/exec/key_encode_avx2.cc)
append_avx2_src(compute/exec/key_hash_avx2.cc)
append_avx2_src(compute/exec/key_map_avx2.cc)
append_avx2_src(compute/exec/util_avx2.cc)
append_avx2_src(compute/row/compare_internal_avx2.cc)
append_avx2_src(compute/row/encode_internal_avx2.cc)

list(APPEND ARROW_TESTING_SRCS compute/exec/test_util.cc)
endif()
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ add_arrow_benchmark(function_benchmark PREFIX "arrow-compute")
add_subdirectory(kernels)

add_subdirectory(exec)
add_subdirectory(row)
93 changes: 15 additions & 78 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,84 +395,6 @@ Result<Datum> Index(const Datum& value, const IndexOptions& options,

namespace internal {

/// Internal use only: streaming group identifier.
/// Consumes batches of keys and yields batches of the group ids.
class ARROW_EXPORT Grouper {
public:
virtual ~Grouper() = default;

/// Construct a Grouper which receives the specified key types
static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
ExecContext* ctx = default_exec_context());

/// Consume a batch of keys, producing the corresponding group ids as an integer array.
/// Currently only uint32 indices will be produced, eventually the bit width will only
/// be as wide as necessary.
virtual Result<Datum> Consume(const ExecBatch& batch) = 0;

/// Get current unique keys. May be called multiple times.
virtual Result<ExecBatch> GetUniques() = 0;

/// Get the current number of groups.
virtual uint32_t num_groups() const = 0;

/// \brief Assemble lists of indices of identical elements.
///
/// \param[in] ids An unsigned, all-valid integral array which will be
/// used as grouping criteria.
/// \param[in] num_groups An upper bound for the elements of ids
/// \return A num_groups-long ListArray where the slot at i contains a
/// list of indices where i appears in ids.
///
/// MakeGroupings([
/// 2,
/// 2,
/// 5,
/// 5,
/// 2,
/// 3
/// ], 8) == [
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> MakeGroupings(
const UInt32Array& ids, uint32_t num_groups,
ExecContext* ctx = default_exec_context());

/// \brief Produce a ListArray whose slots are selections of `array` which correspond to
/// the provided groupings.
///
/// For example,
/// ApplyGroupings([
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ], [2, 2, 5, 5, 2, 3]) == [
/// [],
/// [],
/// [2, 2, 2],
/// [3],
/// [],
/// [5, 5],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> ApplyGroupings(
const ListArray& groupings, const Array& array,
ExecContext* ctx = default_exec_context());
};

/// \brief Configure a grouped aggregation
struct ARROW_EXPORT Aggregate {
/// the name of the aggregation function
Expand All @@ -489,6 +411,21 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
const std::vector<Aggregate>& aggregates, bool use_threads = false,
ExecContext* ctx = default_exec_context());

Result<std::vector<const HashAggregateKernel*>> GetKernels(
ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
const std::vector<ValueDescr>& in_descrs);

Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
const std::vector<internal::Aggregate>& aggregates,
const std::vector<ValueDescr>& in_descrs);

Result<FieldVector> ResolveKernels(
const std::vector<internal::Aggregate>& aggregates,
const std::vector<const HashAggregateKernel*>& kernels,
const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
const std::vector<ValueDescr>& descrs);

} // namespace internal
} // namespace compute
} // namespace arrow
24 changes: 3 additions & 21 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/registry.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/datum.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
Expand All @@ -39,25 +40,6 @@ using internal::checked_cast;

namespace compute {

namespace internal {

Result<std::vector<const HashAggregateKernel*>> GetKernels(
ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
const std::vector<ValueDescr>& in_descrs);

Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
const std::vector<internal::Aggregate>& aggregates,
const std::vector<ValueDescr>& in_descrs);

Result<FieldVector> ResolveKernels(
const std::vector<internal::Aggregate>& aggregates,
const std::vector<const HashAggregateKernel*>& kernels,
const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
const std::vector<ValueDescr>& descrs);

} // namespace internal

namespace {

void AggregatesToString(
Expand Down Expand Up @@ -647,7 +629,7 @@ class GroupByNode : public ExecNode {

private:
struct ThreadLocalState {
std::unique_ptr<internal::Grouper> grouper;
std::unique_ptr<Grouper> grouper;
std::vector<std::unique_ptr<KernelState>> agg_states;
};

Expand All @@ -670,7 +652,7 @@ class GroupByNode : public ExecNode {
}

// Construct grouper
ARROW_ASSIGN_OR_RAISE(state->grouper, internal::Grouper::Make(key_descrs, ctx_));
ARROW_ASSIGN_OR_RAISE(state->grouper, Grouper::Make(key_descrs, ctx_));

// Build vector of aggregate source field data types
std::vector<ValueDescr> agg_src_descrs(agg_kernels_.size());
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/hash_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
namespace arrow {
namespace compute {

using internal::RowEncoder;
using internal::KeyEncoder;

class HashJoinBasicImpl : public HashJoinImpl {
private:
Expand Down
Loading