Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 88 additions & 88 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <map>
#include <ostream>
#include <tuple>
#include <utility>

#include "cctz/civil_time.h"
#include "cctz/time_zone.h"
Expand Down Expand Up @@ -568,14 +567,12 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,

std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_literal(
const VSlotRef* slot_ref, const VLiteral* literal) {
DCHECK(_col_name_to_file_col_name_low_case.contains(slot_ref->expr_name()));
auto file_col_name_low_case = _col_name_to_file_col_name_low_case[slot_ref->expr_name()];
if (!_type_map.contains(file_col_name_low_case)) {
// TODO: this is for acid table
LOG(WARNING) << "Column " << slot_ref->expr_name() << " not found in _type_map";
return std::make_tuple(false, orc::Literal(false), orc::PredicateDataType::LONG);
}
DCHECK(_type_map.contains(file_col_name_low_case));
const auto* orc_type = _type_map[file_col_name_low_case];
if (!TYPEKIND_TO_PREDICATE_TYPE.contains(orc_type->getKind())) {
LOG(WARNING) << "Unsupported Push Down Orc Type [TypeKind=" << orc_type->getKind() << "]";
Expand Down Expand Up @@ -627,37 +624,15 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_lite
}
}

// check if the slot of expr can be pushed down to orc reader and make orc predicate type
// check if the slot of expr can be pushed down to orc reader
bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) {
if (!expr->children()[0]->is_slot_ref()) {
return false;
}
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
// check if the slot exists in orc file and not partition column
if (!_col_name_to_file_col_name.contains(slot_ref->expr_name()) ||
_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name())) {
return false;
}
auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
if (valid) {
_vslot_ref_to_orc_predicate_data_type[slot_ref] = predicate_type;
}
return valid;
}

// check if the literal of expr can be pushed down to orc reader and make orc literal
bool OrcReader::_check_literal_can_push_down(const VExprSPtr& expr, uint16_t child_id) {
if (!expr->children()[child_id]->is_literal()) {
return false;
}
// the slot has been checked in _check_slot_can_push_down before calling this function
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[child_id].get());
auto [valid, orc_literal, _] = _make_orc_literal(slot_ref, literal);
if (valid) {
_vliteral_to_orc_literal.insert(std::make_pair(literal, orc_literal));
}
return valid;
return _col_name_to_file_col_name.contains(slot_ref->expr_name()) &&
!_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name());
}

// check if there are rest children of expr can be pushed down to orc reader
Expand All @@ -667,7 +642,7 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {
}

