@@ -159,8 +159,6 @@ public static StorageProperties createPrimary(Map<String, String> origProps) {

private static final List<Function<Map<String, String>, StorageProperties>> PROVIDERS =
Arrays.asList(
props -> (isFsSupport(props, FS_HDFS_SUPPORT)
|| HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null,
props -> ((isFsSupport(props, FS_OSS_HDFS_SUPPORT)
|| isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT))
|| OSSHdfsProperties.guessIsMe(props)) ? new OSSHdfsProperties(props) : null,
@@ -181,7 +179,9 @@ public static StorageProperties createPrimary(Map<String, String> origProps) {
props -> (isFsSupport(props, FS_BROKER_SUPPORT)
|| BrokerProperties.guessIsMe(props)) ? new BrokerProperties(props) : null,
props -> (isFsSupport(props, FS_LOCAL_SUPPORT)
|| LocalProperties.guessIsMe(props)) ? new LocalProperties(props) : null
|| LocalProperties.guessIsMe(props)) ? new LocalProperties(props) : null,
props -> (isFsSupport(props, FS_HDFS_SUPPORT)
|| HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null
);
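
Note: this reorder matters because createPrimary presumably walks PROVIDERS in order and takes the first non-null result, so moving the HdfsProperties entry to the tail turns generic HDFS into the fallback after more specific schemes such as OSS-HDFS. A minimal sketch of that first-match pattern, with hypothetical property keys:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

class ProviderChainSketch {
    // Specific matchers first, generic HDFS last, mirroring the reorder above.
    static final List<Function<Map<String, String>, String>> PROVIDERS = Arrays.asList(
            props -> props.containsKey("oss.hdfs.fs") ? "OSS_HDFS" : null,  // hypothetical key
            props -> props.containsKey("fs.defaultFS") ? "HDFS" : null      // hypothetical key
    );

    static String create(Map<String, String> props) {
        return PROVIDERS.stream()
                .map(p -> p.apply(props))
                .filter(Objects::nonNull)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("no provider matched: " + props));
    }
}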

protected StorageProperties(Type type, Map<String, String> origProps) {
@@ -124,7 +124,7 @@ default Status globList(String remotePath, List<RemoteFile> result) {
* @param fileNumLimit limit the total number of files to be listed.
* @return
*/
default String globListWithLimit(String remotePath, List<String> result,
default String globListWithLimit(String remotePath, List<RemoteFile> result,
String startFile, long fileSizeLimit, long fileNumLimit) {
throw new UnsupportedOperationException("Unsupported operation glob list with limit on current file system.");
}
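
Note: the default still throws, so limit-aware listing stays opt-in per backend; only the signature changed from List<String> to List<RemoteFile>. A hedged caller-side sketch of the expected fallback, with stand-in types:

import java.util.ArrayList;
import java.util.List;

class ListWithFallbackSketch {
    // RemoteFileSystemLike stands in for the interface in this diff, which
    // throws UnsupportedOperationException unless a backend overrides it.
    interface RemoteFileSystemLike {
        void globList(String remotePath, List<Object> result);
        default String globListWithLimit(String remotePath, List<Object> result,
                String startFile, long fileSizeLimit, long fileNumLimit) {
            throw new UnsupportedOperationException("glob list with limit unsupported");
        }
    }

    static List<Object> listWithFallback(RemoteFileSystemLike fs, String path) {
        List<Object> files = new ArrayList<>();
        try {
            fs.globListWithLimit(path, files, "", 10L << 30 /* ~10 GiB */, 10_000);
        } catch (UnsupportedOperationException e) {
            fs.globList(path, files); // unbounded fallback
        }
        return files;
    }
}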
@@ -642,7 +642,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
*
* @return The largest file name after listObject this time
*/
public String globListWithLimit(String remotePath, List<String> result, String startFile,
public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
long fileSizeLimit, long fileNumLimit) {
long roundCnt = 0;
long elementCnt = 0;
@@ -704,9 +704,16 @@ public String globListWithLimit(String remotePath, List<String> result, String s
}

matchCnt++;
RemoteFile remoteFile = new RemoteFile(objPath.getFileName().toString(),
!isPrefix,
isPrefix ? -1 : obj.size(),
isPrefix ? -1 : obj.size(),
isPrefix ? 0 : obj.lastModified().toEpochMilli()
);
remoteFile.setBucket(bucket);
remoteFile.setParentPath(objPath.getParent().toString());
matchFileSize += obj.size();
String remoteFileName = "s3://" + bucket + "/" + objPath;
result.add(remoteFileName);
result.add(remoteFile);

if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
break;
@@ -718,8 +725,7 @@ public String globListWithLimit(String remotePath, List<String> result, String s
}
//record current last object file name
S3Object lastS3Object = response.contents().get(response.contents().size() - 1);
java.nio.file.Path lastObjPath = Paths.get(lastS3Object.key());
currentMaxFile = "s3://" + bucket + "/" + lastObjPath;
currentMaxFile = lastS3Object.key();

isTruncated = response.isTruncated();
if (isTruncated) {
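
Note: currentMaxFile now carries the raw object key instead of an s3:// URI, which is presumably what the next listing round feeds back to S3. A hedged sketch of that resume step, assuming the AWS SDK v2 client used elsewhere in this class:

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;

class ResumeListingSketch {
    // A raw key can be passed straight back to S3 as startAfter on the next
    // round (an assumption about the caller); an s3:// URI could not.
    static ListObjectsV2Response nextRound(S3Client s3, String bucket, String prefix, String lastKey) {
        ListObjectsV2Request req = ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix(prefix)
                .startAfter(lastKey)
                .build();
        return s3.listObjectsV2(req);
    }
}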
@@ -36,6 +36,8 @@ public class RemoteFile {
private long modificationTime;
private Path path;
BlockLocation[] blockLocations;
private String parentPath;
private String bucket;

public RemoteFile(String name, boolean isFile, long size, long blockSize) {
this(name, null, isFile, !isFile, size, blockSize, 0, null);
@@ -75,6 +77,22 @@ public void setPath(Path path) {
this.path = path;
}

public String getBucket() {
return bucket;
}

public void setBucket(String bucket) {
this.bucket = bucket;
}

public String getParentPath() {
return parentPath;
}

public void setParentPath(String parentPath) {
this.parentPath = parentPath;
}

public boolean isFile() {
return isFile;
}
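
Note: with bucket and parentPath now stored on RemoteFile, callers can rebuild the full URI that the old List<String> API returned directly. A hedged one-liner, assuming the usual getName() accessor on this class:

// Sketch only: rebuilds "s3://bucket/parent/name" from the new fields.
static String toS3Uri(RemoteFile f) {
    return "s3://" + f.getBucket() + "/" + f.getParentPath() + "/" + f.getName();
}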
@@ -69,7 +69,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
}

@Override
public String globListWithLimit(String remotePath, List<String> result, String startFile,
public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
long fileSizeLimit, long fileNumLimit) {
S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
return objStorage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit);
@@ -99,13 +99,13 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C


@SerializedName(value = "stc")
private AtomicLong succeedTaskCount = new AtomicLong(0);
protected AtomicLong succeedTaskCount = new AtomicLong(0);

@SerializedName(value = "ftc")
private AtomicLong failedTaskCount = new AtomicLong(0);
protected AtomicLong failedTaskCount = new AtomicLong(0);

@SerializedName(value = "ctc")
private AtomicLong canceledTaskCount = new AtomicLong(0);
protected AtomicLong canceledTaskCount = new AtomicLong(0);

public AbstractJob() {
}
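
Note: widening the counters from private to protected lets subclasses record task outcomes themselves, as StreamingInsertJob.onStreamTaskFail/onStreamTaskSuccess do later in this diff. A minimal sketch of that usage:

import java.util.concurrent.atomic.AtomicLong;

abstract class AbstractJobSketch {
    protected AtomicLong succeedTaskCount = new AtomicLong(0);
    protected AtomicLong failedTaskCount = new AtomicLong(0);
}

class StreamingJobSketch extends AbstractJobSketch {
    // A subclass callback can now bump the shared counters directly.
    void onTaskDone(boolean ok) {
        (ok ? succeedTaskCount : failedTaskCount).incrementAndGet();
    }
}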
@@ -17,9 +17,6 @@

package org.apache.doris.job.base;

import org.apache.doris.common.AnalysisException;

public interface JobProperties {
default void validate() throws AnalysisException {
}
}
@@ -21,14 +21,18 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.PauseReason;
@@ -75,8 +79,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
TxnStateChangeCallback, GsonPostProcessable {
private final long dbId;
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@SerializedName("fm")
private FailMsg failMsg;
@Getter
protected PauseReason pauseReason;
@Getter
@@ -86,7 +88,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@Setter
protected long autoResumeCount;
@Getter
@SerializedName("jp")
@SerializedName("props")
private final Map<String, String> properties;
private StreamingJobProperties jobProperties;
@Getter
@SerializedName("tvf")
@@ -108,26 +111,40 @@ public StreamingInsertJob(String jobName,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
StreamingJobProperties jobProperties) {
Map<String, String> properties) {
super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
this.jobProperties = jobProperties;
this.properties = properties;
init();
}

private void init() {
try {
this.jobProperties = new StreamingJobProperties(properties);
jobProperties.validate();
// build time definition
JobExecutionConfiguration execConfig = getJobConfig();
TimerDefinition timerDefinition = new TimerDefinition();
timerDefinition.setInterval(jobProperties.getMaxIntervalSecond());
timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
execConfig.setTimerDefinition(timerDefinition);

UnboundTVFRelation currentTvf = getCurrentTvf();
this.tvfType = currentTvf.getFunctionName();
this.originTvfProps = currentTvf.getProperties().getMap();
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
} catch (AnalysisException ae) {
log.warn("parse streaming insert job failed, props: {}", properties, ae);
throw new RuntimeException("parse streaming insert job failed, " + ae.getMessage());
} catch (Exception ex) {
log.warn("init streaming insert job failed, sql: {}", getExecuteSql(), ex);
throw new RuntimeException("init streaming insert job failed, sql: " + getExecuteSql(), ex);
throw new RuntimeException("init streaming insert job failed, " + ex.getMessage());
}
}
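
Note: the constructor now stores the raw property map and defers parsing to init(), which validates the properties and derives the scheduler interval from them. A hedged sketch of that validate-then-derive step; the key name and default are hypothetical:

import java.util.Map;

class JobPropsSketch {
    // Parse the raw property map once, fail fast with a descriptive message,
    // then derive the timer interval -- the shape of init() above.
    static long maxIntervalSecond(Map<String, String> props) {
        long v = Long.parseLong(props.getOrDefault("max_interval_second", "10"));
        if (v <= 0) {
            throw new IllegalArgumentException("max_interval_second must be positive, got " + v);
        }
        return v;
    }
}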

private UnboundTVFRelation getCurrentTvf() throws Exception {
private UnboundTVFRelation getCurrentTvf() {
if (baseCommand == null) {
this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(getExecuteSql());
}
Expand Down Expand Up @@ -170,7 +187,7 @@ public List<StreamingJobSchedulerTask> createTasks(TaskType taskType, Map<Object

protected StreamingInsertTask createStreamingInsertTask() {
this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), getExecuteSql(),
offsetProvider, getCurrentDbName(), jobProperties, getCreateUser());
offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
this.runningStreamTask.setStatus(TaskStatus.PENDING);
return runningStreamTask;
@@ -213,17 +230,21 @@ public void onTaskSuccess(StreamingJobSchedulerTask task) throws JobException {
}

public void onStreamTaskFail(StreamingInsertTask task) throws JobException {
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg());
this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
}
updateJobStatus(JobStatus.PAUSED);
}

public void onStreamTaskSuccess(StreamingInsertTask task) {
succeedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
StreamingInsertTask nextTask = createStreamingInsertTask();
this.runningStreamTask = nextTask;
//todo: maybe fetch from txn attachment?
offsetProvider.updateOffset(task.getRunningOffset());
}

private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
Expand Down Expand Up @@ -262,7 +283,7 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(getJobName()));
trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name()));
trow.addToColumnValue(new TCell().setStringVal(getJobConfig().convertRecurringStrategyToString()));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
@@ -276,16 +297,16 @@ public TRow getTvfInfo() {
} else {
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
}

if (offsetProvider != null && offsetProvider.getRemoteOffset() != null) {
trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
} else {
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
}

trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
trow.addToColumnValue(new TCell().setStringVal(
jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
trow.addToColumnValue(new TCell().setStringVal(pauseReason == null ? FeConstants.null_string : pauseReason.getMsg()));
return trow;
}
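
Note: getTvfInfo now repeats the "value or FeConstants.null_string" branch several times; a small null-safe helper along these lines (a refactor sketch, not part of the PR) would collapse them:

// Sketch only: wraps a possibly-null value into a TCell exactly as the
// branches above do.
static TCell cellOf(String value) {
    return new TCell().setStringVal(value == null ? FeConstants.null_string : value);
}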

@@ -336,7 +357,6 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti

@Override
public void beforeAborted(TransactionState txnState) throws TransactionException {

}

@Override
@@ -387,7 +407,6 @@ public void replayOnCloudMode() throws UserException {
@Override
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
throws UserException {

}

@Override
@@ -402,14 +421,16 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {

@Override
public void replayOnVisible(TransactionState txnState) {


}

@Override
public void gsonPostProcess() throws IOException {
if (offsetProvider == null) {
if (offsetProvider == null && tvfType != null) {
offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
}

if (jobProperties == null && properties != null) {
jobProperties = new StreamingJobProperties(properties);
}
}
}
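
Note: gsonPostProcess rebuilds offsetProvider and jobProperties from the persisted tvf and props fields after deserialization, so only plain data needs @SerializedName. A generic, hedged sketch of that rebuild-on-load pattern; the factory is a stand-in:

class RebuildOnLoadSketch {
    String tvfType;                  // persisted via @SerializedName in the real class
    transient Object offsetProvider; // derived; rebuilt after deserialization

    void postProcess() {
        if (offsetProvider == null && tvfType != null) {
            offsetProvider = providerFor(tvfType);
        }
    }

    static Object providerFor(String type) {
        return new Object(); // stand-in for SourceOffsetProviderFactory
    }
}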
@@ -29,6 +29,7 @@
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
@@ -39,6 +40,7 @@
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -65,6 +67,7 @@ public class StreamingInsertTask {
private Offset runningOffset;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
private StreamingJobProperties jobProperties;
private Map<String, String> originTvfProps;
SourceOffsetProvider offsetProvider;

public StreamingInsertTask(long jobId,
@@ -73,6 +76,7 @@ public StreamingInsertTask(long jobId,
SourceOffsetProvider offsetProvider,
String currentDb,
StreamingJobProperties jobProperties,
Map<String, String> originTvfProps,
UserIdentity userIdentity) {
this.jobId = jobId;
this.taskId = taskId;
Expand All @@ -81,6 +85,7 @@ public StreamingInsertTask(long jobId,
this.currentDb = currentDb;
this.offsetProvider = offsetProvider;
this.jobProperties = jobProperties;
this.originTvfProps = originTvfProps;
this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
this.createTimeMs = System.currentTimeMillis();
}
@@ -118,8 +123,15 @@ private void before() throws Exception {
StatementContext statementContext = new StatementContext();
ctx.setStatementContext(statementContext);

this.runningOffset = offsetProvider.getNextOffset(jobProperties, jobProperties.getProperties());
this.taskCommand = offsetProvider.rewriteTvfParams(sql, runningOffset);
this.runningOffset = offsetProvider.getNextOffset(jobProperties, originTvfProps);
InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
StmtExecutor baseStmtExecutor =
new StmtExecutor(ctx, new LogicalPlanAdapter(baseCommand, ctx.getStatementContext()));
baseCommand.initPlan(ctx, baseStmtExecutor, false);
if (!baseCommand.getParsedPlan().isPresent()) {
throw new JobException("Can not get Parsed plan");
}
this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
this.taskCommand.setJobId(getTaskId());
this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
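
Note: before() now re-parses the job SQL into a fresh InsertIntoTableCommand and forces a plan via initPlan so that rewriteTvfParams operates on a parsed plan instead of raw SQL. The same flow, condensed (a simplified sketch using the types imported in this file; context setup and error handling omitted):

// Sketch of the per-task replan step added above: parse, plan without
// executing, then hand the planned command to the offset provider.
InsertIntoTableCommand parseAndPlan(ConnectContext ctx, String sql) throws Exception {
    InsertIntoTableCommand cmd = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
    StmtExecutor executor = new StmtExecutor(ctx, new LogicalPlanAdapter(cmd, ctx.getStatementContext()));
    cmd.initPlan(ctx, executor, false); // plan only, do not execute
    if (!cmd.getParsedPlan().isPresent()) {
        throw new IllegalStateException("cannot get parsed plan for: " + sql);
    }
    return cmd;
}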