Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,8 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) {
publish_version_req = agent_task_req.publish_version_req;
worker_pool_this->_tasks.pop_front();
}

DorisMetrics::publish_task_request_total.increment(1);
LOG(INFO)<< "get publish version task, signature:" << agent_task_req.signature;

TStatusCode::type status_code = TStatusCode::OK;
Expand Down Expand Up @@ -898,6 +900,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) {
OLAP_LOG_WARNING("publish version failed. signature: %ld", agent_task_req.signature);
error_msgs.push_back("publish version failed");
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
DorisMetrics::publish_task_failed_total.increment(1);
} else {
LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ namespace config {
CONF_Int32(tablet_stat_cache_update_interval_second, "300");

// result buffer cancelled time (unit: second)
CONF_Int32(result_buffer_cancelled_interval_time, "5");
CONF_Int32(result_buffer_cancelled_interval_time, "300");

// can perform recovering tablet
CONF_Bool(force_recovery, "false");
Expand All @@ -395,7 +395,7 @@ namespace config {
CONF_Int32(txn_commit_rpc_timeout_ms, "10000");

// If set to true, metric calculator will run
CONF_Bool(enable_metric_calculator, "false");
CONF_Bool(enable_metric_calculator, "true");

// max consumer num in one data consumer group, for routine load
CONF_Int32(max_consumer_num_per_group, "3");
Expand Down
28 changes: 18 additions & 10 deletions be/src/olap/olap_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1832,17 +1832,21 @@ void OLAPEngine::perform_cumulative_compaction() {
CumulativeCompaction cumulative_compaction;
OLAPStatus res = cumulative_compaction.init(best_table);
if (res != OLAP_SUCCESS) {
DorisMetrics::cumulative_compaction_request_failed.increment(1);
LOG(WARNING) << "failed to init cumulative compaction."
<< "table=" << best_table->full_name();
if (res != OLAP_ERR_CUMULATIVE_REPEAT_INIT && res != OLAP_ERR_CE_TRY_CE_LOCK_ERROR) {
DorisMetrics::cumulative_compaction_request_failed.increment(1);
LOG(WARNING) << "failed to init cumulative compaction"
<< ", table=" << best_table->full_name()
<< ", res=" << res;
}
return;
}

res = cumulative_compaction.run();
if (res != OLAP_SUCCESS) {
DorisMetrics::cumulative_compaction_request_failed.increment(1);
LOG(WARNING) << "failed to do cumulative compaction."
<< "table=" << best_table->full_name();
LOG(WARNING) << "failed to do cumulative compaction"
<< ", table=" << best_table->full_name()
<< ", res=" << res;
return;
}
}
Expand All @@ -1855,17 +1859,21 @@ void OLAPEngine::perform_base_compaction() {
BaseCompaction base_compaction;
OLAPStatus res = base_compaction.init(best_table);
if (res != OLAP_SUCCESS) {
DorisMetrics::base_compaction_request_failed.increment(1);
LOG(WARNING) << "failed to init base compaction."
<< "table=" << best_table->full_name();
if (res != OLAP_ERR_BE_TRY_BE_LOCK_ERROR && res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
DorisMetrics::base_compaction_request_failed.increment(1);
LOG(WARNING) << "failed to init base compaction"
<< ", table=" << best_table->full_name()
<< ", res=" << res;
}
return;
}

res = base_compaction.run();
if (res != OLAP_SUCCESS) {
DorisMetrics::base_compaction_request_failed.increment(1);
LOG(WARNING) << "failed to init base compaction."
<< "table=" << best_table->full_name();
LOG(WARNING) << "failed to init base compaction"
<< ", table=" << best_table->full_name()
<< ", res=" << res;
return;
}
}
Expand Down
20 changes: 14 additions & 6 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "runtime/runtime_state.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/frontend_helper.h"
#include "util/doris_metrics.h"

#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
Expand All @@ -42,6 +43,7 @@ Status k_stream_load_plan_status;
#endif

Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
DorisMetrics::txn_exec_plan_total.increment(1);
// submit this params
#ifndef BE_TEST
ctx->ref();
Expand All @@ -66,11 +68,13 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
}
if (ctx->number_filtered_rows > 0 &&
!executor->runtime_state()->get_error_log_file_path().empty()) {

// if (ctx->load_type == TLoadType::MANUL_LOAD) {
ctx->error_url = to_load_error_http_path(
ctx->error_url = to_load_error_http_path(
executor->runtime_state()->get_error_log_file_path());
// }
}

if (status.ok()) {
DorisMetrics::stream_receive_bytes_total.increment(ctx->receive_bytes);
DorisMetrics::stream_load_rows_total.increment(ctx->number_loaded_rows);
}
} else {
LOG(WARNING) << "fragment execute failed"
Expand Down Expand Up @@ -108,8 +112,9 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
}

Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
DorisMetrics::txn_begin_request_total.increment(1);

TNetworkAddress master_addr = _exec_env->master_info()->network_address;
TLoadTxnBeginRequest request;
set_request_auth(&request, ctx->auth);
request.db = ctx->db;
Expand Down Expand Up @@ -141,8 +146,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
}

Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
DorisMetrics::txn_commit_request_total.increment(1);

TNetworkAddress master_addr = _exec_env->master_info()->network_address;
TLoadTxnCommitRequest request;
set_request_auth(&request, ctx->auth);
request.db = ctx->db;
Expand Down Expand Up @@ -183,6 +189,8 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
}

