Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 154 additions & 1 deletion be/src/exprs/minmax_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,164 @@ class MinMaxNumFunc : public MinMaxFuncBase {
return Status::OK();
}

private:
protected:
T _max = type_limit<T>::min();
T _min = type_limit<T>::max();
// we use _empty to avoid compare twice
bool _empty = true;
};

/// Min-only variant of MinMaxNumFunc<T>: it maintains just the lower bound
/// (`_min`) and the `_empty` flag inherited from the base class and never
/// updates `_max`.
template <class T>
class MinNumFunc : public MinMaxNumFunc<T> {
public:
    MinNumFunc() = default;
    ~MinNumFunc() override = default;

    /// Folds a single value into the running minimum.
    /// @param data pointer to one T; a null pointer is ignored.
    void insert(const void* data) override {
        if (data == nullptr) {
            return;
        }

        const T val_data = *reinterpret_cast<const T*>(data);

        if (this->_empty) {
            // First value seen: adopt it directly, no comparison needed.
            this->_min = val_data;
            this->_empty = false;
            return;
        }
        if (val_data < this->_min) {
            this->_min = val_data;
        }
    }

    /// Folds `number` values into the running minimum.
    /// @param data    start of an array of T.
    /// @param offsets element indices into that array (pointer arithmetic is
    ///                done on T*, so these are not byte offsets).
    /// @param number  how many entries of `offsets` to read; a non-positive
    ///                count is a no-op.
    void insert_fixed_len(const char* data, const int* offsets, int number) override {
        if (number <= 0) {
            return;
        }

        const T* values = reinterpret_cast<const T*>(data);
        int first = 0;
        if (this->_empty) {
            // Seed the minimum with the first element and skip it in the loop.
            this->_min = values[offsets[0]];
            this->_empty = false;
            first = 1;
        }
        for (int i = first; i < number; ++i) {
            this->_min = std::min(this->_min, values[offsets[i]]);
        }
    }

    /// @return true when `*data` is greater than or equal to the recorded
    ///         minimum; false for a null pointer.
    bool find(void* data) override {
        if (data == nullptr) {
            return false;
        }

        const T val_data = *reinterpret_cast<const T*>(data);
        return val_data >= this->_min;
    }

    /// Merges the minimum of another MinNumFunc<T> into this one.
    /// For StringRef the winning value is copied into a std::string owned by
    /// `pool`, so this object does not keep pointing at the other filter's bytes.
    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
        auto* other = assert_cast<MinNumFunc<T>*>(minmax_func);

        if (other->_min < this->_min) {
            if constexpr (std::is_same_v<T, StringRef>) {
                const auto& other_min = other->_min;
                auto* str = pool->add(new std::string(other_min.data, other_min.size));
                this->_min.data = str->data();
                this->_min.size = str->length();
            } else {
                this->_min = other->_min;
            }
        }
        // Once the other side has contributed data this filter is no longer
        // empty; otherwise the next insert() would take the "first value"
        // path and overwrite the merged minimum.
        if (!other->_empty) {
            this->_empty = false;
        }

        return Status::OK();
    }

    // A min-only filter has no meaningful upper bound: calling this is a
    // programming error (DCHECK) and yields nullptr.
    void* get_max() override {
        DCHECK(false);
        return nullptr;
    }

    /// Overwrites the minimum with `*min_data`; `max_data` is not read.
    Status assign(void* min_data, void* max_data) override {
        this->_min = *static_cast<const T*>(min_data);
        // The filter now carries a real bound, so later inserts must compare
        // against it instead of replacing it.
        this->_empty = false;
        return Status::OK();
    }
};

/// Max-only variant of MinMaxNumFunc<T>: it maintains just the upper bound
/// (`_max`) and the `_empty` flag inherited from the base class and never
/// updates `_min`.
template <class T>
class MaxNumFunc : public MinMaxNumFunc<T> {
public:
    MaxNumFunc() = default;
    ~MaxNumFunc() override = default;

