From aa04d4d26df3012353c4ca3e1b521697210cb9bd Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 12 Jan 2026 15:08:41 +0800 Subject: [PATCH 1/5] fix get remote meta failed to pause job --- .../insert/streaming/StreamingInsertJob.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 011894f6287447..652e18bd05c375 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -111,7 +111,7 @@ public class StreamingInsertJob extends AbstractJob queryAllStreamTasks() { return tasks; } - protected void fetchMeta() { + protected void fetchMeta() throws JobException { try { if (tvfType != null) { if (originTvfProps == null) { @@ -516,10 +516,16 @@ protected void fetchMeta() { offsetProvider.fetchRemoteMeta(new HashMap<>()); } } catch (Exception ex) { - //todo: The job status = MANUAL_PAUSE_ERR, No need to set failureReason again log.warn("fetch remote meta failed, job id: {}", getJobId(), ex); - failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR, - "Failed to fetch meta, " + ex.getMessage()); + if (this.getFailureReason() != null + && !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) { + // When a job is manually paused, it does not need to be set again, + // otherwise, it may be woken up by auto resume. + this.setFailureReason( + new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR, + "Failed to fetch meta, " + ex.getMessage())); + this.updateJobStatus(JobStatus.PAUSED); + } } } From b2ffd83bd74b54638c9b1157aaa620bec104e403 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 12 Jan 2026 15:14:05 +0800 Subject: [PATCH 2/5] fix --- .../job/extensions/insert/streaming/StreamingInsertJob.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 652e18bd05c375..2311ef54572b5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -524,6 +524,8 @@ protected void fetchMeta() throws JobException { this.setFailureReason( new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR, "Failed to fetch meta, " + ex.getMessage())); + // If fetching meta fails, the job is paused + // and auto resume will automatically wake it up. this.updateJobStatus(JobStatus.PAUSED); } } From c5fab237661c009795a182678477e8020d954c09 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 12 Jan 2026 15:28:30 +0800 Subject: [PATCH 3/5] fix --- .../job/extensions/insert/streaming/StreamingInsertJob.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 2311ef54572b5b..274cb25c2ac990 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -517,8 +517,8 @@ protected void fetchMeta() throws JobException { } } catch (Exception ex) { log.warn("fetch remote meta failed, job id: {}", getJobId(), ex); - if (this.getFailureReason() != null - && !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) { + if (this.getFailureReason() == null + || !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) { // When a job is manually paused, it does not need to be set again, // otherwise, it may be woken up by auto resume. this.setFailureReason( From 743d999081f19acf81fd60165fea86ed0205e5f6 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 12 Jan 2026 18:27:54 +0800 Subject: [PATCH 4/5] fix case --- .../cdc/test_streaming_mysql_job_priv.groovy | 22 ++++++++++++++----- .../test_streaming_postgres_job_priv.groovy | 4 ++-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy index aae1bc581341d7..f9426196cfb54f 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy @@ -173,11 +173,23 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern sql """INSERT INTO ${mysqlDb}.${tableName} (name,age) VALUES ('DorisTestPriv',28);""" } - sleep(20000) - - def jobErrorMsg = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'""" - log.info("jobErrorMsg: " + jobErrorMsg) - assert jobErrorMsg.get(0).get(0).contains("Failed to fetch meta") + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobStatus: " + jobStatus) + // check job status + jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } // grant binlog priv to mysqluser connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy index 6b70301e43dcd9..fc3c744581ef99 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy @@ -90,12 +90,12 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern // check job running try { Awaitility.await().atMost(300, SECONDS) - .pollInterval(3, SECONDS).until( + .pollInterval(1, SECONDS).until( { def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ log.info("jobStatus: " + jobStatus) // check job status - jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") + jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") } ) } catch (Exception ex){ From 6bc3e07b9eb4ff07790931bf67bab037e00c8226 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 12 Jan 2026 19:11:28 +0800 Subject: [PATCH 5/5] add message show --- .../source/reader/postgres/PostgresSourceReader.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index 3e94700f2b464f..52c3674444b37c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -26,6 +26,7 @@ import org.apache.doris.job.cdc.request.JobBaseConfig; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; @@ -116,8 +117,8 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) { } catch (Throwable t) { throw new CdcClientException( String.format( - "Fail to get or create slot for global stream split, the slot name is %s. Due to: ", - postgresDialect.getSlotName()), + "Fail to get or create slot, the slot name is %s. Due to: %s ", + postgresDialect.getSlotName(), ExceptionUtils.getRootCauseMessage(t)), t); } }