diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 757f3bbd10f222..08e03f0a956ebb 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -169,10 +169,15 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM); } + // partitionIdToOffset must be not empty when loaded rows > 0 + // situation1: be commit txn but fe throw error when committing txn, + // fe rollback txn without partitionIdToOffset by itself + // this task should not be commit + // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated @Override boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 - && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().size() == 0) { + && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().isEmpty()) { LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId())) .add("job_id", id) .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows()) @@ -223,7 +228,13 @@ protected boolean unprotectNeedReschedule() { try { newCurrentKafkaPartition = getAllKafkaPartitions(); } catch (Exception e) { - LOG.warn("Job {} failed to fetch all current partition", id); + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage()) + .build(), e); + if (this.state == JobState.NEED_SCHEDULE) { + unprotectUpdateState(JobState.PAUSED, + "Job failed to fetch all current partition with error " + e.getMessage()); + } 
return false; } if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index f0ee4dbfd22f79..09a49fba96695d 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -137,6 +137,7 @@ public boolean isFinalState() { protected RoutineLoadProgress progress; protected String pausedReason; protected String cancelReason; + protected long endTimestamp; // currentErrorNum and currentTotalNum will be update // when currentTotalNum is more then ten thousand or currentErrorNum is more then maxErrorNum @@ -160,6 +161,7 @@ public RoutineLoadJob(String name, long dbId, long tableId, LoadDataSourceType d this.state = JobState.NEED_SCHEDULE; this.dataSourceType = dataSourceType; this.resourceInfo = ConnectContext.get().toResourceCtx(); + this.endTimestamp = -1; this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser()) .append(ConnectContext.get().getRemoteIP()) .append(id).append(System.currentTimeMillis()).toString().hashCode(); @@ -183,6 +185,7 @@ public RoutineLoadJob(long id, String name, long dbId, long tableId, this.dataSourceType = dataSourceType; this.maxErrorNum = maxErrorNum; this.resourceInfo = ConnectContext.get().toResourceCtx(); + this.endTimestamp = -1; this.routineLoadTaskInfoList = new ArrayList<>(); lock = new ReentrantReadWriteLock(true); } @@ -254,6 +257,10 @@ public long getAuthCode() { return authCode; } + public long getEndTimestamp() { + return endTimestamp; + } + // this is a unprotected method which is called in the initialization function protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException { if (this.routineLoadDesc != null) { @@ -343,6 +350,7 @@ public TExecPlanFragmentParams gettExecPlanFragmentParams() { // only check 
loading task public List processTimeoutTasks() { List result = new ArrayList<>(); + List timeoutTaskList = new ArrayList<>(); writeLock(); try { List runningTasks = new ArrayList<>(routineLoadTaskInfoList); @@ -350,24 +358,28 @@ public List processTimeoutTasks() { if ((routineLoadTaskInfo.getLoadStartTimeMs() != 0L) && ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) > DEFAULT_TASK_TIMEOUT_SECONDS * 1000)) { - UUID oldTaskId = routineLoadTaskInfo.getId(); - // abort txn if not committed - try { - Catalog.getCurrentGlobalTransactionMgr() - .abortTransaction(routineLoadTaskInfo.getTxnId(), "routine load task of txn was timeout"); - } catch (UserException e) { - if (e.getMessage().contains("committed")) { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, oldTaskId) - .add("msg", "txn of task has been committed when checking timeout") - .build()); - continue; - } - } + timeoutTaskList.add(routineLoadTaskInfo); } } } finally { writeUnlock(); } + + for (RoutineLoadTaskInfo routineLoadTaskInfo : timeoutTaskList) { + UUID oldTaskId = routineLoadTaskInfo.getId(); + // abort txn if not committed + try { + Catalog.getCurrentGlobalTransactionMgr() + .abortTransaction(routineLoadTaskInfo.getTxnId(), "routine load task of txn was timeout"); + } catch (UserException e) { + if (e.getMessage().contains("committed")) { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, oldTaskId) + .add("msg", "txn of task has been committed when checking timeout") + .build(), e); + continue; + } + } + } return result; } @@ -510,6 +522,10 @@ public void plan() throws UserException { @Override public void beforeAborted(TransactionState txnState, String txnStatusChangeReason) throws AbortTransactionException { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) + .add("txn_state", txnState) + .add("msg", "task before aborted") + .build()); readLock(); try { String taskId = txnState.getLabel(); @@ -526,6 +542,10 @@ public void 
beforeAborted(TransactionState txnState, String txnStatusChangeReaso @Override public void beforeCommitted(TransactionState txnState) throws TransactionException { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) + .add("txn_state", txnState) + .add("msg", "task before committed") + .build()); readLock(); try { // check if task has been aborted @@ -578,17 +598,7 @@ public void onCommitted(TransactionState txnState) throws TransactionException { // txn will be aborted but progress will be update // progress will be update otherwise the progress will be hung @Override - public void onAborted(TransactionState txnState, String txnStatusChangeReason) { - if (txnStatusChangeReason != null) { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) - .add("txn_id", txnState.getTransactionId()) - .add("msg", "txn abort with reason " + txnStatusChangeReason) - .build()); - } else { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) - .add("txn_id", txnState.getTransactionId()) - .add("msg", "txn abort").build()); - } + public void onAborted(TransactionState txnState, String txnStatusChangeReasonString) { writeLock(); try { // step0: find task in job @@ -596,7 +606,30 @@ public void onAborted(TransactionState txnState, String txnStatusChangeReason) { routineLoadTaskInfoList.parallelStream() .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst(); if (routineLoadTaskInfoOptional.isPresent()) { - // todo(ml): use previous be id depend on change reason + // step1: job state will be changed depending on txnStatusChangeReasonString + if (txnStatusChangeReasonString != null) { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) + .add("txn_id", txnState.getTransactionId()) + .add("msg", "txn abort with reason " + txnStatusChangeReasonString) + .build()); + TransactionState.TxnStatusChangeReason txnStatusChangeReason = + 
TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString); + if (txnStatusChangeReason != null) { + switch (txnStatusChangeReason) { + case OFFSET_OUT_OF_RANGE: + updateState(JobState.CANCELLED, txnStatusChangeReason.toString()); + return; + default: + break; + } + } + // todo(ml): use previous be id depend on change reason + } else { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) + .add("txn_id", txnState.getTransactionId()) + .add("msg", "txn abort").build()); + } + // step2: commit task , update progress, maybe create a new task executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); } else { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) @@ -686,37 +719,41 @@ public void updateState(JobState jobState) { public void updateState(JobState jobState, String reason) { writeLock(); try { - LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_job_state", getState()) - .add("desire_job_state", jobState) - .add("msg", "job will be change to desire state") - .build()); - checkStateTransform(jobState); - switch (jobState) { - case PAUSED: - executePause(reason); - break; - case NEED_SCHEDULE: - executeNeedSchedule(); - break; - case STOPPED: - executeStop(); - break; - case CANCELLED: - executeCancel(reason); - break; - default: - break; - } - LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_job_state", getState()) - .add("msg", "job state has been changed") - .build()); + unprotectUpdateState(jobState, reason); } finally { writeUnlock(); } } + protected void unprotectUpdateState(JobState jobState, String reason) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("desire_job_state", jobState) + .add("msg", "job will be change to desire state") + .build()); + checkStateTransform(jobState); + switch (jobState) { + case PAUSED: + executePause(reason); + break; + case NEED_SCHEDULE: + executeNeedSchedule(); 
+ break; + case STOPPED: + executeStop(); + break; + case CANCELLED: + executeCancel(reason); + break; + default: + break; + } + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("msg", "job state has been changed") + .build()); + } + private void executePause(String reason) { // TODO(ml): edit log // remove all of task in jobs and change job state to paused @@ -735,12 +772,14 @@ private void executeStop() { // TODO(ml): edit log state = JobState.STOPPED; routineLoadTaskInfoList.clear(); + endTimestamp = System.currentTimeMillis(); } private void executeCancel(String reason) { cancelReason = reason; state = JobState.CANCELLED; routineLoadTaskInfoList.clear(); + endTimestamp = System.currentTimeMillis(); } public void update() { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 90a5539304f3ed..5304df7658cf17 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -29,11 +29,14 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.logging.log4j.LogManager; @@ -41,6 +44,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ 
-83,7 +87,7 @@ private void writeUnlock() { public RoutineLoadManager() { idToRoutineLoadJob = Maps.newConcurrentMap(); dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); - beIdToMaxConcurrentTasks = Maps.newHashMap(); + beIdToMaxConcurrentTasks = Maps.newConcurrentMap(); lock = new ReentrantReadWriteLock(true); } @@ -448,8 +452,30 @@ public void processTimeoutTasks() { // Remove old routine load jobs from idToRoutineLoadJob // This function is called periodically. // Cancelled and stopped job will be remove after Configure.label_keep_max_second seconds - public void removeOldRoutineLoadJobs() { - // TODO(ml): remove old routine load job + public void cleanOldRoutineLoadJobs() { + writeLock(); + try { + Iterator<Map.Entry<Long, RoutineLoadJob>> iterator = idToRoutineLoadJob.entrySet().iterator(); + long currentTimestamp = System.currentTimeMillis(); + while (iterator.hasNext()) { + RoutineLoadJob routineLoadJob = iterator.next().getValue(); + long jobEndTimestamp = routineLoadJob.getEndTimestamp(); + /* an end timestamp of -1 means the job has not ended; ended jobs are kept for label_keep_max_second seconds (long arithmetic avoids int overflow) */ + if (jobEndTimestamp != -1L && + ((currentTimestamp - jobEndTimestamp) > Config.label_keep_max_second * 1000L)) { + dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId()).get(routineLoadJob.getName()).remove(routineLoadJob); + iterator.remove(); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("end_timestamp", routineLoadJob.getEndTimestamp()) + .add("current_timestamp", currentTimestamp) + .add("job_state", routineLoadJob.getState()) + .add("msg", "old job has been cleaned") + .build()); + } + } + } finally { + writeUnlock(); + } } public void updateRoutineLoadJob() { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 20f4a005b93ee3..cc79e1ec52c319 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -23,6 +23,8 @@ import 
org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,7 +53,7 @@ protected void runOneCycle() { try { process(); } catch (Throwable e) { - LOG.error("failed to schedule jobs with error massage {}", e.getMessage(), e); + LOG.warn("failed to schedule jobs with error massage {}", e.getMessage(), e); } } @@ -63,7 +65,7 @@ private void process() { try { routineLoadJobList = getNeedScheduleRoutineJobs(); } catch (LoadException e) { - LOG.error("failed to get need schedule routine jobs"); + LOG.warn("failed to get need schedule routine jobs", e); } LOG.info("there are {} job need schedule", routineLoadJobList.size()); @@ -72,23 +74,30 @@ private void process() { // create plan of routine load job routineLoadJob.plan(); // judge nums of tasks more then max concurrent tasks of cluster - int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); - int totalTaskNum = currentConcurrentTaskNum + routineLoadManager.getSizeOfIdToRoutineLoadTask(); - if (totalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { - LOG.info("job {} concurrent task num {}, current total task num {}. 
" - + "desired total task num {} more then total max task num {}, " - + "skip this turn of job scheduler", - routineLoadJob.getId(), currentConcurrentTaskNum, - routineLoadManager.getSizeOfIdToRoutineLoadTask(), - totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); + int desiredConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); + int currentTotalTaskNum = routineLoadManager.getSizeOfIdToRoutineLoadTask(); + int desiredTotalTaskNum = desiredConcurrentTaskNum + currentTotalTaskNum; + if (desiredTotalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("desired_concurrent_task_num", desiredConcurrentTaskNum) + .add("current_total_task_num", currentTotalTaskNum) + .add("desired_total_task_num", desiredTotalTaskNum) + .add("total_max_task_num", routineLoadManager.getTotalMaxConcurrentTaskNum()) + .add("msg", "skip this turn of job scheduler while there are not enough slot in backends") + .build()); break; } // check state and divide job into tasks - routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); + routineLoadJob.divideRoutineLoadJob(desiredConcurrentTaskNum); } catch (MetaNotFoundException e) { + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("error_msg", "failed to get metadata, change job state to cancelled") + .build(), e); routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED, e.getMessage()); } catch (Throwable e) { - LOG.warn("failed to scheduler job, change job state to paused", e); + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("error_msg", "failed to scheduler job, change job state to paused") + .build(), e); routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, e.getMessage()); continue; } @@ -97,6 +106,9 @@ private void process() { LOG.debug("begin to check timeout tasks"); // check timeout tasks routineLoadManager.processTimeoutTasks(); + 
+ LOG.debug("begin to clean old jobs "); + routineLoadManager.cleanOldRoutineLoadJobs(); } private List getNeedScheduleRoutineJobs() throws LoadException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index a5555854bc1c0c..a8f520c6c27cef 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -96,10 +96,10 @@ private void process() throws LoadException, UserException, InterruptedException int scheduledTaskNum = 0; // get idle be task num // allocate task to be - if (needScheduleTaskNum == 0) { - Thread.sleep(1000); - return; - } +// if (needScheduleTaskNum == 0) { +// Thread.sleep(1000); +// return; +// } while (needScheduleTaskNum > 0) { // allocate be to task and begin transaction for task RoutineLoadTaskInfo routineLoadTaskInfo = null; @@ -107,7 +107,7 @@ private void process() throws LoadException, UserException, InterruptedException routineLoadTaskInfo = needScheduleTasksQueue.take(); } catch (InterruptedException e) { LOG.warn("Taking routine load task from queue has been interrupted with error msg {}", - e.getMessage()); + e.getMessage(),e); return; } RoutineLoadJob routineLoadJob = null; @@ -192,6 +192,7 @@ private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo, RoutineLo if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadJob.getClusterName())) { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId()) .add("job_id", routineLoadJob.getId()) + .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId()) .add("msg", "task use the previous be id") .build()); routineLoadTaskInfo.setBeId(routineLoadTaskInfo.getPreviousBeId()); diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index eff96f9a91cc2a..69b360a11a75e9 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -86,7 +86,34 @@ public String toString() { public enum TxnStatusChangeReason { DB_DROPPED, - TIMEOUT + TIMEOUT, + OFFSET_OUT_OF_RANGE; + + /** + * Finds the first reason, in declaration order, whose toString() text occurs in {@code reasonString}. + * Returns {@code null} when nothing matches or when {@code reasonString} is null. + */ + public static TxnStatusChangeReason fromString(String reasonString) { + if (reasonString == null) { + return null; + } + for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) { + if (reasonString.contains(txnStatusChangeReason.toString())) { + return txnStatusChangeReason; + } + } + return null; + } + + @Override + public String toString() { + switch (this) { + case OFFSET_OUT_OF_RANGE: + return "Offset out of range"; + default: + return this.name(); + } + } } private long dbId; diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java index 0e3006b5c74997..f4c57fcc86b5b5 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java @@ -45,14 +45,12 @@ public Producer createProducer() { public static void main(String[] args) throws InterruptedException { KafkaProducerTest kafkaProducerTest = new KafkaProducerTest(); Producer kafkaProducer = kafkaProducerTest.createProducer(); - int i = 411; + int i = 1; while (true) { String value = String.valueOf(i); -// if (i % 5 == 0) { -// value = value + "\t" + value; -// } else if (i % 6 == 0) { -// value = value + "\t" + value; -// } + if (i % 10000 == 0) { + value = value + "\t" + value; + } ProducerRecord record = new ProducerRecord<>("miaoling", value); try { RecordMetadata metadata = kafkaProducer.send(record).get(); @@ -65,8 +63,6 @@ public static void main(String[] args) throws InterruptedException { System.out.println(e); } i++; - 
Thread.sleep(500); - } }