From 4102f5750574506776fc6c5f92cd8352e92012d3 Mon Sep 17 00:00:00 2001
From: wudi
Date: Tue, 13 Jan 2026 20:25:50 +0800
Subject: [PATCH] [Fix](StreamingJob) fix PostgreSQL data consumption with multiple backends (#59798)

### What problem does this PR solve?

Related PR: #59461

1. PostgreSQL consumes change data through replication slots, and a slot can
   be used by only one client at a time. The connection that holds the slot
   therefore needs to be closed once the WAL-reading phase finishes. MySQL is
   not affected by this constraint, but its binlog reader is closed as well so
   that it does not keep occupying a connection.
2. Create the PostgreSQL replication slot up front when the job is created.
3. Fix an unstable regression case.
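Point 1 is easiest to see directly against PostgreSQL itself. The sketch below is illustrative only, not code from this PR; the connection settings and the slot name `doris_job_slot` are made up. A logical replication slot is exclusive: `pg_replication_slots.active` is true while one walsender is attached, and a second consumer cannot attach until the holder disconnects.

```java
// Minimal JDBC sketch (assumptions: local PG, superuser/replication role,
// hypothetical slot name "doris_job_slot").
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SlotProbe {
    static final String URL = "jdbc:postgresql://127.0.0.1:5432/postgres";

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(URL, "postgres", "postgres")) {
            // Create the slot up front, which is what point 2 does at job-creation time.
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT pg_create_logical_replication_slot(?, 'pgoutput')")) {
                ps.setString(1, "doris_job_slot");
                ps.execute();
            }
            // 'active' flips to true while a walsender holds the slot; a second
            // consumer must wait until the holding connection closes.
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT active FROM pg_replication_slots WHERE slot_name = ?")) {
                ps.setString(1, "doris_job_slot");
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println("slot active: " + rs.getBoolean(1));
                    }
                }
            }
        }
    }
}
```

This is also why point 2 creates the slot once at job-creation time: if every backend raced to create and attach to the slot on its first read, the losers would fail on the already-held slot.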
---
 .../streaming/StreamingMultiTblTask.java      |  3 +-
 .../offset/jdbc/JdbcSourceOffsetProvider.java | 55 +++++++++++++++++++
 .../controller/ClientController.java          | 14 +++++
 .../reader/JdbcIncrementalSourceReader.java   |  5 +-
 .../reader/mysql/MySqlSourceReader.java       |  5 +-
 .../reader/postgres/PostgresSourceReader.java |  3 +
 .../cdc/test_streaming_mysql_job_priv.groovy  | 22 +++++++-
 .../test_streaming_postgres_job_priv.groovy   | 52 +++++++++++-------
 8 files changed, 133 insertions(+), 26 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 08f1bb5ccaf6fe..1f955f0a2c3d3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -108,12 +108,13 @@ public void run() throws JobException {
             log.info("task has been canceled, task id is {}", getTaskId());
             return;
         }
-        log.info("start to run streaming multi task, offset is {}", runningOffset.toString());
         sendWriteRequest();
     }
 
     private void sendWriteRequest() throws JobException {
         Backend backend = StreamingJobUtils.selectBackend();
+        log.info("start to run streaming multi task {} in backend {}/{}, offset is {}",
+                taskId, backend.getId(), backend.getHost(), runningOffset.toString());
         this.runningBackendId = backend.getId();
         WriteRecordRequest params = buildRequestParams();
         InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 560887d61ad1c0..0c114ae8e64d3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -18,6 +18,7 @@
 package org.apache.doris.job.offset.jdbc;
 
 import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.httpv2.rest.RestApiStatusCode;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -430,6 +431,10 @@ public void splitChunks(List createTbls) throws JobException {
             this.remainingSplits = tableSplits.values().stream()
                     .flatMap(List::stream)
                     .collect(Collectors.toList());
+        } else {
+            // The source reader is initialized automatically when splits are fetched.
+            // In latest mode there is no split phase, so init the source reader here.
+            initSourceReader();
         }
     }
 
@@ -490,6 +495,56 @@ private boolean checkNeedSplitChunks(Map sourceProperties) {
         return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
     }
 
+    /**
+     * The source reader needs to be initialized here.
+     * For example, PG replication slots have to be created first;
+     * otherwise, conflicts occur in multi-backend scenarios.
+     */
+    private void initSourceReader() throws JobException {
+        Backend backend = StreamingJobUtils.selectBackend();
+        JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
+        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+                .setApi("/api/initReader")
+                .setParams(new Gson().toJson(requestParams)).build();
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        InternalService.PRequestCdcClientResult result = null;
+        try {
+            Future<InternalService.PRequestCdcClientResult> future =
+                    BackendServiceProxy.getInstance().requestCdcClient(address, request);
+            result = future.get();
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                log.warn("Failed to init job {} reader, {}", getJobId(), result.getStatus().getErrorMsgs(0));
+                throw new JobException(
+                        "Failed to init source reader," + result.getStatus().getErrorMsgs(0) + ", response: "
+                                + result.getResponse());
+            }
+            String response = result.getResponse();
+            try {
+                ResponseBody<Object> responseObj = objectMapper.readValue(
+                        response,
+                        new TypeReference<ResponseBody<Object>>() {
+                        }
+                );
+                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
+                    log.info("Init {} source reader successfully, response: {}", getJobId(), responseObj.getData());
+                    return;
+                } else {
+                    throw new JobException("Failed to init source reader, error: " + responseObj.getData());
+                }
+            } catch (JobException jobex) {
+                log.warn("Failed to init {} source reader, {}", getJobId(), response);
+                throw new JobException(jobex.getMessage());
+            } catch (Exception e) {
+                log.warn("Failed to init {} source reader, {}", getJobId(), response);
+                throw new JobException("Failed to init source reader, cause " + e.getMessage());
+            }
+        } catch (ExecutionException | InterruptedException ex) {
+            log.warn("init source reader: ", ex);
+            throw new JobException(ex);
+        }
+    }
+
     public void cleanMeta(Long jobId) throws JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
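For reference, the response handling in `initSourceReader()` above follows a common Jackson pattern: deserialize the JSON envelope through a `TypeReference` so the generic payload type survives type erasure. A minimal, self-contained sketch of that pattern, where `Envelope` is a hypothetical stand-in for Doris's `ResponseBody`:

```java
// Sketch only: Envelope<T> mimics the code/data envelope shape, not the real class.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EnvelopeDemo {
    public static class Envelope<T> {
        public int code;   // 0 means OK in this toy example
        public T data;     // payload, generic so TypeReference is needed
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String json = "{\"code\":0,\"data\":\"Source reader initialized successfully\"}";
        // TypeReference preserves Envelope<String>'s generic parameter at runtime.
        Envelope<String> resp = mapper.readValue(json, new TypeReference<Envelope<String>>() {});
        if (resp.code == 0) {
            System.out.println("init ok: " + resp.data);
        } else {
            throw new IllegalStateException("init failed: " + resp.data);
        }
    }
}
```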
RestResponse.success("Source reader initialized successfully"); + } catch (Exception ex) { + LOG.error("Failed to create reader, jobId={}", jobConfig.getJobId(), ex); + return RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex)); + } + } + /** Fetch source splits for snapshot */ @RequestMapping(path = "/api/fetchSplits", method = RequestMethod.POST) public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 541e3354828820..70ab3961acc3a1 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -172,7 +172,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc // build split Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq); split = splitFlag.f0; - closeBinlogReader(); + // closeBinlogReader(); currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); this.setCurrentSplitRecords(currentSplitRecords); this.setCurrentSplit(split); @@ -616,6 +616,9 @@ public boolean isSnapshotSplit(SourceSplit split) { @Override public void finishSplitRecords() { this.setCurrentSplitRecords(null); + // Close after each read, the binlog client will occupy the connection. + closeBinlogReader(); + this.setCurrentReader(null); } private Map getTableSchemas(JobBaseConfig config) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index 27fbf3be88b363..a3f14a953b6231 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -188,7 +188,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq); split = splitFlag.f0; // reset binlog reader - closeBinlogReader(); + // closeBinlogReader(); currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); this.setCurrentSplitRecords(currentSplitRecords); this.setCurrentSplit(split); @@ -718,6 +718,9 @@ public boolean isSnapshotSplit(SourceSplit split) { @Override public void finishSplitRecords() { this.setCurrentSplitRecords(null); + // Close after each read, the binlog client will occupy the connection. 
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 27fbf3be88b363..a3f14a953b6231 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -188,7 +188,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
             Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq);
             split = splitFlag.f0;
             // reset binlog reader
-            closeBinlogReader();
+            // closeBinlogReader();
             currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
             this.setCurrentSplitRecords(currentSplitRecords);
             this.setCurrentSplit(split);
@@ -718,6 +718,9 @@ public boolean isSnapshotSplit(SourceSplit split) {
     @Override
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
+        // Close after each read; otherwise the binlog client keeps occupying a connection.
+        closeBinlogReader();
+        this.setCurrentReader(null);
     }
 
     @Override
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 52c3674444b37c..53b648bf38afcb 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -107,6 +107,9 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
                 postgresDialect.getSlotName(), postgresDialect.getPluginName());
         // skip creating the replication slot when the slot exists.
         if (slotInfo != null) {
+            LOG.info(
+                    "The replication slot {} already exists, skip creating it.",
+                    postgresDialect.getSlotName());
             return;
         }
         PostgresReplicationConnection replicationConnection =
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index 23f3ce7f6793f9..d16bc57e73ecbe 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -197,12 +197,30 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern
         sql """FLUSH PRIVILEGES"""
     }
 
-    sleep(30000)
+    def jobSucceedTaskCnt = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+    log.info("jobSucceedTaskCnt: " + jobSucceedTaskCnt)
+
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("jobStatus: " + jobStatus)
+                    // check that the job is RUNNING and has finished one more successful task
+                    jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1) > jobSucceedTaskCnt.get(0).get(0)
+                }
+        )
+    } catch (Exception ex) {
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex;
+    }
 
     // check incremental data
     qt_select """ SELECT * FROM ${tableName} order by name asc """
-
     sql """DROP USER IF EXISTS '${user}'"""
 
     sql """ DROP JOB IF EXISTS where jobname =  '${jobName}'
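The test changes above replace a fixed `sleep(30000)` with Awaitility polling, which waits only as long as needed and fails with a clear timeout instead of racing the job. A minimal standalone sketch of the same pattern; the background thread here is a made-up stand-in for the streaming job:

```java
// Sketch of the Awaitility pattern the tests switch to (org.awaitility:awaitility).
import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitDemo {
    public static void main(String[] args) {
        AtomicInteger succeedTaskCount = new AtomicInteger(0);
        // Hypothetical background work standing in for the streaming job.
        new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ignored) {
            }
            succeedTaskCount.incrementAndGet();
        }).start();

        int before = succeedTaskCount.get();
        await().atMost(300, TimeUnit.SECONDS)
                .pollInterval(1, TimeUnit.SECONDS)
                // like the test: wait for one more successful task, not a fixed sleep
                .until(() -> succeedTaskCount.get() > before);
        System.out.println("task count increased: " + succeedTaskCount.get());
    }
}
```

Capturing the count *before* waiting and asserting an increase, rather than a fixed value, is what makes the MySQL test stable regardless of how many tasks ran earlier.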
suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern "table.create.properties.replication_num" = "1" ) """ - - // check job running - try { - Awaitility.await().atMost(300, SECONDS) - .pollInterval(1, SECONDS).until( - { - def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ - log.info("jobStatus: " + jobStatus) - // check job status - jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") - } - ) - } catch (Exception ex){ - def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" - def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" - log.info("show job: " + showjob) - log.info("show task: " + showtask) - throw ex; + exception "Failed to init source reader" } // grant replication to user @@ -112,6 +96,25 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern } + // create job by new user + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${newPgUser}", + "password" = "${newPgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${tableName}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + Awaitility.await().atMost(300, SECONDS) .pollInterval(3, SECONDS).until( { @@ -135,7 +138,14 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) VALUES ('Doris',18);""" } - sleep(30000) + Awaitility.await().atMost(300, SECONDS) + .pollInterval(3, SECONDS).until( + { + def jobSucceedTaskCount = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'""" + log.info("jobSucceedTaskCount: " + jobSucceedTaskCount) + jobSucceedTaskCount.size() == 1 && jobSucceedTaskCount.get(0).get(0) >= '2' + } + ) // check incremental data qt_select """ SELECT * FROM ${tableName} order by name asc """