Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ Status DataSink::create_data_sink(
if (!thrift_sink.__isset.stream_sink) {
return Status("Missing data stream sink.");
}

bool send_query_statistics_with_every_batch = params.__isset.send_query_statistics_with_every_batch ?
params.send_query_statistics_with_every_batch : false;
// TODO: figure out good buffer size based on size of output row
tmp_sink = new DataStreamSender(
pool, params.sender_id, row_desc,
thrift_sink.stream_sink, params.destinations, 16 * 1024);
thrift_sink.stream_sink, params.destinations, 16 * 1024,
send_query_statistics_with_every_batch);
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
sink->reset(tmp_sink);
break;
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/Exprs_types.h"
#include "runtime/mem_tracker.h"
#include "runtime/query_statistics.h"

namespace doris {

Expand Down Expand Up @@ -78,11 +79,17 @@ class DataSink {
// Returns the runtime profile for the sink.
virtual RuntimeProfile* profile() = 0;

virtual void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
_query_statistics = statistics;
}
protected:
// Set to true after close() has been called. subclasses should check and set this in
// close().
bool _closed;
std::unique_ptr<MemTracker> _expr_mem_tracker;

// Maybe this will be transfered to BufferControlBlock.
std::shared_ptr<QueryStatistics> _query_statistics;
};

} // namespace doris
Expand Down
15 changes: 11 additions & 4 deletions be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ ExchangeNode::ExchangeNode(
_next_row_idx(0),
_is_merging(tnode.exchange_node.__isset.sort_info),
_offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0),
_num_rows_skipped(0) {
_num_rows_skipped(0),
_merge_rows_counter(nullptr) {
DCHECK_GE(_offset, 0);
DCHECK(_is_merging || (_offset == 0));
}
Expand All @@ -63,16 +64,16 @@ Status ExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status ExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
_convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime");

_merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT);
// TODO: figure out appropriate buffer size
DCHECK_GT(_num_senders, 0);
_sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
_stream_recvr = state->exec_env()->stream_mgr()->create_recvr(
state, _input_row_desc,
state->fragment_instance_id(), _id,
_num_senders, config::exchg_node_buffer_size_bytes,
state->runtime_profile(), _is_merging);
state->runtime_profile(), _is_merging, _sub_plan_query_statistics_recvr.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is better to let _stream_recvr own _sub_plan_query_statistics_recvr, and this node use _stream_recvr.get_ststs_recvr() to use it.

if (_is_merging) {
_merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT);
RETURN_IF_ERROR(_sort_exec_exprs.prepare(
state, _row_descriptor, _row_descriptor, expr_mem_tracker()));
// AddExprCtxsToFree(_sort_exec_exprs);
Expand All @@ -95,6 +96,12 @@ Status ExchangeNode::open(RuntimeState* state) {
return Status::OK;
}

Status ExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
statistics->merge(_sub_plan_query_statistics_recvr.get());
return Status::OK;
}

Status ExchangeNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK;
Expand Down
9 changes: 8 additions & 1 deletion be/src/exec/exchange_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
#include <boost/scoped_ptr.hpp>
#include "exec/exec_node.h"
#include "exec/sort_exec_exprs.h"
#include "runtime/data_stream_recvr.h"

