@@ -56,6 +56,7 @@
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;

import java.io.DataOutput;
@@ -65,6 +66,7 @@
import java.util.List;
import java.util.Map;

@Log4j2
public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements
TxnStateChangeCallback, GsonPostProcessable {
private final long dbId;
@@ -83,14 +85,16 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@SerializedName("jp")
private StreamingJobProperties jobProperties;
@Getter
@SerializedName("tt")
@SerializedName("tvf")
private String tvfType;
private Map<String, String> originTvfProps;
@Getter
StreamingInsertTask runningStreamTask;
SourceOffsetProvider offsetProvider;
@Setter
@Getter
private long lastScheduleTaskTimestamp = -1L;
private InsertIntoTableCommand baseCommand;

public StreamingInsertJob(String jobName,
JobStatus jobStatus,
@@ -105,17 +109,28 @@ public StreamingInsertJob(String jobName,
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
this.jobProperties = jobProperties;
this.tvfType = parseTvfType();
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
this.offsetProvider.init(getExecuteSql(), jobProperties);
init();
}

private String parseTvfType() {
NereidsParser parser = new NereidsParser();
InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(getExecuteSql());
UnboundTVFRelation firstTVF = command.getFirstTVF();
Preconditions.checkNotNull(firstTVF, "Only support insert sql with tvf");
return firstTVF.getFunctionName();
private void init() {
try {
UnboundTVFRelation currentTvf = getCurrentTvf();
this.originTvfProps = currentTvf.getProperties().getMap();
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
} catch (Exception ex) {
log.warn("init streaming insert job failed, sql: {}", getExecuteSql(), ex);
throw new RuntimeException("init streaming insert job failed, sql: " + getExecuteSql(), ex);
}
}

private UnboundTVFRelation getCurrentTvf() throws Exception {
if (baseCommand == null) {
this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(getExecuteSql());
}
List<UnboundTVFRelation> allTVFRelation = baseCommand.getAllTVFRelation();
Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support one source in insert streaming job");
UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
return unboundTVFRelation;
}

@Override
@@ -155,7 +170,14 @@ protected StreamingInsertTask createStreamingInsertTask() {
}

protected void fetchMeta() {
offsetProvider.fetchRemoteMeta();
try {
if (originTvfProps == null) {
this.originTvfProps = getCurrentTvf().getProperties().getMap();
}
offsetProvider.fetchRemoteMeta(originTvfProps);
} catch (Exception ex) {
log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
}
}

public boolean needScheduleTask() {
Expand Down Expand Up @@ -228,8 +250,19 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(getComment()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));

if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getSyncOffset()));
} else {
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
}
if (offsetProvider != null && offsetProvider.getRemoteOffset() != null) {
trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
} else {
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
}

trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
trow.addToColumnValue(new TCell().setStringVal(
jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
@@ -324,9 +357,8 @@ public void replayOnVisible(TransactionState txnState) {

@Override
public void gsonPostProcess() throws IOException {
if (offsetProvider == null && jobProperties != null && tvfType != null) {
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
// this.offsetProvider.init(getExecuteSql(), jobProperties);
if (offsetProvider == null) {
offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
}
}
}
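
Note on persistence: with this change only the Gson-annotated fields survive a metadata image (the TVF type is now serialized under "tvf"), while offsetProvider is rebuilt from tvfType in gsonPostProcess() after deserialization. Below is a minimal standalone sketch of that rehydration pattern using plain Gson rather than Doris's GsonUtils wiring; OffsetProviderRehydrationSketch, JobState, OffsetProvider and createProvider are illustrative stand-ins, not the real classes.

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

public class OffsetProviderRehydrationSketch {

    // Stand-in for SourceOffsetProvider; single method so it can be a lambda.
    interface OffsetProvider {
        String sourceType();
    }

    // Stand-in for SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType).
    static OffsetProvider createProvider(String tvfType) {
        if ("s3".equalsIgnoreCase(tvfType)) {
            return () -> "s3";
        }
        throw new IllegalArgumentException("unsupported tvf type: " + tvfType);
    }

    static class JobState {
        @SerializedName("tvf")
        String tvfType;
        // transient: skipped by Gson on purpose, rebuilt after load.
        transient OffsetProvider offsetProvider;

        void gsonPostProcess() {
            if (offsetProvider == null) {
                offsetProvider = createProvider(tvfType);
            }
        }
    }

    public static void main(String[] args) {
        JobState original = new JobState();
        original.tvfType = "s3";
        original.offsetProvider = createProvider(original.tvfType);

        String json = new Gson().toJson(original);             // {"tvf":"s3"}
        JobState restored = new Gson().fromJson(json, JobState.class);
        restored.gsonPostProcess();                            // rebuild the provider
        System.out.println(restored.offsetProvider.sourceType()); // prints: s3
    }
}

The job above does the same thing with SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType), which is why tvfType keeps its @SerializedName while the provider field does not.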
@@ -57,29 +57,29 @@ public class StreamingInsertTask {
private Long startTimeMs;
private Long finishTimeMs;
private String sql;
private SourceOffsetProvider offsetProvider;
private StmtExecutor stmtExecutor;
private InsertIntoTableCommand command;
private InsertIntoTableCommand taskCommand;
private String currentDb;
private UserIdentity userIdentity;
private ConnectContext ctx;
private Offset runningOffset;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
private StreamingJobProperties jobProperties;
SourceOffsetProvider offsetProvider;

public StreamingInsertTask(long jobId,
long taskId,
String sql,
SourceOffsetProvider offsetProvider,
String currentDb,
StreamingJobProperties jobProperties,
UserIdentity userIdentity) {
UserIdentity userIdentity) {
this.jobId = jobId;
this.taskId = taskId;
this.sql = sql;
this.offsetProvider = offsetProvider;
this.userIdentity = userIdentity;
this.currentDb = currentDb;
this.offsetProvider = offsetProvider;
this.jobProperties = jobProperties;
this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
this.createTimeMs = System.currentTimeMillis();
@@ -106,7 +106,7 @@ public void execute() throws JobException {
}
}

private void before() throws JobException {
private void before() throws Exception {
this.status = TaskStatus.RUNNING;
this.startTimeMs = System.currentTimeMillis();

@@ -117,12 +117,12 @@ private void before() throws JobException {
ctx.setSessionVariable(jobProperties.getSessionVariable());
StatementContext statementContext = new StatementContext();
ctx.setStatementContext(statementContext);
offsetProvider.init(sql, jobProperties);
this.runningOffset = offsetProvider.getNextOffset();
this.command = offsetProvider.rewriteTvfParams(runningOffset);
this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
this.command.setJobId(getTaskId());
stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));

this.runningOffset = offsetProvider.getNextOffset(jobProperties, jobProperties.getProperties());
this.taskCommand = offsetProvider.rewriteTvfParams(sql, runningOffset);
this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
this.taskCommand.setJobId(getTaskId());
this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
}

private void run() throws JobException {
@@ -134,21 +134,21 @@ private void run() throws JobException {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
command.run(ctx, stmtExecutor);
taskCommand.run(ctx, stmtExecutor);
if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
return;
} else {
errMsg = ctx.getState().getErrorMessage();
}
log.error(
"streaming insert failed with {}, reason {}, to retry",
command.getLabelName(),
taskCommand.getLabelName(),
errMsg);
if (retry == MAX_RETRY) {
errMsg = "reached max retry times, failed with" + errMsg;
}
} catch (Exception e) {
log.warn("execute insert task error, label is {},offset is {}", command.getLabelName(),
log.warn("execute insert task error, label is {},offset is {}", taskCommand.getLabelName(),
runningOffset.toJson(), e);
errMsg = Util.getRootCauseMessage(e);
}
@@ -209,8 +209,8 @@ public void closeOrReleaseResources() {
if (null != stmtExecutor) {
stmtExecutor = null;
}
if (null != command) {
command = null;
if (null != taskCommand) {
taskCommand = null;
}
if (null != ctx) {
ctx = null;
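
The run() loop in this task retries the rewritten taskCommand up to MAX_RETRY times, bails out early when the task has been canceled, and keeps the root-cause message of the last failure. Below is a stripped-down sketch of that control flow only, assuming a hypothetical Attempt callback in place of ConnectContext/StmtExecutor; the class name and MAX_RETRY value are illustrative.

import java.util.concurrent.atomic.AtomicBoolean;

public class StreamingTaskRetrySketch {

    static final int MAX_RETRY = 3;

    interface Attempt {
        /** Returns null on success, or an error message on failure. */
        String run() throws Exception;
    }

    static void runWithRetry(Attempt attempt, AtomicBoolean isCanceled) throws Exception {
        String errMsg = null;
        for (int retry = 0; retry < MAX_RETRY; retry++) {
            if (isCanceled.get()) {
                System.out.println("task has been canceled");
                return;
            }
            try {
                errMsg = attempt.run();
                if (errMsg == null) {
                    return; // success; the caller can now commit the consumed offset
                }
            } catch (Exception e) {
                errMsg = e.getMessage();
            }
            System.out.println("attempt " + retry + " failed: " + errMsg + ", to retry");
        }
        throw new Exception("reached max retry times, failed with " + errMsg);
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean canceled = new AtomicBoolean(false);
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        runWithRetry(() -> ++calls[0] < 3 ? "transient failure" : null, canceled);
        System.out.println("succeeded after " + calls[0] + " attempts");
    }
}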
@@ -20,16 +20,13 @@
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;

import java.util.Map;

/**
* Interface for managing offsets and metadata of a data source.
*/
public interface SourceOffsetProvider {

/**
* init
*/
void init(String executeSql, StreamingJobProperties jobProperties);

/**
* Get source type, e.g. s3, kafka
* @return
@@ -40,20 +37,32 @@ public interface SourceOffsetProvider {
* Get next offset to consume
* @return
*/
Offset getNextOffset();
Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties);

/**
* Get current offset
* @return
*/
Offset getCurrentOffset();

/**
* Get sync offset to show
* @return
*/
String getSyncOffset();

/**
* Get remote offset
* @return
*/
String getRemoteOffset();

/**
* Rewrite the TVF parameters in the SQL based on the current offset.
* @param nextOffset
* @return rewritten InsertIntoTableCommand
*/
InsertIntoTableCommand rewriteTvfParams(Offset nextOffset);
InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset nextOffset) throws Exception;

/**
* Update the offset of the source.
@@ -64,7 +73,7 @@
/**
* Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka.
*/
void fetchRemoteMeta();
void fetchRemoteMeta(Map<String, String> properties) throws Exception;

/**
* Whether there is more data to consume
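
With init(String, StreamingJobProperties) removed, the provider no longer parses the job SQL itself: it receives the TVF properties on each call (getNextOffset, fetchRemoteMeta), rewrites whatever SQL is passed into rewriteTvfParams, and exposes getSyncOffset()/getRemoteOffset() for the SHOW output built in getTvfInfo() above. Below is a self-contained toy that mirrors this shape, assuming simplified stand-ins (Offset, Provider, FileListProvider) rather than the Doris types.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OffsetProviderSketch {

    interface Offset {
        String toJson();
    }

    static class FileOffset implements Offset {
        final String lastFile;
        FileOffset(String lastFile) { this.lastFile = lastFile; }
        public String toJson() { return "{\"lastFile\":\"" + lastFile + "\"}"; }
    }

    /** Shape of the revised interface, without Doris-specific types. */
    interface Provider {
        String getSourceType();
        Offset getNextOffset(Map<String, String> jobProps, Map<String, String> tvfProps);
        String getSyncOffset();
        String getRemoteOffset();
        void fetchRemoteMeta(Map<String, String> tvfProps) throws Exception;
        void updateOffset(Offset offset);
        boolean hasMoreDataToConsume();
    }

    /** Toy provider that walks a fixed list of remote files in order. */
    static class FileListProvider implements Provider {
        private final List<String> remoteFiles = new ArrayList<>();
        private int consumed = 0;

        public String getSourceType() { return "file-list"; }

        public void fetchRemoteMeta(Map<String, String> tvfProps) {
            // Real providers would list S3 objects / query Kafka end offsets here.
            remoteFiles.clear();
            remoteFiles.add(tvfProps.getOrDefault("uri", "s3://bucket/a.csv"));
            remoteFiles.add("s3://bucket/b.csv");
        }

        public Offset getNextOffset(Map<String, String> jobProps, Map<String, String> tvfProps) {
            return new FileOffset(remoteFiles.get(consumed));
        }

        public void updateOffset(Offset offset) { consumed++; }

        public String getSyncOffset() {
            return consumed == 0 ? null : new FileOffset(remoteFiles.get(consumed - 1)).toJson();
        }

        public String getRemoteOffset() {
            return remoteFiles.isEmpty() ? null : remoteFiles.get(remoteFiles.size() - 1);
        }

        public boolean hasMoreDataToConsume() { return consumed < remoteFiles.size(); }
    }

    public static void main(String[] args) throws Exception {
        Provider p = new FileListProvider();
        p.fetchRemoteMeta(Map.of("uri", "s3://bucket/a.csv"));
        while (p.hasMoreDataToConsume()) {
            Offset next = p.getNextOffset(Map.of(), Map.of());
            // ... rewrite the TVF params with `next` and run the insert ...
            p.updateOffset(next);
        }
        System.out.println("sync=" + p.getSyncOffset() + ", remote=" + p.getRemoteOffset());
    }
}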