diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index 668fdbd8e5160d..3ca1f838709427 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -27,7 +27,8 @@ #include "runtime/dpp_sink_internal.h" #include "service/backend_options.h" #include "util/runtime_profile.h" -#include "exec/es_scan_reader.h" +#include "util/es_scan_reader.h" +#include "util/es_scroll_query.h" #include "exec/es_predicate.h" namespace doris { @@ -38,7 +39,6 @@ EsHttpScanNode::EsHttpScanNode( _tuple_id(tnode.es_scan_node.tuple_id), _runtime_state(nullptr), _tuple_desc(nullptr), - _query_builder(nullptr), _num_running_scanners(0), _scan_finished(false), _eos(false), @@ -79,11 +79,11 @@ Status EsHttpScanNode::prepare(RuntimeState* state) { return Status::OK; } -void EsHttpScanNode::build_predicates() { +void EsHttpScanNode::build_conjuncts_list() { for (int i = 0; i < _conjunct_ctxs.size(); ++i) { std::shared_ptr predicate( new EsPredicate(_conjunct_ctxs[i], _tuple_desc)); - if (predicate->build_disjuncts()) { + if (predicate->build_disjuncts_list()) { _predicates.push_back(predicate); _predicate_to_conjunct.push_back(i); } @@ -106,7 +106,7 @@ Status EsHttpScanNode::open(RuntimeState* state) { } } - build_predicates(); + build_conjuncts_list(); RETURN_IF_ERROR(start_scanners()); @@ -368,14 +368,16 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length) { const TEsScanRange& es_scan_range = _scan_ranges[start_idx + i].scan_range.es_scan_range; - _properties[EsScanReader::INDEX] = es_scan_range.index; + _properties[ESScanReader::KEY_INDEX] = es_scan_range.index; if (es_scan_range.__isset.type) { - _properties[EsScanReader::TYPE] = es_scan_range.type; + _properties[ESScanReader::KEY_TYPE] = es_scan_range.type; } - _properties[EsScanReader::SHARD_ID] = std::to_string(es_scan_range.shard_id); - _properties[EsScanReader::BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); - _properties[EsScanReader::HOST] = get_host_port(es_scan_range.es_hosts); - _properties[EsScanReader::QUERY] = EsQueryBuilder::build(_properties, _column_names, _predicates); + + _properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range.shard_id); + _properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); + _properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range.es_hosts); + _properties[ESScanReader::KEY_QUERY] + = ESScrollQueryBuilder::build(_properties, _column_names, _predicates); status = scanner_scan(_tuple_id, _properties, scanner_expr_ctxs, &counter); if (!status.ok()) { diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h index 3b5658d4d17524..c9d077ee4ab1ae 100644 --- a/be/src/exec/es_http_scan_node.h +++ b/be/src/exec/es_http_scan_node.h @@ -29,7 +29,6 @@ #include "common/status.h" #include "exec/scan_node.h" #include "exec/es_http_scanner.h" -#include "exec/es_query_builder.h" #include "gen_cpp/PaloInternalService_types.h" namespace doris { @@ -86,12 +85,11 @@ class EsHttpScanNode : public ScanNode { private: - void build_predicates(); + void build_conjuncts_list(); TupleId _tuple_id; RuntimeState* _runtime_state; TupleDescriptor* _tuple_desc; - std::unique_ptr _query_builder; int _num_running_scanners; std::atomic _scan_finished; diff --git a/be/src/exec/es_http_scanner.cpp b/be/src/exec/es_http_scanner.cpp index a44f44297fdcd9..bbf0e14ed2b49c 100644 --- a/be/src/exec/es_http_scanner.cpp +++ b/be/src/exec/es_http_scanner.cpp @@ -26,7 +26,6 @@ #include "runtime/raw_value.h" #include "runtime/tuple.h" #include "exprs/expr.h" -#include "exec/es_scan_reader.h" #include "exec/text_converter.h" #include "exec/text_converter.hpp" @@ -82,8 +81,8 @@ Status EsHttpScanner::open() { } } - const std::string& host = _properties.at(EsScanReader::HOST); - _es_reader.reset(new EsScanReader(host, _properties)); + const std::string& host = _properties.at(ESScanReader::KEY_HOST_PORT); + _es_reader.reset(new ESScanReader(host, _properties)); if (_es_reader == nullptr) { return Status("Es reader construct failed."); } diff --git a/be/src/exec/es_http_scanner.h b/be/src/exec/es_http_scanner.h index 850db04e79a569..9db167cbce4e0b 100644 --- a/be/src/exec/es_http_scanner.h +++ b/be/src/exec/es_http_scanner.h @@ -30,6 +30,7 @@ #include "gen_cpp/Types_types.h" #include "runtime/mem_pool.h" #include "util/slice.h" +#include "util/es_scan_reader.h" #include "util/runtime_profile.h" namespace doris { @@ -45,7 +46,6 @@ class TupleRow; class RowDescriptor; class MemTracker; class RuntimeProfile; -class EsScanReader; struct EsScanCounter { EsScanCounter() : num_rows_returned(0), num_rows_filtered(0) { @@ -93,7 +93,7 @@ class EsHttpScanner { const TupleDescriptor* _tuple_desc; EsScanCounter* _counter; - std::unique_ptr _es_reader; + std::unique_ptr _es_reader; std::map _slots_map; // Profile diff --git a/be/src/exec/es_predicate.cpp b/be/src/exec/es_predicate.cpp index 73d6073956fcd9..fb35ae3e7d53f9 100644 --- a/be/src/exec/es_predicate.cpp +++ b/be/src/exec/es_predicate.cpp @@ -55,15 +55,15 @@ EsPredicate::EsPredicate(ExprContext* conjunct_ctx, EsPredicate::~EsPredicate() { } -bool EsPredicate::build_disjuncts() { - return build_disjuncts(_context->root(), _disjuncts); +bool EsPredicate::build_disjuncts_list() { + return build_disjuncts_list(_context->root(), _disjuncts); } vector EsPredicate::get_predicate_list(){ return _disjuncts; } -bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjuncts) { +bool EsPredicate::build_disjuncts_list(Expr* conjunct, vector& disjuncts) { if (TExprNodeType::BINARY_PRED == conjunct->node_type()) { if (conjunct->children().size() != 2) { VLOG(1) << "get disjuncts fail: number of childs is not 2"; @@ -92,34 +92,28 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjunct return false; } - TExtLiteral literal; - if (!to_ext_literal(_context, expr, &literal)) { - VLOG(1) << "get disjuncts fail: can't get literal, node_type=" - << expr->node_type(); - return false; - } + std::shared_ptr literal(new ExtLiteral(expr->node_type())); + literal->value = _context->get_value(expr, NULL); std::unique_ptr predicate(new ExtBinaryPredicate( TExprNodeType::BINARY_PRED, slot_desc->col_name(), slot_desc->type(), op, - literal)); + *literal)); disjuncts.emplace_back(std::move(*predicate)); return true; } if (is_match_func(conjunct)) { - TExtLiteral literal; - if (!to_ext_literal(_context, conjunct->get_child(1), &literal)) { - VLOG(1) << "get disjuncts fail: can't get literal, node_type=" - << conjunct->get_child(1)->node_type(); - return false; - } - vector query_conditions; - query_conditions.push_back(std::move(literal)); + Expr* expr = conjunct->get_child(1); + std::shared_ptr literal(new ExtLiteral(expr->node_type())); + literal->value = _context->get_value(expr, NULL); + + vector query_conditions; + query_conditions.push_back(std::move(*literal)); vector cols; //TODO std::unique_ptr predicate(new ExtFunction( @@ -134,7 +128,7 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjunct if (TExprNodeType::IN_PRED == conjunct->node_type()) { TExtInPredicate ext_in_predicate; - vector in_pred_values; + vector in_pred_values; InPredicate* pred = dynamic_cast(conjunct); ext_in_predicate.__set_is_not_in(pred->is_not_in()); if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { @@ -159,13 +153,11 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjunct return false; } } - TExtLiteral literal; - if (!to_ext_literal(_context, pred->get_child(i), &literal)) { - VLOG(1) << "get disjuncts fail: can't get literal, node_type=" - << pred->get_child(i)->node_type(); - return false; - } - in_pred_values.push_back(literal); + + Expr* expr = conjunct->get_child(i); + std::shared_ptr literal(new ExtLiteral(expr->node_type())); + literal->value = _context->get_value(expr, NULL); + in_pred_values.push_back(*literal); } std::unique_ptr predicate(new ExtInPredicate( @@ -184,10 +176,10 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjunct VLOG(1) << "get disjuncts fail: op is not COMPOUND_OR"; return false; } - if (!build_disjuncts(conjunct->get_child(0), disjuncts)) { + if (!build_disjuncts_list(conjunct->get_child(0), disjuncts)) { return false; } - if (!build_disjuncts(conjunct->get_child(1), disjuncts)) { + if (!build_disjuncts_list(conjunct->get_child(1), disjuncts)) { return false; } @@ -221,67 +213,4 @@ SlotDescriptor* EsPredicate::get_slot_desc(SlotRef* slotRef) { return slot_desc; } -bool EsPredicate::to_ext_literal(ExprContext* _context, Expr* expr, TExtLiteral* literal) { - literal->__set_node_type(expr->node_type()); - switch (expr->node_type()) { - case TExprNodeType::BOOL_LITERAL: { - TBoolLiteral bool_literal; - void* value = _context->get_value(expr, NULL); - bool_literal.__set_value(*reinterpret_cast(value)); - literal->__set_bool_literal(bool_literal); - return true; - } - case TExprNodeType::DATE_LITERAL: { - void* value = _context->get_value(expr, NULL); - DateTimeValue date_value = *reinterpret_cast(value); - char str[MAX_DTVALUE_STR_LEN]; - date_value.to_string(str); - TDateLiteral date_literal; - date_literal.__set_value(str); - literal->__set_date_literal(date_literal); - return true; - } - case TExprNodeType::FLOAT_LITERAL: { - TFloatLiteral float_literal; - void* value = _context->get_value(expr, NULL); - float_literal.__set_value(*reinterpret_cast(value)); - literal->__set_float_literal(float_literal); - return true; - } - case TExprNodeType::INT_LITERAL: { - TIntLiteral int_literal; - void* value = _context->get_value(expr, NULL); - int_literal.__set_value(*reinterpret_cast(value)); - literal->__set_int_literal(int_literal); - return true; - } - case TExprNodeType::STRING_LITERAL: { - TStringLiteral string_literal; - void* value = _context->get_value(expr, NULL); - string_literal.__set_value(*reinterpret_cast(value)); - literal->__set_string_literal(string_literal); - return true; - } - case TExprNodeType::DECIMAL_LITERAL: { - TDecimalLiteral decimal_literal; - void* value = _context->get_value(expr, NULL); - decimal_literal.__set_value(reinterpret_cast(value)->to_string()); - literal->__set_decimal_literal(decimal_literal); - return true; - } - case TExprNodeType::LARGE_INT_LITERAL: { - char buf[48]; - int len = 48; - void* value = _context->get_value(expr, NULL); - char* v = LargeIntValue::to_string(*reinterpret_cast<__int128*>(value), buf, &len); - TLargeIntLiteral large_int_literal; - large_int_literal.__set_value(v); - literal->__set_large_int_literal(large_int_literal); - return true; - } - default: - return false; - } -} - } diff --git a/be/src/exec/es_predicate.h b/be/src/exec/es_predicate.h index ee328a69c7bdf6..8c603b21e6be0f 100644 --- a/be/src/exec/es_predicate.h +++ b/be/src/exec/es_predicate.h @@ -34,6 +34,7 @@ class Status; class ExprContext; class ExtBinaryPredicate; + struct ExtPredicate { ExtPredicate(TExprNodeType::type node_type) : node_type(node_type) { } @@ -41,6 +42,14 @@ struct ExtPredicate { TExprNodeType::type node_type; }; +struct ExtLiteral : public ExtPredicate { + ExtLiteral(TExprNodeType::type node_type) : + ExtPredicate(node_type) { + } + + void *value; +}; + struct ExtColumnDesc { ExtColumnDesc(std::string name, TypeDescriptor type) : name(name), @@ -57,7 +66,7 @@ struct ExtBinaryPredicate : public ExtPredicate { std::string name, TypeDescriptor type, TExprOpcode::type op, - TExtLiteral value) : + ExtLiteral value) : ExtPredicate(node_type), col(name, type), op(op), @@ -66,7 +75,7 @@ struct ExtBinaryPredicate : public ExtPredicate { ExtColumnDesc col; TExprOpcode::type op; - TExtLiteral value; + ExtLiteral value; }; struct ExtInPredicate : public ExtPredicate { @@ -74,7 +83,7 @@ struct ExtInPredicate : public ExtPredicate { TExprNodeType::type node_type, std::string name, TypeDescriptor type, - vector values) : + vector values) : ExtPredicate(node_type), is_not_in(false), col(name, type), @@ -83,12 +92,12 @@ struct ExtInPredicate : public ExtPredicate { bool is_not_in; ExtColumnDesc col; - vector values; + vector values; }; struct ExtLikePredicate : public ExtPredicate { ExtColumnDesc col; - TExtLiteral value; + ExtLiteral value; }; struct ExtIsNullPredicate : public ExtPredicate { @@ -101,7 +110,7 @@ struct ExtFunction : public ExtPredicate { TExprNodeType::type node_type, string func_name, vector cols, - vector values) : + vector values) : ExtPredicate(node_type), func_name(func_name), cols(cols), @@ -110,7 +119,7 @@ struct ExtFunction : public ExtPredicate { string func_name; vector cols; - vector values; + vector values; }; class EsPredicate { @@ -120,12 +129,11 @@ class EsPredicate { const TupleDescriptor* tuple_desc); ~EsPredicate(); vector get_predicate_list(); - bool build_disjuncts(); + bool build_disjuncts_list(); private: - bool build_disjuncts(Expr* conjunct, vector& disjuncts); - bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal); + bool build_disjuncts_list(Expr* conjunct, vector& disjuncts); bool is_match_func(Expr* conjunct); SlotDescriptor* get_slot_desc(SlotRef* slotRef); diff --git a/be/src/exec/es_query_builder.h b/be/src/exec/es_query_builder.h deleted file mode 100644 index 8d61a0ea4eb17a..00000000000000 --- a/be/src/exec/es_query_builder.h +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include -#include - -#include "common/status.h" - -namespace doris { - -class EsPredicate; - -class EsQueryBuilder { -public: - EsQueryBuilder() {}; - ~EsQueryBuilder() {}; - - static std::string build(const std::map& properties, - const std::vector& columns, - std::vector>) { - return std::string("xxx"); - } -}; - -} - diff --git a/be/src/exec/es_scan_reader.h b/be/src/exec/es_scan_reader.h deleted file mode 100644 index fd5516d7c0e6de..00000000000000 --- a/be/src/exec/es_scan_reader.h +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include -#include - -#include "common/status.h" - -namespace doris { - -class EsScanReader { -public: - constexpr static const char* HOST = "host"; - constexpr static const char* INDEX = "index"; - constexpr static const char* TYPE = "type"; - constexpr static const char* SHARD_ID = "shard_id"; - constexpr static const char* BATCH_SIZE = "batch_size"; - constexpr static const char* QUERY = "query"; - - EsScanReader(const std::string& target, - const std::map& properties) : - _target(target), - _properties(properties), - _eof(false) { - } - - ~EsScanReader() {}; - - Status open() { return Status::OK; } - - Status get_next(bool* eof, std::string* buf) { - const char* json = "{\"_scroll_id\": \"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAA1ewWbEhKNHRWX1NTNG04bERuV05RUlA5Zw==\",\"hits\": {\"total\": 10,\"hits\": [{\"_source\": {\"id\": 1}},{\"_source\": {\"id\": 2}}]}}"; - buf->append(json); - *eof = true; - return Status::OK; - } - - void close() {}; - -private: - - const std::string& _target; - const std::map& _properties; - bool _eof; -}; - -} -