diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 9124fcc8b25b58..ace9aa03d2badd 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -658,11 +658,6 @@ public void initialize(String[] args) throws Exception { // the clear threads runs every min(transaction_clean_interval_second,stream_load_default_timeout_second)/10 txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second, Config.stream_load_default_timeout_second) * 100L); - - // 8. start routine load scheduler - routineLoadScheduler.start(); - routineLoadTaskScheduler.start(); - } private void getClusterIdAndRole() throws IOException { @@ -1129,6 +1124,11 @@ private void transferToMaster() throws IOException { domainResolver.start(); tabletStatMgr.start(); + + // start routine load scheduler + routineLoadScheduler.start(); + routineLoadTaskScheduler.start(); + MetricRepo.init(); } diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java index 59e6a79ce823e4..e5f06e9df060a4 100644 --- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -418,7 +418,8 @@ public void readFields(DataInput in) throws IOException { needRead = false; break; } - case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB: { + case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB: + case OperationType.OP_REMOVE_ROUTINE_LOAD_JOB: { data = RoutineLoadOperation.read(in); needRead = false; break; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index fa4be5739bbcf2..9b0710ca1c340e 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -364,5 +364,7 @@ public void readFields(DataInput in) throws IOException { for (int i = 0; i < size; i++) { customKafkaPartitions.add(in.readInt()); } + + setConsumer(); } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 7ebfb90e118d94..0607045ecfd475 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -50,6 +51,7 @@ import org.apache.doris.transaction.TxnStateChangeListener; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -143,7 +145,7 @@ public boolean isFinalState() { protected RoutineLoadProgress progress; protected String pausedReason; protected String cancelReason; - protected long endTimestamp; + protected long endTimestamp = -1; /* * currentErrorRows and currentTotalRows is used for check error rate @@ -185,7 +187,6 @@ public RoutineLoadJob(Long id, String name, String clusterName, long dbId, long this.clusterName = clusterName; this.dbId = dbId; this.tableId = tableId; - this.endTimestamp = -1; this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser()) .append(ConnectContext.get().getRemoteIP()) .append(id).append(System.currentTimeMillis()).toString().hashCode(); @@ -205,7 +206,6 @@ public RoutineLoadJob(long id, String name, long dbId, long tableId, this.desireTaskConcurrentNum = desireTaskConcurrentNum; this.dataSourceType = dataSourceType; this.maxErrorNum = maxErrorNum; - this.endTimestamp = -1; } protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { @@ -602,6 +602,7 @@ public ListenResult onCommitted(TransactionState txnState) throws TransactionExc @Override public void replayOnCommitted(TransactionState txnState) { replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment()); + LOG.debug("replay on committed: {}", txnState); } // the task is aborted when the correct number of rows is more then 0 @@ -658,6 +659,7 @@ public ListenResult onAborted(TransactionState txnState, String txnStatusChangeR @Override public void replayOnAborted(TransactionState txnState) { replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment()); + LOG.debug("replay on aborted: {}", txnState); } // check task exists or not before call method @@ -789,6 +791,10 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is break; } + if (state.isFinalState()) { + Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().unregister(id); + } + if (!isReplay) { Catalog.getInstance().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState)); } @@ -894,6 +900,17 @@ public static RoutineLoadJob read(DataInput in) throws IOException { return job; } + public boolean needRemove() { + if (state != JobState.CANCELLED && state != JobState.STOPPED) { + return false; + } + Preconditions.checkState(endTimestamp != -1, endTimestamp); + if ((System.currentTimeMillis() - endTimestamp) > Config.label_clean_interval_second * 1000) { + return true; + } + return false; + } + @Override public void write(DataOutput out) throws IOException { // ATTN: must write type first diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 1d78459ac748c8..3f160c7caf1606 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -24,7 +24,6 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -34,11 +33,11 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; +import org.apache.doris.load.routineload.RoutineLoadJob.JobState; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; -import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -110,8 +109,7 @@ private Map getBeIdConcurrentTaskMaps() { } } } - LOG.debug("beIdToConcurrentTasks is {}", Joiner.on(",") - .withKeyValueSeparator(":").join(beIdToConcurrentTasks)); + // LOG.debug("beIdToConcurrentTasks is {}", Joiner.on(",").withKeyValueSeparator(":").join(beIdToConcurrentTasks)); return beIdToConcurrentTasks; } @@ -223,6 +221,7 @@ public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throw routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, "User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job", false /* not replay */); + LOG.info("pause routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); } public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException, @@ -250,6 +249,7 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th tableName); } routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, "user operation", false /* not replay */); + LOG.info("resume routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); } public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException { @@ -276,6 +276,7 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws D tableName); } routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, "user operation", false /* not replay */); + LOG.info("stop routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); } public int getSizeOfIdToRoutineLoadTask() { @@ -388,7 +389,7 @@ public RoutineLoadJob getJobByName(String jobName) { if (routineLoadJobList == null) { return null; } - Optional optional = routineLoadJobList.parallelStream() + Optional optional = routineLoadJobList.stream() .filter(entity -> !entity.getState().isFinalState()).findFirst(); if (!optional.isPresent()) { return null; @@ -409,10 +410,10 @@ public RoutineLoadJob getJobByTaskId(UUID taskId) throws MetaNotFoundException { } public List getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) { - LOG.debug("begin to get routine load job by state {}", jobState.name()); + // LOG.debug("begin to get routine load job by state {}", jobState.name()); List stateJobs = idToRoutineLoadJob.values().stream() .filter(entity -> entity.getState() == jobState).collect(Collectors.toList()); - LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name()); + // LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name()); return stateJobs; } @@ -432,11 +433,13 @@ public void cleanOldRoutineLoadJobs() { long currentTimestamp = System.currentTimeMillis(); while (iterator.hasNext()) { RoutineLoadJob routineLoadJob = iterator.next().getValue(); - long jobEndTimestamp = routineLoadJob.getEndTimestamp(); - if (jobEndTimestamp != -1L && - ((currentTimestamp - jobEndTimestamp) > Config.label_clean_interval_second * 1000)) { + if (routineLoadJob.needRemove()) { dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId()).get(routineLoadJob.getName()).remove(routineLoadJob); iterator.remove(); + + RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(), + JobState.CANCELLED); + Catalog.getInstance().getEditLog().logRemoveRoutineLoadJob(operation); LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) .add("end_timestamp", routineLoadJob.getEndTimestamp()) .add("current_timestamp", currentTimestamp) @@ -450,6 +453,19 @@ public void cleanOldRoutineLoadJobs() { } } + public void replayRemoveOldRoutineLoad(RoutineLoadOperation operation) { + writeLock(); + try { + RoutineLoadJob job = idToRoutineLoadJob.remove(operation.getId()); + if (job != null) { + dbToNameToRoutineLoadJob.get(job.getDbId()).get(job.getName()).remove(job); + } + LOG.info("replay remove routine load job: {}", operation.getId()); + } finally { + writeUnlock(); + } + } + public void updateRoutineLoadJob() { for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { routineLoadJob.update(); @@ -493,6 +509,9 @@ public void readFields(DataInput in) throws IOException { map.put(routineLoadJob.getName(), jobs); } jobs.add(routineLoadJob); + if (!routineLoadJob.getState().isFinalState()) { + Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().register(routineLoadJob); + } } } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 66d26e51a45655..d4a50a73184857 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -93,6 +93,10 @@ private void process() throws LoadException, UserException, InterruptedException int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum(); int needScheduleTaskNum = sizeOfTasksQueue < clusterIdleSlotNum ? sizeOfTasksQueue : clusterIdleSlotNum; + if (needScheduleTaskNum == 0) { + return; + } + LOG.info("There are {} tasks need to be scheduled in queue", needScheduleTasksQueue.size()); int scheduledTaskNum = 0; diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index b4a30f902a918b..8524bf72259aa7 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -674,6 +674,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { Catalog.getCurrentCatalog().getRoutineLoadManager().replayChangeRoutineLoadJob(operation); break; } + case OperationType.OP_REMOVE_ROUTINE_LOAD_JOB: { + RoutineLoadOperation operation = (RoutineLoadOperation) journal.getData(); + Catalog.getCurrentCatalog().getRoutineLoadManager().replayRemoveOldRoutineLoad(operation); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1184,4 +1189,8 @@ public void logCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) { public void logOpRoutineLoadJob(RoutineLoadOperation routineLoadOperation) { logEdit(OperationType.OP_CHANGE_ROUTINE_LOAD_JOB, routineLoadOperation); } + + public void logRemoveRoutineLoadJob(RoutineLoadOperation operation) { + logEdit(OperationType.OP_REMOVE_ROUTINE_LOAD_JOB, operation); + } } diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java index dc7c8026e6309f..05b788ec238c4a 100644 --- a/fe/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java @@ -159,5 +159,5 @@ public class OperationType { // routine load 200 public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200; public static final short OP_CHANGE_ROUTINE_LOAD_JOB = 201; - + public static final short OP_REMOVE_ROUTINE_LOAD_JOB = 202; } diff --git a/fe/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java b/fe/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java index 395e2b655cf8e8..208096877019e2 100644 --- a/fe/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java +++ b/fe/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java @@ -64,7 +64,7 @@ public void write(DataOutput out) throws IOException { @Override public void readFields(DataInput in) throws IOException { - in.readLong(); + id = in.readLong(); jobState = JobState.valueOf(Text.readString(in)); } } diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index cfd3cebfed3423..d09ab522b351b3 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -135,7 +135,7 @@ public String toString() { private long publishVersionTime; private TransactionStatus preStatus = null; - private long listenerId; + private long listenerId = -1; // the result of calling txn state change listener. // this is used for replaying @@ -393,6 +393,7 @@ public String toString() { if (txnCommitAttachment != null) { sb.append(" attactment: ").append(txnCommitAttachment); } + sb.append(", listen result: ").append(listenResult.name()); return sb.toString(); } @@ -440,6 +441,7 @@ public void write(DataOutput out) throws IOException { txnCommitAttachment.write(out); } Text.writeString(out, listenResult.name()); + out.writeLong(listenerId); } @Override @@ -470,6 +472,7 @@ public void readFields(DataInput in) throws IOException { txnCommitAttachment = TxnCommitAttachment.read(in); } listenResult = ListenResult.valueOf(Text.readString(in)); + listenerId = in.readLong(); } } } diff --git a/fe/src/main/java/org/apache/doris/transaction/TxnStateListenerRegistry.java b/fe/src/main/java/org/apache/doris/transaction/TxnStateListenerRegistry.java index d2529e76c03a82..d24db98e2ce8aa 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TxnStateListenerRegistry.java +++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateListenerRegistry.java @@ -19,6 +19,9 @@ import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.Map; /* @@ -28,6 +31,8 @@ // saves all TxnStateChangeListeners public class TxnStateListenerRegistry { + private static final Logger LOG = LogManager.getLogger(TxnStateListenerRegistry.class); + private Map listeners = Maps.newHashMap(); public synchronized boolean register(TxnStateChangeListener listener) { @@ -35,11 +40,13 @@ public synchronized boolean register(TxnStateChangeListener listener) { return false; } listeners.put(listener.getId(), listener); + LOG.info("register txn state listener: {}", listener.getId()); return true; } public synchronized void unregister(long id) { listeners.remove(id); + LOG.info("unregister txn state listener: {}", id); } public synchronized TxnStateChangeListener getListener(long id) {