From c315096bf4aa2e49d7218afe57517f3a520ac8ef Mon Sep 17 00:00:00 2001 From: lide-reed Date: Thu, 21 Mar 2019 14:53:52 +0800 Subject: [PATCH 1/4] Palo on Es http Init version (compile passed) --- be/CMakeLists.txt | 2 +- be/src/exec/CMakeLists.txt | 2 + be/src/exec/es_http_scan_node.cpp | 382 ++++++++++++++++++ be/src/exec/es_http_scan_node.h | 111 +++++ be/src/exec/es_http_scanner.cpp | 129 ++++++ be/src/exec/es_http_scanner.h | 107 +++++ be/src/exec/es_reader.h | 69 ++++ be/src/exec/exec_node.cpp | 6 + be/src/exprs/expr_context.h | 1 + .../org/apache/doris/planner/EsScanNode.java | 2 +- gensrc/thrift/PlanNodes.thrift | 3 +- 11 files changed, 811 insertions(+), 3 deletions(-) create mode 100644 be/src/exec/es_http_scan_node.cpp create mode 100644 be/src/exec/es_http_scan_node.h create mode 100644 be/src/exec/es_http_scanner.cpp create mode 100644 be/src/exec/es_http_scanner.h create mode 100644 be/src/exec/es_reader.h diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index fe4337ad8c7f45..6a059259981a5e 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -284,7 +284,7 @@ set(CXX_GCC_FLAGS "-g -Wno-unused-local-typedefs") # Debug information is stored as dwarf2 to be as compatible as possible # -Werror: compile warnings should be errors when using the toolchain compiler. # Only enable for debug builds because this is what we test in pre-commit tests. -set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Werror -ggdb") +set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Werror -O0 -gdwarf-2") # For CMAKE_BUILD_TYPE=Release # -O3: Enable all compiler optimizations diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index ac101769ef104c..a839db5296b7c2 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -63,6 +63,8 @@ set(EXEC_FILES csv_scan_node.cpp csv_scanner.cpp es_scan_node.cpp + es_http_scan_node.cpp + es_http_scanner.cpp spill_sort_node.cc union_node.cpp union_node_ir.cpp diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp new file mode 100644 index 00000000000000..93729e026d233e --- /dev/null +++ b/be/src/exec/es_http_scan_node.cpp @@ -0,0 +1,382 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/es_http_scan_node.h" + +#include +#include + +#include "common/object_pool.h" +#include "exprs/expr.h" +#include "runtime/runtime_state.h" +#include "runtime/row_batch.h" +#include "runtime/dpp_sink_internal.h" +#include "service/backend_options.h" +#include "util/runtime_profile.h" + +namespace doris { + +EsHttpScanNode::EsHttpScanNode( + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : + ScanNode(pool, tnode, descs), + _tuple_id(tnode.es_scan_node.tuple_id), + _runtime_state(nullptr), + _tuple_desc(nullptr), + _num_running_scanners(0), + _scan_finished(false), + _eos(false), + _max_buffered_batches(1024), + _wait_scanner_timer(nullptr) { +} + +EsHttpScanNode::~EsHttpScanNode() { +} + +Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::init(tnode)); + _properties = tnode.es_http_scan_node.properties; + return Status::OK; +} + +Status EsHttpScanNode::prepare(RuntimeState* state) { + VLOG_QUERY << "EsHttpScanNode prepare"; + RETURN_IF_ERROR(ScanNode::prepare(state)); + + _runtime_state = state; + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id; + return Status(ss.str()); + } + + _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime"); + + return Status::OK; +} + +Status EsHttpScanNode::open(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + RETURN_IF_CANCELLED(state); + + for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { + // if conjunct is constant, compute direct and set eos = true + if (_conjunct_ctxs[conj_idx]->root()->is_constant()) { + void* value = _conjunct_ctxs[conj_idx]->get_value(NULL); + if (value == NULL || *reinterpret_cast(value) == false) { + _eos = true; + } + } + } + + RETURN_IF_ERROR(start_scanners()); + + return Status::OK; +} + +Status EsHttpScanNode::start_scanners() { + { + std::unique_lock l(_batch_queue_lock); + _num_running_scanners = 1; + } + _scanner_threads.emplace_back(&EsHttpScanNode::scanner_worker, this, 0, + _scan_ranges.size()); + return Status::OK; +} + +Status EsHttpScanNode::get_next(RuntimeState* state, RowBatch* row_batch, + bool* eos) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (state->is_cancelled()) { + std::unique_lock l(_batch_queue_lock); + if (update_status(Status::CANCELLED)) { + _queue_writer_cond.notify_all(); + } + } + + if (_eos) { + *eos = true; + return Status::OK; + } + + if (_scan_finished.load()) { + *eos = true; + return Status::OK; + } + + std::shared_ptr scanner_batch; + { + std::unique_lock l(_batch_queue_lock); + while (_process_status.ok() && + !_runtime_state->is_cancelled() && + _num_running_scanners > 0 && + _batch_queue.empty()) { + SCOPED_TIMER(_wait_scanner_timer); + _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); + } + if (!_process_status.ok()) { + // Some scanner process failed. + return _process_status; + } + if (_runtime_state->is_cancelled()) { + if (update_status(Status::CANCELLED)) { + _queue_writer_cond.notify_all(); + } + return _process_status; + } + if (!_batch_queue.empty()) { + scanner_batch = _batch_queue.front(); + _batch_queue.pop_front(); + } + } + + // All scanner has been finished, and all cached batch has been read + if (scanner_batch == nullptr) { + _scan_finished.store(true); + *eos = true; + return Status::OK; + } + + // notify one scanner + _queue_writer_cond.notify_one(); + + // get scanner's batch memory + row_batch->acquire_state(scanner_batch.get()); + _num_rows_returned += row_batch->num_rows(); + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + + // This is first time reach limit. + // Only valid when query 'select * from table1 limit 20' + if (reached_limit()) { + int num_rows_over = _num_rows_returned - _limit; + row_batch->set_num_rows(row_batch->num_rows() - num_rows_over); + _num_rows_returned -= num_rows_over; + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + + _scan_finished.store(true); + _queue_writer_cond.notify_all(); + *eos = true; + } else { + *eos = false; + } + + if (VLOG_ROW_IS_ON) { + for (int i = 0; i < row_batch->num_rows(); ++i) { + TupleRow* row = row_batch->get_row(i); + VLOG_ROW << "EsHttpScanNode output row: " + << Tuple::to_string(row->get_tuple(0), *_tuple_desc); + } + } + + return Status::OK; +} + +Status EsHttpScanNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK; + } + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + _scan_finished.store(true); + _queue_writer_cond.notify_all(); + _queue_reader_cond.notify_all(); + for (int i = 0; i < _scanner_threads.size(); ++i) { + _scanner_threads[i].join(); + } + + _batch_queue.clear(); + + return ExecNode::close(state); +} + +// This function is called after plan node has been prepared. +Status EsHttpScanNode::set_scan_ranges(const std::vector& scan_ranges) { + _scan_ranges = scan_ranges; + return Status::OK; +} + +void EsHttpScanNode::debug_string(int ident_level, std::stringstream* out) const { + (*out) << "EsHttpScanNode"; +} + +Status EsHttpScanNode::scanner_scan( + TupleId _tuple_id, + std::map properties, + const std::vector& conjunct_ctxs, + EsScanCounter* counter) { + std::unique_ptr scanner(new EsHttpScanner( + _runtime_state, + runtime_profile(), + _tuple_id, + properties, + conjunct_ctxs, + counter)); + RETURN_IF_ERROR(scanner->open()); + bool scanner_eof = false; + + while (!scanner_eof) { + // Fill one row batch + std::shared_ptr row_batch( + new RowBatch(row_desc(), _runtime_state->batch_size(), mem_tracker())); + + // create new tuple buffer for row_batch + MemPool* tuple_pool = row_batch->tuple_data_pool(); + int tuple_buffer_size = row_batch->capacity() * _tuple_desc->byte_size(); + void* tuple_buffer = tuple_pool->allocate(tuple_buffer_size); + if (tuple_buffer == nullptr) { + return Status("Allocate memory for row batch failed."); + } + + Tuple* tuple = reinterpret_cast(tuple_buffer); + while (!scanner_eof) { + RETURN_IF_CANCELLED(_runtime_state); + // If we have finished all works + if (_scan_finished.load()) { + return Status::OK; + } + + // This row batch has been filled up, and break this + if (row_batch->is_full()) { + break; + } + + int row_idx = row_batch->add_row(); + TupleRow* row = row_batch->get_row(row_idx); + // scan node is the first tuple of tuple row + row->set_tuple(0, tuple); + memset(tuple, 0, _tuple_desc->num_null_bytes()); + + // Get from scanner + RETURN_IF_ERROR(scanner->get_next(tuple, tuple_pool, &scanner_eof)); + if (scanner_eof) { + continue; + } + + // eval conjuncts of this row. + if (eval_conjuncts(&conjunct_ctxs[0], conjunct_ctxs.size(), row)) { + row_batch->commit_last_row(); + char* new_tuple = reinterpret_cast(tuple); + new_tuple += _tuple_desc->byte_size(); + tuple = reinterpret_cast(new_tuple); + counter->num_rows_returned++; + } else { + counter->num_rows_filtered++; + } + } + + // Row batch has been filled, push this to the queue + if (row_batch->num_rows() > 0) { + std::unique_lock l(_batch_queue_lock); + while (_process_status.ok() && + !_scan_finished.load() && + !_runtime_state->is_cancelled() && + _batch_queue.size() >= _max_buffered_batches) { + _queue_writer_cond.wait_for(l, std::chrono::seconds(1)); + } + // Process already set failed, so we just return OK + if (!_process_status.ok()) { + return Status::OK; + } + // Scan already finished, just return + if (_scan_finished.load()) { + return Status::OK; + } + // Runtime state is canceled, just return cancel + if (_runtime_state->is_cancelled()) { + return Status::CANCELLED; + } + // Queue size Must be samller than _max_buffered_batches + _batch_queue.push_back(row_batch); + + // Notify reader to + _queue_reader_cond.notify_one(); + } + } + + return Status::OK; +} + +static std::string get_host_port(const std::vector& es_hosts) { + + std::string host_port; + std::string localhost = BackendOptions::get_localhost(); + + TNetworkAddress host = es_hosts[0]; + for (auto& es_host : es_hosts) { + if (es_host.hostname == localhost) { + host = es_host; + break; + } + } + + host_port = host.hostname; + host_port += ":"; + host_port += std::to_string(host.port); + return host_port; +} + +void EsHttpScanNode::scanner_worker(int start_idx, int length) { + // Clone expr context + std::vector scanner_expr_ctxs; + auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs); + if (!status.ok()) { + LOG(WARNING) << "Clone conjuncts failed."; + } + + EsScanCounter counter; + for (int i = 0; i < length && status.ok(); ++i) { + const TEsScanRange& es_scan_range = + _scan_ranges[start_idx + i].scan_range.es_scan_range; + + _properties[EsReader::INDEX] = es_scan_range.index; + if (es_scan_range.__isset.type) { + _properties[EsReader::TYPE] = es_scan_range.type; + } + _properties[EsReader::SHARD_ID] = std::to_string(es_scan_range.shard_id); + _properties[EsReader::BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); + _properties[EsReader::HOST] = get_host_port(es_scan_range.es_hosts); + + status = scanner_scan(_tuple_id, _properties, scanner_expr_ctxs, &counter); + if (!status.ok()) { + LOG(WARNING) << "Scanner[" << start_idx + i << "] prcess failed. status=" + << status.get_error_msg(); + } + } + + // Update stats + _runtime_state->update_num_rows_load_success(counter.num_rows_returned); + _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered); + + // scanner is going to finish + { + std::lock_guard l(_batch_queue_lock); + if (!status.ok()) { + update_status(status); + } + // This scanner will finish + _num_running_scanners--; + } + _queue_reader_cond.notify_all(); + // If one scanner failed, others don't need scan any more + if (!status.ok()) { + _queue_writer_cond.notify_all(); + } + Expr::close(scanner_expr_ctxs, _runtime_state); +} +} diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h new file mode 100644 index 00000000000000..6194e0fc63fd24 --- /dev/null +++ b/be/src/exec/es_http_scan_node.h @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_EXEC_ES_HTTP_SCAN_NODE_H +#define BE_EXEC_ES_HTTP_SCAN_NODE_H + +#include +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/scan_node.h" +#include "exec/es_http_scanner.h" +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +class RuntimeState; +class PartRangeKey; +class PartitionInfo; +class EsHttpScanCounter; + +class EsHttpScanNode : public ScanNode { +public: + EsHttpScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + virtual ~EsHttpScanNode(); + + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + + virtual Status prepare(RuntimeState* state) override; + + virtual Status open(RuntimeState* state) override; + + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + + virtual Status close(RuntimeState* state) override; + + virtual Status set_scan_ranges(const std::vector& scan_ranges) override; + +protected: + // Write debug string of this into out. + virtual void debug_string(int indentation_level, std::stringstream* out) const override; + +private: + // Update process status to one failed status, + // NOTE: Must hold the mutex of this scan node + bool update_status(const Status& new_status) { + if (_process_status.ok()) { + _process_status = new_status; + return true; + } + return false; + } + + // Create scanners to do scan job + Status start_scanners(); + + // One scanner worker, This scanner will hanle 'length' ranges start from start_idx + void scanner_worker(int start_idx, int length); + + // Scan one range + Status scanner_scan(TupleId _tuple_id, + std::map properties, + const std::vector& conjunct_ctxs, + EsScanCounter* counter); +private: + + TupleId _tuple_id; + RuntimeState* _runtime_state; + TupleDescriptor* _tuple_desc; + + int _num_running_scanners; + std::atomic _scan_finished; + bool _eos; + int _max_buffered_batches; + RuntimeProfile::Counter* _wait_scanner_timer; + + bool _all_scanners_finished; + Status _process_status; + + std::vector _scanner_threads; + std::map _properties; + std::vector _scan_ranges; + + std::mutex _batch_queue_lock; + std::condition_variable _queue_reader_cond; + std::condition_variable _queue_writer_cond; + std::deque> _batch_queue; +}; + +} + +#endif diff --git a/be/src/exec/es_http_scanner.cpp b/be/src/exec/es_http_scanner.cpp new file mode 100644 index 00000000000000..350aa1c8a56684 --- /dev/null +++ b/be/src/exec/es_http_scanner.cpp @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/es_http_scanner.h" + +#include +#include + +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/tuple.h" +#include "exprs/expr.h" +#include "exec/text_converter.h" +#include "exec/text_converter.hpp" + +namespace doris { + +EsHttpScanner::EsHttpScanner( + RuntimeState* state, + RuntimeProfile* profile, + TupleId tuple_id, + std::map properties, + const std::vector& conjunct_ctxs, + EsScanCounter* counter) : + _state(state), + _profile(profile), + _tuple_id(tuple_id), + _properties(properties), + _conjunct_ctxs(conjunct_ctxs), + _next_range(0), + _eof(false), +#if BE_TEST + _mem_tracker(new MemTracker()), + _mem_pool(_mem_tracker.get()), +#else + _mem_tracker(new MemTracker(-1, "EsHttp Scanner", state->instance_mem_tracker())), + _mem_pool(_state->instance_mem_tracker()), +#endif + _tuple_desc(nullptr), + _counter(counter), + _rows_read_counter(nullptr), + _read_timer(nullptr), + _materialize_timer(nullptr) { +} + +EsHttpScanner::~EsHttpScanner() { + close(); +} + +Status EsHttpScanner::open() { + _tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Unknown tuple descriptor, tuple_id=" << _tuple_id; + return Status(ss.str()); + } + + for (auto slot_desc : _tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + _column_names.push_back(slot_desc->col_name()); + } + + const std::string& host = _properties.at(EsReader::HOST); + _es_reader.reset(new EsReader(host, _properties, _column_names)); + if (_es_reader == nullptr) { + return Status("Es reader construct failed."); + } + + _es_reader->open(); + + //_text_converter.reset(new(std::nothrow) TextConverter('\\')); + //if (_text_converter == nullptr) { + // return Status("No memory error."); + //} + + _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); + _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)"); + _materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)"); + + return Status::OK; +} + +Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + _eof = false; + while (!_eof) { + uint8_t* ptr = nullptr; + size_t size = 0; + RETURN_IF_ERROR(_es_reader->read(ptr, &size, &_eof)); + *eof = _eof; + if (size == 0) { + continue; + } + { + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + //if (convert_one_row(Slice(ptr, size), tuple, tuple_pool)) { + // break; + //} + } + } + return Status::OK; +} + +void EsHttpScanner::close() { + if (_es_reader != nullptr) { + _es_reader->close(); + } +} + +} diff --git a/be/src/exec/es_http_scanner.h b/be/src/exec/es_http_scanner.h new file mode 100644 index 00000000000000..7c01750a60a573 --- /dev/null +++ b/be/src/exec/es_http_scanner.h @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_EXEC_ES_HTTP_SCANNER_H +#define BE_EXEC_ES_HTTP_SCANNER_H + +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "common/global_types.h" +#include "exec/es_reader.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "runtime/mem_pool.h" +#include "util/slice.h" +#include "util/runtime_profile.h" + +namespace doris { + +class Tuple; +class SlotDescriptor; +class Slice; +class RuntimeState; +class ExprContext; +class TextConverter; +class TupleDescriptor; +class TupleRow; +class RowDescriptor; +class MemTracker; +class RuntimeProfile; + +struct EsScanCounter { + EsScanCounter() : num_rows_returned(0), num_rows_filtered(0) { + } + + int64_t num_rows_returned; + int64_t num_rows_filtered; +}; + +class EsHttpScanner { +public: + EsHttpScanner( + RuntimeState* state, + RuntimeProfile* profile, + TupleId tuple_id, + std::map properties, + const std::vector& conjunct_ctxs, + EsScanCounter* counter); + ~EsHttpScanner(); + + Status open(); + + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof); + + void close(); + +private: + + RuntimeState* _state; + RuntimeProfile* _profile; + TupleId _tuple_id; + const std::map& _properties; + const std::vector& _conjunct_ctxs; + + std::unique_ptr _text_converter; + + int _next_range; + bool _eof; + + std::vector _slot_descs; + std::unique_ptr _row_desc; + + std::unique_ptr _mem_tracker; + MemPool _mem_pool; + + const TupleDescriptor* _tuple_desc; + EsScanCounter* _counter; + std::vector _column_names; + std::unique_ptr _es_reader; + + // Profile + RuntimeProfile::Counter* _rows_read_counter; + RuntimeProfile::Counter* _read_timer; + RuntimeProfile::Counter* _materialize_timer; +}; + +} + +#endif diff --git a/be/src/exec/es_reader.h b/be/src/exec/es_reader.h new file mode 100644 index 00000000000000..7eb6cbea048dc7 --- /dev/null +++ b/be/src/exec/es_reader.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include + +#include "common/status.h" + +namespace doris { + +class EsReader { +public: + constexpr static const char* HOST = "host"; + constexpr static const char* INDEX = "index"; + constexpr static const char* TYPE = "type"; + constexpr static const char* SHARD_ID = "shard_id"; + constexpr static const char* BATCH_SIZE = "batch_size"; + + EsReader(const std::string& target, + const std::map& properties, + const std::vector& columns) : + _target(target), + _properties(properties), + _columns(columns), + _eof(false) { + } + + ~EsReader() {}; + + Status open() { return Status::OK; } + + Status read(uint8_t* buf, size_t* buf_len, bool* eof) { + const char* json = "{\"_scroll_id\": \"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAA1ewWbEhKNHRWX1NTNG04bERuV05RUlA5Zw==\",\"hits\": {\"total\": 10,\"hits\": [{\"_source\": {\"id\": 1}},{\"_source\": {\"id\": 2}}]}}"; + memcpy(buf, json, strlen(json)); + *buf_len = strlen(json); + *eof = true; + return Status::OK; + } + + void close() {}; + +private: + + const std::string& _target; + const std::map& _properties; + const std::vector& _columns; + bool _eof; +}; + +} + diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 679d42c21d9249..c934cf5fff20f7 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -31,6 +31,7 @@ #include "exec/new_partitioned_aggregation_node.h" #include "exec/csv_scan_node.h" #include "exec/es_scan_node.h" +#include "exec/es_http_scan_node.h" #include "exec/pre_aggregation_node.h" #include "exec/hash_join_node.h" #include "exec/broker_scan_node.h" @@ -366,6 +367,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN *node = pool->add(new EsScanNode(pool, tnode, descs)); return Status::OK; + case TPlanNodeType::ES_HTTP_SCAN_NODE: + *node = pool->add(new EsHttpScanNode(pool, tnode, descs)); + return Status::OK; + case TPlanNodeType::SCHEMA_SCAN_NODE: *node = pool->add(new SchemaScanNode(pool, tnode, descs)); return Status::OK; @@ -515,6 +520,7 @@ void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::ES_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); } void ExecNode::init_runtime_profile(const std::string& name) { diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h index cbf2b6ea991134..8ec749d223808b 100644 --- a/be/src/exprs/expr_context.h +++ b/be/src/exprs/expr_context.h @@ -176,6 +176,7 @@ class ExprContext { friend class InPredicate; friend class OlapScanNode; friend class EsScanNode; + friend class EsHttpScanNode; /// FunctionContexts for each registered expression. The FunctionContexts are created /// and owned by this ExprContext. diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 790b976ebb0d44..38b0f3296135d4 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -128,7 +128,7 @@ public void finalize(Analyzer analyzer) throws UserException { @Override protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.ES_SCAN_NODE; + msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE; Map properties = Maps.newHashMap(); properties.put(EsTable.USER, table.getUserName()); properties.put(EsTable.PASSWORD, table.getPasswd()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 69476511499d69..67fbef8f807116 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -43,7 +43,8 @@ enum TPlanNodeType { BROKER_SCAN_NODE, EMPTY_SET_NODE, UNION_NODE, - ES_SCAN_NODE + ES_SCAN_NODE, + ES_HTTP_SCAN_NODE } // phases of an execution node From 8ab3e8b833c4577235199ebe5f7f7b3a6bc49daa Mon Sep 17 00:00:00 2001 From: lide-reed Date: Thu, 21 Mar 2019 18:46:28 +0800 Subject: [PATCH 2/4] Add EsQueryBuilder and adjust EsScanReader --- be/src/exec/es_http_scan_node.cpp | 23 +++++++--- be/src/exec/es_http_scan_node.h | 3 ++ be/src/exec/es_http_scanner.cpp | 20 +++++---- be/src/exec/es_http_scanner.h | 6 +-- be/src/exec/es_query_builder.h | 42 +++++++++++++++++++ be/src/exec/{es_reader.h => es_scan_reader.h} | 17 ++++---- 6 files changed, 84 insertions(+), 27 deletions(-) create mode 100644 be/src/exec/es_query_builder.h rename be/src/exec/{es_reader.h => es_scan_reader.h} (83%) diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index 93729e026d233e..9eb7a86d24f29d 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -27,6 +27,8 @@ #include "runtime/dpp_sink_internal.h" #include "service/backend_options.h" #include "util/runtime_profile.h" +#include "exec/es_query_builder.h" +#include "exec/es_scan_reader.h" namespace doris { @@ -36,6 +38,7 @@ EsHttpScanNode::EsHttpScanNode( _tuple_id(tnode.es_scan_node.tuple_id), _runtime_state(nullptr), _tuple_desc(nullptr), + _query_builder(nullptr), _num_running_scanners(0), _scan_finished(false), _eos(false), @@ -48,7 +51,7 @@ EsHttpScanNode::~EsHttpScanNode() { Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ScanNode::init(tnode)); - _properties = tnode.es_http_scan_node.properties; + _properties = tnode.es_scan_node.properties; return Status::OK; } @@ -64,6 +67,13 @@ Status EsHttpScanNode::prepare(RuntimeState* state) { return Status(ss.str()); } + for (auto slot_desc : _tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + _column_names.push_back(slot_desc->col_name()); + } + _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime"); return Status::OK; @@ -344,13 +354,14 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length) { const TEsScanRange& es_scan_range = _scan_ranges[start_idx + i].scan_range.es_scan_range; - _properties[EsReader::INDEX] = es_scan_range.index; + _properties[EsScanReader::INDEX] = es_scan_range.index; if (es_scan_range.__isset.type) { - _properties[EsReader::TYPE] = es_scan_range.type; + _properties[EsScanReader::TYPE] = es_scan_range.type; } - _properties[EsReader::SHARD_ID] = std::to_string(es_scan_range.shard_id); - _properties[EsReader::BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); - _properties[EsReader::HOST] = get_host_port(es_scan_range.es_hosts); + _properties[EsScanReader::SHARD_ID] = std::to_string(es_scan_range.shard_id); + _properties[EsScanReader::BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); + _properties[EsScanReader::HOST] = get_host_port(es_scan_range.es_hosts); + _properties[EsScanReader::QUERY] = EsQueryBuilder::build(_properties, _column_names); status = scanner_scan(_tuple_id, _properties, scanner_expr_ctxs, &counter); if (!status.ok()) { diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h index 6194e0fc63fd24..3f38c51153bca6 100644 --- a/be/src/exec/es_http_scan_node.h +++ b/be/src/exec/es_http_scan_node.h @@ -37,6 +37,7 @@ class RuntimeState; class PartRangeKey; class PartitionInfo; class EsHttpScanCounter; +class EsQueryBuilder; class EsHttpScanNode : public ScanNode { public: @@ -86,6 +87,7 @@ class EsHttpScanNode : public ScanNode { TupleId _tuple_id; RuntimeState* _runtime_state; TupleDescriptor* _tuple_desc; + std::unique_ptr _query_builder; int _num_running_scanners; std::atomic _scan_finished; @@ -99,6 +101,7 @@ class EsHttpScanNode : public ScanNode { std::vector _scanner_threads; std::map _properties; std::vector _scan_ranges; + std::vector _column_names; std::mutex _batch_queue_lock; std::condition_variable _queue_reader_cond; diff --git a/be/src/exec/es_http_scanner.cpp b/be/src/exec/es_http_scanner.cpp index 350aa1c8a56684..2a0eafc44478e6 100644 --- a/be/src/exec/es_http_scanner.cpp +++ b/be/src/exec/es_http_scanner.cpp @@ -26,6 +26,7 @@ #include "runtime/raw_value.h" #include "runtime/tuple.h" #include "exprs/expr.h" +#include "exec/es_scan_reader.h" #include "exec/text_converter.h" #include "exec/text_converter.hpp" @@ -54,6 +55,7 @@ EsHttpScanner::EsHttpScanner( #endif _tuple_desc(nullptr), _counter(counter), + _es_reader(nullptr), _rows_read_counter(nullptr), _read_timer(nullptr), _materialize_timer(nullptr) { @@ -71,15 +73,17 @@ Status EsHttpScanner::open() { return Status(ss.str()); } - for (auto slot_desc : _tuple_desc->slots()) { - if (!slot_desc->is_materialized()) { - continue; + for (auto slot : _tuple_desc->slots()) { + auto pair = _slots_map.emplace(slot->col_name(), slot); + if (!pair.second) { + std::stringstream ss; + ss << "Failed to insert slot, col_name=" << slot->col_name(); + return Status(ss.str()); } - _column_names.push_back(slot_desc->col_name()); } - const std::string& host = _properties.at(EsReader::HOST); - _es_reader.reset(new EsReader(host, _properties, _column_names)); + const std::string& host = _properties.at(EsScanReader::HOST); + _es_reader.reset(new EsScanReader(host, _properties)); if (_es_reader == nullptr) { return Status("Es reader construct failed."); } @@ -102,9 +106,9 @@ Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { SCOPED_TIMER(_read_timer); _eof = false; while (!_eof) { - uint8_t* ptr = nullptr; + std::string* buffer = new std::string(); size_t size = 0; - RETURN_IF_ERROR(_es_reader->read(ptr, &size, &_eof)); + RETURN_IF_ERROR(_es_reader->get_next(&_eof, buffer)); *eof = _eof; if (size == 0) { continue; diff --git a/be/src/exec/es_http_scanner.h b/be/src/exec/es_http_scanner.h index 7c01750a60a573..22ae3b4b5d60b0 100644 --- a/be/src/exec/es_http_scanner.h +++ b/be/src/exec/es_http_scanner.h @@ -26,7 +26,6 @@ #include "common/status.h" #include "common/global_types.h" -#include "exec/es_reader.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/Types_types.h" #include "runtime/mem_pool.h" @@ -46,6 +45,7 @@ class TupleRow; class RowDescriptor; class MemTracker; class RuntimeProfile; +class EsScanReader; struct EsScanCounter { EsScanCounter() : num_rows_returned(0), num_rows_filtered(0) { @@ -93,8 +93,8 @@ class EsHttpScanner { const TupleDescriptor* _tuple_desc; EsScanCounter* _counter; - std::vector _column_names; - std::unique_ptr _es_reader; + std::unique_ptr _es_reader; + std::map _slots_map; // Profile RuntimeProfile::Counter* _rows_read_counter; diff --git a/be/src/exec/es_query_builder.h b/be/src/exec/es_query_builder.h new file mode 100644 index 00000000000000..919c5f5d6eabf1 --- /dev/null +++ b/be/src/exec/es_query_builder.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include + +#include "common/status.h" + +namespace doris { + +class EsQueryBuilder { +public: + EsQueryBuilder() {}; + ~EsQueryBuilder() {}; + + static std::string build(const std::map& properties, + const std::vector& columns + ) { + return std::string("xxx"); + } +}; + +} + diff --git a/be/src/exec/es_reader.h b/be/src/exec/es_scan_reader.h similarity index 83% rename from be/src/exec/es_reader.h rename to be/src/exec/es_scan_reader.h index 7eb6cbea048dc7..fd5516d7c0e6de 100644 --- a/be/src/exec/es_reader.h +++ b/be/src/exec/es_scan_reader.h @@ -26,31 +26,29 @@ namespace doris { -class EsReader { +class EsScanReader { public: constexpr static const char* HOST = "host"; constexpr static const char* INDEX = "index"; constexpr static const char* TYPE = "type"; constexpr static const char* SHARD_ID = "shard_id"; constexpr static const char* BATCH_SIZE = "batch_size"; + constexpr static const char* QUERY = "query"; - EsReader(const std::string& target, - const std::map& properties, - const std::vector& columns) : + EsScanReader(const std::string& target, + const std::map& properties) : _target(target), _properties(properties), - _columns(columns), _eof(false) { } - ~EsReader() {}; + ~EsScanReader() {}; Status open() { return Status::OK; } - Status read(uint8_t* buf, size_t* buf_len, bool* eof) { + Status get_next(bool* eof, std::string* buf) { const char* json = "{\"_scroll_id\": \"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAA1ewWbEhKNHRWX1NTNG04bERuV05RUlA5Zw==\",\"hits\": {\"total\": 10,\"hits\": [{\"_source\": {\"id\": 1}},{\"_source\": {\"id\": 2}}]}}"; - memcpy(buf, json, strlen(json)); - *buf_len = strlen(json); + buf->append(json); *eof = true; return Status::OK; } @@ -61,7 +59,6 @@ class EsReader { const std::string& _target; const std::map& _properties; - const std::vector& _columns; bool _eof; }; From e6b576311c19d1c4a5582f3f6c8cf1aa20c173d5 Mon Sep 17 00:00:00 2001 From: lide-reed Date: Mon, 25 Mar 2019 09:50:31 +0800 Subject: [PATCH 3/4] Intruduce ES Predicate process --- be/src/exec/CMakeLists.txt | 1 + be/src/exec/es_http_scan_node.cpp | 18 +- be/src/exec/es_http_scan_node.h | 4 + be/src/exec/es_http_scanner.cpp | 15 +- be/src/exec/es_http_scanner.h | 2 +- be/src/exec/es_predicate.cpp | 291 ++++++++++++++++++++++++++++++ be/src/exec/es_predicate.h | 144 +++++++++++++++ be/src/exprs/expr_context.h | 2 +- 8 files changed, 465 insertions(+), 12 deletions(-) create mode 100644 be/src/exec/es_predicate.cpp create mode 100644 be/src/exec/es_predicate.h diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index a839db5296b7c2..ae7ad19996cf97 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -65,6 +65,7 @@ set(EXEC_FILES es_scan_node.cpp es_http_scan_node.cpp es_http_scanner.cpp + es_predicate.cpp spill_sort_node.cc union_node.cpp union_node_ir.cpp diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index 9eb7a86d24f29d..a663e581a789d8 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -29,6 +29,7 @@ #include "util/runtime_profile.h" #include "exec/es_query_builder.h" #include "exec/es_scan_reader.h" +#include "exec/es_predicate.h" namespace doris { @@ -79,6 +80,20 @@ Status EsHttpScanNode::prepare(RuntimeState* state) { return Status::OK; } +vector EsHttpScanNode::get_predicates() { + vector predicates; + for (int i = 0; i < _conjunct_ctxs.size(); ++i) { + std::unique_ptr predicate( + new ExtPredicate(_conjunct_ctxs[i], _tuple_desc)); + if (predicate->build_disjuncts()) { + predicates.emplace_back(std::move(predicate)); + predicate_to_conjunct.push_back(i); + } + } + + return predicates; +} + Status EsHttpScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); @@ -344,7 +359,8 @@ static std::string get_host_port(const std::vector& es_hosts) { void EsHttpScanNode::scanner_worker(int start_idx, int length) { // Clone expr context std::vector scanner_expr_ctxs; - auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs); + auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, + &scanner_expr_ctxs); if (!status.ok()) { LOG(WARNING) << "Clone conjuncts failed."; } diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h index 3f38c51153bca6..b740aa5f5ec308 100644 --- a/be/src/exec/es_http_scan_node.h +++ b/be/src/exec/es_http_scan_node.h @@ -38,6 +38,7 @@ class PartRangeKey; class PartitionInfo; class EsHttpScanCounter; class EsQueryBuilder; +class ExtPredicate; class EsHttpScanNode : public ScanNode { public: @@ -82,6 +83,9 @@ class EsHttpScanNode : public ScanNode { std::map properties, const std::vector& conjunct_ctxs, EsScanCounter* counter); + + vector get_predicates(); + private: TupleId _tuple_id; diff --git a/be/src/exec/es_http_scanner.cpp b/be/src/exec/es_http_scanner.cpp index 2a0eafc44478e6..a44f44297fdcd9 100644 --- a/be/src/exec/es_http_scanner.cpp +++ b/be/src/exec/es_http_scanner.cpp @@ -45,7 +45,7 @@ EsHttpScanner::EsHttpScanner( _properties(properties), _conjunct_ctxs(conjunct_ctxs), _next_range(0), - _eof(false), + _line_eof(false), #if BE_TEST _mem_tracker(new MemTracker()), _mem_pool(_mem_tracker.get()), @@ -104,15 +104,12 @@ Status EsHttpScanner::open() { Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { SCOPED_TIMER(_read_timer); - _eof = false; - while (!_eof) { - std::string* buffer = new std::string(); - size_t size = 0; - RETURN_IF_ERROR(_es_reader->get_next(&_eof, buffer)); - *eof = _eof; - if (size == 0) { - continue; + while (!eof) { + std::string batch_row_buffer; + if (_line_eof) { + //RETURN_IF_ERROR(_es_reader->get_next(&eof, &batch_row_buffer)); } + //get_next_line(&batch_row_buffer); { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); diff --git a/be/src/exec/es_http_scanner.h b/be/src/exec/es_http_scanner.h index 22ae3b4b5d60b0..850db04e79a569 100644 --- a/be/src/exec/es_http_scanner.h +++ b/be/src/exec/es_http_scanner.h @@ -83,7 +83,7 @@ class EsHttpScanner { std::unique_ptr _text_converter; int _next_range; - bool _eof; + bool _line_eof; std::vector _slot_descs; std::unique_ptr _row_desc; diff --git a/be/src/exec/es_predicate.cpp b/be/src/exec/es_predicate.cpp new file mode 100644 index 00000000000000..fe56078fb11ed2 --- /dev/null +++ b/be/src/exec/es_predicate.cpp @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/es_predicate.h" + +#include +#include +#include +#include + +#include "common/status.h" +#include "common/logging.h" +#include "exprs/expr.h" +#include "exprs/expr_context.h" +#include "exprs/in_predicate.h" + +#include "gen_cpp/PlanNodes_types.h" +#include "olap/olap_common.h" +#include "olap/utils.h" +#include "runtime/client_cache.h" +#include "runtime/runtime_state.h" +#include "runtime/row_batch.h" +#include "runtime/string_value.h" +#include "runtime/tuple_row.h" + +#include "service/backend_options.h" +#include "util/runtime_profile.h" +#include "util/debug_util.h" + +namespace doris { + +using namespace std; + +EsPredicate::EsPredicate(ExprContext* conjunct_ctx, + const TupleDescriptor* tuple_desc) : + _context(conjunct_ctx), + _disjuncts_num(0), + _tuple_desc(tuple_desc) { +} + +EsPredicate::~EsPredicate() { +} + +bool EsPredicate::build_disjuncts() { + return build_disjuncts(_context[i]->root(), _disjuncts); +} + +vector EsPredicate::get_predicate_list(){ + return _disjuncts; +} + +bool EsPredicate::build_disjuncts() { + return build_disjuncts(_context[i]->root(), _disjuncts); +} + +bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjuncts) { + if (TExprNodeType::BINARY_PRED == conjunct->node_type()) { + if (conjunct->children().size() != 2) { + VLOG(1) << "get disjuncts fail: number of childs is not 2"; + return false; + } + + SlotRef* slotRef = nullptr; + TExprOpcode::type op; + Expr* expr = nullptr; + if (TExprNodeType::SLOT_REF == conjunct->get_child(0)->node_type()) { + expr = conjunct->get_child(1); + slotRef = (SlotRef*)(conjunct->get_child(0)); + op = conjunct->op(); + } else if (TExprNodeType::SLOT_REF == conjunct->get_child(1)->node_type()) { + expr = conjunct->get_child(0); + slotRef = (SlotRef*)(conjunct->get_child(1)); + op = conjunct->op(); + } else { + VLOG(1) << "get disjuncts fail: no SLOT_REF child"; + return false; + } + + SlotDescriptor* slot_desc = get_slot_desc(slotRef); + if (slot_desc == nullptr) { + VLOG(1) << "get disjuncts fail: slot_desc is null"; + return false; + } + + TExtLiteral literal; + if (!to_ext_literal(_context, expr, &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << expr->node_type(); + return false; + } + + std::unique_ptr predicate(new ExtBinaryPredicate( + TExprNodeType::BINARY_PRED, + slot_desc->col_name(), + slot_desc->type(), + op, + literal)); + + disjuncts.emplace_back(std::move(*predicate)); + return true; + } + + if (is_match_func(conjunct)) { + TExtLiteral literal; + if (!to_ext_literal(_context, conjunct->get_child(1), &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << conjunct->get_child(1)->node_type(); + return false; + } + + vector query_conditions; + query_conditions.push_back(std::move(literal)); + vector cols; //TODO + + std::unique_ptr predicate(new ExtFunction( + TExprNodeType::FUNCTION_CALL, + conjunct->fn().name.function_name, + cols, + query_conditions)); + disjuncts.emplace_back(std::move(*predicate)); + + return true; + } + + if (TExprNodeType::IN_PRED == conjunct->node_type()) { + TExtInPredicate ext_in_predicate; + vector in_pred_values; + InPredicate* pred = dynamic_cast(conjunct); + ext_in_predicate.__set_is_not_in(pred->is_not_in()); + if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { + return false; + } + + SlotRef* slot_ref = (SlotRef*)(conjunct->get_child(0)); + SlotDescriptor* slot_desc = get_slot_desc(slot_ref); + if (slot_desc == nullptr) { + return false; + } + + for (int i = 1; i < pred->children().size(); ++i) { + // varchar, string, all of them are string type, but varchar != string + // TODO add date, datetime support? + if (pred->get_child(0)->type().is_string_type()) { + if (!pred->get_child(i)->type().is_string_type()) { + return false; + } + } else { + if (pred->get_child(i)->type().type != pred->get_child(0)->type().type) { + return false; + } + } + TExtLiteral literal; + if (!to_ext_literal(_context, pred->get_child(i), &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << pred->get_child(i)->node_type(); + return false; + } + in_pred_values.push_back(literal); + } + + std::unique_ptr predicate(new ExtInPredicate( + TExprNodeType::IN_PRED, + slot_desc->col_name(), + slot_desc->type(), + in_pred_values)); + + disjuncts.emplace_back(std::move(*predicate)); + + return true; + } + + if (TExprNodeType::COMPOUND_PRED == conjunct->node_type()) { + if (TExprOpcode::COMPOUND_OR != conjunct->op()) { + VLOG(1) << "get disjuncts fail: op is not COMPOUND_OR"; + return false; + } + if (!build_disjuncts(_context, conjunct->get_child(0), disjuncts)) { + return false; + } + if (!build_disjuncts(_context, conjunct->get_child(1), disjuncts)) { + return false; + } + + return true; + } + + // if go to here, report error + VLOG(1) << "get disjuncts fail: node type is " << conjunct->node_type() + << ", should be BINARY_PRED or COMPOUND_PRED"; + return false; +} + +bool EsPredicate::is_match_func(Expr* conjunct) { + if (TExprNodeType::FUNCTION_CALL == conjunct->node_type() + && conjunct->fn().name.function_name == "esquery") { + return true; + } + return false; +} + +SlotDescriptor* EsPredicate::get_slot_desc(SlotRef* slotRef) { + std::vector slot_ids; + slotRef->get_slot_ids(&slot_ids); + SlotDescriptor* slot_desc = nullptr; + for (SlotDescriptor* slot : _tuple_desc->slots()) { + if (slot->id() == slot_ids[0]) { + slot_desc = slot; + break; + } + } + return slot_desc; +} + +bool EsPredicate::to_ext_literal(ExprContext* _context, Expr* expr, TExtLiteral* literal) { + literal->__set_node_type(expr->node_type()); + switch (expr->node_type()) { + case TExprNodeType::BOOL_LITERAL: { + TBoolLiteral bool_literal; + void* value = _context->get_value(expr, NULL); + bool_literal.__set_value(*reinterpret_cast(value)); + literal->__set_bool_literal(bool_literal); + return true; + } + case TExprNodeType::DATE_LITERAL: { + void* value = _context->get_value(expr, NULL); + DateTimeValue date_value = *reinterpret_cast(value); + char str[MAX_DTVALUE_STR_LEN]; + date_value.to_string(str); + TDateLiteral date_literal; + date_literal.__set_value(str); + literal->__set_date_literal(date_literal); + return true; + } + case TExprNodeType::FLOAT_LITERAL: { + TFloatLiteral float_literal; + void* value = _context->get_value(expr, NULL); + float_literal.__set_value(*reinterpret_cast(value)); + literal->__set_float_literal(float_literal); + return true; + } + case TExprNodeType::INT_LITERAL: { + TIntLiteral int_literal; + void* value = _context->get_value(expr, NULL); + int_literal.__set_value(*reinterpret_cast(value)); + literal->__set_int_literal(int_literal); + return true; + } + case TExprNodeType::STRING_LITERAL: { + TStringLiteral string_literal; + void* value = _context->get_value(expr, NULL); + string_literal.__set_value(*reinterpret_cast(value)); + literal->__set_string_literal(string_literal); + return true; + } + case TExprNodeType::DECIMAL_LITERAL: { + TDecimalLiteral decimal_literal; + void* value = _context->get_value(expr, NULL); + decimal_literal.__set_value(reinterpret_cast(value)->to_string()); + literal->__set_decimal_literal(decimal_literal); + return true; + } + case TExprNodeType::LARGE_INT_LITERAL: { + char buf[48]; + int len = 48; + void* value = _context->get_value(expr, NULL); + char* v = LargeIntValue::to_string(*reinterpret_cast<__int128*>(value), buf, &len); + TLargeIntLiteral large_int_literal; + large_int_literal.__set_value(v); + literal->__set_large_int_literal(large_int_literal); + return true; + } + default: + return false; + } +} + +} diff --git a/be/src/exec/es_predicate.h b/be/src/exec/es_predicate.h new file mode 100644 index 00000000000000..178b847d9b82f9 --- /dev/null +++ b/be/src/exec/es_predicate.h @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_EXEC_ES_PREDICATE_H +#define BE_EXEC_ES_PREDICATE_H + +#include +#include + +#include "exprs/slot_ref.h" +#include "gen_cpp/Exprs_types.h" +#include "gen_cpp/Opcodes_types.h" +#include "gen_cpp/PaloExternalDataSourceService_types.h" +#include "runtime/descriptors.h" +#include "runtime/tuple.h" + +namespace doris { + +class Status; +class ExprContext; +class ExtBinaryPredicate; + +struct ExtPredicate { + ExtPredicate(TExprNodeType::type node_type) : node_type(node_type) { + } + + //ExtBinaryPredicate* binary_predicate() { + // return dynamic_cast(this); + //} + + TExprNodeType::type node_type; +}; + +struct ExtColumnDesc { + ExtColumnDesc(std::string name, TypeDescriptor type) : + name(name), + type(type) { + } + + std::string name; + TypeDescriptor type; +}; + +struct ExtBinaryPredicate : public ExtPredicate { + ExtBinaryPredicate( + TExprNodeType::type node_type, + std::string name, + TypeDescriptor type, + TExprOpcode::type op, + TExtLiteral value) : + ExtPredicate(node_type), + col(name, type), + op(op), + value(value) { + } + + ExtColumnDesc col; + TExprOpcode::type op; + TExtLiteral value; +}; + +struct ExtInPredicate : public ExtPredicate { + ExtInPredicate( + TExprNodeType::type node_type, + std::string name, + TypeDescriptor type, + vector values) : + ExtPredicate(node_type), + is_not_in(false), + col(name, type), + values(values) { + } + + bool is_not_in; + ExtColumnDesc col; + vector values; +}; + +struct ExtLikePredicate : public ExtPredicate { + ExtColumnDesc col; + TExtLiteral value; +}; + +struct ExtIsNullPredicate : public ExtPredicate { + bool is_not_null; + ExtColumnDesc col; +}; + +struct ExtFunction : public ExtPredicate { + ExtFunction( + TExprNodeType::type node_type, + string func_name, + vector cols, + vector values) : + ExtPredicate(node_type), + func_name(func_name), + cols(cols), + values(values) { + } + + string func_name; + vector cols; + vector values; +}; + +class EsPredicate { + + public: + EsPredicate(ExprContext* conjunct_ctx, + const TupleDescriptor* tuple_desc); + ~EsPredicate(); + vector get_predicate_list(); + bool build_disjuncts(); + + private: + + bool build_disjuncts(Expr* conjunct, vector& disjuncts); + bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal); + bool is_match_func(Expr* conjunct); + SlotDescriptor* get_slot_desc(SlotRef* slotRef); + + ExprContext* _context; + const TupleDescriptor* _tuple_desc; + vector _disjuncts; + int _disjuncts_num; +}; + +} + +#endif diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h index 8ec749d223808b..de57638857be3f 100644 --- a/be/src/exprs/expr_context.h +++ b/be/src/exprs/expr_context.h @@ -176,7 +176,7 @@ class ExprContext { friend class InPredicate; friend class OlapScanNode; friend class EsScanNode; - friend class EsHttpScanNode; + friend class EsPredicate; /// FunctionContexts for each registered expression. The FunctionContexts are created /// and owned by this ExprContext. From fd6665aafb3effbbdd81d7b58e987b6c4d518deb Mon Sep 17 00:00:00 2001 From: lide-reed Date: Mon, 25 Mar 2019 15:48:59 +0800 Subject: [PATCH 4/4] Fix compile issues --- be/src/exec/es_http_scan_node.cpp | 20 +++++++++----------- be/src/exec/es_http_scan_node.h | 11 +++++++---- be/src/exec/es_predicate.cpp | 10 +++------- be/src/exec/es_predicate.h | 6 +----- be/src/exec/es_query_builder.h | 6 ++++-- 5 files changed, 24 insertions(+), 29 deletions(-) diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index a663e581a789d8..668fdbd8e5160d 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -27,7 +27,6 @@ #include "runtime/dpp_sink_internal.h" #include "service/backend_options.h" #include "util/runtime_profile.h" -#include "exec/es_query_builder.h" #include "exec/es_scan_reader.h" #include "exec/es_predicate.h" @@ -80,18 +79,15 @@ Status EsHttpScanNode::prepare(RuntimeState* state) { return Status::OK; } -vector EsHttpScanNode::get_predicates() { - vector predicates; +void EsHttpScanNode::build_predicates() { for (int i = 0; i < _conjunct_ctxs.size(); ++i) { - std::unique_ptr predicate( - new ExtPredicate(_conjunct_ctxs[i], _tuple_desc)); + std::shared_ptr predicate( + new EsPredicate(_conjunct_ctxs[i], _tuple_desc)); if (predicate->build_disjuncts()) { - predicates.emplace_back(std::move(predicate)); - predicate_to_conjunct.push_back(i); + _predicates.push_back(predicate); + _predicate_to_conjunct.push_back(i); } } - - return predicates; } Status EsHttpScanNode::open(RuntimeState* state) { @@ -110,6 +106,8 @@ Status EsHttpScanNode::open(RuntimeState* state) { } } + build_predicates(); + RETURN_IF_ERROR(start_scanners()); return Status::OK; @@ -377,8 +375,8 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length) { _properties[EsScanReader::SHARD_ID] = std::to_string(es_scan_range.shard_id); _properties[EsScanReader::BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); _properties[EsScanReader::HOST] = get_host_port(es_scan_range.es_hosts); - _properties[EsScanReader::QUERY] = EsQueryBuilder::build(_properties, _column_names); - + _properties[EsScanReader::QUERY] = EsQueryBuilder::build(_properties, _column_names, _predicates); + status = scanner_scan(_tuple_id, _properties, scanner_expr_ctxs, &counter); if (!status.ok()) { LOG(WARNING) << "Scanner[" << start_idx + i << "] prcess failed. status=" diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h index b740aa5f5ec308..3b5658d4d17524 100644 --- a/be/src/exec/es_http_scan_node.h +++ b/be/src/exec/es_http_scan_node.h @@ -29,6 +29,7 @@ #include "common/status.h" #include "exec/scan_node.h" #include "exec/es_http_scanner.h" +#include "exec/es_query_builder.h" #include "gen_cpp/PaloInternalService_types.h" namespace doris { @@ -37,8 +38,7 @@ class RuntimeState; class PartRangeKey; class PartitionInfo; class EsHttpScanCounter; -class EsQueryBuilder; -class ExtPredicate; +class EsPredicate; class EsHttpScanNode : public ScanNode { public: @@ -84,10 +84,10 @@ class EsHttpScanNode : public ScanNode { const std::vector& conjunct_ctxs, EsScanCounter* counter); - vector get_predicates(); - private: + void build_predicates(); + TupleId _tuple_id; RuntimeState* _runtime_state; TupleDescriptor* _tuple_desc; @@ -111,6 +111,9 @@ class EsHttpScanNode : public ScanNode { std::condition_variable _queue_reader_cond; std::condition_variable _queue_writer_cond; std::deque> _batch_queue; + std::vector> _predicates; + + std::vector _predicate_to_conjunct; }; } diff --git a/be/src/exec/es_predicate.cpp b/be/src/exec/es_predicate.cpp index fe56078fb11ed2..73d6073956fcd9 100644 --- a/be/src/exec/es_predicate.cpp +++ b/be/src/exec/es_predicate.cpp @@ -56,17 +56,13 @@ EsPredicate::~EsPredicate() { } bool EsPredicate::build_disjuncts() { - return build_disjuncts(_context[i]->root(), _disjuncts); + return build_disjuncts(_context->root(), _disjuncts); } vector EsPredicate::get_predicate_list(){ return _disjuncts; } -bool EsPredicate::build_disjuncts() { - return build_disjuncts(_context[i]->root(), _disjuncts); -} - bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjuncts) { if (TExprNodeType::BINARY_PRED == conjunct->node_type()) { if (conjunct->children().size() != 2) { @@ -188,10 +184,10 @@ bool EsPredicate::build_disjuncts(Expr* conjunct, vector& disjunct VLOG(1) << "get disjuncts fail: op is not COMPOUND_OR"; return false; } - if (!build_disjuncts(_context, conjunct->get_child(0), disjuncts)) { + if (!build_disjuncts(conjunct->get_child(0), disjuncts)) { return false; } - if (!build_disjuncts(_context, conjunct->get_child(1), disjuncts)) { + if (!build_disjuncts(conjunct->get_child(1), disjuncts)) { return false; } diff --git a/be/src/exec/es_predicate.h b/be/src/exec/es_predicate.h index 178b847d9b82f9..ee328a69c7bdf6 100644 --- a/be/src/exec/es_predicate.h +++ b/be/src/exec/es_predicate.h @@ -38,10 +38,6 @@ struct ExtPredicate { ExtPredicate(TExprNodeType::type node_type) : node_type(node_type) { } - //ExtBinaryPredicate* binary_predicate() { - // return dynamic_cast(this); - //} - TExprNodeType::type node_type; }; @@ -134,9 +130,9 @@ class EsPredicate { SlotDescriptor* get_slot_desc(SlotRef* slotRef); ExprContext* _context; + int _disjuncts_num; const TupleDescriptor* _tuple_desc; vector _disjuncts; - int _disjuncts_num; }; } diff --git a/be/src/exec/es_query_builder.h b/be/src/exec/es_query_builder.h index 919c5f5d6eabf1..8d61a0ea4eb17a 100644 --- a/be/src/exec/es_query_builder.h +++ b/be/src/exec/es_query_builder.h @@ -26,14 +26,16 @@ namespace doris { +class EsPredicate; + class EsQueryBuilder { public: EsQueryBuilder() {}; ~EsQueryBuilder() {}; static std::string build(const std::map& properties, - const std::vector& columns - ) { + const std::vector& columns, + std::vector>) { return std::string("xxx"); } };