Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,9 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(input_batch->num_rows());
state->update_num_bytes_load_total(input_batch->total_byte_size());
DorisMetrics::instance()->load_rows_total.increment(input_batch->num_rows());
DorisMetrics::instance()->load_bytes_total.increment(input_batch->total_byte_size());
RowBatch* batch = input_batch;
if (!_output_expr_ctxs.empty()) {
SCOPED_RAW_TIMER(&_convert_batch_ns);
Expand Down
80 changes: 72 additions & 8 deletions be/src/http/action/metrics_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

#include "http/action/metrics_action.h"

#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <string>

#include "http/http_request.h"
Expand All @@ -36,7 +40,7 @@ class PrometheusMetricsVisitor : public MetricsVisitor {
std::string to_string() const { return _ss.str(); }
private:
void _visit_simple_metric(
const std::string& name, const MetricLabels& labels, SimpleMetric* metric);
const std::string& name, const MetricLabels& labels, Metric* metric);
private:
std::stringstream _ss;
};
Expand Down Expand Up @@ -88,7 +92,7 @@ void PrometheusMetricsVisitor::visit(const std::string& prefix,
case MetricType::COUNTER:
case MetricType::GAUGE:
for (auto& it : collector->metrics()) {
_visit_simple_metric(metric_name, it.first, (SimpleMetric*) it.second);
_visit_simple_metric(metric_name, it.first, (Metric*) it.second);
}
break;
default:
Expand All @@ -97,7 +101,7 @@ void PrometheusMetricsVisitor::visit(const std::string& prefix,
}

void PrometheusMetricsVisitor::_visit_simple_metric(
const std::string& name, const MetricLabels& labels, SimpleMetric* metric) {
const std::string& name, const MetricLabels& labels, Metric* metric) {
_ss << name;
// labels
if (!labels.empty()) {
Expand Down Expand Up @@ -138,20 +142,80 @@ void SimpleCoreMetricsVisitor::visit(const std::string& prefix,
}

for (auto& it : collector->metrics()) {
_ss << metric_name << " LONG " << ((SimpleMetric*) it.second)->to_string()
_ss << metric_name << " LONG " << ((Metric*) it.second)->to_string()
<< "\n";
}
}

class JsonMetricsVisitor : public MetricsVisitor {
public:
JsonMetricsVisitor() {
}
virtual ~JsonMetricsVisitor() {}
void visit(const std::string& prefix, const std::string& name,
MetricCollector* collector) override;
std::string to_string() {
rapidjson::StringBuffer strBuf;
rapidjson::Writer<rapidjson::StringBuffer> writer(strBuf);
doc.Accept(writer);
return strBuf.GetString();
}

private:
rapidjson::Document doc{rapidjson::kArrayType};
};

void JsonMetricsVisitor::visit(const std::string& prefix,
const std::string& name,
MetricCollector* collector) {
if (collector->empty() || name.empty()) {
return;
}

rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
switch (collector->type()) {
case MetricType::COUNTER:
case MetricType::GAUGE:
for (auto& it : collector->metrics()) {
const MetricLabels& labels = it.first;
Metric* metric = reinterpret_cast<Metric*>(it.second);
rapidjson::Value metric_obj(rapidjson::kObjectType);
rapidjson::Value tag_obj(rapidjson::kObjectType);
tag_obj.AddMember("metric", rapidjson::Value(name.c_str(), allocator), allocator);
// labels
if (!labels.empty()) {
for (auto& label : labels.labels) {
tag_obj.AddMember(
rapidjson::Value(label.name.c_str(), allocator),
rapidjson::Value(label.value.c_str(), allocator),
allocator);
}
}
metric_obj.AddMember("tags", tag_obj, allocator);
rapidjson::Value unit_val(unit_name(metric->unit()), allocator);
metric_obj.AddMember("unit", unit_val, allocator);
metric->write_value(metric_obj, allocator);
doc.PushBack(metric_obj, allocator);
}
break;
default:
break;
}
}

void MetricsAction::handle(HttpRequest* req) {
const std::string& type = req->param("type");
std::string str;
if (type != "core") {
PrometheusMetricsVisitor visitor;
if (type == "core") {
SimpleCoreMetricsVisitor visitor;
_metrics->collect(&visitor);
str.assign(visitor.to_string());
} else if (type == "agent") {
JsonMetricsVisitor visitor;
_metrics->collect(&visitor);
str.assign(visitor.to_string());
} else {
SimpleCoreMetricsVisitor visitor;
PrometheusMetricsVisitor visitor;
_metrics->collect(&visitor);
str.assign(visitor.to_string());
}
Expand All @@ -160,4 +224,4 @@ void MetricsAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, str);
}

}
} // namespace doris
28 changes: 14 additions & 14 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@

namespace doris {

IntCounter k_streaming_load_requests_total;
IntCounter k_streaming_load_bytes;
IntCounter k_streaming_load_duration_ms;
static IntGauge k_streaming_load_current_processing;
METRIC_DEFINE_INT_COUNTER(streaming_load_requests_total, MetricUnit::NUMBER);
METRIC_DEFINE_INT_COUNTER(streaming_load_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_COUNTER(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
METRIC_DEFINE_INT_GAUGE(streaming_load_current_processing, MetricUnit::NUMBER);

#ifdef BE_TEST
TStreamLoadPutResult k_stream_load_put_result;
Expand All @@ -89,13 +89,13 @@ static bool is_format_support_streaming(TFileFormatType::type format) {

StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
DorisMetrics::instance()->metrics()->register_metric("streaming_load_requests_total",
&k_streaming_load_requests_total);
&streaming_load_requests_total);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_bytes",
&k_streaming_load_bytes);
&streaming_load_bytes);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_duration_ms",
&k_streaming_load_duration_ms);
&streaming_load_duration_ms);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_current_processing",
&k_streaming_load_current_processing);
&streaming_load_current_processing);
}

