diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index fe4337ad8c7f45..6a059259981a5e 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -284,7 +284,7 @@ set(CXX_GCC_FLAGS "-g -Wno-unused-local-typedefs") # Debug information is stored as dwarf2 to be as compatible as possible # -Werror: compile warnings should be errors when using the toolchain compiler. # Only enable for debug builds because this is what we test in pre-commit tests. -set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Werror -ggdb") +set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Werror -O0 -gdwarf-2") # For CMAKE_BUILD_TYPE=Release # -O3: Enable all compiler optimizations diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index ac101769ef104c..ae7ad19996cf97 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -63,6 +63,9 @@ set(EXEC_FILES csv_scan_node.cpp csv_scanner.cpp es_scan_node.cpp + es_http_scan_node.cpp + es_http_scanner.cpp + es_predicate.cpp spill_sort_node.cc union_node.cpp union_node_ir.cpp diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp new file mode 100644 index 00000000000000..668fdbd8e5160d --- /dev/null +++ b/be/src/exec/es_http_scan_node.cpp @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/es_http_scan_node.h" + +#include +#include + +#include "common/object_pool.h" +#include "exprs/expr.h" +#include "runtime/runtime_state.h" +#include "runtime/row_batch.h" +#include "runtime/dpp_sink_internal.h" +#include "service/backend_options.h" +#include "util/runtime_profile.h" +#include "exec/es_scan_reader.h" +#include "exec/es_predicate.h" + +namespace doris { + +EsHttpScanNode::EsHttpScanNode( + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : + ScanNode(pool, tnode, descs), + _tuple_id(tnode.es_scan_node.tuple_id), + _runtime_state(nullptr), + _tuple_desc(nullptr), + _query_builder(nullptr), + _num_running_scanners(0), + _scan_finished(false), + _eos(false), + _max_buffered_batches(1024), + _wait_scanner_timer(nullptr) { +} + +EsHttpScanNode::~EsHttpScanNode() { +} + +Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::init(tnode)); + _properties = tnode.es_scan_node.properties; + return Status::OK; +} + +Status EsHttpScanNode::prepare(RuntimeState* state) { + VLOG_QUERY << "EsHttpScanNode prepare"; + RETURN_IF_ERROR(ScanNode::prepare(state)); + + _runtime_state = state; + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id; + return Status(ss.str()); + } + + for (auto slot_desc : _tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + _column_names.push_back(slot_desc->col_name()); + } + + _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime"); + + return Status::OK; +} + +void EsHttpScanNode::build_predicates() { + for (int i = 0; i < _conjunct_ctxs.size(); ++i) { + std::shared_ptr predicate( + new EsPredicate(_conjunct_ctxs[i], _tuple_desc)); + if (predicate->build_disjuncts()) { + _predicates.push_back(predicate); + _predicate_to_conjunct.push_back(i); + } + } +} + +Status EsHttpScanNode::open(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + RETURN_IF_CANCELLED(state); + + for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { + // if conjunct is constant, compute direct and set eos = true + if (_conjunct_ctxs[conj_idx]->root()->is_constant()) { + void* value = _conjunct_ctxs[conj_idx]->get_value(NULL); + if (value == NULL || *reinterpret_cast(value) == false) { + _eos = true; + } + } + } + + build_predicates(); + + RETURN_IF_ERROR(start_scanners()); + + return Status::OK; +} + +Status EsHttpScanNode::start_scanners() { + { + std::unique_lock l(_batch_queue_lock); + _num_running_scanners = 1; + } + _scanner_threads.emplace_back(&EsHttpScanNode::scanner_worker, this, 0, + _scan_ranges.size()); + return Status::OK; +} + +Status EsHttpScanNode::get_next(RuntimeState* state, RowBatch* row_batch, + bool* eos) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (state->is_cancelled()) { + std::unique_lock l(_batch_queue_lock); + if (update_status(Status::CANCELLED)) { + _queue_writer_cond.notify_all(); + } + } + + if (_eos) { + *eos = true; + return Status::OK; + } + + if (_scan_finished.load()) { + *eos = true; + return Status::OK; + } + + std::shared_ptr scanner_batch; + { + std::unique_lock l(_batch_queue_lock); + while (_process_status.ok() && + !_runtime_state->is_cancelled() && + _num_running_scanners > 0 && + _batch_queue.empty()) { + SCOPED_TIMER(_wait_scanner_timer); + _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); + } + if (!_process_status.ok()) { + // Some scanner process failed. + return _process_status; + } + if (_runtime_state->is_cancelled()) { + if (update_status(Status::CANCELLED)) { + _queue_writer_cond.notify_all(); + } + return _process_status; + } + if (!_batch_queue.empty()) { + scanner_batch = _batch_queue.front(); + _batch_queue.pop_front(); + } + } + + // All scanner has been finished, and all cached batch has been read + if (scanner_batch == nullptr) { + _scan_finished.store(true); + *eos = true; + return Status::OK; + } + + // notify one scanner + _queue_writer_cond.notify_one(); + + // get scanner's batch memory + row_batch->acquire_state(scanner_batch.get()); + _num_rows_returned += row_batch->num_rows(); + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + + // This is first time reach limit. + // Only valid when query 'select * from table1 limit 20' + if (reached_limit()) { + int num_rows_over = _num_rows_returned - _limit; + row_batch->set_num_rows(row_batch->num_rows() - num_rows_over); + _num_rows_returned -= num_rows_over; + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + + _scan_finished.store(true); + _queue_writer_cond.notify_all(); + *eos = true; + } else { + *eos = false; + } + + if (VLOG_ROW_IS_ON) { + for (int i = 0; i < row_batch->num_rows(); ++i) { + TupleRow* row = row_batch->get_row(i); + VLOG_ROW << "EsHttpScanNode output row: " + << Tuple::to_string(row->get_tuple(0), *_tuple_desc); + } + } + + return Status::OK; +} + +Status EsHttpScanNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK; + } + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + _scan_finished.store(true); + _queue_writer_cond.notify_all(); + _queue_reader_cond.notify_all(); + for (int i = 0; i < _scanner_threads.size(); ++i) { + _scanner_threads[i].join(); + } + + _batch_queue.clear(); + + return ExecNode::close(state); +} + +// This function is called after plan node has been prepared. +Status EsHttpScanNode::set_scan_ranges(const std::vector& scan_ranges) { + _scan_ranges = scan_ranges; + return Status::OK; +} + +void EsHttpScanNode::debug_string(int ident_level, std::stringstream* out) const { + (*out) << "EsHttpScanNode"; +} + +Status EsHttpScanNode::scanner_scan( + TupleId _tuple_id, + std::map properties, + const std::vector& conjunct_ctxs, + EsScanCounter* counter) { + std::unique_ptr scanner(new EsHttpScanner( + _runtime_state, + runtime_profile(), + _tuple_id, + properties, + conjunct_ctxs, + counter)); + RETURN_IF_ERROR(scanner->open()); + bool scanner_eof = false; + + while (!scanner_eof) { + // Fill one row batch + std::shared_ptr row_batch( + new RowBatch(row_desc(), _runtime_state->batch_size(), mem_tracker())); + + // create new tuple buffer for row_batch + MemPool* tuple_pool = row_batch->tuple_data_pool(); + int tuple_buffer_size = row_batch->capacity() * _tuple_desc->byte_size(); + void* tuple_buffer = tuple_pool->allocate(tuple_buffer_size); + if (tuple_buffer == nullptr) { + return Status("Allocate memory for row batch failed."); + } + + Tuple* tuple = reinterpret_cast(tuple_buffer); + while (!scanner_eof) { + RETURN_IF_CANCELLED(_runtime_state); + // If we have finished all works + if (_scan_finished.load()) { + return Status::OK; + } + + // This row batch has been filled up, and break this + if (row_batch->is_full()) { + break; + } + + int row_idx = row_batch->add_row(); + TupleRow* row = row_batch->get_row(row_idx); + // scan node is the first tuple of tuple row + row->set_tuple(0, tuple); + memset(tuple, 0, _tuple_desc->num_null_bytes()); + + // Get from scanner + RETURN_IF_ERROR(scanner->get_next(tuple, tuple_pool, &scanner_eof)); + if (scanner_eof) { + continue; + } + + // eval conjuncts of this row. + if (eval_conjuncts(&conjunct_ctxs[0], conjunct_ctxs.size(), row)) { + row_batch->commit_last_row(); + char* new_tuple = reinterpret_cast(tuple); + new_tuple += _tuple_desc->byte_size(); + tuple = reinterpret_cast(new_tuple); + counter->num_rows_returned++; + } else { + counter->num_rows_filtered++; + } + } + + // Row batch has been filled, push this to the queue + if (row_batch->num_rows() > 0) { + std::unique_lock l(_batch_queue_lock); + while (_process_status.ok() && + !_scan_finished.load() && + !_runtime_state->is_cancelled() && + _batch_queue.size() >= _max_buffered_batches) { + _queue_writer_cond.wait_for(l, std::chrono::seconds(1)); + } + // Process already set failed, so we just return OK + if (!_process_status.ok()) { + return Status::OK; + } + // Scan already finished, just return + if (_scan_finished.load()) { + return Status::OK; + } + // Runtime state is canceled, just return cancel + if (_runtime_state->is_cancelled()) { + return Status::CANCELLED; + } + // Queue size Must be samller than _max_buffered_batches + _batch_queue.push_back(row_batch); + + // Notify reader to + _queue_reader_cond.notify_one(); + } + } + + return Status::OK; +} + +static std::string get_host_port(const std::vector& es_hosts) { + + std::string host_port; + std::string localhost = BackendOptions::get_localhost(); + + TNetworkAddress host = es_hosts[0]; + for (auto& es_host : es_hosts) { + if (es_host.hostname == localhost) { + host = es_host; + break; + } + } + + host_port = host.hostname; + host_port += ":"; + host_port += std::to_string(host.port); + return host_port; +} + +void EsHttpScanNode::scanner_worker(int start_idx, int length) { + // Clone expr context + std::vector scanner_expr_ctxs; + auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, + &scanner_expr_ctxs); + if (!status.ok()) { + LOG(WARNING) << "Clone conjuncts failed."; + } + + EsScanCounter counter; + for (int i = 0; i < length && status.ok(); ++i) { + const TEsScanRange& es_scan_range = + _scan_ranges[start_idx + i].scan_range.es_scan_range; + + _properties[EsScanReader::INDEX] = es_scan_range.index; + if (es_scan_range.__isset.type) { + _properties[EsScanReader::TYPE] = es_scan_range.type; + } + _properties[EsScanReader::SHARD_ID] = std::to_string(es_scan_range.shard_id); + _properties[EsScanReader::BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); + _properties[EsScanReader::HOST] = get_host_port(es_scan_range.es_hosts); + _properties[EsScanReader::QUERY] = EsQueryBuilder::build(_properties, _column_names, _predicates); + + status = scanner_scan(_tuple_id, _properties, scanner_expr_ctxs, &counter); + if (!status.ok()) { + LOG(WARNING) << "Scanner[" << start_idx + i << "] prcess failed. status=" + << status.get_error_msg(); + } + } + + // Update stats + _runtime_state->update_num_rows_load_success(counter.num_rows_returned); + _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered); + + // scanner is going to finish + { + std::lock_guard l(_batch_queue_lock); + if (!status.ok()) { + update_status(status); + } + // This scanner will finish + _num_running_scanners--; + } + _queue_reader_cond.notify_all(); + // If one scanner failed, others don't need scan any more + if (!status.ok()) { + _queue_writer_cond.notify_all(); + } + Expr::close(scanner_expr_ctxs, _runtime_state); +} +} diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h new file mode 100644 index 00000000000000..3b5658d4d17524 --- /dev/null +++ b/be/src/exec/es_http_scan_node.h @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_EXEC_ES_HTTP_SCAN_NODE_H +#define BE_EXEC_ES_HTTP_SCAN_NODE_H + +#include +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/scan_node.h" +#include "exec/es_http_scanner.h" +#include "exec/es_query_builder.h" +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +class RuntimeState; +class PartRangeKey; +class PartitionInfo; +class EsHttpScanCounter; +class EsPredicate; + +class EsHttpScanNode : public ScanNode { +public: + EsHttpScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + virtual ~EsHttpScanNode(); + + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + + virtual Status prepare(RuntimeState* state) override; + + virtual Status open(RuntimeState* state) override; + + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + + virtual Status close(RuntimeState* state) override; + + virtual Status set_scan_ranges(const std::vector& scan_ranges) override; + +protected: + // Write debug string of this into out. + virtual void debug_string(int indentation_level, std::stringstream* out) const override; + +private: + // Update process status to one failed status, + // NOTE: Must hold the mutex of this scan node + bool update_status(const Status& new_status) { + if (_process_status.ok()) { + _process_status = new_status; + return true; + } + return false; + } + + // Create scanners to do scan job + Status start_scanners(); + + // One scanner worker, This scanner will hanle 'length' ranges start from start_idx + void scanner_worker(int start_idx, int length); + + // Scan one range + Status scanner_scan(TupleId _tuple_id, + std::map properties, + const std::vector& conjunct_ctxs, + EsScanCounter* counter); + +private: + + void build_predicates(); + + TupleId _tuple_id; + RuntimeState* _runtime_state; + TupleDescriptor* _tuple_desc; + std::unique_ptr _query_builder; + + int _num_running_scanners; + std::atomic _scan_finished; + bool _eos; + int _max_buffered_batches; + RuntimeProfile::Counter* _wait_scanner_timer; + + bool _all_scanners_finished; + Status _process_status; + + std::vector _scanner_threads; + std::map _properties; + std::vector _scan_ranges; + std::vector _column_names; + + std::mutex _batch_queue_lock; + std::condition_variable _queue_reader_cond; + std::condition_variable _queue_writer_cond; + std::deque> _batch_queue; + std::vector> _predicates; + + std::vector _predicate_to_conjunct; +}; + +} + +#endif diff --git a/be/src/exec/es_http_scanner.cpp b/be/src/exec/es_http_scanner.cpp new file mode 100644 index 00000000000000..a44f44297fdcd9 --- /dev/null +++ b/be/src/exec/es_http_scanner.cpp @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/es_http_scanner.h" + +#include +#include + +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/tuple.h" +#include "exprs/expr.h" +#include "exec/es_scan_reader.h" +#include "exec/text_converter.h" +#include "exec/text_converter.hpp" + +namespace doris { + +EsHttpScanner::EsHttpScanner( + RuntimeState* state, + RuntimeProfile* profile, + TupleId tuple_id, + std::map properties, + const std::vector& conjunct_ctxs, + EsScanCounter* counter) : + _state(state), + _profile(profile), + _tuple_id(tuple_id), + _properties(properties), + _conjunct_ctxs(conjunct_ctxs), + _next_range(0), + _line_eof(false), +#if BE_TEST + _mem_tracker(new MemTracker()), + _mem_pool(_mem_tracker.get()), +#else + _mem_tracker(new MemTracker(-1, "EsHttp Scanner", state->instance_mem_tracker())), + _mem_pool(_state->instance_mem_tracker()), +#endif + _tuple_desc(nullptr), + _counter(counter), + _es_reader(nullptr), + _rows_read_counter(nullptr), + _read_timer(nullptr), + _materialize_timer(nullptr) { +} + +EsHttpScanner::~EsHttpScanner() { + close(); +} + +Status EsHttpScanner::open() { + _tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Unknown tuple descriptor, tuple_id=" << _tuple_id; + return Status(ss.str()); + } + + for (auto slot : _tuple_desc->slots()) { + auto pair = _slots_map.emplace(slot->col_name(), slot); + if (!pair.second) { + std::stringstream ss; + ss << "Failed to insert slot, col_name=" << slot->col_name(); + return Status(ss.str()); + } + } + + const std::string& host = _properties.at(EsScanReader::HOST); + _es_reader.reset(new EsScanReader(host, _properties)); + if (_es_reader == nullptr) { + return Status("Es reader construct failed."); + } + + _es_reader->open(); + + //_text_converter.reset(new(std::nothrow) TextConverter('\\')); + //if (_text_converter == nullptr) { + // return Status("No memory error."); + //} + + _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); + _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)"); + _materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)"); + + return Status::OK; +} + +Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + while (!eof) { + std::string batch_row_buffer; + if (_line_eof) { + //RETURN_IF_ERROR(_es_reader->get_next(&eof, &batch_row_buffer)); + } + //get_next_line(&batch_row_buffer); + { + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + //if (convert_one_row(Slice(ptr, size), tuple, tuple_pool)) { + // break; + //} + } + } + return Status::OK; +} + +void EsHttpScanner::close() { + if (_es_reader != nullptr) { + _es_reader->close(); + } +} + +} diff --git a/be/src/exec/es_http_scanner.h b/be/src/exec/es_http_scanner.h new file mode 100644 index 00000000000000..850db04e79a569 --- /dev/null +++ b/be/src/exec/es_http_scanner.h @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_EXEC_ES_HTTP_SCANNER_H +#define BE_EXEC_ES_HTTP_SCANNER_H + +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "common/global_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "runtime/mem_pool.h" +#include "util/slice.h" +#include "util/runtime_profile.h" + +namespace doris { + +class Tuple; +class SlotDescriptor; +class Slice; +class RuntimeState; +class ExprContext; +class TextConverter; +class TupleDescriptor; +class TupleRow; +class RowDescriptor; +class MemTracker; +class RuntimeProfile; +class EsScanReader; + +struct EsScanCounter { + EsScanCounter() : num_rows_returned(0), num_rows_filtered(0) { + } + + int64_t num_rows_returned; + int64_t num_rows_filtered; +}; + +class EsHttpScanner { +public: + EsHttpScanner( + RuntimeState* state, + RuntimeProfile* profile, + TupleId tuple_id, + std::map properties, + const std::vector& conjunct_ctxs, + EsScanCounter* counter); + ~EsHttpScanner(); + + Status open(); + + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof); + + void close(); + +private: + + RuntimeState* _state; + RuntimeProfile* _profile; + TupleId _tuple_id; + const std::map& _properties; + const std::vector& _conjunct_ctxs; + + std::unique_ptr _text_converter; + + int _next_range; + bool _line_eof; + + std::vector _slot_descs; + std::unique_ptr _row_desc; + + std::unique_ptr _mem_tracker; + MemPool _mem_pool; + + const TupleDescriptor* _tuple_desc; + EsScanCounter* _counter; + std::unique_ptr _es_reader; + std::map _slots_map; + + // Profile + RuntimeProfile::Counter* _rows_read_counter; + RuntimeProfile::Counter* _read_timer; + RuntimeProfile::Counter* _materialize_timer; +}; + +} + +#endif diff --git a/be/src/exec/es_predicate.cpp b/be/src/exec/es_predicate.cpp new file mode 100644 index 00000000000000..73d6073956fcd9 --- /dev/null +++ b/be/src/exec/es_predicate.cpp @@ -0,0 +1,287 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/es_predicate.h" + +#include +#include +#include +#include + +#include "common/status.h" +#include "common/logging.h" +#include "exprs/expr.h" +#include "exprs/expr_context.h" +#include "exprs/in_predicate.h" + +#include "gen_cpp/PlanNodes_types.h" +#include "olap/olap_common.h" +#include "olap/utils.h" +#include "runtime/client_cache.h" +#include "runtime/runtime_state.h" +#include "runtime/row_batch.h" +#include "runtime/string_value.h" +#include "runtime/tuple_row.h" + +#include "service/backend_options.h" +#include "util/runtime_profile.h" +#include "util/debug_util.h" + +namespace doris { + +using namespace std; + +EsPredicate::EsPredicate(ExprContext* conjunct_ctx, + const TupleDescriptor* tuple_desc) : + _context(conjunct_ctx), + _disjuncts_num(0), + _tuple_desc(tuple_desc) { +} + +EsPredicate::~EsPredicate() { +} + +bool EsPredicate::build_disjuncts() { + return build_disjuncts(_context->root(), _disjuncts); +} + +vector EsPredicate::get_predicate_list(){ + return _disjuncts; +} + +bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjuncts) { + if (TExprNodeType::BINARY_PRED == conjunct->node_type()) { + if (conjunct->children().size() != 2) { + VLOG(1) << "get disjuncts fail: number of childs is not 2"; + return false; + } + + SlotRef* slotRef = nullptr; + TExprOpcode::type op; + Expr* expr = nullptr; + if (TExprNodeType::SLOT_REF == conjunct->get_child(0)->node_type()) { + expr = conjunct->get_child(1); + slotRef = (SlotRef*)(conjunct->get_child(0)); + op = conjunct->op(); + } else if (TExprNodeType::SLOT_REF == conjunct->get_child(1)->node_type()) { + expr = conjunct->get_child(0); + slotRef = (SlotRef*)(conjunct->get_child(1)); + op = conjunct->op(); + } else { + VLOG(1) << "get disjuncts fail: no SLOT_REF child"; + return false; + } + + SlotDescriptor* slot_desc = get_slot_desc(slotRef); + if (slot_desc == nullptr) { + VLOG(1) << "get disjuncts fail: slot_desc is null"; + return false; + } + + TExtLiteral literal; + if (!to_ext_literal(_context, expr, &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << expr->node_type(); + return false; + } + + std::unique_ptr predicate(new ExtBinaryPredicate( + TExprNodeType::BINARY_PRED, + slot_desc->col_name(), + slot_desc->type(), + op, + literal)); + + disjuncts.emplace_back(std::move(*predicate)); + return true; + } + + if (is_match_func(conjunct)) { + TExtLiteral literal; + if (!to_ext_literal(_context, conjunct->get_child(1), &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << conjunct->get_child(1)->node_type(); + return false; + } + + vector query_conditions; + query_conditions.push_back(std::move(literal)); + vector cols; //TODO + + std::unique_ptr predicate(new ExtFunction( + TExprNodeType::FUNCTION_CALL, + conjunct->fn().name.function_name, + cols, + query_conditions)); + disjuncts.emplace_back(std::move(*predicate)); + + return true; + } + + if (TExprNodeType::IN_PRED == conjunct->node_type()) { + TExtInPredicate ext_in_predicate; + vector in_pred_values; + InPredicate* pred = dynamic_cast(conjunct); + ext_in_predicate.__set_is_not_in(pred->is_not_in()); + if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { + return false; + } + + SlotRef* slot_ref = (SlotRef*)(conjunct->get_child(0)); + SlotDescriptor* slot_desc = get_slot_desc(slot_ref); + if (slot_desc == nullptr) { + return false; + } + + for (int i = 1; i < pred->children().size(); ++i) { + // varchar, string, all of them are string type, but varchar != string + // TODO add date, datetime support? + if (pred->get_child(0)->type().is_string_type()) { + if (!pred->get_child(i)->type().is_string_type()) { + return false; + } + } else { + if (pred->get_child(i)->type().type != pred->get_child(0)->type().type) { + return false; + } + } + TExtLiteral literal; + if (!to_ext_literal(_context, pred->get_child(i), &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << pred->get_child(i)->node_type(); + return false; + } + in_pred_values.push_back(literal); + } + + std::unique_ptr predicate(new ExtInPredicate( + TExprNodeType::IN_PRED, + slot_desc->col_name(), + slot_desc->type(), + in_pred_values)); + + disjuncts.emplace_back(std::move(*predicate)); + + return true; + } + + if (TExprNodeType::COMPOUND_PRED == conjunct->node_type()) { + if (TExprOpcode::COMPOUND_OR != conjunct->op()) { + VLOG(1) << "get disjuncts fail: op is not COMPOUND_OR"; + return false; + } + if (!build_disjuncts(conjunct->get_child(0), disjuncts)) { + return false; + } + if (!build_disjuncts(conjunct->get_child(1), disjuncts)) { + return false; + } + + return true; + } + + // if go to here, report error + VLOG(1) << "get disjuncts fail: node type is " << conjunct->node_type() + << ", should be BINARY_PRED or COMPOUND_PRED"; + return false; +} + +bool EsPredicate::is_match_func(Expr* conjunct) { + if (TExprNodeType::FUNCTION_CALL == conjunct->node_type() + && conjunct->fn().name.function_name == "esquery") { + return true; + } + return false; +} + +SlotDescriptor* EsPredicate::get_slot_desc(SlotRef* slotRef) { + std::vector slot_ids; + slotRef->get_slot_ids(&slot_ids); + SlotDescriptor* slot_desc = nullptr; + for (SlotDescriptor* slot : _tuple_desc->slots()) { + if (slot->id() == slot_ids[0]) { + slot_desc = slot; + break; + } + } + return slot_desc; +} + +bool EsPredicate::to_ext_literal(ExprContext* _context, Expr* expr, TExtLiteral* literal) { + literal->__set_node_type(expr->node_type()); + switch (expr->node_type()) { + case TExprNodeType::BOOL_LITERAL: { + TBoolLiteral bool_literal; + void* value = _context->get_value(expr, NULL); + bool_literal.__set_value(*reinterpret_cast(value)); + literal->__set_bool_literal(bool_literal); + return true; + } + case TExprNodeType::DATE_LITERAL: { + void* value = _context->get_value(expr, NULL); + DateTimeValue date_value = *reinterpret_cast(value); + char str[MAX_DTVALUE_STR_LEN]; + date_value.to_string(str); + TDateLiteral date_literal; + date_literal.__set_value(str); + literal->__set_date_literal(date_literal); + return true; + } + case TExprNodeType::FLOAT_LITERAL: { + TFloatLiteral float_literal; + void* value = _context->get_value(expr, NULL); + float_literal.__set_value(*reinterpret_cast(value)); + literal->__set_float_literal(float_literal); + return true; + } + case TExprNodeType::INT_LITERAL: { + TIntLiteral int_literal; + void* value = _context->get_value(expr, NULL); + int_literal.__set_value(*reinterpret_cast(value)); + literal->__set_int_literal(int_literal); + return true; + } + case TExprNodeType::STRING_LITERAL: { + TStringLiteral string_literal; + void* value = _context->get_value(expr, NULL); + string_literal.__set_value(*reinterpret_cast(value)); + literal->__set_string_literal(string_literal); + return true; + } + case TExprNodeType::DECIMAL_LITERAL: { + TDecimalLiteral decimal_literal; + void* value = _context->get_value(expr, NULL); + decimal_literal.__set_value(reinterpret_cast(value)->to_string()); + literal->__set_decimal_literal(decimal_literal); + return true; + } + case TExprNodeType::LARGE_INT_LITERAL: { + char buf[48]; + int len = 48; + void* value = _context->get_value(expr, NULL); + char* v = LargeIntValue::to_string(*reinterpret_cast<__int128*>(value), buf, &len); + TLargeIntLiteral large_int_literal; + large_int_literal.__set_value(v); + literal->__set_large_int_literal(large_int_literal); + return true; + } + default: + return false; + } +} + +} diff --git a/be/src/exec/es_predicate.h b/be/src/exec/es_predicate.h new file mode 100644 index 00000000000000..ee328a69c7bdf6 --- /dev/null +++ b/be/src/exec/es_predicate.h @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_EXEC_ES_PREDICATE_H +#define BE_EXEC_ES_PREDICATE_H + +#include +#include + +#include "exprs/slot_ref.h" +#include "gen_cpp/Exprs_types.h" +#include "gen_cpp/Opcodes_types.h" +#include "gen_cpp/PaloExternalDataSourceService_types.h" +#include "runtime/descriptors.h" +#include "runtime/tuple.h" + +namespace doris { + +class Status; +class ExprContext; +class ExtBinaryPredicate; + +struct ExtPredicate { + ExtPredicate(TExprNodeType::type node_type) : node_type(node_type) { + } + + TExprNodeType::type node_type; +}; + +struct ExtColumnDesc { + ExtColumnDesc(std::string name, TypeDescriptor type) : + name(name), + type(type) { + } + + std::string name; + TypeDescriptor type; +}; + +struct ExtBinaryPredicate : public ExtPredicate { + ExtBinaryPredicate( + TExprNodeType::type node_type, + std::string name, + TypeDescriptor type, + TExprOpcode::type op, + TExtLiteral value) : + ExtPredicate(node_type), + col(name, type), + op(op), + value(value) { + } + + ExtColumnDesc col; + TExprOpcode::type op; + TExtLiteral value; +}; + +struct ExtInPredicate : public ExtPredicate { + ExtInPredicate( + TExprNodeType::type node_type, + std::string name, + TypeDescriptor type, + vector values) : + ExtPredicate(node_type), + is_not_in(false), + col(name, type), + values(values) { + } + + bool is_not_in; + ExtColumnDesc col; + vector values; +}; + +struct ExtLikePredicate : public ExtPredicate { + ExtColumnDesc col; + TExtLiteral value; +}; + +struct ExtIsNullPredicate : public ExtPredicate { + bool is_not_null; + ExtColumnDesc col; +}; + +struct ExtFunction : public ExtPredicate { + ExtFunction( + TExprNodeType::type node_type, + string func_name, + vector cols, + vector values) : + ExtPredicate(node_type), + func_name(func_name), + cols(cols), + values(values) { + } + + string func_name; + vector cols; + vector values; +}; + +class EsPredicate { + + public: + EsPredicate(ExprContext* conjunct_ctx, + const TupleDescriptor* tuple_desc); + ~EsPredicate(); + vector get_predicate_list(); + bool build_disjuncts(); + + private: + + bool build_disjuncts(Expr* conjunct, vector& disjuncts); + bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal); + bool is_match_func(Expr* conjunct); + SlotDescriptor* get_slot_desc(SlotRef* slotRef); + + ExprContext* _context; + int _disjuncts_num; + const TupleDescriptor* _tuple_desc; + vector _disjuncts; +}; + +} + +#endif diff --git a/be/src/exec/es_query_builder.h b/be/src/exec/es_query_builder.h new file mode 100644 index 00000000000000..8d61a0ea4eb17a --- /dev/null +++ b/be/src/exec/es_query_builder.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include + +#include "common/status.h" + +namespace doris { + +class EsPredicate; + +class EsQueryBuilder { +public: + EsQueryBuilder() {}; + ~EsQueryBuilder() {}; + + static std::string build(const std::map& properties, + const std::vector& columns, + std::vector>) { + return std::string("xxx"); + } +}; + +} + diff --git a/be/src/exec/es_scan_reader.h b/be/src/exec/es_scan_reader.h new file mode 100644 index 00000000000000..fd5516d7c0e6de --- /dev/null +++ b/be/src/exec/es_scan_reader.h @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include + +#include "common/status.h" + +namespace doris { + +class EsScanReader { +public: + constexpr static const char* HOST = "host"; + constexpr static const char* INDEX = "index"; + constexpr static const char* TYPE = "type"; + constexpr static const char* SHARD_ID = "shard_id"; + constexpr static const char* BATCH_SIZE = "batch_size"; + constexpr static const char* QUERY = "query"; + + EsScanReader(const std::string& target, + const std::map& properties) : + _target(target), + _properties(properties), + _eof(false) { + } + + ~EsScanReader() {}; + + Status open() { return Status::OK; } + + Status get_next(bool* eof, std::string* buf) { + const char* json = "{\"_scroll_id\": \"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAA1ewWbEhKNHRWX1NTNG04bERuV05RUlA5Zw==\",\"hits\": {\"total\": 10,\"hits\": [{\"_source\": {\"id\": 1}},{\"_source\": {\"id\": 2}}]}}"; + buf->append(json); + *eof = true; + return Status::OK; + } + + void close() {}; + +private: + + const std::string& _target; + const std::map& _properties; + bool _eof; +}; + +} + diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 679d42c21d9249..c934cf5fff20f7 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -31,6 +31,7 @@ #include "exec/new_partitioned_aggregation_node.h" #include "exec/csv_scan_node.h" #include "exec/es_scan_node.h" +#include "exec/es_http_scan_node.h" #include "exec/pre_aggregation_node.h" #include "exec/hash_join_node.h" #include "exec/broker_scan_node.h" @@ -366,6 +367,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN *node = pool->add(new EsScanNode(pool, tnode, descs)); return Status::OK; + case TPlanNodeType::ES_HTTP_SCAN_NODE: + *node = pool->add(new EsHttpScanNode(pool, tnode, descs)); + return Status::OK; + case TPlanNodeType::SCHEMA_SCAN_NODE: *node = pool->add(new SchemaScanNode(pool, tnode, descs)); return Status::OK; @@ -515,6 +520,7 @@ void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::ES_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); } void ExecNode::init_runtime_profile(const std::string& name) { diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h index cbf2b6ea991134..de57638857be3f 100644 --- a/be/src/exprs/expr_context.h +++ b/be/src/exprs/expr_context.h @@ -176,6 +176,7 @@ class ExprContext { friend class InPredicate; friend class OlapScanNode; friend class EsScanNode; + friend class EsPredicate; /// FunctionContexts for each registered expression. The FunctionContexts are created /// and owned by this ExprContext. diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 790b976ebb0d44..38b0f3296135d4 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -128,7 +128,7 @@ public void finalize(Analyzer analyzer) throws UserException { @Override protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.ES_SCAN_NODE; + msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE; Map properties = Maps.newHashMap(); properties.put(EsTable.USER, table.getUserName()); properties.put(EsTable.PASSWORD, table.getPasswd()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 69476511499d69..67fbef8f807116 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -43,7 +43,8 @@ enum TPlanNodeType { BROKER_SCAN_NODE, EMPTY_SET_NODE, UNION_NODE, - ES_SCAN_NODE + ES_SCAN_NODE, + ES_HTTP_SCAN_NODE } // phases of an execution node