Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "exprs/new_in_predicate.h"
#include "exprs/operators.h"
#include "exprs/string_functions.h"
#include "exprs/table_function/dummy_table_functions.h"
#include "exprs/time_operators.h"
#include "exprs/timestamp_functions.h"
#include "exprs/topn_function.h"
Expand Down Expand Up @@ -264,6 +265,7 @@ void Daemon::init(int argc, char** argv, const std::vector<StorePath>& paths) {
HllFunctions::init();
HashFunctions::init();
TopNFunctions::init();
DummyTableFunctions::init();

LOG(INFO) << CpuInfo::debug_string();
LOG(INFO) << DiskInfo::debug_string();
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ set(EXEC_FILES
plain_text_line_reader.cpp
csv_scan_node.cpp
csv_scanner.cpp
table_function_node.cpp
es_scan_node.cpp
es_http_scan_node.cpp
es_http_scanner.cpp
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "exec/schema_scan_node.h"
#include "exec/select_node.h"
#include "exec/spill_sort_node.h"
#include "exec/table_function_node.h"
#include "exec/topn_node.h"
#include "exec/union_node.h"
#include "exprs/expr_context.h"
Expand Down Expand Up @@ -472,6 +473,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
*node = pool->add(new AssertNumRowsNode(pool, tnode, descs));
return Status::OK();

case TPlanNodeType::TABLE_FUNCTION_NODE:
*node = pool->add(new TableFunctionNode(pool, tnode, descs));
return Status::OK();

default:
map<int, const char*>::const_iterator i =
_TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
Expand Down
340 changes: 340 additions & 0 deletions be/src/exec/table_function_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/table_function_node.h"

#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple_row.h"
#include "exprs/table_function/table_function_factory.h"

namespace doris {

// Constructs the node by forwarding every argument to the ExecNode base class.
// No table-function specific work is done in the constructor body.
TableFunctionNode::TableFunctionNode(ObjectPool* pool, const TPlanNode& tnode,
                                     const DescriptorTbl& descs)
        : ExecNode(pool, tnode, descs) {}

// The destructor body performs no explicit cleanup.
TableFunctionNode::~TableFunctionNode() {}

// Initializes the node from its thrift plan description.
//
// For every expression in `tnode.table_function_node.fnCallExprList` this:
//   1. builds the expression tree and records its context in `_fn_ctxs`;
//   2. looks up the table function by the name of the root expression's function;
//   3. binds the expression context to that table function and records it in `_fns`.
// Afterwards `_fn_num` / `_fn_values` are sized to the number of functions and the
// output slot id bitmap is prepared.
//
// Returns the first error reported by any step, otherwise OK.
Status TableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(ExecNode::init(tnode, state));

    const auto& fn_call_exprs = tnode.table_function_node.fnCallExprList;
    for (const TExpr& call_expr : fn_call_exprs) {
        ExprContext* expr_ctx = nullptr;
        RETURN_IF_ERROR(Expr::create_expr_tree(_pool, call_expr, &expr_ctx));
        _fn_ctxs.push_back(expr_ctx);

        // The table function is selected by the function name of the root expr.
        const std::string& fn_name = expr_ctx->root()->fn().name.function_name;
        TableFunction* table_fn = nullptr;
        RETURN_IF_ERROR(TableFunctionFactory::get_fn(fn_name, _pool, &table_fn));
        table_fn->set_expr_context(expr_ctx);
        _fns.push_back(table_fn);
    }

    _fn_num = _fns.size();
    _fn_values.resize(_fn_num);

    // Build the lookup table of slots that must be materialized in the output.
    return _prepare_output_slot_ids(tnode);
}

// Builds `_output_slot_ids`, a flag vector indexed by slot id: entry `id` is true
// iff `id` appears in `tnode.table_function_node.outputSlotIds`.
// The vector is sized to (largest listed slot id + 1).
//
// Returns InternalError when the list of output slot ids is empty, otherwise OK.
Status TableFunctionNode::_prepare_output_slot_ids(const TPlanNode& tnode) {
    const auto& slot_ids = tnode.table_function_node.outputSlotIds;
    if (slot_ids.empty()) {
        return Status::InternalError("Output slots of table function node is empty");
    }

    // First pass: find the largest slot id so the flag vector can be sized.
    SlotId largest_id = -1;
    for (auto id : slot_ids) {
        if (largest_id < id) {
            largest_id = id;
        }
    }

    // Second pass: mark every listed slot id.
    _output_slot_ids = std::vector<bool>(largest_id + 1, false);
    for (auto id : slot_ids) {
        _output_slot_ids[id] = true;
    }
    return Status::OK();
}

// Prepares the base node, then the table function expressions, then each table
// function in `_fns` order. Stops at and returns the first error encountered.
Status TableFunctionNode::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(ExecNode::prepare(state));
    RETURN_IF_ERROR(Expr::prepare(_fn_ctxs, state, _row_descriptor, expr_mem_tracker()));

    for (size_t k = 0; k < _fns.size(); ++k) {
        RETURN_IF_ERROR(_fns[k]->prepare());
    }
    return Status::OK();
}

// Opens the node: the base node first, then the table function expressions, then
// every table function in `_fns` order, and finally the (single) child node.
// Returns early if the query has been cancelled or if any step fails.
Status TableFunctionNode::open(RuntimeState* state) {
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(ExecNode::open(state));
    RETURN_IF_ERROR(Expr::open(_fn_ctxs, state));

    for (size_t k = 0; k < _fns.size(); ++k) {
        RETURN_IF_ERROR(_fns[k]->open());
    }

    // The child's status is the final result of open().
    return _children[0]->open(state);
}

// Advances to the next row of the current child batch and feeds it to every
// table function.
//
// If the read offset has reached the number of rows in the batch, only
// `_child_batch_exhausted` is set to true and OK is returned.
// Otherwise `_cur_child_tuple_row` is updated, the offset is advanced, each table
// function processes the row, and `_child_batch_exhausted` is cleared. If a table
// function fails, its error is returned and the exhausted flag is left untouched.
Status TableFunctionNode::_process_next_child_row() {
    const bool batch_consumed = (_cur_child_offset == _cur_child_batch->num_rows());
    if (batch_consumed) {
        _child_batch_exhausted = true;
        return Status::OK();
    }

    _cur_child_tuple_row = _cur_child_batch->get_row(_cur_child_offset);
    ++_cur_child_offset;

    for (size_t k = 0; k < _fns.size(); ++k) {
        RETURN_IF_ERROR(_fns[k]->process(_cur_child_tuple_row));
    }

    _child_batch_exhausted = false;
    return Status::OK();
}

// Scans `_fns` from back to front and reports where the trailing run of
// exhausted (eos) table functions begins.
//
// Examples with 3 functions in `_fns`:
//   eos: false, true,  true   -> returns 1
//   eos: false, false, true   -> returns 2
//   eos: false, false, false  -> returns -1
//   eos: true,  true,  true   -> returns 0
//
// Return value:
//    0: every function is eos (also the result when `_fns` is empty)
//   -1: the last function is not eos, so there is no trailing eos run
//   >0: index of the first function of the trailing eos run
int TableFunctionNode::_find_last_fn_eos_idx() {
    const int last = _fn_num - 1;

    // Walk backwards over the trailing functions that are exhausted.
    int pos = last;
    while (pos >= 0 && _fns[pos]->eos()) {
        --pos;
    }

    if (pos < 0) {
        // Walked past the front: all functions are eos.
        return 0;
    }
    // `pos` is the rearmost function that still has values.
    return (pos == last) ? -1 : pos + 1;
}

// Rolls the table functions over once a trailing run of them is exhausted.
//
// Example: with functions f1, f2, f3 in `_fns` and `last_eos_idx` == 1, f2 and f3
// are eos, so f1 is forwarded and f2 / f3 are reset.
//
// Starting at `last_eos_idx - 1` and moving towards the front, each function is
// forwarded until one of them is not eos after forwarding. Every function behind
// that one is then reset.
//
// Returns true when some function could be forwarded to a new value, and false
// when every forwarded function hit eos; in that case the caller has to move on
// to the next child row to obtain more results.
bool TableFunctionNode::_roll_table_functions(int last_eos_idx) {
    // Kept outside the loop so the value carries across iterations.
    bool reached_eos = false;
    int pos = last_eos_idx - 1;
    while (pos >= 0) {
        _fns[pos]->forward(&reached_eos);
        if (!reached_eos) {
            break;
        }
        --pos;
    }

    if (pos == -1) {
        // Nothing left to forward for the current child row.
        return false;
    }

    // Restart every function behind the one that was just forwarded.
    for (int k = pos + 1; k < _fn_num; ++k) {
        _fns[k]->reset();
    }
    return true;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add examples with real data above the two while(true) loops.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

// Fills `row_batch` with the next batch of output rows.
//
// There are 2 while loops in this method.
// The outer loop gets the next batch from the child node whenever the current
// child batch is exhausted.
// The inner loop expands the current child row through the table functions and
// outputs the result row by row.
//
// Example (assuming process()/reset() position a table function on its first value):
//   child row: (k1), f1 yields [a, b], f2 yields [x, y]
//   emitted rows, in order: (k1, a, x), (k1, a, y), (k1, b, x), (k1, b, y)
// The last function is forwarded after every constructed row. When a trailing run
// of functions is exhausted, _roll_table_functions() forwards the function in
// front of that run and resets the ones behind it. When all functions are
// exhausted, the next child row is processed.
//
// Params:
//   state:     runtime state, used for cancellation / query state checks and batch size.
//   row_batch: output batch; its tuple buffer is allocated on the first produced row.
//   eos:       set to true when the child reports eos or the limit is reached,
//              and to false when the output batch is full.
//
// Returns the first error encountered, otherwise OK.
Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
    SCOPED_TIMER(_runtime_profile->total_time_counter());

    const RowDescriptor& parent_rowdesc = row_batch->row_desc();
    const RowDescriptor& child_rowdesc = _children[0]->row_desc();
    // Cache descriptor sizes on the first call (-1 marks "not yet computed").
    if (_parent_tuple_desc_size == -1) {
        _parent_tuple_desc_size = parent_rowdesc.tuple_descriptors().size();
        _child_tuple_desc_size = child_rowdesc.tuple_descriptors().size();
        for (int i = 0; i < _child_tuple_desc_size; ++i) {
            _child_slot_sizes.push_back(child_rowdesc.tuple_descriptors()[i]->slots().size());
        }
    }

    uint8_t* tuple_buffer = nullptr;
    Tuple* tuple_ptr = nullptr;
    Tuple* pre_tuple_ptr = nullptr;
    // Index of the next row slot to fill in `row_batch`. It only advances when a
    // row is committed, so it always matches the number of rows committed so far
    // by this call.
    int row_idx = 0;

    while (true) {
        RETURN_IF_CANCELLED(state);
        RETURN_IF_ERROR(state->check_query_state("TableFunctionNode, while getting next batch."));

        if (_cur_child_batch == nullptr) {
            _cur_child_batch.reset(new RowBatch(child_rowdesc, state->batch_size(), mem_tracker().get()));
        }
        if (_child_batch_exhausted) {
            // current child batch is exhausted, get next batch from child
            RETURN_IF_ERROR(_children[0]->get_next(state, _cur_child_batch.get(), eos));
            if (*eos) {
                break;
            }
            _cur_child_offset = 0;
            RETURN_IF_ERROR(_process_next_child_row());
            if (_child_batch_exhausted) {
                // The child returned an empty batch: reset it and fetch again.
                _cur_child_batch->reset();
                continue;
            }
        }

        while (true) {
            int idx = _find_last_fn_eos_idx();
            if (idx == 0) {
                // all table functions' results are exhausted, process next child row
                RETURN_IF_ERROR(_process_next_child_row());
                if (_child_batch_exhausted) {
                    break;
                }
            } else if (idx < _fn_num && idx != -1) {
                // some of table functions' results are exhausted
                if (!_roll_table_functions(idx)) {
                    // continue to process next child row
                    continue;
                }
            }

            // get slots from every table function
            // Notice that _fn_values[i] may be null if the table function has empty result set.
            for (int i = 0; i < _fn_num; i++) {
                RETURN_IF_ERROR(_fns[i]->get_value(&_fn_values[i]));
            }

            // allocate memory for row batch for the first time
            if (tuple_buffer == nullptr) {
                int64_t tuple_buffer_size;
                RETURN_IF_ERROR(
                        row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buffer_size, &tuple_buffer));
                tuple_ptr = reinterpret_cast<Tuple*>(tuple_buffer);
            }

            // Remember where this row's tuples start so the space can be reused
            // if the row is rejected by the conjuncts.
            pre_tuple_ptr = tuple_ptr;

            // Row construction is kept inline on purpose: extracting it into a
            // helper would require passing a large number of parameters.
            //
            // The tuples order in parent row batch should be
            //      child1, child2, tf1, tf2, ...
            //
            // `row_idx` is NOT advanced here. It is only advanced once the row is
            // committed, so a rejected row's slot is overwritten by the next
            // candidate and the slot index stays in sync with commit_last_row().
            TupleRow* parent_tuple_row = row_batch->get_row(row_idx);
            // 1. copy child tuples
            int tuple_idx = 0;
            for (int i = 0; i < _child_tuple_desc_size; tuple_idx++, i++) {
                TupleDescriptor* child_tuple_desc = child_rowdesc.tuple_descriptors()[tuple_idx];
                TupleDescriptor* parent_tuple_desc = parent_rowdesc.tuple_descriptors()[tuple_idx];

                for (int j = 0; j < _child_slot_sizes[i]; ++j) {
                    SlotDescriptor* child_slot_desc = child_tuple_desc->slots()[j];
                    SlotDescriptor* parent_slot_desc = parent_tuple_desc->slots()[j];

                    if (_output_slot_ids[parent_slot_desc->id()]) {
                        // Slot is part of the output: copy the value from the child tuple.
                        Tuple* child_tuple = _cur_child_tuple_row->get_tuple(child_rowdesc.get_tuple_idx(child_tuple_desc->id()));
                        void* dest_slot = tuple_ptr->get_slot(parent_slot_desc->tuple_offset());
                        RawValue::write(child_tuple->get_slot(child_slot_desc->tuple_offset()), dest_slot, parent_slot_desc->type(), row_batch->tuple_data_pool());
                        tuple_ptr->set_not_null(parent_slot_desc->null_indicator_offset());
                    } else {
                        // Slot is not part of the output: mark it null instead of copying.
                        tuple_ptr->set_null(parent_slot_desc->null_indicator_offset());
                    }
                }
                parent_tuple_row->set_tuple(tuple_idx, tuple_ptr);
                tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + parent_tuple_desc->byte_size());
            }

            // 2. copy function result
            // Each remaining parent tuple holds the result of one table function in
            // its first slot. The result is written whenever it is non-null; the
            // output slot id flags are not consulted here.
            for (int i = 0; tuple_idx < _parent_tuple_desc_size; tuple_idx++, i++) {
                TupleDescriptor* parent_tuple_desc = parent_rowdesc.tuple_descriptors()[tuple_idx];
                SlotDescriptor* parent_slot_desc = parent_tuple_desc->slots()[0];
                void* dest_slot = tuple_ptr->get_slot(parent_slot_desc->tuple_offset());
                if (_fn_values[i] != nullptr) {
                    RawValue::write(_fn_values[i], dest_slot, parent_slot_desc->type(), row_batch->tuple_data_pool());
                    tuple_ptr->set_not_null(parent_slot_desc->null_indicator_offset());
                } else {
                    tuple_ptr->set_null(parent_slot_desc->null_indicator_offset());
                }
                parent_tuple_row->set_tuple(tuple_idx, tuple_ptr);

                tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + parent_tuple_desc->byte_size());
            }

            // 3. eval conjuncts
            // data() is used instead of &_conjunct_ctxs[0] so that an empty
            // conjunct list does not index into an empty vector.
            if (eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), parent_tuple_row)) {
                row_batch->commit_last_row();
                ++row_idx;
                ++_num_rows_returned;
            } else {
                // Row rejected: rewind so the next row reuses the same tuple space
                // and (because row_idx was not advanced) the same row slot.
                tuple_ptr = pre_tuple_ptr;
            }

            // Forward after write success.
            // Because data in `_fn_values` points to the data saved in functions.
            // And `forward` will change the data in functions.
            bool tmp;
            _fns[_fn_num - 1]->forward(&tmp);

            if (row_batch->at_capacity()) {
                *eos = false;
                break;
            }
        }

        if (row_batch->at_capacity()) {
            break;
        }
    }
    if (reached_limit()) {
        // Trim the rows produced beyond the limit and report eos.
        int num_rows_over = _num_rows_returned - _limit;
        row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
        _num_rows_returned -= num_rows_over;
        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
        *eos = true;
    }

    return Status::OK();
}

// Closes the node. Does nothing and returns OK if the node is already closed.
// Otherwise the table function expressions are closed, followed by the base
// ExecNode, whose status is returned.
Status TableFunctionNode::close(RuntimeState* state) {
    if (!is_closed()) {
        RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
        Expr::close(_fn_ctxs, state);
        return ExecNode::close(state);
    }
    return Status::OK();
}

}; // namespace doris
Loading