void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
DorisMetrics::txn_rollback_request_total.increment(1);

TNetworkAddress master_addr = _exec_env->master_info()->network_address;
TLoadTxnRollbackRequest request;
set_request_auth(&request, ctx->auth);
Expand Down
33 changes: 33 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,21 @@ IntCounter DorisMetrics::cumulative_compaction_bytes_total;
IntCounter DorisMetrics::cumulative_compaction_request_total;
IntCounter DorisMetrics::cumulative_compaction_request_failed;

IntCounter DorisMetrics::publish_task_request_total;
IntCounter DorisMetrics::publish_task_failed_total;

IntCounter DorisMetrics::meta_write_request_total;
IntCounter DorisMetrics::meta_write_request_duration_us;
IntCounter DorisMetrics::meta_read_request_total;
IntCounter DorisMetrics::meta_read_request_duration_us;

IntCounter DorisMetrics::txn_begin_request_total;
IntCounter DorisMetrics::txn_commit_request_total;
IntCounter DorisMetrics::txn_rollback_request_total;
IntCounter DorisMetrics::txn_exec_plan_total;
IntCounter DorisMetrics::stream_receive_bytes_total;
IntCounter DorisMetrics::stream_load_rows_total;

// gauges
IntGauge DorisMetrics::memory_pool_bytes_total;
IntGauge DorisMetrics::process_thread_num;
Expand Down Expand Up @@ -176,6 +186,9 @@ void DorisMetrics::initialize(
REGISTER_ENGINE_REQUEST_METRIC(cumulative_compaction, total, cumulative_compaction_request_total);
REGISTER_ENGINE_REQUEST_METRIC(cumulative_compaction, failed, cumulative_compaction_request_failed);

REGISTER_ENGINE_REQUEST_METRIC(publish, total, publish_task_request_total);
REGISTER_ENGINE_REQUEST_METRIC(publish, failed, publish_task_failed_total);

_metrics->register_metric(
"compaction_deltas_total", MetricLabels().add("type", "base"),
&base_compaction_deltas_total);
Expand All @@ -202,6 +215,26 @@ void DorisMetrics::initialize(
"meta_request_duration", MetricLabels().add("type", "read"),
&meta_read_request_duration_us);

_metrics->register_metric(
"txn_request", MetricLabels().add("type", "begin"),
&txn_begin_request_total);
_metrics->register_metric(
"txn_request", MetricLabels().add("type", "commit"),
&txn_commit_request_total);
_metrics->register_metric(
"txn_request", MetricLabels().add("type", "rollback"),
&txn_rollback_request_total);
_metrics->register_metric(
"txn_request", MetricLabels().add("type", "exec"),
&txn_exec_plan_total);

_metrics->register_metric(
"stream_load", MetricLabels().add("type", "receive_bytes"),
&stream_receive_bytes_total);
_metrics->register_metric(
"stream_load", MetricLabels().add("type", "load_rows"),
&stream_load_rows_total);

// Gauge
REGISTER_DORIS_METRIC(memory_pool_bytes_total);
REGISTER_DORIS_METRIC(process_thread_num);
Expand Down
11 changes: 9 additions & 2 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,21 @@ class DorisMetrics {
static IntCounter cumulative_compaction_deltas_total;
static IntCounter cumulative_compaction_bytes_total;

static IntCounter alter_task_success_total;
static IntCounter alter_task_failed_total;
static IntCounter publish_task_request_total;
static IntCounter publish_task_failed_total;

static IntCounter meta_write_request_total;
static IntCounter meta_write_request_duration_us;
static IntCounter meta_read_request_total;
static IntCounter meta_read_request_duration_us;

static IntCounter txn_begin_request_total;
static IntCounter txn_commit_request_total;
static IntCounter txn_rollback_request_total;
static IntCounter txn_exec_plan_total;
static IntCounter stream_receive_bytes_total;
static IntCounter stream_load_rows_total;

// Gauges
static IntGauge memory_pool_bytes_total;
static IntGauge process_thread_num;
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void SystemMetrics::_update_cpu_metrics() {

if (getline(&_line_ptr, &_line_buf_size, fp) < 0) {
char buf[64];
LOG(WARNING) << "geline failed, errno=" << errno
LOG(WARNING) << "getline failed, errno=" << errno
<< ", message=" << strerror_r(errno, buf, 64);
fclose(fp);
return;
Expand Down
1 change: 1 addition & 0 deletions be/test/util/system_metrics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ TEST_F(SystemMetricsTest, normal) {
network_interfaces.emplace_back("xgbe0");
SystemMetrics metrics;
metrics.install(&registry, disk_devices, network_interfaces);
metrics.update();

TestMetricsVisitor visitor;
registry.collect(&visitor);
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ private void checkAndPrepareMeta() {

// Send create replica task to BE outside the db lock
if (batchTask.getTaskNum() > 0) {
MarkedCountDownLatch latch = new MarkedCountDownLatch(batchTask.getTaskNum());
MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum());
for (AgentTask task : batchTask.getAllTasks()) {
latch.addMark(((CreateReplicaTask) task).getBackendId(), ((CreateReplicaTask) task).getTabletId());
((CreateReplicaTask) task).setLatch(latch);
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -3263,7 +3263,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long
TStorageType storageType = indexIdToStorageType.get(indexId);
List<Column> schema = indexIdToSchema.get(indexId);
int totalTaskNum = index.getTablets().size() * replicationNum;
MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(totalTaskNum);
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
AgentBatchTask batchTask = new AgentBatchTask();
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/catalog/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void setNextVersionHash(long nextVersionHash, long committedVersionHash)
}

public long getCommittedVersion() {
return Math.max(this.nextVersion - 1, 2);
return this.nextVersion - 1;
}

public long getCommittedVersionHash() {
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ public class Config extends ConfigBase {
/*
* If set to true, the metric collector will run as a daemon timer to collect metrics at a fixed interval
*/
@ConfField public static boolean enable_metric_calculator = false;
@ConfField public static boolean enable_metric_calculator = true;

/*
* the max concurrent task num of a routine load task
Expand Down
16 changes: 11 additions & 5 deletions fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,34 @@
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;

public class MarkedCountDownLatch extends CountDownLatch {
public class MarkedCountDownLatch<K, V> extends CountDownLatch {

private Multimap<Long, Long> marks;
private Multimap<K, V> marks;

public MarkedCountDownLatch(int count) {
super(count);
marks = HashMultimap.create();
}

public void addMark(long key, long value) {
public void addMark(K key, V value) {
marks.put(key, value);
}

public synchronized boolean markedCountDown(long key, long value) {
public synchronized boolean markedCountDown(K key, V value) {
if (marks.remove(key, value)) {
super.countDown();
return true;
}
return false;
}

public synchronized List<Entry<Long, Long>> getLeftMarks() {
public synchronized List<Entry<K, V>> getLeftMarks() {
return Lists.newArrayList(marks.entries());
}

public synchronized void countDownToZero() {
while(getCount() > 0) {
super.countDown();
}
}
}
4 changes: 1 addition & 3 deletions fe/src/main/java/org/apache/doris/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,9 @@ private List<Coordinator> genCoordinators(List<PlanFragment> fragments, List<Sca
for (int i = 0; i < fragments.size(); ++i) {
PlanFragment fragment = fragments.get(i);
ScanNode scanNode = nodes.get(i);

TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits() + i, uuid.getLeastSignificantBits());

Coordinator coord = new Coordinator(
queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode), clusterName);
id, queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode), clusterName);
coords.add(coord);
this.coordList.add(coord);
}
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/load/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -3286,7 +3286,7 @@ public void deleteOld(DeleteStmt stmt) throws DdlException {
}

// send tasks to backends
MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(totalReplicaNum);
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalReplicaNum);
for (AgentTask task : deleteBatchTask.getAllTasks()) {
countDownLatch.addMark(task.getBackendId(), task.getSignature());
((PushTask) task).setCountDownLatch(countDownLatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TOlapRewriteNode;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.common.UserException;

import com.google.common.collect.Lists;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

// Used to convert column to valid OLAP table
Expand Down Expand Up @@ -108,7 +110,6 @@ public void computeStats(Analyzer analyzer) {
cardinality = Math.round(((double) getChild(0).cardinality) * computeSelectivity());
Preconditions.checkState(cardinality >= 0);
}
LOG.info("stats Select: cardinality=" + Long.toString(cardinality));
}

@Override
Expand Down
Loading