diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 5b386886b19ce8..e855aa4f83676f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -105,7 +105,7 @@ public class InsertJob extends AbstractJob> impl .add(new Column("ErrorMsg", ScalarType.createStringType())) .build(); - private static final ShowResultSetMetaData TASK_META_DATA = + public static final ShowResultSetMetaData TASK_META_DATA = ShowResultSetMetaData.builder() .addColumn(new Column("TaskId", ScalarType.createVarchar(80))) .addColumn(new Column("Label", ScalarType.createVarchar(80))) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index a4bbf51b8a40d8..c88fd019c46b98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -18,46 +18,70 @@ package org.apache.doris.job.extensions.insert.streaming; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.base.JobExecutionConfiguration; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.PauseReason; +import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.job.offset.SourceOffsetProvider; +import org.apache.doris.job.offset.SourceOffsetProviderFactory; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadStatistic; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; +import org.apache.doris.transaction.TransactionException; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TxnStateChangeCallback; import com.google.gson.annotations.SerializedName; import lombok.Getter; import lombok.Setter; +import org.apache.commons.collections.CollectionUtils; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; -public class StreamingInsertJob extends AbstractJob> { +public class StreamingInsertJob extends AbstractJob> implements + TxnStateChangeCallback { @SerializedName("did") private final long dbId; - + private LoadStatistic loadStatistic = new LoadStatistic(); + @SerializedName("fm") + private FailMsg failMsg; @Getter protected PauseReason pauseReason; - @Getter @Setter protected long latestAutoResumeTimestamp; - @Getter @Setter protected long autoResumeCount; - @Getter @SerializedName("jp") private StreamingJobProperties jobProperties; - + @Getter + StreamingInsertTask runningStreamTask; + SourceOffsetProvider offsetProvider; private long lastScheduleTaskTimestamp = -1L; public StreamingInsertJob(String jobName, @@ -73,6 +97,14 @@ public StreamingInsertJob(String jobName, jobConfig, createTimeMs, executeSql); this.dbId = ConnectContext.get().getCurrentDbId(); this.jobProperties = jobProperties; + String tvfType = parseTvfType(); + this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType); + } + + private String parseTvfType() { + NereidsParser parser = new NereidsParser(); + InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(getExecuteSql()); + return command.getFirstTvfName(); } @Override @@ -80,35 +112,109 @@ public void updateJobStatus(JobStatus status) throws JobException { super.updateJobStatus(status); } - protected void createStreamingInsertTask() { + @Override + public JobType getJobType() { + return JobType.INSERT; + } + + @Override + protected void checkJobParamsInternal() { + } + + @Override + public boolean isReadyForScheduling(Map taskContext) { + return CollectionUtils.isEmpty(getRunningTasks()); + } + + @Override + public List createTasks(TaskType taskType, Map taskContext) { + List newTasks = new ArrayList<>(); + StreamingJobSchedulerTask streamingJobSchedulerTask = new StreamingJobSchedulerTask(this); + newTasks.add(streamingJobSchedulerTask); + super.initTasks(newTasks, taskType); + return newTasks; + } + + protected StreamingInsertTask createStreamingInsertTask() { + InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(getExecuteSql()); + this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), command, + loadStatistic, getCurrentDbName(), offsetProvider.getCurrentOffset(), jobProperties); + return this.runningStreamTask; } protected void fetchMeta() { + offsetProvider.fetchRemoteMeta(); + } + + public boolean needScheduleTask() { + return (getJobStatus().equals(JobStatus.RUNNING) || getJobStatus().equals(JobStatus.PENDING)); + } + + // When consumer to EOF, delay schedule task appropriately can avoid too many small transactions. + public boolean needDelayScheduleTask() { + return System.currentTimeMillis() - lastScheduleTaskTimestamp > jobProperties.getMaxIntervalSecond() * 1000; + } + + public boolean hasMoreDataToConsume() { + return offsetProvider.hasMoreDataToConsume(); } @Override - public JobType getJobType() { - return JobType.INSERT; + public void onTaskFail(StreamingJobSchedulerTask task) throws JobException { + // Here is the failure of StreamingJobSchedulerTask, no processing is required + getRunningTasks().remove(task); } @Override - protected void checkJobParamsInternal() { + public void onTaskSuccess(StreamingJobSchedulerTask task) throws JobException { + // Here is the success of StreamingJobSchedulerTask, no processing is required + getRunningTasks().remove(task); + } + + public void onStreamTaskFail(StreamingInsertTask task) throws JobException { + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg()); + } + updateJobStatus(JobStatus.PAUSED); } + public void onStreamTaskSuccess(StreamingInsertTask task) { + StreamingInsertTask nextTask = createStreamingInsertTask(); + this.runningStreamTask = nextTask; + Env.getCurrentEnv().getJobManager().getStreamingTaskScheduler().registerTask(runningStreamTask); + } + + @Override - public boolean isReadyForScheduling(Map taskContext) { - return true; + public ShowResultSetMetaData getTaskMetaData() { + return InsertJob.TASK_META_DATA; } @Override - public java.util.List createTasks(org.apache.doris.job.common.TaskType taskType, - Map taskContext) { - return java.util.Collections.emptyList(); + public List getShowInfo() { + return getCommonShowInfo(); } @Override - public org.apache.doris.qe.ShowResultSetMetaData getTaskMetaData() { - return org.apache.doris.qe.ShowResultSetMetaData.builder().build(); + public TRow getTvfInfo() { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(getJobName())); + trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser())); + trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name())); + trow.addToColumnValue(new TCell().setStringVal(getJobConfig().convertRecurringStrategyToString())); + trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name())); + trow.addToColumnValue(new TCell().setStringVal(getExecuteSql())); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getSucceedTaskCount().get()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getFailedTaskCount().get()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get()))); + trow.addToColumnValue(new TCell().setStringVal(getComment())); + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg())); + return trow; } @Override @@ -119,7 +225,11 @@ public String formatMsgWhenExecuteQueueFull(Long taskId) { @Override public List queryTasks() { - return new ArrayList<>(); + if (!getRunningTasks().isEmpty()) { + return getRunningTasks(); + } else { + return Arrays.asList(new StreamingJobSchedulerTask(this)); + } } @Override @@ -127,17 +237,50 @@ public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); } - public boolean needScheduleTask() { - return (getJobStatus().equals(JobStatus.RUNNING) || getJobStatus().equals(JobStatus.PENDING)); + @Override + public long getId() { + return getJobId(); } - // When consumer to EOF, delay schedule task appropriately can avoid too many small transactions. - public boolean needDelayScheduleTask() { - return System.currentTimeMillis() - lastScheduleTaskTimestamp > jobProperties.getMaxIntervalSecond() * 1000; + @Override + public void beforeCommitted(TransactionState txnState) throws TransactionException { + } - public boolean hasMoreDataToConsume() { - // TODO: implement this - return true; + @Override + public void beforeAborted(TransactionState txnState) throws TransactionException { + + } + + @Override + public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException { + + } + + @Override + public void replayOnCommitted(TransactionState txnState) { + + } + + @Override + public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) + throws UserException { + + } + + @Override + public void replayOnAborted(TransactionState txnState) { + + } + + @Override + public void afterVisible(TransactionState txnState, boolean txnOperated) { + + } + + @Override + public void replayOnVisible(TransactionState txnState) { + + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java index 4e07ee01e305cf..56a59d3a7f6a7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java @@ -17,20 +17,206 @@ package org.apache.doris.job.extensions.insert.streaming; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Status; +import org.apache.doris.common.util.Util; +import org.apache.doris.job.base.Job; +import org.apache.doris.job.common.TaskStatus; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.InsertTask; +import org.apache.doris.job.offset.Offset; +import org.apache.doris.load.loadv2.LoadStatistic; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TStatusCode; + import lombok.Getter; +import lombok.extern.log4j.Log4j2; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +@Log4j2 +@Getter public class StreamingInsertTask { - @Getter + private static final String LABEL_SPLITTER = "_"; + private static final int MAX_RETRY = 3; private long jobId; - - @Getter private long taskId; + private String labelName; + private TaskStatus status; + private String errMsg; + private Long createTimeMs; + private Long startTimeMs; + private Long finishTimeMs; + private InsertIntoTableCommand command; + private StmtExecutor stmtExecutor; + private String currentDb; + private UserIdentity userIdentity; + private ConnectContext ctx; + private LoadStatistic loadStatistic; + private Offset offset; + private AtomicBoolean isCanceled = new AtomicBoolean(false); + private StreamingJobProperties jobProperties; - public StreamingInsertTask(long jobId, long taskId) { + public StreamingInsertTask(long jobId, + long taskId, + InsertIntoTableCommand command, + LoadStatistic loadStatistic, + String currentDb, + Offset offset, + StreamingJobProperties jobProperties) { this.jobId = jobId; this.taskId = taskId; + this.command = command; + this.loadStatistic = loadStatistic; + this.userIdentity = ctx.getCurrentUserIdentity(); + this.currentDb = currentDb; + this.offset = offset; + this.jobProperties = jobProperties; + this.labelName = getJobId() + LABEL_SPLITTER + getTaskId(); + this.createTimeMs = System.currentTimeMillis(); + } + + public void execute() throws JobException { + try { + before(); + run(); + onSuccess(); + } catch (Exception e) { + if (TaskStatus.CANCELED.equals(status)) { + return; + } + onFail(e.getMessage()); + log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e); + } finally { + // The cancel logic will call the closeOrReleased Resources method by itself. + // If it is also called here, + // it may result in the inability to obtain relevant information when canceling the task + if (!TaskStatus.CANCELED.equals(status)) { + closeOrReleaseResources(); + } + } + } + + private void before() throws JobException { + this.startTimeMs = System.currentTimeMillis(); + if (isCanceled.get()) { + throw new JobException("Export executor has been canceled, task id: {}", getTaskId()); + } + ctx = InsertTask.makeConnectContext(userIdentity, currentDb); + ctx.setSessionVariable(jobProperties.getSessionVariable()); + StatementContext statementContext = new StatementContext(); + ctx.setStatementContext(statementContext); + this.command.setLabelName(Optional.of(this.labelName)); + this.command.setJobId(getTaskId()); + stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); + } + + private void run() throws JobException { + String errMsg = null; + int retry = 0; + while (retry <= MAX_RETRY) { + try { + if (isCanceled.get()) { + log.info("task has been canceled, task id is {}", getTaskId()); + return; + } + command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic); + if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) { + return; + } else { + errMsg = ctx.getState().getErrorMessage(); + } + log.error( + "streaming insert failed with {}, reason {}, to retry", + command.getLabelName(), + errMsg); + if (retry == MAX_RETRY) { + errMsg = "reached max retry times, failed with" + errMsg; + } + } catch (Exception e) { + log.warn("execute insert task error, label is {},offset is {}", command.getLabelName(), + offset.toJson(), e); + errMsg = Util.getRootCauseMessage(e); + } + retry++; + } + log.error("streaming insert task failed, job id is {}, task id is {}, offset is {}, errMsg is {}", + getJobId(), getTaskId(), offset.toJson(), errMsg); + throw new JobException(errMsg); + } + + public boolean onSuccess() throws JobException { + if (TaskStatus.CANCELED.equals(status)) { + return false; + } + this.status = TaskStatus.SUCCESS; + this.finishTimeMs = System.currentTimeMillis(); + if (!isCallable()) { + return false; + } + Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); + if (null == job) { + log.info("job is null, job id is {}", jobId); + return false; + } + + StreamingInsertJob streamingInsertJob = (StreamingInsertJob) job; + streamingInsertJob.onStreamTaskSuccess(this); + return true; + } + + public void onFail(String errMsg) throws JobException { + this.errMsg = errMsg; + if (TaskStatus.CANCELED.equals(status)) { + return; + } + this.status = TaskStatus.FAILED; + this.finishTimeMs = System.currentTimeMillis(); + if (!isCallable()) { + return; + } + Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); + StreamingInsertJob streamingInsertJob = (StreamingInsertJob) job; + streamingInsertJob.onStreamTaskFail(this); + } + + public void cancel(boolean needWaitCancelComplete) throws Exception { + if (isCanceled.get()) { + return; + } + isCanceled.getAndSet(true); + if (null != stmtExecutor) { + stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "streaming insert task cancelled"), + needWaitCancelComplete); + } + } + + public void closeOrReleaseResources() { + if (null != stmtExecutor) { + stmtExecutor = null; + } + if (null != command) { + command = null; + } + if (null != ctx) { + ctx = null; + } } - public void execute() { + private boolean isCallable() { + if (status.equals(TaskStatus.CANCELED)) { + return false; + } + if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) { + return true; + } + return false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java index 25d256b127b791..4f463a090d5225 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java @@ -20,9 +20,13 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.Util; import org.apache.doris.job.base.JobProperties; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.SessionVariable; import lombok.Data; +import java.util.HashMap; import java.util.Map; @Data @@ -30,13 +34,16 @@ public class StreamingJobProperties implements JobProperties { public static final String MAX_INTERVAL_SECOND_PROPERTY = "max_interval"; public static final String S3_BATCH_FILES_PROPERTY = "s3.batch_files"; public static final String S3_BATCH_SIZE_PROPERTY = "s3.batch_size"; + public static final String SESSION_VAR_PREFIX = "session."; + public static final long DEFAULT_MAX_INTERVAL_SECOND = 10; public static final long DEFAULT_S3_BATCH_FILES = 256; public static final long DEFAULT_S3_BATCH_SIZE = 10 * 1024 * 1024 * 1024L; // 10GB - public static final long DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min + public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min private final Map properties; private long maxIntervalSecond; + private int maxRetry; private long s3BatchFiles; private long s3BatchSize; @@ -61,4 +68,23 @@ public void validate() throws AnalysisException { && v <= (long) (1024 * 1024 * 1024) * 10, StreamingJobProperties.S3_BATCH_SIZE_PROPERTY + " should between 100MB and 10GB"); } + + public SessionVariable getSessionVariable() throws JobException { + final Map sessionVarMap = new HashMap<>(); + for (Map.Entry entry : properties.entrySet()) { + if (entry.getKey().startsWith(SESSION_VAR_PREFIX)) { + String subKey = entry.getKey().substring(SESSION_VAR_PREFIX.length()); + sessionVarMap.put(subKey, entry.getValue()); + } + } + + SessionVariable sessionVariable = new SessionVariable(); + try { + sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT); + sessionVariable.readFromJson(GsonUtils.GSON.toJson(sessionVarMap)); + } catch (Exception e) { + throw new JobException("Invalid session variable, " + e.getMessage()); + } + return sessionVariable; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java index 888669f428af90..d724b09fbf0046 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -18,17 +18,17 @@ package org.apache.doris.job.extensions.insert.streaming; import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.PauseReason; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; public class StreamingJobSchedulerTask extends AbstractTask { - private static final long BACK_OFF_BASIC_TIME_SEC = 10L; private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5; - private StreamingInsertJob streamingInsertJob; public StreamingJobSchedulerTask(StreamingInsertJob streamingInsertJob) { @@ -80,14 +80,43 @@ private void autoResumeHandler() throws JobException { @Override protected void closeOrReleaseResources() { + if (streamingInsertJob.getRunningStreamTask() != null) { + streamingInsertJob.getRunningStreamTask().closeOrReleaseResources(); + } } @Override protected void executeCancelLogic(boolean needWaitCancelComplete) throws Exception { + if (streamingInsertJob.getRunningStreamTask() != null) { + streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete); + } } @Override public TRow getTvfInfo(String jobName) { + StreamingInsertTask runningTask = streamingInsertJob.getRunningStreamTask(); + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getTaskId()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(jobName)); + trow.addToColumnValue(new TCell().setStringVal(runningTask.getLabelName())); + trow.addToColumnValue(new TCell().setStringVal(runningTask.getStatus().name())); + trow.addToColumnValue(new TCell().setStringVal(runningTask.getErrMsg())); + // create time + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? "" + : TimeUtils.longToTimeString(runningTask.getStartTimeMs()))); + // load end time + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs()))); + // tracking url + trow.addToColumnValue(new TCell().setStringVal("trackingUrl")); + trow.addToColumnValue(new TCell().setStringVal("statistic")); + if (runningTask.getUserIdentity() == null) { + trow.addToColumnValue(new TCell().setStringVal("")); + } else { + trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser())); + } + trow.addToColumnValue(new TCell().setStringVal(runningTask.getOffset().toJson())); return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index ae6bad07066d4e..9954d26b6f1dde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -48,6 +48,7 @@ import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; +import lombok.Getter; import lombok.extern.log4j.Log4j2; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -73,6 +74,7 @@ public class JobManager, C> implements Writable { private JobScheduler jobScheduler; + @Getter private StreamingTaskScheduler streamingTaskScheduler; // lock for job @@ -112,6 +114,21 @@ public T getJob(long jobId) { return jobMap.get(jobId); } + /** + * get streaming running job + * + * @return running job + */ + public int getStreamingJobCnt() { + int count = 0; + for (T job : jobMap.values()) { + if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) { + count++; + } + } + return count; + } + public void registerJob(T job) throws JobException { job.initParams(); createJobInternal(job, false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index f88079617de532..3d62073929068b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -36,11 +36,17 @@ public interface SourceOffsetProvider { Offset getNextOffset(); /** - * Rewrite the TVF parameters in the InsertIntoTableCommand based on the current offset. - * @param command + * Get current offset * @return */ - InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand command); + Offset getCurrentOffset(); + + /** + * Rewrite the TVF parameters in the SQL based on the current offset. + * @param sql + * @return rewritten InsertIntoTableCommand + */ + InsertIntoTableCommand rewriteTvfParams(String sql); /** * Update the progress of the source. @@ -57,6 +63,6 @@ public interface SourceOffsetProvider { * Whether there is more data to consume * @return */ - boolean hasMoreData(); + boolean hasMoreDataToConsume(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java index 5ba1d903d78135..9cefa4e9d42314 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java @@ -17,11 +17,15 @@ package org.apache.doris.job.offset; +import org.apache.doris.job.exception.JobException; import org.apache.doris.job.offset.s3.S3SourceOffsetProvider; +import lombok.extern.log4j.Log4j2; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +@Log4j2 public class SourceOffsetProviderFactory { private static final Map> map = new ConcurrentHashMap<>(); @@ -29,9 +33,16 @@ public class SourceOffsetProviderFactory { map.put("s3", S3SourceOffsetProvider.class); } - public static SourceOffsetProvider createSourceOffsetProvider(String sourceType) throws InstantiationException, - IllegalAccessException { - Class cla = map.get(sourceType.toUpperCase()); - return cla.newInstance(); + public static SourceOffsetProvider createSourceOffsetProvider(String sourceType) { + try { + Class cla = map.get(sourceType.toUpperCase()); + if (cla == null) { + throw new JobException("Unsupported source type: " + sourceType); + } + return cla.newInstance(); + } catch (Exception e) { + log.error("Failed to create source provider for type: " + sourceType, e); + throw new RuntimeException("Failed to create source provider for type: " + sourceType); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java index 86ff467796af8d..a175575757f080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -20,13 +20,14 @@ import org.apache.doris.job.offset.Offset; import org.apache.doris.persist.gson.GsonUtils; +import lombok.Getter; + import java.util.List; public class S3Offset implements Offset { String startFile; - String endFile; - + @Getter List fileLists; @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java index 087d9c2beb7685..771736a9559f93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -19,35 +19,60 @@ import org.apache.doris.job.offset.Offset; import org.apache.doris.job.offset.SourceOffsetProvider; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import java.util.HashMap; +import java.util.Map; + public class S3SourceOffsetProvider implements SourceOffsetProvider { + S3Offset currentOffset; + String maxRemoteEndFile; @Override public String getSourceType() { - return null; + return "s3"; } @Override - public Offset getNextOffset() { + public S3Offset getNextOffset() { + //todo: listObjects from end file return null; } @Override - public InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand command) { - return null; + public Offset getCurrentOffset() { + return currentOffset; + } + + @Override + public InsertIntoTableCommand rewriteTvfParams(String sql) { + S3Offset nextOffset = getNextOffset(); + Map props = new HashMap<>(); + //todo: need to change file list to glob string + props.put("uri", nextOffset.getFileLists().toString()); + + NereidsParser parser = new NereidsParser(); + InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(sql); + command.rewriteTvfProperties(getSourceType(), props); + return command; } @Override public void updateProgress(Offset offset) { + this.currentOffset = (S3Offset) offset; } @Override public void fetchRemoteMeta() { + // list object } @Override - public boolean hasMoreData() { + public boolean hasMoreDataToConsume() { + if (currentOffset.endFile.compareTo(maxRemoteEndFile) < 0) { + return true; + } return false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java index d87a2b7833d402..bdc27e983e17d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java @@ -21,6 +21,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask; @@ -75,11 +76,17 @@ private void process() throws InterruptedException { private void scheduleTasks(List tasks) { for (StreamingInsertTask task : tasks) { - threadPool.execute(() -> scheduleOneTask(task)); + threadPool.execute(() -> { + try { + scheduleOneTask(task); + } catch (Exception e) { + log.error("Failed to schedule task, task id: {}, job id: {}", task.getTaskId(), task.getJobId(), e); + } + }); } } - private void scheduleOneTask(StreamingInsertTask task) { + private void scheduleOneTask(StreamingInsertTask task) throws JobException { StreamingInsertJob job = (StreamingInsertJob) Env.getCurrentEnv().getJobManager().getJob(task.getJobId()); if (job == null) { log.warn("Job not found, job id: {}", task.getJobId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 4e2ac653cf700f..18c5f525295d0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -60,7 +60,7 @@ public AbstractTask() { taskId = getNextTaskId(); } - private static long getNextTaskId() { + public static long getNextTaskId() { // do not use Env.getNextId(), just generate id without logging return System.nanoTime() + RandomUtils.nextInt(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java index ef8c9bb8b7a120..53a90893b81444 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -66,7 +66,6 @@ public StmtType stmtType() { @Override public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { - validate(); AbstractJob job = analyzeAndBuildJobInfo(ctx); ctx.getEnv().getJobManager().alterJob(job); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java index fecd457ada56eb..fe81921211fe9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java @@ -19,7 +19,9 @@ import org.apache.doris.analysis.StmtType; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.exception.JobException; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -56,10 +58,20 @@ public CreateJobCommand(CreateJobInfo jobInfo) { @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + validate(); AbstractJob job = createJobInfo.analyzeAndBuildJobInfo(ctx); Env.getCurrentEnv().getJobManager().registerJob(job); } + private void validate() throws JobException { + if (createJobInfo.streamingJob()) { + int streamingJobCnt = Env.getCurrentEnv().getJobManager().getStreamingJobCnt(); + if (streamingJobCnt >= Config.max_streaming_job_num) { + throw new JobException("Exceed max streaming job num limit " + Config.max_streaming_job_num); + } + } + } + @Override public R accept(PlanVisitor visitor, C context) { return visitor.visitCreateJobCommand(this, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java index 0d52e23ece56ff..4334526630ee8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -274,7 +274,7 @@ private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String curren LogicalPlan logicalPlan = parser.parseSingle(sql); if (logicalPlan instanceof InsertIntoTableCommand) { return new StreamingInsertJob(labelNameOptional.get(), - JobStatus.RUNNING, + JobStatus.PENDING, currentDbName, comment, ConnectContext.get().getCurrentUserIdentity(), @@ -314,4 +314,8 @@ public static Long stripQuotesAndParseTimestamp(String str) { } return TimeUtils.timeStringToLong(str.trim()); } + + public boolean streamingJob() { + return streamingJob; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 8ffa8884dd995c..a5a428c78158c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -38,6 +38,7 @@ import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.analyzer.UnboundTVFRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; @@ -79,6 +80,7 @@ import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -536,6 +538,51 @@ public List getTargetColumns() { } } + // todo: add ut + public String getFirstTvfName() { + return getFirstTvfInPlan(getLogicalQuery()); + } + + private String getFirstTvfInPlan(LogicalPlan plan) { + if (plan instanceof UnboundTVFRelation) { + UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan; + return tvfRelation.getFunctionName(); + } + + for (Plan child : plan.children()) { + if (child instanceof LogicalPlan) { + String result = getFirstTvfInPlan((LogicalPlan) child); + if (!result.isEmpty()) { + return result; + } + } + } + return ""; + } + + // todo: add ut + public void rewriteTvfProperties(String functionName, Map props) { + rewriteTvfInPlan(originLogicalQuery, functionName, props); + if (logicalQuery.isPresent()) { + rewriteTvfInPlan(logicalQuery.get(), functionName, props); + } + } + + private void rewriteTvfInPlan(LogicalPlan plan, String functionName, Map props) { + if (plan instanceof UnboundTVFRelation) { + UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan; + if (functionName.equalsIgnoreCase(tvfRelation.getFunctionName())) { + tvfRelation.getProperties().getMap().putAll(props); + } + } + + for (Plan child : plan.children()) { + if (child instanceof LogicalPlan) { + rewriteTvfInPlan((LogicalPlan) child, functionName, props); + } + } + } + @Override public Plan getExplainPlan(ConnectContext ctx) { Optional analyzeContext = Optional.of(