diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index a0ae7c6916e636..4b6dd3668d4ac5 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -111,7 +111,7 @@ supportedJobStatement commentSpec? DO supportedDmlStatement #createScheduledJob | PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #pauseJob - | ALTER JOB FOR (jobNameKey=identifier) (propertyClause | supportedDmlStatement | propertyClause supportedDmlStatement) #alterJob + | ALTER JOB FOR (jobName=multipartIdentifier) (propertyClause | supportedDmlStatement | propertyClause supportedDmlStatement) #alterJob | DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #dropJob | RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #resumeJob | CANCEL TASK WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ (taskIdValue=INTEGER_VALUE) #cancelJobTask diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java index 4155e6c5e67ab7..424ce238544410 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java @@ -289,7 +289,7 @@ public static TxnCommitAttachmentPB streamingTaskTxnCommitAttachmentToPb(Streami .setFileSize(streamingTaskTxnCommitAttachment.getFileSize()); if (streamingTaskTxnCommitAttachment.getOffset() != null) { - builder.setOffset(streamingTaskTxnCommitAttachment.getOffset().endOffset()); + builder.setOffset(streamingTaskTxnCommitAttachment.getOffset()); } attachementBuilder.setStreamingTaskTxnCommitAttachment(builder.build()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java index 4c79df0acf705a..9b0e5aeb80194d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java @@ -682,6 +682,7 @@ public String globListWithLimit(String remotePath, List result, Stri String currentMaxFile = ""; boolean isTruncated = false; + boolean reachLimit = false; do { roundCnt++; ListObjectsV2Response response = listObjectsV2(request); @@ -716,12 +717,16 @@ public String globListWithLimit(String remotePath, List result, Stri result.add(remoteFile); if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) { + reachLimit = true; break; } objPath = objPath.getParent(); isPrefix = true; } + if (reachLimit) { + break; + } } //record current last object file name S3Object lastS3Object = response.contents().get(response.contents().size() - 1); @@ -733,7 +738,7 @@ public String globListWithLimit(String remotePath, List result, Stri .continuationToken(response.nextContinuationToken()) .build(); } - } while (isTruncated); + } while (isTruncated && !reachLimit); if (LOG.isDebugEnabled()) { LOG.debug("remotePath:{}, result:{}", remotePath, result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 3d8c9afa36b290..423b436979f0d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -124,22 +124,7 @@ public List getTriggerDelayTimes(Long currentTimeMs, Long startTimeMs, Lon return delayTimeSeconds; } - if (JobExecuteType.STREAMING.equals(executeType) && null != timerDefinition) { - if (null == timerDefinition.getStartTimeMs() || null != timerDefinition.getLatestSchedulerTimeMs()) { - return delayTimeSeconds; - } - - // If the job is already executed or in the schedule queue, or not within this schedule window - if (endTimeMs < timerDefinition.getStartTimeMs()) { - return delayTimeSeconds; - } - - delayTimeSeconds.add(queryDelayTimeSecond(currentTimeMs, timerDefinition.getStartTimeMs())); - this.timerDefinition.setLatestSchedulerTimeMs(timerDefinition.getStartTimeMs()); - return delayTimeSeconds; - } - - if (JobExecuteType.RECURRING.equals(executeType)) { + if (JobExecuteType.RECURRING.equals(executeType) || JobExecuteType.STREAMING.equals(executeType)) { if (timerDefinition.getStartTimeMs() > endTimeMs || null != timerDefinition.getEndTimeMs() && timerDefinition.getEndTimeMs() < startTimeMs) { return delayTimeSeconds; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java similarity index 91% rename from fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java rename to fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java index 49a46327b32756..2acfb472a40c0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java @@ -27,17 +27,21 @@ import java.io.DataOutput; import java.io.IOException; -public class PauseReason implements Writable { +public class FailureReason implements Writable { @SerializedName(value = "code") private InternalErrorCode code; @SerializedName(value = "msg") private String msg; - public PauseReason(InternalErrorCode errCode, String msg) { + public FailureReason(InternalErrorCode errCode, String msg) { this.code = errCode; this.msg = msg; } + public FailureReason(String msg) { + this.msg = msg; + } + public InternalErrorCode getCode() { return code; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 67b56dbc13df20..a4d6793b0b8bcb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -99,6 +99,7 @@ public class InsertJob extends AbstractJob> impl .addAll(COMMON_SCHEMA) .add(new Column("Comment", ScalarType.createStringType())) // only execute type = streaming need record + .add(new Column("Properties", ScalarType.createStringType())) .add(new Column("Progress", ScalarType.createStringType())) .add(new Column("RemoteOffset", ScalarType.createStringType())) .add(new Column("LoadStatistic", ScalarType.createStringType())) @@ -549,6 +550,7 @@ public TRow getTvfInfo() { trow.addToColumnValue(new TCell().setStringVal(getComment())); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal( loadStatistic == null ? FeConstants.null_string : loadStatistic.toJson())); trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 34cfdf6edea874..261ca012311704 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -68,7 +68,8 @@ public class InsertTask extends AbstractTask { new Column("TrackingUrl", ScalarType.createStringType()), new Column("LoadStatistic", ScalarType.createStringType()), new Column("User", ScalarType.createStringType()), - new Column("Offset", ScalarType.createStringType())); + new Column("Offset", ScalarType.createStringType()), + new Column("OtherMsg", ScalarType.createStringType())); public static final ImmutableMap COLUMN_TO_INDEX; @@ -295,6 +296,7 @@ private TRow getPendingTaskTVFInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal("")); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index b872040c4c354a..157629520a9cb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -24,7 +24,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; @@ -32,10 +31,10 @@ import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.base.JobExecutionConfiguration; import org.apache.doris.job.base.TimerDefinition; +import org.apache.doris.job.common.FailureReason; import org.apache.doris.job.common.IntervalUnit; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; -import org.apache.doris.job.common.PauseReason; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; @@ -72,6 +71,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @Log4j2 @@ -80,7 +80,8 @@ public class StreamingInsertJob extends AbstractJob loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds); if (loadJobs.size() != 1) { + shouldRealseLock = true; throw new TransactionException("load job not found, insert job id is " + runningStreamTask.getTaskId()); } LoadJob loadJob = loadJobs.get(0); @@ -377,7 +387,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti loadStatistic.getLoadBytes(), loadStatistic.getFileNumber(), loadStatistic.getTotalFileSizeB(), - runningStreamTask.getRunningOffset())); + runningStreamTask.getRunningOffset().toJson())); } finally { if (shouldRealseLock) { lock.writeLock().unlock(); @@ -387,6 +397,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti @Override public void beforeAborted(TransactionState txnState) throws TransactionException { + } @Override @@ -403,6 +414,7 @@ public void replayOnCommitted(TransactionState txnState) { StreamingTaskTxnCommitAttachment attachment = (StreamingTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); updateJobStatisticAndOffset(attachment); + succeedTaskCount.incrementAndGet(); } public void replayOnCloudMode() throws UserException { @@ -462,5 +474,19 @@ public void gsonPostProcess() throws IOException { if (jobProperties == null && properties != null) { jobProperties = new StreamingJobProperties(properties); } + + if (null == getSucceedTaskCount()) { + setSucceedTaskCount(new AtomicLong(0)); + } + if (null == getFailedTaskCount()) { + setFailedTaskCount(new AtomicLong(0)); + } + if (null == getCanceledTaskCount()) { + setCanceledTaskCount(new AtomicLong(0)); + } + + if (null == lock) { + this.lock = new ReentrantReadWriteLock(true); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java index c171173fd3c772..3073142c151b09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java @@ -55,6 +55,8 @@ public class StreamingInsertTask { @Setter private TaskStatus status; private String errMsg; + @Setter + private String otherMsg; private Long createTimeMs; private Long startTimeMs; private Long finishTimeMs; @@ -99,8 +101,8 @@ public void execute() throws JobException { if (TaskStatus.CANCELED.equals(status)) { return; } - onFail(e.getMessage()); log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e); + onFail(e.getMessage()); } finally { // The cancel logic will call the closeOrReleased Resources method by itself. // If it is also called here, diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java index 6f083a82c553c9..b658c97c828f31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -17,15 +17,21 @@ package org.apache.doris.job.extensions.insert.streaming; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.common.FailureReason; import org.apache.doris.job.common.JobStatus; -import org.apache.doris.job.common.PauseReason; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; +import java.util.Arrays; +import java.util.List; + public class StreamingJobSchedulerTask extends AbstractTask { private static final long BACK_OFF_BASIC_TIME_SEC = 10L; private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5; @@ -55,15 +61,15 @@ public void run() throws JobException { } private void autoResumeHandler() throws JobException { - final PauseReason pauseReason = streamingInsertJob.getPauseReason(); + final FailureReason failureReason = streamingInsertJob.getFailureReason(); final long latestAutoResumeTimestamp = streamingInsertJob.getLatestAutoResumeTimestamp(); final long autoResumeCount = streamingInsertJob.getAutoResumeCount(); final long current = System.currentTimeMillis(); - if (pauseReason != null - && pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR - && pauseReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR - && pauseReason.getCode() != InternalErrorCode.CANNOT_RESUME_ERR) { + if (failureReason != null + && failureReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR + && failureReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR + && failureReason.getCode() != InternalErrorCode.CANNOT_RESUME_ERR) { long autoResumeIntervalTimeSec = autoResumeCount < 5 ? Math.min((long) Math.pow(2, autoResumeCount) * BACK_OFF_BASIC_TIME_SEC, MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC; @@ -107,20 +113,40 @@ public TRow getTvfInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal(runningTask.getErrMsg())); // create time trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs()))); - trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? "" + trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? FeConstants.null_string : TimeUtils.longToTimeString(runningTask.getStartTimeMs()))); // load end time trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs()))); - // tracking url - trow.addToColumnValue(new TCell().setStringVal("trackingUrl")); - trow.addToColumnValue(new TCell().setStringVal("statistic")); + + List loadJobs = Env.getCurrentEnv().getLoadManager() + .queryLoadJobsByJobIds(Arrays.asList(runningTask.getTaskId())); + if (!loadJobs.isEmpty()) { + LoadJob loadJob = loadJobs.get(0); + if (loadJob.getLoadingStatus() != null && loadJob.getLoadingStatus().getTrackingUrl() != null) { + trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl())); + } else { + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + } + + if (loadJob.getLoadStatistic() != null) { + trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadStatistic().toJson())); + } else { + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + } + } else { + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + } + if (runningTask.getUserIdentity() == null) { - trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); } else { trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser())); } - trow.addToColumnValue(new TCell().setStringVal(runningTask.getRunningOffset() == null ? "" + trow.addToColumnValue(new TCell().setStringVal(runningTask.getRunningOffset() == null ? FeConstants.null_string : runningTask.getRunningOffset().toJson())); + trow.addToColumnValue(new TCell().setStringVal(null == runningTask.getOtherMsg() + ? FeConstants.null_string : runningTask.getOtherMsg())); return trow; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java index 8660ed94739c0f..4b7590824b4ee2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java @@ -18,7 +18,6 @@ package org.apache.doris.job.extensions.insert.streaming; import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB; -import org.apache.doris.job.offset.Offset; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TxnCommitAttachment; @@ -28,7 +27,7 @@ public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment { public StreamingTaskTxnCommitAttachment(long jobId, long taskId, - long scannedRows, long loadBytes, long fileNumber, long fileSize, Offset offset) { + long scannedRows, long loadBytes, long fileNumber, long fileSize, String offset) { super(TransactionState.LoadJobSourceType.STREAMING_JOB); this.jobId = jobId; this.taskId = taskId; @@ -45,7 +44,7 @@ public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB pb) { this.loadBytes = pb.getLoadBytes(); this.fileNumber = pb.getFileNumber(); this.fileSize = pb.getFileSize(); - this.offset.setEndOffset(pb.getOffset()); + this.offset = pb.getOffset(); } @Getter @@ -66,7 +65,7 @@ public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB pb) { private long fileSize; @SerializedName(value = "of") @Getter - private Offset offset; + private String offset; @Override public String toString() { @@ -75,7 +74,7 @@ public String toString() { + ", loadBytes=" + loadBytes + ", fileNumber=" + fileNumber + ", fileSize=" + fileSize - + ", offset=" + offset.toString() + + ", offset=" + offset + "]"; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index e670dac553cf4b..d7e1bee4669384 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -81,5 +81,11 @@ public interface SourceOffsetProvider { */ boolean hasMoreDataToConsume(); + /** + * Deserialize string offset to Offset + * @return + */ + Offset deserializeOffset(String offset); + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java index 2ab2030fbbb3e9..3d260218886223 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -20,6 +20,7 @@ import org.apache.doris.job.offset.Offset; import org.apache.doris.persist.gson.GsonUtils; +import com.google.gson.annotations.SerializedName; import lombok.Getter; import lombok.Setter; @@ -27,6 +28,7 @@ @Setter public class S3Offset implements Offset { String startFile; + @SerializedName("ef") String endFile; String fileLists; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java index f63333468fa426..e429ae7375b151 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; import lombok.extern.log4j.Log4j2; @@ -72,8 +73,7 @@ public S3Offset getNextOffset(StreamingJobProperties jobProps, Map { Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task); }, delayMs, TimeUnit.MILLISECONDS); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 80a0886017ea6d..a7b67f2ffda584 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -1163,11 +1163,10 @@ private void checkJobNameKey(String key, String keyFormat, DorisParser.Supported @Override public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) { - checkJobNameKey(stripQuotes(ctx.jobNameKey.getText()), JOB_NAME, ctx); Map properties = ctx.propertyClause() != null ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : Maps.newHashMap(); String executeSql = getOriginSql(ctx.supportedDmlStatement()); - return new AlterJobCommand(stripQuotes(ctx.jobNameKey.getText()), properties, executeSql); + return new AlterJobCommand(ctx.jobName.getText(), properties, executeSql); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java index b1823de4a1d50f..786c8184e407ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -80,7 +80,7 @@ private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws JobExcepti Map updateProps = properties == null || properties.isEmpty() ? originJob.getProperties() : properties; - return new StreamingInsertJob(jobName, + StreamingInsertJob streamingInsertJob = new StreamingInsertJob(jobName, job.getJobStatus(), job.getCurrentDbName(), job.getComment(), @@ -89,6 +89,8 @@ private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws JobExcepti System.currentTimeMillis(), updateSQL, updateProps); + streamingInsertJob.setJobId(job.getJobId()); + return streamingInsertJob; } else { throw new JobException("Unsupported job type for ALTER:" + job.getJobType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index af79329f681930..dffd7a8c80e7db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -183,6 +183,7 @@ import org.apache.doris.fs.remote.dfs.OFSFileSystem; import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; +import org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment; import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.load.loadv2.BrokerLoadJob; import org.apache.doris.load.loadv2.BulkLoadJob; @@ -557,7 +558,9 @@ public class GsonUtils { .registerDefaultSubtype(TxnCommitAttachment.class) .registerSubtype(LoadJobFinalOperation.class, LoadJobFinalOperation.class.getSimpleName()) .registerSubtype(MiniLoadTxnCommitAttachment.class, MiniLoadTxnCommitAttachment.class.getSimpleName()) - .registerSubtype(RLTaskTxnCommitAttachment.class, RLTaskTxnCommitAttachment.class.getSimpleName()); + .registerSubtype(RLTaskTxnCommitAttachment.class, RLTaskTxnCommitAttachment.class.getSimpleName()) + .registerSubtype(StreamingTaskTxnCommitAttachment.class, + StreamingTaskTxnCommitAttachment.class.getSimpleName()); // runtime adapter for class "RoutineLoadProgress". private static RuntimeTypeAdapterFactory routineLoadTypeAdapterFactory diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java index b2de57f911001b..bef0bb92d20593 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java @@ -23,7 +23,6 @@ import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand; import org.apache.doris.nereids.trees.plans.commands.CreateFunctionCommand; import org.apache.doris.nereids.trees.plans.commands.DropFunctionCommand; -import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; @@ -36,9 +35,7 @@ import org.junit.Test; import java.io.File; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; @@ -66,15 +63,9 @@ public static void teardown() { public void testDropGlobalFunction() throws Exception { ConnectContext ctx = UtFrameUtils.createDefaultCtx(); // 1. create database db1 - //String sql = "create database db1;"; - String sql = "insert into db1.tb select * from s3('url'='s3://a/*.csv')"; + String sql = "create database db1;"; NereidsParser nereidsParser = new NereidsParser(); LogicalPlan logicalPlan = nereidsParser.parseSingle(sql); - InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql); - baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); - Map map = new HashMap<>(); - map.put("url", "s3:/xxxx/*."); - StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); if (logicalPlan instanceof CreateDatabaseCommand) { ((CreateDatabaseCommand) logicalPlan).run(connectContext, stmtExecutor); diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy index bd8ee2759fd138..3982876b9f4303 100644 --- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy @@ -42,9 +42,13 @@ suite("test_streaming_insert_job") { """ - // create recurring job + // create streaming job sql """ - CREATE JOB ${jobName} ON STREAMING DO INSERT INTO ${tableName} + CREATE JOB ${jobName} + PROPERTIES( + "s3.batch_files" = "1" + ) + ON STREAMING DO INSERT INTO ${tableName} SELECT * FROM S3 ( "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv", @@ -57,12 +61,13 @@ suite("test_streaming_insert_job") { "s3.secret_key" = "${getS3SK()}" ); """ - Awaitility.await().atMost(30, SECONDS).until( + Awaitility.await().atMost(30, SECONDS) + .pollInterval(1, SECONDS).until( { print("check success task count") def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """ - // check job status and succeed task count larger than 1 - jobSuccendCount.size() == 1 && '1' <= jobSuccendCount.get(0).get(0) + // check job status and succeed task count larger than 2 + jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0) } ) @@ -76,6 +81,40 @@ suite("test_streaming_insert_job") { qt_select """ SELECT * FROM ${tableName} order by c1 """ + def jobOffset = sql """ + select progress, remoteoffset from jobs("type"="insert") where Name='${jobName}' + """ + assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv" + assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv" + //todo check status + + // alter streaming job + sql """ + ALTER JOB FOR ${jobName} + PROPERTIES( + "s3.batch_files" = "1", + "session.insert_max_filter_ratio" = "0.5" + ) + INSERT INTO ${tableName} + SELECT * FROM S3 + ( + "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv", + "format" = "csv", + "provider" = "${getS3Provider()}", + "column_separator" = ",", + "s3.endpoint" = "${getS3Endpoint()}", + "s3.region" = "${getS3Region()}", + "s3.access_key" = "${getS3AK()}", + "s3.secret_key" = "${getS3SK()}" + ); + """ + + def alterJobProperties = sql """ + select properties from jobs("type"="insert") where Name='${jobName}' + """ + assert alterJobProperties.get(0).get(0) == "{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}" + + sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """ @@ -83,4 +122,6 @@ suite("test_streaming_insert_job") { def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" assert jobCountRsp.get(0).get(0) == 0 + + }