10 changes: 5 additions & 5 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -658,11 +658,6 @@ public void initialize(String[] args) throws Exception {
// the clear threads runs every min(transaction_clean_interval_second,stream_load_default_timeout_second)/10
txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
Config.stream_load_default_timeout_second) * 100L);

// 8. start routine load scheduler
routineLoadScheduler.start();
routineLoadTaskScheduler.start();

}

private void getClusterIdAndRole() throws IOException {
@@ -1129,6 +1124,11 @@ private void transferToMaster() throws IOException {
domainResolver.start();

tabletStatMgr.start();

// start routine load scheduler
routineLoadScheduler.start();
routineLoadTaskScheduler.start();

MetricRepo.init();
}

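For context on the Catalog.java hunks above: the two routine load schedulers are no longer started in the common initialize() path but in transferToMaster(), i.e. when an FE node transfers to the master role. Below is a minimal, self-contained sketch of that start-on-role-change pattern; every name in it (RoleStartSketch, NamedDaemon, and so on) is invented for illustration and is not Doris code.

// Toy sketch (invented names): daemons that must only run on the master are
// started when the node transfers to the master role, not during common init.
public class RoleStartSketch {
    interface Daemon { void start(); }

    static class NamedDaemon implements Daemon {
        private final String name;
        NamedDaemon(String name) { this.name = name; }
        public void start() { System.out.println(name + " started"); }
    }

    private final Daemon routineLoadScheduler = new NamedDaemon("routineLoadScheduler");
    private final Daemon routineLoadTaskScheduler = new NamedDaemon("routineLoadTaskScheduler");

    void initialize() {
        // common init for every FE node: no master-only daemons here
    }

    void transferToMaster() {
        // master-only daemons start once this node becomes master
        routineLoadScheduler.start();
        routineLoadTaskScheduler.start();
    }

    public static void main(String[] args) {
        RoleStartSketch node = new RoleStartSketch();
        node.initialize();
        node.transferToMaster(); // prints that both schedulers started
    }
}
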
3 changes: 2 additions & 1 deletion fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -418,7 +418,8 @@ public void readFields(DataInput in) throws IOException {
needRead = false;
break;
}
case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB: {
case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB:
case OperationType.OP_REMOVE_ROUTINE_LOAD_JOB: {
data = RoutineLoadOperation.read(in);
needRead = false;
break;
@@ -364,5 +364,7 @@ public void readFields(DataInput in) throws IOException {
for (int i = 0; i < size; i++) {
customKafkaPartitions.add(in.readInt());
}

setConsumer();
}
}
@@ -26,6 +26,7 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
Expand All @@ -50,6 +51,7 @@
import org.apache.doris.transaction.TxnStateChangeListener;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@@ -143,7 +145,7 @@ public boolean isFinalState() {
protected RoutineLoadProgress progress;
protected String pausedReason;
protected String cancelReason;
protected long endTimestamp;
protected long endTimestamp = -1;

/*
* currentErrorRows and currentTotalRows are used to check the error rate
@@ -185,7 +187,6 @@ public RoutineLoadJob(Long id, String name, String clusterName, long dbId, long
this.clusterName = clusterName;
this.dbId = dbId;
this.tableId = tableId;
this.endTimestamp = -1;
this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser())
.append(ConnectContext.get().getRemoteIP())
.append(id).append(System.currentTimeMillis()).toString().hashCode();
@@ -205,7 +206,6 @@ public RoutineLoadJob(long id, String name, long dbId, long tableId,
this.desireTaskConcurrentNum = desireTaskConcurrentNum;
this.dataSourceType = dataSourceType;
this.maxErrorNum = maxErrorNum;
this.endTimestamp = -1;
}

protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
@@ -602,6 +602,7 @@ public ListenResult onCommitted(TransactionState txnState) throws TransactionExc
@Override
public void replayOnCommitted(TransactionState txnState) {
replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
LOG.debug("replay on committed: {}", txnState);
}

// the task is aborted when the correct number of rows is more than 0
@@ -658,6 +659,7 @@ public ListenResult onAborted(TransactionState txnState, String txnStatusChangeR
@Override
public void replayOnAborted(TransactionState txnState) {
replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
LOG.debug("replay on aborted: {}", txnState);
}

// check task exists or not before call method
@@ -789,6 +791,10 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is
break;
}

if (state.isFinalState()) {
Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().unregister(id);
}

if (!isReplay) {
Catalog.getInstance().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState));
}
@@ -894,6 +900,17 @@ public static RoutineLoadJob read(DataInput in) throws IOException {
return job;
}

public boolean needRemove() {
if (state != JobState.CANCELLED && state != JobState.STOPPED) {
return false;
}
Preconditions.checkState(endTimestamp != -1, endTimestamp);
if ((System.currentTimeMillis() - endTimestamp) > Config.label_clean_interval_second * 1000) {
return true;
}
return false;
}

@Override
public void write(DataOutput out) throws IOException {
// ATTN: must write type first
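
The needRemove() method added above gates cleanup on two things: the job must be in a final state (CANCELLED or STOPPED), and it must have been final for longer than Config.label_clean_interval_second. A self-contained toy of that retention rule follows; the class, its parameters, and the main() driver are illustrative assumptions, not Doris APIs.

// Toy sketch of the retention rule needRemove() encodes (all names here are
// illustrative, not Doris APIs): a job is removable only once it has reached a
// final state and has been final for longer than the configured retention window.
import java.util.concurrent.TimeUnit;

public class RetentionRuleSketch {
    enum JobState { RUNNING, PAUSED, STOPPED, CANCELLED }

    static boolean needRemove(JobState state, long endTimestampMs,
                              long retentionSeconds, long nowMs) {
        if (state != JobState.CANCELLED && state != JobState.STOPPED) {
            return false;                       // only final jobs are eligible
        }
        // endTimestamp is set when the job reaches a final state, so it must be valid here
        assert endTimestampMs != -1;
        return nowMs - endTimestampMs > TimeUnit.SECONDS.toMillis(retentionSeconds);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long endedTwoDaysAgo = now - TimeUnit.DAYS.toMillis(2);
        // With a one-day retention window, a job stopped two days ago is removable.
        System.out.println(needRemove(JobState.STOPPED, endedTwoDaysAgo, 86400, now)); // true
        System.out.println(needRemove(JobState.RUNNING, -1, 86400, now));              // false
    }
}
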
@@ -24,7 +24,6 @@
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -34,11 +33,11 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@@ -110,8 +109,7 @@ private Map<Long, Integer> getBeIdConcurrentTaskMaps() {
}
}
}
LOG.debug("beIdToConcurrentTasks is {}", Joiner.on(",")
.withKeyValueSeparator(":").join(beIdToConcurrentTasks));
// LOG.debug("beIdToConcurrentTasks is {}", Joiner.on(",").withKeyValueSeparator(":").join(beIdToConcurrentTasks));
return beIdToConcurrentTasks;

}
@@ -223,6 +221,7 @@ public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throw
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
"User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job",
false /* not replay */);
LOG.info("pause routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName());
}

public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException,
Expand Down Expand Up @@ -250,6 +249,7 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th
tableName);
}
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, "user operation", false /* not replay */);
LOG.info("resume routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName());
}

public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException {
Expand All @@ -276,6 +276,7 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws D
tableName);
}
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, "user operation", false /* not replay */);
LOG.info("stop routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName());
}

public int getSizeOfIdToRoutineLoadTask() {
@@ -388,7 +389,7 @@ public RoutineLoadJob getJobByName(String jobName) {
if (routineLoadJobList == null) {
return null;
}
Optional<RoutineLoadJob> optional = routineLoadJobList.parallelStream()
Optional<RoutineLoadJob> optional = routineLoadJobList.stream()
.filter(entity -> !entity.getState().isFinalState()).findFirst();
if (!optional.isPresent()) {
return null;
@@ -409,10 +410,10 @@ public RoutineLoadJob getJobByTaskId(UUID taskId) throws MetaNotFoundException {
}

public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) {
LOG.debug("begin to get routine load job by state {}", jobState.name());
// LOG.debug("begin to get routine load job by state {}", jobState.name());
List<RoutineLoadJob> stateJobs = idToRoutineLoadJob.values().stream()
.filter(entity -> entity.getState() == jobState).collect(Collectors.toList());
LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name());
// LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name());
return stateJobs;
}

@@ -432,11 +433,13 @@ public void cleanOldRoutineLoadJobs() {
long currentTimestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
RoutineLoadJob routineLoadJob = iterator.next().getValue();
long jobEndTimestamp = routineLoadJob.getEndTimestamp();
if (jobEndTimestamp != -1L &&
((currentTimestamp - jobEndTimestamp) > Config.label_clean_interval_second * 1000)) {
if (routineLoadJob.needRemove()) {
dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId()).get(routineLoadJob.getName()).remove(routineLoadJob);
iterator.remove();

RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(),
JobState.CANCELLED);
Catalog.getInstance().getEditLog().logRemoveRoutineLoadJob(operation);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("end_timestamp", routineLoadJob.getEndTimestamp())
.add("current_timestamp", currentTimestamp)
Expand All @@ -450,6 +453,19 @@ public void cleanOldRoutineLoadJobs() {
}
}

public void replayRemoveOldRoutineLoad(RoutineLoadOperation operation) {
writeLock();
try {
RoutineLoadJob job = idToRoutineLoadJob.remove(operation.getId());
if (job != null) {
dbToNameToRoutineLoadJob.get(job.getDbId()).get(job.getName()).remove(job);
}
LOG.info("replay remove routine load job: {}", operation.getId());
} finally {
writeUnlock();
}
}

public void updateRoutineLoadJob() {
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
routineLoadJob.update();
@@ -493,6 +509,9 @@ public void readFields(DataInput in) throws IOException {
map.put(routineLoadJob.getName(), jobs);
}
jobs.add(routineLoadJob);
if (!routineLoadJob.getState().isFinalState()) {
Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().register(routineLoadJob);
}
}
}
}
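
The cleanOldRoutineLoadJobs() / replayRemoveOldRoutineLoad() pair above follows the usual master-writes, follower-replays shape: the master drops the job from its maps and appends OP_REMOVE_ROUTINE_LOAD_JOB to the edit log, and each follower applies the same removal when it replays that entry. The toy below mirrors only that symmetry; every name in it is invented and it is not meant as a faithful model of the FE journal.

// Toy model (not Doris code) of the master/follower symmetry the new
// OP_REMOVE_ROUTINE_LOAD_JOB entry relies on: the master mutates its map and
// appends an operation; a follower applies the same mutation by replaying it.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplaySketch {
    static class RemoveJobOp { final long jobId; RemoveJobOp(long id) { this.jobId = id; } }

    static class Node {
        final Map<Long, String> idToJob = new HashMap<>();

        // Master path: mutate state, then log the operation for followers.
        void removeAndLog(long jobId, List<RemoveJobOp> editLog) {
            idToJob.remove(jobId);
            editLog.add(new RemoveJobOp(jobId));
        }

        // Follower path: apply exactly the same mutation from the logged operation.
        void replayRemove(RemoveJobOp op) {
            idToJob.remove(op.jobId);
        }
    }

    public static void main(String[] args) {
        List<RemoveJobOp> editLog = new ArrayList<>();
        Node master = new Node();
        Node follower = new Node();
        master.idToJob.put(7L, "stopped_job");
        follower.idToJob.put(7L, "stopped_job");

        master.removeAndLog(7L, editLog);
        for (RemoveJobOp op : editLog) {
            follower.replayRemove(op);
        }
        // Both nodes converge to the same state.
        System.out.println(master.idToJob.equals(follower.idToJob)); // true
    }
}
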
@@ -93,6 +93,10 @@ private void process() throws LoadException, UserException, InterruptedException
int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum();
int needScheduleTaskNum = sizeOfTasksQueue < clusterIdleSlotNum ? sizeOfTasksQueue : clusterIdleSlotNum;

if (needScheduleTaskNum == 0) {
return;
}

LOG.info("There are {} tasks need to be scheduled in queue", needScheduleTasksQueue.size());

int scheduledTaskNum = 0;
9 changes: 9 additions & 0 deletions fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -674,6 +674,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
Catalog.getCurrentCatalog().getRoutineLoadManager().replayChangeRoutineLoadJob(operation);
break;
}
case OperationType.OP_REMOVE_ROUTINE_LOAD_JOB: {
RoutineLoadOperation operation = (RoutineLoadOperation) journal.getData();
Catalog.getCurrentCatalog().getRoutineLoadManager().replayRemoveOldRoutineLoad(operation);
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1184,4 +1189,8 @@ public void logCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
public void logOpRoutineLoadJob(RoutineLoadOperation routineLoadOperation) {
logEdit(OperationType.OP_CHANGE_ROUTINE_LOAD_JOB, routineLoadOperation);
}

public void logRemoveRoutineLoadJob(RoutineLoadOperation operation) {
logEdit(OperationType.OP_REMOVE_ROUTINE_LOAD_JOB, operation);
}
}
@@ -159,5 +159,5 @@ public class OperationType {
// routine load 200
public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200;
public static final short OP_CHANGE_ROUTINE_LOAD_JOB = 201;

public static final short OP_REMOVE_ROUTINE_LOAD_JOB = 202;
}
@@ -64,7 +64,7 @@ public void write(DataOutput out) throws IOException {

@Override
public void readFields(DataInput in) throws IOException {
in.readLong();
id = in.readLong();
jobState = JobState.valueOf(Text.readString(in));
}
}
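
About the one-character readFields change above: the old code read the long from the stream but never assigned it, so a RoutineLoadOperation rebuilt during replay presumably kept its default id rather than the logged one. The snippet below is a self-contained illustration of why the assignment matters; the Op class and both read variants are invented for the example.

// Toy Writable-style pair (invented names) showing why the read side must
// assign what it reads: dropping the value silently loses the field on replay.
import java.io.*;

public class ReadAssignSketch {
    static class Op {
        long id;

        void write(DataOutput out) throws IOException {
            out.writeLong(id);
        }

        // Buggy variant: consumes the long but never stores it.
        void readFieldsBuggy(DataInput in) throws IOException {
            in.readLong();               // value is read and thrown away
        }

        // Fixed variant: the deserialized value is kept.
        void readFieldsFixed(DataInput in) throws IOException {
            id = in.readLong();
        }
    }

    public static void main(String[] args) throws IOException {
        Op original = new Op();
        original.id = 42L;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        Op buggy = new Op();
        buggy.readFieldsBuggy(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        Op fixed = new Op();
        fixed.readFieldsFixed(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(buggy.id); // 0  - the id never survives deserialization
        System.out.println(fixed.id); // 42 - round-trips correctly
    }
}
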
@@ -135,7 +135,7 @@ public String toString() {
private long publishVersionTime;
private TransactionStatus preStatus = null;

private long listenerId;
private long listenerId = -1;

// the result of calling txn state change listener.
// this is used for replaying
@@ -393,6 +393,7 @@ public String toString() {
if (txnCommitAttachment != null) {
sb.append(" attactment: ").append(txnCommitAttachment);
}
sb.append(", listen result: ").append(listenResult.name());
return sb.toString();
}

@@ -440,6 +441,7 @@ public void write(DataOutput out) throws IOException {
txnCommitAttachment.write(out);
}
Text.writeString(out, listenResult.name());
out.writeLong(listenerId);
}

@Override
@@ -470,6 +472,7 @@ public void readFields(DataInput in) throws IOException {
txnCommitAttachment = TxnCommitAttachment.read(in);
}
listenResult = ListenResult.valueOf(Text.readString(in));
listenerId = in.readLong();
}
}
}
@@ -19,6 +19,9 @@

import com.google.common.collect.Maps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

/*
@@ -28,18 +31,22 @@

// saves all TxnStateChangeListeners
public class TxnStateListenerRegistry {
private static final Logger LOG = LogManager.getLogger(TxnStateListenerRegistry.class);

private Map<Long, TxnStateChangeListener> listeners = Maps.newHashMap();

public synchronized boolean register(TxnStateChangeListener listener) {
if (listeners.containsKey(listener.getId())) {
return false;
}
listeners.put(listener.getId(), listener);
LOG.info("register txn state listener: {}", listener.getId());
return true;
}

public synchronized void unregister(long id) {
listeners.remove(id);
LOG.info("unregister txn state listener: {}", id);
}

public synchronized TxnStateChangeListener getListener(long id) {