Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number";

// kafka type properties
public static final String KAFKA_ENDPOINT_PROPERTY = "kafka_endpoint";
public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list";
public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic";
// optional
public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
Expand All @@ -93,7 +93,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.build();

private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(KAFKA_ENDPOINT_PROPERTY)
.add(KAFKA_BROKER_LIST_PROPERTY)
.add(KAFKA_TOPIC_PROPERTY)
.add(KAFKA_PARTITIONS_PROPERTY)
.build();
Expand All @@ -110,7 +110,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private RoutineLoadDesc routineLoadDesc;
private int desiredConcurrentNum;
private int maxErrorNum;
private String kafkaEndpoint;
private String kafkaBrokerList;
private String kafkaTopic;
private List<Integer> kafkaPartitions;

Expand All @@ -121,7 +121,7 @@ public CreateRoutineLoadStmt(String name, TableName dbTableName, List<ParseNode>
this.dbTableName = dbTableName;
this.loadPropertyList = loadPropertyList;
this.properties = properties;
this.typeName = typeName;
this.typeName = typeName.toUpperCase();
this.customProperties = customProperties;
}

Expand All @@ -145,6 +145,7 @@ public Map<String, String> getCustomProperties() {
return customProperties;
}

// nullable
public RoutineLoadDesc getRoutineLoadDesc() {
return routineLoadDesc;
}
Expand All @@ -157,8 +158,8 @@ public int getMaxErrorNum() {
return maxErrorNum;
}

public String getKafkaEndpoint() {
return kafkaEndpoint;
public String getKafkaBrokerList() {
return kafkaBrokerList;
}

public String getKafkaTopic() {
Expand All @@ -176,6 +177,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
FeNameFormat.checkCommonName(NAME_TYPE, name);
// check dbName and tableName
checkDBTableName();
dbTableName.analyze(analyzer);
// check load properties include column separator etc.
checkLoadProperties(analyzer);
// check routine load properties include desired concurrent number etc.
Expand Down Expand Up @@ -291,12 +293,16 @@ private void checkKafkaCustomProperties() throws AnalysisException {
throw new AnalysisException(optional.get() + " is invalid kafka custom property");
}
// check endpoint
kafkaEndpoint = customProperties.get(KAFKA_ENDPOINT_PROPERTY);
if (Strings.isNullOrEmpty(kafkaEndpoint)) {
throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " is required property");
kafkaBrokerList = customProperties.get(KAFKA_BROKER_LIST_PROPERTY);
if (Strings.isNullOrEmpty(kafkaBrokerList)) {
throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + " is required property");
}
if (!Pattern.matches(ENDPOINT_REGEX, kafkaEndpoint)) {
throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " not match pattern " + ENDPOINT_REGEX);
String[] kafkaBrokerList = this.kafkaBrokerList.split(",");
for (String broker : kafkaBrokerList) {
if (!Pattern.matches(ENDPOINT_REGEX, broker)) {
throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + ":" + broker
+ " not match pattern " + ENDPOINT_REGEX);
}
}
// check topic
kafkaTopic = customProperties.get(KAFKA_TOPIC_PROPERTY);
Expand Down
14 changes: 14 additions & 0 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
Expand Down Expand Up @@ -133,6 +134,8 @@
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadScheduler;
import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.master.MetaHelper;
import org.apache.doris.meta.MetaContext;
Expand Down Expand Up @@ -349,6 +352,10 @@ public class Catalog {

private TabletChecker tabletChecker;

private RoutineLoadScheduler routineLoadScheduler;

private RoutineLoadTaskScheduler routineLoadTaskScheduler;

public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
Expand Down Expand Up @@ -466,6 +473,9 @@ private Catalog() {
this.stat = new TabletSchedulerStat();
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);

this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager);
}

public static void destroyCheckpoint() {
Expand Down Expand Up @@ -650,6 +660,10 @@ public void initialize(String[] args) throws Exception {
txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
Config.stream_load_default_timeout_second) * 100L);

// 8. start routine load scheduler
routineLoadScheduler.start();
routineLoadTaskScheduler.start();

}

private void getClusterIdAndRole() throws IOException {
Expand Down
2 changes: 2 additions & 0 deletions fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class RoutineLoadDesc {
private final ColumnSeparator columnSeparator;
private final LoadColumnsInfo columnsInfo;
private final Expr wherePredicate;
// nullable
private final List<String> partitionNames;

public RoutineLoadDesc(ColumnSeparator columnSeparator, LoadColumnsInfo columnsInfo,
Expand All @@ -50,6 +51,7 @@ public Expr getWherePredicate() {
return wherePredicate;
}

// nullable
public List<String> getPartitionNames() {
return partitionNames;
}
Expand Down
Loading