Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ Status BaseScanner::init_expr_ctxes() {
// preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
if (!_pre_filter_texprs.empty()) {
if (_state->enable_vectorized_exec()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
_state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc,
_mem_tracker));
RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state));
// For vectorized execution, the FE is expected to combine all preceding filter exprs into a single compound expr.
DCHECK(_pre_filter_texprs.size() == 1);
_vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
_state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker));
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
} else {
RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs,
&_pre_filter_ctxs));
Expand Down Expand Up @@ -302,14 +304,10 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
Status BaseScanner::_filter_src_block() {
auto origin_column_num = _src_block.columns();
// filter block
if (!_vpre_filter_ctxs.empty()) {
for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
auto old_rows = _src_block.rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block,
origin_column_num));
_counter->num_rows_unselected += old_rows - _src_block.rows();
}
}
auto old_rows = _src_block.rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, &_src_block,
origin_column_num));
_counter->num_rows_unselected += old_rows - _src_block.rows();
return Status::OK();
}

Expand Down Expand Up @@ -453,8 +451,8 @@ void BaseScanner::close() {
Expr::close(_pre_filter_ctxs, _state);
}

if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) {
vectorized::VExpr::close(_vpre_filter_ctxs, _state);
if (_vpre_filter_ctx_ptr) {
(*_vpre_filter_ctx_ptr)->close(_state);
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class BaseScanner {
if (_state->enable_vectorized_exec()) {
vectorized::VExpr::close(_dest_vexpr_ctx, _state);
}
};
}

virtual Status init_expr_ctxes();
// Open this scanner, will initialize information need to
Expand Down Expand Up @@ -138,7 +138,7 @@ class BaseScanner {

// for vectorized load
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
vectorized::Block _src_block;
int _num_of_columns_from_file;

Expand Down
103 changes: 99 additions & 4 deletions be/test/vec/exec/vbroker_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ TEST_F(VBrokerScannerTest, normal) {
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);

VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
Expand All @@ -376,7 +375,6 @@ TEST_F(VBrokerScannerTest, normal) {
ASSERT_TRUE(eof);
auto columns = block->get_columns();
ASSERT_EQ(columns.size(), 3);

ASSERT_EQ(columns[0]->get_int(0), 1);
ASSERT_EQ(columns[0]->get_int(1), 4);
ASSERT_EQ(columns[0]->get_int(2), 8);
Expand All @@ -390,6 +388,105 @@ TEST_F(VBrokerScannerTest, normal) {
ASSERT_EQ(columns[2]->get_int(2), 10);
}

// Scans a local CSV file through VBrokerScanner with a single preceding
// filter expression (`k1 < '8'`) and checks that only the rows passing the
// filter reach the output block.
TEST_F(VBrokerScannerTest, normal_with_pre_filter) {
    // One local, plain-CSV range; size -1 is passed through as the range size.
    std::vector<TBrokerRangeDesc> ranges;
    TBrokerRangeDesc range;
    range.path = "./be/test/exec/test_data/broker_scanner/normal.csv";
    range.start_offset = 0;
    range.size = -1;
    range.splittable = true;
    range.file_type = TFileType::FILE_LOCAL;
    range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
    ranges.push_back(range);

    // init pre_filter expr: k1 < '8'
    // Scalar INT type descriptor, used below as the function's return type.
    TTypeDesc int_type;
    {
        TTypeNode node;
        node.__set_type(TTypeNodeType::SCALAR);
        TScalarType scalar_type;
        scalar_type.__set_type(TPrimitiveType::INT);
        node.__set_scalar_type(scalar_type);
        int_type.types.push_back(node);
    }
    // Scalar VARCHAR(5000) type descriptor, used for both operands of the predicate.
    TTypeDesc varchar_type;
    {
        TTypeNode node;
        node.__set_type(TTypeNodeType::SCALAR);
        TScalarType scalar_type;
        scalar_type.__set_type(TPrimitiveType::VARCHAR);
        scalar_type.__set_len(5000);
        node.__set_scalar_type(scalar_type);
        varchar_type.types.push_back(node);
    }

    // The expression tree is pushed as a flat node list: the binary predicate
    // first, followed by its two children (slot ref, then string literal).
    TExpr filter_expr;
    {
        // Root node: binary predicate `lt` with two children.
        TExprNode expr_node;
        expr_node.__set_node_type(TExprNodeType::BINARY_PRED);
        expr_node.type = gen_type_desc(TPrimitiveType::BOOLEAN);
        expr_node.__set_num_children(2);
        expr_node.__isset.opcode = true;
        expr_node.__set_opcode(TExprOpcode::LT);
        expr_node.__isset.vector_opcode = true;
        expr_node.__set_vector_opcode(TExprOpcode::LT);
        expr_node.__isset.fn = true;
        expr_node.fn.name.function_name = "lt";
        expr_node.fn.binary_type = TFunctionBinaryType::BUILTIN;
        // NOTE(review): ret_type is INT while the node type above is BOOLEAN —
        // confirm this mismatch is intended.
        expr_node.fn.ret_type = int_type;
        expr_node.fn.has_var_args = false;
        filter_expr.nodes.push_back(expr_node);
    }
    {
        // Left operand: slot reference (slot 4 of tuple 1), typed as VARCHAR.
        // Presumably this is k1 in the source tuple — verify against the fixture.
        TExprNode expr_node;
        expr_node.__set_node_type(TExprNodeType::SLOT_REF);
        expr_node.type = varchar_type;
        expr_node.__set_num_children(0);
        expr_node.__isset.slot_ref = true;
        TSlotRef slot_ref;
        slot_ref.__set_slot_id(4);
        slot_ref.__set_tuple_id(1);
        expr_node.__set_slot_ref(slot_ref);
        expr_node.__isset.output_column = true;
        expr_node.__set_output_column(0);
        filter_expr.nodes.push_back(expr_node);
    }
    {
        // Right operand: the string literal "8".
        TExprNode expr_node;
        expr_node.__set_node_type(TExprNodeType::STRING_LITERAL);
        expr_node.type = varchar_type;
        expr_node.__set_num_children(0);
        expr_node.__isset.string_literal = true;
        TStringLiteral string_literal;
        string_literal.__set_value("8");
        expr_node.__set_string_literal(string_literal);
        filter_expr.nodes.push_back(expr_node);
    }
    _pre_filter.push_back(filter_expr);
    VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
                           &_counter);
    auto st = scanner.open();
    ASSERT_TRUE(st.ok());

    std::unique_ptr<vectorized::Block> block(new vectorized::Block());
    bool eof = false;
    // A single get_next() call is expected to consume the whole file and report eof.
    st = scanner.get_next(block.get(), &eof);
    ASSERT_TRUE(st.ok());
    ASSERT_TRUE(eof);
    auto columns = block->get_columns();
    ASSERT_EQ(columns.size(), 3);

    // Exactly two rows must survive the filter. Without this check the test
    // would also pass if the pre-filter dropped nothing, because the value
    // assertions below only look at rows 0 and 1.
    ASSERT_EQ(block->rows(), 2);

    ASSERT_EQ(columns[0]->get_int(0), 1);
    ASSERT_EQ(columns[0]->get_int(1), 4);

    ASSERT_EQ(columns[1]->get_int(0), 2);
    ASSERT_EQ(columns[1]->get_int(1), 5);

    ASSERT_EQ(columns[2]->get_int(0), 3);
    ASSERT_EQ(columns[2]->get_int(1), 6);
}

TEST_F(VBrokerScannerTest, normal2) {
std::vector<TBrokerRangeDesc> ranges;

Expand All @@ -406,7 +503,6 @@ TEST_F(VBrokerScannerTest, normal2) {
range.start_offset = 0;
range.size = 4;
ranges.push_back(range);

VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
Expand Down Expand Up @@ -440,7 +536,6 @@ TEST_F(VBrokerScannerTest, normal5) {
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);

VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.rewrite.ExprRewriter;
Expand Down Expand Up @@ -213,8 +214,12 @@ protected void toThrift(TPlanNode planNode) {
planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE);
TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
if (!preFilterConjuncts.isEmpty()) {
for (Expr e : preFilterConjuncts) {
brokerScanNode.addToPreFilterExprs(e.treeToThrift());
if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
} else {
for (Expr e : preFilterConjuncts) {
brokerScanNode.addToPreFilterExprs(e.treeToThrift());
}
}
}
planNode.setBrokerScanNode(brokerScanNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
// 4. Filter data by using "conjuncts".
protected List<Expr> preFilterConjuncts = Lists.newArrayList();

protected Expr vpreFilterConjunct = null;

// Fragment that this PlanNode is executed in. Valid only after this PlanNode has been
// assigned to a fragment. Set and maintained by enclosing PlanFragment.
protected PlanFragment fragment;
Expand Down Expand Up @@ -904,6 +906,11 @@ public void convertToVectoriezd() {
initCompoundPredicate(vconjunct);
}

if (!preFilterConjuncts.isEmpty()) {
vpreFilterConjunct = convertConjunctsToAndCompoundPredicate(preFilterConjuncts);
initCompoundPredicate(vpreFilterConjunct);
}

for (PlanNode child : children) {
child.convertToVectoriezd();
}
Expand Down
2 changes: 1 addition & 1 deletion gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ struct TBrokerScanNode {
// Partition info used to process partition select in broker load
2: optional list<Exprs.TExpr> partition_exprs
3: optional list<Partitions.TRangePartition> partition_infos
4: optional list<Exprs.TExpr> pre_filter_exprs
4: optional list<Exprs.TExpr> pre_filter_exprs
}

struct TEsScanNode {
Expand Down