Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 48 additions & 17 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ class ColumnValueRange {
ColumnValueRange(std::string col_name, PrimitiveType type, const T& min, const T& max, bool contain_null);

// should add fixed value before add range
Status add_fixed_value(T value);
Status add_fixed_value(const T& value);

// should remove fixed value after add fixed value
void remove_fixed_value(const T& value);

Status add_range(SQLFilterOp op, T value);

Expand Down Expand Up @@ -109,24 +112,16 @@ class ColumnValueRange {

// Returns the primitive type stored for this range (_column_type).
PrimitiveType type() const { return _column_type; }

// Returns a const reference to the stored column name (_column_name).
const std::string& column_name() const { return _column_name; }

// Returns the current value of the _contain_null flag.
bool contain_null() const { return _contain_null; }

// Returns the number of entries currently held in _fixed_values.
size_t get_fixed_value_size() const { return _fixed_values.size(); }

void to_olap_filter(std::list<TCondition>& filters) {
void to_olap_filter(std::vector<TCondition>& filters) {
if (is_fixed_value_range()) {
// 1. convert to in filter condition
TCondition condition;
condition.__set_column_name(_column_name);
condition.__set_condition_op("*=");

for (const auto& value : _fixed_values) {
condition.condition_values.push_back(cast_to_string(value));
}

if (condition.condition_values.size() != 0) {
filters.push_back(condition);
}
to_in_condition(filters, true);
} else if (_low_value < _high_value) {
// 2. convert to min max filter condition
TCondition null_pred;
Expand Down Expand Up @@ -178,6 +173,20 @@ class ColumnValueRange {
}
}

// Appends one IN / NOT IN condition built from the fixed value set to `filters`.
//
// The condition targets _column_name and uses the operator string "*=" when
// `is_in` is true and "!*=" otherwise. Every element of _fixed_values is
// converted with cast_to_string() and added as a condition value.
// Nothing is appended when no condition value was produced.
void to_in_condition(std::vector<TCondition>& filters, bool is_in = true) {
    TCondition condition;
    condition.__set_column_name(_column_name);
    condition.__set_condition_op(is_in ? "*=" : "!*=");

    for (const auto& value : _fixed_values) {
        condition.condition_values.push_back(cast_to_string(value));
    }

    // A condition without operands is meaningless, so it is dropped.
    if (condition.condition_values.empty()) {
        return;
    }

    // `condition` is not used afterwards: move it so that the collected value
    // strings are not copied a second time.
    filters.push_back(std::move(condition));
}

void set_whole_value_range() {
_fixed_values.clear();
_low_value = TYPE_MIN;
Expand Down Expand Up @@ -207,8 +216,20 @@ class ColumnValueRange {
_contain_null = contain_null;
};

// Callback-shaped adapter (range, pointer to value): inserts the pointed-to
// value into the fixed value set of `range` via add_fixed_value().
// The Status returned by add_fixed_value() is not propagated.
static void add_fixed_value_range(ColumnValueRange<T>& range, T* value) {
    const T& fixed_value = *value;
    range.add_fixed_value(fixed_value);
}

// Callback-shaped adapter (range, pointer to value): erases the pointed-to
// value from the fixed value set of `range` via remove_fixed_value().
static void remove_fixed_value_range(ColumnValueRange<T>& range, T* value) {
    const T& excluded_value = *value;
    range.remove_fixed_value(excluded_value);
}

// Creates an empty value range of the given type with an empty column name.
// Delegates to the (col_name, type) overload so the construction of an empty
// range is spelled out in exactly one place.
static ColumnValueRange<T> create_empty_column_value_range(PrimitiveType type) {
    return ColumnValueRange<T>::create_empty_column_value_range("", type);
}

// Creates a value range for column `col_name` of the given type whose lower
// bound is TYPE_MAX and whose upper bound is TYPE_MIN (low above high), with
// contain_null set to false.
static ColumnValueRange<T> create_empty_column_value_range(const std::string& col_name, PrimitiveType type) {
    const bool contain_null = false;
    ColumnValueRange<T> empty_range(col_name, type, TYPE_MAX, TYPE_MIN, contain_null);
    return empty_range;
}

protected:
Expand Down Expand Up @@ -323,16 +344,25 @@ ColumnValueRange<T>::ColumnValueRange(std::string col_name, PrimitiveType type,
_contain_null(contain_null){}

// Adds `value` to the set of fixed values of this range.
//
// Returns Status::InternalError when the column type is INVALID_TYPE (nothing
// is modified in that case) and Status::OK otherwise.
//
// After a successful call the range no longer contains NULL and the
// low/high bounds are inverted (low = TYPE_MAX, high = TYPE_MIN).
template <class T>
Status ColumnValueRange<T>::add_fixed_value(const T& value) {
    if (INVALID_TYPE == _column_type) {
        return Status::InternalError("AddFixedValue failed, Invalid type");
    }

    _fixed_values.insert(value);
    _contain_null = false;

    // Invert the bounds while fixed values are in use.
    // NOTE(review): presumably this makes the range read as empty once every
    // fixed value has been removed again via remove_fixed_value() -- confirm
    // against is_empty_value_range().
    _high_value = TYPE_MIN;
    _low_value = TYPE_MAX;

    return Status::OK();
}

// Removes `value` from the set of fixed values; a value that is not present
// is silently ignored. No other member is touched.
template <class T>
void ColumnValueRange<T>::remove_fixed_value(const T& value) {
    const auto position = _fixed_values.find(value);
    if (position != _fixed_values.end()) {
        _fixed_values.erase(position);
    }
}

template <class T>
bool ColumnValueRange<T>::is_fixed_value_range() const {
return _fixed_values.size() != 0;
Expand Down Expand Up @@ -480,7 +510,6 @@ Status ColumnValueRange<T>::add_range(SQLFilterOp op, T value) {
_low_value = TYPE_MAX;
} else {
if (_high_value > _low_value) {

switch (op) {
case FILTER_LARGER: {
if (value >= _low_value) {
Expand Down Expand Up @@ -593,7 +622,7 @@ void ColumnValueRange<T>::intersection(ColumnValueRange<T>& range) {
}

std::set<T> result_values;
// 3. fixed_value intersection, fixex value range do not contain null
// 3. fixed_value intersection, fixed value range do not contain null
if (is_fixed_value_range() || range.is_fixed_value_range()) {
if (is_fixed_value_range() && range.is_fixed_value_range()) {
set_intersection(_fixed_values.begin(), _fixed_values.end(), range._fixed_values.begin(),
Expand Down Expand Up @@ -621,6 +650,8 @@ void ColumnValueRange<T>::intersection(ColumnValueRange<T>& range) {
if (!result_values.empty()) {
_fixed_values = std::move(result_values);
_contain_null = false;
_high_value = TYPE_MIN;
_low_value = TYPE_MAX;
} else {
set_empty_value_range();
}
Expand Down
135 changes: 109 additions & 26 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,14 +536,12 @@ Status OlapScanNode::normalize_conjuncts() {
}

Status OlapScanNode::build_olap_filters() {
_olap_filter.clear();

for (auto& iter : _column_value_ranges) {
ToOlapFilterVisitor visitor;
boost::variant<std::list<TCondition>> filters;
boost::variant<std::vector<TCondition>> filters;
boost::apply_visitor(visitor, iter.second, filters);

std::list<TCondition> new_filters = boost::get<std::list<TCondition>>(filters);
std::vector<TCondition> new_filters = boost::get<std::vector<TCondition>>(filters);
if (new_filters.empty()) {
continue;
}
Expand Down Expand Up @@ -735,13 +733,16 @@ Status OlapScanNode::normalize_predicate(ColumnValueRange<T>& range, SlotDescrip
// 1. Normalize InPredicate, add to ColumnValueRange
RETURN_IF_ERROR(normalize_in_and_eq_predicate(slot, &range));

// 2. Normalize BinaryPredicate , add to ColumnValueRange
// 2. Normalize NotInPredicate, add to ColumnValueRange
RETURN_IF_ERROR(normalize_not_in_and_not_eq_predicate(slot, &range));

// 3. Normalize BinaryPredicate , add to ColumnValueRange
RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range));

// 3. Check whether range is empty, set _eos
// 4. Check whether range is empty, set _eos
if (range.is_empty_value_range()) _eos = true;

// 4. Add range to Column->ColumnValueRange map
// 5. Add range to Column->ColumnValueRange map
_column_value_ranges[slot->col_name()] = range;

return Status::OK();
Expand All @@ -759,11 +760,6 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* expr) {


bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor *slot, doris::InPredicate* pred) {
if (pred->is_not_in()) {
// can not push down NOT IN predicate to storage engine
return false;
}

if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
Expand Down Expand Up @@ -844,21 +840,17 @@ std::pair<bool, void*> OlapScanNode::should_push_down_eq_predicate(doris::SlotDe
return result_pair;
}

template <typename T>
Status OlapScanNode::insert_value_to_range(doris::ColumnValueRange<T>& temp_range, doris::PrimitiveType type, void *value) {
template <typename T, typename ChangeFixedValueRangeFunc>
Status OlapScanNode::change_fixed_value_range(ColumnValueRange<T>& temp_range, PrimitiveType type, void *value,
const ChangeFixedValueRangeFunc& func) {
switch (type) {
case TYPE_TINYINT: {
int32_t v = *reinterpret_cast<int8_t*>(value);
temp_range.add_fixed_value(*reinterpret_cast<T*>(&v));
break;
}
case TYPE_DATE: {
DateTimeValue date_value =
*reinterpret_cast<DateTimeValue*>(value);
// There is must return empty data in olap_scan_node,
// Because data value loss accuracy
if (!date_value.check_loss_accuracy_cast_to_date()) {
temp_range.add_fixed_value(*reinterpret_cast<T*>(&date_value));
func(temp_range, reinterpret_cast<T*>(&date_value));
}
break;
}
Expand All @@ -868,16 +860,17 @@ Status OlapScanNode::insert_value_to_range(doris::ColumnValueRange<T>& temp_rang
case TYPE_VARCHAR:
case TYPE_HLL:
case TYPE_DATETIME:
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT:
case TYPE_BIGINT:
case TYPE_LARGEINT: {
temp_range.add_fixed_value(*reinterpret_cast<T*>(value));
func(temp_range, reinterpret_cast<T*>(value));
break;
}
case TYPE_BOOLEAN: {
bool v = *reinterpret_cast<bool*>(value);
temp_range.add_fixed_value(*reinterpret_cast<T*>(&v));
func(temp_range, reinterpret_cast<T*>(&v));
break;
}
default: {
Expand Down Expand Up @@ -917,7 +910,8 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
continue;
}
auto value = const_cast<void*>(iter->get_value());
RETURN_IF_ERROR(insert_value_to_range(temp_range, slot->type().type, value));
RETURN_IF_ERROR(change_fixed_value_range(temp_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
iter->next();
}

Expand All @@ -926,8 +920,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
}
range->intersection(temp_range);
} // end of handle in predicate

// 2. Normalize eq conjuncts like 'where col = value'
// 2. Normalize eq conjuncts like 'where col = value'
else if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() &&
FILTER_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) {
Expr* pred = _conjunct_ctxs[conj_idx]->root();
Expand All @@ -945,7 +938,8 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
auto value = result_pair.second;
// where A = NULL should return empty result set
if (value != nullptr) {
RETURN_IF_ERROR(insert_value_to_range(temp_range, slot->type().type, value));
RETURN_IF_ERROR(change_fixed_value_range(temp_range, slot->type().type, value,
ColumnValueRange<T>::add_fixed_value_range));
}

if (is_key_column(slot->col_name())) {
Expand All @@ -966,6 +960,95 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
return Status::OK();
}

// Construct the ColumnValueRange for one specified column.
// Only NOT IN predicates and not-equal binary predicates found in
// _conjunct_ctxs are handled here.
//
// The mode is chosen once on entry:
//   * `range` already holds fixed values: every excluded value is removed
//     from that fixed value set.
//   * otherwise: excluded values are collected in a temporary range and, if
//     the limit allows, appended to _olap_filter as one NOT IN ("!*=")
//     condition.
//
// It tries to push down as many conditions of the column as possible, but if
// the number of collected NOT IN values exceeds
// _max_pushdown_conditions_per_column, none of them is pushed down.
//
// Returns the first error reported by change_fixed_value_range(), otherwise
// Status::OK().
template <class T>
Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot,
                                                           ColumnValueRange<T>* range) {
    // If the conjunct of slot is fixed value, will change the fixed value set of column value range
    // else add value to not in range and push down predicate directly
    const bool is_fixed_range = range->is_fixed_value_range();
    auto not_in_range = ColumnValueRange<T>::create_empty_column_value_range(range->column_name(), range->type());

    std::vector<uint32_t> filter_conjuncts_index;
    for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
        // 1. Normalize in conjuncts like 'where col not in (v1, v2, v3)'
        if (TExprOpcode::FILTER_NOT_IN == _conjunct_ctxs[conj_idx]->root()->op()) {
            InPredicate* pred = dynamic_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
            // A failed cast must not be dereferenced; such a conjunct is simply not pushed down.
            if (pred == nullptr || !should_push_down_in_predicate(slot, pred)) {
                continue;
            }

            // begin to push InPredicate value into ColumnValueRange
            auto iter = pred->hybrid_set()->begin();
            while (iter->has_next()) {
                // NULL entries are skipped (original note: "column not in (NULL) is always true").
                // NOTE(review): confirm that NULL inside a NOT IN list is handled before this point.
                // The iterator still has to be advanced, otherwise this loop never terminates.
                if (nullptr == iter->get_value()) {
                    iter->next();
                    continue;
                }
                auto value = const_cast<void*>(iter->get_value());
                if (is_fixed_range) {
                    RETURN_IF_ERROR(change_fixed_value_range(*range, slot->type().type, value,
                                                             ColumnValueRange<T>::remove_fixed_value_range));
                } else {
                    RETURN_IF_ERROR(change_fixed_value_range(not_in_range, slot->type().type, value,
                                                             ColumnValueRange<T>::add_fixed_value_range));
                }
                iter->next();
            }

            // Conjuncts on key columns are remembered so that they can be
            // recorded in _pushed_conjuncts_index below.
            if (is_key_column(slot->col_name())) {
                filter_conjuncts_index.emplace_back(conj_idx);
            }
        } // end of handle not in predicate

        // 2. Normalize not-eq conjuncts like 'where col != value'
        if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() &&
            FILTER_NOT_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) {
            Expr* pred = _conjunct_ctxs[conj_idx]->root();
            DCHECK(pred->get_num_children() == 2);

            for (int child_idx = 0; child_idx < 2; ++child_idx) {
                // TODO: should use C++17 structured bindings to refactor this code in the future:
                // 'auto [should_push_down, value] = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);'
                // to make the code tidier and more readable
                auto result_pair = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);
                if (!result_pair.first) {
                    continue;
                }
                auto value = result_pair.second;

                // The equality path guards against a null value ('where A = NULL');
                // do the same here instead of dereferencing it. The conjunct is
                // not recorded as pushed down in that case.
                // NOTE(review): presumably it is then still evaluated outside the
                // storage engine -- confirm.
                if (value == nullptr) {
                    continue;
                }

                if (is_fixed_range) {
                    RETURN_IF_ERROR(change_fixed_value_range(*range, slot->type().type, value,
                                                             ColumnValueRange<T>::remove_fixed_value_range));
                } else {
                    RETURN_IF_ERROR(change_fixed_value_range(not_in_range, slot->type().type, value,
                                                             ColumnValueRange<T>::add_fixed_value_range));
                }

                if (is_key_column(slot->col_name())) {
                    filter_conjuncts_index.emplace_back(conj_idx);
                }
            } // end for each binary predicate child
        } // end of handling not-eq binary predicate
    }

    // exceed limit, no conditions will be pushed down to storage engine.
    if (is_fixed_range || not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) {
        if (!is_fixed_range) {
            // push down not in condition to storage engine
            not_in_range.to_in_condition(_olap_filter, false);
        }
        std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
                  std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
    }
    return Status::OK();
}

template <typename T>
bool OlapScanNode::normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot,
const std::string& is_null_str, ColumnValueRange<T>* range) {
Expand Down
8 changes: 6 additions & 2 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ class OlapScanNode : public ScanNode {
template <class T>
Status normalize_in_and_eq_predicate(SlotDescriptor* slot, ColumnValueRange<T>* range);

template <class T>
Status normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, ColumnValueRange<T>* range);

template <class T>
Status normalize_noneq_binary_predicate(SlotDescriptor* slot, ColumnValueRange<T>* range);

Expand All @@ -178,8 +181,9 @@ class OlapScanNode : public ScanNode {

std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot, Expr* pred, int conj_idx, int child_idx);

template <typename T>
static Status insert_value_to_range(ColumnValueRange<T>& range, PrimitiveType type, void* value);
template <typename T, typename ChangeFixedValueRangeFunc>
static Status change_fixed_value_range(ColumnValueRange <T> &range, PrimitiveType type, void *value,
const ChangeFixedValueRangeFunc& func);

friend class OlapScanner;

Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/olap_cond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ bool Cond::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const
operand_field->cmp(statistic.second) <= 0;
}
case OP_NE: {
return operand_field->cmp(statistic.first) < 0 || operand_field->cmp(statistic.second) > 0;
return true;
}
case OP_LT: {
return operand_field->cmp(statistic.first) > 0;
Expand All @@ -249,8 +249,7 @@ bool Cond::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const
max_value_field->cmp(statistic.first) >= 0;
}
case OP_NOT_IN: {
return min_value_field->cmp(statistic.second) > 0 ||
max_value_field->cmp(statistic.first) < 0;
return true;
}
case OP_IS: {
if (operand_field->is_null()) {
Expand Down
Loading