@@ -198,10 +198,10 @@ public void analyze(Analyzer analyzer) throws UserException {
dbTableName.analyze(analyzer);
// check load properties include column separator etc.
checkLoadProperties(analyzer);
- // check routine load properties include desired concurrent number etc.
+ // check routine load job properties include desired concurrent number etc.
checkJobProperties();
- // check data load source properties
- checkLoadSourceProperties();
+ // check data source properties
+ checkDataSourceProperties();
}

public void checkLoadProperties(Analyzer analyzer) throws UserException {
@@ -274,7 +274,7 @@ private int getIntegetPropertyOrDefault(String propName, String hintMsg, int def
return defaultVal;
}

- private void checkLoadSourceProperties() throws AnalysisException {
+ private void checkDataSourceProperties() throws AnalysisException {
LoadDataSourceType type;
try {
type = LoadDataSourceType.valueOf(typeName);
@@ -40,9 +40,11 @@ public class KafkaProgress extends RoutineLoadProgress {
private Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();

public KafkaProgress() {
+ super(LoadDataSourceType.KAFKA);
}

public KafkaProgress(TKafkaRLTaskProgress tKafkaRLTaskProgress) {
+ super(LoadDataSourceType.KAFKA);
this.partitionIdToOffset = tKafkaRLTaskProgress.getPartitionCmtOffset();
}

@@ -58,6 +60,18 @@ public void setPartitionIdToOffset(Map<Integer, Long> partitionIdToOffset) {
this.partitionIdToOffset = partitionIdToOffset;
}

+ // (partition id, end offset)
+ // end offset = -1 while begin offset of partition is 0
+ @Override
+ public String toString() {
+ Map<Integer, Long> showPartitionIdToOffset = new HashMap<>();
+ for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+ showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1);
+ }
+ return "KafkaProgress [partitionIdToOffset="
+ + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]";
+ }

@Override
public void update(RoutineLoadProgress progress) {
KafkaProgress newProgress = (KafkaProgress) progress;
@@ -67,31 +81,21 @@ public void update(RoutineLoadProgress progress) {

@Override
public void write(DataOutput out) throws IOException {
+ super.write(out);
out.writeInt(partitionIdToOffset.size());
- for (Map.Entry entry : partitionIdToOffset.entrySet()) {
+ for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
out.writeInt((Integer) entry.getKey());
out.writeLong((Long) entry.getValue());
}
}

@Override
public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
int size = in.readInt();
partitionIdToOffset = new HashMap<>();
for (int i = 0; i < size; i++) {
partitionIdToOffset.put(in.readInt(), in.readLong());
}
}

- // (partition id, end offset)
- // end offset = -1 while begin offset of partition is 0
- @Override
- public String toString() {
- Map<Integer, Long> showPartitionIdToOffset = new HashMap<>();
- for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
- showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1);
- }
- return "KafkaProgress [partitionIdToOffset="
- + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]";
- }
}
@@ -61,7 +61,7 @@
public class KafkaRoutineLoadJob extends RoutineLoadJob {
private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class);

- private static final int FETCH_PARTITIONS_TIMEOUT_SECOND = 10;
+ private static final int FETCH_PARTITIONS_TIMEOUT_SECOND = 5;

private String brokerList;
private String topic;
@@ -177,8 +177,14 @@ boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
}

@Override
- protected void updateProgress(RLTaskTxnCommitAttachment attachment, boolean isReplay) {
- super.updateProgress(attachment, isReplay);
+ protected void updateProgress(RLTaskTxnCommitAttachment attachment) {
+ super.updateProgress(attachment);
this.progress.update(attachment.getProgress());
}

+ @Override
+ protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
+ super.replayUpdateProgress(attachment);
+ this.progress.update(attachment.getProgress());
+ }

@@ -221,7 +227,8 @@ protected boolean unprotectNeedReschedule() {
.build(), e);
if (this.state == JobState.NEED_SCHEDULE) {
unprotectUpdateState(JobState.PAUSED,
"Job failed to fetch all current partition with error " + e.getMessage(), false);
"Job failed to fetch all current partition with error " + e.getMessage(),
false /* not replay */);
}
return false;
}
@@ -257,8 +264,8 @@ protected boolean unprotectNeedReschedule() {

private List<Integer> getAllKafkaPartitions() {
List<Integer> result = new ArrayList<>();
- List<PartitionInfo> partitionList = consumer.partitionsFor(
- topic, Duration.ofSeconds(FETCH_PARTITIONS_TIMEOUT_SECOND));
+ List<PartitionInfo> partitionList = consumer.partitionsFor(topic,
+ Duration.ofSeconds(FETCH_PARTITIONS_TIMEOUT_SECOND));
for (PartitionInfo partitionInfo : partitionList) {
result.add(partitionInfo.partition());
}
@@ -271,8 +278,9 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr
if (db == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, stmt.getDBTableName().getDb());
}
- db.readLock();

long tableId = -1L;
+ db.readLock();
try {
unprotectedCheckMeta(db, stmt.getDBTableName().getTbl(), stmt.getRoutineLoadDesc());
tableId = db.getTable(stmt.getDBTableName().getTbl()).getId();
@@ -282,10 +290,8 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr

// init kafka routine load job
long id = Catalog.getInstance().getNextId();
- KafkaRoutineLoadJob kafkaRoutineLoadJob =
- new KafkaRoutineLoadJob(id, stmt.getName(), db.getId(), tableId,
- stmt.getKafkaBrokerList(),
- stmt.getKafkaTopic());
+ KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(), db.getId(),
+ tableId, stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
kafkaRoutineLoadJob.setOptional(stmt);

return kafkaRoutineLoadJob;
@@ -19,6 +19,7 @@

import org.apache.doris.thrift.TRLTaskTxnCommitAttachment;
import org.apache.doris.thrift.TUniqueId;
+ import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;

import java.io.DataInput;
@@ -34,20 +35,20 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
private long filteredRows;
private long loadedRows;
private RoutineLoadProgress progress;
- private LoadDataSourceType loadDataSourceType;

public RLTaskTxnCommitAttachment() {
+ super(TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK);
}

public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
+ super(TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK);
this.jobId = rlTaskTxnCommitAttachment.getJobId();
this.taskId = rlTaskTxnCommitAttachment.getId();
this.filteredRows = rlTaskTxnCommitAttachment.getFilteredRows();
this.loadedRows = rlTaskTxnCommitAttachment.getLoadedRows();

switch (rlTaskTxnCommitAttachment.getLoadSourceType()) {
case KAFKA:
- this.loadDataSourceType = LoadDataSourceType.KAFKA;
this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress());
default:
break;
@@ -81,11 +82,17 @@ public String toString() {

@Override
public void write(DataOutput out) throws IOException {
+ // TODO: think twice
+ super.write(out);
out.writeLong(filteredRows);
out.writeLong(loadedRows);
progress.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
+ // TODO: think twice
+ super.readFields(in);
filteredRows = in.readLong();
loadedRows = in.readLong();
progress = RoutineLoadProgress.read(in);
}
}