namespace doris {

class RowBatch;
class DataStreamRecvr;
class RuntimeProfile;

// Receiver node for data streams. The data stream receiver is created in Prepare()
Expand All @@ -49,6 +49,7 @@ class ExchangeNode : public ExecNode {
// Blocks until the first batch is available for consumption via GetNext().
virtual Status open(RuntimeState* state);
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
Status collect_query_statistics(QueryStatistics* statistics) override;
virtual Status close(RuntimeState* state);

// the number of senders needs to be set after the c'tor, because it's not
Expand All @@ -61,6 +62,7 @@ class ExchangeNode : public ExecNode {
virtual void debug_string(int indentation_level, std::stringstream* out) const;

private:

// Implements GetNext() for the case where _is_merging is true. Delegates the GetNext()
// call to the underlying DataStreamRecvr.
Status get_next_merging(RuntimeState* state, RowBatch* output_batch, bool* eos);
Expand Down Expand Up @@ -109,6 +111,11 @@ class ExchangeNode : public ExecNode {
int64_t _num_rows_skipped;

RuntimeProfile::Counter* _merge_rows_counter;

// Sub plan query statistics receiver. It is shared with DataStreamRecvr and will be
// called in two different threads. But their calls are all at different time, there is
// no problem of multithreaded access.
std::unique_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
};

};
Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ Status ExecNode::reset(RuntimeState* state) {
return Status::OK;
}

Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
DCHECK(statistics != nullptr);
for (auto child_node : _children) {
child_node->collect_query_statistics(statistics);
}
return Status::OK;
}

Status ExecNode::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK;
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "util/runtime_profile.h"
#include "util/blocking_queue.hpp"
#include "runtime/bufferpool/buffer_pool.h"
#include "runtime/query_statistics.h"

namespace llvm {
class Function;
Expand Down Expand Up @@ -114,6 +115,11 @@ class ExecNode {
// so should be fast.
virtual Status reset(RuntimeState* state);

// This should be called before close() and after get_next(), it is responsible for
// collecting statistics sent with row batch, it can't be called when prepare() returns
// error.
virtual Status collect_query_statistics(QueryStatistics* statistics);

// close() will get called for every exec node, regardless of what else is called and
// the status of these calls (i.e. prepare() may never have been called, or
// prepare()/open()/get_next() returned with an error).
Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ Status HashJoinNode::prepare(RuntimeState* state) {
ADD_TIMER(runtime_profile(), "PushDownComputeTime");
_probe_timer =
ADD_TIMER(runtime_profile(), "ProbeTime");
_build_row_counter =
_build_rows_counter =
ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
_build_buckets_counter =
ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
_probe_row_counter =
_probe_rows_counter =
ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT);
_hash_tbl_load_factor_counter =
ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
Expand Down Expand Up @@ -243,7 +243,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {

VLOG_ROW << _hash_tbl->debug_string(true, &child(1)->row_desc());

COUNTER_SET(_build_row_counter, _hash_tbl->size());
COUNTER_SET(_build_rows_counter, _hash_tbl->size());
COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets());
COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor());
build_batch.reset();
Expand Down Expand Up @@ -380,7 +380,7 @@ Status HashJoinNode::open(RuntimeState* state) {
// seed probe batch and _current_probe_row, etc.
while (true) {
RETURN_IF_ERROR(child(0)->get_next(state, _probe_batch.get(), &_probe_eos));
COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows());
COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows());
_probe_batch_pos = 0;

if (_probe_batch->num_rows() == 0) {
Expand Down Expand Up @@ -571,7 +571,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo

continue;
} else {
COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows());
COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows());
break;
}
}
Expand Down Expand Up @@ -695,7 +695,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state,
probe_timer.stop();
RETURN_IF_ERROR(child(0)->get_next(state, _probe_batch.get(), &_probe_eos));
probe_timer.start();
COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows());
COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/hash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ class HashJoinNode : public ExecNode {
RuntimeProfile::Counter* _push_down_timer; // time to build hash table
RuntimeProfile::Counter* _push_compute_timer;
RuntimeProfile::Counter* _probe_timer; // time to probe
RuntimeProfile::Counter* _build_row_counter; // num build rows
RuntimeProfile::Counter* _probe_row_counter; // num probe rows
RuntimeProfile::Counter* _build_rows_counter; // num build rows
RuntimeProfile::Counter* _probe_rows_counter; // num probe rows
RuntimeProfile::Counter* _build_buckets_counter; // num buckets in hash table
RuntimeProfile::Counter* _hash_tbl_load_factor_counter;

Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,13 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
return _status;
}

Status OlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
statistics->add_scan_bytes(_read_compressed_counter->value());
statistics->add_scan_rows(rows_returned());
return Status::OK;
}

Status OlapScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class OlapScanNode : public ScanNode {
virtual Status prepare(RuntimeState* state);
virtual Status open(RuntimeState* state);
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
Status collect_query_statistics(QueryStatistics* statistics) override;
virtual Status close(RuntimeState* state);
virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges);

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ add_library(Runtime STATIC
initial_reservations.cc
snapshot_loader.cpp
kafka_consumer_pipe.cpp
query_statistics.cpp
)

# This test runs forever so should not be part of 'make test'
Expand Down
10 changes: 7 additions & 3 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ void GetResultBatchCtx::on_failure(const Status& status) {
delete this;
}

void GetResultBatchCtx::on_close(int64_t packet_seq) {
void GetResultBatchCtx::on_close(int64_t packet_seq,
QueryStatistics* statistics) {
Status status;
status.to_protobuf(result->mutable_status());
if (statistics != nullptr) {
statistics->to_pb(result->mutable_query_statistics());
}
result->set_packet_seq(packet_seq);
result->set_eos(true);
done->Run();
Expand Down Expand Up @@ -183,7 +187,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
return;
}
if (_is_close) {
ctx->on_close(_packet_num);
ctx->on_close(_packet_num, _query_statistics.get());
return;
}
// no ready data, push ctx to waiting list
Expand All @@ -200,7 +204,7 @@ Status BufferControlBlock::close(Status exec_status) {
if (!_waiting_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_rpc) {
ctx->on_close(_packet_num);
ctx->on_close(_packet_num, _query_statistics.get());
}
} else {
for (auto& ctx : _waiting_rpc) {
Expand Down
13 changes: 11 additions & 2 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <boost/thread/condition_variable.hpp>
#include "common/status.h"
#include "gen_cpp/Types_types.h"
#include "runtime/query_statistics.h"

namespace google {
namespace protobuf {
Expand Down Expand Up @@ -52,7 +53,7 @@ struct GetResultBatchCtx {
}

void on_failure(const Status& status);
void on_close(int64_t packet_seq);
void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos = false);
};

Expand Down Expand Up @@ -80,6 +81,9 @@ class BufferControlBlock {
return _fragment_id;
}

void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
_query_statistics = statistics;
}
private:
typedef std::list<TFetchDataResult*> ResultQueue;

Expand All @@ -100,8 +104,13 @@ class BufferControlBlock {
boost::condition_variable _data_arriaval;
// signal removal of data by stream consumer
boost::condition_variable _data_removal;

std::deque<GetResultBatchCtx*> _waiting_rpc;

// It is shared with PlanFragmentExecutor and will be called in two different
// threads. But their calls are all at different time, there is no problem of
// multithreaded access.
std::shared_ptr<QueryStatistics> _query_statistics;
};

}
Expand Down
Loading