StreamLoadAction::~StreamLoadAction() {
Expand Down Expand Up @@ -131,10 +131,10 @@ void StreamLoadAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, str);

// update statstics
k_streaming_load_requests_total.increment(1);
k_streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
k_streaming_load_bytes.increment(ctx->receive_bytes);
k_streaming_load_current_processing.increment(-1);
streaming_load_requests_total.increment(1);
streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
streaming_load_bytes.increment(ctx->receive_bytes);
streaming_load_current_processing.increment(-1);
}

Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
Expand Down Expand Up @@ -164,7 +164,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
}

int StreamLoadAction::on_header(HttpRequest* req) {
k_streaming_load_current_processing.increment(1);
streaming_load_current_processing.increment(1);

StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
ctx->ref();
Expand Down Expand Up @@ -195,7 +195,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
k_streaming_load_current_processing.increment(-1);
streaming_load_current_processing.increment(-1);
return -1;
}
return 0;
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/client_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ void ClientCacheHelper::init_metrics(MetricRegistry* metrics, const std::string&
// usage, but ensures that _metrics_enabled is published.
boost::lock_guard<boost::mutex> lock(_lock);

_used_clients.reset(new IntGauge());
_used_clients.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("thrift_used_clients",
MetricLabels().add("name", key_prefix),
_used_clients.get());

_opened_clients.reset(new IntGauge());
_opened_clients.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("thrift_opened_clients",
MetricLabels().add("name", key_prefix),
_opened_clients.get());
Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/memory/chunk_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ namespace doris {

ChunkAllocator* ChunkAllocator::_s_instance = nullptr;

static IntCounter local_core_alloc_count;
static IntCounter other_core_alloc_count;
static IntCounter system_alloc_count;
static IntCounter system_free_count;
static IntCounter system_alloc_cost_ns;
static IntCounter system_free_cost_ns;
static IntCounter local_core_alloc_count(MetricUnit::NUMBER);
static IntCounter other_core_alloc_count(MetricUnit::NUMBER);
static IntCounter system_alloc_count(MetricUnit::NUMBER);
static IntCounter system_free_count(MetricUnit::NUMBER);
static IntCounter system_alloc_cost_ns(MetricUnit::NANOSECONDS);
static IntCounter system_free_cost_ns(MetricUnit::NANOSECONDS);

#ifdef BE_TEST
static std::mutex s_mutex;
Expand Down
14 changes: 14 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ class RuntimeState {
void append_error_msg_to_file(const std::string& line, const std::string& error_msg,
bool is_summary = false);

int64_t num_bytes_load_total() {
return _num_bytes_load_total.load();
}

int64_t num_rows_load_total() {
return _num_rows_load_total.load();
}
Expand All @@ -413,6 +417,14 @@ class RuntimeState {
_num_rows_load_total.store(num_rows);
}

void update_num_bytes_load_total(int64_t bytes_load) {
_num_bytes_load_total.fetch_add(bytes_load);
}

void set_update_num_bytes_load_total(int64_t bytes_load) {
_num_bytes_load_total.store(bytes_load);
}

void update_num_rows_load_filtered(int64_t num_rows) {
_num_rows_load_filtered.fetch_add(num_rows);
}
Expand Down Expand Up @@ -587,6 +599,8 @@ class RuntimeState {
std::atomic<int64_t> _num_rows_load_unselected; // rows filtered by predicates
std::atomic<int64_t> _num_print_error_rows;

std::atomic<int64_t> _num_bytes_load_total; // total bytes read from source

std::vector<std::string> _export_output_files;

std::string _import_label;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/tmp_file_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ Status TmpFileMgr::init_custom(
}

DCHECK(metrics != NULL);
_num_active_scratch_dirs_metric.reset(new IntGauge());
_num_active_scratch_dirs_metric.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("active_scratch_dirs", _num_active_scratch_dirs_metric.get());
//_active_scratch_dirs_metric = metrics->register_metric(new SetMetric<std::string>(
// TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST,
Expand Down
13 changes: 9 additions & 4 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ DorisMetrics::DorisMetrics() : _name("doris_be"), _hook_name("doris_metrics"), _
_metrics.register_metric(
"stream_load", MetricLabels().add("type", "load_rows"),
&stream_load_rows_total);
_metrics.register_metric(
"load", MetricLabels().add("type", "receive_bytes"),
&stream_receive_bytes_total);
_metrics.register_metric("load_rows", &load_rows_total);
_metrics.register_metric("load_bytes", &load_bytes_total);

// Gauge
REGISTER_DORIS_METRIC(memory_pool_bytes_total);
Expand Down Expand Up @@ -188,13 +193,13 @@ void DorisMetrics::initialize(
const std::vector<std::string>& network_interfaces) {
// disk usage
for (auto& path : paths) {
IntGauge* gauge = disks_total_capacity.set_key(path);
IntGauge* gauge = disks_total_capacity.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_total_capacity", MetricLabels().add("path", path), gauge);
gauge = disks_avail_capacity.set_key(path);
gauge = disks_avail_capacity.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_avail_capacity", MetricLabels().add("path", path), gauge);
gauge = disks_data_used_capacity.set_key(path);
gauge = disks_data_used_capacity.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_data_used_capacity", MetricLabels().add("path", path), gauge);
gauge = disks_state.set_key(path);
gauge = disks_state.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_state", MetricLabels().add("path", path), gauge);
}

Expand Down
Loading