Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions be/src/exec/es_http_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
#include "runtime/dpp_sink_internal.h"
#include "service/backend_options.h"
#include "util/runtime_profile.h"
#include "exec/es_scan_reader.h"
#include "util/es_scan_reader.h"
#include "util/es_scroll_query.h"
#include "exec/es_predicate.h"

namespace doris {
Expand All @@ -38,7 +39,6 @@ EsHttpScanNode::EsHttpScanNode(
_tuple_id(tnode.es_scan_node.tuple_id),
_runtime_state(nullptr),
_tuple_desc(nullptr),
_query_builder(nullptr),
_num_running_scanners(0),
_scan_finished(false),
_eos(false),
Expand Down Expand Up @@ -79,11 +79,11 @@ Status EsHttpScanNode::prepare(RuntimeState* state) {
return Status::OK;
}

void EsHttpScanNode::build_predicates() {
void EsHttpScanNode::build_conjuncts_list() {
for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
std::shared_ptr<EsPredicate> predicate(
new EsPredicate(_conjunct_ctxs[i], _tuple_desc));
if (predicate->build_disjuncts()) {
if (predicate->build_disjuncts_list()) {
_predicates.push_back(predicate);
_predicate_to_conjunct.push_back(i);
}
Expand All @@ -106,7 +106,7 @@ Status EsHttpScanNode::open(RuntimeState* state) {
}
}

build_predicates();
build_conjuncts_list();

RETURN_IF_ERROR(start_scanners());

Expand Down Expand Up @@ -368,14 +368,16 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length) {
const TEsScanRange& es_scan_range =
_scan_ranges[start_idx + i].scan_range.es_scan_range;

_properties[EsScanReader::INDEX] = es_scan_range.index;
_properties[ESScanReader::KEY_INDEX] = es_scan_range.index;
if (es_scan_range.__isset.type) {
_properties[EsScanReader::TYPE] = es_scan_range.type;
_properties[ESScanReader::KEY_TYPE] = es_scan_range.type;
}
_properties[EsScanReader::SHARD_ID] = std::to_string(es_scan_range.shard_id);
_properties[EsScanReader::BATCH_SIZE] = std::to_string(_runtime_state->batch_size());
_properties[EsScanReader::HOST] = get_host_port(es_scan_range.es_hosts);
_properties[EsScanReader::QUERY] = EsQueryBuilder::build(_properties, _column_names, _predicates);

_properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range.shard_id);
_properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_runtime_state->batch_size());
_properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range.es_hosts);
_properties[ESScanReader::KEY_QUERY]
= ESScrollQueryBuilder::build(_properties, _column_names, _predicates);

status = scanner_scan(_tuple_id, _properties, scanner_expr_ctxs, &counter);
if (!status.ok()) {
Expand Down
4 changes: 1 addition & 3 deletions be/src/exec/es_http_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "common/status.h"
#include "exec/scan_node.h"
#include "exec/es_http_scanner.h"
#include "exec/es_query_builder.h"
#include "gen_cpp/PaloInternalService_types.h"

namespace doris {
Expand Down Expand Up @@ -86,12 +85,11 @@ class EsHttpScanNode : public ScanNode {

private:

void build_predicates();
void build_conjuncts_list();

TupleId _tuple_id;
RuntimeState* _runtime_state;
TupleDescriptor* _tuple_desc;
std::unique_ptr<EsQueryBuilder> _query_builder;

int _num_running_scanners;
std::atomic<bool> _scan_finished;
Expand Down
5 changes: 2 additions & 3 deletions be/src/exec/es_http_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "runtime/raw_value.h"
#include "runtime/tuple.h"
#include "exprs/expr.h"
#include "exec/es_scan_reader.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"

Expand Down Expand Up @@ -82,8 +81,8 @@ Status EsHttpScanner::open() {
}
}

const std::string& host = _properties.at(EsScanReader::HOST);
_es_reader.reset(new EsScanReader(host, _properties));
const std::string& host = _properties.at(ESScanReader::KEY_HOST_PORT);
_es_reader.reset(new ESScanReader(host, _properties));
if (_es_reader == nullptr) {
return Status("Es reader construct failed.");
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/es_http_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "gen_cpp/Types_types.h"
#include "runtime/mem_pool.h"
#include "util/slice.h"
#include "util/es_scan_reader.h"
#include "util/runtime_profile.h"

namespace doris {
Expand All @@ -45,7 +46,6 @@ class TupleRow;
class RowDescriptor;
class MemTracker;
class RuntimeProfile;
class EsScanReader;

struct EsScanCounter {
EsScanCounter() : num_rows_returned(0), num_rows_filtered(0) {
Expand Down Expand Up @@ -93,7 +93,7 @@ class EsHttpScanner {

const TupleDescriptor* _tuple_desc;
EsScanCounter* _counter;
std::unique_ptr<EsScanReader> _es_reader;
std::unique_ptr<ESScanReader> _es_reader;
std::map<std::string, SlotDescriptor*> _slots_map;

// Profile
Expand Down
111 changes: 20 additions & 91 deletions be/src/exec/es_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ EsPredicate::EsPredicate(ExprContext* conjunct_ctx,
EsPredicate::~EsPredicate() {
}

bool EsPredicate::build_disjuncts() {
return build_disjuncts(_context->root(), _disjuncts);
bool EsPredicate::build_disjuncts_list() {
return build_disjuncts_list(_context->root(), _disjuncts);
}

vector<ExtPredicate> EsPredicate::get_predicate_list(){
return _disjuncts;
}

bool EsPredicate::build_disjuncts(Expr* conjunct, vector<ExtPredicate>& disjuncts) {
bool EsPredicate::build_disjuncts_list(Expr* conjunct, vector<ExtPredicate>& disjuncts) {
if (TExprNodeType::BINARY_PRED == conjunct->node_type()) {
if (conjunct->children().size() != 2) {
VLOG(1) << "get disjuncts fail: number of childs is not 2";
Expand Down Expand Up @@ -92,34 +92,28 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector<ExtPredicate>& disjunct
return false;
}

TExtLiteral literal;
if (!to_ext_literal(_context, expr, &literal)) {
VLOG(1) << "get disjuncts fail: can't get literal, node_type="
<< expr->node_type();
return false;
}

std::shared_ptr<ExtLiteral> literal(new ExtLiteral(expr->node_type()));
literal->value = _context->get_value(expr, NULL);
std::unique_ptr<ExtPredicate> predicate(new ExtBinaryPredicate(
TExprNodeType::BINARY_PRED,
slot_desc->col_name(),
slot_desc->type(),
op,
literal));
*literal));

disjuncts.emplace_back(std::move(*predicate));
return true;
}

if (is_match_func(conjunct)) {
TExtLiteral literal;
if (!to_ext_literal(_context, conjunct->get_child(1), &literal)) {
VLOG(1) << "get disjuncts fail: can't get literal, node_type="
<< conjunct->get_child(1)->node_type();
return false;
}

vector<TExtLiteral> query_conditions;
query_conditions.push_back(std::move(literal));
Expr* expr = conjunct->get_child(1);
std::shared_ptr<ExtLiteral> literal(new ExtLiteral(expr->node_type()));
literal->value = _context->get_value(expr, NULL);

vector<ExtLiteral> query_conditions;
query_conditions.push_back(std::move(*literal));
vector<ExtColumnDesc> cols; //TODO

std::unique_ptr<ExtPredicate> predicate(new ExtFunction(
Expand All @@ -134,7 +128,7 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector<ExtPredicate>& disjunct

if (TExprNodeType::IN_PRED == conjunct->node_type()) {
TExtInPredicate ext_in_predicate;
vector<TExtLiteral> in_pred_values;
vector<ExtLiteral> in_pred_values;
InPredicate* pred = dynamic_cast<InPredicate*>(conjunct);
ext_in_predicate.__set_is_not_in(pred->is_not_in());
if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
Expand All @@ -159,13 +153,11 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector<ExtPredicate>& disjunct
return false;
}
}
TExtLiteral literal;
if (!to_ext_literal(_context, pred->get_child(i), &literal)) {
VLOG(1) << "get disjuncts fail: can't get literal, node_type="
<< pred->get_child(i)->node_type();
return false;
}
in_pred_values.push_back(literal);

Expr* expr = conjunct->get_child(i);
std::shared_ptr<ExtLiteral> literal(new ExtLiteral(expr->node_type()));
literal->value = _context->get_value(expr, NULL);
in_pred_values.push_back(*literal);
}

std::unique_ptr<ExtPredicate> predicate(new ExtInPredicate(
Expand All @@ -184,10 +176,10 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector<ExtPredicate>& disjunct
VLOG(1) << "get disjuncts fail: op is not COMPOUND_OR";
return false;
}
if (!build_disjuncts(conjunct->get_child(0), disjuncts)) {
if (!build_disjuncts_list(conjunct->get_child(0), disjuncts)) {
return false;
}
if (!build_disjuncts(conjunct->get_child(1), disjuncts)) {
if (!build_disjuncts_list(conjunct->get_child(1), disjuncts)) {
return false;
}

Expand Down Expand Up @@ -221,67 +213,4 @@ SlotDescriptor* EsPredicate::get_slot_desc(SlotRef* slotRef) {
return slot_desc;
}

bool EsPredicate::to_ext_literal(ExprContext* _context, Expr* expr, TExtLiteral* literal) {
literal->__set_node_type(expr->node_type());
switch (expr->node_type()) {
case TExprNodeType::BOOL_LITERAL: {
TBoolLiteral bool_literal;
void* value = _context->get_value(expr, NULL);
bool_literal.__set_value(*reinterpret_cast<bool*>(value));
literal->__set_bool_literal(bool_literal);
return true;
}
case TExprNodeType::DATE_LITERAL: {
void* value = _context->get_value(expr, NULL);
DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
char str[MAX_DTVALUE_STR_LEN];
date_value.to_string(str);
TDateLiteral date_literal;
date_literal.__set_value(str);
literal->__set_date_literal(date_literal);
return true;
}
case TExprNodeType::FLOAT_LITERAL: {
TFloatLiteral float_literal;
void* value = _context->get_value(expr, NULL);
float_literal.__set_value(*reinterpret_cast<float*>(value));
literal->__set_float_literal(float_literal);
return true;
}
case TExprNodeType::INT_LITERAL: {
TIntLiteral int_literal;
void* value = _context->get_value(expr, NULL);
int_literal.__set_value(*reinterpret_cast<int32_t*>(value));
literal->__set_int_literal(int_literal);
return true;
}
case TExprNodeType::STRING_LITERAL: {
TStringLiteral string_literal;
void* value = _context->get_value(expr, NULL);
string_literal.__set_value(*reinterpret_cast<string*>(value));
literal->__set_string_literal(string_literal);
return true;
}
case TExprNodeType::DECIMAL_LITERAL: {
TDecimalLiteral decimal_literal;
void* value = _context->get_value(expr, NULL);
decimal_literal.__set_value(reinterpret_cast<DecimalValue*>(value)->to_string());
literal->__set_decimal_literal(decimal_literal);
return true;
}
case TExprNodeType::LARGE_INT_LITERAL: {
char buf[48];
int len = 48;
void* value = _context->get_value(expr, NULL);
char* v = LargeIntValue::to_string(*reinterpret_cast<__int128*>(value), buf, &len);
TLargeIntLiteral large_int_literal;
large_int_literal.__set_value(v);
literal->__set_large_int_literal(large_int_literal);
return true;
}
default:
return false;
}
}

}
28 changes: 18 additions & 10 deletions be/src/exec/es_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,22 @@ class Status;
class ExprContext;
class ExtBinaryPredicate;


struct ExtPredicate {
ExtPredicate(TExprNodeType::type node_type) : node_type(node_type) {
}

TExprNodeType::type node_type;
};

struct ExtLiteral : public ExtPredicate {
ExtLiteral(TExprNodeType::type node_type) :
ExtPredicate(node_type) {
}

void *value;
};

struct ExtColumnDesc {
ExtColumnDesc(std::string name, TypeDescriptor type) :
name(name),
Expand All @@ -57,7 +66,7 @@ struct ExtBinaryPredicate : public ExtPredicate {
std::string name,
TypeDescriptor type,
TExprOpcode::type op,
TExtLiteral value) :
ExtLiteral value) :
ExtPredicate(node_type),
col(name, type),
op(op),
Expand All @@ -66,15 +75,15 @@ struct ExtBinaryPredicate : public ExtPredicate {

ExtColumnDesc col;
TExprOpcode::type op;
TExtLiteral value;
ExtLiteral value;
};

struct ExtInPredicate : public ExtPredicate {
ExtInPredicate(
TExprNodeType::type node_type,
std::string name,
TypeDescriptor type,
vector<TExtLiteral> values) :
vector<ExtLiteral> values) :
ExtPredicate(node_type),
is_not_in(false),
col(name, type),
Expand All @@ -83,12 +92,12 @@ struct ExtInPredicate : public ExtPredicate {

bool is_not_in;
ExtColumnDesc col;
vector<TExtLiteral> values;
vector<ExtLiteral> values;
};

struct ExtLikePredicate : public ExtPredicate {
ExtColumnDesc col;
TExtLiteral value;
ExtLiteral value;
};

struct ExtIsNullPredicate : public ExtPredicate {
Expand All @@ -101,7 +110,7 @@ struct ExtFunction : public ExtPredicate {
TExprNodeType::type node_type,
string func_name,
vector<ExtColumnDesc> cols,
vector<TExtLiteral> values) :
vector<ExtLiteral> values) :
ExtPredicate(node_type),
func_name(func_name),
cols(cols),
Expand All @@ -110,7 +119,7 @@ struct ExtFunction : public ExtPredicate {

string func_name;
vector<ExtColumnDesc> cols;
vector<TExtLiteral> values;
vector<ExtLiteral> values;
};

class EsPredicate {
Expand All @@ -120,12 +129,11 @@ class EsPredicate {
const TupleDescriptor* tuple_desc);
~EsPredicate();
vector<ExtPredicate> get_predicate_list();
bool build_disjuncts();
bool build_disjuncts_list();

private:

bool build_disjuncts(Expr* conjunct, vector<ExtPredicate>& disjuncts);
bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal);
bool build_disjuncts_list(Expr* conjunct, vector<ExtPredicate>& disjuncts);
bool is_match_func(Expr* conjunct);
SlotDescriptor* get_slot_desc(SlotRef* slotRef);

Expand Down
Loading