diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index d8a8d924d767f0..067f734be5d515 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -868,6 +868,8 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) {
             publish_version_req = agent_task_req.publish_version_req;
             worker_pool_this->_tasks.pop_front();
         }
+
+        DorisMetrics::publish_task_request_total.increment(1);
         LOG(INFO)<< "get publish version task, signature:" << agent_task_req.signature;
 
         TStatusCode::type status_code = TStatusCode::OK;
@@ -898,6 +900,7 @@
             OLAP_LOG_WARNING("publish version failed. signature: %ld", agent_task_req.signature);
             error_msgs.push_back("publish version failed");
             finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+            DorisMetrics::publish_task_failed_total.increment(1);
         } else {
             LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature;
         }
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 633defec052533..f3b664b4e5a64b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -377,7 +377,7 @@ namespace config {
     CONF_Int32(tablet_stat_cache_update_interval_second, "300");
 
     // result buffer cancelled time (unit: second)
-    CONF_Int32(result_buffer_cancelled_interval_time, "5");
+    CONF_Int32(result_buffer_cancelled_interval_time, "300");
 
     // can perform recovering tablet
     CONF_Bool(force_recovery, "false");
@@ -395,7 +395,7 @@ namespace config {
     CONF_Int32(txn_commit_rpc_timeout_ms, "10000");
 
     // If set to true, metric calculator will run
-    CONF_Bool(enable_metric_calculator, "false");
+    CONF_Bool(enable_metric_calculator, "true");
 
     // max consumer num in one data consumer group, for routine load
     CONF_Int32(max_consumer_num_per_group, "3");
diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp
index 2b4ba8a66ac813..f16791b7c5005e 100644
--- a/be/src/olap/olap_engine.cpp
+++ b/be/src/olap/olap_engine.cpp
@@ -1832,17 +1832,21 @@ void OLAPEngine::perform_cumulative_compaction() {
     CumulativeCompaction cumulative_compaction;
     OLAPStatus res = cumulative_compaction.init(best_table);
     if (res != OLAP_SUCCESS) {
-        DorisMetrics::cumulative_compaction_request_failed.increment(1);
-        LOG(WARNING) << "failed to init cumulative compaction."
-                     << "table=" << best_table->full_name();
+        if (res != OLAP_ERR_CUMULATIVE_REPEAT_INIT && res != OLAP_ERR_CE_TRY_CE_LOCK_ERROR) {
+            DorisMetrics::cumulative_compaction_request_failed.increment(1);
+            LOG(WARNING) << "failed to init cumulative compaction"
+                         << ", table=" << best_table->full_name()
+                         << ", res=" << res;
+        }
         return;
     }
 
     res = cumulative_compaction.run();
     if (res != OLAP_SUCCESS) {
         DorisMetrics::cumulative_compaction_request_failed.increment(1);
-        LOG(WARNING) << "failed to do cumulative compaction."
-                     << "table=" << best_table->full_name();
+        LOG(WARNING) << "failed to do cumulative compaction"
+                     << ", table=" << best_table->full_name()
+                     << ", res=" << res;
         return;
     }
 }
@@ -1855,17 +1859,21 @@ void OLAPEngine::perform_base_compaction() {
     BaseCompaction base_compaction;
     OLAPStatus res = base_compaction.init(best_table);
     if (res != OLAP_SUCCESS) {
-        DorisMetrics::base_compaction_request_failed.increment(1);
-        LOG(WARNING) << "failed to init base compaction."
-                     << "table=" << best_table->full_name();
+        if (res != OLAP_ERR_BE_TRY_BE_LOCK_ERROR && res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
+            DorisMetrics::base_compaction_request_failed.increment(1);
+            LOG(WARNING) << "failed to init base compaction"
+                         << ", table=" << best_table->full_name()
+                         << ", res=" << res;
+        }
        return;
     }
 
     res = base_compaction.run();
     if (res != OLAP_SUCCESS) {
         DorisMetrics::base_compaction_request_failed.increment(1);
-        LOG(WARNING) << "failed to init base compaction."
-                     << "table=" << best_table->full_name();
+        LOG(WARNING) << "failed to do base compaction"
+                     << ", table=" << best_table->full_name()
+                     << ", res=" << res;
         return;
     }
 }
- << "table=" << best_table->full_name(); + if (res != OLAP_ERR_BE_TRY_BE_LOCK_ERROR && res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { + DorisMetrics::base_compaction_request_failed.increment(1); + LOG(WARNING) << "failed to init base compaction" + << ", table=" << best_table->full_name() + << ", res=" << res; + } return; } res = base_compaction.run(); if (res != OLAP_SUCCESS) { DorisMetrics::base_compaction_request_failed.increment(1); - LOG(WARNING) << "failed to init base compaction." - << "table=" << best_table->full_name(); + LOG(WARNING) << "failed to init base compaction" + << ", table=" << best_table->full_name() + << ", res=" << res; return; } } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 432d0c5b5ae62d..3bfc4d2a2b4ccd 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -26,6 +26,7 @@ #include "runtime/runtime_state.h" #include "runtime/stream_load/stream_load_context.h" #include "util/frontend_helper.h" +#include "util/doris_metrics.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/FrontendService_types.h" @@ -42,6 +43,7 @@ Status k_stream_load_plan_status; #endif Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { + DorisMetrics::txn_exec_plan_total.increment(1); // submit this params #ifndef BE_TEST ctx->ref(); @@ -66,11 +68,13 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { } if (ctx->number_filtered_rows > 0 && !executor->runtime_state()->get_error_log_file_path().empty()) { - - // if (ctx->load_type == TLoadType::MANUL_LOAD) { - ctx->error_url = to_load_error_http_path( + ctx->error_url = to_load_error_http_path( executor->runtime_state()->get_error_log_file_path()); - // } + } + + if (status.ok()) { + DorisMetrics::stream_receive_bytes_total.increment(ctx->receive_bytes); + DorisMetrics::stream_load_rows_total.increment(ctx->number_loaded_rows); } } else { LOG(WARNING) << "fragment execute failed" @@ -108,8 +112,9 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { } Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + DorisMetrics::txn_begin_request_total.increment(1); + TNetworkAddress master_addr = _exec_env->master_info()->network_address; TLoadTxnBeginRequest request; set_request_auth(&request, ctx->auth); request.db = ctx->db; @@ -141,8 +146,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { } Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + DorisMetrics::txn_commit_request_total.increment(1); + TNetworkAddress master_addr = _exec_env->master_info()->network_address; TLoadTxnCommitRequest request; set_request_auth(&request, ctx->auth); request.db = ctx->db; @@ -183,6 +189,8 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { } void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { + DorisMetrics::txn_rollback_request_total.increment(1); + TNetworkAddress master_addr = _exec_env->master_info()->network_address; TLoadTxnRollbackRequest request; set_request_auth(&request, ctx->auth); diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 88ab785121f2b0..3a8158e720c360 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -80,11 +80,21 @@ IntCounter 
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 88ab785121f2b0..3a8158e720c360 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -80,11 +80,21 @@ IntCounter DorisMetrics::cumulative_compaction_bytes_total;
 IntCounter DorisMetrics::cumulative_compaction_request_total;
 IntCounter DorisMetrics::cumulative_compaction_request_failed;
 
+IntCounter DorisMetrics::publish_task_request_total;
+IntCounter DorisMetrics::publish_task_failed_total;
+
 IntCounter DorisMetrics::meta_write_request_total;
 IntCounter DorisMetrics::meta_write_request_duration_us;
 IntCounter DorisMetrics::meta_read_request_total;
 IntCounter DorisMetrics::meta_read_request_duration_us;
 
+IntCounter DorisMetrics::txn_begin_request_total;
+IntCounter DorisMetrics::txn_commit_request_total;
+IntCounter DorisMetrics::txn_rollback_request_total;
+IntCounter DorisMetrics::txn_exec_plan_total;
+IntCounter DorisMetrics::stream_receive_bytes_total;
+IntCounter DorisMetrics::stream_load_rows_total;
+
 // gauges
 IntGauge DorisMetrics::memory_pool_bytes_total;
 IntGauge DorisMetrics::process_thread_num;
@@ -176,6 +186,9 @@ void DorisMetrics::initialize(
     REGISTER_ENGINE_REQUEST_METRIC(cumulative_compaction, total, cumulative_compaction_request_total);
     REGISTER_ENGINE_REQUEST_METRIC(cumulative_compaction, failed, cumulative_compaction_request_failed);
 
+    REGISTER_ENGINE_REQUEST_METRIC(publish, total, publish_task_request_total);
+    REGISTER_ENGINE_REQUEST_METRIC(publish, failed, publish_task_failed_total);
+
     _metrics->register_metric(
         "compaction_deltas_total", MetricLabels().add("type", "base"),
         &base_compaction_deltas_total);
@@ -202,6 +215,26 @@ void DorisMetrics::initialize(
         "meta_request_duration", MetricLabels().add("type", "read"),
         &meta_read_request_duration_us);
 
+    _metrics->register_metric(
+        "txn_request", MetricLabels().add("type", "begin"),
+        &txn_begin_request_total);
+    _metrics->register_metric(
+        "txn_request", MetricLabels().add("type", "commit"),
+        &txn_commit_request_total);
+    _metrics->register_metric(
+        "txn_request", MetricLabels().add("type", "rollback"),
+        &txn_rollback_request_total);
+    _metrics->register_metric(
+        "txn_request", MetricLabels().add("type", "exec"),
+        &txn_exec_plan_total);
+
+    _metrics->register_metric(
+        "stream_load", MetricLabels().add("type", "receive_bytes"),
+        &stream_receive_bytes_total);
+    _metrics->register_metric(
+        "stream_load", MetricLabels().add("type", "load_rows"),
+        &stream_load_rows_total);
+
     // Gauge
     REGISTER_DORIS_METRIC(memory_pool_bytes_total);
     REGISTER_DORIS_METRIC(process_thread_num);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 97c6fb54fc6e24..d0582d2b8f0bf6 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -100,14 +100,21 @@ class DorisMetrics {
     static IntCounter cumulative_compaction_deltas_total;
     static IntCounter cumulative_compaction_bytes_total;
 
-    static IntCounter alter_task_success_total;
-    static IntCounter alter_task_failed_total;
+    static IntCounter publish_task_request_total;
+    static IntCounter publish_task_failed_total;
 
     static IntCounter meta_write_request_total;
     static IntCounter meta_write_request_duration_us;
     static IntCounter meta_read_request_total;
     static IntCounter meta_read_request_duration_us;
 
+    static IntCounter txn_begin_request_total;
+    static IntCounter txn_commit_request_total;
+    static IntCounter txn_rollback_request_total;
+    static IntCounter txn_exec_plan_total;
+    static IntCounter stream_receive_bytes_total;
+    static IntCounter stream_load_rows_total;
+
     // Gauges
     static IntGauge memory_pool_bytes_total;
     static IntGauge process_thread_num;
diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp
index 5cc1a37c2e2479..ee43b209ec7ce8 100644
--- a/be/src/util/system_metrics.cpp
+++ b/be/src/util/system_metrics.cpp
@@ -139,7 +139,7 @@ void SystemMetrics::_update_cpu_metrics() {
     if (getline(&_line_ptr, &_line_buf_size, fp) < 0) {
         char buf[64];
-        LOG(WARNING) << "geline failed, errno=" << errno
+        LOG(WARNING) << "getline failed, errno=" << errno
                      << ", message=" << strerror_r(errno, buf, 64);
         fclose(fp);
         return;
diff --git a/be/test/util/system_metrics_test.cpp b/be/test/util/system_metrics_test.cpp
index 818e7873d9dad2..9936223d6915ef 100644
--- a/be/test/util/system_metrics_test.cpp
+++ b/be/test/util/system_metrics_test.cpp
@@ -111,6 +111,7 @@ TEST_F(SystemMetricsTest, normal) {
     network_interfaces.emplace_back("xgbe0");
     SystemMetrics metrics;
     metrics.install(&registry, disk_devices, network_interfaces);
+    metrics.update();
 
     TestMetricsVisitor visitor;
     registry.collect(&visitor);
diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
index 36e6a660b00308..997ec82b9f75b9 100644
--- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -680,7 +680,7 @@ private void checkAndPrepareMeta() {
 
         // Send create replica task to BE outside the db lock
         if (batchTask.getTaskNum() > 0) {
-            MarkedCountDownLatch latch = new MarkedCountDownLatch(batchTask.getTaskNum());
+            MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum());
             for (AgentTask task : batchTask.getAllTasks()) {
                 latch.addMark(((CreateReplicaTask) task).getBackendId(), ((CreateReplicaTask) task).getTabletId());
                 ((CreateReplicaTask) task).setLatch(latch);
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index da9d5253b4d740..2a3bd89da51d8a 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3263,7 +3263,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long
             TStorageType storageType = indexIdToStorageType.get(indexId);
             List<Column> schema = indexIdToSchema.get(indexId);
             int totalTaskNum = index.getTablets().size() * replicationNum;
-            MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(totalTaskNum);
+            MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
             AgentBatchTask batchTask = new AgentBatchTask();
             for (Tablet tablet : index.getTablets()) {
                 long tabletId = tablet.getId();
diff --git a/fe/src/main/java/org/apache/doris/catalog/Partition.java b/fe/src/main/java/org/apache/doris/catalog/Partition.java
index 791cfdd36ae915..4dec9661808b64 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Partition.java
@@ -197,7 +197,7 @@ public void setNextVersionHash(long nextVersionHash, long committedVersionHash)
     }
 
     public long getCommittedVersion() {
-        return Math.max(this.nextVersion - 1, 2);
+        return this.nextVersion - 1;
     }
 
     public long getCommittedVersionHash() {
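A note on the Partition.java change above: dropping the `Math.max(..., 2)` floor matters for a freshly created partition. A minimal, standalone sketch of the before/after arithmetic (the `nextVersion` value here is illustrative, assuming a new partition starts with version 1 as its initial empty version, so `nextVersion` is 2):

```java
public class CommittedVersionDemo {
    // Illustrative state for a fresh partition in this sketch:
    // version 1 is the initial empty version, so nextVersion - 1 == 1.
    static long nextVersion = 2;

    static long oldGetCommittedVersion() {
        return Math.max(nextVersion - 1, 2); // floors the result at 2
    }

    static long newGetCommittedVersion() {
        return nextVersion - 1; // reports the actual committed version
    }

    public static void main(String[] args) {
        // old: 2 (overstates; version 2 was never committed), new: 1
        System.out.println(oldGetCommittedVersion() + " vs " + newGetCommittedVersion());
    }
}
```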
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index a47bedcfcc4ae5..a3100635923163 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -774,7 +774,7 @@ public class Config extends ConfigBase {
     /*
      * If set to true, metric collector will be run as a daemon timer to collect metrics at fix interval
      */
-    @ConfField public static boolean enable_metric_calculator = false;
+    @ConfField public static boolean enable_metric_calculator = true;
 
     /*
      * the max concurrent task num of a routine load task
diff --git a/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java b/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
index 85e680427a82b3..bc25b3877f2b8f 100644
--- a/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
+++ b/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
@@ -25,20 +25,20 @@
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 
-public class MarkedCountDownLatch extends CountDownLatch {
+public class MarkedCountDownLatch<K, V> extends CountDownLatch {
 
-    private Multimap<Long, Long> marks;
+    private Multimap<K, V> marks;
 
     public MarkedCountDownLatch(int count) {
         super(count);
         marks = HashMultimap.create();
     }
 
-    public void addMark(long key, long value) {
+    public void addMark(K key, V value) {
         marks.put(key, value);
     }
 
-    public synchronized boolean markedCountDown(long key, long value) {
+    public synchronized boolean markedCountDown(K key, V value) {
         if (marks.remove(key, value)) {
             super.countDown();
             return true;
@@ -46,7 +46,13 @@ public synchronized boolean markedCountDown(long key, long value) {
         return false;
     }
 
-    public synchronized List<Entry<Long, Long>> getLeftMarks() {
+    public synchronized List<Entry<K, V>> getLeftMarks() {
         return Lists.newArrayList(marks.entries());
     }
+
+    public synchronized void countDownToZero() {
+        while (getCount() > 0) {
+            super.countDown();
+        }
+    }
 }
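For reviewers, a small self-contained sketch of how the now-generic latch is meant to be used; the key/value types and ids below are illustrative, mirroring the (backendId, tabletId) call sites above. `markedCountDown` only decrements while the exact (key, value) mark is still outstanding, and the new `countDownToZero` releases waiters unconditionally:

```java
import java.util.concurrent.TimeUnit;
import org.apache.doris.common.MarkedCountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // one mark per expected (backendId, tabletId) ack
        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(2);
        latch.addMark(10001L, 20001L);
        latch.addMark(10002L, 20002L);

        latch.markedCountDown(10001L, 20001L); // known mark: count 2 -> 1
        latch.markedCountDown(10001L, 99999L); // unknown mark: ignored, count stays 1

        if (!latch.await(5, TimeUnit.SECONDS)) {
            // the un-acked (backendId, tabletId) pairs are still in the mark set
            System.out.println("left: " + latch.getLeftMarks()); // [10002=20002]
            latch.countDownToZero(); // force-release anyone blocked on await()
        }
    }
}
```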
diff --git a/fe/src/main/java/org/apache/doris/load/ExportJob.java b/fe/src/main/java/org/apache/doris/load/ExportJob.java
index dd0291b59414fd..2f5ecef6c2b450 100644
--- a/fe/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/src/main/java/org/apache/doris/load/ExportJob.java
@@ -291,11 +291,9 @@ private List genCoordinators(List fragments, List countDownLatch = new MarkedCountDownLatch(totalReplicaNum);
             for (AgentTask task : deleteBatchTask.getAllTasks()) {
                 countDownLatch.addMark(task.getBackendId(), task.getSignature());
                 ((PushTask) task).setCountDownLatch(countDownLatch);
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapRewriteNode.java b/fe/src/main/java/org/apache/doris/planner/OlapRewriteNode.java
index 414b79c79df5a9..4759dba8b9cdc9 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapRewriteNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapRewriteNode.java
@@ -24,16 +24,18 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TOlapRewriteNode;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
-import org.apache.doris.common.UserException;
 
-import com.google.common.collect.Lists;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 
 // Used to convert column to valid OLAP table
@@ -108,7 +110,6 @@ public void computeStats(Analyzer analyzer) {
             cardinality = Math.round(((double) getChild(0).cardinality) * computeSelectivity());
             Preconditions.checkState(cardinality >= 0);
         }
-        LOG.info("stats Select: cardinality=" + Long.toString(cardinality));
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index e709ed68e5fda7..af5c3cfbc5c56f 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.Status;
@@ -63,7 +64,6 @@ import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TResourceInfo;
-import org.apache.doris.thrift.TResultBatch;
 import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
@@ -77,6 +77,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -94,7 +95,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -151,12 +151,13 @@ public class Coordinator {
     private ConcurrentMap backendExecStateMap = Maps.newConcurrentMap();
     private List<ScanNode> scanNodes;
+
     // number of instances of this query, equals to
     // number of backends executing plan fragments on behalf of this query;
     // set in computeFragmentExecParams();
     // same as backend_exec_states_.size() after Exec()
-    private int numBackends;
-
-    private CountDownLatch profileDoneSignal;
+    private Set<TUniqueId> instanceIds = Sets.newHashSet();
+    // instance id -> dummy value
+    private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
 
     private boolean isBlockQuery;
@@ -172,12 +173,13 @@ public class Coordinator {
     private List commitInfos = Lists.newArrayList();
 
     // Input parameter
+    private long jobId = -1; // job which this task belongs to
     private TUniqueId queryId;
     private TResourceInfo tResourceInfo;
     private boolean needReport;
     private String clusterName;
 
-    // paralle execute
+    // parallel execute
     private final TUniqueId nextInstanceId;
 
     // Used for query
@@ -200,9 +202,10 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
     }
 
     // Used for pull load task coordinator
-    public Coordinator(TUniqueId queryId, DescriptorTable descTable,
+    public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
             List<PlanFragment> fragments, List<ScanNode> scanNodes, String cluster) {
         this.isBlockQuery = true;
+        this.jobId = jobId;
         this.queryId = queryId;
         this.descTable = descTable.toThrift();
         this.fragments = fragments;
@@ -217,6 +220,10 @@ public Coordinator(TUniqueId queryId, DescriptorTable descTable,
         nextInstanceId.setLo(queryId.lo + 1);
     }
 
+    public long getJobId() {
+        return jobId;
+    }
+
     public TUniqueId getQueryId() {
         return queryId;
     }
@@ -385,7 +392,10 @@ public void exec() throws Exception {
         // to keep things simple, make async Cancel() calls wait until plan fragment
         // execution has been initiated, otherwise we might try to cancel fragment
         // execution at backends where it hasn't even started
-        profileDoneSignal = new CountDownLatch(numBackends);
+        profileDoneSignal = new MarkedCountDownLatch<TUniqueId, Long>(instanceIds.size());
+        for (TUniqueId instanceId : instanceIds) {
+            profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */);
+        }
         lock();
         try {
             // execute all instances from up to bottom
@@ -539,7 +549,7 @@ private void updateCommitInfos(List commitInfos) {
         }
     }
 
-    void updateStatus(Status status) {
+    private void updateStatus(Status status, TUniqueId instanceId) {
         lock.lock();
         try {
             // The query is done and we are just waiting for remote fragments to clean up.
@@ -558,7 +568,8 @@ void updateStatus(Status status) {
             }
 
             queryStatus.setStatus(status);
-            LOG.warn("One instance report fail throw updateStatus(), need cancel");
+            LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}",
+                    jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN");
             cancelInternal();
         } finally {
             lock.unlock();
@@ -575,13 +586,17 @@ public RowBatch getNext() throws Exception {
         resultBatch = receiver.getNext(status);
         if (!status.ok()) {
-            LOG.warn("get next fail, need cancel");
+            LOG.warn("get next fail, need cancel. query id: {}", DebugUtil.printId(queryId));
         }
-        updateStatus(status);
+        updateStatus(status, null /* no instance id */);
 
+        Status copyStatus = null;
         lock();
-        Status copyStatus = new Status(queryStatus);
-        unlock();
+        try {
+            copyStatus = new Status(queryStatus);
+        } finally {
+            unlock();
+        }
 
         if (!copyStatus.ok()) {
             if (Strings.isNullOrEmpty(copyStatus.getErrorMsg())) {
@@ -608,7 +623,7 @@ public RowBatch getNext() throws Exception {
             // if this query is a block query do not cancel.
             Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
             boolean hasLimit = numLimitRows > 0;
-            if (!isBlockQuery && numBackends > 1 && hasLimit && numReceivedRows >= numLimitRows) {
+            if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
                 LOG.debug("no block query, return num >= limit rows, need cancel");
                 cancelInternal();
             }
@@ -643,6 +658,11 @@ private void cancelInternal() {
             receiver.cancel();
         }
         cancelRemoteFragmentsAsync();
+        if (profileDoneSignal != null) {
+            // count down to zero to notify all objects waiting for this
+            profileDoneSignal.countDownToZero();
+            LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks());
+        }
     }
 
     private void cancelRemoteFragmentsAsync() {
@@ -690,7 +710,7 @@ private void computeFragmentExecParams() throws Exception {
         computeFragmentHosts();
 
         // assign instance ids
-        numBackends = 0;
+        instanceIds.clear();
         for (FragmentExecParams params : fragmentExecParamsMap.values()) {
             LOG.debug("fragment {} has instances {}", params.fragment.getFragmentId(), params.instanceExecParams.size());
             for (int j = 0; j < params.instanceExecParams.size(); ++j) {
@@ -698,10 +718,9 @@
                 // globally-unique instance id
                 TUniqueId instanceId = new TUniqueId();
                 instanceId.setHi(queryId.hi);
-                instanceId.setLo(queryId.lo + numBackends + 1);
+                instanceId.setLo(queryId.lo + instanceIds.size() + 1);
                 params.instanceExecParams.get(j).instanceId = instanceId;
-
-                numBackends++;
+                instanceIds.add(instanceId);
             }
         }
 
@@ -845,7 +864,7 @@ private void computeFragmentHosts() throws Exception {
                 params.instanceExecParams.add(instanceParam);
             }
         } else {
-            //normat fragment
+            // normal fragment
             Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator();
             int parallelExecInstanceNum = fragment.getParallel_exec_num();
             while (iter.hasNext()) {
@@ -892,8 +911,14 @@ private boolean isColocateJoin(PlanNode node) {
             return false;
         }
 
-        if (ConnectContext.get().getSessionVariable().isDisableColocateJoin()) {
-            return false;
+        // TODO(cmy): some internal process, such as broker load task, do not have ConnectContext.
+        // Any configurations needed by the Coordinator should be passed in Coordinator initialization.
+        // Refine this later.
+        // Currently, just ignore the session variables if ConnectContext does not exist
+        if (ConnectContext.get() != null) {
+            if (ConnectContext.get().getSessionVariable().isDisableColocateJoin()) {
+                return false;
+            }
         }
 
         //cache the colocateFragmentIds
@@ -1055,7 +1080,8 @@ private void computeScanRangeAssignmentByScheduler(
 
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (params.backend_num >= backendExecStates.size()) {
-            LOG.error("unknown backend number");
+            LOG.error("unknown backend number: {}, expected less than: {}",
+                    params.backend_num, backendExecStates.size());
             return;
         }
 
@@ -1090,9 +1116,8 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
         // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
         if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
             LOG.warn("One instance report fail, query_id={} instance_id={}",
-                    DebugUtil.printId(queryId),
-                    DebugUtil.printId(params.getFragment_instance_id()));
-            updateStatus(status);
+                    DebugUtil.printId(queryId), DebugUtil.printId(params.getFragment_instance_id()));
+            updateStatus(status, params.getFragment_instance_id());
         }
         if (done) {
             if (params.isSetDelta_urls()) {
@@ -1110,7 +1135,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
             if (params.isSetCommitInfos()) {
                 updateCommitInfos(params.getCommitInfos());
             }
-            profileDoneSignal.countDown();
+            profileDoneSignal.markedCountDown(params.getFragment_instance_id(), -1L);
         }
 
         return;
@@ -1162,7 +1187,6 @@ class BucketSeqToScanRange extends HashMap bucketSeqToAddress = Maps.newHashMap();
     private Set colocateFragmentIds = new HashSet<>();
 
-    // record backend execute state
     // TODO(zhaochun): add profile information and others
     public class BackendExecState {
diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 1b005818e1fd75..c7876bf491eb4b 100644
--- a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -55,7 +55,7 @@ public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserExcep
 
     @Override
     public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserException {
-        LOG.info("register query id = " + DebugUtil.printId(queryId));
+        LOG.info("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId());
        final QueryInfo result = coordinatorMap.putIfAbsent(queryId, info);
        if (result != null) {
            throw new UserException("queryId " + queryId + " already exists");
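The Coordinator changes above amount to a three-step lifecycle around `profileDoneSignal`. A condensed, hypothetical sketch (names shortened, Doris thrift types replaced by `String` instance ids) of how the marked latch ties per-instance reporting and cancellation together:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.doris.common.MarkedCountDownLatch;

public class CoordinatorLatchSketch {
    private MarkedCountDownLatch<String, Long> profileDoneSignal;

    void exec(List<String> instanceIds) {
        // 1. one mark per fragment instance; the value is meaningless
        profileDoneSignal = new MarkedCountDownLatch<String, Long>(instanceIds.size());
        for (String id : instanceIds) {
            profileDoneSignal.addMark(id, -1L);
        }
    }

    void onInstanceReportDone(String instanceId) {
        // 2. each finished instance retires exactly its own mark,
        //    so a duplicate report can no longer over-count
        profileDoneSignal.markedCountDown(instanceId, -1L);
    }

    void cancel() {
        // 3. on cancel, unblock every waiter and log what never reported
        profileDoneSignal.countDownToZero();
        System.out.println("unfinished: " + profileDoneSignal.getLeftMarks());
    }

    boolean join(int seconds) throws InterruptedException {
        return profileDoneSignal.await(seconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CoordinatorLatchSketch c = new CoordinatorLatchSketch();
        c.exec(Arrays.asList("i1", "i2"));
        c.onInstanceReportDone("i1");
        c.onInstanceReportDone("i1"); // duplicate report: ignored
        c.cancel();                   // releases any join() caller
        System.out.println("done: " + c.join(1));
    }
}
```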
diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
index a8298156a54e7f..f3e4865ef95b3a 100644
--- a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
+++ b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
@@ -17,8 +17,8 @@
 
 package org.apache.doris.task;
 
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.collect.Maps;
@@ -151,6 +151,11 @@ private void processOneTask(PullLoadTask task, PullLoadJob job) throws UserExcep
             if (!needRetry) {
                 break;
             }
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+
+            }
         }
     }
 
@@ -182,8 +187,9 @@ public void run() {
                 job.onTaskFailed(task);
             }
         } catch (Throwable e) {
-            LOG.warn("Process one pull load task exception. taskId={}:{}", task.jobId, task.taskId, e);
-            task.onFailed(new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
+            LOG.warn("Process one pull load task exception. job id: {}, task id: {}",
+                    task.jobId, task.taskId, e);
+            task.onFailed(null, new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
             job.onTaskFailed(task);
         }
     }
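The added `Thread.sleep(5000)` gives failed pull-load attempts a pause between retries instead of spinning. A generic, hedged sketch of the shape of that loop; the retry predicate and attempt body are placeholders, not the real `processOneTask` logic:

```java
public class RetryLoopSketch {
    interface Attempt {
        boolean run(); // true = success, false = retry
    }

    static void runWithRetry(Attempt attempt, int maxTries, long sleepMs)
            throws InterruptedException {
        for (int i = 0; i < maxTries; i++) {
            if (attempt.run()) {
                return; // success: stop retrying
            }
            // pause before the next try so a struggling backend
            // is not hammered in a tight loop
            Thread.sleep(sleepMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        runWithRetry(() -> ++calls[0] >= 3, 5, 10L); // succeeds on the 3rd try
        System.out.println("attempts: " + calls[0]);
    }
}
```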
diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadTask.java b/fe/src/main/java/org/apache/doris/task/PullLoadTask.java
index d5fea3737a1a7f..791527df4e3bae 100644
--- a/fe/src/main/java/org/apache/doris/task/PullLoadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/PullLoadTask.java
@@ -21,8 +21,9 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.QeProcessorImpl;
@@ -52,7 +53,6 @@ public class PullLoadTask {
     public final List<BrokerFileGroup> fileGroups;
     public final long jobDeadlineMs;
 
-    // s
     private PullLoadTaskPlanner planner;
 
     // Useful things after executed
@@ -69,7 +69,7 @@ private enum State {
         CANCELLED,
     }
 
-    private TUniqueId executeId;
+    private TUniqueId queryId;
     private Coordinator curCoordinator;
     private State executeState = State.RUNNING;
     private Status executeStatus;
@@ -128,10 +128,12 @@ public Status getExecuteStatus() {
         return executeStatus;
     }
 
-    public synchronized void onCancelled() {
+    public synchronized void onCancelled(String reason) {
         if (executeState == State.RUNNING) {
             executeState = State.CANCELLED;
             executeStatus = Status.CANCELLED;
+            LOG.info("cancel one pull load task({}). task id: {}, query id: {}, job id: {}",
+                    reason, taskId, DebugUtil.printId(curCoordinator.getQueryId()), jobId);
         }
     }
 
@@ -145,30 +147,27 @@ public synchronized void onFinished(Map fileMap,
             this.fileMap = fileMap;
             this.counters = counters;
             this.trackingUrl = trackingUrl;
+            LOG.info("finished one pull load task. task id: {}, query id: {}, job id: {}",
+                    taskId, DebugUtil.printId(curCoordinator.getQueryId()), jobId);
         }
     }
 
     public synchronized void onFailed(TUniqueId id, Status failStatus) {
         if (executeState == State.RUNNING) {
-            if (!executeId.equals(id)) {
+            if (id != null && !queryId.equals(id)) {
                 return;
             }
             executeState = State.FAILED;
             executeStatus = failStatus;
-        }
-    }
-
-    public synchronized void onFailed(Status failStatus) {
-        if (executeState == State.RUNNING) {
-            executeState = State.FAILED;
-            executeStatus = failStatus;
+            LOG.info("failed one pull load task({}). task id: {}, query id: {}, job id: {}",
+                    failStatus.getErrorMsg(), taskId, id != null ? DebugUtil.printId(id) : "NaN", jobId);
         }
     }
 
     private void actualExecute() {
         int waitSecond = (int) (getLeftTimeMs() / 1000);
         if (waitSecond <= 0) {
-            onCancelled();
+            onCancelled("waiting timeout");
             return;
         }
 
@@ -176,8 +175,11 @@
         try {
             curCoordinator.exec();
         } catch (Exception e) {
-            onFailed(executeId, new Status(TStatusCode.INTERNAL_ERROR, "Coordinator execute failed."));
+            LOG.warn("pull load task exec failed", e);
+            onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR, "Coordinator execute failed: " + e.getMessage()));
+            return;
         }
+
         if (curCoordinator.join(waitSecond)) {
             Status status = curCoordinator.getExecStatus();
             if (status.ok()) {
@@ -187,10 +189,10 @@
                 }
                 onFinished(resultFileMap, curCoordinator.getLoadCounters(), curCoordinator.getTrackingUrl());
             } else {
-                onFailed(executeId, status);
+                onFailed(queryId, status);
             }
         } else {
-            onCancelled();
+            onCancelled("execution timeout");
         }
     }
 
@@ -205,8 +207,8 @@ public void executeOnce() throws UserException {
 
         // New one query id,
         UUID uuid = UUID.randomUUID();
-        executeId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-        curCoordinator = new Coordinator(executeId, planner.getDescTable(),
+        queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+        curCoordinator = new Coordinator(jobId, queryId, planner.getDescTable(),
                 planner.getFragments(), planner.getScanNodes(), db.getClusterName());
         curCoordinator.setQueryType(TQueryType.LOAD);
         curCoordinator.setExecMemoryLimit(execMemLimit);
@@ -215,15 +217,14 @@
         boolean needUnregister = false;
         try {
-            QeProcessorImpl.INSTANCE
-                    .registerQuery(executeId, curCoordinator);
+            QeProcessorImpl.INSTANCE.registerQuery(queryId, curCoordinator);
             actualExecute();
             needUnregister = true;
         } catch (UserException e) {
-            onFailed(executeId, new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
+            onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
         } finally {
             if (needUnregister) {
-                QeProcessorImpl.INSTANCE.unregisterQuery(executeId);
+                QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
             }
             synchronized (this) {
                 curThread = null;
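One subtle point in PullLoadTask above: `onFailed(TUniqueId id, Status failStatus)` now treats a null id as a wildcard (fail whatever attempt is running), which is how PullLoadJobMgr's `Throwable` handler can fail the task without knowing the current query id, while a non-null id from a stale attempt is still ignored. A minimal sketch of that guard, with `String` standing in for TUniqueId:

```java
public class TaskStateSketch {
    enum State { RUNNING, FINISHED, FAILED, CANCELLED }

    private State state = State.RUNNING;
    private final String queryId = "q1"; // id of the current attempt

    public synchronized void onFailed(String id, String reason) {
        if (state != State.RUNNING) {
            return; // terminal states never change
        }
        // a non-null id must match the current attempt; null means
        // "fail whatever is running" (used by generic error handlers)
        if (id != null && !queryId.equals(id)) {
            return; // stale report from an older attempt: ignore
        }
        state = State.FAILED;
        System.out.println("failed(" + reason + "), query id: "
                + (id != null ? id : "NaN"));
    }

    public static void main(String[] args) {
        TaskStateSketch t = new TaskStateSketch();
        t.onFailed("q0", "stale");     // ignored: wrong query id
        t.onFailed(null, "rpc error"); // wildcard: marks FAILED
    }
}
```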
diff --git a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index a8f2b93320d741..2a9a65650f6b78 100644
--- a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -161,6 +161,7 @@ public void testSelect() throws Exception {
         EasyMock.expectLastCall().anyTimes();
         EasyMock.expect(cood.getQueryProfile()).andReturn(new RuntimeProfile()).anyTimes();
         EasyMock.expect(cood.getNext()).andReturn(new RowBatch()).anyTimes();
+        EasyMock.expect(cood.getJobId()).andReturn(-1L).anyTimes();
         EasyMock.replay(cood);
 
         PowerMock.expectNew(Coordinator.class, EasyMock.isA(ConnectContext.class),
                 EasyMock.isA(Analyzer.class), EasyMock.isA(Planner.class))
diff --git a/fe/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 3160b5a078de23..2df873efc7ad77 100644
--- a/fe/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.task;
 
-import com.google.common.collect.Range;
-
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.KeysType;
@@ -35,6 +33,9 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.collect.Range;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -75,7 +76,7 @@ public class AgentTaskTest {
     private TStorageType storageType = TStorageType.COLUMN;
     private List<Column> columns;
-    private MarkedCountDownLatch latch = new MarkedCountDownLatch(3);
+    private MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(3);
 
     private Range range1;
     private Range range2;
diff --git a/run-ut.sh b/run-ut.sh
index 245f7d3b8a91da..fb421a0749abc5 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -98,7 +98,7 @@ if [ ${RUN} -ne 1 ]; then
 fi
 
 echo "******************************"
-echo " Runing PaloBe Unittest "
+echo " Running PaloBe Unittest "
 echo "******************************"
 
 cd ${DORIS_HOME}
@@ -120,7 +120,7 @@ if [ ${RUN} -ne 1 ]; then
 fi
 
 echo "******************************"
-echo " Runing PaloBe Unittest "
+echo " Running PaloBe Unittest "
 echo "******************************"
 
 export DORIS_TEST_BINARY_DIR=${DORIS_TEST_BINARY_DIR}/test/