    /// Folds a single value into the running maximum.
    /// @param data pointer to one T; a null pointer is ignored.
    void insert(const void* data) override {
        if (data == nullptr) {
            return;
        }

        const T val_data = *reinterpret_cast<const T*>(data);

        if (this->_empty) {
            // First value seen: adopt it directly, no comparison needed.
            this->_max = val_data;
            this->_empty = false;
            return;
        }
        if (val_data > this->_max) {
            this->_max = val_data;
        }
    }

    /// Folds `number` values into the running maximum.
    /// @param data    start of an array of T.
    /// @param offsets element indices into that array (pointer arithmetic is
    ///                done on T*, so these are not byte offsets).
    /// @param number  how many entries of `offsets` to read; a non-positive
    ///                count is a no-op.
    void insert_fixed_len(const char* data, const int* offsets, int number) override {
        if (number <= 0) {
            return;
        }

        const T* values = reinterpret_cast<const T*>(data);
        int first = 0;
        if (this->_empty) {
            // Seed the maximum with the first element and skip it in the loop.
            this->_max = values[offsets[0]];
            this->_empty = false;
            first = 1;
        }
        for (int i = first; i < number; ++i) {
            this->_max = std::max(this->_max, values[offsets[i]]);
        }
    }

    /// @return true when `*data` is less than or equal to the recorded
    ///         maximum; false for a null pointer.
    bool find(void* data) override {
        if (data == nullptr) {
            return false;
        }

        const T val_data = *reinterpret_cast<const T*>(data);
        return val_data <= this->_max;
    }

    /// Merges the maximum of another MaxNumFunc<T> into this one.
    /// For StringRef the winning value is copied into a std::string owned by
    /// `pool`, so this object does not keep pointing at the other filter's bytes.
    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
        // Cast to MaxNumFunc<T> (not the MinMaxNumFunc<T> base): protected
        // members may only be reached through an object of the accessing
        // class, so reading `_max` via a base-typed pointer is ill-formed.
        // This also mirrors MinNumFunc<T>::merge.
        auto* other = assert_cast<MaxNumFunc<T>*>(minmax_func);

        if (other->_max > this->_max) {
            if constexpr (std::is_same_v<T, StringRef>) {
                const auto& other_max = other->_max;
                auto* str = pool->add(new std::string(other_max.data, other_max.size));
                this->_max.data = str->data();
                this->_max.size = str->length();
            } else {
                this->_max = other->_max;
            }
        }
        // Once the other side has contributed data this filter is no longer
        // empty; otherwise the next insert() would take the "first value"
        // path and overwrite the merged maximum.
        if (!other->_empty) {
            this->_empty = false;
        }

        return Status::OK();
    }

    // A max-only filter has no meaningful lower bound: calling this is a
    // programming error (DCHECK) and yields nullptr.
    void* get_min() override {
        DCHECK(false);
        return nullptr;
    }

    /// Overwrites the maximum with `*max_data`; `min_data` is not read.
    Status assign(void* min_data, void* max_data) override {
        this->_max = *static_cast<const T*>(max_data);
        // The filter now carries a real bound, so later inserts must compare
        // against it instead of replacing it.
        this->_empty = false;
        return Status::OK();
    }
};

} // namespace doris
45 changes: 45 additions & 0 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ class RuntimePredicateWrapper {
_context.hybrid_set.reset(create_set(_column_return_type));
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
break;
Expand Down Expand Up @@ -405,6 +407,8 @@ class RuntimePredicateWrapper {
_context.hybrid_set->insert(data);
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func->insert(data);
break;
Expand Down Expand Up @@ -448,6 +452,8 @@ class RuntimePredicateWrapper {
_context.hybrid_set->insert_fixed_len(data, offsets, number);
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func->insert_fixed_len(data, offsets, number);
break;
Expand Down Expand Up @@ -575,6 +581,8 @@ class RuntimePredicateWrapper {
}
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
static_cast<void>(
_context.minmax_func->merge(wrapper->_context.minmax_func.get(), _pool));
Expand Down Expand Up @@ -1770,6 +1778,43 @@ Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
}
break;
}
case RuntimeFilterType::MIN_FILTER: {
// create min filter
vectorized::VExprSPtr min_pred;
TExprNode min_pred_node;
RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::GE, min_pred,
&min_pred_node));
vectorized::VExprSPtr min_literal;
RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_min(),
min_literal));
min_pred->add_child(probe_ctx->root());
min_pred->add_child(min_literal);
container.push_back(
vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred));
vectorized::VExprContextSPtr new_probe_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx));
probe_ctxs.push_back(new_probe_ctx);
break;
}
case RuntimeFilterType::MAX_FILTER: {
vectorized::VExprSPtr max_pred;
// create max filter
TExprNode max_pred_node;
RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred,
&max_pred_node));
vectorized::VExprSPtr max_literal;
RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_max(),
max_literal));
max_pred->add_child(probe_ctx->root());
max_pred->add_child(max_literal);
container.push_back(
vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred));

