3 changes: 2 additions & 1 deletion be/src/runtime/stream_load/stream_load_context.h
@@ -43,7 +43,8 @@ class KafkaLoadInfo {
KafkaLoadInfo(const TKafkaLoadInfo& t_info):
brokers(t_info.brokers),
topic(t_info.topic),
begin_offset(t_info.partition_begin_offset) {
begin_offset(t_info.partition_begin_offset),
cmt_offset(t_info.partition_begin_offset) {

if (t_info.__isset.max_interval_s) { max_interval_s = t_info.max_interval_s; }
if (t_info.__isset.max_batch_rows) { max_batch_rows = t_info.max_batch_rows; }
@@ -82,6 +82,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic";
// optional
public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";

private static final String NAME_TYPE = "ROUTINE LOAD NAME";
private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
@@ -96,6 +97,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(KAFKA_BROKER_LIST_PROPERTY)
.add(KAFKA_TOPIC_PROPERTY)
.add(KAFKA_PARTITIONS_PROPERTY)
.add(KAFKA_OFFSETS_PROPERTY)
.build();

private final String name;
@@ -113,6 +115,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private String kafkaBrokerList;
private String kafkaTopic;
private List<Integer> kafkaPartitions;
private List<Long> kafkaOffsets;

public CreateRoutineLoadStmt(String name, TableName dbTableName, List<ParseNode> loadPropertyList,
Map<String, String> properties,
@@ -170,6 +173,10 @@ public List<Integer> getKafkaPartitions() {
return kafkaPartitions;
}

public List<Long> getKafkaOffsets(){
return kafkaOffsets;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
@@ -243,30 +250,32 @@ private void checkLoadProperties(Analyzer analyzer) throws AnalysisException {
}

private void checkRoutineLoadProperties() throws AnalysisException {
Optional<String> optional = properties.keySet().parallelStream()
.filter(entity -> !PROPERTIES_SET.contains(entity)).findFirst();
if (optional.isPresent()) {
throw new AnalysisException(optional.get() + " is invalid property");
}

// check desired concurrent number
final String desiredConcurrentNumberString = properties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY);
if (desiredConcurrentNumberString != null) {
desiredConcurrentNum = getIntegerValueFromString(desiredConcurrentNumberString,
DESIRED_CONCURRENT_NUMBER_PROPERTY);
if (desiredConcurrentNum <= 0) {
throw new AnalysisException(DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater then 0");
if (properties != null) {
Optional<String> optional = properties.keySet().parallelStream()
.filter(entity -> !PROPERTIES_SET.contains(entity)).findFirst();
if (optional.isPresent()) {
throw new AnalysisException(optional.get() + " is invalid property");
}
}

// check max error number
final String maxErrorNumberString = properties.get(MAX_ERROR_NUMBER_PROPERTY);
if (maxErrorNumberString != null) {
maxErrorNum = getIntegerValueFromString(maxErrorNumberString, MAX_ERROR_NUMBER_PROPERTY);
if (maxErrorNum < 0) {
throw new AnalysisException(MAX_ERROR_NUMBER_PROPERTY + " must be greater then or equal to 0");
// check desired concurrent number
final String desiredConcurrentNumberString = properties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY);
if (desiredConcurrentNumberString != null) {
desiredConcurrentNum = getIntegerValueFromString(desiredConcurrentNumberString,
DESIRED_CONCURRENT_NUMBER_PROPERTY);
if (desiredConcurrentNum <= 0) {
throw new AnalysisException(DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater then 0");
}
}

// check max error number
final String maxErrorNumberString = properties.get(MAX_ERROR_NUMBER_PROPERTY);
if (maxErrorNumberString != null) {
maxErrorNum = getIntegerValueFromString(maxErrorNumberString, MAX_ERROR_NUMBER_PROPERTY);
if (maxErrorNum < 0) {
throw new AnalysisException(MAX_ERROR_NUMBER_PROPERTY + " must be greater then or equal to 0");
}

}
}
}

@@ -326,6 +335,16 @@ private void checkKafkaCustomProperties() throws AnalysisException {
}
}
}
// check offsets
// Todo(ml)
final String kafkaOffsetsString = customProperties.get(KAFKA_OFFSETS_PROPERTY);
if (kafkaOffsetsString != null) {
kafkaOffsets = new ArrayList<>();
String[] kafkaOffsetsStringList = customProperties.get(KAFKA_OFFSETS_PROPERTY).split(",");
for (String s : kafkaOffsetsStringList) {
kafkaOffsets.add(Long.valueOf(s));
Contributor: You should check the number format, and also check whether the number of specified offsets equals the number of specified partitions.

Contributor Author: I will do it later.
}
}
}

private int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException {
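The review comment above notes that the new KAFKA_OFFSETS_PROPERTY parsing validates neither the number format nor the offset count. A minimal sketch of that follow-up, assuming the existing kafkaPartitions/kafkaOffsets fields and AnalysisException (the helper name checkKafkaOffsets is hypothetical and not part of this PR):

    // Hypothetical validation sketch, not part of this change.
    private void checkKafkaOffsets(String kafkaOffsetsString) throws AnalysisException {
        kafkaOffsets = new ArrayList<>();
        String[] kafkaOffsetsStringList = kafkaOffsetsString.split(",");
        // one offset is required for every partition listed in kafka_partitions
        if (kafkaPartitions == null || kafkaPartitions.size() != kafkaOffsetsStringList.length) {
            throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " must contain one offset per entry in "
                    + KAFKA_PARTITIONS_PROPERTY);
        }
        for (String s : kafkaOffsetsStringList) {
            try {
                long offset = Long.parseLong(s.trim());
                if (offset < 0) {
                    throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " must not contain negative values");
                }
                kafkaOffsets.add(offset);
            } catch (NumberFormatException e) {
                throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " must be a comma-separated list of long values");
            }
        }
    }

Only the method itself is new; the constants and fields it references already exist in the class.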
@@ -18,16 +18,19 @@
package org.apache.doris.load;

import org.apache.doris.transaction.AbortTransactionException;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;

public interface TxnStateChangeListener {

void beforeCommitted(TransactionState txnState) throws TransactionException;

/**
* update catalog of job which has related txn after transaction has been committed
*
* @param txnState
*/
void onCommitted(TransactionState txnState);
void onCommitted(TransactionState txnState) throws TransactionException;

/**
* this interface is executed before txn aborted, you can check if txn could be abort in this stage
@@ -37,7 +40,7 @@ public interface TxnStateChangeListener {
* @throws AbortTransactionException if transaction could not be abort or there are some exception before aborted,
* it will throw this exception
*/
void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason)
void beforeAborted(TransactionState txnState, String txnStatusChangeReason)
throws AbortTransactionException;

/**
@@ -46,5 +49,5 @@ void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeRe
* @param txnState
* @param txnStatusChangeReason maybe null
*/
void onAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason);
void onAborted(TransactionState txnState, String txnStatusChangeReason);
}
@@ -35,6 +35,7 @@
// {"partitionIdToOffset": {}}
public class KafkaProgress extends RoutineLoadProgress {

// (partition id, begin offset)
private Map<Integer, Long> partitionIdToOffset;

public KafkaProgress() {
@@ -57,7 +58,7 @@ public void setPartitionIdToOffset(Map<Integer, Long> partitionIdToOffset) {
public void update(RoutineLoadProgress progress) {
KafkaProgress newProgress = (KafkaProgress) progress;
newProgress.getPartitionIdToOffset().entrySet().parallelStream()
.forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue()));
.forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
Contributor: Why + 1?

Contributor Author: The offset carried by the txn is the end offset, while the Kafka offset kept in the progress is the begin offset; the next task will start reading data from this begin offset.
}

@Override
@@ -78,9 +79,15 @@ public void readFields(DataInput in) throws IOException {
}
}

// (partition id, end offset)
// end offset = -1 while begin offset of partition is 0
@Override
public String toString() {
Map<Integer, Long> showPartitionIdToOffset = new HashMap<>();
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1);
}
return "KafkaProgress [partitionIdToOffset="
+ Joiner.on("|").withKeyValueSeparator("_").join(partitionIdToOffset) + "]";
+ Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]";
}
}
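To make the off-by-one handling discussed above concrete, here is a small hypothetical walk-through using only the KafkaProgress methods visible in this diff; the values are made up for illustration and the snippet is not part of the PR:

    // Hypothetical illustration of the begin/end offset bookkeeping.
    Map<Integer, Long> begin = new HashMap<>();
    begin.put(0, 0L);                          // next task should start reading partition 0 at offset 0
    KafkaProgress jobProgress = new KafkaProgress();
    jobProgress.setPartitionIdToOffset(begin);

    Map<Integer, Long> committed = new HashMap<>();
    committed.put(0, 99L);                     // the committed txn reports the END offset it consumed
    KafkaProgress txnProgress = new KafkaProgress();
    txnProgress.setPartitionIdToOffset(committed);

    jobProgress.update(txnProgress);           // update() stores 99 + 1 = 100, the next BEGIN offset
    // toString() subtracts 1 again, so the displayed progress is "0_99"
    // (and "0_-1" for a partition whose begin offset is still 0, i.e. nothing consumed yet)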
@@ -119,7 +119,7 @@ private void setCustomKafkaPartitions(List<Integer> kafkaPartitions) throws Load
}

@Override
public List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskNum) {
public void divideRoutineLoadJob(int currentConcurrentTaskNum) {
List<RoutineLoadTaskInfo> result = new ArrayList<>();
writeLock();
try {
@@ -148,7 +148,6 @@ public List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskN
} finally {
writeUnlock();
}
return result;
}

@Override
@@ -261,9 +260,7 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr
private void updateNewPartitionProgress() {
// update the progress of new partitions
for (Integer kafkaPartition : currentKafkaPartitions) {
if (((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) {
((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition);
} else {
if (!((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) {
((KafkaProgress) progress).getPartitionIdToOffset().put(kafkaPartition, 0L);
}
}
@@ -290,6 +287,12 @@ private void setOptional(CreateRoutineLoadStmt stmt) throws LoadException {
}
if (stmt.getKafkaPartitions() != null) {
setCustomKafkaPartitions(stmt.getKafkaPartitions());
if (stmt.getKafkaOffsets() != null) {
for (int i = 0; i < customKafkaPartitions.size(); i++) {
((KafkaProgress) progress).getPartitionIdToOffset()
.put(customKafkaPartitions.get(i), stmt.getKafkaOffsets().get(i));
}
}
}
}
}
@@ -52,7 +52,7 @@ public KafkaTaskInfo(UUID id, long jobId) {

public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
super(UUID.randomUUID(), kafkaTaskInfo.getJobId());
super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getPreviousBeId());
this.partitions = kafkaTaskInfo.getPartitions();
}

@@ -109,6 +109,10 @@ private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob rou
TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.gettExecPlanFragmentParams();
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutput_sink().getOlap_table_sink().setTxn_id(this.txnId);
TUniqueId queryId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
tExecPlanFragmentParams.getParams().setQuery_id(queryId);
tExecPlanFragmentParams.getParams().getPer_node_scan_ranges().values().stream()
.forEach(entity -> entity.get(0).getScan_range().getBroker_scan_range().getRanges().get(0).setLoad_id(queryId));
return tExecPlanFragmentParams;
}
}