diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 011894f6287447..274cb25c2ac990 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -111,7 +111,7 @@ public class StreamingInsertJob extends AbstractJob queryAllStreamTasks() { return tasks; } - protected void fetchMeta() { + protected void fetchMeta() throws JobException { try { if (tvfType != null) { if (originTvfProps == null) { @@ -516,10 +516,18 @@ protected void fetchMeta() { offsetProvider.fetchRemoteMeta(new HashMap<>()); } } catch (Exception ex) { - //todo: The job status = MANUAL_PAUSE_ERR, No need to set failureReason again log.warn("fetch remote meta failed, job id: {}", getJobId(), ex); - failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR, - "Failed to fetch meta, " + ex.getMessage()); + if (this.getFailureReason() == null + || !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) { + // When a job is manually paused, it does not need to be set again, + // otherwise, it may be woken up by auto resume. + this.setFailureReason( + new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR, + "Failed to fetch meta, " + ex.getMessage())); + // If fetching meta fails, the job is paused + // and auto resume will automatically wake it up. + this.updateJobStatus(JobStatus.PAUSED); + } } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index 3e94700f2b464f..52c3674444b37c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -26,6 +26,7 @@ import org.apache.doris.job.cdc.request.JobBaseConfig; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; @@ -116,8 +117,8 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) { } catch (Throwable t) { throw new CdcClientException( String.format( - "Fail to get or create slot for global stream split, the slot name is %s. Due to: ", - postgresDialect.getSlotName()), + "Fail to get or create slot, the slot name is %s. Due to: %s ", + postgresDialect.getSlotName(), ExceptionUtils.getRootCauseMessage(t)), t); } } diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy index aae1bc581341d7..f9426196cfb54f 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy @@ -173,11 +173,23 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern sql """INSERT INTO ${mysqlDb}.${tableName} (name,age) VALUES ('DorisTestPriv',28);""" } - sleep(20000) - - def jobErrorMsg = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'""" - log.info("jobErrorMsg: " + jobErrorMsg) - assert jobErrorMsg.get(0).get(0).contains("Failed to fetch meta") + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobStatus: " + jobStatus) + // check job status + jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } // grant binlog priv to mysqluser connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy index 6b70301e43dcd9..fc3c744581ef99 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy @@ -90,12 +90,12 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern // check job running try { Awaitility.await().atMost(300, SECONDS) - .pollInterval(3, SECONDS).until( + .pollInterval(1, SECONDS).until( { def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ log.info("jobStatus: " + jobStatus) // check job status - jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") + jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") } ) } catch (Exception ex){