26 changes: 13 additions & 13 deletions be/src/agent/task_worker_pool.cpp
@@ -263,14 +263,14 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request)
uint32_t try_time = 0;

while (try_time < TASK_FINISH_MAX_RETRY) {
DorisMetrics::instance()->finish_task_requests_total.increment(1);
DorisMetrics::instance()->finish_task_requests_total->increment(1);
AgentStatus client_status = _master_client->finish_task(finish_task_request, &result);

if (client_status == DORIS_SUCCESS) {
LOG(INFO) << "finish task success.";
break;
} else {
DorisMetrics::instance()->finish_task_requests_failed.increment(1);
DorisMetrics::instance()->finish_task_requests_failed->increment(1);
LOG(WARNING) << "finish task failed. status_code=" << result.status.status_code;
try_time += 1;
}
@@ -694,7 +694,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) {
worker_pool_this->_tasks.pop_front();
}

DorisMetrics::instance()->publish_task_request_total.increment(1);
DorisMetrics::instance()->publish_task_request_total->increment(1);
LOG(INFO) << "get publish version task, signature:" << agent_task_req.signature;

Status st;
@@ -718,7 +718,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) {

TFinishTaskRequest finish_task_request;
if (res != OLAP_SUCCESS) {
DorisMetrics::instance()->publish_task_failed_total.increment(1);
DorisMetrics::instance()->publish_task_failed_total->increment(1);
// if publish failed, return failed, FE will ignore this error and
// check error tablet ids and FE will also republish this task
LOG(WARNING) << "publish version failed. signature:" << agent_task_req.signature
@@ -890,7 +890,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
worker_pool_this->_tasks.pop_front();
}

DorisMetrics::instance()->clone_requests_total.increment(1);
DorisMetrics::instance()->clone_requests_total->increment(1);
LOG(INFO) << "get clone task. signature:" << agent_task_req.signature;

vector<string> error_msgs;
@@ -907,7 +907,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {

TStatusCode::type status_code = TStatusCode::OK;
if (status != DORIS_SUCCESS && status != DORIS_CREATE_TABLE_EXIST) {
DorisMetrics::instance()->clone_requests_failed.increment(1);
DorisMetrics::instance()->clone_requests_failed->increment(1);
status_code = TStatusCode::RUNTIME_ERROR;
LOG(WARNING) << "clone failed. signature: " << agent_task_req.signature;
error_msgs.push_back("clone failed.");
@@ -1050,12 +1050,12 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) {
request.__set_tasks(_s_task_signatures);
}

DorisMetrics::instance()->report_task_requests_total.increment(1);
DorisMetrics::instance()->report_task_requests_total->increment(1);
TMasterResult result;
AgentStatus status = worker_pool_this->_master_client->report(request, &result);

if (status != DORIS_SUCCESS) {
DorisMetrics::instance()->report_task_requests_failed.increment(1);
DorisMetrics::instance()->report_task_requests_failed->increment(1);
LOG(WARNING) << "finish report task failed. status:" << status << ", master host:"
<< worker_pool_this->_master_info.network_address.hostname
<< "port:" << worker_pool_this->_master_info.network_address.port;
@@ -1103,12 +1103,12 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
}
request.__set_disks(disks);

DorisMetrics::instance()->report_disk_requests_total.increment(1);
DorisMetrics::instance()->report_disk_requests_total->increment(1);
TMasterResult result;
AgentStatus status = worker_pool_this->_master_client->report(request, &result);

if (status != DORIS_SUCCESS) {
DorisMetrics::instance()->report_disk_requests_failed.increment(1);
DorisMetrics::instance()->report_disk_requests_failed->increment(1);
LOG(WARNING) << "finish report disk state failed. status:" << status << ", master host:"
<< worker_pool_this->_master_info.network_address.hostname
<< ", port:" << worker_pool_this->_master_info.network_address.port;
@@ -1161,15 +1161,15 @@ void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) {
#endif
}
int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score.value(),
DorisMetrics::instance()->tablet_base_max_compaction_score.value());
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
request.__set_tablet_max_compaction_score(max_compaction_score);

TMasterResult result;
status = worker_pool_this->_master_client->report(request, &result);

if (status != DORIS_SUCCESS) {
DorisMetrics::instance()->report_all_tablets_requests_failed.increment(1);
DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1);
LOG(WARNING) << "finish report olap table state failed. status:" << status
<< ", master host:"
<< worker_pool_this->_master_info.network_address.hostname
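The change repeated throughout this file is mechanical: the DorisMetrics counters become registered pointers instead of value members, so every call site switches from .increment(1) / .value() to ->increment(1) / ->value(). A minimal sketch of that shift at a call site, assuming the member simply becomes an IntCounter* owned by the metric registry (the actual DorisMetrics declaration is not part of this diff):

    // Sketch only -- shows the before/after calling convention, not the real DorisMetrics class.
    struct MetricsBefore {
        IntCounter finish_task_requests_total;              // value member (old style)
        void on_finish_task() { finish_task_requests_total.increment(1); }
    };
    struct MetricsAfter {
        IntCounter* finish_task_requests_total = nullptr;   // pointer registered elsewhere (new style)
        void on_finish_task() { finish_task_requests_total->increment(1); }
    };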
18 changes: 9 additions & 9 deletions be/src/common/daemon.cpp
@@ -137,8 +137,8 @@ void* calculate_metrics(void* dummy) {

if (last_ts == -1L) {
last_ts = GetCurrentTimeMicros() / 1000;
lst_push_bytes = DorisMetrics::instance()->push_request_write_bytes.value();
lst_query_bytes = DorisMetrics::instance()->query_scan_bytes.value();
lst_push_bytes = DorisMetrics::instance()->push_request_write_bytes->value();
lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes);
} else {
@@ -147,21 +147,21 @@ void* calculate_metrics(void* dummy) {
last_ts = current_ts;

// 1. push bytes per second
int64_t current_push_bytes = DorisMetrics::instance()->push_request_write_bytes.value();
int64_t current_push_bytes = DorisMetrics::instance()->push_request_write_bytes->value();
int64_t pps = (current_push_bytes - lst_push_bytes) / (interval + 1);
DorisMetrics::instance()->push_request_write_bytes_per_second.set_value(
DorisMetrics::instance()->push_request_write_bytes_per_second->set_value(
pps < 0 ? 0 : pps);
lst_push_bytes = current_push_bytes;

// 2. query bytes per second
int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes.value();
int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
int64_t qps = (current_query_bytes - lst_query_bytes) / (interval + 1);
DorisMetrics::instance()->query_scan_bytes_per_second.set_value(
DorisMetrics::instance()->query_scan_bytes_per_second->set_value(
qps < 0 ? 0 : qps);
lst_query_bytes = current_query_bytes;

// 3. max disk io util
DorisMetrics::instance()->max_disk_io_util_percent.set_value(
DorisMetrics::instance()->max_disk_io_util_percent->set_value(
DorisMetrics::instance()->system_metrics()->get_max_io_util(lst_disks_io_time, 15));
// update lst map
DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
@@ -171,8 +171,8 @@ void* calculate_metrics(void* dummy) {
int64_t max_receive = 0;
DorisMetrics::instance()->system_metrics()->get_max_net_traffic(
lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive);
DorisMetrics::instance()->max_network_send_bytes_rate.set_value(max_send);
DorisMetrics::instance()->max_network_receive_bytes_rate.set_value(max_receive);
DorisMetrics::instance()->max_network_send_bytes_rate->set_value(max_send);
DorisMetrics::instance()->max_network_receive_bytes_rate->set_value(max_receive);
// update lst map
DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes);
}
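The calculate_metrics() loop above turns monotonically increasing byte counters into per-second gauges by sampling them on each pass. A compact restatement of that arithmetic as a hedged sketch (the +1 on the interval and the clamp to zero mirror the code in this hunk):

    // Sketch: delta of a monotonic counter divided by elapsed seconds, clamped
    // at zero so a counter reset never yields a negative rate.
    static int64_t bytes_per_second(int64_t current, int64_t last, int64_t interval_s) {
        int64_t rate = (current - last) / (interval_s + 1);
        return rate < 0 ? 0 : rate;
    }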
7 changes: 5 additions & 2 deletions be/src/exec/olap_scanner.cpp
@@ -479,8 +479,11 @@ void OlapScanner::update_counter() {
COUNTER_UPDATE(_parent->_filtered_segment_counter, _reader->stats().filtered_segment_number);
COUNTER_UPDATE(_parent->_total_segment_counter, _reader->stats().total_segment_number);

DorisMetrics::instance()->query_scan_bytes.increment(_compressed_bytes_read);
DorisMetrics::instance()->query_scan_rows.increment(_raw_rows_read);
DorisMetrics::instance()->query_scan_bytes->increment(_compressed_bytes_read);
DorisMetrics::instance()->query_scan_rows->increment(_raw_rows_read);

_tablet->query_scan_bytes->increment(_compressed_bytes_read);
_tablet->query_scan_rows->increment(_raw_rows_read);

_has_update_counter = true;
}
4 changes: 2 additions & 2 deletions be/src/exec/tablet_sink.cpp
@@ -604,8 +604,8 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(input_batch->num_rows());
state->update_num_bytes_load_total(input_batch->total_byte_size());
DorisMetrics::instance()->load_rows.increment(input_batch->num_rows());
DorisMetrics::instance()->load_bytes.increment(input_batch->total_byte_size());
DorisMetrics::instance()->load_rows->increment(input_batch->num_rows());
DorisMetrics::instance()->load_bytes->increment(input_batch->total_byte_size());
RowBatch* batch = input_batch;
if (!_output_expr_ctxs.empty()) {
SCOPED_RAW_TIMER(&_convert_batch_ns);
4 changes: 2 additions & 2 deletions be/src/http/action/compaction_action.cpp
@@ -219,7 +219,7 @@ OLAPStatus CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet
OLAPStatus res = base_compaction.compact();
if (res != OLAP_SUCCESS) {
if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
DorisMetrics::instance()->base_compaction_request_failed.increment(1);
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to init base compaction. res=" << res
<< ", table=" << tablet->full_name();
}
@@ -232,7 +232,7 @@ OLAPStatus CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet
OLAPStatus res = cumulative_compaction.compact();
if (res != OLAP_SUCCESS) {
if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) {
DorisMetrics::instance()->cumulative_compaction_request_failed.increment(1);
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to do cumulative compaction. res=" << res
<< ", table=" << tablet->full_name();
}
5 changes: 3 additions & 2 deletions be/src/http/action/metrics_action.cpp
@@ -34,13 +34,14 @@ namespace doris {

void MetricsAction::handle(HttpRequest* req) {
const std::string& type = req->param("type");
const std::string& with_tablet = req->param("with_tablet");
std::string str;
if (type == "core") {
str = _metric_registry->to_core_string();
} else if (type == "json") {
str = _metric_registry->to_json();
str = _metric_registry->to_json(with_tablet == "true");
} else {
str = _metric_registry->to_prometheus();
str = _metric_registry->to_prometheus(with_tablet == "true");
}

req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
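With this change the handler reads an optional with_tablet query parameter and forwards it to the registry serializers, so the potentially numerous per-tablet entities are emitted only when a caller asks for them. How the registry honors the flag is not shown in this diff; a hypothetical sketch of the intended branch (entity->type() and serialize_entity() are assumed names, not part of the PR):

    // Hypothetical sketch; the real MetricRegistry::to_prometheus(bool) is not part of this hunk.
    std::string to_prometheus_sketch(const std::vector<std::shared_ptr<MetricEntity>>& entities,
                                     bool with_tablet) {
        std::string out;
        for (const auto& entity : entities) {
            if (!with_tablet && entity->type() == MetricEntityType::kTablet) {
                continue;  // per-tablet entities stay hidden unless explicitly requested
            }
            out += serialize_entity(entity);  // assumed helper
        }
        return out;
    }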
24 changes: 12 additions & 12 deletions be/src/http/action/stream_load.cpp
@@ -89,15 +89,15 @@ static bool is_format_support_streaming(TFileFormatType::type format) {
}

StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
_stream_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("stream_load", {});
METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total);
METRIC_REGISTER(_stream_load_entity, streaming_load_bytes);
METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms);
METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing);
_stream_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("stream_load");
INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total);
INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_bytes);
INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms);
INT_GAUGE_METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing);
}

StreamLoadAction::~StreamLoadAction() {
DorisMetrics::instance()->metric_registry()->deregister_entity("stream_load");
DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_entity);
}

void StreamLoadAction::handle(HttpRequest* req) {
@@ -130,10 +130,10 @@ void StreamLoadAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, str);

// update statstics
streaming_load_requests_total.increment(1);
streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
streaming_load_bytes.increment(ctx->receive_bytes);
streaming_load_current_processing.increment(-1);
streaming_load_requests_total->increment(1);
streaming_load_duration_ms->increment(ctx->load_cost_nanos / 1000000);
streaming_load_bytes->increment(ctx->receive_bytes);
streaming_load_current_processing->increment(-1);
}

Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
@@ -165,7 +165,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
}

int StreamLoadAction::on_header(HttpRequest* req) {
streaming_load_current_processing.increment(1);
streaming_load_current_processing->increment(1);

StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
ctx->ref();
@@ -196,7 +196,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
streaming_load_current_processing.increment(-1);
streaming_load_current_processing->increment(-1);
return -1;
}
return 0;
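Together with the header change below, the stream-load metrics now follow a register-once, deregister-by-handle lifecycle: the constructor registers a stream_load entity and hangs pointer metrics off it via the *_METRIC_REGISTER macros, the handlers bump those pointers, and the destructor deregisters the entity handle instead of a name. A hedged sketch of another handler following the same pattern (names are illustrative, and the metric prototype definitions the macros rely on are omitted):

    // Illustrative only -- not part of this PR.
    class ExampleLoadAction {
    public:
        ExampleLoadAction() {
            _entity = DorisMetrics::instance()->metric_registry()->register_entity("example_load");
            INT_COUNTER_METRIC_REGISTER(_entity, example_load_requests_total);
            INT_GAUGE_METRIC_REGISTER(_entity, example_load_current_processing);
        }
        ~ExampleLoadAction() {
            DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
        }
        void handle() {
            example_load_current_processing->increment(1);
            // ... do the work ...
            example_load_requests_total->increment(1);
            example_load_current_processing->increment(-1);
        }
    private:
        std::shared_ptr<MetricEntity> _entity;
        IntCounter* example_load_requests_total;
        IntGauge* example_load_current_processing;
    };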
10 changes: 5 additions & 5 deletions be/src/http/action/stream_load.h
@@ -54,11 +54,11 @@ class StreamLoadAction : public HttpHandler {
private:
ExecEnv* _exec_env;

MetricEntity* _stream_load_entity;
IntCounter streaming_load_requests_total;
IntCounter streaming_load_bytes;
IntCounter streaming_load_duration_ms;
IntGauge streaming_load_current_processing;
std::shared_ptr<MetricEntity> _stream_load_entity;
IntCounter* streaming_load_requests_total;
IntCounter* streaming_load_bytes;
IntCounter* streaming_load_duration_ms;
IntGauge* streaming_load_current_processing;
};

}
4 changes: 2 additions & 2 deletions be/src/olap/base_compaction.cpp
@@ -52,8 +52,8 @@ OLAPStatus BaseCompaction::compact() {
_state = CompactionState::SUCCESS;

// 4. add metric to base compaction
DorisMetrics::instance()->base_compaction_deltas_total.increment(_input_rowsets.size());
DorisMetrics::instance()->base_compaction_bytes_total.increment(_input_rowsets_size);
DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size);
TRACE("save base compaction metrics");

return OLAP_SUCCESS;
16 changes: 15 additions & 1 deletion be/src/olap/base_tablet.cpp
@@ -17,20 +17,34 @@

#include "olap/base_tablet.h"

#include "gutil/strings/substitute.h"
#include "olap/data_dir.h"
#include "util/doris_metrics.h"
#include "util/path_util.h"

namespace doris {

extern MetricPrototype METRIC_query_scan_bytes;
extern MetricPrototype METRIC_query_scan_rows;

BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
: _state(tablet_meta->tablet_state()),
_tablet_meta(tablet_meta),
_schema(tablet_meta->tablet_schema()),
_data_dir(data_dir) {
_gen_tablet_path();

_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
strings::Substitute("Tablet.$0", tablet_id()),
{{"tablet_id", std::to_string(tablet_id())}},
MetricEntityType::kTablet);
INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_bytes);
INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_rows);
}

BaseTablet::~BaseTablet() {}
BaseTablet::~BaseTablet() {
DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
}

OLAPStatus BaseTablet::set_tablet_state(TabletState state) {
if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) {
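Each tablet now owns a metric entity named Tablet.<tablet_id>, labeled with its tablet_id, registered in the constructor and deregistered in the destructor so its metrics vanish with the tablet. The two counters registered here are the ones OlapScanner::update_counter() bumps earlier in this PR; a minimal usage sketch, assuming a tablet handle that exposes the public counters declared in the header change below:

    // Sketch mirroring the olap_scanner.cpp hunk above: callers increment the
    // per-tablet counters through the tablet they scanned.
    void record_scan(BaseTablet* tablet, int64_t compressed_bytes, int64_t raw_rows) {
        tablet->query_scan_bytes->increment(compressed_bytes);
        tablet->query_scan_rows->increment(raw_rows);
    }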
7 changes: 7 additions & 0 deletions be/src/olap/base_tablet.h
@@ -23,6 +23,7 @@
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "olap/utils.h"
#include "util/metrics.h"

namespace doris {

@@ -74,6 +75,12 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
DataDir* _data_dir;
std::string _tablet_path;

// metrics of this tablet
std::shared_ptr<MetricEntity> _metric_entity = nullptr;
public:
IntCounter* query_scan_bytes;
IntCounter* query_scan_rows;

private:
DISALLOW_COPY_AND_ASSIGN(BaseTablet);
};
4 changes: 2 additions & 2 deletions be/src/olap/cumulative_compaction.cpp
@@ -66,8 +66,8 @@ OLAPStatus CumulativeCompaction::compact() {
<< _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name();

// 6. add metric to cumulative compaction
DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(_input_rowsets.size());
DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(_input_rowsets_size);
DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size);
TRACE("save cumulative compaction metrics");

return OLAP_SUCCESS;