diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 11ff1f3355f362..4569051fd45b50 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -43,7 +43,8 @@ class KafkaLoadInfo {
     KafkaLoadInfo(const TKafkaLoadInfo& t_info):
             brokers(t_info.brokers),
             topic(t_info.topic),
-            begin_offset(t_info.partition_begin_offset) {
+            begin_offset(t_info.partition_begin_offset),
+            cmt_offset(t_info.partition_begin_offset) {
         if (t_info.__isset.max_interval_s) { max_interval_s = t_info.max_interval_s; }
         if (t_info.__isset.max_batch_rows) { max_batch_rows = t_info.max_batch_rows; }
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 967de0ae0e7e23..6826f3aaaac11d 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -82,6 +82,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic";
     // optional
     public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
+    public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";
 
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
@@ -96,6 +97,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(KAFKA_BROKER_LIST_PROPERTY)
             .add(KAFKA_TOPIC_PROPERTY)
             .add(KAFKA_PARTITIONS_PROPERTY)
+            .add(KAFKA_OFFSETS_PROPERTY)
             .build();
 
     private final String name;
@@ -113,6 +115,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private String kafkaBrokerList;
     private String kafkaTopic;
     private List<Integer> kafkaPartitions;
+    private List<Long> kafkaOffsets;
 
     public CreateRoutineLoadStmt(String name, TableName dbTableName, List<ParseNode> loadPropertyList,
                                  Map<String, String> properties,
@@ -170,6 +173,10 @@ public List<Integer> getKafkaPartitions() {
         return kafkaPartitions;
     }
 
+    public List<Long> getKafkaOffsets() {
+        return kafkaOffsets;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         super.analyze(analyzer);
@@ -243,30 +250,32 @@ private void checkLoadProperties(Analyzer analyzer) throws AnalysisException {
     }
 
     private void checkRoutineLoadProperties() throws AnalysisException {
-        Optional<String> optional = properties.keySet().parallelStream()
-                .filter(entity -> !PROPERTIES_SET.contains(entity)).findFirst();
-        if (optional.isPresent()) {
-            throw new AnalysisException(optional.get() + " is invalid property");
-        }
-
-        // check desired concurrent number
-        final String desiredConcurrentNumberString = properties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY);
-        if (desiredConcurrentNumberString != null) {
-            desiredConcurrentNum = getIntegerValueFromString(desiredConcurrentNumberString,
-                    DESIRED_CONCURRENT_NUMBER_PROPERTY);
-            if (desiredConcurrentNum <= 0) {
-                throw new AnalysisException(DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater then 0");
+        if (properties != null) {
+            Optional<String> optional = properties.keySet().parallelStream()
+                    .filter(entity -> !PROPERTIES_SET.contains(entity)).findFirst();
+            if (optional.isPresent()) {
+                throw new AnalysisException(optional.get() + " is invalid property");
             }
-        }
-        // check max error number
-        final String maxErrorNumberString = properties.get(MAX_ERROR_NUMBER_PROPERTY);
-        if (maxErrorNumberString != null) {
-            maxErrorNum = getIntegerValueFromString(maxErrorNumberString, MAX_ERROR_NUMBER_PROPERTY);
-            if (maxErrorNum < 0) {
-                throw new AnalysisException(MAX_ERROR_NUMBER_PROPERTY + " must be greater then or equal to 0");
+            // check desired concurrent number
+            final String desiredConcurrentNumberString = properties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY);
+            if (desiredConcurrentNumberString != null) {
+                desiredConcurrentNum = getIntegerValueFromString(desiredConcurrentNumberString,
+                        DESIRED_CONCURRENT_NUMBER_PROPERTY);
+                if (desiredConcurrentNum <= 0) {
+                    throw new AnalysisException(DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater than 0");
+                }
             }
+            // check max error number
+            final String maxErrorNumberString = properties.get(MAX_ERROR_NUMBER_PROPERTY);
+            if (maxErrorNumberString != null) {
+                maxErrorNum = getIntegerValueFromString(maxErrorNumberString, MAX_ERROR_NUMBER_PROPERTY);
+                if (maxErrorNum < 0) {
+                    throw new AnalysisException(MAX_ERROR_NUMBER_PROPERTY + " must be greater than or equal to 0");
+                }
+
+            }
         }
     }
@@ -326,6 +335,16 @@ private void checkKafkaCustomProperties() throws AnalysisException {
                 }
             }
         }
+        // check offsets
+        // Todo(ml)
+        final String kafkaOffsetsString = customProperties.get(KAFKA_OFFSETS_PROPERTY);
+        if (kafkaOffsetsString != null) {
+            kafkaOffsets = new ArrayList<>();
+            String[] kafkaOffsetsStringList = customProperties.get(KAFKA_OFFSETS_PROPERTY).split(",");
+            for (String s : kafkaOffsetsStringList) {
+                kafkaOffsets.add(Long.valueOf(s));
+            }
+        }
     }
 
     private int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException {
diff --git a/fe/src/main/java/org/apache/doris/load/TxnStateChangeListener.java b/fe/src/main/java/org/apache/doris/load/TxnStateChangeListener.java
index d394c51897316d..8ec5d940dfe2e9 100644
--- a/fe/src/main/java/org/apache/doris/load/TxnStateChangeListener.java
+++ b/fe/src/main/java/org/apache/doris/load/TxnStateChangeListener.java
@@ -18,16 +18,19 @@
 package org.apache.doris.load;
 
 import org.apache.doris.transaction.AbortTransactionException;
+import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
 public interface TxnStateChangeListener {
 
+    void beforeCommitted(TransactionState txnState) throws TransactionException;
+
     /**
      * update catalog of job which has related txn after transaction has been committed
      *
     * @param txnState
      */
-    void onCommitted(TransactionState txnState);
+    void onCommitted(TransactionState txnState) throws TransactionException;
 
     /**
      * this interface is executed before txn aborted, you can check if txn could be abort in this stage
      *
@@ -37,7 +40,7 @@ public interface TxnStateChangeListener {
      * @throws AbortTransactionException if transaction could not be abort or there are some exception before aborted,
      *                                   it will throw this exception
      */
-    void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason)
+    void beforeAborted(TransactionState txnState, String txnStatusChangeReason)
             throws AbortTransactionException;
 
     /**
@@ -46,5 +49,5 @@ void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeRe
      * @param txnState
      * @param txnStatusChangeReason maybe null
      */
-    void onAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason);
+    void onAborted(TransactionState txnState, String txnStatusChangeReason);
 }
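A note on the kafka_offsets parsing added to CreateRoutineLoadStmt above: the property is split on "," and each piece is fed to Long.valueOf, and the Todo(ml) marks validation that is still missing. Below is a minimal sketch of what that validation could look like, under the assumption that kafka_offsets must pair one-to-one with kafka_partitions; the class and method names are illustrative, not part of the patch.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper, not from the patch: parses "kafka_offsets" and checks
    // the pairing invariant that KafkaRoutineLoadJob.setOptional() later relies
    // on (offset i belongs to partition i), so a bad CREATE ROUTINE LOAD fails
    // at analysis time instead of failing at schedule time.
    public class KafkaOffsetsChecker {
        public static List<Long> parse(String kafkaOffsetsString, List<Integer> kafkaPartitions) {
            List<Long> offsets = new ArrayList<>();
            for (String s : kafkaOffsetsString.split(",")) {
                long offset = Long.parseLong(s.trim());
                if (offset < 0) {
                    throw new IllegalArgumentException("kafka offset must be >= 0, got " + offset);
                }
                offsets.add(offset);
            }
            if (kafkaPartitions == null || kafkaPartitions.size() != offsets.size()) {
                throw new IllegalArgumentException("there must be exactly one kafka offset per kafka partition");
            }
            return offsets;
        }
    }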
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 97097cfbbb75ec..c344f08c3f0e1a 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -35,6 +35,7 @@
 // {"partitionIdToOffset": {}}
 public class KafkaProgress extends RoutineLoadProgress {
 
+    // (partition id, begin offset)
     private Map<Integer, Long> partitionIdToOffset;
 
     public KafkaProgress() {
@@ -57,7 +58,7 @@ public void setPartitionIdToOffset(Map<Integer, Long> partitionIdToOffset) {
     public void update(RoutineLoadProgress progress) {
         KafkaProgress newProgress = (KafkaProgress) progress;
         newProgress.getPartitionIdToOffset().entrySet().parallelStream()
-                .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue()));
+                .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
     }
 
     @Override
@@ -78,9 +79,15 @@ public void readFields(DataInput in) throws IOException {
         }
     }
 
+    // (partition id, end offset)
+    // end offset = -1 when the begin offset of the partition is 0
     @Override
     public String toString() {
+        Map<Integer, Long> showPartitionIdToOffset = new HashMap<>();
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1);
+        }
         return "KafkaProgress [partitionIdToOffset="
-                + Joiner.on("|").withKeyValueSeparator("_").join(partitionIdToOffset) + "]";
+                + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]";
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 44790073575b1e..cd81b0e7eb5e40 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -119,7 +119,7 @@ private void setCustomKafkaPartitions(List<Integer> kafkaPartitions) throws Load
     }
 
     @Override
-    public List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskNum) {
+    public void divideRoutineLoadJob(int currentConcurrentTaskNum) {
         List<RoutineLoadTaskInfo> result = new ArrayList<>();
         writeLock();
         try {
@@ -148,7 +148,6 @@ public List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskN
         } finally {
             writeUnlock();
         }
-        return result;
     }
 
     @Override
@@ -261,9 +260,7 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr
     private void updateNewPartitionProgress() {
         // update the progress of new partitions
         for (Integer kafkaPartition : currentKafkaPartitions) {
-            if (((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) {
-                ((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition);
-            } else {
+            if (!((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) {
                 ((KafkaProgress) progress).getPartitionIdToOffset().put(kafkaPartition, 0L);
             }
         }
@@ -290,6 +287,12 @@ private void setOptional(CreateRoutineLoadStmt stmt) throws LoadException {
         }
         if (stmt.getKafkaPartitions() != null) {
             setCustomKafkaPartitions(stmt.getKafkaPartitions());
+            if (stmt.getKafkaOffsets() != null) {
+                for (int i = 0; i < customKafkaPartitions.size(); i++) {
+                    ((KafkaProgress) progress).getPartitionIdToOffset()
+                            .put(customKafkaPartitions.get(i), stmt.getKafkaOffsets().get(i));
+                }
+            }
         }
     }
 }
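Taken together, the KafkaProgress and KafkaRoutineLoadJob changes above settle on one convention — this is my reading of the patch, illustrated with hypothetical names below: partitionIdToOffset stores the next offset to consume, so update() records the committed offset plus one, new partitions start at 0, and toString() subtracts one so SHOW output displays the last committed offset (-1 meaning nothing consumed yet).

    import java.util.HashMap;
    import java.util.Map;

    // Demo of the offset convention, not part of the patch.
    public class OffsetConventionDemo {
        // partition -> next offset to consume (what KafkaProgress stores)
        private final Map<Integer, Long> nextOffset = new HashMap<>();

        public void addPartition(int partition) {
            nextOffset.putIfAbsent(partition, 0L); // new partitions begin at offset 0
        }

        public void onTaskCommitted(int partition, long lastConsumedOffset) {
            nextOffset.put(partition, lastConsumedOffset + 1); // mirrors update()'s "+ 1"
        }

        public long displayedOffset(int partition) {
            return nextOffset.getOrDefault(partition, 0L) - 1; // mirrors toString()'s "- 1"
        }

        public static void main(String[] args) {
            OffsetConventionDemo demo = new OffsetConventionDemo();
            demo.addPartition(0);
            System.out.println(demo.displayedOffset(0)); // -1: nothing committed yet
            demo.onTaskCommitted(0, 99);                 // a task consumed offsets 0..99
            System.out.println(demo.displayedOffset(0)); // 99
        }
    }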
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index d3ea5f5ae06ce6..f961c41b5ac360 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -52,7 +52,7 @@ public KafkaTaskInfo(UUID id, long jobId) {
 
     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo)
             throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
-        super(UUID.randomUUID(), kafkaTaskInfo.getJobId());
+        super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getPreviousBeId());
         this.partitions = kafkaTaskInfo.getPartitions();
     }
 
@@ -109,6 +109,10 @@ private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob rou
         TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.gettExecPlanFragmentParams();
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutput_sink().getOlap_table_sink().setTxn_id(this.txnId);
+        TUniqueId queryId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        tExecPlanFragmentParams.getParams().setQuery_id(queryId);
+        tExecPlanFragmentParams.getParams().getPer_node_scan_ranges().values().stream()
+                .forEach(entity -> entity.get(0).getScan_range().getBroker_scan_range().getRanges().get(0).setLoad_id(queryId));
         return tExecPlanFragmentParams;
     }
 }
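The updateTExecPlanFragmentParams() change above derives one TUniqueId from the task's UUID and stamps it both as the fragment's query id and as the broker scan range's load id, so the BE can tie the scan, the sink, and the stream-load context of a task together. A minimal sketch of the 128-bit split; TUniqueId here is a stand-in for the Thrift-generated struct.

    import java.util.UUID;

    // Sketch only: TUniqueId below stands in for the Thrift-generated class.
    public class TaskIdDemo {
        static final class TUniqueId {
            final long hi, lo;
            TUniqueId(long hi, long lo) { this.hi = hi; this.lo = lo; }
        }

        // Same 128 bits as the task UUID, split exactly as the patch does, so the
        // id the FE prints for a task matches the query/load id seen on the BE.
        static TUniqueId toThrift(UUID taskId) {
            return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
        }
    }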
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 524c577bbfb8b9..f48d0570975f72 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.routineload;
 
+import com.google.common.collect.Maps;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Catalog;
@@ -41,6 +42,7 @@
 import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.transaction.AbortTransactionException;
 import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -54,7 +56,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -280,7 +284,7 @@ public String getPartitions() {
     }
 
     public String getClusterName() throws MetaNotFoundException {
-        Database database = Catalog.getCurrentCatalog().getDb(id);
+        Database database = Catalog.getCurrentCatalog().getDb(dbId);
         if (database == null) {
             throw new MetaNotFoundException("Database " + dbId + "has been deleted");
         }
@@ -363,16 +367,6 @@ public List<RoutineLoadTaskInfo> processTimeoutTasks() {
                         continue;
                     }
                 }
-
-                try {
-                    result.add(unprotectRenewTask(routineLoadTaskInfo));
-                    LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled",
-                            oldSignature, DEFAULT_TASK_TIMEOUT_SECONDS);
-                } catch (UserException e) {
-                    state = JobState.CANCELLED;
-                    // TODO(ml): edit log
-                    LOG.warn("failed to renew a routine load task in job {} with error message {}", id, e.getMessage());
-                }
             }
         }
         } finally {
@@ -381,12 +375,32 @@
         return result;
     }
 
-    abstract List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskNum);
+    abstract void divideRoutineLoadJob(int currentConcurrentTaskNum);
 
     public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
         return 0;
     }
 
+    public Map<Long, Integer> getBeIdToConcurrentTaskNum() {
+        Map<Long, Integer> beIdConcurrentTasksNum = Maps.newHashMap();
+        readLock();
+        try {
+            for (RoutineLoadTaskInfo routineLoadTaskInfo : routineLoadTaskInfoList) {
+                if (routineLoadTaskInfo.getBeId() != -1L) {
+                    long beId = routineLoadTaskInfo.getBeId();
+                    if (beIdConcurrentTasksNum.containsKey(beId)) {
+                        beIdConcurrentTasksNum.put(beId, beIdConcurrentTasksNum.get(beId) + 1);
+                    } else {
+                        beIdConcurrentTasksNum.put(beId, 1);
+                    }
+                }
+            }
+            return beIdConcurrentTasksNum;
+        } finally {
+            readUnlock();
+        }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         // TODO(ml)
@@ -409,10 +423,10 @@ public void removeNeedScheduleTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
 
     // if rate of error data is more then max_filter_ratio, pause job
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) {
-        updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows());
+        updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows() + attachment.getFilteredRows());
     }
 
-    public boolean containsTask(String taskId) {
+    public boolean containsTask(UUID taskId) {
         readLock();
         try {
             return routineLoadTaskInfoList.parallelStream()
@@ -488,22 +502,14 @@ public void plan() throws UserException {
     }
 
     @Override
-    public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason)
+    public void beforeAborted(TransactionState txnState, String txnStatusChangeReason)
             throws AbortTransactionException {
         readLock();
         try {
-            if (txnStatusChangeReason != null) {
-                switch (txnStatusChangeReason) {
-                    case TIMEOUT:
-                    default:
-                        String taskId = txnState.getLabel();
-                        if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().equals(taskId))) {
-                            throw new AbortTransactionException(
-                                    "there are task " + taskId + " related to this txn, "
-                                            + "txn could not be abort", txnState.getTransactionId());
-                        }
-                        break;
-                }
+            String taskId = txnState.getLabel();
+            if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().toString().equals(taskId))) {
+                LOG.debug("txn {} of routine load task {} will be aborted",
+                        txnState.getTransactionId(), taskId);
             }
         } finally {
             readUnlock();
@@ -511,49 +517,99 @@ public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusC
     }
 
     @Override
-    public void onCommitted(TransactionState txnState) {
-        // step0: get progress from transaction state
-        RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
+    public void beforeCommitted(TransactionState txnState) throws TransactionException {
+        readLock();
+        try {
+            // check if task has been aborted
+            Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
+                    routineLoadTaskInfoList.parallelStream()
+                            .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst();
+            if (!routineLoadTaskInfoOptional.isPresent()) {
TransactionException("txn " + txnState.getTransactionId() + " could not be committed" + + " while task " + txnState.getLabel() + "has been aborted "); + } + } finally { + readUnlock(); + } + } + @Override + public void onCommitted(TransactionState txnState) throws TransactionException { writeLock(); try { - // step1: find task in job + // step0: find task in job Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream() - .filter(entity -> entity.getId().equals(txnState.getLabel())).findFirst(); + .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst(); if (routineLoadTaskInfoOptional.isPresent()) { - RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); - - // step2: update job progress - updateProgress(rlTaskTxnCommitAttachment); - - if (state == JobState.RUNNING) { - // step3: create a new task for partitions - RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); - Catalog.getCurrentCatalog().getRoutineLoadManager() - .getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo); - } + executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); } else { LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. " + " Transaction {} will not be committed", txnState.getLabel(), txnState.getTransactionId()); + throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed" + + " while task " + txnState.getLabel() + "has been aborted "); } + } catch (TransactionException e) { + LOG.warn(e.getMessage(), e); + throw e; } catch (Throwable e) { - LOG.error("failed to update offset in routine load task {} when transaction {} has been committed. " - + "change job to paused", - rlTaskTxnCommitAttachment.getTaskId(), txnState.getTransactionId(), e); + LOG.warn(e.getMessage(), e); updateState(JobState.PAUSED, "failed to update offset when transaction " - + txnState.getTransactionId() + " has been committed"); + + txnState.getTransactionId() + " has been committed"); } finally { writeUnlock(); } } + // txn will be aborted but progress will be update + // be will abort txn when all of kafka data is wrong + // progress will be update otherwise the progress will be hung @Override - public void onAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) { - updateState(JobState.PAUSED, txnStatusChangeReason.name()); - LOG.debug("job {} need to be pause while txn {} abort with reason {}", - id, txnState.getTransactionId(), txnStatusChangeReason.name()); + public void onAborted(TransactionState txnState, String txnStatusChangeReason) { + if (txnStatusChangeReason != null) { + LOG.debug("task will be reschedule when txn {} abort with reason {}", txnState.getTransactionId(), + txnStatusChangeReason); + } else { + LOG.debug("task will be reschedule when txn {} abort", txnState.getTransactionId()); + } + writeLock(); + try { + // step0: find task in job + Optional routineLoadTaskInfoOptional = + routineLoadTaskInfoList.parallelStream() + .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst(); + if (routineLoadTaskInfoOptional.isPresent()) { + executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); + } else { + LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. 
" + + " Transaction {} will be aborted successfully", + txnState.getLabel(), txnState.getTransactionId()); + } + } catch (Exception e) { + updateState(JobState.PAUSED, + "failed to renew task when txn has been aborted with error " + e.getMessage()); + // TODO(ml): edit log + LOG.warn("failed to renew a routine load task in job {} with error message {}", id, e.getMessage()); + } finally { + writeUnlock(); + } + } + + // check task exists or not before call method + private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState) + throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + // step0: get progress from transaction state + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); + // step1: update job progress + updateProgress(rlTaskTxnCommitAttachment); + + if (state == JobState.RUNNING) { + // step2: create a new task for partitions + RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadManager() + .getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo); + } } protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 4c5b938446d7ed..72fa8d0461ecbf 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -17,9 +17,11 @@ package org.apache.doris.load.routineload; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import com.sleepycat.je.tree.IN; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.PauseRoutineLoadStmt; import org.apache.doris.analysis.ResumeRoutineLoadStmt; @@ -53,7 +55,7 @@ public class RoutineLoadManager { // Long is beId, integer is the size of tasks in be private Map beIdToMaxConcurrentTasks; - private Map beIdToConcurrentTasks; +// private Map beIdToConcurrentTasks; // stream load job meta private Map idToRoutineLoadJob; @@ -82,7 +84,7 @@ private void writeUnlock() { public RoutineLoadManager() { idToRoutineLoadJob = Maps.newConcurrentMap(); dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); - beIdToConcurrentTasks = Maps.newHashMap(); +// beIdToConcurrentTasks = Maps.newHashMap(); beIdToMaxConcurrentTasks = Maps.newHashMap(); needScheduleTasksQueue = Queues.newLinkedBlockingQueue(); lock = new ReentrantReadWriteLock(true); @@ -109,6 +111,7 @@ public int getTotalMaxConcurrentTaskNum() { public void updateBeIdTaskMaps() { writeLock(); try { + // step1: update backend number in all of cluster updateBeIdToMaxConcurrentTasks(); List beIds = Catalog.getCurrentSystemInfo().getBackendIds(true); @@ -121,7 +124,6 @@ public void updateBeIdTaskMaps() { newBeIds.parallelStream().forEach(entity -> beIdToMaxConcurrentTasks.put(entity, DEFAULT_BE_CONCURRENT_TASK_NUM)); for (long beId : unavailableBeIds) { beIdToMaxConcurrentTasks.remove(beId); - beIdToConcurrentTasks.remove(beId); } LOG.info("There are {} backends which participate in routine load scheduler. 
" + "There are {} new backends and {} unavailable backends for routine load", @@ -131,19 +133,22 @@ public void updateBeIdTaskMaps() { } } - public void addNumOfConcurrentTasksByBeId(long beId) { - writeLock(); - try { - if (beIdToConcurrentTasks.get(beId) == null) { - beIdToConcurrentTasks.put(beId, 1); - } else { - int concurrentTaskNum = (int) beIdToConcurrentTasks.get(beId); - concurrentTaskNum++; - beIdToConcurrentTasks.put(beId, concurrentTaskNum); + private Map getBeIdConcurrentTaskMaps() { + Map beIdToConcurrentTasks = Maps.newHashMap(); + for (RoutineLoadJob routineLoadJob : getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING)) { + Map jobBeIdToConcurrentTaskNum = routineLoadJob.getBeIdToConcurrentTaskNum(); + for (Map.Entry entry : jobBeIdToConcurrentTaskNum.entrySet()) { + if (beIdToConcurrentTasks.containsKey(entry.getKey())) { + beIdToConcurrentTasks.put(entry.getKey(), beIdToConcurrentTasks.get(entry.getKey()) + entry.getValue()); + } else { + beIdToConcurrentTasks.put(entry.getKey(), entry.getValue()); + } } - } finally { - writeUnlock(); } + LOG.debug("beIdToConcurrentTasks is {}", Joiner.on(",") + .withKeyValueSeparator(":").join(beIdToConcurrentTasks)); + return beIdToConcurrentTasks; + } public void addRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt) @@ -320,11 +325,12 @@ public int getClusterIdleSlotNum() { try { int result = 0; updateBeIdToMaxConcurrentTasks(); + Map beIdToConcurrentTasks = getBeIdConcurrentTaskMaps(); for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { - if (beIdToConcurrentTasks.get(entry.getKey()) == null) { - result += entry.getValue(); - } else { + if (beIdToConcurrentTasks.containsKey(entry.getKey())) { result += entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); + } else { + result += entry.getValue(); } } return result; @@ -344,6 +350,7 @@ public long getMinTaskBeId(String clusterName) throws LoadException { long result = -1L; int maxIdleSlotNum = 0; updateBeIdToMaxConcurrentTasks(); + Map beIdToConcurrentTasks = getBeIdConcurrentTaskMaps(); for (Long beId : beIdsInCluster) { int idleTaskNum = 0; if (beIdToConcurrentTasks.containsKey(beId)) { @@ -351,6 +358,8 @@ public long getMinTaskBeId(String clusterName) throws LoadException { } else { idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM; } + LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum, + beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId)); result = maxIdleSlotNum < idleTaskNum ? 
                 result = maxIdleSlotNum < idleTaskNum ? beId : result;
                 maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
             }
@@ -363,6 +372,36 @@ public long getMinTaskBeId(String clusterName) throws LoadException {
         }
     }
 
+    public boolean checkBeToTask(long beId, String clusterName) throws LoadException {
+        List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName);
+        if (beIdsInCluster == null) {
+            throw new LoadException("The " + clusterName + " has been deleted");
+        }
+
+        if (!beIdsInCluster.contains(beId)) {
+            LOG.debug("the previous be id {} does not belong to cluster name {}", beId, clusterName);
+            return false;
+        }
+
+        // check if be has idle slot
+        readLock();
+        try {
+            int idleTaskNum = 0;
+            Map<Long, Integer> beIdToConcurrentTasks = getBeIdConcurrentTaskMaps();
+            if (beIdToConcurrentTasks.containsKey(beId)) {
+                idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
+            } else {
+                idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM;
+            }
+            if (idleTaskNum > 0) {
+                return true;
+            }
+            return false;
+        } finally {
+            readUnlock();
+        }
+    }
+
     public RoutineLoadJob getJob(long jobId) {
         return idToRoutineLoadJob.get(jobId);
     }
@@ -385,7 +424,7 @@ public RoutineLoadJob getJobByName(String jobName) {
             }
             Optional<RoutineLoadJob> optional = routineLoadJobList.parallelStream()
                     .filter(entity -> !entity.getState().isFinalState()).findFirst();
-            if (optional.isPresent()) {
+            if (!optional.isPresent()) {
                 return null;
             }
             return optional.get();
@@ -394,7 +433,7 @@ public RoutineLoadJob getJobByName(String jobName) {
         }
     }
 
-    public RoutineLoadJob getJobByTaskId(String taskId) throws MetaNotFoundException {
+    public RoutineLoadJob getJobByTaskId(UUID taskId) throws MetaNotFoundException {
         for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
             if (routineLoadJob.containsTask(taskId)) {
                 return routineLoadJob;
@@ -411,12 +450,10 @@ public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState job
         return stateJobs;
     }
 
-    public List<RoutineLoadTaskInfo> processTimeoutTasks() {
-        List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
+    public void processTimeoutTasks() {
         for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
-            routineLoadTaskInfoList.addAll(routineLoadJob.processTimeoutTasks());
+            routineLoadJob.processTimeoutTasks();
         }
-        return routineLoadTaskInfoList;
     }
 
     // Remove old routine load jobs from idToRoutineLoadJob
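With addNumOfConcurrentTasksByBeId() gone, RoutineLoadManager above no longer maintains a mutable per-BE counter that can drift when tasks die; getBeIdConcurrentTaskMaps() recomputes the counts on demand from the RUNNING jobs, and idle slots fall out by subtraction. The aggregation in isolation, as a sketch (DEFAULT_BE_CONCURRENT_TASK_NUM's value is assumed here):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the derived accounting, not part of the patch.
    public class SlotAccounting {
        static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 10; // assumed value

        // sum each job's (beId -> task count) map into a cluster-wide view
        static Map<Long, Integer> currentTasksPerBe(List<Map<Long, Integer>> perJobCounts) {
            Map<Long, Integer> total = new HashMap<>();
            for (Map<Long, Integer> jobCounts : perJobCounts) {
                jobCounts.forEach((beId, n) -> total.merge(beId, n, Integer::sum));
            }
            return total;
        }

        // idle slots on one BE = configured max - currently running tasks
        static int idleSlots(long beId, Map<Long, Integer> current, Map<Long, Integer> max) {
            if (!current.containsKey(beId)) {
                return DEFAULT_BE_CONCURRENT_TASK_NUM; // mirrors the patch's fallback
            }
            return max.get(beId) - current.get(beId);
        }
    }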
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index e8e744d22848cf..20f4a005b93ee3 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -96,8 +96,7 @@ private void process() {
 
         LOG.debug("begin to check timeout tasks");
         // check timeout tasks
-        List<RoutineLoadTaskInfo> rescheduleTasksList = routineLoadManager.processTimeoutTasks();
-        routineLoadManager.addTasksToNeedScheduleQueue(rescheduleTasksList);
+        routineLoadManager.processTimeoutTasks();
     }
 
     private List<RoutineLoadJob> getNeedScheduleRoutineJobs() throws LoadException {
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 3fa4ce1efccda5..4d0af1a5c535a2 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -23,8 +23,6 @@
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.task.RoutineLoadTask;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
@@ -46,13 +44,23 @@ public abstract class RoutineLoadTaskInfo {
     protected long jobId;
     private long createTimeMs;
     private long loadStartTimeMs;
-    private TExecPlanFragmentParams tExecPlanFragmentParams;
+    // the be id of previous task
+    protected long previousBeId = -1L;
+    // the be id of this task
+    protected long beId = -1L;
 
     public RoutineLoadTaskInfo(UUID id, long jobId) {
         this.id = id;
         this.jobId = jobId;
         this.createTimeMs = System.currentTimeMillis();
     }
+
+    public RoutineLoadTaskInfo(UUID id, long jobId, long previousBeId) {
+        this.id = id;
+        this.jobId = jobId;
+        this.createTimeMs = System.currentTimeMillis();
+        this.previousBeId = previousBeId;
+    }
 
     public UUID getId() {
         return id;
@@ -65,7 +73,19 @@ public long getJobId() {
     public void setLoadStartTimeMs(long loadStartTimeMs) {
         this.loadStartTimeMs = loadStartTimeMs;
     }
-
+
+    public long getPreviousBeId() {
+        return previousBeId;
+    }
+
+    public void setBeId(long beId) {
+        this.beId = beId;
+    }
+
+    public long getBeId() {
+        return beId;
+    }
+
     public long getLoadStartTimeMs() {
         return loadStartTimeMs;
     }
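The previousBeId/beId fields added above feed the allocation policy in the RoutineLoadTaskScheduler diff below: a renewed task remembers which BE ran its predecessor and is sent back there when that BE still has an idle slot, which keeps a task's Kafka partitions on one BE across renewals; only otherwise does it fall back to the least-loaded BE. Condensed into a sketch (the Manager interface is a stand-in for RoutineLoadManager):

    // Condensed sketch of allocateTaskToBe() below; Manager is a stand-in.
    public class BeAffinitySketch {
        interface Manager {
            boolean hasIdleSlot(long beId); // stands in for checkBeToTask()
            long leastLoadedBe();           // stands in for getMinTaskBeId()
        }

        static long allocate(long previousBeId, Manager manager) {
            // prefer the BE that ran the previous incarnation of this task
            if (previousBeId != -1L && manager.hasIdleSlot(previousBeId)) {
                return previousBeId;
            }
            return manager.leastLoadedBe();
        }
    }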
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 9141c2e8f3ed7f..0c56b40c452075 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -98,17 +98,18 @@ private void process() throws LoadException, UserException, InterruptedException
         RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.peek();
         RoutineLoadJob routineLoadJob = null;
         try {
-            routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId().toString());
+            routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId());
+            allocateTaskToBe(routineLoadTaskInfo, routineLoadJob);
+            routineLoadTaskInfo.beginTxn();
         } catch (MetaNotFoundException e) {
-            LOG.warn("task {} has been abandoned", routineLoadTaskInfo.getId());
+            needScheduleTasksQueue.take();
+            // the task has been abandoned while its renewed task was already added to the queue,
+            // or the database has been deleted
+            LOG.warn("task {} has been abandoned with error message {}",
+                    routineLoadTaskInfo.getId(), e.getMessage(), e);
             return;
-        }
-        long beId;
-        try {
-            beId = routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName());
-            routineLoadTaskInfo.beginTxn();
-        } catch (Exception e) {
-            LOG.warn("put task to the rear of queue with error " + e.getMessage());
+        } catch (LoadException e) {
+            LOG.warn("put task to the rear of queue with error " + e.getMessage(), e);
             needScheduleTasksQueue.take();
             needScheduleTasksQueue.put(routineLoadTaskInfo);
             needScheduleTaskNum--;
@@ -128,21 +129,20 @@ private void process() throws LoadException, UserException, InterruptedException
             routineLoadJob.removeNeedScheduleTask(routineLoadTaskInfo);
             routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis());
             // add to batch task map
-            if (beIdTobatchTask.containsKey(beId)) {
-                beIdTobatchTask.get(beId).add(tRoutineLoadTask);
+            if (beIdTobatchTask.containsKey(routineLoadTaskInfo.getBeId())) {
+                beIdTobatchTask.get(routineLoadTaskInfo.getBeId()).add(tRoutineLoadTask);
             } else {
                 List<TRoutineLoadTask> tRoutineLoadTaskList = Lists.newArrayList();
                 tRoutineLoadTaskList.add(tRoutineLoadTask);
-                beIdTobatchTask.put(beId, tRoutineLoadTaskList);
+                beIdTobatchTask.put(routineLoadTaskInfo.getBeId(), tRoutineLoadTaskList);
             }
             // count
             clusterIdleSlotNum--;
             scheduledTaskNum++;
-            routineLoadManager.addNumOfConcurrentTasksByBeId(beId);
             needScheduleTaskNum--;
         }
         submitBatchTask(beIdTobatchTask);
-        LOG.info("{} tasks have bean allocated to be.", scheduledTaskNum);
+        LOG.info("{} tasks have been allocated to be.", scheduledTaskNum);
     }
 
     private void submitBatchTask(Map<Long, List<TRoutineLoadTask>> beIdToRoutineLoadTask) {
@@ -167,4 +167,18 @@ private void submitBatchTask(Map<Long, List<TRoutineLoadTask>> beIdToRoutineLoad
         }
     }
+
+    // check if previous be has idle slot
+    // true: allocate previous be to task
+    // false: allocate the most idle be to task
+    private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo, RoutineLoadJob routineLoadJob)
+            throws MetaNotFoundException, LoadException {
+        if (routineLoadTaskInfo.getPreviousBeId() != -1L) {
+            if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadJob.getClusterName())) {
+                routineLoadTaskInfo.setBeId(routineLoadTaskInfo.getPreviousBeId());
+                return;
+            }
+        }
+        routineLoadTaskInfo.setBeId(routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName()));
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index e0abee1b2e8ce6..4f722f8afb5f05 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -124,7 +124,7 @@ public void init(Analyzer analyzer) throws UserException {
         // columns: k1, k2, v1, v2=k1 + k2
         // this means that there are three columns(k1, k2, v1) in source file,
         // and v2 is derived from (k1 + k2)
-        if (streamLoadTask.getColumnToColumnExpr() != null || streamLoadTask.getColumnToColumnExpr().size() != 0) {
+        if (streamLoadTask.getColumnToColumnExpr() != null && streamLoadTask.getColumnToColumnExpr().size() != 0) {
             for (Map.Entry<String, Expr> entry : streamLoadTask.getColumnToColumnExpr().entrySet()) {
                 // make column name case match with real column name
                 String column = entry.getKey();
diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 0bc7a8c74ae18d..a91a44bf38edd9 100644
--- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -798,7 +798,7 @@ private void handleShowRoutineLoad() throws AnalysisException {
         RoutineLoadJob routineLoadJob =
                 Catalog.getCurrentCatalog().getRoutineLoadManager().getJobByName(showRoutineLoadStmt.getName());
         if (routineLoadJob == null) {
-            throw new AnalysisException("There is no routine load job with id " + showRoutineLoadStmt.getName());
+            throw new AnalysisException("There is no routine load job with name " + showRoutineLoadStmt.getName());
         }
 
         // check auth
@@ -831,6 +831,7 @@ private void handleShowRoutineLoad() throws AnalysisException {
         row.add(routineLoadJob.getState().name());
         row.add(routineLoadJob.getDesiredConcurrentNumber());
         row.add(routineLoadJob.getProgress().toString());
+        rows.add(row);
 
         resultSet = new ShowResultSet(showRoutineLoadStmt.getMetaData(), rows);
     }
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2ef34910f9775f..7c52d52f2f5b33 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -688,7 +688,8 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc
         }
 
         Catalog.getCurrentGlobalTransactionMgr().abortTransaction(request.getTxnId(),
-                request.isSetReason() ? request.getReason() : "system cancel");
+                request.isSetReason() ? request.getReason() : "system cancel",
+                TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index b022debb86e0f3..eece30350918c1 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -434,6 +434,10 @@ public boolean commitAndPublishTransaction(Database db, long transactionId,
     }
 
     public void abortTransaction(long transactionId, String reason) throws UserException {
+        abortTransaction(transactionId, reason, null);
+    }
+
+    public void abortTransaction(long transactionId, String reason, TxnCommitAttachment txnCommitAttachment) throws UserException {
         if (transactionId < 0) {
             LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId);
             return;
@@ -449,7 +453,7 @@ public void abortTransaction(long transactionId, String reason) throws UserExcep
         }
         return;
     }
-    
+
     public void abortTransaction(Long dbId, String label, String reason) throws UserException {
         Preconditions.checkNotNull(label);
         writeLock();
@@ -789,7 +793,7 @@ public void removeOldTransactions() {
                         || !checkTxnHasRelatedJob(transactionState, dbIdToTxnIds)) {
                     try {
                         transactionState.setTransactionStatus(TransactionStatus.ABORTED,
-                                TransactionState.TxnStatusChangeReason.TIMEOUT);
+                                TransactionState.TxnStatusChangeReason.TIMEOUT.name());
                     } catch (TransactionException e) {
                         LOG.warn("txn {} could not be aborted with error message {}",
                                 transactionState.getTransactionId(), e.getMessage());
@@ -877,7 +881,7 @@ private void unprotectAbortTransaction(long transactionId, String reason) throws
         }
         transactionState.setFinishTime(System.currentTimeMillis());
         transactionState.setReason(reason);
-        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
+        transactionState.setTransactionStatus(TransactionStatus.ABORTED, reason);
         unprotectUpsertTransactionState(transactionState);
         for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
             AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
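The TransactionState diff below wires beforeCommitted() into setTransactionStatus(), so the listener runs before the new status is recorded and can veto a commit by throwing; note that the patch's COMMITTED case reaches default by fall-through, which is harmless here because default only breaks. A self-contained sketch of the dispatch order, with the types reduced to a minimum:

    // Dispatch-order sketch, not the real TransactionState.
    public class TxnDispatchSketch {
        enum Status { PREPARE, COMMITTED, ABORTED }

        interface Listener {
            void beforeCommitted() throws IllegalStateException; // may veto by throwing
            void beforeAborted(String reason);
        }

        private Status status = Status.PREPARE;

        void setStatus(Status next, String reason, Listener listener) {
            if (listener != null) {
                switch (next) {
                    case ABORTED:
                        listener.beforeAborted(reason);
                        break;
                    case COMMITTED:
                        listener.beforeCommitted(); // a throw here leaves status unchanged
                        break; // explicit break; the patch falls through to default
                    default:
                        break;
                }
            }
            this.status = next; // only reached when no hook threw
        }
    }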
diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index b7e814ba48a647..eff96f9a91cc2a 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -309,7 +309,7 @@ public void setTransactionStatus(TransactionStatus transactionStatus) throws Tra
         setTransactionStatus(transactionStatus, null);
     }
 
-    public void setTransactionStatus(TransactionStatus transactionStatus, TxnStatusChangeReason txnStatusChangeReason)
+    public void setTransactionStatus(TransactionStatus transactionStatus, String txnStatusChangeReason)
             throws TransactionException {
         // before state changed
         if (txnStateChangeListener != null) {
@@ -317,6 +317,8 @@ public void setTransactionStatus(TransactionStatus transactionStatus, TxnStatusC
                 case ABORTED:
                     txnStateChangeListener.beforeAborted(this, txnStatusChangeReason);
                     break;
+                case COMMITTED:
+                    txnStateChangeListener.beforeCommitted(this);
                 default:
                     break;
             }
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index c78908b697892f..3ac1c088ead059 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -110,6 +110,7 @@ public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) {
         Assert.assertEquals(jobName, routineLoadJob.getName());
         Assert.assertEquals(1L, routineLoadJob.getTableId());
         Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState());
+        Assert.assertEquals(true, routineLoadJob instanceof KafkaRoutineLoadJob);
 
         Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob =
                 Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob");
@@ -244,7 +245,7 @@ public void testGetMinTaskBeId() throws LoadException {
         };
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
-        routineLoadManager.addNumOfConcurrentTasksByBeId(1L);
+//        routineLoadManager.increaseNumOfConcurrentTasksByBeId(1L);
         Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId("default"));
     }
 
@@ -264,7 +265,7 @@ public void testGetTotalIdleTaskNum() {
         };
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
-        routineLoadManager.addNumOfConcurrentTasksByBeId(1L);
+//        routineLoadManager.increaseNumOfConcurrentTasksByBeId(1L);
         Assert.assertEquals(DEFAULT_BE_CONCURRENT_TASK_NUM * 2 - 1, routineLoadManager.getClusterIdleSlotNum());
     }
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 824cd29193d4be..18ae4c5c37349a 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -115,7 +115,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
                 result = routineLoadTaskInfoQueue;
                 routineLoadManager.getMinTaskBeId(anyString);
                 result = beId;
-                routineLoadManager.getJobByTaskId(anyString);
+                routineLoadManager.getJobByTaskId((UUID) any);
                 result = kafkaRoutineLoadJob1;
                 routineLoadManager.getJob(anyLong);
                 result = kafkaRoutineLoadJob1;
@@ -147,8 +147,8 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
                 Assert.assertEquals(200L,
                         (long) ((KafkaRoutineLoadTask) routineLoadTask).getPartitionIdToOffset().get(2));
 
-                routineLoadManager.addNumOfConcurrentTasksByBeId(beId);
-                times = 1;
+//                routineLoadManager.increaseNumOfConcurrentTasksByBeId(beId);
+//                times = 1;
             }
         };
     }