for (size_t i = 1; i < expr->children().size(); ++i) {
if (!_check_literal_can_push_down(expr, i)) {
if (!expr->children()[i]->is_literal()) {
return false;
}
}
Expand All @@ -676,10 +651,7 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {

// check if the expr can be pushed down to orc reader
bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
if (expr == nullptr) {
return false;
}

DCHECK(expr != nullptr);
switch (expr->op()) {
case TExprOpcode::COMPOUND_AND:
// at least one child can be pushed down
Expand Down Expand Up @@ -721,167 +693,198 @@ bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
}
}

void OrcReader::_build_less_than(const VExprSPtr& expr,
bool OrcReader::_build_less_than(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

void OrcReader::_build_less_than_equals(const VExprSPtr& expr,
bool OrcReader::_build_less_than_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
builder->lessThanEquals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

void OrcReader::_build_equals(const VExprSPtr& expr,
bool OrcReader::_build_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

void OrcReader::_build_filter_in(const VExprSPtr& expr,
bool OrcReader::_build_filter_in(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() >= 2);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
std::vector<orc::Literal> literals;
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
orc::PredicateDataType predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
orc::PredicateDataType predicate_type = orc::PredicateDataType::LONG;
for (size_t i = 1; i < expr->children().size(); ++i) {
DCHECK(expr->children()[i]->is_literal());
const auto* literal = static_cast<const VLiteral*>(expr->children()[i].get());
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
auto [valid, orc_literal, type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
literals.emplace_back(orc_literal);
predicate_type = type;
}
DCHECK(!literals.empty());
builder->in(slot_ref->expr_name(), predicate_type, literals);
return true;
}

void OrcReader::_build_is_null(const VExprSPtr& expr,
bool OrcReader::_build_is_null(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 1);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
builder->isNull(slot_ref->expr_name(), predicate_type);
return true;
}

bool OrcReader::_build_search_argument(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
// OPTIMIZE: check expr only once
if (expr == nullptr) {
return false;
}

// if expr can not be pushed down, skip it and continue to next expr
if (!_check_expr_can_push_down(expr)) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND: {
builder->startAnd();
bool at_least_one_can_push_down = false;
builder->startAnd();
for (const auto& child : expr->children()) {
if (_build_search_argument(child, builder)) {
at_least_one_can_push_down = true;
}
}
DCHECK(at_least_one_can_push_down);
if (!at_least_one_can_push_down) {
// if all exprs can not be pushed down, builder->end() will throw exception
return false;
}
builder->end();
break;
}
case TExprOpcode::COMPOUND_OR: {
case TExprOpcode::COMPOUND_OR:
builder->startOr();
bool all_can_push_down = true;
for (const auto& child : expr->children()) {
if (!_build_search_argument(child, builder)) {
all_can_push_down = false;
return false;
}
}
DCHECK(all_can_push_down);
builder->end();
break;
}
case TExprOpcode::COMPOUND_NOT: {
DCHECK_EQ(expr->children().size(), 1);
case TExprOpcode::COMPOUND_NOT:
builder->startNot();
auto res = _build_search_argument(expr->children()[0], builder);
DCHECK(res);
DCHECK_EQ(expr->children().size(), 1);
if (!_build_search_argument(expr->children()[0], builder)) {
return false;
}
builder->end();
break;
}
case TExprOpcode::GE:
builder->startNot();
_build_less_than(expr, builder);
if (!_build_less_than(expr, builder)) {
return false;
}
builder->end();
break;
case TExprOpcode::GT:
builder->startNot();
_build_less_than_equals(expr, builder);
if (!_build_less_than_equals(expr, builder)) {
return false;
}
builder->end();
break;
case TExprOpcode::LE:
_build_less_than_equals(expr, builder);
if (!_build_less_than_equals(expr, builder)) {
return false;
}
break;
case TExprOpcode::LT:
_build_less_than(expr, builder);
if (!_build_less_than(expr, builder)) {
return false;
}
break;
case TExprOpcode::EQ:
_build_equals(expr, builder);
if (!_build_equals(expr, builder)) {
return false;
}
break;
case TExprOpcode::NE:
builder->startNot();
_build_equals(expr, builder);
if (!_build_equals(expr, builder)) {
return false;
}
builder->end();
break;
case TExprOpcode::FILTER_IN:
_build_filter_in(expr, builder);
if (!_build_filter_in(expr, builder)) {
return false;
}
break;
case TExprOpcode::FILTER_NOT_IN:
builder->startNot();
_build_filter_in(expr, builder);
if (!_build_filter_in(expr, builder)) {
return false;
}
builder->end();
break;
// is null and is not null is represented as function call
case TExprOpcode::INVALID_OPCODE:
case TExprOpcode::INVALID_OPCODE: {
DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
if (expr->fn().name.function_name == "is_null_pred") {
_build_is_null(expr, builder);
if (!_build_is_null(expr, builder)) {
return false;
}
} else if (expr->fn().name.function_name == "is_not_null_pred") {
builder->startNot();
_build_is_null(expr, builder);
if (!_build_is_null(expr, builder)) {
return false;
}
builder->end();
} else {
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
break;

default:
}
default: {
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
}
return true;
}

Expand All @@ -895,8 +898,6 @@ bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {
bool at_least_one_can_push_down = false;
builder->startAnd();
for (const auto& expr_ctx : conjuncts) {
_vslot_ref_to_orc_predicate_data_type.clear();
_vliteral_to_orc_literal.clear();
if (_build_search_argument(expr_ctx->root(), builder)) {
at_least_one_can_push_down = true;
}
Expand Down Expand Up @@ -942,7 +943,7 @@ Status OrcReader::set_fill_columns(
visit_slot(child.get());
}
} else if (VInPredicate* in_predicate = typeid_cast<VInPredicate*>(filter_impl)) {
if (!in_predicate->children().empty()) {
if (in_predicate->children().size() > 0) {
visit_slot(in_predicate->children()[0].get());
}
} else {
Expand Down Expand Up @@ -1178,8 +1179,7 @@ Status OrcReader::_fill_partition_columns(
if (num_deserialized != rows) {
return Status::InternalError(
"Failed to fill partition column: {}={} ."
"Number of rows expected to be written : {}, number of rows actually "
"written : "
"Number of rows expected to be written : {}, number of rows actually written : "
"{}",
slot_desc->col_name(), value, num_deserialized, rows);
}
Expand Down
Loading
Loading