From e6a6ad6a32539d6b3501ad2693906df66aae10ed Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 23 Sep 2025 16:27:19 +0800 Subject: [PATCH] fix job api --- .../apache/doris/job/base/AbstractJob.java | 9 +++--- .../java/org/apache/doris/job/base/Job.java | 7 +++++ .../org/apache/doris/job/common/JobUtils.java | 30 ------------------- .../job/executor/DispatchTaskHandler.java | 3 +- .../job/executor/TimerJobSchedulerTask.java | 3 +- .../job/extensions/insert/InsertJob.java | 7 ++--- .../job/extensions/insert/InsertTask.java | 5 +--- .../insert/streaming/StreamingInsertJob.java | 20 +++++++++---- .../streaming/StreamingJobSchedulerTask.java | 2 -- .../job/offset/SourceOffsetProvider.java | 8 ++--- .../apache/doris/job/offset/s3/S3Offset.java | 4 +-- .../job/offset/s3/S3SourceOffsetProvider.java | 12 ++++---- .../doris/job/scheduler/JobScheduler.java | 7 ++--- .../test_streaming_insert_job.groovy | 14 ++++----- 14 files changed, 54 insertions(+), 77 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 5b03ae6d18b0fc..6a113d1a042ca9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -178,6 +178,11 @@ protected static long getNextJobId() { return System.nanoTime() + RandomUtils.nextInt(); } + @Override + public boolean isJobRunning() { + return JobStatus.RUNNING.equals(getJobStatus()); + } + @Override public void cancelTaskById(long taskId) throws JobException { if (CollectionUtils.isEmpty(runningTasks)) { @@ -484,8 +489,4 @@ public void onReplayEnd(AbstractJob replayJob) throws JobException { public boolean needPersist() { return true; } - - public boolean isFinalStatus() { - return jobStatus.equals(JobStatus.STOPPED) || jobStatus.equals(JobStatus.FINISHED); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index 69d1e5e55fb01d..3afcd5eef9b93d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -68,6 +68,13 @@ public interface Job { */ boolean isReadyForScheduling(C taskContext); + /** + * Checks if the job is running. + * + * @return True if the job is runnning. + */ + boolean isJobRunning(); + /** * Retrieves the metadata for the job, which is used to display job information. * diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java deleted file mode 100644 index 93ff5f41480b38..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.job.common; - -import org.apache.doris.job.base.AbstractJob; -import org.apache.doris.job.base.JobExecuteType; - -public class JobUtils { - public static boolean checkNeedSchedule(AbstractJob job) { - if (job.getJobConfig().getExecuteType() == JobExecuteType.STREAMING) { - return !job.isFinalStatus(); - } - return job.getJobStatus() == JobStatus.RUNNING; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index 35b1f351f723d5..228b9724863645 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -19,7 +19,6 @@ import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.common.JobType; -import org.apache.doris.job.common.JobUtils; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.disruptor.TimerJobEvent; import org.apache.doris.job.task.AbstractTask; @@ -55,7 +54,7 @@ public void onEvent(TimerJobEvent event) { log.info("job is null,may be job is deleted, ignore"); return; } - if (event.getJob().isReadyForScheduling(null) && JobUtils.checkNeedSchedule(event.getJob())) { + if (event.getJob().isReadyForScheduling(null) && event.getJob().isJobRunning()) { List tasks = event.getJob().commonCreateTasks(TaskType.SCHEDULED, null); if (CollectionUtils.isEmpty(tasks)) { log.warn("job is ready for scheduling, but create task is empty, skip scheduler," diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index 0f58452ff7b065..4abf72023a4d27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -18,7 +18,6 @@ package org.apache.doris.job.executor; import org.apache.doris.job.base.AbstractJob; -import org.apache.doris.job.common.JobUtils; import org.apache.doris.job.disruptor.TaskDisruptor; import io.netty.util.Timeout; @@ -40,7 +39,7 @@ public TimerJobSchedulerTask(TaskDisruptor dispatchDisruptor, T job) { @Override public void run(Timeout timeout) { try { - if (!JobUtils.checkNeedSchedule(job)) { + if (!job.isJobRunning()) { log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index cc16cbb603d351..305089378a0ae9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -100,8 +100,8 @@ public class InsertJob extends AbstractJob> impl .add(new Column("Comment", ScalarType.createStringType())) // only execute type = streaming need record .add(new Column("Properties", ScalarType.createStringType())) - .add(new Column("ConsumedOffset", ScalarType.createStringType())) - .add(new Column("MaxOffset", ScalarType.createStringType())) + .add(new Column("CurrentOffset", ScalarType.createStringType())) + .add(new Column("EndOffset", ScalarType.createStringType())) .add(new Column("LoadStatistic", ScalarType.createStringType())) .add(new Column("ErrorMsg", ScalarType.createStringType())) .build(); @@ -114,15 +114,12 @@ public class InsertJob extends AbstractJob> impl .addColumn(new Column("EtlInfo", ScalarType.createVarchar(100))) .addColumn(new Column("TaskInfo", ScalarType.createVarchar(100))) .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(100))) - .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20))) .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20))) .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(200))) .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(200))) .addColumn(new Column("User", ScalarType.createVarchar(50))) .addColumn(new Column("FirstErrorMsg", ScalarType.createVarchar(200))) - // only execute type = streaming need record - .addColumn(new Column("Offset", ScalarType.createStringType())) .build(); public static final ImmutableMap COLUMN_TO_INDEX; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 5875699fb0f50f..aa1ecc02a9f8e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -68,8 +68,7 @@ public class InsertTask extends AbstractTask { new Column("TrackingUrl", ScalarType.createStringType()), new Column("LoadStatistic", ScalarType.createStringType()), new Column("User", ScalarType.createStringType()), - new Column("FirstErrorMsg", ScalarType.createStringType()), - new Column("Offset", ScalarType.createStringType())); + new Column("FirstErrorMsg", ScalarType.createStringType())); public static final ImmutableMap COLUMN_TO_INDEX; @@ -277,7 +276,6 @@ public TRow getTvfInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); } trow.addToColumnValue(new TCell().setStringVal(firstErrorMsg == null ? "" : firstErrorMsg)); - trow.addToColumnValue(new TCell().setStringVal("")); return trow; } @@ -299,7 +297,6 @@ private TRow getPendingTaskTVFInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); trow.addToColumnValue(new TCell().setStringVal("")); - trow.addToColumnValue(new TCell().setStringVal("")); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index b32ecdcb94ec25..f70cbc37fcae07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -63,6 +63,7 @@ import lombok.Setter; import lombok.extern.log4j.Log4j2; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import java.io.DataOutput; import java.io.IOException; @@ -203,7 +204,16 @@ protected void checkJobParamsInternal() { @Override public boolean isReadyForScheduling(Map taskContext) { - return CollectionUtils.isEmpty(getRunningTasks()); + return CollectionUtils.isEmpty(getRunningTasks()) && !isFinalStatus(); + } + + @Override + public boolean isJobRunning() { + return !isFinalStatus(); + } + + private boolean isFinalStatus() { + return getJobStatus().equals(JobStatus.STOPPED) || getJobStatus().equals(JobStatus.FINISHED); } @Override @@ -354,14 +364,14 @@ public TRow getTvfInfo() { trow.addToColumnValue(new TCell().setStringVal(properties != null ? GsonUtils.GSON.toJson(properties) : FeConstants.null_string)); - if (offsetProvider != null && offsetProvider.getConsumedOffset() != null) { - trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getConsumedOffset())); + if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowCurrentOffset())) { + trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getShowCurrentOffset())); } else { trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); } - if (offsetProvider != null && offsetProvider.getMaxOffset() != null) { - trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getMaxOffset())); + if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowMaxOffset())) { + trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getShowMaxOffset())); } else { trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java index a1c8bce649d46a..0cd023b7d1d9ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -154,8 +154,6 @@ public TRow getTvfInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser())); } trow.addToColumnValue(new TCell().setStringVal("")); - trow.addToColumnValue(new TCell().setStringVal(runningTask.getRunningOffset() == null ? FeConstants.null_string - : runningTask.getRunningOffset().toString())); return trow; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index a27b66812c21c0..d2cf4e62053927 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -40,16 +40,16 @@ public interface SourceOffsetProvider { Offset getNextOffset(StreamingJobProperties jobProps, Map properties); /** - * Get consumered offset to show + * Get current offset to show * @return */ - String getConsumedOffset(); + String getShowCurrentOffset(); /** - * Get remote datasource max offset + * Get remote datasource max offset to show * @return */ - String getMaxOffset(); + String getShowMaxOffset(); /** * Rewrite the TVF parameters in the SQL based on the current offset. diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java index b152660873a4c4..6257eed8c70056 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -28,8 +28,6 @@ @Getter @Setter public class S3Offset implements Offset { - // path/1.csv - String startFile; @SerializedName("endFile") String endFile; // s3://bucket/path/{1.csv,2.csv} @@ -47,6 +45,6 @@ public boolean isEmpty() { @Override public String toString() { - return "{ \"startFile\": \"" + startFile + "\", \"endFile\": \"" + endFile + "\" }"; + return "{\"endFile\": \"" + endFile + "\" }"; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java index e6a5cf809a523d..74743d093994db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -33,6 +33,7 @@ import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; +import com.google.gson.Gson; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; @@ -72,7 +73,6 @@ public S3Offset getNextOffset(StreamingJobProperties jobProps, Map res = new HashMap<>(); + res.put("endFile", maxEndFile); + return new Gson().toJson(res); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index ab03cce7c9195c..dde5f891efd7e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -24,7 +24,6 @@ import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.common.JobStatus; -import org.apache.doris.job.common.JobUtils; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.disruptor.TaskDisruptor; import org.apache.doris.job.exception.JobException; @@ -110,7 +109,7 @@ private void batchSchedulerTimerJob() { } public void scheduleOneJob(T job) throws JobException { - if (!JobUtils.checkNeedSchedule(job)) { + if (!job.isJobRunning()) { return; } // not-schedule task @@ -145,7 +144,7 @@ public void scheduleOneJob(T job) throws JobException { } public void cycleTimerJobScheduler(T job) { - if (!JobUtils.checkNeedSchedule(job)) { + if (!job.isJobRunning()) { return; } if (!JobExecuteType.RECURRING.equals(job.getJobConfig().getExecuteType())) { @@ -226,7 +225,7 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() { clearEndJob(job); continue; } - if (JobUtils.checkNeedSchedule(job) && job.getJobConfig().checkIsTimerJob()) { + if (job.isJobRunning() && job.getJobConfig().checkIsTimerJob()) { cycleTimerJobScheduler(job, lastTimeWindowMs); } } diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy index 54fc5faa2e8950..6865cedd84241d 100644 --- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy @@ -97,10 +97,10 @@ suite("test_streaming_insert_job") { def jobOffset = sql """ - select ConsumedOffset, MaxOffset from jobs("type"="insert") where Name='${jobName}' + select currentOffset, endoffset from jobs("type"="insert") where Name='${jobName}' """ - assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv" - assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv" + assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}"; + assert jobOffset.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}"; // alter streaming job sql """ @@ -123,21 +123,21 @@ suite("test_streaming_insert_job") { """ def alterJobProperties = sql """ - select status,properties,ConsumedOffset from jobs("type"="insert") where Name='${jobName}' + select status,properties,currentOffset from jobs("type"="insert") where Name='${jobName}' """ assert alterJobProperties.get(0).get(0) == "PAUSED" assert alterJobProperties.get(0).get(1) == "{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}" - assert alterJobProperties.get(0).get(2) == "regression/load/data/example_1.csv" + assert alterJobProperties.get(0).get(2) == "{\"endFile\":\"regression/load/data/example_1.csv\"}"; sql """ RESUME JOB where jobname = '${jobName}' """ def resumeJobStatus = sql """ - select status,properties,ConsumedOffset from jobs("type"="insert") where Name='${jobName}' + select status,properties,currentOffset from jobs("type"="insert") where Name='${jobName}' """ assert resumeJobStatus.get(0).get(0) == "RUNNING" || resumeJobStatus.get(0).get(0) == "PENDING" assert resumeJobStatus.get(0).get(1) == "{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}" - assert resumeJobStatus.get(0).get(2) == "regression/load/data/example_1.csv" + assert resumeJobStatus.get(0).get(2) == "{\"endFile\":\"regression/load/data/example_1.csv\"}"; Awaitility.await().atMost(60, SECONDS) .pollInterval(1, SECONDS).until(