vectorized::VExprContextSPtr new_probe_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx));
probe_ctxs.push_back(new_probe_ctx);
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
vectorized::VExprSPtr max_pred;
// create max filter
Expand Down
54 changes: 45 additions & 9 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,29 @@ enum class RuntimeFilterType {
MINMAX_FILTER = 1,
BLOOM_FILTER = 2,
IN_OR_BLOOM_FILTER = 3,
BITMAP_FILTER = 4
BITMAP_FILTER = 4,
MIN_FILTER = 5, // only min // now only support at local
MAX_FILTER = 6 // only max // now only support at local
};

// Translates a TMinMaxRuntimeFilterType value into the matching
// RuntimeFilterType: MIN -> MIN_FILTER, MAX -> MAX_FILTER,
// MIN_MAX -> MINMAX_FILTER. Any other value raises a doris::Exception with
// ErrorCode::INTERNAL_ERROR.
static RuntimeFilterType get_minmax_filter_type(TMinMaxRuntimeFilterType::type ttype) {
    switch (ttype) {
    case TMinMaxRuntimeFilterType::MIN:
        return RuntimeFilterType::MIN_FILTER;
    case TMinMaxRuntimeFilterType::MAX:
        return RuntimeFilterType::MAX_FILTER;
    case TMinMaxRuntimeFilterType::MIN_MAX:
        return RuntimeFilterType::MINMAX_FILTER;
    default:
        break;
    }
    // Reached only for values outside the three handled above.
    throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
                           "Invalid minmax runtime filter type!");
}

static RuntimeFilterType get_runtime_filter_type(TRuntimeFilterType::type ttype) {
switch (ttype) {
case TRuntimeFilterType::BLOOM: {
Expand Down Expand Up @@ -189,10 +209,15 @@ class IRuntimeFilter {
_is_ignored(false),
registration_time_(MonotonicMillis()),
_enable_pipeline_exec(_state->enable_pipeline_exec()),
_runtime_filter_type(get_runtime_filter_type(desc->type)),
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}
_profile(new RuntimeProfile(_name)) {
if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) {
_runtime_filter_type = get_minmax_filter_type(desc->min_max_type);
} else {
_runtime_filter_type = get_runtime_filter_type(desc->type);
}
_name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type));
}

IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc)
: _query_ctx(query_ctx),
Expand All @@ -209,10 +234,15 @@ class IRuntimeFilter {
_is_ignored(false),
registration_time_(MonotonicMillis()),
_enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
_runtime_filter_type(get_runtime_filter_type(desc->type)),
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}
_profile(new RuntimeProfile(_name)) {
if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) {
_runtime_filter_type = get_minmax_filter_type(desc->min_max_type);
} else {
_runtime_filter_type = get_runtime_filter_type(desc->type);
}
_name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type));
}

~IRuntimeFilter() = default;

Expand Down Expand Up @@ -335,6 +365,12 @@ class IRuntimeFilter {
case RuntimeFilterType::BLOOM_FILTER: {
return std::string("bloomfilter");
}
case RuntimeFilterType::MIN_FILTER: {
return std::string("only_min");
}
case RuntimeFilterType::MAX_FILTER: {
return std::string("only_max");
}
case RuntimeFilterType::MINMAX_FILTER: {
return std::string("minmax");
}
Expand Down