Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private boolean canCreateTask(TaskType taskType) {

switch (taskType) {
case SCHEDULED:
return currentJobStatus.equals(JobStatus.RUNNING);
return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PENDING);
case MANUAL:
return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PAUSED);
default:
Expand Down Expand Up @@ -294,7 +294,8 @@ public void updateJobStatus(JobStatus newJobStatus) throws JobException {
}
String errorMsg = String.format("Can't update job %s status to the %s status",
jobStatus.name(), newJobStatus.name());
if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) {
if (newJobStatus.equals(JobStatus.RUNNING)
&& (!jobStatus.equals(JobStatus.PAUSED) && !jobStatus.equals(JobStatus.PENDING))) {
throw new IllegalArgumentException(errorMsg);
}
if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) {
Expand Down
30 changes: 30 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.job.common;

import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;

public class JobUtils {
public static boolean checkNeedSchedule(AbstractJob job) {
if (job.getJobConfig().getExecuteType() == JobExecuteType.STREAMING) {
return !job.isFinalStatus();
}
return job.getJobStatus() == JobStatus.RUNNING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
package org.apache.doris.job.executor;

import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TimerJobEvent;
import org.apache.doris.job.task.AbstractTask;
Expand Down Expand Up @@ -56,7 +55,7 @@ public void onEvent(TimerJobEvent<T> event) {
log.info("job is null,may be job is deleted, ignore");
return;
}
if (event.getJob().isReadyForScheduling(null) && checkStatus(event)) {
if (event.getJob().isReadyForScheduling(null) && JobUtils.checkNeedSchedule(event.getJob())) {
List<? extends AbstractTask> tasks = event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
if (CollectionUtils.isEmpty(tasks)) {
log.warn("job is ready for scheduling, but create task is empty, skip scheduler,"
Expand All @@ -78,11 +77,4 @@ public void onEvent(TimerJobEvent<T> event) {
log.warn("dispatch timer job error, task id is {}", event.getJob().getJobId(), e);
}
}

private boolean checkStatus(TimerJobEvent<T> event) {
if (event.getJob().getJobConfig().getExecuteType() == JobExecuteType.STREAMING) {
return !event.getJob().isFinalStatus();
}
return event.getJob().getJobStatus() == JobStatus.RUNNING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.doris.job.executor;

import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.disruptor.TaskDisruptor;

import io.netty.util.Timeout;
Expand All @@ -40,7 +40,7 @@ public TimerJobSchedulerTask(TaskDisruptor dispatchDisruptor, T job) {
@Override
public void run(Timeout timeout) {
try {
if (!JobStatus.RUNNING.equals(job.getJobStatus())) {
if (!JobUtils.checkNeedSchedule(job)) {
log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.PauseReason;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.job.task.AbstractTask;
Expand All @@ -42,6 +42,7 @@
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
Expand All @@ -65,7 +66,7 @@
import java.util.Map;

public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements
TxnStateChangeCallback {
TxnStateChangeCallback, GsonPostProcessable {
private final long dbId;
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@SerializedName("fm")
Expand All @@ -79,10 +80,16 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@Setter
protected long autoResumeCount;
@Getter
@SerializedName("jp")
private StreamingJobProperties jobProperties;
@Getter
@SerializedName("tt")
private String tvfType;
@Getter
StreamingInsertTask runningStreamTask;
SourceOffsetProvider offsetProvider;
@Setter
@Getter
private long lastScheduleTaskTimestamp = -1L;

public StreamingInsertJob(String jobName,
Expand All @@ -98,7 +105,7 @@ public StreamingInsertJob(String jobName,
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
this.jobProperties = jobProperties;
String tvfType = parseTvfType();
this.tvfType = parseTvfType();
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
this.offsetProvider.init(getExecuteSql(), jobProperties);
}
Expand Down Expand Up @@ -140,11 +147,11 @@ public List<StreamingJobSchedulerTask> createTasks(TaskType taskType, Map<Object
}

protected StreamingInsertTask createStreamingInsertTask() {
Offset nextOffset = offsetProvider.getNextOffset();
InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(nextOffset);
this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), command,
getCurrentDbName(), offsetProvider.getCurrentOffset(), jobProperties);
return this.runningStreamTask;
this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), getExecuteSql(),
offsetProvider, getCurrentDbName(), jobProperties, getCreateUser());
Env.getCurrentEnv().getJobManager().getStreamingTaskScheduler().registerTask(runningStreamTask);
this.runningStreamTask.setStatus(TaskStatus.PENDING);
return runningStreamTask;
}

protected void fetchMeta() {
Expand Down Expand Up @@ -186,7 +193,6 @@ public void onStreamTaskFail(StreamingInsertTask task) throws JobException {
public void onStreamTaskSuccess(StreamingInsertTask task) {
StreamingInsertTask nextTask = createStreamingInsertTask();
this.runningStreamTask = nextTask;
Env.getCurrentEnv().getJobManager().getStreamingTaskScheduler().registerTask(runningStreamTask);
}

private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
Expand Down Expand Up @@ -270,7 +276,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
loadStatistic.getLoadBytes(),
loadStatistic.getFileNumber(),
loadStatistic.getTotalFileSizeB(),
runningStreamTask.getOffset()));
runningStreamTask.getRunningOffset()));
}

@Override
Expand Down Expand Up @@ -315,4 +321,12 @@ public void replayOnVisible(TransactionState txnState) {


}

@Override
public void gsonPostProcess() throws IOException {
if (offsetProvider == null && jobProperties != null && tvfType != null) {
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
// this.offsetProvider.init(getExecuteSql(), jobProperties);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
Expand All @@ -35,6 +36,7 @@
import org.apache.doris.thrift.TStatusCode;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import java.util.Optional;
Expand All @@ -48,32 +50,36 @@ public class StreamingInsertTask {
private long jobId;
private long taskId;
private String labelName;
@Setter
private TaskStatus status;
private String errMsg;
private Long createTimeMs;
private Long startTimeMs;
private Long finishTimeMs;
private InsertIntoTableCommand command;
private String sql;
private SourceOffsetProvider offsetProvider;
private StmtExecutor stmtExecutor;
private InsertIntoTableCommand command;
private String currentDb;
private UserIdentity userIdentity;
private ConnectContext ctx;
private Offset offset;
private Offset runningOffset;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
private StreamingJobProperties jobProperties;

public StreamingInsertTask(long jobId,
long taskId,
InsertIntoTableCommand command,
String sql,
SourceOffsetProvider offsetProvider,
String currentDb,
Offset offset,
StreamingJobProperties jobProperties) {
StreamingJobProperties jobProperties,
UserIdentity userIdentity) {
this.jobId = jobId;
this.taskId = taskId;
this.command = command;
this.userIdentity = ctx.getCurrentUserIdentity();
this.sql = sql;
this.offsetProvider = offsetProvider;
this.userIdentity = userIdentity;
this.currentDb = currentDb;
this.offset = offset;
this.jobProperties = jobProperties;
this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
this.createTimeMs = System.currentTimeMillis();
Expand Down Expand Up @@ -101,15 +107,20 @@ public void execute() throws JobException {
}

private void before() throws JobException {
this.status = TaskStatus.RUNNING;
this.startTimeMs = System.currentTimeMillis();

if (isCanceled.get()) {
throw new JobException("Export executor has been canceled, task id: {}", getTaskId());
}
ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
ctx.setSessionVariable(jobProperties.getSessionVariable());
StatementContext statementContext = new StatementContext();
ctx.setStatementContext(statementContext);
this.command.setLabelName(Optional.of(this.labelName));
offsetProvider.init(sql, jobProperties);
this.runningOffset = offsetProvider.getNextOffset();
this.command = offsetProvider.rewriteTvfParams(runningOffset);
this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
this.command.setJobId(getTaskId());
stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
}
Expand All @@ -123,7 +134,7 @@ private void run() throws JobException {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
command.runWithUpdateInfo(ctx, stmtExecutor, null);
command.run(ctx, stmtExecutor);
if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
return;
} else {
Expand All @@ -138,13 +149,13 @@ private void run() throws JobException {
}
} catch (Exception e) {
log.warn("execute insert task error, label is {},offset is {}", command.getLabelName(),
offset.toJson(), e);
runningOffset.toJson(), e);
errMsg = Util.getRootCauseMessage(e);
}
retry++;
}
log.error("streaming insert task failed, job id is {}, task id is {}, offset is {}, errMsg is {}",
getJobId(), getTaskId(), offset.toJson(), errMsg);
getJobId(), getTaskId(), runningOffset.toJson(), errMsg);
throw new JobException(errMsg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public TRow getTvfInfo(String jobName) {
} else {
trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
}
trow.addToColumnValue(new TCell().setStringVal(runningTask.getOffset().toJson()));
trow.addToColumnValue(new TCell().setStringVal(runningTask.getRunningOffset() == null ? ""
: runningTask.getRunningOffset().toJson()));
return trow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {

@Override
public void init(String executeSql, StreamingJobProperties jobProperties) {
//todo: check is already init
this.executeSql = executeSql;
this.jobProperties = jobProperties;
this.parser = new NereidsParser();
Expand All @@ -71,10 +72,11 @@ public String getSourceType() {
public S3Offset getNextOffset() {
S3Offset offset = new S3Offset();
List<String> rfiles = new ArrayList<>();
String startFile = currentOffset == null ? null : currentOffset.endFile;
try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, currentOffset.endFile,
maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, startFile,
jobProperties.getS3BatchFiles(), jobProperties.getS3BatchSize());
offset.setStartFile(currentOffset.endFile);
offset.setStartFile(startFile);
offset.setEndFile(rfiles.get(rfiles.size() - 1));
offset.setFileLists(rfiles);
} catch (Exception e) {
Expand All @@ -90,15 +92,14 @@ public Offset getCurrentOffset() {
}

@Override
public InsertIntoTableCommand rewriteTvfParams(Offset nextOffset) {
S3Offset offset = (S3Offset) nextOffset;
public InsertIntoTableCommand rewriteTvfParams(Offset runningOffset) {
S3Offset offset = (S3Offset) runningOffset;
Map<String, String> props = new HashMap<>();
//todo: need to change file list to glob string
props.put("uri", offset.getFileLists().toString());

String finalUri = "{" + String.join(",", offset.getFileLists()) + "}";
props.put("uri", finalUri);
InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(executeSql);

command.rewriteFirstTvfProperties(getSourceType(), props);
//todo: command query plan is immutable
//command.rewriteFirstTvfProperties(getSourceType(), props);
return command;
}

Expand All @@ -114,6 +115,9 @@ public void fetchRemoteMeta() {

@Override
public boolean hasMoreDataToConsume() {
if (currentOffset == null) {
return true;
}
if (currentOffset.endFile.compareTo(maxRemoteEndFile) < 0) {
return true;
}
Expand Down
Loading
Loading