diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index cd0b3b746ec210..2a55c39cf30e3c 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -344,7 +344,7 @@ Status BrokerScanNode::scanner_scan( tuple = reinterpret_cast(new_tuple); counter->num_rows_returned++; } else { - counter->num_rows_filtered++; + counter->num_rows_unselected++; } } @@ -409,6 +409,7 @@ void BrokerScanNode::scanner_worker(int start_idx, int length) { // Update stats _runtime_state->update_num_rows_load_success(counter.num_rows_returned); _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered); + _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected); // scanner is going to finish { diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 26a84c7ca3a54f..c2af32851454f4 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -49,11 +49,15 @@ class RuntimeProfile; class StreamLoadPipe; struct BrokerScanCounter { - BrokerScanCounter() : num_rows_returned(0), num_rows_filtered(0) { + BrokerScanCounter() : + num_rows_returned(0), + num_rows_filtered(0), + num_rows_unselected(0) { } - int64_t num_rows_returned; - int64_t num_rows_filtered; + int64_t num_rows_returned; // qualified rows + int64_t num_rows_filtered; // unqualified rows + int64_t num_rows_unselected; // rows filtered by predicates }; // Broker scanner convert the data read from broker to doris's tuple. 
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 5aec75b1c158d8..e9822fea2ec26a 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -135,14 +135,14 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { _last_visit_time = time(nullptr); - int64_t left_time = ctx->max_interval_s; + int64_t left_time = ctx->max_interval_s * 1000; int64_t left_rows = ctx->max_batch_rows; int64_t left_bytes = ctx->max_batch_size; std::shared_ptr kakfa_pipe = std::static_pointer_cast(ctx->body_sink); LOG(INFO) << "start consumer" - << ". max time(s): " << left_time + << ". max time(ms): " << left_time << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". " << ctx->brief(); @@ -168,10 +168,11 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) { LOG(INFO) << "kafka consume batch done" - << ". left time=" << left_time - << ", left rows=" << left_rows - << ", left bytes=" << left_bytes - << ", consumer time(ms)=" << consumer_watch.elapsed_time() / 1000 / 1000; + << ". 
consume time(ms)=" << ctx->max_interval_s * 1000 - left_time + << ", received rows=" << ctx->max_batch_rows - left_rows + << ", received bytes=" << ctx->max_batch_size - left_bytes + << ", kafka consume time(ms)=" << consumer_watch.elapsed_time() / 1000 / 1000; + if (left_bytes == ctx->max_batch_size) { // nothing to be consumed, cancel it @@ -230,7 +231,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { return st; } - left_time = ctx->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000; + left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000; } return Status::OK; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 5accbebb7232f3..b79f084b361832 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -61,6 +61,7 @@ RuntimeState::RuntimeState( _root_node_id(-1), _num_rows_load_success(0), _num_rows_load_filtered(0), + _num_rows_load_unselected(0), _num_print_error_rows(0), _normal_row_number(0), _error_row_number(0), @@ -86,6 +87,7 @@ RuntimeState::RuntimeState( _root_node_id(-1), _num_rows_load_success(0), _num_rows_load_filtered(0), + _num_rows_load_unselected(0), _num_print_error_rows(0), _normal_row_number(0), _error_row_number(0), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e89cbaaa78d4e5..ce58034557f13e 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -419,6 +419,10 @@ class RuntimeState { return _num_rows_load_filtered.load(); } + int64_t num_rows_load_unselected() { + return _num_rows_load_unselected.load(); + } + void update_num_rows_load_success(int64_t num_rows) { _num_rows_load_success.fetch_add(num_rows); } @@ -426,6 +430,11 @@ class RuntimeState { void update_num_rows_load_filtered(int64_t num_rows) { _num_rows_load_filtered.fetch_add(num_rows); } + + void update_num_rows_load_unselected(int64_t num_rows) { + _num_rows_load_unselected.fetch_add(num_rows); + } + void 
export_load_error(const std::string& error_msg); void set_per_fragment_instance_idx(int idx) { @@ -585,6 +594,7 @@ class RuntimeState { std::vector _output_files; std::atomic _num_rows_load_success; std::atomic _num_rows_load_filtered; + std::atomic _num_rows_load_unselected; std::atomic _num_print_error_rows; std::vector _export_output_files; diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index c4b2a706604d6f..30bdcd6e7de0e4 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -60,6 +60,8 @@ std::string StreamLoadContext::to_json() const { writer.Int64(number_loaded_rows); writer.Key("NumberFilteredRows"); writer.Int64(number_filtered_rows); + writer.Key("NumberUnselectedRows"); + writer.Int64(number_unselected_rows); writer.Key("LoadBytes"); writer.Int64(receive_bytes); writer.Key("LoadTimeMs"); diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 5c320e84cd090d..9a279e08775ca4 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -149,6 +149,7 @@ class StreamLoadContext { int64_t number_loaded_rows = 0; int64_t number_filtered_rows = 0; + int64_t number_unselected_rows = 0; int64_t loaded_bytes = 0; int64_t start_nanos = 0; int64_t load_cost_nanos = 0; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 774b0d7bb47c3c..ffa2680a84b246 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -53,6 +53,8 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { if (status.ok()) { ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_success(); ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered(); + 
ctx->number_unselected_rows = executor->runtime_state()->num_rows_load_unselected(); + int64_t num_total_rows = ctx->number_loaded_rows + ctx->number_filtered_rows; if ((0.0 + ctx->number_filtered_rows) / num_total_rows > ctx->max_filter_ratio) { @@ -218,6 +220,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt rl_attach.id = ctx->id.to_thrift(); rl_attach.__set_loadedRows(ctx->number_loaded_rows); rl_attach.__set_filteredRows(ctx->number_filtered_rows); + rl_attach.__set_unselectedRows(ctx->number_unselected_rows); rl_attach.__set_receivedBytes(ctx->receive_bytes); rl_attach.__set_loadedBytes(ctx->loaded_bytes); rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000); diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index b8c9e0a32193e0..aca5c3cf44cec0 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -21,8 +21,10 @@ import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.Util; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.load.routineload.LoadDataSourceType; +import org.apache.doris.load.routineload.RoutineLoadJob; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; @@ -32,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Predicate; import java.util.regex.Pattern; /* @@ -79,9 +82,9 @@ public class CreateRoutineLoadStmt extends DdlStmt { // max error number in ten thousand records public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number"; // the following 3 properties limit the time and batch size of a single routine load task - public static final String 
MAX_BATCH_INTERVAL_SECOND = "max_batch_interval"; - public static final String MAX_BATCH_ROWS = "max_batch_rows"; - public static final String MAX_BATCH_SIZE = "max_batch_size"; + public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY = "max_batch_interval"; + public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows"; + public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size"; // kafka type properties public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list"; @@ -96,9 +99,9 @@ public class CreateRoutineLoadStmt extends DdlStmt { private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(DESIRED_CONCURRENT_NUMBER_PROPERTY) .add(MAX_ERROR_NUMBER_PROPERTY) - .add(MAX_BATCH_INTERVAL_SECOND) - .add(MAX_BATCH_ROWS) - .add(MAX_BATCH_SIZE) + .add(MAX_BATCH_INTERVAL_SEC_PROPERTY) + .add(MAX_BATCH_ROWS_PROPERTY) + .add(MAX_BATCH_SIZE_PROPERTY) .build(); private static final ImmutableSet KAFKA_PROPERTIES_SET = new ImmutableSet.Builder() @@ -121,10 +124,10 @@ public class CreateRoutineLoadStmt extends DdlStmt { private String dbName; private RoutineLoadDesc routineLoadDesc; private int desiredConcurrentNum = 1; - private int maxErrorNum = -1; - private int maxBatchIntervalS = -1; - private int maxBatchRows = -1; - private int maxBatchSizeBytes = -1; + private long maxErrorNum = -1; + private long maxBatchIntervalS = -1; + private long maxBatchRows = -1; + private long maxBatchSizeBytes = -1; // kafka related properties private String kafkaBrokerList; @@ -132,6 +135,12 @@ public class CreateRoutineLoadStmt extends DdlStmt { // pair private List> kafkaPartitionOffsets = Lists.newArrayList(); + private static final Predicate DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; }; + private static final Predicate MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; }; + private static final Predicate MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; }; + private static final Predicate 
MAX_BATCH_ROWS_PRED = (v) -> { return v >= 200000; }; /* inclusive bound: an explicitly given 200000 (the DEFAULT_MAX_BATCH_ROWS value) must be accepted */ + private static final Predicate MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; }; + public CreateRoutineLoadStmt(LabelName labelName, String tableName, List loadPropertyList, Map jobProperties, String typeName, Map dataSourceProperties) { @@ -167,19 +176,19 @@ public int getDesiredConcurrentNum() { return desiredConcurrentNum; } - public int getMaxErrorNum() { + public long getMaxErrorNum() { return maxErrorNum; } - public int getMaxBatchIntervalS() { + public long getMaxBatchIntervalS() { return maxBatchIntervalS; } - public int getMaxBatchRows() { + public long getMaxBatchRows() { return maxBatchRows; } - public int getMaxBatchSize() { + public long getMaxBatchSize() { return maxBatchSizeBytes; } @@ -268,26 +277,25 @@ private void checkJobProperties() throws AnalysisException { throw new AnalysisException(optional.get() + " is invalid property"); } - desiredConcurrentNum = getIntegetPropertyOrDefault(DESIRED_CONCURRENT_NUMBER_PROPERTY, - "must be greater then 0", desiredConcurrentNum); - maxErrorNum = getIntegetPropertyOrDefault(MAX_ERROR_NUMBER_PROPERTY, - "must be greater then or equal to 0", maxErrorNum); - maxBatchIntervalS = getIntegetPropertyOrDefault(MAX_BATCH_INTERVAL_SECOND, - "must be greater then 0", maxBatchIntervalS); - maxBatchRows = getIntegetPropertyOrDefault(MAX_BATCH_ROWS, "must be greater then 0", maxBatchRows); - maxBatchSizeBytes = getIntegetPropertyOrDefault(MAX_BATCH_SIZE, "must be greater then 0", maxBatchSizeBytes); - } - - private int getIntegetPropertyOrDefault(String propName, String hintMsg, int defaultVal) throws AnalysisException { - final String propVal = jobProperties.get(propName); - if (propVal != null) { - int intVal = getIntegerValueFromString(propVal, propName); - if (intVal <= 0) { - throw new AnalysisException(propName + " " + hintMsg); - } - return intVal; - } - return defaultVal; + desiredConcurrentNum = ((Long) 
Util.getLongPropertyOrDefault(jobProperties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY), + RoutineLoadJob.DEFAULT_TASK_MAX_CONCURRENT_NUM, DESIRED_CONCURRENT_NUMBER_PRED, + DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0")).intValue(); + + maxErrorNum = Util.getLongPropertyOrDefault(jobProperties.get(MAX_ERROR_NUMBER_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_ERROR_NUM, MAX_ERROR_NUMBER_PRED, + MAX_ERROR_NUMBER_PROPERTY + " should >= 0"); + + maxBatchIntervalS = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_INTERVAL_SEC_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND, MAX_BATCH_INTERVAL_PRED, + MAX_BATCH_INTERVAL_SEC_PROPERTY + " should between 5 and 60"); + + maxBatchRows = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_ROWS_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, MAX_BATCH_ROWS_PRED, + MAX_BATCH_ROWS_PROPERTY + " should >= 200000"); + + maxBatchSizeBytes = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED, + MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 1GB"); } private void checkDataSourceProperties() throws AnalysisException { diff --git a/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java b/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java index 547729f8e53bb3..1274fa71cf958e 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java +++ b/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java @@ -50,7 +50,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append(columnName); if (expr != null) { - sb.append(" = ").append(expr.toSql()); + sb.append("=").append(expr.toSql()); } return sb.toString(); } diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java index 71a9f871a8f050..cf3edc9e1f6e19 100644 --- 
a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java @@ -18,8 +18,6 @@ package org.apache.doris.analysis; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; @@ -27,6 +25,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.qe.ShowResultSetMetaData; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; + /* Show routine load progress by routine load name @@ -63,15 +64,16 @@ public class ShowRoutineLoadStmt extends ShowStmt { new ImmutableList.Builder() .add("Id") .add("Name") + .add("CreateTime") + .add("EndTime") .add("DBId") .add("TableId") .add("State") .add("DataSourceType") + .add("CurrentTaskNum") .add("JobProperties") .add("DataSourceProperties") - .add("CurrentTaskConcurrentNumber") - .add("TotalRows") - .add("TotalErrorRows") + .add("Statistic") .add("Progress") .add("ReasonOfStateChanged") .build(); diff --git a/fe/src/main/java/org/apache/doris/common/util/Util.java b/fe/src/main/java/org/apache/doris/common/util/Util.java index 9eeb20be68daed..a98446ca02972b 100644 --- a/fe/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/src/main/java/org/apache/doris/common/util/Util.java @@ -19,7 +19,9 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; @@ -40,6 +42,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.function.Predicate; import java.util.zip.Adler32; public class Util { @@ -362,5 +365,29 @@ public static String getResultForUrl(String urlStr, String encodedAuthInfo, int 
LOG.debug("get result from url {}: {}", urlStr, sb.toString()); return sb.toString(); } + + public static long getLongPropertyOrDefault(String valStr, long defaultVal, Predicate pred, + String hintMsg) throws AnalysisException { + if (Strings.isNullOrEmpty(valStr)) { + return defaultVal; + } + + long result = defaultVal; + try { + result = Long.valueOf(valStr); + } catch (NumberFormatException e) { + throw new AnalysisException(hintMsg); + } + + if (pred == null) { + return result; + } + + if (!pred.test(result)) { + throw new AnalysisException(hintMsg); + } + + return result; + } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index e118d6c4c5bbf7..73dc4e3ffdaf5e 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -41,6 +41,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -52,6 +53,7 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -74,7 +76,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private List currentKafkaPartitions = Lists.newArrayList(); // this is the kafka consumer which is used to fetch the number of partitions - private KafkaConsumer consumer; + private KafkaConsumer consumer; public KafkaRoutineLoadJob() { // for serialization, id is dummy @@ -125,7 +127,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) { for (int i = 0; i < currentConcurrentTaskNum; i++) { Map taskKafkaProgress = Maps.newHashMap(); for (int j = 0; j < 
currentKafkaPartitions.size(); j++) { - if (j % currentConcurrentTaskNum == 0) { + if (j % currentConcurrentTaskNum == i) { int kafkaPartition = currentKafkaPartitions.get(j); taskKafkaProgress.put(kafkaPartition, ((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition)); @@ -161,7 +163,7 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { LOG.info("current concurrent task number is min " + "(current size of partition {}, desire task concurrent num {}, alive be num {})", partitionNum, desireTaskConcurrentNum, aliveBeNum); - currentTaskConcurrentNum = + currentTaskConcurrentNum = Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM); return currentTaskConcurrentNum; } @@ -172,7 +174,7 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { // this task should not be commit // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated @Override - boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().isEmpty()) { LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId())) @@ -273,6 +275,23 @@ protected boolean unprotectNeedReschedule() { } } + @Override + protected String getStatistic() { + Map summary = Maps.newHashMap(); + summary.put("totalRows", Long.valueOf(totalRows)); + summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows)); + summary.put("errorRows", Long.valueOf(errorRows)); + summary.put("unselectedRows", Long.valueOf(unselectedRows)); + summary.put("receivedBytes", Long.valueOf(receivedBytes)); + summary.put("taskExecuteTaskMs", Long.valueOf(totalTaskExcutionTimeMs)); + 
summary.put("receivedBytesRate", Long.valueOf(receivedBytes * 1000 / totalTaskExcutionTimeMs)); /* per-second rates: scale by 1000 before dividing, because long division truncates and x / ms * 1000 would round down to a multiple of 1000 (0 whenever x < ms) */ + summary.put("loadRowsRate", Long.valueOf((totalRows - errorRows - unselectedRows) * 1000 / totalTaskExcutionTimeMs)); + summary.put("committedTaskNum", Long.valueOf(committedTaskNum)); + summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum)); + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(summary); + } + private List getAllKafkaPartitions() { List result = new ArrayList<>(); List partitionList = consumer.partitionsFor(topic, @@ -358,8 +377,10 @@ protected String dataSourcePropertiesJsonToString() { Map dataSourceProperties = Maps.newHashMap(); dataSourceProperties.put("brokerList", brokerList); dataSourceProperties.put("topic", topic); - dataSourceProperties.put("currentKafkaPartitions", Joiner.on(",").join(currentKafkaPartitions)); - Gson gson = new Gson(); + List sortedPartitions = Lists.newArrayList(currentKafkaPartitions); + Collections.sort(sortedPartitions); + dataSourceProperties.put("currentKafkaPartitions", Joiner.on(",").join(sortedPartitions)); + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); return gson.toJson(dataSourceProperties); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java index 5006bd42ccba66..9488fc0b920c58 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java @@ -34,6 +34,9 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment { private TUniqueId taskId; private long filteredRows; private long loadedRows; + private long unselectedRows; + private long receivedBytes; + private long taskExecutionTimeMs; private RoutineLoadProgress progress; public RLTaskTxnCommitAttachment() { @@ -46,6 +49,9 @@ public 
RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttac this.taskId = rlTaskTxnCommitAttachment.getId(); this.filteredRows = rlTaskTxnCommitAttachment.getFilteredRows(); this.loadedRows = rlTaskTxnCommitAttachment.getLoadedRows(); + this.unselectedRows = rlTaskTxnCommitAttachment.getUnselectedRows(); + this.receivedBytes = rlTaskTxnCommitAttachment.getReceivedBytes(); + this.taskExecutionTimeMs = rlTaskTxnCommitAttachment.getLoadCostMs(); switch (rlTaskTxnCommitAttachment.getLoadSourceType()) { case KAFKA: @@ -67,14 +73,32 @@ public long getLoadedRows() { return loadedRows; } + public long getUnselectedRows() { + return unselectedRows; + } + + public long getTotalRows() { + return filteredRows + loadedRows + unselectedRows; + } + + public long getReceivedBytes() { + return receivedBytes; + } + + public long getTaskExecutionTimeMs() { + return taskExecutionTimeMs; + } + public RoutineLoadProgress getProgress() { return progress; } @Override public String toString() { - return "RoutineLoadTaskTxnExtra [filteredRows=" + filteredRows + return "RLTaskTxnCommitAttachment [filteredRows=" + filteredRows + ", loadedRows=" + loadedRows + + ", unselectedRows=" + unselectedRows + ", receivedBytes=" + receivedBytes /* unselectedRows is part of getTotalRows() and is serialized, so report it alongside the other counters */ + + ", taskExecutionTimeMs=" + taskExecutionTimeMs + ", taskId=" + taskId + ", jobId=" + jobId + ", progress=" + progress.toString() + "]"; @@ -85,6 +109,9 @@ public void write(DataOutput out) throws IOException { super.write(out); out.writeLong(filteredRows); out.writeLong(loadedRows); + out.writeLong(unselectedRows); + out.writeLong(receivedBytes); + out.writeLong(taskExecutionTimeMs); progress.write(out); } @@ -93,6 +120,9 @@ public void readFields(DataInput in) throws IOException { super.readFields(in); filteredRows = in.readLong(); loadedRows = in.readLong(); + unselectedRows = in.readLong(); + receivedBytes = in.readLong(); + taskExecutionTimeMs = in.readLong(); progress = RoutineLoadProgress.read(in); } } diff --git 
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index fc298183f25307..1a8edabb7e480a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -40,7 +40,9 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.RoutineLoadDesc; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.qe.ConnectContext; @@ -58,6 +60,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -81,25 +84,24 @@ * The routine load job support different streaming medium such as KAFKA */ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable { - private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class); - private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; - private static final int ERROR_SAMPLE_NUM = 1000 * 10000; - private static final int DEFAULT_MAX_ERROR_NUM = 0; - private static final int DEFAULT_MAX_INTERVAL_SECOND = 5; - private static final int DEFAULT_MAX_BATCH_ROWS = 100000; - private static final int DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB + public static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3; + public static final long DEFAULT_MAX_ERROR_NUM = 0; + + public static final long DEFAULT_MAX_INTERVAL_SECOND = 10; + public static final long DEFAULT_MAX_BATCH_ROWS = 200000; + public static final long DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB + protected 
static final String STAR_STRING = "*"; - protected static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3; /** * +-----------------+ - * fe schedule job | NEED_SCHEDULE | user resume job - * +----------- + | <---------+ + * fe schedule job | NEED_SCHEDULE | user resume job + * +--------------- + | <---------+ * | | | | * v +-----------------+ ^ - * | + * | | * +------------+ user pause job +-------+----+ * | RUNNING | | PAUSED | * | +-----------------------> | | @@ -145,25 +147,42 @@ public boolean isFinalState() { // max number of error data in ten thousand data // maxErrorNum / BASE_OF_ERROR_RATE = max error rate of routine load job // if current error rate is more then max error rate, the job will be paused - protected int maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional - protected int maxBatchIntervalS = DEFAULT_MAX_INTERVAL_SECOND; - protected int maxBatchRows = DEFAULT_MAX_BATCH_ROWS; - protected int maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE; + protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional + + /* + * The following 3 variables control the max execute time of a single task. + * The default max batch interval time is 10 secs. 
+ * If a task can consume data from source at rate of 10MB/s, and 500B a row, + * then we can process 100MB for 10 secs, which is 200000 rows + */ + protected long maxBatchIntervalS = DEFAULT_MAX_INTERVAL_SECOND; + protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS; + protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE; protected int currentTaskConcurrentNum; protected RoutineLoadProgress progress; + protected String pausedReason; protected String cancelReason; + + protected long createTimestamp = System.currentTimeMillis(); protected long endTimestamp = -1; /* - * currentErrorRows and currentTotalRows is used for check error rate - * errorRows and totalRows are used for statistics + * The following variables are for statistics + * currentErrorRows/currentTotalRows: the row statistics of current sampling period + * errorRows/totalRows/unselectedRows/receivedBytes: cumulative measurement + * totalTaskExcutionTimeMs: cumulative execution time of tasks */ - protected long currentErrorRows; - protected long currentTotalRows; - protected long errorRows; - protected long totalRows; + protected long currentErrorRows = 0; + protected long currentTotalRows = 0; + protected long errorRows = 0; + protected long totalRows = 0; + protected long unselectedRows = 0; + protected long receivedBytes = 0; + protected long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero + protected long committedTaskNum = 0; + protected long abortedTaskNum = 0; // The tasks belong to this job protected List routineLoadTaskInfoList = Lists.newArrayList(); @@ -353,15 +372,15 @@ public RoutineLoadProgress getProgress() { return progress; } - public int getMaxBatchIntervalS() { + public long getMaxBatchIntervalS() { return maxBatchIntervalS; } - public int getMaxBatchRows() { + public long getMaxBatchRows() { return maxBatchRows; } - public int getMaxBatchSizeBytes() { + public long getMaxBatchSizeBytes() { return maxBatchSizeBytes; } @@ -379,16 +398,15 @@ public TExecPlanFragmentParams 
gettExecPlanFragmentParams() { } // only check loading task - public List processTimeoutTasks() { - List result = new ArrayList<>(); + public void processTimeoutTasks() { List timeoutTaskList = new ArrayList<>(); writeLock(); try { List runningTasks = new ArrayList<>(routineLoadTaskInfoList); for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) { - if ((routineLoadTaskInfo.getLoadStartTimeMs() != 0L) + if (routineLoadTaskInfo.isRunning() && ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) - > DEFAULT_TASK_TIMEOUT_SECONDS * 1000)) { + > maxBatchIntervalS * 2 * 1000)) { timeoutTaskList.add(routineLoadTaskInfo); } } @@ -411,7 +429,6 @@ public List processTimeoutTasks() { } } } - return result; } abstract void divideRoutineLoadJob(int currentConcurrentTaskNum); @@ -469,18 +486,29 @@ private void checkStateTransform(RoutineLoadJob.JobState desireState) // if rate of error data is more then max_filter_ratio, pause job protected void updateProgress(RLTaskTxnCommitAttachment attachment) { - updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows() + attachment.getFilteredRows(), + updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(), + attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), false /* not replay */); } - private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean isReplay) { - totalRows += numOfTotalRows; - errorRows += numOfErrorRows; + private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes, + long taskExecutionTime, boolean isReplay) { + this.totalRows += numOfTotalRows; + this.errorRows += numOfErrorRows; + this.unselectedRows += unselectedRows; + this.receivedBytes += receivedBytes; + this.totalTaskExcutionTimeMs += taskExecutionTime; + + if (MetricRepo.isInit.get()) { + MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows); + 
MetricRepo.COUNTER_ROUTINE_LOAD_ERROR_ROWS.increase(numOfErrorRows); + MetricRepo.COUNTER_ROUTINE_LOAD_RECEIVED_BYTES.increase(receivedBytes); + } // check error rate currentErrorRows += numOfErrorRows; currentTotalRows += numOfTotalRows; - if (currentTotalRows > ERROR_SAMPLE_NUM) { + if (currentTotalRows > maxBatchRows * 10) { if (currentErrorRows > maxErrorNum) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("current_total_rows", currentTotalRows) @@ -528,8 +556,8 @@ private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean i } protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) { - updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows() + attachment.getFilteredRows(), - true /* is replay */); + updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(), + attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), true /* is replay */); } abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, @@ -605,6 +633,7 @@ public ListenResult onCommitted(TransactionState txnState) throws TransactionExc RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); taskBeId = routineLoadTaskInfo.getBeId(); executeCommitTask(routineLoadTaskInfo, txnState); + ++committedTaskNum; result = ListenResult.CHANGED; } else { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id", @@ -631,6 +660,7 @@ public ListenResult onCommitted(TransactionState txnState) throws TransactionExc public void replayOnCommitted(TransactionState txnState) { Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState); replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment()); + this.committedTaskNum++; LOG.debug("replay on committed: {}", txnState); } @@ -674,6 +704,7 @@ public ListenResult onAborted(TransactionState txnState, String 
txnStatusChangeR } // step2: commit task , update progress, maybe create a new task executeCommitTask(routineLoadTaskInfo, txnState); + ++abortedTaskNum; result = ListenResult.CHANGED; } } catch (Exception e) { @@ -695,6 +726,7 @@ public void replayOnAborted(TransactionState txnState) { if (txnState.getTxnCommitAttachment() != null) { replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment()); } + this.abortedTaskNum++; LOG.debug("replay on aborted: {}, has attachment: {}", txnState, txnState.getTxnCommitAttachment() == null); } @@ -709,7 +741,7 @@ private ListenResult executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, .add("job_id", routineLoadTaskInfo.getJobId()) .add("txn_id", routineLoadTaskInfo.getTxnId()) .add("msg", "commit task will be ignore when attachment txn of task is null," - + " maybe task was committed by master when timeout") + + " maybe task was aborted by master when timeout") .build()); } else if (checkCommitInfo(rlTaskTxnCommitAttachment)) { // step2: update job progress @@ -720,7 +752,8 @@ private ListenResult executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, if (state == JobState.RUNNING) { // step2: create a new task for partitions RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); - Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue( + Lists.newArrayList(newRoutineLoadTaskInfo)); } return result; @@ -902,21 +935,35 @@ public void setOrigStmt(String origStmt) { } // check the correctness of commit info - abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment); + protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment); + + protected abstract String getStatistic(); public List getShowInfo() { + Database db = Catalog.getCurrentCatalog().getDb(dbId); + Table tbl = null; + if (db != 
null) { + db.readLock(); + try { + tbl = db.getTable(tableId); + } finally { + db.readUnlock(); + } + } + List row = Lists.newArrayList(); row.add(String.valueOf(id)); row.add(name); - row.add(String.valueOf(dbId)); - row.add(String.valueOf(tableId)); + row.add(TimeUtils.longToTimeString(createTimestamp)); + row.add(TimeUtils.longToTimeString(endTimestamp)); + row.add(db == null ? String.valueOf(dbId) : db.getFullName()); + row.add(tbl == null ? String.valueOf(tableId) : tbl.getName()); row.add(getState().name()); row.add(dataSourceType.name()); + row.add(String.valueOf(getSizeOfRoutineLoadTaskInfoList())); row.add(jobPropertiesToJsonString()); row.add(dataSourcePropertiesJsonToString()); - row.add(String.valueOf(currentTaskConcurrentNum)); - row.add(String.valueOf(totalRows)); - row.add(String.valueOf(errorRows)); + row.add(getStatistic()); row.add(getProgress().toJsonString()); switch (state) { case PAUSED: @@ -941,13 +988,14 @@ private String jobPropertiesToJsonString() { Map jobProperties = Maps.newHashMap(); jobProperties.put("partitions", partitions == null ? STAR_STRING : Joiner.on(",").join(partitions)); jobProperties.put("columnToColumnExpr", columnDescs == null ? STAR_STRING : Joiner.on(",").join(columnDescs)); - jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toString()); + jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toSql()); jobProperties.put("columnSeparator", columnSeparator == null ? 
"\t" : columnSeparator.toString()); jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum)); jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS)); jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows)); jobProperties.put("maxBatchSizeBytes", String.valueOf(maxBatchSizeBytes)); - Gson gson = new Gson(); + jobProperties.put("currentTaskConcurrentNum", String.valueOf(currentTaskConcurrentNum)); + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); return gson.toJson(jobProperties); } @@ -989,13 +1037,25 @@ public void write(DataOutput out) throws IOException { out.writeLong(dbId); out.writeLong(tableId); out.writeInt(desireTaskConcurrentNum); - out.writeInt(maxErrorNum); - out.writeInt(maxBatchIntervalS); - out.writeInt(maxBatchRows); - out.writeInt(maxBatchSizeBytes); + out.writeLong(maxErrorNum); + out.writeLong(maxBatchIntervalS); + out.writeLong(maxBatchRows); + out.writeLong(maxBatchSizeBytes); progress.write(out); + + out.writeLong(createTimestamp); + out.writeLong(endTimestamp); + out.writeLong(currentErrorRows); out.writeLong(currentTotalRows); + out.writeLong(errorRows); + out.writeLong(totalRows); + out.writeLong(unselectedRows); + out.writeLong(receivedBytes); + out.writeLong(totalTaskExcutionTimeMs); + out.writeLong(committedTaskNum); + out.writeLong(abortedTaskNum); + Text.writeString(out, origStmt); } @@ -1012,10 +1072,10 @@ public void readFields(DataInput in) throws IOException { dbId = in.readLong(); tableId = in.readLong(); desireTaskConcurrentNum = in.readInt(); - maxErrorNum = in.readInt(); - maxBatchIntervalS = in.readInt(); - maxBatchRows = in.readInt(); - maxBatchSizeBytes = in.readInt(); + maxErrorNum = in.readLong(); + maxBatchIntervalS = in.readLong(); + maxBatchRows = in.readLong(); + maxBatchSizeBytes = in.readLong(); switch (dataSourceType) { case KAFKA: { @@ -1027,8 +1087,19 @@ public void readFields(DataInput in) throws IOException { throw new IOException("unknown data source type: " + 
dataSourceType); } + createTimestamp = in.readLong(); + endTimestamp = in.readLong(); + currentErrorRows = in.readLong(); currentTotalRows = in.readLong(); + errorRows = in.readLong(); + totalRows = in.readLong(); + unselectedRows = in.readLong(); + receivedBytes = in.readLong(); + totalTaskExcutionTimeMs = in.readLong(); + committedTaskNum = in.readLong(); + abortedTaskNum = in.readLong(); + origStmt = Text.readString(in); // parse the origin stmt to get routine load desc diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index a3153ed115865f..97a79e8162362d 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -58,7 +58,7 @@ public class RoutineLoadManager implements Writable { private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class); - private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; + private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 10; // Long is beId, integer is the size of tasks in be private Map beIdToMaxConcurrentTasks = Maps.newHashMap(); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index c535da4947bbe9..7d8bd8004e3827 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -18,7 +18,6 @@ package org.apache.doris.load.routineload; -import com.google.common.collect.Lists; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.LabelAlreadyUsedException; @@ -30,6 +29,8 @@ import org.apache.doris.transaction.BeginTransactionException; import 
org.apache.doris.transaction.TransactionState; +import com.google.common.collect.Lists; + import java.util.List; import java.util.UUID; @@ -107,6 +108,10 @@ public long getTxnId() { return txnId; } + public boolean isRunning() { + return loadStartTimeMs > 0; + } + abstract TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException; public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index d4a50a73184857..b34703adfce864 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -59,7 +59,7 @@ public class RoutineLoadTaskScheduler extends Daemon { private static final long BACKEND_SLOT_UPDATE_INTERVAL_MS = 10000; // 10s private RoutineLoadManager routineLoadManager; - private LinkedBlockingQueue needScheduleTasksQueue; + private LinkedBlockingQueue needScheduleTasksQueue = Queues.newLinkedBlockingQueue(); private long lastBackendSlotUpdateTime = -1; @@ -67,13 +67,11 @@ public class RoutineLoadTaskScheduler extends Daemon { public RoutineLoadTaskScheduler() { super("routine load task", 0); this.routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); - this.needScheduleTasksQueue = Queues.newLinkedBlockingQueue(); } public RoutineLoadTaskScheduler(RoutineLoadManager routineLoadManager) { super("routine load task", 0); this.routineLoadManager = routineLoadManager; - this.needScheduleTasksQueue = Queues.newLinkedBlockingQueue(); } @Override @@ -157,10 +155,6 @@ private void updateBackendSlotIfNecessary() { } } - public void addTaskInQueue(RoutineLoadTaskInfo routineLoadTaskInfo) { - needScheduleTasksQueue.add(routineLoadTaskInfo); - } - public void addTaskInQueue(List routineLoadTaskInfoList) { 
needScheduleTasksQueue.addAll(routineLoadTaskInfoList); } diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 21c38ac3cff272..384da7305bbd55 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -61,8 +61,13 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES; public static LongCounterMetric COUNTER_IMAGE_WRITE; public static LongCounterMetric COUNTER_IMAGE_PUSH; + public static LongCounterMetric COUNTER_TXN_BEGIN; public static LongCounterMetric COUNTER_TXN_FAILED; public static LongCounterMetric COUNTER_TXN_SUCCESS; + public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS; + public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES; + public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS; + public static Histogram HISTO_QUERY_LATENCY; public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY; @@ -182,13 +187,24 @@ public Long getValue() { COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push", "counter of image succeeded in pushing to other frontends"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH); - COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", - "counter of success transactions"); + + COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin", "counter of begining transactions"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_BEGIN); + COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", "counter of failed transactions"); + COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", "counter of success transactions"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS); - COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", - "counter of failed transactions"); + COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", "counter of failed transactions"); 
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED); + COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", "total rows of routine load"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ROWS); + COUNTER_ROUTINE_LOAD_RECEIVED_BYTES = new LongCounterMetric("routine_load_receive_bytes", + "total received bytes of routine load"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_RECEIVED_BYTES); + COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", + "total error rows of routine load"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS); + // 3. histogram HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms")); HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("editlog", "write", "latency", diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 60dde9ca014067..0ef9ee47dc3427 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -331,7 +331,8 @@ public TFetchResourceResult fetchResource() throws TException { @Override public TFeResult miniLoad(TMiniLoadRequest request) throws TException { - LOG.info("mini load request is {}", request); + LOG.info("receive mini load request: label: {}, db: {}, tbl: {}, backend: {}", + request.getLabel(), request.getDb(), request.getTbl(), request.getBackend()); ConnectContext context = new ConnectContext(null); String cluster = SystemInfoService.DEFAULT_CLUSTER; @@ -508,7 +509,7 @@ private void checkPasswordAndPrivs(String cluster, String user, String passwd, S @Override public TFeResult loadCheck(TLoadCheckRequest request) throws TException { - LOG.info("load check request. label: {}, user: {}, ip: {}", + LOG.info("receive load check request. 
label: {}, user: {}, ip: {}", request.getLabel(), request.getUser(), request.getUser_ip()); TStatus status = new TStatus(TStatusCode.OK); @@ -539,9 +540,10 @@ public TFeResult loadCheck(TLoadCheckRequest request) throws TException { public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException { TNetworkAddress clientAddr = getClientAddr(); - LOG.info("receive loadTxnBegin request, db: {}, tbl: {}, label: {}, backend: {}", + LOG.info("receive txn begin request, db: {}, tbl: {}, label: {}, backend: {}", request.getDb(), request.getTbl(), request.getLabel(), clientAddr == null ? "unknown" : clientAddr.getHostname()); + LOG.debug("txn begin request: {}", request); TLoadTxnBeginResult result = new TLoadTxnBeginResult(); @@ -598,7 +600,10 @@ private long loadTxnBeginImpl(TLoadTxnBeginRequest request) throws UserException @Override public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException { - LOG.info("receive loadTxnCommit request, request={}", request); + LOG.info("receive txn commit request. db: {}, tbl: {}, txn id: {}", + request.getDb(), request.getTbl(), request.getTxnId()); + LOG.debug("txn commit request: {}", request); + TLoadTxnCommitResult result = new TLoadTxnCommitResult(); TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); @@ -654,7 +659,9 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce @Override public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) throws TException { - LOG.info("receive loadTxnRollback request, request={}", request); + LOG.info("receive txn rollback request. 
db: {}, tbl: {}, txn id: {}, reason: {}", + request.getDb(), request.getTbl(), request.getTxnId(), request.getReason()); + LOG.debug("txn rollback request: {}", request); TLoadTxnRollbackResult result = new TLoadTxnRollbackResult(); TStatus status = new TStatus(TStatusCode.OK); @@ -694,7 +701,9 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc @Override public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) throws TException { - LOG.info("receive streamLoadPut request, request={}", request); + LOG.info("receive stream load put request. db:{}, tbl: {}, txn id: {}", + request.getDb(), request.getTbl(), request.getTxnId()); + LOG.debug("stream load put request: {}", request); TStreamLoadPutResult result = new TStreamLoadPutResult(); TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 9f4a466d5400aa..99648d5ca6265a 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -39,6 +39,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.load.Load; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.EditLog; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PublishVersionTask; @@ -156,6 +157,11 @@ public long beginTransaction(long dbId, String label, long timestamp, coordinator, listenerId); transactionState.setPrepareTime(System.currentTimeMillis()); unprotectUpsertTransactionState(transactionState); + + if (MetricRepo.isInit.get()) { + MetricRepo.COUNTER_TXN_SUCCESS.increase(1L); + } + return tid; } finally { writeUnlock(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 35e9827289205f..64057b95a05fa8 
100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -513,10 +513,11 @@ struct TRLTaskTxnCommitAttachment { 3: required i64 jobId 4: optional i64 loadedRows 5: optional i64 filteredRows - 6: optional i64 receivedBytes - 7: optional i64 loadedBytes - 8: optional i64 loadCostMs - 9: optional TKafkaRLTaskProgress kafkaRLTaskProgress + 6: optional i64 unselectedRows + 7: optional i64 receivedBytes + 8: optional i64 loadedBytes + 9: optional i64 loadCostMs + 10: optional TKafkaRLTaskProgress kafkaRLTaskProgress } struct TTxnCommitAttachment {