From 08d3207df1e32212be5bdca3a582e831522c0068 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Wed, 6 Mar 2019 10:41:58 +0800 Subject: [PATCH 1/2] Change the relationskip between txn and task 1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Perfer to use previous be id 6. Add before commit listerner of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId --- .../runtime/stream_load/stream_load_context.h | 3 +- .../doris/analysis/CreateRoutineLoadStmt.java | 59 ++++--- .../doris/load/TxnStateChangeListener.java | 9 +- .../doris/load/routineload/KafkaProgress.java | 11 +- .../load/routineload/KafkaRoutineLoadJob.java | 13 +- .../doris/load/routineload/KafkaTaskInfo.java | 6 +- .../load/routineload/RoutineLoadJob.java | 157 ++++++++++++------ .../load/routineload/RoutineLoadManager.java | 83 ++++++--- .../routineload/RoutineLoadScheduler.java | 3 +- .../load/routineload/RoutineLoadTaskInfo.java | 28 +++- .../routineload/RoutineLoadTaskScheduler.java | 42 +++-- .../doris/planner/StreamLoadScanNode.java | 2 +- .../org/apache/doris/qe/ShowExecutor.java | 3 +- .../doris/service/FrontendServiceImpl.java | 3 +- .../transaction/GlobalTransactionMgr.java | 10 +- .../doris/transaction/TransactionState.java | 4 +- .../routineload/RoutineLoadManagerTest.java | 5 +- .../RoutineLoadTaskSchedulerTest.java | 6 +- 18 files changed, 309 insertions(+), 138 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 11ff1f3355f362..4569051fd45b50 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -43,7 +43,8 @@ class KafkaLoadInfo { KafkaLoadInfo(const TKafkaLoadInfo& t_info): brokers(t_info.brokers), topic(t_info.topic), - begin_offset(t_info.partition_begin_offset) { + begin_offset(t_info.partition_begin_offset), + cmt_offset(t_info.partition_begin_offset) { if (t_info.__isset.max_interval_s) { max_interval_s = t_info.max_interval_s; } if (t_info.__isset.max_batch_rows) { max_batch_rows = t_info.max_batch_rows; } diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 967de0ae0e7e23..6826f3aaaac11d 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -82,6 +82,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic"; // optional public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; + public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets"; private static final String NAME_TYPE = "ROUTINE LOAD NAME"; private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; @@ -96,6 +97,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(KAFKA_BROKER_LIST_PROPERTY) .add(KAFKA_TOPIC_PROPERTY) .add(KAFKA_PARTITIONS_PROPERTY) + .add(KAFKA_OFFSETS_PROPERTY) .build(); private final String name; @@ -113,6 +115,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private String kafkaBrokerList; private String kafkaTopic; private List kafkaPartitions; + private List kafkaOffsets; public CreateRoutineLoadStmt(String name, TableName dbTableName, List loadPropertyList, Map properties, @@ -170,6 +173,10 @@ public List getKafkaPartitions() { return kafkaPartitions; } + public List getKafkaOffsets(){ + return kafkaOffsets; + } + @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); @@ -243,30 +250,32 @@ private void checkLoadProperties(Analyzer analyzer) throws AnalysisException { } private void checkRoutineLoadProperties() throws AnalysisException { - Optional optional = properties.keySet().parallelStream() - .filter(entity -> !PROPERTIES_SET.contains(entity)).findFirst(); - if (optional.isPresent()) { - throw new AnalysisException(optional.get() + " is invalid property"); - } - - // check desired concurrent number - final String desiredConcurrentNumberString = properties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY); - if (desiredConcurrentNumberString != null) { - desiredConcurrentNum = getIntegerValueFromString(desiredConcurrentNumberString, - DESIRED_CONCURRENT_NUMBER_PROPERTY); - if (desiredConcurrentNum <= 0) { - throw new AnalysisException(DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater then 0"); + if (properties != null) { + Optional optional = properties.keySet().parallelStream() + .filter(entity -> !PROPERTIES_SET.contains(entity)).findFirst(); + if (optional.isPresent()) { + throw new AnalysisException(optional.get() + " is invalid property"); } - } - // check max error number - final String maxErrorNumberString = properties.get(MAX_ERROR_NUMBER_PROPERTY); - if (maxErrorNumberString != null) { - maxErrorNum = getIntegerValueFromString(maxErrorNumberString, MAX_ERROR_NUMBER_PROPERTY); - if (maxErrorNum < 0) { - throw new AnalysisException(MAX_ERROR_NUMBER_PROPERTY + " must be greater then or equal to 0"); + // check desired concurrent number + final String desiredConcurrentNumberString = properties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY); + if (desiredConcurrentNumberString != null) { + desiredConcurrentNum = getIntegerValueFromString(desiredConcurrentNumberString, + DESIRED_CONCURRENT_NUMBER_PROPERTY); + if (desiredConcurrentNum <= 0) { + throw new AnalysisException(DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater then 0"); + } } + // check max error number + final String maxErrorNumberString = properties.get(MAX_ERROR_NUMBER_PROPERTY); + if (maxErrorNumberString != null) { + maxErrorNum = getIntegerValueFromString(maxErrorNumberString, MAX_ERROR_NUMBER_PROPERTY); + if (maxErrorNum < 0) { + throw new AnalysisException(MAX_ERROR_NUMBER_PROPERTY + " must be greater then or equal to 0"); + } + + } } } @@ -326,6 +335,16 @@ private void checkKafkaCustomProperties() throws AnalysisException { } } } + // check offsets + // Todo(ml) + final String kafkaOffsetsString = customProperties.get(KAFKA_OFFSETS_PROPERTY); + if (kafkaOffsetsString != null) { + kafkaOffsets = new ArrayList<>(); + String[] kafkaOffsetsStringList = customProperties.get(KAFKA_OFFSETS_PROPERTY).split(","); + for (String s : kafkaOffsetsStringList) { + kafkaOffsets.add(Long.valueOf(s)); + } + } } private int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException { diff --git a/fe/src/main/java/org/apache/doris/load/TxnStateChangeListener.java b/fe/src/main/java/org/apache/doris/load/TxnStateChangeListener.java index d394c51897316d..8ec5d940dfe2e9 100644 --- a/fe/src/main/java/org/apache/doris/load/TxnStateChangeListener.java +++ b/fe/src/main/java/org/apache/doris/load/TxnStateChangeListener.java @@ -18,16 +18,19 @@ package org.apache.doris.load; import org.apache.doris.transaction.AbortTransactionException; +import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; public interface TxnStateChangeListener { + void beforeCommitted(TransactionState txnState) throws TransactionException; + /** * update catalog of job which has related txn after transaction has been committed * * @param txnState */ - void onCommitted(TransactionState txnState); + void onCommitted(TransactionState txnState) throws TransactionException; /** * this interface is executed before txn aborted, you can check if txn could be abort in this stage @@ -37,7 +40,7 @@ public interface TxnStateChangeListener { * @throws AbortTransactionException if transaction could not be abort or there are some exception before aborted, * it will throw this exception */ - void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) + void beforeAborted(TransactionState txnState, String txnStatusChangeReason) throws AbortTransactionException; /** @@ -46,5 +49,5 @@ void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeRe * @param txnState * @param txnStatusChangeReason maybe null */ - void onAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason); + void onAborted(TransactionState txnState, String txnStatusChangeReason); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 97097cfbbb75ec..c344f08c3f0e1a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -35,6 +35,7 @@ // {"partitionIdToOffset": {}} public class KafkaProgress extends RoutineLoadProgress { + // (partition id, begin offset) private Map partitionIdToOffset; public KafkaProgress() { @@ -57,7 +58,7 @@ public void setPartitionIdToOffset(Map partitionIdToOffset) { public void update(RoutineLoadProgress progress) { KafkaProgress newProgress = (KafkaProgress) progress; newProgress.getPartitionIdToOffset().entrySet().parallelStream() - .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue())); + .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1)); } @Override @@ -78,9 +79,15 @@ public void readFields(DataInput in) throws IOException { } } + // (partition id, end offset) + // end offset = -1 while begin offset of partition is 0 @Override public String toString() { + Map showPartitionIdToOffset = new HashMap<>(); + for (Map.Entry entry : partitionIdToOffset.entrySet()) { + showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1); + } return "KafkaProgress [partitionIdToOffset=" - + Joiner.on("|").withKeyValueSeparator("_").join(partitionIdToOffset) + "]"; + + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]"; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 44790073575b1e..cd81b0e7eb5e40 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -119,7 +119,7 @@ private void setCustomKafkaPartitions(List kafkaPartitions) throws Load } @Override - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { + public void divideRoutineLoadJob(int currentConcurrentTaskNum) { List result = new ArrayList<>(); writeLock(); try { @@ -148,7 +148,6 @@ public List divideRoutineLoadJob(int currentConcurrentTaskN } finally { writeUnlock(); } - return result; } @Override @@ -261,9 +260,7 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr private void updateNewPartitionProgress() { // update the progress of new partitions for (Integer kafkaPartition : currentKafkaPartitions) { - if (((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) { - ((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition); - } else { + if (!((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) { ((KafkaProgress) progress).getPartitionIdToOffset().put(kafkaPartition, 0L); } } @@ -290,6 +287,12 @@ private void setOptional(CreateRoutineLoadStmt stmt) throws LoadException { } if (stmt.getKafkaPartitions() != null) { setCustomKafkaPartitions(stmt.getKafkaPartitions()); + if (stmt.getKafkaOffsets() != null) { + for (int i = 0; i < customKafkaPartitions.size(); i++) { + ((KafkaProgress) progress).getPartitionIdToOffset() + .put(customKafkaPartitions.get(i), stmt.getKafkaOffsets().get(i)); + } + } } } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index d3ea5f5ae06ce6..f961c41b5ac360 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -52,7 +52,7 @@ public KafkaTaskInfo(UUID id, long jobId) { public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { - super(UUID.randomUUID(), kafkaTaskInfo.getJobId()); + super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getPreviousBeId()); this.partitions = kafkaTaskInfo.getPartitions(); } @@ -109,6 +109,10 @@ private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob rou TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.gettExecPlanFragmentParams(); TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); tPlanFragment.getOutput_sink().getOlap_table_sink().setTxn_id(this.txnId); + TUniqueId queryId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits()); + tExecPlanFragmentParams.getParams().setQuery_id(queryId); + tExecPlanFragmentParams.getParams().getPer_node_scan_ranges().values().stream() + .forEach(entity -> entity.get(0).getScan_range().getBroker_scan_range().getRanges().get(0).setLoad_id(queryId)); return tExecPlanFragmentParams; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 524c577bbfb8b9..dc690345bfb6a4 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -17,6 +17,7 @@ package org.apache.doris.load.routineload; +import com.google.common.collect.Maps; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Catalog; @@ -41,6 +42,7 @@ import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.transaction.AbortTransactionException; import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import com.google.common.annotations.VisibleForTesting; @@ -54,7 +56,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -280,7 +284,7 @@ public String getPartitions() { } public String getClusterName() throws MetaNotFoundException { - Database database = Catalog.getCurrentCatalog().getDb(id); + Database database = Catalog.getCurrentCatalog().getDb(dbId); if (database == null) { throw new MetaNotFoundException("Database " + dbId + "has been deleted"); } @@ -363,16 +367,6 @@ public List processTimeoutTasks() { continue; } } - - try { - result.add(unprotectRenewTask(routineLoadTaskInfo)); - LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled", - oldSignature, DEFAULT_TASK_TIMEOUT_SECONDS); - } catch (UserException e) { - state = JobState.CANCELLED; - // TODO(ml): edit log - LOG.warn("failed to renew a routine load task in job {} with error message {}", id, e.getMessage()); - } } } } finally { @@ -381,12 +375,32 @@ public List processTimeoutTasks() { return result; } - abstract List divideRoutineLoadJob(int currentConcurrentTaskNum); + abstract void divideRoutineLoadJob(int currentConcurrentTaskNum); public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return 0; } + public Map getBeIdToConcurrentTaskNum() { + Map beIdConcurrentTasksNum = Maps.newHashMap(); + readLock(); + try { + for (RoutineLoadTaskInfo routineLoadTaskInfo : routineLoadTaskInfoList) { + if (routineLoadTaskInfo.getBeId() != -1L) { + long beId = routineLoadTaskInfo.getBeId(); + if (beIdConcurrentTasksNum.containsKey(beId)) { + beIdConcurrentTasksNum.put(beId, beIdConcurrentTasksNum.get(beId) + 1); + } else { + beIdConcurrentTasksNum.put(beId, 1); + } + } + } + return beIdConcurrentTasksNum; + } finally { + readUnlock(); + } + } + @Override public void write(DataOutput out) throws IOException { // TODO(ml) @@ -409,10 +423,10 @@ public void removeNeedScheduleTask(RoutineLoadTaskInfo routineLoadTaskInfo) { // if rate of error data is more then max_filter_ratio, pause job protected void updateProgress(RLTaskTxnCommitAttachment attachment) { - updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows()); + updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows() + attachment.getFilteredRows()); } - public boolean containsTask(String taskId) { + public boolean containsTask(UUID taskId) { readLock(); try { return routineLoadTaskInfoList.parallelStream() @@ -488,22 +502,13 @@ public void plan() throws UserException { } @Override - public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) + public void beforeAborted(TransactionState txnState, String txnStatusChangeReason) throws AbortTransactionException { readLock(); try { - if (txnStatusChangeReason != null) { - switch (txnStatusChangeReason) { - case TIMEOUT: - default: - String taskId = txnState.getLabel(); - if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().equals(taskId))) { - throw new AbortTransactionException( - "there are task " + taskId + " related to this txn, " - + "txn could not be abort", txnState.getTransactionId()); - } - break; - } + String taskId = txnState.getLabel(); + if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().toString().equals(taskId))) { + LOG.debug("there are a txn of routine load task will be aborted"); } } finally { readUnlock(); @@ -511,49 +516,99 @@ public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusC } @Override - public void onCommitted(TransactionState txnState) { - // step0: get progress from transaction state - RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); + public void beforeCommitted(TransactionState txnState) throws TransactionException { + readLock(); + try { + // check if task has been aborted + Optional routineLoadTaskInfoOptional = + routineLoadTaskInfoList.parallelStream() + .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst(); + if (!routineLoadTaskInfoOptional.isPresent()) { + throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed" + + " while task " + txnState.getLabel() + "has been aborted "); + } + } finally { + readUnlock(); + } + } + @Override + public void onCommitted(TransactionState txnState) throws TransactionException { writeLock(); try { - // step1: find task in job + // step0: find task in job Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream() - .filter(entity -> entity.getId().equals(txnState.getLabel())).findFirst(); + .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst(); if (routineLoadTaskInfoOptional.isPresent()) { - RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); - - // step2: update job progress - updateProgress(rlTaskTxnCommitAttachment); - - if (state == JobState.RUNNING) { - // step3: create a new task for partitions - RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); - Catalog.getCurrentCatalog().getRoutineLoadManager() - .getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo); - } + executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); } else { LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. " + " Transaction {} will not be committed", txnState.getLabel(), txnState.getTransactionId()); + throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed" + + " while task " + txnState.getLabel() + "has been aborted "); } + } catch (TransactionException e) { + LOG.warn(e.getMessage(), e); + throw e; } catch (Throwable e) { - LOG.error("failed to update offset in routine load task {} when transaction {} has been committed. " - + "change job to paused", - rlTaskTxnCommitAttachment.getTaskId(), txnState.getTransactionId(), e); + LOG.warn(e.getMessage(), e); updateState(JobState.PAUSED, "failed to update offset when transaction " - + txnState.getTransactionId() + " has been committed"); + + txnState.getTransactionId() + " has been committed"); } finally { writeUnlock(); } } + // txn will be aborted but progress will be update + // be will abort txn when all of kafka data is wrong + // progress will be update otherwise the progress will be hung @Override - public void onAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) { - updateState(JobState.PAUSED, txnStatusChangeReason.name()); - LOG.debug("job {} need to be pause while txn {} abort with reason {}", - id, txnState.getTransactionId(), txnStatusChangeReason.name()); + public void onAborted(TransactionState txnState, String txnStatusChangeReason) { + if (txnStatusChangeReason != null) { + LOG.debug("task will be reschedule when txn {} abort with reason {}", txnState.getTransactionId(), + txnStatusChangeReason); + } else { + LOG.debug("task will be reschedule when txn {} abort", txnState.getTransactionId()); + } + writeLock(); + try { + // step0: find task in job + Optional routineLoadTaskInfoOptional = + routineLoadTaskInfoList.parallelStream() + .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst(); + if (routineLoadTaskInfoOptional.isPresent()) { + executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); + } else { + LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. " + + " Transaction {} will be aborted successfully", + txnState.getLabel(), txnState.getTransactionId()); + } + } catch (Exception e) { + updateState(JobState.PAUSED, + "failed to renew task when txn has been aborted with error " + e.getMessage()); + // TODO(ml): edit log + LOG.warn("failed to renew a routine load task in job {} with error message {}", id, e.getMessage()); + } finally { + writeUnlock(); + } + } + + // check task exists or not before call method + private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState) + throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + // step0: get progress from transaction state + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); + // step1: update job progress + updateProgress(rlTaskTxnCommitAttachment); + + if (state == JobState.RUNNING) { + // step2: create a new task for partitions + RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadManager() + .getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo); + } } protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 4c5b938446d7ed..72fa8d0461ecbf 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -17,9 +17,11 @@ package org.apache.doris.load.routineload; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import com.sleepycat.je.tree.IN; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.PauseRoutineLoadStmt; import org.apache.doris.analysis.ResumeRoutineLoadStmt; @@ -53,7 +55,7 @@ public class RoutineLoadManager { // Long is beId, integer is the size of tasks in be private Map beIdToMaxConcurrentTasks; - private Map beIdToConcurrentTasks; +// private Map beIdToConcurrentTasks; // stream load job meta private Map idToRoutineLoadJob; @@ -82,7 +84,7 @@ private void writeUnlock() { public RoutineLoadManager() { idToRoutineLoadJob = Maps.newConcurrentMap(); dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); - beIdToConcurrentTasks = Maps.newHashMap(); +// beIdToConcurrentTasks = Maps.newHashMap(); beIdToMaxConcurrentTasks = Maps.newHashMap(); needScheduleTasksQueue = Queues.newLinkedBlockingQueue(); lock = new ReentrantReadWriteLock(true); @@ -109,6 +111,7 @@ public int getTotalMaxConcurrentTaskNum() { public void updateBeIdTaskMaps() { writeLock(); try { + // step1: update backend number in all of cluster updateBeIdToMaxConcurrentTasks(); List beIds = Catalog.getCurrentSystemInfo().getBackendIds(true); @@ -121,7 +124,6 @@ public void updateBeIdTaskMaps() { newBeIds.parallelStream().forEach(entity -> beIdToMaxConcurrentTasks.put(entity, DEFAULT_BE_CONCURRENT_TASK_NUM)); for (long beId : unavailableBeIds) { beIdToMaxConcurrentTasks.remove(beId); - beIdToConcurrentTasks.remove(beId); } LOG.info("There are {} backends which participate in routine load scheduler. " + "There are {} new backends and {} unavailable backends for routine load", @@ -131,19 +133,22 @@ public void updateBeIdTaskMaps() { } } - public void addNumOfConcurrentTasksByBeId(long beId) { - writeLock(); - try { - if (beIdToConcurrentTasks.get(beId) == null) { - beIdToConcurrentTasks.put(beId, 1); - } else { - int concurrentTaskNum = (int) beIdToConcurrentTasks.get(beId); - concurrentTaskNum++; - beIdToConcurrentTasks.put(beId, concurrentTaskNum); + private Map getBeIdConcurrentTaskMaps() { + Map beIdToConcurrentTasks = Maps.newHashMap(); + for (RoutineLoadJob routineLoadJob : getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING)) { + Map jobBeIdToConcurrentTaskNum = routineLoadJob.getBeIdToConcurrentTaskNum(); + for (Map.Entry entry : jobBeIdToConcurrentTaskNum.entrySet()) { + if (beIdToConcurrentTasks.containsKey(entry.getKey())) { + beIdToConcurrentTasks.put(entry.getKey(), beIdToConcurrentTasks.get(entry.getKey()) + entry.getValue()); + } else { + beIdToConcurrentTasks.put(entry.getKey(), entry.getValue()); + } } - } finally { - writeUnlock(); } + LOG.debug("beIdToConcurrentTasks is {}", Joiner.on(",") + .withKeyValueSeparator(":").join(beIdToConcurrentTasks)); + return beIdToConcurrentTasks; + } public void addRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt) @@ -320,11 +325,12 @@ public int getClusterIdleSlotNum() { try { int result = 0; updateBeIdToMaxConcurrentTasks(); + Map beIdToConcurrentTasks = getBeIdConcurrentTaskMaps(); for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { - if (beIdToConcurrentTasks.get(entry.getKey()) == null) { - result += entry.getValue(); - } else { + if (beIdToConcurrentTasks.containsKey(entry.getKey())) { result += entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); + } else { + result += entry.getValue(); } } return result; @@ -344,6 +350,7 @@ public long getMinTaskBeId(String clusterName) throws LoadException { long result = -1L; int maxIdleSlotNum = 0; updateBeIdToMaxConcurrentTasks(); + Map beIdToConcurrentTasks = getBeIdConcurrentTaskMaps(); for (Long beId : beIdsInCluster) { int idleTaskNum = 0; if (beIdToConcurrentTasks.containsKey(beId)) { @@ -351,6 +358,8 @@ public long getMinTaskBeId(String clusterName) throws LoadException { } else { idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM; } + LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum, + beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId)); result = maxIdleSlotNum < idleTaskNum ? beId : result; maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum); } @@ -363,6 +372,36 @@ public long getMinTaskBeId(String clusterName) throws LoadException { } } + public boolean checkBeToTask(long beId, String clusterName) throws LoadException { + List beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName); + if (beIdsInCluster == null) { + throw new LoadException("The " + clusterName + " has been deleted"); + } + + if (!beIdsInCluster.contains(beId)) { + LOG.debug("the previous be id {} does not belong to cluster name {}", beId, clusterName); + return false; + } + + // check if be has idle slot + readLock(); + try { + int idleTaskNum = 0; + Map beIdToConcurrentTasks = getBeIdConcurrentTaskMaps(); + if (beIdToConcurrentTasks.containsKey(beId)) { + idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId); + } else { + idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM; + } + if (idleTaskNum > 0) { + return true; + } + return false; + } finally { + readUnlock(); + } + } + public RoutineLoadJob getJob(long jobId) { return idToRoutineLoadJob.get(jobId); } @@ -385,7 +424,7 @@ public RoutineLoadJob getJobByName(String jobName) { } Optional optional = routineLoadJobList.parallelStream() .filter(entity -> !entity.getState().isFinalState()).findFirst(); - if (optional.isPresent()) { + if (!optional.isPresent()) { return null; } return optional.get(); @@ -394,7 +433,7 @@ public RoutineLoadJob getJobByName(String jobName) { } } - public RoutineLoadJob getJobByTaskId(String taskId) throws MetaNotFoundException { + public RoutineLoadJob getJobByTaskId(UUID taskId) throws MetaNotFoundException { for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { if (routineLoadJob.containsTask(taskId)) { return routineLoadJob; @@ -411,12 +450,10 @@ public List getRoutineLoadJobByState(RoutineLoadJob.JobState job return stateJobs; } - public List processTimeoutTasks() { - List routineLoadTaskInfoList = new ArrayList<>(); + public void processTimeoutTasks() { for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { - routineLoadTaskInfoList.addAll(routineLoadJob.processTimeoutTasks()); + routineLoadJob.processTimeoutTasks(); } - return routineLoadTaskInfoList; } // Remove old routine load jobs from idToRoutineLoadJob diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index e8e744d22848cf..20f4a005b93ee3 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -96,8 +96,7 @@ private void process() { LOG.debug("begin to check timeout tasks"); // check timeout tasks - List rescheduleTasksList = routineLoadManager.processTimeoutTasks(); - routineLoadManager.addTasksToNeedScheduleQueue(rescheduleTasksList); + routineLoadManager.processTimeoutTasks(); } private List getNeedScheduleRoutineJobs() throws LoadException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 3fa4ce1efccda5..4d0af1a5c535a2 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -23,8 +23,6 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; -import org.apache.doris.task.RoutineLoadTask; -import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; @@ -46,13 +44,23 @@ public abstract class RoutineLoadTaskInfo { protected long jobId; private long createTimeMs; private long loadStartTimeMs; - private TExecPlanFragmentParams tExecPlanFragmentParams; + // the be id of previous task + protected long previousBeId = -1L; + // the be id of this task + protected long beId = -1L; public RoutineLoadTaskInfo(UUID id, long jobId) { this.id = id; this.jobId = jobId; this.createTimeMs = System.currentTimeMillis(); } + + public RoutineLoadTaskInfo(UUID id, long jobId, long previousBeId) { + this.id = id; + this.jobId = jobId; + this.createTimeMs = System.currentTimeMillis(); + this.previousBeId = previousBeId; + } public UUID getId() { return id; @@ -65,7 +73,19 @@ public long getJobId() { public void setLoadStartTimeMs(long loadStartTimeMs) { this.loadStartTimeMs = loadStartTimeMs; } - + + public long getPreviousBeId() { + return previousBeId; + } + + public void setBeId(long beId) { + this.beId = beId; + } + + public long getBeId() { + return beId; + } + public long getLoadStartTimeMs() { return loadStartTimeMs; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 9141c2e8f3ed7f..0c56b40c452075 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -98,17 +98,18 @@ private void process() throws LoadException, UserException, InterruptedException RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.peek(); RoutineLoadJob routineLoadJob = null; try { - routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId().toString()); + routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId()); + allocateTaskToBe(routineLoadTaskInfo, routineLoadJob); + routineLoadTaskInfo.beginTxn(); } catch (MetaNotFoundException e) { - LOG.warn("task {} has been abandoned", routineLoadTaskInfo.getId()); + needScheduleTasksQueue.take(); + // task has been abandoned while renew task has been added in queue + // or database has been deleted + LOG.warn("task {} has been abandoned with error message {}", + routineLoadTaskInfo.getId(), e.getMessage(), e); return; - } - long beId; - try { - beId = routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName()); - routineLoadTaskInfo.beginTxn(); - } catch (Exception e) { - LOG.warn("put task to the rear of queue with error " + e.getMessage()); + } catch (LoadException e) { + LOG.warn("put task to the rear of queue with error " + e.getMessage(), e); needScheduleTasksQueue.take(); needScheduleTasksQueue.put(routineLoadTaskInfo); needScheduleTaskNum--; @@ -128,21 +129,20 @@ private void process() throws LoadException, UserException, InterruptedException routineLoadJob.removeNeedScheduleTask(routineLoadTaskInfo); routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis()); // add to batch task map - if (beIdTobatchTask.containsKey(beId)) { - beIdTobatchTask.get(beId).add(tRoutineLoadTask); + if (beIdTobatchTask.containsKey(routineLoadTaskInfo.getBeId())) { + beIdTobatchTask.get(routineLoadTaskInfo.getBeId()).add(tRoutineLoadTask); } else { List tRoutineLoadTaskList = Lists.newArrayList(); tRoutineLoadTaskList.add(tRoutineLoadTask); - beIdTobatchTask.put(beId, tRoutineLoadTaskList); + beIdTobatchTask.put(routineLoadTaskInfo.getBeId(), tRoutineLoadTaskList); } // count clusterIdleSlotNum--; scheduledTaskNum++; - routineLoadManager.addNumOfConcurrentTasksByBeId(beId); needScheduleTaskNum--; } submitBatchTask(beIdTobatchTask); - LOG.info("{} tasks have bean allocated to be.", scheduledTaskNum); + LOG.info("{} tasks have been allocated to be.", scheduledTaskNum); } private void submitBatchTask(Map> beIdToRoutineLoadTask) { @@ -167,4 +167,18 @@ private void submitBatchTask(Map> beIdToRoutineLoad } } + + // check if previous be has idle slot + // true: allocate previous be to task + // false: allocate the most idle be to task + private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo, RoutineLoadJob routineLoadJob) + throws MetaNotFoundException, LoadException { + if (routineLoadTaskInfo.getPreviousBeId() != -1L) { + if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadJob.getClusterName())) { + routineLoadTaskInfo.setBeId(routineLoadTaskInfo.getPreviousBeId()); + return; + } + } + routineLoadTaskInfo.setBeId(routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName())); + } } diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index e0abee1b2e8ce6..4f722f8afb5f05 100644 --- a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -124,7 +124,7 @@ public void init(Analyzer analyzer) throws UserException { // columns: k1, k2, v1, v2=k1 + k2 // this means that there are three columns(k1, k2, v1) in source file, // and v2 is derived from (k1 + k2) - if (streamLoadTask.getColumnToColumnExpr() != null || streamLoadTask.getColumnToColumnExpr().size() != 0) { + if (streamLoadTask.getColumnToColumnExpr() != null && streamLoadTask.getColumnToColumnExpr().size() != 0) { for (Map.Entry entry : streamLoadTask.getColumnToColumnExpr().entrySet()) { // make column name case match with real column name String column = entry.getKey(); diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java index 0bc7a8c74ae18d..a91a44bf38edd9 100644 --- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -798,7 +798,7 @@ private void handleShowRoutineLoad() throws AnalysisException { RoutineLoadJob routineLoadJob = Catalog.getCurrentCatalog().getRoutineLoadManager().getJobByName(showRoutineLoadStmt.getName()); if (routineLoadJob == null) { - throw new AnalysisException("There is no routine load job with id " + showRoutineLoadStmt.getName()); + throw new AnalysisException("There is no routine load job with name " + showRoutineLoadStmt.getName()); } // check auth @@ -831,6 +831,7 @@ private void handleShowRoutineLoad() throws AnalysisException { row.add(routineLoadJob.getState().name()); row.add(routineLoadJob.getDesiredConcurrentNumber()); row.add(routineLoadJob.getProgress().toString()); + rows.add(row); resultSet = new ShowResultSet(showRoutineLoadStmt.getMetaData(), rows); } diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 2ef34910f9775f..7c52d52f2f5b33 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -688,7 +688,8 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc } Catalog.getCurrentGlobalTransactionMgr().abortTransaction(request.getTxnId(), - request.isSetReason() ? request.getReason() : "system cancel"); + request.isSetReason() ? request.getReason() : "system cancel", + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); } @Override diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index b022debb86e0f3..eece30350918c1 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -434,6 +434,10 @@ public boolean commitAndPublishTransaction(Database db, long transactionId, } public void abortTransaction(long transactionId, String reason) throws UserException { + abortTransaction(transactionId, reason, null); + } + + public void abortTransaction(long transactionId, String reason, TxnCommitAttachment txnCommitAttachment) throws UserException { if (transactionId < 0) { LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId); return; @@ -449,7 +453,7 @@ public void abortTransaction(long transactionId, String reason) throws UserExcep } return; } - + public void abortTransaction(Long dbId, String label, String reason) throws UserException { Preconditions.checkNotNull(label); writeLock(); @@ -789,7 +793,7 @@ public void removeOldTransactions() { || !checkTxnHasRelatedJob(transactionState, dbIdToTxnIds)) { try { transactionState.setTransactionStatus(TransactionStatus.ABORTED, - TransactionState.TxnStatusChangeReason.TIMEOUT); + TransactionState.TxnStatusChangeReason.TIMEOUT.name()); } catch (TransactionException e) { LOG.warn("txn {} could not be aborted with error message {}", transactionState.getTransactionId(), e.getMessage()); @@ -877,7 +881,7 @@ private void unprotectAbortTransaction(long transactionId, String reason) throws } transactionState.setFinishTime(System.currentTimeMillis()); transactionState.setReason(reason); - transactionState.setTransactionStatus(TransactionStatus.ABORTED); + transactionState.setTransactionStatus(TransactionStatus.ABORTED, reason); unprotectUpsertTransactionState(transactionState); for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index b7e814ba48a647..eff96f9a91cc2a 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -309,7 +309,7 @@ public void setTransactionStatus(TransactionStatus transactionStatus) throws Tra setTransactionStatus(transactionStatus, null); } - public void setTransactionStatus(TransactionStatus transactionStatus, TxnStatusChangeReason txnStatusChangeReason) + public void setTransactionStatus(TransactionStatus transactionStatus, String txnStatusChangeReason) throws TransactionException { // before state changed if (txnStateChangeListener != null) { @@ -317,6 +317,8 @@ public void setTransactionStatus(TransactionStatus transactionStatus, TxnStatusC case ABORTED: txnStateChangeListener.beforeAborted(this, txnStatusChangeReason); break; + case COMMITTED: + txnStateChangeListener.beforeCommitted(this); default: break; } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index c78908b697892f..3ac1c088ead059 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -110,6 +110,7 @@ public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) { Assert.assertEquals(jobName, routineLoadJob.getName()); Assert.assertEquals(1L, routineLoadJob.getTableId()); Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); + Assert.assertEquals(true, routineLoadJob instanceof KafkaRoutineLoadJob); Map>> dbToNameToRoutineLoadJob = Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); @@ -244,7 +245,7 @@ public void testGetMinTaskBeId() throws LoadException { }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.addNumOfConcurrentTasksByBeId(1L); +// routineLoadManager.increaseNumOfConcurrentTasksByBeId(1L); Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId("default")); } @@ -264,7 +265,7 @@ public void testGetTotalIdleTaskNum() { }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.addNumOfConcurrentTasksByBeId(1L); +// routineLoadManager.increaseNumOfConcurrentTasksByBeId(1L); Assert.assertEquals(DEFAULT_BE_CONCURRENT_TASK_NUM * 2 - 1, routineLoadManager.getClusterIdleSlotNum()); } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index 824cd29193d4be..18ae4c5c37349a 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -115,7 +115,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 result = routineLoadTaskInfoQueue; routineLoadManager.getMinTaskBeId(anyString); result = beId; - routineLoadManager.getJobByTaskId(anyString); + routineLoadManager.getJobByTaskId((UUID) any); result = kafkaRoutineLoadJob1; routineLoadManager.getJob(anyLong); result = kafkaRoutineLoadJob1; @@ -147,8 +147,8 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 Assert.assertEquals(200L, (long) ((KafkaRoutineLoadTask) routineLoadTask).getPartitionIdToOffset().get(2)); - routineLoadManager.addNumOfConcurrentTasksByBeId(beId); - times = 1; +// routineLoadManager.increaseNumOfConcurrentTasksByBeId(beId); +// times = 1; } }; } From d80a2f9aeeef829de353f7dd761ef66965bf3c04 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Thu, 7 Mar 2019 19:14:31 +0800 Subject: [PATCH 2/2] Fix the incorrect grammer of debug log --- .../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index dc690345bfb6a4..f48d0570975f72 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -508,7 +508,8 @@ public void beforeAborted(TransactionState txnState, String txnStatusChangeReaso try { String taskId = txnState.getLabel(); if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().toString().equals(taskId))) { - LOG.debug("there are a txn of routine load task will be aborted"); + LOG.debug("there is a txn{} of routine load task {} will be aborted", + txnState.getTransactionId(), taskId); } } finally { readUnlock();