Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,6 @@ if(ARROW_COMPUTE)
compute/exec/hash_join_dict.cc
compute/exec/hash_join_node.cc
compute/exec/ir_consumer.cc
compute/exec/key_compare.cc
compute/exec/key_encode.cc
compute/exec/key_hash.cc
compute/exec/key_map.cc
compute/exec/order_by_impl.cc
Expand Down Expand Up @@ -441,17 +439,22 @@ if(ARROW_COMPUTE)
compute/kernels/vector_nested.cc
compute/kernels/vector_replace.cc
compute/kernels/vector_selection.cc
compute/kernels/vector_sort.cc)
compute/kernels/vector_sort.cc
compute/row/encode.cc
compute/row/encode_internal.cc
compute/row/compare_internal.cc
compute/row/grouper.cc
compute/row/row_internal.cc)

append_avx2_src(compute/kernels/aggregate_basic_avx2.cc)
append_avx512_src(compute/kernels/aggregate_basic_avx512.cc)

append_avx2_src(compute/exec/bloom_filter_avx2.cc)
append_avx2_src(compute/exec/key_compare_avx2.cc)
append_avx2_src(compute/exec/key_encode_avx2.cc)
append_avx2_src(compute/exec/key_hash_avx2.cc)
append_avx2_src(compute/exec/key_map_avx2.cc)
append_avx2_src(compute/exec/util_avx2.cc)
append_avx2_src(compute/row/compare_internal_avx2.cc)
append_avx2_src(compute/row/encode_internal_avx2.cc)

list(APPEND ARROW_TESTING_SRCS compute/exec/test_util.cc)
endif()
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ add_arrow_benchmark(function_benchmark PREFIX "arrow-compute")
add_subdirectory(kernels)

add_subdirectory(exec)
add_subdirectory(row)
6 changes: 6 additions & 0 deletions cpp/src/arrow/compute/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@
/// @}

#include "arrow/compute/exec/options.h" // IWYU pragma: export

/// \defgroup execnode-row Utilities for working with data in a row-major format
/// @{
/// @}

#include "arrow/compute/row/grouper.h" // IWYU pragma: export
93 changes: 15 additions & 78 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,84 +395,6 @@ Result<Datum> Index(const Datum& value, const IndexOptions& options,

namespace internal {

/// Internal use only: streaming group identifier.
/// Consumes batches of keys and yields batches of the group ids.
class ARROW_EXPORT Grouper {
public:
virtual ~Grouper() = default;

/// Construct a Grouper which receives the specified key types
static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
ExecContext* ctx = default_exec_context());

/// Consume a batch of keys, producing the corresponding group ids as an integer array.
/// Currently only uint32 indices will be produced, eventually the bit width will only
/// be as wide as necessary.
virtual Result<Datum> Consume(const ExecBatch& batch) = 0;

/// Get current unique keys. May be called multiple times.
virtual Result<ExecBatch> GetUniques() = 0;

/// Get the current number of groups.
virtual uint32_t num_groups() const = 0;

/// \brief Assemble lists of indices of identical elements.
///
/// \param[in] ids An unsigned, all-valid integral array which will be
/// used as grouping criteria.
/// \param[in] num_groups An upper bound for the elements of ids
/// \return A num_groups-long ListArray where the slot at i contains a
/// list of indices where i appears in ids.
///
/// MakeGroupings([
/// 2,
/// 2,
/// 5,
/// 5,
/// 2,
/// 3
/// ], 8) == [
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> MakeGroupings(
const UInt32Array& ids, uint32_t num_groups,
ExecContext* ctx = default_exec_context());

/// \brief Produce a ListArray whose slots are selections of `array` which correspond to
/// the provided groupings.
///
/// For example,
/// ApplyGroupings([
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ], [2, 2, 5, 5, 2, 3]) == [
/// [],
/// [],
/// [2, 2, 2],
/// [3],
/// [],
/// [5, 5],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> ApplyGroupings(
const ListArray& groupings, const Array& array,
ExecContext* ctx = default_exec_context());
};

/// \brief Configure a grouped aggregation
struct ARROW_EXPORT Aggregate {
/// the name of the aggregation function
Expand All @@ -482,6 +404,21 @@ struct ARROW_EXPORT Aggregate {
const FunctionOptions* options;
};

Result<std::vector<const HashAggregateKernel*>> GetKernels(
ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
const std::vector<ValueDescr>& in_descrs);

Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
const std::vector<internal::Aggregate>& aggregates,
const std::vector<ValueDescr>& in_descrs);

Result<FieldVector> ResolveKernels(
const std::vector<internal::Aggregate>& aggregates,
const std::vector<const HashAggregateKernel*>& kernels,
const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
const std::vector<ValueDescr>& descrs);

/// Internal use only: helper function for testing HashAggregateKernels.
/// This will be replaced by streaming execution operators.
ARROW_EXPORT
Expand Down
24 changes: 3 additions & 21 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/registry.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/datum.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
Expand All @@ -39,25 +40,6 @@ using internal::checked_cast;

namespace compute {

namespace internal {

Result<std::vector<const HashAggregateKernel*>> GetKernels(
ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
const std::vector<ValueDescr>& in_descrs);

Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
const std::vector<internal::Aggregate>& aggregates,
const std::vector<ValueDescr>& in_descrs);

Result<FieldVector> ResolveKernels(
const std::vector<internal::Aggregate>& aggregates,
const std::vector<const HashAggregateKernel*>& kernels,
const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
const std::vector<ValueDescr>& descrs);

} // namespace internal

namespace {

void AggregatesToString(
Expand Down Expand Up @@ -647,7 +629,7 @@ class GroupByNode : public ExecNode {

private:
struct ThreadLocalState {
std::unique_ptr<internal::Grouper> grouper;
std::unique_ptr<Grouper> grouper;
std::vector<std::unique_ptr<KernelState>> agg_states;
};

Expand All @@ -670,7 +652,7 @@ class GroupByNode : public ExecNode {
}

// Construct grouper
ARROW_ASSIGN_OR_RAISE(state->grouper, internal::Grouper::Make(key_descrs, ctx_));
ARROW_ASSIGN_OR_RAISE(state->grouper, Grouper::Make(key_descrs, ctx_));

// Build vector of aggregate source field data types
std::vector<ValueDescr> agg_src_descrs(agg_kernels_.size());
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/compute/exec/hash_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@
#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/kernels/row_encoder.h"
#include "arrow/compute/row/encode_internal.h"

namespace arrow {
namespace compute {

using internal::RowEncoder;

class HashJoinBasicImpl : public HashJoinImpl {
private:
struct ThreadLocalState;
Expand Down
Loading