Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/exprs/agg_fn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ AggFn::AggFn(const TExprNode& tnode, const SlotDescriptor& intermediate_slot_des
: Expr(tnode),
is_merge_(tnode.agg_expr.is_merge_agg),
intermediate_slot_desc_(intermediate_slot_desc),
output_slot_desc_(output_slot_desc) {
output_slot_desc_(output_slot_desc),
_vararg_start_idx(tnode.__isset.vararg_start_idx ? tnode.vararg_start_idx : -1) {
// TODO(pengyubing) arg_type_descs_ is used for codegen
// arg_type_descs_(AnyValUtil::column_type_to_type_desc(
// TypeDescriptor::from_thrift(tnode.agg_expr.arg_types))) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/exprs/agg_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ class AggFn : public Expr {
virtual std::string DebugString() const;
static std::string DebugString(const std::vector<AggFn*>& exprs);

const int get_vararg_start_idx() const {
return _vararg_start_idx;
}

private:
friend class Expr;
friend class NewAggFnEvaluator;
Expand Down Expand Up @@ -178,6 +182,8 @@ class AggFn : public Expr {
void* serialize_fn_ = nullptr;
void* get_value_fn_ = nullptr;
void* finalize_fn_ = nullptr;

int _vararg_start_idx;

AggFn(const TExprNode& node, const SlotDescriptor& intermediate_slot_desc,
const SlotDescriptor& output_slot_desc);
Expand Down
200 changes: 148 additions & 52 deletions be/src/exprs/new_agg_fn_evaluator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@ typedef void (*UpdateFn7)(FunctionContext*, const AnyVal&, const AnyVal&,
typedef void (*UpdateFn8)(FunctionContext*, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&,
const AnyVal&, AnyVal*);

typedef void (*VarargUpdateFn0)(FunctionContext*, int num_varargs, const AnyVal*, AnyVal*);
typedef void (*VarargUpdateFn1)(FunctionContext*, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
typedef void (*VarargUpdateFn2)(FunctionContext*, const AnyVal&, const AnyVal&, int num_varargs,
const AnyVal*, AnyVal*);
typedef void (*VarargUpdateFn3)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
int num_varargs, const AnyVal*, AnyVal*);
typedef void (*VarargUpdateFn4)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
typedef void (*VarargUpdateFn5)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
typedef void (*VarargUpdateFn6)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
typedef void (*VarargUpdateFn7)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
typedef void (*VarargUpdateFn8)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs,
const AnyVal*, AnyVal*);

typedef StringVal (*SerializeFn)(FunctionContext*, const StringVal&);
typedef AnyVal (*GetValueFn)(FunctionContext*, const AnyVal&);
typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&);
Expand Down Expand Up @@ -394,58 +413,135 @@ void NewAggFnEvaluator::Update(const TupleRow* row, Tuple* dst, void* fn) {
// TODO: this part is not so good and not scalable. It can be replaced with
// codegen but we can also consider leaving it for the first few cases for
// debugging.
switch (input_evals_.size()) {
case 0:
reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), staging_intermediate_val_);
break;
case 1:
reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], staging_intermediate_val_);
break;
case 2:
reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1], staging_intermediate_val_);
break;
case 3:
reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], staging_intermediate_val_);
break;
case 4:
reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3], staging_intermediate_val_);
break;
case 5:
reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], staging_intermediate_val_);
break;
case 6:
reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5], staging_intermediate_val_);
break;
case 7:
reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
*staging_input_vals_[6], staging_intermediate_val_);
break;
case 8:
reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
*staging_input_vals_[6], *staging_input_vals_[7],
staging_intermediate_val_);
break;
default:
DCHECK(false) << "NYI";
}
if (agg_fn_.get_vararg_start_idx() == -1) {
switch (input_evals_.size()) {
case 0:
reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), staging_intermediate_val_);
break;
case 1:
reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], staging_intermediate_val_);
break;
case 2:
reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
staging_intermediate_val_);
break;
case 3:
reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], staging_intermediate_val_);
break;
case 4:
reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
staging_intermediate_val_);
break;
case 5:
reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], staging_intermediate_val_);
break;
case 6:
reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
staging_intermediate_val_);
break;
case 7:
reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
*staging_input_vals_[6], staging_intermediate_val_);
break;
case 8:
reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
*staging_input_vals_[6], *staging_input_vals_[7],
staging_intermediate_val_);
break;
default:
DCHECK(false) << "NYI";
}
} else {
int num_varargs = input_evals_.size() - agg_fn_.get_vararg_start_idx();
const AnyVal* varargs = *(staging_input_vals_.data() + agg_fn_.get_vararg_start_idx());
switch (agg_fn_.get_vararg_start_idx()) {
case 0:
reinterpret_cast<VarargUpdateFn0>(fn)(agg_fn_ctx_.get(),
num_varargs, varargs,
staging_intermediate_val_);
break;
case 1:
reinterpret_cast<VarargUpdateFn1>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0],
num_varargs, varargs,
staging_intermediate_val_);
break;
case 2:
reinterpret_cast<VarargUpdateFn2>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
num_varargs, varargs,
staging_intermediate_val_);
break;
case 3:
reinterpret_cast<VarargUpdateFn3>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2],
num_varargs, varargs,
staging_intermediate_val_);
break;
case 4:
reinterpret_cast<VarargUpdateFn4>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
num_varargs, varargs,
staging_intermediate_val_);
break;
case 5:
reinterpret_cast<VarargUpdateFn5>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4],
num_varargs, varargs,
staging_intermediate_val_);
break;
case 6:
reinterpret_cast<VarargUpdateFn6>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
num_varargs, varargs,
staging_intermediate_val_);
break;
case 7:
reinterpret_cast<VarargUpdateFn7>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
*staging_input_vals_[6],
num_varargs, varargs,
staging_intermediate_val_);
break;
case 8:
reinterpret_cast<VarargUpdateFn8>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
*staging_input_vals_[6], *staging_input_vals_[7],
num_varargs, varargs,
staging_intermediate_val_);
break;
default:
DCHECK(false) << "NYI";
}
}
SetDstSlot(staging_intermediate_val_, slot_desc, dst);
}

Expand Down