From f29918828f39cc25de22ebfe677f91b48159616d Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Wed, 13 Mar 2019 11:01:22 +0800 Subject: [PATCH] Add routine load job cleaner 1. the stopped and cancelled job will be cleaned after the interval of clean second 2. the interval of clean second * 1000 = current timestamp - end timestamp 3. if job could not fetch topic metadata when need_schedule, job will be cancelled 4. fix the deadlock of job and txn. the lock of txn must be in front of the lock of job 5. the job will be paused or cancelled depend on the abort reason of txn 6. the job will be cancelled immediately if the abort reason named offsets out of range --- .../load/routineload/KafkaRoutineLoadJob.java | 15 +- .../load/routineload/RoutineLoadJob.java | 141 +++++++++++------- .../load/routineload/RoutineLoadManager.java | 31 +++- .../routineload/RoutineLoadScheduler.java | 38 +++-- .../routineload/RoutineLoadTaskScheduler.java | 11 +- .../doris/transaction/TransactionState.java | 22 ++- .../load/routineload/KafkaProducerTest.java | 12 +- 7 files changed, 187 insertions(+), 83 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 757f3bbd10f222..08e03f0a956ebb 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -169,10 +169,15 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM); } + // partitionIdToOffset must be not empty when loaded rows > 0 + // situation1: be commit txn but fe throw error when committing txn, + // fe rollback txn without partitionIdToOffset by itself + // this task should not be commit + // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated @Override boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 - && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().size() == 0) { + && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().isEmpty()) { LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId())) .add("job_id", id) .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows()) @@ -223,7 +228,13 @@ protected boolean unprotectNeedReschedule() { try { newCurrentKafkaPartition = getAllKafkaPartitions(); } catch (Exception e) { - LOG.warn("Job {} failed to fetch all current partition", id); + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage()) + .build(), e); + if (this.state == JobState.NEED_SCHEDULE) { + unprotectUpdateState(JobState.PAUSED, + "Job failed to fetch all current partition with error " + e.getMessage()); + } return false; } if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index f0ee4dbfd22f79..09a49fba96695d 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -137,6 +137,7 @@ public boolean isFinalState() { protected RoutineLoadProgress progress; protected String pausedReason; protected String cancelReason; + protected long endTimestamp; // currentErrorNum and currentTotalNum will be update // when currentTotalNum is more then ten thousand or currentErrorNum is more then maxErrorNum @@ -160,6 +161,7 @@ public RoutineLoadJob(String name, long dbId, long tableId, LoadDataSourceType d this.state = JobState.NEED_SCHEDULE; this.dataSourceType = dataSourceType; this.resourceInfo = ConnectContext.get().toResourceCtx(); + this.endTimestamp = -1; this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser()) .append(ConnectContext.get().getRemoteIP()) .append(id).append(System.currentTimeMillis()).toString().hashCode(); @@ -183,6 +185,7 @@ public RoutineLoadJob(long id, String name, long dbId, long tableId, this.dataSourceType = dataSourceType; this.maxErrorNum = maxErrorNum; this.resourceInfo = ConnectContext.get().toResourceCtx(); + this.endTimestamp = -1; this.routineLoadTaskInfoList = new ArrayList<>(); lock = new ReentrantReadWriteLock(true); } @@ -254,6 +257,10 @@ public long getAuthCode() { return authCode; } + public long getEndTimestamp() { + return endTimestamp; + } + // this is a unprotected method which is called in the initialization function protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException { if (this.routineLoadDesc != null) { @@ -343,6 +350,7 @@ public TExecPlanFragmentParams gettExecPlanFragmentParams() { // only check loading task public List processTimeoutTasks() { List result = new ArrayList<>(); + List timeoutTaskList = new ArrayList<>(); writeLock(); try { List runningTasks = new ArrayList<>(routineLoadTaskInfoList); @@ -350,24 +358,28 @@ public List processTimeoutTasks() { if ((routineLoadTaskInfo.getLoadStartTimeMs() != 0L) && ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) > DEFAULT_TASK_TIMEOUT_SECONDS * 1000)) { - UUID oldTaskId = routineLoadTaskInfo.getId(); - // abort txn if not committed - try { - Catalog.getCurrentGlobalTransactionMgr() - .abortTransaction(routineLoadTaskInfo.getTxnId(), "routine load task of txn was timeout"); - } catch (UserException e) { - if (e.getMessage().contains("committed")) { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, oldTaskId) - .add("msg", "txn of task has been committed when checking timeout") - .build()); - continue; - } - } + timeoutTaskList.add(routineLoadTaskInfo); } } } finally { writeUnlock(); } + + for (RoutineLoadTaskInfo routineLoadTaskInfo : timeoutTaskList) { + UUID oldTaskId = routineLoadTaskInfo.getId(); + // abort txn if not committed + try { + Catalog.getCurrentGlobalTransactionMgr() + .abortTransaction(routineLoadTaskInfo.getTxnId(), "routine load task of txn was timeout"); + } catch (UserException e) { + if (e.getMessage().contains("committed")) { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, oldTaskId) + .add("msg", "txn of task has been committed when checking timeout") + .build(), e); + continue; + } + } + } return result; } @@ -510,6 +522,10 @@ public void plan() throws UserException { @Override public void beforeAborted(TransactionState txnState, String txnStatusChangeReason) throws AbortTransactionException { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) + .add("txn_state", txnState) + .add("msg", "task before aborted") + .build()); readLock(); try { String taskId = txnState.getLabel(); @@ -526,6 +542,10 @@ public void beforeAborted(TransactionState txnState, String txnStatusChangeReaso @Override public void beforeCommitted(TransactionState txnState) throws TransactionException { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) + .add("txn_state", txnState) + .add("msg", "task before committed") + .build()); readLock(); try { // check if task has been aborted @@ -578,17 +598,7 @@ public void onCommitted(TransactionState txnState) throws TransactionException { // txn will be aborted but progress will be update // progress will be update otherwise the progress will be hung @Override - public void onAborted(TransactionState txnState, String txnStatusChangeReason) { - if (txnStatusChangeReason != null) { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) - .add("txn_id", txnState.getTransactionId()) - .add("msg", "txn abort with reason " + txnStatusChangeReason) - .build()); - } else { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) - .add("txn_id", txnState.getTransactionId()) - .add("msg", "txn abort").build()); - } + public void onAborted(TransactionState txnState, String txnStatusChangeReasonString) { writeLock(); try { // step0: find task in job @@ -596,7 +606,30 @@ public void onAborted(TransactionState txnState, String txnStatusChangeReason) { routineLoadTaskInfoList.parallelStream() .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst(); if (routineLoadTaskInfoOptional.isPresent()) { - // todo(ml): use previous be id depend on change reason + // step1: job state will be changed depending on txnStatusChangeReasonString + if (txnStatusChangeReasonString != null) { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) + .add("txn_id", txnState.getTransactionId()) + .add("msg", "txn abort with reason " + txnStatusChangeReasonString) + .build()); + TransactionState.TxnStatusChangeReason txnStatusChangeReason = + TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString); + if (txnStatusChangeReason != null) { + switch (txnStatusChangeReason) { + case OFFSET_OUT_OF_RANGE: + updateState(JobState.CANCELLED, txnStatusChangeReason.toString()); + return; + default: + break; + } + } + // todo(ml): use previous be id depend on change reason + } else { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) + .add("txn_id", txnState.getTransactionId()) + .add("msg", "txn abort").build()); + } + // step2: commit task , update progress, maybe create a new task executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); } else { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) @@ -686,37 +719,41 @@ public void updateState(JobState jobState) { public void updateState(JobState jobState, String reason) { writeLock(); try { - LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_job_state", getState()) - .add("desire_job_state", jobState) - .add("msg", "job will be change to desire state") - .build()); - checkStateTransform(jobState); - switch (jobState) { - case PAUSED: - executePause(reason); - break; - case NEED_SCHEDULE: - executeNeedSchedule(); - break; - case STOPPED: - executeStop(); - break; - case CANCELLED: - executeCancel(reason); - break; - default: - break; - } - LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_job_state", getState()) - .add("msg", "job state has been changed") - .build()); + unprotectUpdateState(jobState, reason); } finally { writeUnlock(); } } + protected void unprotectUpdateState(JobState jobState, String reason) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("desire_job_state", jobState) + .add("msg", "job will be change to desire state") + .build()); + checkStateTransform(jobState); + switch (jobState) { + case PAUSED: + executePause(reason); + break; + case NEED_SCHEDULE: + executeNeedSchedule(); + break; + case STOPPED: + executeStop(); + break; + case CANCELLED: + executeCancel(reason); + break; + default: + break; + } + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("msg", "job state has been changed") + .build()); + } + private void executePause(String reason) { // TODO(ml): edit log // remove all of task in jobs and change job state to paused @@ -735,12 +772,14 @@ private void executeStop() { // TODO(ml): edit log state = JobState.STOPPED; routineLoadTaskInfoList.clear(); + endTimestamp = System.currentTimeMillis(); } private void executeCancel(String reason) { cancelReason = reason; state = JobState.CANCELLED; routineLoadTaskInfoList.clear(); + endTimestamp = System.currentTimeMillis(); } public void update() { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 90a5539304f3ed..5304df7658cf17 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -29,11 +29,14 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.logging.log4j.LogManager; @@ -41,6 +44,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -83,7 +87,7 @@ private void writeUnlock() { public RoutineLoadManager() { idToRoutineLoadJob = Maps.newConcurrentMap(); dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); - beIdToMaxConcurrentTasks = Maps.newHashMap(); + beIdToMaxConcurrentTasks = Maps.newConcurrentMap(); lock = new ReentrantReadWriteLock(true); } @@ -448,8 +452,29 @@ public void processTimeoutTasks() { // Remove old routine load jobs from idToRoutineLoadJob // This function is called periodically. // Cancelled and stopped job will be remove after Configure.label_keep_max_second seconds - public void removeOldRoutineLoadJobs() { - // TODO(ml): remove old routine load job + public void cleanOldRoutineLoadJobs() { + writeLock(); + try { + Iterator> iterator = idToRoutineLoadJob.entrySet().iterator(); + long currentTimestamp = System.currentTimeMillis(); + while (iterator.hasNext()) { + RoutineLoadJob routineLoadJob = iterator.next().getValue(); + long jobEndTimestamp = routineLoadJob.getEndTimestamp(); + if (jobEndTimestamp != -1L && + ((currentTimestamp - jobEndTimestamp) > Config.label_clean_interval_second * 1000)) { + dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId()).get(routineLoadJob.getName()).remove(routineLoadJob); + iterator.remove(); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("end_timestamp", routineLoadJob.getEndTimestamp()) + .add("current_timestamp", currentTimestamp) + .add("job_state", routineLoadJob.getState()) + .add("msg", "old job has been cleaned") + ); + } + } + } finally { + writeUnlock(); + } } public void updateRoutineLoadJob() { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 20f4a005b93ee3..cc79e1ec52c319 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -23,6 +23,8 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,7 +53,7 @@ protected void runOneCycle() { try { process(); } catch (Throwable e) { - LOG.error("failed to schedule jobs with error massage {}", e.getMessage(), e); + LOG.warn("failed to schedule jobs with error massage {}", e.getMessage(), e); } } @@ -63,7 +65,7 @@ private void process() { try { routineLoadJobList = getNeedScheduleRoutineJobs(); } catch (LoadException e) { - LOG.error("failed to get need schedule routine jobs"); + LOG.warn("failed to get need schedule routine jobs", e); } LOG.info("there are {} job need schedule", routineLoadJobList.size()); @@ -72,23 +74,30 @@ private void process() { // create plan of routine load job routineLoadJob.plan(); // judge nums of tasks more then max concurrent tasks of cluster - int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); - int totalTaskNum = currentConcurrentTaskNum + routineLoadManager.getSizeOfIdToRoutineLoadTask(); - if (totalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { - LOG.info("job {} concurrent task num {}, current total task num {}. " - + "desired total task num {} more then total max task num {}, " - + "skip this turn of job scheduler", - routineLoadJob.getId(), currentConcurrentTaskNum, - routineLoadManager.getSizeOfIdToRoutineLoadTask(), - totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); + int desiredConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); + int currentTotalTaskNum = routineLoadManager.getSizeOfIdToRoutineLoadTask(); + int desiredTotalTaskNum = desiredConcurrentTaskNum + currentTotalTaskNum; + if (desiredTotalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("desired_concurrent_task_num", desiredConcurrentTaskNum) + .add("current_total_task_num", currentTotalTaskNum) + .add("desired_total_task_num", desiredTotalTaskNum) + .add("total_max_task_num", routineLoadManager.getTotalMaxConcurrentTaskNum()) + .add("msg", "skip this turn of job scheduler while there are not enough slot in backends") + .build()); break; } // check state and divide job into tasks - routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); + routineLoadJob.divideRoutineLoadJob(desiredConcurrentTaskNum); } catch (MetaNotFoundException e) { + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("error_msg", "failed to get metadata, change job state to cancelled") + .build(), e); routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED, e.getMessage()); } catch (Throwable e) { - LOG.warn("failed to scheduler job, change job state to paused", e); + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("error_msg", "failed to scheduler job, change job state to paused") + .build(), e); routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, e.getMessage()); continue; } @@ -97,6 +106,9 @@ private void process() { LOG.debug("begin to check timeout tasks"); // check timeout tasks routineLoadManager.processTimeoutTasks(); + + LOG.debug("begin to clean old jobs "); + routineLoadManager.cleanOldRoutineLoadJobs(); } private List getNeedScheduleRoutineJobs() throws LoadException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index a5555854bc1c0c..a8f520c6c27cef 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -96,10 +96,10 @@ private void process() throws LoadException, UserException, InterruptedException int scheduledTaskNum = 0; // get idle be task num // allocate task to be - if (needScheduleTaskNum == 0) { - Thread.sleep(1000); - return; - } +// if (needScheduleTaskNum == 0) { +// Thread.sleep(1000); +// return; +// } while (needScheduleTaskNum > 0) { // allocate be to task and begin transaction for task RoutineLoadTaskInfo routineLoadTaskInfo = null; @@ -107,7 +107,7 @@ private void process() throws LoadException, UserException, InterruptedException routineLoadTaskInfo = needScheduleTasksQueue.take(); } catch (InterruptedException e) { LOG.warn("Taking routine load task from queue has been interrupted with error msg {}", - e.getMessage()); + e.getMessage(),e); return; } RoutineLoadJob routineLoadJob = null; @@ -192,6 +192,7 @@ private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo, RoutineLo if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadJob.getClusterName())) { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId()) .add("job_id", routineLoadJob.getId()) + .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId()) .add("msg", "task use the previous be id") .build()); routineLoadTaskInfo.setBeId(routineLoadTaskInfo.getPreviousBeId()); diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index eff96f9a91cc2a..69b360a11a75e9 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -86,7 +86,27 @@ public String toString() { public enum TxnStatusChangeReason { DB_DROPPED, - TIMEOUT + TIMEOUT, + OFFSET_OUT_OF_RANGE; + + public static TxnStatusChangeReason fromString(String reasonString) { + for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) { + if (reasonString.contains(txnStatusChangeReason.toString())) { + return txnStatusChangeReason; + } + } + return null; + } + + @Override + public String toString() { + switch (this) { + case OFFSET_OUT_OF_RANGE: + return "Offset out of range"; + default: + return this.name(); + } + } } private long dbId; diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java index 0e3006b5c74997..f4c57fcc86b5b5 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java @@ -45,14 +45,12 @@ public Producer createProducer() { public static void main(String[] args) throws InterruptedException { KafkaProducerTest kafkaProducerTest = new KafkaProducerTest(); Producer kafkaProducer = kafkaProducerTest.createProducer(); - int i = 411; + int i = 1; while (true) { String value = String.valueOf(i); -// if (i % 5 == 0) { -// value = value + "\t" + value; -// } else if (i % 6 == 0) { -// value = value + "\t" + value; -// } + if (i % 10000 == 0) { + value = value + "\t" + value; + } ProducerRecord record = new ProducerRecord<>("miaoling", value); try { RecordMetadata metadata = kafkaProducer.send(record).get(); @@ -65,8 +63,6 @@ public static void main(String[] args) throws InterruptedException { System.out.println(e); } i++; - Thread.sleep(500); - } }