// Review notes for the patch below. The patch text itself is reproduced unchanged.
//
// Scope (as shown by the visible hunks): a unified diff touching
//   be/src/exprs/minmax_predicate.h, be/src/exprs/runtime_filter.cpp and
//   be/src/exprs/runtime_filter.h.
//
// Summary of the change:
//   * minmax_predicate.h: the data members of MinMaxNumFunc (_max, _min, _empty) move from
//     `private` to `protected`, and two subclasses are added. MinNumFunc tracks only _min
//     (insert / insert_fixed_len / find / merge / assign) and its get_max() does DCHECK(false)
//     and returns nullptr. MaxNumFunc is the mirror image for _max, with get_min() doing
//     DCHECK(false) and returning nullptr.
//   * runtime_filter.cpp: MIN_FILTER and MAX_FILTER case labels are added in front of the
//     existing MINMAX_FILTER case in the switches that call create_minmax_filter(), insert(),
//     insert_fixed_len() and merge(), so they share the MINMAX_FILTER code path there.
//     get_push_exprs gains separate cases: the min case builds one GE predicate against
//     get_min(), the max case builds one LE predicate against get_max(); each pushes one
//     VRuntimeFilterWrapper into `container` and one new probe context into `probe_ctxs`.
//   * runtime_filter.h: adds RuntimeFilterType::MIN_FILTER = 5 and MAX_FILTER = 6, adds
//     get_minmax_filter_type() mapping TMinMaxRuntimeFilterType::{MIN, MAX, MIN_MAX} to the
//     enum (throwing doris::Exception for any other value), moves the computation of
//     _runtime_filter_type and _name out of both IRuntimeFilter constructors' initializer
//     lists into the constructor bodies, and adds "only_min" / "only_max" to the
//     type-to-string switch.
//
// NOTE(review): this copy of the patch looks lossy. Template argument lists are missing
//   (e.g. `template +class MinNumFunc`, `reinterpret_cast(data)`, `std::is_same_v)`,
//   `assert_cast*>`), and the get_push_exprs hunk is missing the end of its header, its
//   context lines and the start of the MIN_FILTER case. Check against the original commit
//   before applying or quoting it.
//
// NOTE(review): both IRuntimeFilter constructors still initialize
//   `_profile(new RuntimeProfile(_name))` in the initializer list, but `_name` is now assigned
//   only inside the constructor body (`_name = fmt::format(...)`). The RuntimeProfile is
//   therefore constructed from `_name` before the formatted name is stored. Previously `_name`
//   was initialized just before `_profile` in the same list. Confirm the profile still gets
//   the intended "RuntimeFilter: (id = ..., type = ...)" name.
//
// NOTE(review): MinNumFunc::merge declares `other_minmax` as `MinNumFunc*`, whereas
//   MaxNumFunc::merge declares it as `MinMaxNumFunc*` and then reads the protected `_max`
//   through it. The two are presumably meant to mirror each other; the cast target types are
//   not recoverable from this copy, so confirm them in the real source.
//
// NOTE(review): MinNumFunc::assign / MaxNumFunc::assign store `_min` / `_max` but leave
//   `_empty` untouched, and insert() / insert_fixed_len() overwrite the bound unconditionally
//   while `_empty` is still true. Verify that nothing inserts into an object after assign(),
//   or that losing the assigned bound in that case is acceptable.
//
// NOTE(review): merge() compares `other_minmax->_min` / `_max` with `this->_min` / `_max`
//   without looking at `_empty` on either side and without updating `_empty`. This relies on
//   the initial values `type_limit::max()` / `type_limit::min()` behaving as neutral elements;
//   confirm this also holds for the string branch.
//
// NOTE(review): insert_fixed_len() uses the bool `_empty` as the loop's start index
//   (`for (int i = this->_empty; ...)`), so index 0 is skipped exactly when it was just used
//   to seed the bound. Elements are read as `*((T*)data + offsets[i])`, i.e. `offsets[i]` is
//   added to `data` after casting it to `T*` (and the cast drops `const`).
//
// NOTE(review): in the visible hunks MIN_FILTER / MAX_FILTER still obtain their function
//   object from `create_minmax_filter(_column_return_type)`, the same call used for
//   MINMAX_FILTER, and nothing shown here constructs MinNumFunc / MaxNumFunc. Presumably that
//   wiring lives in a file not included in this excerpt; verify.
//
// NOTE(review): get_minmax_filter_type() is called from the IRuntimeFilter constructors when
//   `desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX`, so an unexpected
//   `min_max_type` value is reported as an exception thrown during construction.
diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h index c390c8e71591d5..cdf898292fc322 100644 --- a/be/src/exprs/minmax_predicate.h +++ b/be/src/exprs/minmax_predicate.h @@ -128,11 +128,164 @@ class MinMaxNumFunc : public MinMaxFuncBase { return Status::OK(); } -private: +protected: T _max = type_limit::min(); T _min = type_limit::max(); // we use _empty to avoid compare twice bool _empty = true; }; +template +class MinNumFunc : public MinMaxNumFunc { +public: + MinNumFunc() = default; + ~MinNumFunc() override = default; + + void insert(const void* data) override { + if (data == nullptr) { + return; + } + + T val_data = *reinterpret_cast(data); + + if (this->_empty) { + this->_min = val_data; + this->_empty = false; + return; + } + if (val_data < this->_min) { + this->_min = val_data; + } + } + + void insert_fixed_len(const char* data, const int* offsets, int number) override { + if (!number) { + return; + } + if (this->_empty) { + this->_min = *((T*)data + offsets[0]); + } + for (int i = this->_empty; i < number; i++) { + this->_min = std::min(this->_min, *((T*)data + offsets[i])); + } + this->_empty = false; + } + + bool find(void* data) override { + if (data == nullptr) { + return false; + } + + T val_data = *reinterpret_cast(data); + return val_data >= this->_min; + } + + Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override { + if constexpr (std::is_same_v) { + MinNumFunc* other_minmax = assert_cast*>(minmax_func); + if (other_minmax->_min < this->_min) { + auto& other_min = other_minmax->_min; + auto str = pool->add(new std::string(other_min.data, other_min.size)); + this->_min.data = str->data(); + this->_min.size = str->length(); + } + } else { + MinNumFunc* other_minmax = assert_cast*>(minmax_func); + if (other_minmax->_min < this->_min) { + this->_min = other_minmax->_min; + } + } + + return Status::OK(); + } + + //min filter the max is useless, so return nullptr directly + void* get_max() override { + 
DCHECK(false); + return nullptr; + } + + Status assign(void* min_data, void* max_data) override { + this->_min = *(T*)min_data; + return Status::OK(); + } +}; + +template +class MaxNumFunc : public MinMaxNumFunc { +public: + MaxNumFunc() = default; + ~MaxNumFunc() override = default; + + void insert(const void* data) override { + if (data == nullptr) { + return; + } + + T val_data = *reinterpret_cast(data); + + if (this->_empty) { + this->_max = val_data; + this->_empty = false; + return; + } + if (val_data > this->_max) { + this->_max = val_data; + } + } + + void insert_fixed_len(const char* data, const int* offsets, int number) override { + if (!number) { + return; + } + if (this->_empty) { + this->_max = *((T*)data + offsets[0]); + } + for (int i = this->_empty; i < number; i++) { + this->_max = std::max(this->_max, *((T*)data + offsets[i])); + } + this->_empty = false; + } + + bool find(void* data) override { + if (data == nullptr) { + return false; + } + + T val_data = *reinterpret_cast(data); + return val_data <= this->_max; + } + + Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override { + if constexpr (std::is_same_v) { + MinMaxNumFunc* other_minmax = assert_cast*>(minmax_func); + + if (other_minmax->_max > this->_max) { + auto& other_max = other_minmax->_max; + auto str = pool->add(new std::string(other_max.data, other_max.size)); + this->_max.data = str->data(); + this->_max.size = str->length(); + } + } else { + MinMaxNumFunc* other_minmax = assert_cast*>(minmax_func); + if (other_minmax->_max > this->_max) { + this->_max = other_minmax->_max; + } + } + + return Status::OK(); + } + + //max filter the min is useless, so return nullptr directly + void* get_min() override { + DCHECK(false); + return nullptr; + } + + Status assign(void* min_data, void* max_data) override { + this->_max = *(T*)max_data; + return Status::OK(); + } +}; + } // namespace doris diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 
f7d60043135a0c..3679ce4c84c917 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -323,6 +323,8 @@ class RuntimePredicateWrapper { _context.hybrid_set.reset(create_set(_column_return_type)); break; } + case RuntimeFilterType::MIN_FILTER: + case RuntimeFilterType::MAX_FILTER: case RuntimeFilterType::MINMAX_FILTER: { _context.minmax_func.reset(create_minmax_filter(_column_return_type)); break; @@ -405,6 +407,8 @@ class RuntimePredicateWrapper { _context.hybrid_set->insert(data); break; } + case RuntimeFilterType::MIN_FILTER: + case RuntimeFilterType::MAX_FILTER: case RuntimeFilterType::MINMAX_FILTER: { _context.minmax_func->insert(data); break; @@ -448,6 +452,8 @@ class RuntimePredicateWrapper { _context.hybrid_set->insert_fixed_len(data, offsets, number); break; } + case RuntimeFilterType::MIN_FILTER: + case RuntimeFilterType::MAX_FILTER: case RuntimeFilterType::MINMAX_FILTER: { _context.minmax_func->insert_fixed_len(data, offsets, number); break; @@ -575,6 +581,8 @@ class RuntimePredicateWrapper { } break; } + case RuntimeFilterType::MIN_FILTER: + case RuntimeFilterType::MAX_FILTER: case RuntimeFilterType::MINMAX_FILTER: { static_cast( _context.minmax_func->merge(wrapper->_context.minmax_func.get(), _pool)); @@ -1770,6 +1778,43 @@ Status RuntimePredicateWrapper::get_push_exprs(std::listroot()->type(), TExprOpcode::GE, min_pred, + &min_pred_node)); + vectorized::VExprSPtr min_literal; + RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_min(), + min_literal)); + min_pred->add_child(probe_ctx->root()); + min_pred->add_child(min_literal); + container.push_back( + vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred)); + vectorized::VExprContextSPtr new_probe_ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx)); + probe_ctxs.push_back(new_probe_ctx); + break; + } + case RuntimeFilterType::MAX_FILTER: { + vectorized::VExprSPtr max_pred; + // create 
max filter + TExprNode max_pred_node; + RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred, + &max_pred_node)); + vectorized::VExprSPtr max_literal; + RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_max(), + max_literal)); + max_pred->add_child(probe_ctx->root()); + max_pred->add_child(max_literal); + container.push_back( + vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred)); + + vectorized::VExprContextSPtr new_probe_ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx)); + probe_ctxs.push_back(new_probe_ctx); + break; + } case RuntimeFilterType::MINMAX_FILTER: { vectorized::VExprSPtr max_pred; // create max filter diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index f03b27cfbf9311..a05e594db622aa 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -76,9 +76,29 @@ enum class RuntimeFilterType { MINMAX_FILTER = 1, BLOOM_FILTER = 2, IN_OR_BLOOM_FILTER = 3, - BITMAP_FILTER = 4 + BITMAP_FILTER = 4, + MIN_FILTER = 5, // only min // now only support at local + MAX_FILTER = 6 // only max // now only support at local }; +static RuntimeFilterType get_minmax_filter_type(TMinMaxRuntimeFilterType::type ttype) { + switch (ttype) { + case TMinMaxRuntimeFilterType::MIN: { + return RuntimeFilterType::MIN_FILTER; + } + case TMinMaxRuntimeFilterType::MAX: { + return RuntimeFilterType::MAX_FILTER; + } + case TMinMaxRuntimeFilterType::MIN_MAX: { + return RuntimeFilterType::MINMAX_FILTER; + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Invalid minmax runtime filter type!"); + } + } +} + static RuntimeFilterType get_runtime_filter_type(TRuntimeFilterType::type ttype) { switch (ttype) { case TRuntimeFilterType::BLOOM: { @@ -189,10 +209,15 @@ class IRuntimeFilter { _is_ignored(false), registration_time_(MonotonicMillis()), 
_enable_pipeline_exec(_state->enable_pipeline_exec()), - _runtime_filter_type(get_runtime_filter_type(desc->type)), - _name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, - to_string(_runtime_filter_type))), - _profile(new RuntimeProfile(_name)) {} + _profile(new RuntimeProfile(_name)) { + if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) { + _runtime_filter_type = get_minmax_filter_type(desc->min_max_type); + } else { + _runtime_filter_type = get_runtime_filter_type(desc->type); + } + _name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, + to_string(_runtime_filter_type)); + } IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc) : _query_ctx(query_ctx), @@ -209,10 +234,15 @@ class IRuntimeFilter { _is_ignored(false), registration_time_(MonotonicMillis()), _enable_pipeline_exec(query_ctx->enable_pipeline_exec()), - _runtime_filter_type(get_runtime_filter_type(desc->type)), - _name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, - to_string(_runtime_filter_type))), - _profile(new RuntimeProfile(_name)) {} + _profile(new RuntimeProfile(_name)) { + if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) { + _runtime_filter_type = get_minmax_filter_type(desc->min_max_type); + } else { + _runtime_filter_type = get_runtime_filter_type(desc->type); + } + _name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, + to_string(_runtime_filter_type)); + } ~IRuntimeFilter() = default; @@ -335,6 +365,12 @@ class IRuntimeFilter { case RuntimeFilterType::BLOOM_FILTER: { return std::string("bloomfilter"); } + case RuntimeFilterType::MIN_FILTER: { + return std::string("only_min"); + } + case RuntimeFilterType::MAX_FILTER: { + return std::string("only_max"); + } case RuntimeFilterType::MINMAX_FILTER: { return std::string("minmax"); }