Skip to content

Conversation

@morningman
Copy link
Contributor

No description provided.

columnsInfo = (LoadColumnsInfo) parseNode;
columnsInfo.analyze(analyzer);
} else if (parseNode instanceof Expr) {
importColumnsStmt = (ImportColumnsStmt) parseNode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't you analyze importColumnStmt

}
wherePredicate = (Expr) parseNode;
wherePredicate.analyze(analyzer);
importWhereStmt = (ImportWhereStmt) parseNode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't you analyze importWhereStmt

}

private void checkCustomProperties() throws AnalysisException {
private void checkLoadSourceProperties() throws AnalysisException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LoadSource or DataSource ?

throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " could not be a empty string");
}
String[] kafkaOffsetsStringList = kafkaOffsetsString.split(",");
if (kafkaOffsetsStringList.length != kafkaPartitionOffsets.size()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if some partition has offset while others hasn't ?

super(-1, LoadDataSourceType.KAFKA);
}

public KafkaRoutineLoadJob(Long id, String name, long dbId, long tableId, String brokerList, String topic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe id can be initialized by itself

}

// init kafka routine load job
long id = Catalog.getInstance().getNextId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as constructor

// when currentTotalNum is more then ten thousand or currentErrorNum is more then maxErrorNum
protected int currentErrorNum;
protected int currentTotalNum;
protected long currentErrorNum;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use int is enough. It will be reset after more then 10000

import org.apache.doris.transaction.AbortTransactionException;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
public abstract class TxnStateChangeListener {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change interface to abstract class?

@morningman morningman merged commit ba8a63b into apache:kafka_routine_load Mar 14, 2019
morningman added a commit to morningman/doris that referenced this pull request Mar 25, 2019
morningman added a commit to morningman/doris that referenced this pull request Apr 4, 2019
EmmyMiao87 pushed a commit to EmmyMiao87/incubator-doris that referenced this pull request Apr 23, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants