diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 8f480df1e091b0..a470702e397206 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -49,11 +49,13 @@ Status DataSink::create_data_sink( if (!thrift_sink.__isset.stream_sink) { return Status("Missing data stream sink."); } - + bool send_query_statistics_with_every_batch = params.__isset.send_query_statistics_with_every_batch ? + params.send_query_statistics_with_every_batch : false; // TODO: figure out good buffer size based on size of output row tmp_sink = new DataStreamSender( pool, params.sender_id, row_desc, - thrift_sink.stream_sink, params.destinations, 16 * 1024); + thrift_sink.stream_sink, params.destinations, 16 * 1024, + send_query_statistics_with_every_batch); // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink)); sink->reset(tmp_sink); break; diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 2064bf62e374c2..942e33bdaaf5fd 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -25,6 +25,7 @@ #include "gen_cpp/DataSinks_types.h" #include "gen_cpp/Exprs_types.h" #include "runtime/mem_tracker.h" +#include "runtime/query_statistics.h" namespace doris { @@ -78,11 +79,17 @@ class DataSink { // Returns the runtime profile for the sink. virtual RuntimeProfile* profile() = 0; + virtual void set_query_statistics(std::shared_ptr statistics) { + _query_statistics = statistics; + } protected: // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed; std::unique_ptr _expr_mem_tracker; + + // Maybe this will be transferred to BufferControlBlock. + std::shared_ptr _query_statistics; }; } // namespace doris diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 4c7828c30966a8..ea9ef64d3d3164 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -43,7 +43,8 @@ ExchangeNode::ExchangeNode( _next_row_idx(0), _is_merging(tnode.exchange_node.__isset.sort_info), _offset(tnode.exchange_node.__isset.offset ? 
tnode.exchange_node.offset : 0), - _num_rows_skipped(0) { + _num_rows_skipped(0), + _merge_rows_counter(nullptr) { DCHECK_GE(_offset, 0); DCHECK(_is_merging || (_offset == 0)); } @@ -63,16 +64,16 @@ Status ExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExchangeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); _convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime"); - + _merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT); // TODO: figure out appropriate buffer size DCHECK_GT(_num_senders, 0); + _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr()); _stream_recvr = state->exec_env()->stream_mgr()->create_recvr( state, _input_row_desc, state->fragment_instance_id(), _id, _num_senders, config::exchg_node_buffer_size_bytes, - state->runtime_profile(), _is_merging); + state->runtime_profile(), _is_merging, _sub_plan_query_statistics_recvr.get()); if (_is_merging) { - _merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT); RETURN_IF_ERROR(_sort_exec_exprs.prepare( state, _row_descriptor, _row_descriptor, expr_mem_tracker())); // AddExprCtxsToFree(_sort_exec_exprs); @@ -95,6 +96,12 @@ Status ExchangeNode::open(RuntimeState* state) { return Status::OK; } +Status ExchangeNode::collect_query_statistics(QueryStatistics* statistics) { + RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); + statistics->merge(_sub_plan_query_statistics_recvr.get()); + return Status::OK; +} + Status ExchangeNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK; diff --git a/be/src/exec/exchange_node.h b/be/src/exec/exchange_node.h index 6450d39f913026..e53742b30d65cf 100644 --- a/be/src/exec/exchange_node.h +++ b/be/src/exec/exchange_node.h @@ -21,11 +21,11 @@ #include #include "exec/exec_node.h" #include "exec/sort_exec_exprs.h" +#include "runtime/data_stream_recvr.h" namespace doris { class RowBatch; -class DataStreamRecvr; class RuntimeProfile; // Receiver node for data streams. The data stream receiver is created in Prepare() @@ -49,6 +49,7 @@ class ExchangeNode : public ExecNode { // Blocks until the first batch is available for consumption via GetNext(). virtual Status open(RuntimeState* state); virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); + Status collect_query_statistics(QueryStatistics* statistics) override; virtual Status close(RuntimeState* state); // the number of senders needs to be set after the c'tor, because it's not @@ -61,6 +62,7 @@ class ExchangeNode : public ExecNode { virtual void debug_string(int indentation_level, std::stringstream* out) const; private: + // Implements GetNext() for the case where _is_merging is true. Delegates the GetNext() // call to the underlying DataStreamRecvr. Status get_next_merging(RuntimeState* state, RowBatch* output_batch, bool* eos); @@ -109,6 +111,11 @@ class ExchangeNode : public ExecNode { int64_t _num_rows_skipped; RuntimeProfile::Counter* _merge_rows_counter; + + // Sub plan query statistics receiver. It is shared with DataStreamRecvr and will be + // called in two different threads. But their calls are all at different time, there is + // no problem of multithreaded access. 
+ std::unique_ptr _sub_plan_query_statistics_recvr; }; }; diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 37b6b3a2dc7491..65b04c9f1763d3 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -212,6 +212,14 @@ Status ExecNode::reset(RuntimeState* state) { return Status::OK; } +Status ExecNode::collect_query_statistics(QueryStatistics* statistics) { + DCHECK(statistics != nullptr); + for (auto child_node : _children) { + child_node->collect_query_statistics(statistics); + } + return Status::OK; +} + Status ExecNode::close(RuntimeState* state) { if (_is_closed) { return Status::OK; diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index bf2df357d66b29..0e9e265baaab85 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -29,6 +29,7 @@ #include "util/runtime_profile.h" #include "util/blocking_queue.hpp" #include "runtime/bufferpool/buffer_pool.h" +#include "runtime/query_statistics.h" namespace llvm { class Function; @@ -114,6 +115,11 @@ class ExecNode { // so should be fast. virtual Status reset(RuntimeState* state); + // This should be called before close() and after get_next(), it is responsible for + // collecting statistics sent with row batch, it can't be called when prepare() returns + // error. + virtual Status collect_query_statistics(QueryStatistics* statistics); + // close() will get called for every exec node, regardless of what else is called and // the status of these calls (i.e. prepare() may never have been called, or // prepare()/open()/get_next() returned with an error). diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 378f8f5a0a727e..e24842af8af619 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -92,11 +92,11 @@ Status HashJoinNode::prepare(RuntimeState* state) { ADD_TIMER(runtime_profile(), "PushDownComputeTime"); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); - _build_row_counter = + _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT); - _probe_row_counter = + _probe_rows_counter = ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT); _hash_tbl_load_factor_counter = ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE); @@ -243,7 +243,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) { VLOG_ROW << _hash_tbl->debug_string(true, &child(1)->row_desc()); - COUNTER_SET(_build_row_counter, _hash_tbl->size()); + COUNTER_SET(_build_rows_counter, _hash_tbl->size()); COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets()); COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor()); build_batch.reset(); @@ -380,7 +380,7 @@ Status HashJoinNode::open(RuntimeState* state) { // seed probe batch and _current_probe_row, etc. 
while (true) { RETURN_IF_ERROR(child(0)->get_next(state, _probe_batch.get(), &_probe_eos)); - COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows()); + COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows()); _probe_batch_pos = 0; if (_probe_batch->num_rows() == 0) { @@ -571,7 +571,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo continue; } else { - COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows()); + COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows()); break; } } @@ -695,7 +695,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state, probe_timer.stop(); RETURN_IF_ERROR(child(0)->get_next(state, _probe_batch.get(), &_probe_eos)); probe_timer.start(); - COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows()); + COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows()); } } } diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h index 8a64fb3ab142b8..1f054d24099563 100644 --- a/be/src/exec/hash_join_node.h +++ b/be/src/exec/hash_join_node.h @@ -134,8 +134,8 @@ class HashJoinNode : public ExecNode { RuntimeProfile::Counter* _push_down_timer; // time to build hash table RuntimeProfile::Counter* _push_compute_timer; RuntimeProfile::Counter* _probe_timer; // time to probe - RuntimeProfile::Counter* _build_row_counter; // num build rows - RuntimeProfile::Counter* _probe_row_counter; // num probe rows + RuntimeProfile::Counter* _build_rows_counter; // num build rows + RuntimeProfile::Counter* _probe_rows_counter; // num probe rows RuntimeProfile::Counter* _build_buckets_counter; // num buckets in hash table RuntimeProfile::Counter* _hash_tbl_load_factor_counter; diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index b0a456d99b934e..7de863ecef7968 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -306,6 +306,13 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo return _status; } +Status OlapScanNode::collect_query_statistics(QueryStatistics* statistics) { + RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); + statistics->add_scan_bytes(_read_compressed_counter->value()); + statistics->add_scan_rows(rows_returned()); + return Status::OK; +} + Status OlapScanNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK; diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 0db834d0a013aa..a5227e87e3cbdc 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -56,6 +56,7 @@ class OlapScanNode : public ScanNode { virtual Status prepare(RuntimeState* state); virtual Status open(RuntimeState* state); virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); + Status collect_query_statistics(QueryStatistics* statistics) override; virtual Status close(RuntimeState* state); virtual Status set_scan_ranges(const std::vector& scan_ranges); diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index fd911382e65947..13464da33b01a4 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -93,6 +93,7 @@ add_library(Runtime STATIC initial_reservations.cc snapshot_loader.cpp kafka_consumer_pipe.cpp + query_statistics.cpp ) # This test runs forever so should not be part of 'make test' diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index f4486dea08720a..6199d731d62edb 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ 
b/be/src/runtime/buffer_control_block.cpp @@ -31,9 +31,13 @@ void GetResultBatchCtx::on_failure(const Status& status) { delete this; } -void GetResultBatchCtx::on_close(int64_t packet_seq) { +void GetResultBatchCtx::on_close(int64_t packet_seq, + QueryStatistics* statistics) { Status status; status.to_protobuf(result->mutable_status()); + if (statistics != nullptr) { + statistics->to_pb(result->mutable_query_statistics()); + } result->set_packet_seq(packet_seq); result->set_eos(true); done->Run(); @@ -183,7 +187,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { return; } if (_is_close) { - ctx->on_close(_packet_num); + ctx->on_close(_packet_num, _query_statistics.get()); return; } // no ready data, push ctx to waiting list @@ -200,7 +204,7 @@ Status BufferControlBlock::close(Status exec_status) { if (!_waiting_rpc.empty()) { if (_status.ok()) { for (auto& ctx : _waiting_rpc) { - ctx->on_close(_packet_num); + ctx->on_close(_packet_num, _query_statistics.get()); } } else { for (auto& ctx : _waiting_rpc) { diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 701ef5f70a1e9e..71682287e08368 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -24,6 +24,7 @@ #include #include "common/status.h" #include "gen_cpp/Types_types.h" +#include "runtime/query_statistics.h" namespace google { namespace protobuf { @@ -52,7 +53,7 @@ struct GetResultBatchCtx { } void on_failure(const Status& status); - void on_close(int64_t packet_seq); + void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr); void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos = false); }; @@ -80,6 +81,9 @@ class BufferControlBlock { return _fragment_id; } + void set_query_statistics(std::shared_ptr statistics) { + _query_statistics = statistics; + } private: typedef std::list ResultQueue; @@ -100,8 +104,13 @@ class BufferControlBlock { boost::condition_variable _data_arriaval; // signal removal of data by stream consumer boost::condition_variable _data_removal; - + std::deque _waiting_rpc; + + // It is shared with PlanFragmentExecutor and will be called in two different + // threads. But their calls are all at different time, there is no problem of + // multithreaded access. 
+ std::shared_ptr _query_statistics; }; } diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp index 0c75fbb04a64af..3d87293fb0d5d5 100644 --- a/be/src/runtime/data_stream_mgr.cpp +++ b/be/src/runtime/data_stream_mgr.cpp @@ -51,14 +51,14 @@ inline uint32_t DataStreamMgr::get_hash_value( shared_ptr DataStreamMgr::create_recvr(RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile, - bool is_merging) { + bool is_merging, QueryStatisticsRecvr* sub_plan_query_statistics_recvr) { DCHECK(profile != NULL); VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id << ", node=" << dest_node_id; shared_ptr recvr( new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size, - profile)); + profile, sub_plan_query_statistics_recvr)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); lock_guard l(_lock); _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); @@ -93,18 +93,14 @@ shared_ptr DataStreamMgr::find_recvr( return shared_ptr(); } -Status DataStreamMgr::add_data( - const PUniqueId& finst_id, int32_t node_id, - const PRowBatch& pb_batch, int32_t sender_id, - int be_number, int64_t packet_seq, - ::google::protobuf::Closure** done) { - VLOG_ROW << "add_data(): fragment_instance_id=" << print_id(finst_id) - << " node=" << node_id; +Status DataStreamMgr::transmit_data(const PTransmitDataParams* request, ::google::protobuf::Closure** done) { + const PUniqueId& finst_id = request->finst_id(); TUniqueId t_finst_id; t_finst_id.hi = finst_id.hi(); t_finst_id.lo = finst_id.lo(); - shared_ptr recvr = find_recvr(t_finst_id, node_id); - if (recvr == NULL) { + shared_ptr recvr = find_recvr(t_finst_id, request->node_id()); + + if (recvr == nullptr) { // The receiver may remove itself from the receiver map via deregister_recvr() // at any time without considering the remaining number of senders. // As a consequence, find_recvr() may return an innocuous NULL if a thread @@ -114,28 +110,20 @@ Status DataStreamMgr::add_data( // errors from receiver-initiated teardowns. return Status::OK; } - recvr->add_batch(pb_batch, sender_id, be_number, packet_seq, done); - return Status::OK; -} -Status DataStreamMgr::close_sender(const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, - int sender_id, - int be_number) { - VLOG_FILE << "close_sender(): fragment_instance_id=" << fragment_instance_id - << ", node=" << dest_node_id; - shared_ptr recvr = find_recvr(fragment_instance_id, dest_node_id); - if (recvr == NULL) { - // The receiver may remove itself from the receiver map via deregister_recvr() - // at any time without considering the remaining number of senders. - // As a consequence, find_recvr() may return an innocuous NULL if a thread - // calling deregister_recvr() beat the thread calling find_recvr() - // in acquiring _lock. - // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish - // errors from receiver-initiated teardowns. - return Status::OK; + bool eos = request->eos(); + if (request->has_row_batch()) { + recvr->add_batch(request->row_batch(), request->sender_id(), + request->be_number(), request->packet_seq(), eos ? 
nullptr : done); } - recvr->remove_sender(sender_id, be_number); + + if (request->has_query_statistics()) { + recvr->add_sub_plan_statistics(request->query_statistics(), request->sender_id()); + } + + if (eos) { + recvr->remove_sender(request->sender_id(), request->be_number()); + } return Status::OK; } diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h index a4880b65e14458..fbfb9fcd0e8905 100644 --- a/be/src/runtime/data_stream_mgr.h +++ b/be/src/runtime/data_stream_mgr.h @@ -30,7 +30,9 @@ #include "common/object_pool.h" #include "runtime/descriptors.h" // for PlanNodeId #include "runtime/mem_tracker.h" +#include "runtime/query_statistics.h" #include "util/runtime_profile.h" +#include "gen_cpp/palo_internal_service.pb.h" #include "gen_cpp/Types_types.h" // for TUniqueId namespace google { @@ -76,27 +78,9 @@ class DataStreamMgr { RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile, - bool is_merging); - - // Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id - // if the recvr has not been cancelled. sender_id identifies the sender instance - // from which the data came. - // The call blocks if this ends up pushing the stream over its buffering limit; - // it unblocks when the consumer removed enough data to make space for - // row_batch. - // TODO: enforce per-sender quotas (something like 200% of buffer_size/#senders), - // so that a single sender can't flood the buffer and stall everybody else. - // Returns OK if successful, error status otherwise. - Status add_data(const PUniqueId& fragment_instance_id, int32_t node_id, - const PRowBatch& pb_batch, int32_t sender_id, - int32_t be_number, int64_t packet_seq, - ::google::protobuf::Closure** done); - - // Notifies the recvr associated with the fragment/node id that the specified - // sender has closed. - // Returns OK if successful, error status otherwise. - Status close_sender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, - int sender_id, int be_number); + bool is_merging, QueryStatisticsRecvr* sub_plan_query_statistics_recvr); + + Status transmit_data(const PTransmitDataParams* request, ::google::protobuf::Closure** done); // Closes all receivers registered for fragment_instance_id immediately. void cancel(const TUniqueId& fragment_instance_id); diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index f007e6f67b361f..c77099d8130545 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -242,6 +242,7 @@ void DataStreamRecvr::SenderQueue::add_batch( // it in this thread. 
batch = new RowBatch(_recvr->row_desc(), pb_batch, _recvr->mem_tracker()); } + VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size << "\n"; _batch_queue.emplace_back(batch_size, batch); @@ -350,7 +351,7 @@ DataStreamRecvr::DataStreamRecvr( DataStreamMgr* stream_mgr, MemTracker* parent_tracker, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, - RuntimeProfile* profile) : + RuntimeProfile* profile, QueryStatisticsRecvr* sub_plan_query_statistics_recvr) : _mgr(stream_mgr), _fragment_instance_id(fragment_instance_id), _dest_node_id(dest_node_id), @@ -358,7 +359,8 @@ DataStreamRecvr::DataStreamRecvr( _row_desc(row_desc), _is_merging(is_merging), _num_buffered_bytes(0), - _profile(profile) { + _profile(profile), + _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { _mem_tracker.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker)); // Create one queue per sender if is_merging is true. diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index 22bca1cecddd19..226054f8d47cb4 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -25,6 +25,7 @@ #include "common/status.h" #include "gen_cpp/Types_types.h" // for TUniqueId #include "runtime/descriptors.h" +#include "runtime/query_statistics.h" #include "util/tuple_row_compare.h" namespace google { @@ -99,6 +100,10 @@ class DataStreamRecvr { const RowDescriptor& row_desc() const { return _row_desc; } MemTracker* mem_tracker() const { return _mem_tracker.get(); } + void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) { + _sub_plan_query_statistics_recvr->insert(statistics, sender_id); + } + private: friend class DataStreamMgr; class SenderQueue; @@ -106,7 +111,7 @@ class DataStreamRecvr { DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, - RuntimeProfile* profile); + RuntimeProfile* profile, QueryStatisticsRecvr* sub_plan_query_statistics_recvr); // If receive queue is full, done is enqueue pending, and return with *done is nullptr void add_batch(const PRowBatch& batch, int sender_id, @@ -194,6 +199,9 @@ class DataStreamRecvr { // Wall time senders spend waiting for the recv buffer to have capacity. RuntimeProfile::Counter* _buffer_full_wall_timer; + // Sub plan query statistics receiver. 
+ QueryStatisticsRecvr* _sub_plan_query_statistics_recvr; + // Total time spent waiting for data to arrive in the recv buffer // RuntimeProfile::Counter* _data_arrival_timer; }; diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index d0c60960fc166d..dfe0726c8097d7 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -71,7 +71,10 @@ class DataStreamSender::Channel { Channel(DataStreamSender* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int buffer_size) : + PlanNodeId dest_node_id, + int buffer_size, + bool is_transfer_chain, + bool send_query_statistics_with_every_batch) : _parent(parent), _buffer_size(buffer_size), _row_desc(row_desc), @@ -80,7 +83,9 @@ class DataStreamSender::Channel { _num_data_bytes_sent(0), _packet_seq(0), _need_close(false), - _brpc_dest_addr(brpc_dest) { + _brpc_dest_addr(brpc_dest), + _is_transfer_chain(is_transfer_chain), + _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) { } virtual ~Channel() { @@ -163,6 +168,9 @@ class DataStreamSender::Channel { palo::PInternalService_Stub* _brpc_stub = nullptr; RefCountClosure* _closure = nullptr; int32_t _brpc_timeout_ms = 500; + // whether the dest can be treated as query statistics transfer chain. + bool _is_transfer_chain; + bool _send_query_statistics_with_every_batch; }; Status DataStreamSender::Channel::init(RuntimeState* state) { @@ -203,6 +211,10 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { } VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id << " dest_node=" << _dest_node_id; + if (_is_transfer_chain && (_send_query_statistics_with_every_batch || eos)) { + auto statistic = _brpc_request.mutable_query_statistics(); + _parent->_query_statistics->to_pb(statistic); + } _brpc_request.set_eos(eos); if (batch != nullptr) { @@ -285,7 +297,8 @@ DataStreamSender::DataStreamSender( ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, const std::vector& destinations, - int per_channel_buffer_size) : + int per_channel_buffer_size, + bool send_query_statistics_with_every_batch) : _sender_id(sender_id), _pool(pool), _row_desc(row_desc), @@ -305,11 +318,14 @@ DataStreamSender::DataStreamSender( || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED); // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable) for (int i = 0; i < destinations.size(); ++i) { + // Select first dest as transfer chain. 
+ bool is_transfer_chain = (i == 0); _channel_shared_ptrs.emplace_back( new Channel(this, row_desc, destinations[i].brpc_server, destinations[i].fragment_instance_id, - sink.dest_node_id, per_channel_buffer_size)); + sink.dest_node_id, per_channel_buffer_size, + is_transfer_chain, send_query_statistics_with_every_batch)); _channels.push_back(_channel_shared_ptrs[i].get()); } } diff --git a/be/src/runtime/data_stream_sender.h b/be/src/runtime/data_stream_sender.h index 30f26fbe889927..a4787491d6e4a0 100644 --- a/be/src/runtime/data_stream_sender.h +++ b/be/src/runtime/data_stream_sender.h @@ -59,7 +59,7 @@ class DataStreamSender : public DataSink { DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, const std::vector& destinations, - int per_channel_buffer_size); + int per_channel_buffer_size, bool send_query_statistics_with_every_batch); virtual ~DataStreamSender(); virtual Status init(const TDataSink& thrift_sink); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 1e769ee6dfb0a8..19d786d3de0298 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -54,7 +54,8 @@ PlanFragmentExecutor::PlanFragmentExecutor( _prepared(false), _closed(false), _has_thread_token(false), - _is_report_success(true) { + _is_report_success(true), + _collect_query_statistics_with_every_batch(false) { } PlanFragmentExecutor::~PlanFragmentExecutor() { @@ -196,6 +197,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { if (sink_profile != NULL) { profile()->add_child(sink_profile, true, NULL); } + + _collect_query_statistics_with_every_batch = params.__isset.send_query_statistics_with_every_batch ? + params.send_query_statistics_with_every_batch : false; } else { _sink.reset(NULL); } @@ -226,6 +230,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { // _row_batch->tuple_data_pool()->set_limits(*_runtime_state->mem_trackers()); VLOG(3) << "plan_root=\n" << _plan->debug_string(); _prepared = true; + + _query_statistics.reset(new QueryStatistics()); + _sink->set_query_statistics(_query_statistics); return Status::OK; } @@ -315,8 +322,12 @@ Status PlanFragmentExecutor::open_internal() { VLOG_ROW << row->to_string(row_desc()); } } - + SCOPED_TIMER(profile()->total_time_counter()); + // Collect this plan and sub plan statistics, and send to parent plan. + if (_collect_query_statistics_with_every_batch) { + collect_query_statistics(); + } RETURN_IF_ERROR(_sink->send(runtime_state(), batch)); } @@ -333,6 +344,7 @@ Status PlanFragmentExecutor::open_internal() { // audit the sinks to check that this is ok, or change that behaviour. 
{ SCOPED_TIMER(profile()->total_time_counter()); + collect_query_statistics(); Status status = _sink->close(runtime_state(), _status); RETURN_IF_ERROR(status); } @@ -349,6 +361,11 @@ Status PlanFragmentExecutor::open_internal() { return Status::OK; } +void PlanFragmentExecutor::collect_query_statistics() { + _query_statistics->clear(); + _plan->collect_query_statistics(_query_statistics.get()); +} + void PlanFragmentExecutor::report_profile() { VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id(); diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 299191b598136b..cc5c51d4b250ae 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "common/object_pool.h" +#include "runtime/query_statistics.h" #include "runtime/runtime_state.h" namespace doris { @@ -204,6 +205,12 @@ class PlanFragmentExecutor { // of the execution. RuntimeProfile::Counter* _average_thread_tokens; + // It is shared with BufferControlBlock and will be called in two different + // threads. But their calls are all at different time, there is no problem of + // multithreaded access. + std::shared_ptr _query_statistics; + bool _collect_query_statistics_with_every_batch; + ObjectPool* obj_pool() { return _runtime_state->obj_pool(); } @@ -256,6 +263,9 @@ class PlanFragmentExecutor { const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); } + + void collect_query_statistics(); + }; } diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp new file mode 100644 index 00000000000000..3865d415815bd2 --- /dev/null +++ b/be/src/runtime/query_statistics.cpp @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/query_statistics.h" + +namespace doris { + +void QueryStatistics::merge(QueryStatisticsRecvr* recvr) { + recvr->merge(this); +} + +void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) { + std::lock_guard l(_lock); + QueryStatistics* query_statistics = nullptr; + auto iter = _query_statistics.find(sender_id); + if (iter == _query_statistics.end()) { + query_statistics = new QueryStatistics; + _query_statistics[sender_id] = query_statistics; + } else { + query_statistics = iter->second; + } + query_statistics->from_pb(statistics); +} + +QueryStatisticsRecvr::~QueryStatisticsRecvr() { + // It is unnecessary to lock here, because the destructor will be + // called after DataStreamRecvr's close in ExchangeNode. 
+ for (auto& pair : _query_statistics) { + delete pair.second; + } + _query_statistics.clear(); +} + +} diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h new file mode 100644 index 00000000000000..79b911e720a126 --- /dev/null +++ b/be/src/runtime/query_statistics.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_EXEC_QUERY_STATISTICS_H +#define DORIS_BE_EXEC_QUERY_STATISTICS_H + +#include + +#include "gen_cpp/data.pb.h" +#include "util/spinlock.h" + +namespace doris { + +class QueryStatisticsRecvr; + +// This is responsible for collecting query statistics, usually it consists of +// two parts, one is current fragment or plan's statistics, the other is sub fragment +// or plan's statistics and QueryStatisticsRecvr is responsible for collecting it. +class QueryStatistics { +public: + + QueryStatistics() : scan_rows(0), scan_bytes(0) { + } + + void merge(const QueryStatistics& other) { + scan_rows += other.scan_rows; + scan_bytes += other.scan_bytes; + } + + void add_scan_rows(int64_t scan_rows) { + this->scan_rows += scan_rows; + } + + void add_scan_bytes(int64_t scan_bytes) { + this->scan_bytes += scan_bytes; + } + + void merge(QueryStatisticsRecvr* recvr); + + void clear() { + scan_rows = 0; + scan_bytes = 0; + } + + void to_pb(PQueryStatistics* statistics) { + DCHECK(statistics != nullptr); + statistics->set_scan_rows(scan_rows); + statistics->set_scan_bytes(scan_bytes); + } + + void from_pb(const PQueryStatistics& statistics) { + scan_rows = statistics.scan_rows(); + scan_bytes = statistics.scan_bytes(); + } + +private: + + int64_t scan_rows; + int64_t scan_bytes; +}; + +// It is used for collecting sub plan query statistics in DataStreamRecvr. 
+class QueryStatisticsRecvr { +public: + + ~QueryStatisticsRecvr(); + + void insert(const PQueryStatistics& statistics, int sender_id); + +private: + +friend class QueryStatistics; + + void merge(QueryStatistics* statistics) { + std::lock_guard l(_lock); + for (auto& pair : _query_statistics) { + statistics->merge(*(pair.second)); + } + } + + std::map _query_statistics; + SpinLock _lock; +}; + +} + +#endif diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index b5eb2c978bf6fd..b25bd0ef2a3c27 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -92,5 +92,9 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) { return Status::OK; } +void ResultSink::set_query_statistics(std::shared_ptr statistics) { + _sender->set_query_statistics(statistics); +} + } /* vim: set ts=4 sw=4 sts=4 tw=100 : */ diff --git a/be/src/runtime/result_sink.h b/be/src/runtime/result_sink.h index 5b831fcee1c1da..53739740bd573c 100644 --- a/be/src/runtime/result_sink.h +++ b/be/src/runtime/result_sink.h @@ -56,6 +56,8 @@ class ResultSink : public DataSink { return _profile; } + void set_query_statistics(std::shared_ptr statistics) override; + private: Status prepare_exprs(RuntimeState* state); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 056495a2816e6d..496fa89131806c 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -130,14 +130,14 @@ void BackendService::transmit_data(TTransmitDataResult& return_val, } if (params.eos) { - Status status = _exec_env->stream_mgr()->close_sender( - params.dest_fragment_instance_id, - params.dest_node_id, - params.sender_id, - params.be_number); - VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false") - << " close_sender status: " << status.get_error_msg(); - status.set_t_status(&return_val); + // Status status = _exec_env->stream_mgr()->close_sender( + // params.dest_fragment_instance_id, + // params.dest_node_id, + // params.sender_id, + // params.be_number); + //VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false") + // << " close_sender status: " << status.get_error_msg(); + //status.set_t_status(&return_val); } } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2f4b3f347a09d6..07095faaa983d3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -46,22 +46,9 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cnt const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - bool eos = request->eos(); - if (request->has_row_batch()) { - _exec_env->stream_mgr()->add_data( - request->finst_id(), request->node_id(), - request->row_batch(), request->sender_id(), - request->be_number(), request->packet_seq(), - eos ? 
nullptr : &done); - } - if (eos) { - TUniqueId finst_id; - finst_id.__set_hi(request->finst_id().hi()); - finst_id.__set_lo(request->finst_id().lo()); - _exec_env->stream_mgr()->close_sender( - finst_id, request->node_id(), - request->sender_id(), request->be_number()); - } + VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) + << " node=" << request->node_id(); + _exec_env->stream_mgr()->transmit_data(request, &done); if (done != nullptr) { done->Run(); } diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java index 21ffef73fa9b63..4136d756fc5682 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java @@ -39,7 +39,7 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface { private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class); public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("FragmentId").add("InstanceId").add("Host") - .add("IO").add("CPU").build(); + .add("ScanRawData").add("ProcessRows").build(); private QueryStatisticsItem item; public CurrentQueryFragmentProcNode(QueryStatisticsItem item) { @@ -79,8 +79,8 @@ private ProcResult requestFragmentExecInfos() throws AnalysisException { rowData.add(instanceConsumption.getFragmentId()); rowData.add(instanceConsumption.getInstanceId().toString()); rowData.add(instanceConsumption.getAddress().toString()); - rowData.add(String.valueOf(instanceConsumption.getTotalIoConsumption())); - rowData.add(String.valueOf(instanceConsumption.getTotalCpuConsumption())); + rowData.add(instanceConsumption.getFormattingScanBytes()); + rowData.add(instanceConsumption.getFormattingProcessRows()); sortedRowDatas.add(rowData); } diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java index f18d9d1399c0f7..3cddd59a4ef0f1 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import java.util.Collection; +import java.util.Formatter; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -334,21 +335,35 @@ private String parsePossibleExecNodeName(String str) { } } - public long getTotalCpuConsumption() { + private long getTotalCpuConsumption() { long cpu = 0; for (ConsumptionCalculator consumption : calculators) { - cpu += consumption.getCpu(); + cpu += consumption.getProcessRows(); } return cpu; } - public long getTotalIoConsumption() { + private long getTotalIoConsumption() { long io = 0; for (ConsumptionCalculator consumption : calculators) { - io += consumption.getIo(); + io += consumption.getScanBytes(); } return io; } + + public String getFormattingProcessRows() { + final StringBuilder builder = new StringBuilder(); + builder.append(getTotalCpuConsumption()).append(" Rows"); + return builder.toString(); + } + + public String getFormattingScanBytes() { + final Pair pair = DebugUtil.getByteUint(getTotalIoConsumption()); + final Formatter fmt = new Formatter(); + final StringBuilder builder = new StringBuilder(); + builder.append(fmt.format("%.2f", pair.first)).append(" 
").append(pair.second); + return builder.toString(); + } } public static class InstanceConsumption extends Consumption { @@ -388,27 +403,27 @@ public ConsumptionCalculator(List> counterMaps) { this.counterMaps = counterMaps; } - public long getCpu() { + public long getProcessRows() { long cpu = 0; for (Map counters : counterMaps) { - cpu += getCpuByRows(counters); + cpu += getProcessRows(counters); } return cpu; } - public long getIo() { + public long getScanBytes() { long io = 0; for (Map counters : counterMaps) { - io += getIoByByte(counters); + io += getScanBytes(counters); } return io; } - protected long getCpuByRows(Map counters) { + protected long getProcessRows(Map counters) { return 0; } - protected long getIoByByte(Map counters) { + protected long getScanBytes(Map counters) { return 0; } } @@ -419,7 +434,7 @@ public OlapScanNodeConsumptionCalculator(List> counterMaps) } @Override - protected long getIoByByte(Map counters) { + protected long getScanBytes(Map counters) { final Counter counter = counters.get("CompressedBytesRead"); return counter == null ? 0 : counter.getValue(); } @@ -431,7 +446,7 @@ public HashJoinConsumptionCalculator(List> counterMaps) { } @Override - protected long getCpuByRows(Map counters) { + protected long getProcessRows(Map counters) { final Counter probeCounter = counters.get("ProbeRows"); final Counter buildCounter = counters.get("BuildRows"); return probeCounter == null || buildCounter == null ? @@ -445,7 +460,7 @@ public HashAggConsumptionCalculator(List> counterMaps) { } @Override - protected long getCpuByRows(Map counters) { + protected long getProcessRows(Map counters) { final Counter buildCounter = counters.get("BuildRows"); return buildCounter == null ? 0 : buildCounter.getValue(); } @@ -457,7 +472,7 @@ public SortConsumptionCalculator(List> counterMaps) { } @Override - protected long getCpuByRows(Map counters) { + protected long getProcessRows(Map counters) { final Counter sortRowsCounter = counters.get("SortRows"); return sortRowsCounter == null ? 0 : sortRowsCounter.getValue(); } @@ -469,7 +484,7 @@ public WindowsConsumptionCalculator(List> counterMaps) { } @Override - protected long getCpuByRows(Map counters) { + protected long getProcessRows(Map counters) { final Counter processRowsCounter = counters.get("ProcessRows"); return processRowsCounter == null ? 0 : processRowsCounter.getValue(); @@ -482,7 +497,7 @@ public UnionConsumptionCalculator(List> counterMaps) { } @Override - protected long getCpuByRows(Map counters) { + protected long getProcessRows(Map counters) { final Counter materializeRowsCounter = counters.get("MaterializeRows"); return materializeRowsCounter == null ? 0 : materializeRowsCounter.getValue(); } @@ -495,7 +510,7 @@ public ExchangeConsumptionCalculator(List> counterMaps) { } @Override - protected long getCpuByRows(Map counters) { + protected long getProcessRows(Map counters) { final Counter mergeRowsCounter = counters.get("MergeRows"); return mergeRowsCounter == null ? 
0 : mergeRowsCounter.getValue(); } diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java index a59c3dbe9e07bf..02257be5673901 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java @@ -37,7 +37,7 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class); public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("ConnectionId").add("QueryId").add("Database").add("User") - .add("IO").add("CPU").add("ExecTime").build(); + .add("ScanRawData").add("ProcessRows").add("ExecTime").build(); private static final int EXEC_TIME_INDEX = 6; @@ -76,8 +76,8 @@ public ProcResult fetchResult() throws AnalysisException { values.add(item.getDb()); values.add(item.getUser()); final CurrentQueryInfoProvider.Consumption consumption = consumptions.get(item.getQueryId()); - values.add(String.valueOf(consumption.getTotalIoConsumption())); - values.add(String.valueOf(consumption.getTotalCpuConsumption())); + values.add(consumption.getFormattingScanBytes()); + values.add(consumption.getFormattingProcessRows()); values.add(item.getQueryExecTime()); sortedRowData.add(values); } diff --git a/fe/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/src/main/java/org/apache/doris/planner/PlanFragment.java index 5c9220332867b2..29cce7d71c5193 100644 --- a/fe/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -96,6 +96,11 @@ public class PlanFragment extends TreeNode { // if the output is UNPARTITIONED, it is being broadcast private DataPartition outputPartition; + // Whether query statistics are sent with every batch. In order to get the query + // statistics correctly when the query contains a limit, it is necessary to send query + // statistics with every batch; otherwise they are sent only in close. 
+ private boolean transferQueryStatisticsWithEveryBatch; + // TODO: SubstitutionMap outputSmap; // substitution map to remap exprs onto the output of this fragment, to be applied // at destination fragment @@ -108,6 +113,7 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) { this.planRoot = root; this.dataPartition = partition; this.outputPartition = DataPartition.UNPARTITIONED; + this.transferQueryStatisticsWithEveryBatch = false; setFragmentInPlanTree(planRoot); } @@ -180,7 +186,6 @@ public TPlanFragment toThrift() { // TODO chenhao , calculated by cost result.setMin_reservation_bytes(0); result.setInitial_reservation_total_claims(0); - return result; } @@ -270,4 +275,11 @@ public PlanFragmentId getFragmentId() { return fragmentId; } + public void setTransferQueryStatisticsWithEveryBatch(boolean value) { + transferQueryStatisticsWithEveryBatch = value; + } + + public boolean isTransferQueryStatisticsWithEveryBatch() { + return transferQueryStatisticsWithEveryBatch; + } } diff --git a/fe/src/main/java/org/apache/doris/planner/Planner.java b/fe/src/main/java/org/apache/doris/planner/Planner.java index 96d3c5dedaa0d8..de6ee206aa3f98 100644 --- a/fe/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/src/main/java/org/apache/doris/planner/Planner.java @@ -176,7 +176,11 @@ public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQuer fragments = distributedPlanner.createPlanFragments(singleNodePlan); } + // Optimize the transfer of query statistics when the query doesn't contain a limit. PlanFragment rootFragment = fragments.get(fragments.size() - 1); + QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer = new QueryStatisticsTransferOptimizer(rootFragment); + queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer(); + if (statment instanceof InsertStmt) { InsertStmt insertStmt = (InsertStmt) statment; rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments); @@ -230,4 +234,53 @@ private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root) Preconditions.checkState(selectNode.hasValidStats()); return selectNode; } + + private static class QueryStatisticsTransferOptimizer { + private final PlanFragment root; + + public QueryStatisticsTransferOptimizer(PlanFragment root) { + Preconditions.checkNotNull(root); + this.root = root; + } + + public void optimizeQueryStatisticsTransfer() { + optimizeQueryStatisticsTransfer(root, null); + } + + private void optimizeQueryStatisticsTransfer(PlanFragment fragment, PlanFragment parent) { + if (parent != null && hasLimit(parent.getPlanRoot(), fragment.getPlanRoot())) { + fragment.setTransferQueryStatisticsWithEveryBatch(true); + } + for (PlanFragment child : fragment.getChildren()) { + optimizeQueryStatisticsTransfer(child, fragment); + } + } + + // Check whether leaf node contains limit. 
+ private boolean hasLimit(PlanNode ancestor, PlanNode successor) { + final List exchangeNodes = Lists.newArrayList(); + collectExchangeNode(ancestor, exchangeNodes); + for (PlanNode leaf : exchangeNodes) { + if (leaf.getChild(0) == successor + && leaf.hasLimit()) { + return true; + } + } + return false; + } + + private void collectExchangeNode(PlanNode planNode, List exchangeNodes) { + if (planNode instanceof ExchangeNode) { + exchangeNodes.add(planNode); + } + + for (PlanNode child : planNode.getChildren()) { + if (child instanceof ExchangeNode) { + exchangeNodes.add(child); + } else { + collectExchangeNode(child, exchangeNodes); + } + } + } + } } diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 560d538bb2589e..ffa52f7abd1120 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -28,15 +28,19 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.Pair; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlPacket; import org.apache.doris.mysql.MysqlProto; import org.apache.doris.mysql.MysqlSerializer; +import org.apache.doris.rpc.PQueryStatistics; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; @@ -46,6 +50,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; +import java.util.Formatter; import java.util.List; /** @@ -92,12 +97,30 @@ private void handlePing() { ctx.getState().setOk(); } - private void auditAfterExec(String origStmt, StatementBase parsedStmt) { + private String getFormattingScanRows(PQueryStatistics statistics) { + final StringBuilder builder = new StringBuilder(); + builder.append(statistics.scanRows).append(" Rows"); + return builder.toString(); + } + + private String getFormattingScanBytes(PQueryStatistics statistics) { + final Pair pair = DebugUtil.getByteUint(statistics.scanBytes); + final Formatter fmt = new Formatter(); + final StringBuilder builder = new StringBuilder(); + builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second); + return builder.toString(); + } + + private void auditAfterExec(String origStmt, StatementBase parsedStmt, + PQueryStatistics statistics) { // slow query long elapseMs = System.currentTimeMillis() - ctx.getStartTime(); // query state log ctx.getAuditBuilder().put("state", ctx.getState()); ctx.getAuditBuilder().put("time", elapseMs); + Preconditions.checkNotNull(statistics); + ctx.getAuditBuilder().put("ScanRows", getFormattingScanRows(statistics)); + ctx.getAuditBuilder().put("ScanRawData", getFormattingScanBytes(statistics)); ctx.getAuditBuilder().put("returnRows", ctx.getReturnRows()); ctx.getAuditBuilder().put("stmt_id", ctx.getStmtId()); @@ -177,7 +200,8 @@ private void handleQuery() { // audit after exec // replace '\n' to '\\n' to make string in one line - auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt()); + auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt(), + 
executor.getQueryStatisticsForAuditLog()); } // Get the column definitions of a table diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 252a9984e6e1cf..a51c075fb69a02 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -574,12 +574,12 @@ void updateStatus(Status status) { } } - TResultBatch getNext() throws Exception { + public RowBatch getNext() throws Exception { if (receiver == null) { throw new UserException("There is no receiver."); } - TResultBatch resultBatch; + RowBatch resultBatch; Status status = new Status(); resultBatch = receiver.getNext(status); @@ -611,7 +611,7 @@ TResultBatch getNext() throws Exception { } } - if (resultBatch == null) { + if (resultBatch.isEos()) { this.returnedAllResults = true; // if this query is a block query do not cancel. @@ -622,7 +622,7 @@ TResultBatch getNext() throws Exception { cancelInternal(); } } else { - numReceivedRows += resultBatch.getRowsSize(); + numReceivedRows += resultBatch.getBatch().getRowsSize(); } return resultBatch; @@ -1038,7 +1038,7 @@ private Set getScanHosts(PlanNodeId id, FragmentExecParams frag return result; } - public void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) + private void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) throws UserException { int maxNumInstance = queryOptions.mt_dop; if (maxNumInstance == 0) { @@ -1150,7 +1150,7 @@ private void validate() { } // create collocated instance according to inputFragments - public void createCollocatedInstance(FragmentExecParams fragmentExecParams) { + private void createCollocatedInstance(FragmentExecParams fragmentExecParams) { Preconditions.checkState(fragmentExecParams.inputFragments.size() >= 1); final FragmentExecParams inputFragmentParams = fragmentExecParamsMap.get(fragmentExecParams. 
inputFragments.get(0)); @@ -1169,7 +1169,7 @@ private TUniqueId getNextInstanceId() { } - public void createUnionInstance(FragmentExecParams fragmentExecParams) { + private void createUnionInstance(FragmentExecParams fragmentExecParams) { final PlanFragment fragment = fragmentExecParams.fragment; // Add hosts of scan nodes List scanNodeIds = findScanNodes(fragment.getPlanRoot()); @@ -1563,7 +1563,6 @@ List toThrift(int backendNum) { params.setResource_info(tResourceInfo); params.params.setQuery_id(queryId); params.params.setFragment_instance_id(instanceExecParam.instanceId); - Map> scanRanges = instanceExecParam.perNodeScanRanges; if (scanRanges == null) { scanRanges = Maps.newHashMap(); @@ -1579,7 +1578,8 @@ List toThrift(int backendNum) { params.setBackend_num(backendNum++); params.setQuery_globals(queryGlobals); params.setQuery_options(queryOptions); - + params.params.setSend_query_statistics_with_every_batch( + fragment.isTransferQueryStatisticsWithEveryBatch()); if (queryOptions.getQuery_type() == TQueryType.LOAD) { LoadErrorHub.Param param = Catalog.getCurrentCatalog().getLoadInstance().getLoadErrorHubInfo(); if (param != null) { diff --git a/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java index acb87f135d8fa6..ed8a0323f7852a 100644 --- a/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -58,11 +58,11 @@ public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, in this.timeoutTs = System.currentTimeMillis() + timeoutMs; } - public TResultBatch getNext(Status status) throws TException { + public RowBatch getNext(Status status) throws TException { if (isDone) { return null; } - + final RowBatch rowBatch = new RowBatch(); try { while (!isDone && !isCancel) { PFetchDataRequest request = new PFetchDataRequest(finstId); @@ -90,7 +90,9 @@ public TResultBatch getNext(Status status) throws TException { if (code != TStatusCode.OK) { status.setPstatus(pResult.status); return null; - } + } + + rowBatch.setQueryStatistics(pResult.statistics); if (packetIdx != pResult.packetSeq) { LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.packetSeq); @@ -106,7 +108,9 @@ public TResultBatch getNext(Status status) throws TException { TResultBatch resultBatch = new TResultBatch(); TDeserializer deserializer = new TDeserializer(); deserializer.deserialize(resultBatch, serialResult); - return resultBatch; + rowBatch.setBatch(resultBatch); + rowBatch.setEos(pResult.eos); + return rowBatch; } } } catch (RpcException e) { @@ -134,7 +138,7 @@ public TResultBatch getNext(Status status) throws TException { if (isCancel) { status.setStatus(Status.CANCELLED); } - return null; + return rowBatch; } public void cancel() { diff --git a/fe/src/main/java/org/apache/doris/qe/RowBatch.java b/fe/src/main/java/org/apache/doris/qe/RowBatch.java new file mode 100644 index 00000000000000..98bdfcf10888ab --- /dev/null +++ b/fe/src/main/java/org/apache/doris/qe/RowBatch.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 53b6c023091503..64159dc06636dd 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -63,6 +63,7 @@
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.rewrite.ExprRewriter;
+import org.apache.doris.rpc.PQueryStatistics;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TQueryOptions;
@@ -101,6 +102,7 @@ public class StmtExecutor {
     private Planner planner;
     private boolean isProxy;
     private ShowResultSet proxyResultSet = null;
+    private PQueryStatistics statisticsForAuditLog;
 
     public StmtExecutor(ConnectContext context, String stmt, boolean isProxy) {
         this.context = context;
@@ -537,23 +539,23 @@ private void handleQueryStmt() throws Exception {
         // so We need to send fields after first batch arrived
 
         // send result
-        TResultBatch batch;
+        RowBatch batch;
         MysqlChannel channel = context.getMysqlChannel();
-        boolean isSendFields = false;
-        while ((batch = coord.getNext()) != null) {
-            if (!isSendFields) {
-                sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
+        sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
+        while (true) {
+            batch = coord.getNext();
+            if (batch.getBatch() != null) {
+                for (ByteBuffer row : batch.getBatch().getRows()) {
+                    channel.sendOnePacket(row);
+                }
+                context.updateReturnRows(batch.getBatch().getRows().size());
             }
-            isSendFields = true;
-
-            for (ByteBuffer row : batch.getRows()) {
-                channel.sendOnePacket(row);
+            if (batch.isEos()) {
+                break;
             }
-            context.updateReturnRows(batch.getRows().size());
-        }
-        if (!isSendFields) {
-            sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
         }
+
+        statisticsForAuditLog = batch.getQueryStatistics();
         context.getState().setEof();
     }
 
@@ -774,4 +776,11 @@ private void handleExportStmt() throws Exception {
         ExportStmt exportStmt = (ExportStmt) parsedStmt;
         context.getCatalog().getExportMgr().addExportJob(exportStmt);
     }
+
+    public PQueryStatistics getQueryStatisticsForAuditLog() {
+        if (statisticsForAuditLog == null) {
+            statisticsForAuditLog = new PQueryStatistics();
+        }
+        return statisticsForAuditLog;
+    }
 }
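The statistics that reach the audit log are whatever rode on the final (eos) batch, and getQueryStatisticsForAuditLog() falls back to an empty PQueryStatistics, so callers never need a null check. A minimal consumer sketch, assuming a hypothetical auditBuilder with put(String, Object) (the builder name and keys are illustrative, not from this patch; only the executor call is):

    PQueryStatistics stats = executor.getQueryStatisticsForAuditLog();
    auditBuilder.put("ScanRows", stats.scanRows);    // hypothetical audit fields
    auditBuilder.put("ScanBytes", stats.scanBytes);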
diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java b/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java
index cd4af477e49196..389327ea62c766 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java
@@ -28,4 +28,6 @@ public class PFetchDataResult {
     public long packetSeq;
     @Protobuf(order = 3, required = false)
     public boolean eos;
+    @Protobuf(order = 4, required = false)
+    public PQueryStatistics statistics;
 }
diff --git a/fe/src/main/java/org/apache/doris/rpc/PQueryStatistics.java b/fe/src/main/java/org/apache/doris/rpc/PQueryStatistics.java
new file mode 100644
index 00000000000000..333ded38c820ca
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/rpc/PQueryStatistics.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
+import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
+
+@ProtobufClass
+public class PQueryStatistics {
+    @Protobuf(order = 1, required = false)
+    public long scanRows;
+    @Protobuf(order = 2, required = false)
+    public long scanBytes;
+
+    public PQueryStatistics() {
+        scanRows = 0;
+        scanBytes = 0;
+    }
+
+}
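One thing to keep in sync here: the @Protobuf(order = ...) values are the protobuf field numbers, so order = 1/2 in this class must continue to match scan_rows = 1 and scan_bytes = 2 in the PQueryStatistics message added to data.proto below, and order = 4 on PFetchDataResult.statistics must match query_statistics = 4 in internal_service.proto. A small accumulation sketch, assuming a hypothetical FE-side helper that sums statistics from several fragments (not part of this patch):

    // Hypothetical helper: fold one fragment's statistics into a running total.
    static void accumulate(PQueryStatistics total, PQueryStatistics delta) {
        if (delta == null) {
            return;  // an older BE may not send statistics at all
        }
        total.scanRows += delta.scanRows;
        total.scanBytes += delta.scanBytes;
    }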
diff --git a/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
index e3e4a573c60f17..5a5f1ff999d137 100644
--- a/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
+++ b/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
@@ -26,6 +26,7 @@
 import org.apache.doris.mysql.MysqlErrPacket;
 import org.apache.doris.mysql.MysqlOkPacket;
 import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.rpc.PQueryStatistics;
 
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -231,6 +232,7 @@ public void testQuery() throws Exception {
         // Mock statement executor
         StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
         qe.execute();
+        EasyMock.expect(qe.getQueryStatisticsForAuditLog()).andReturn(new PQueryStatistics());
         EasyMock.expectLastCall().anyTimes();
         EasyMock.replay(qe);
         PowerMock.expectNew(
@@ -254,11 +256,11 @@ public void testQueryFail() throws Exception {
         StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
         qe.execute();
         EasyMock.expectLastCall().andThrow(new IOException("Fail")).anyTimes();
+        EasyMock.expect(qe.getQueryStatisticsForAuditLog()).andReturn(new PQueryStatistics());
         EasyMock.replay(qe);
         PowerMock.expectNew(StmtExecutor.class, EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class))
                 .andReturn(qe).anyTimes();
         PowerMock.replay(StmtExecutor.class);
-
         processor.processOnce();
         Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
     }
@@ -272,6 +274,7 @@ public void testQueryFail2() throws Exception {
         // Mock statement executor
         StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
         qe.execute();
+        EasyMock.expect(qe.getQueryStatisticsForAuditLog()).andReturn(new PQueryStatistics());
         EasyMock.expectLastCall().andThrow(new NullPointerException("Fail")).anyTimes();
         EasyMock.replay(qe);
         PowerMock.expectNew(StmtExecutor.class, EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class))
diff --git a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 0e35a7b7a5facf..3b12236e856644 100644
--- a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -160,7 +160,7 @@ public void testSelect() throws Exception {
         cood.endProfile();
         EasyMock.expectLastCall().anyTimes();
         EasyMock.expect(cood.getQueryProfile()).andReturn(new RuntimeProfile()).anyTimes();
-        EasyMock.expect(cood.getNext()).andReturn(null).anyTimes();
+        EasyMock.expect(cood.getNext()).andReturn(new RowBatch()).anyTimes();
         EasyMock.replay(cood);
         PowerMock.expectNew(Coordinator.class, EasyMock.isA(ConnectContext.class), EasyMock.isA(Analyzer.class),
                 EasyMock.isA(Planner.class))
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index bec67edbc3413c..d690fb6c58ddd4 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -19,6 +19,11 @@ syntax="proto2";
 
 package doris;
 
+message PQueryStatistics {
+    optional int64 scan_rows = 1;
+    optional int64 scan_bytes = 2;
+}
+
 message PRowBatch {
     required int32 num_rows = 1;
     repeated int32 row_tuples = 2;
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 71782934b2c690..61ea6f3f6ca241 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -41,6 +41,7 @@ message PTransmitDataParams {
     optional PRowBatch row_batch = 6;
     // different per packet
     required int64 packet_seq = 7;
+    optional PQueryStatistics query_statistics = 8;
 };
 
 message PTransmitDataResult {
@@ -129,6 +130,7 @@ message PFetchDataResult {
     // valid when status is ok
     optional int64 packet_seq = 2;
     optional bool eos = 3;
+    optional PQueryStatistics query_statistics = 4;
 };
 
 message PTriggerProfileReportRequest {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 4bc4d5ee13ef22..25428d880589a7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -175,6 +175,7 @@ struct TPlanFragmentExecParams {
   // Id of this fragment in its role as a sender.
   9: optional i32 sender_id
   10: optional i32 num_senders
+  11: optional bool send_query_statistics_with_every_batch
 }
 
 // Global query parameters assigned by the coordinator.
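A final cross-check on the wire changes: all of the new protobuf and thrift fields are optional, so a BE that never fills PFetchDataResult.query_statistics simply leaves the FE's RowBatch statistics null, and TPlanFragmentExecParams.send_query_statistics_with_every_batch is populated from PlanFragment.isTransferQueryStatisticsWithEveryBatch() in the Coordinator.toThrift() hunk earlier; how the BE sender acts on that flag is not covered in this section. A defensive FE-side read, a slightly stricter variant of what ResultReceiver does above (the patch assigns the field unconditionally, so the null check is illustrative only):

    // Inside a fetch loop such as ResultReceiver.getNext():
    PQueryStatistics stats = pResult.statistics;   // may be null when talking to an older BE
    if (stats != null) {
        rowBatch.setQueryStatistics(stats);
    }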