diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 011894f6287447..d75427d7554af0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1076,7 +1076,11 @@ public void processTimeoutTasks() throws JobException {
             StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) this.runningStreamTask;
             if (TaskStatus.RUNNING.equals(runningMultiTask.getStatus()) && runningMultiTask.isTimeout()) {
-                runningMultiTask.onFail("task failed cause timeout");
+                String timeoutReason = runningMultiTask.getTimeoutReason();
+                if (StringUtils.isEmpty(timeoutReason)) {
+                    timeoutReason = "task failed cause timeout";
+                }
+                runningMultiTask.onFail(timeoutReason);
                 // renew streaming task by auto resume
             }
         } finally {
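The FE change above prefers the backend-reported reason and only falls back to the generic message when nothing was reported. A minimal standalone sketch of that fallback, assuming commons-lang3 (which the hunk already uses via StringUtils.isEmpty); the class and helper names here are illustrative, not part of the patch:

```java
import org.apache.commons.lang3.StringUtils;

public class TimeoutReasonFallback {
    static final String GENERIC_REASON = "task failed cause timeout";

    // Returns the backend-reported reason when present, else the generic one.
    static String effectiveReason(String timeoutReason) {
        return StringUtils.defaultIfEmpty(timeoutReason, GENERIC_REASON);
    }

    public static void main(String[] args) {
        System.out.println(effectiveReason(""));                     // generic message
        System.out.println(effectiveReason("stream load error: x")); // backend-reported reason
    }
}
```

StringUtils.defaultIfEmpty collapses the null/empty check and the reassignment into one call, which could also shorten the hunk itself.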
+     */
+    public String getTimeoutReason() {
+        try {
+            if (runningBackendId <= 0) {
+                log.info("No running backend recorded for task {}", getTaskId());
+                return "";
+            }
+            Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
+            InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+                    .setApi("/api/getFailReason/" + getTaskId())
+                    .build();
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+            Future<InternalService.PRequestCdcClientResult> future =
+                    BackendServiceProxy.getInstance().requestCdcClient(address, request);
+            InternalService.PRequestCdcClientResult result = future.get();
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                log.warn("Failed to get task timeout reason, {}", result.getStatus().getErrorMsgs(0));
+                return "";
+            }
+            String response = result.getResponse();
+            try {
+                ResponseBody<String> responseObj = objectMapper.readValue(
+                        response,
+                        new TypeReference<ResponseBody<String>>() {
+                        }
+                );
+                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
+                    return responseObj.getData();
+                }
+            } catch (JsonProcessingException e) {
+                log.warn("Failed to get task timeout reason, response: {}", response);
+            }
+        } catch (ExecutionException | InterruptedException ex) {
+            log.error("Send get fail reason request failed: ", ex);
+        }
+        return "";
+    }
+
     @Override
     public TRow getTvfInfo(String jobName) {
         TRow trow = super.getTvfInfo(jobName);
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 2f444260559003..2ca45ad247458d 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -32,6 +32,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
@@ -105,4 +106,10 @@ public Object close(@RequestBody JobBaseConfig jobConfig) {
         pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
         return RestResponse.success(true);
     }
+
+    /** Get the failure reason recorded for a task. */
+    @RequestMapping(path = "/api/getFailReason/{taskId}", method = RequestMethod.POST)
+    public Object getFailReason(@PathVariable("taskId") String taskId) {
+        return RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId));
+    }
 }
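The new controller route makes the fail reason reachable over plain HTTP on the CDC client, while the FE reaches the same route through the BE brpc proxy (requestCdcClient). A debugging sketch that calls the route directly, assuming JDK 11+; the host, port, and task id are placeholders, not values from the patch:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GetFailReasonProbe {
    public static void main(String[] args) throws Exception {
        String taskId = "12345"; // placeholder task id
        // Placeholder host/port; substitute wherever the CDC client actually listens.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:9096/api/getFailReason/" + taskId))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The body is the RestResponse JSON wrapping the recorded fail reason (or "").
        System.out.println(response.body());
    }
}
```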
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 591c4790e6ca42..f7f41e43dd7f82 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -28,6 +28,7 @@
 import org.apache.doris.job.cdc.split.SnapshotSplit;

 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -56,6 +57,8 @@ public class PipelineCoordinator {
     private static final String SPLIT_ID = "splitId";
     // jobId
     private final Map<Long, BatchStreamLoad> batchStreamLoadMap = new ConcurrentHashMap<>();
+    // taskId -> writeFailReason
+    private final Map<String, String> taskErrorMaps = new ConcurrentHashMap<>();
     private final ThreadPoolExecutor executor;
     private static final int MAX_CONCURRENT_TASKS = 10;
     private static final int QUEUE_CAPACITY = 128;
@@ -173,6 +176,8 @@ public CompletableFuture writeRecordsAsync(WriteRecordRequest writeRecordR
                     writeRecordRequest.getTaskId());
         } catch (Exception ex) {
             closeJobStreamLoad(writeRecordRequest.getJobId());
+            String rootCauseMessage = ExceptionUtils.getRootCauseMessage(ex);
+            taskErrorMaps.put(writeRecordRequest.getTaskId(), rootCauseMessage);
             LOG.error(
                     "Failed to process async write record, jobId={} taskId={}",
                     writeRecordRequest.getJobId(),
@@ -300,4 +305,9 @@ private String extractTable(SourceRecord record) {
         Struct value = (Struct) record.value();
         return value.getStruct(Envelope.FieldName.SOURCE).getString("table");
     }
+
+    public String getTaskFailReason(String taskId) {
+        String taskReason = taskErrorMaps.remove(taskId);
+        return taskReason == null ? "" : taskReason;
+    }
 }
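The coordinator records the root-cause message keyed by task id and hands it out destructively, so a retried task never sees a stale reason. A self-contained sketch of that record-once, consume-once pattern using only the JDK; the class name and the manual cause walk (standing in for ExceptionUtils.getRootCauseMessage) are illustrative:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TaskErrorRegistry {
    private final Map<String, String> taskErrors = new ConcurrentHashMap<>();

    // Writer side: store the innermost cause's message for the failed task.
    public void recordFailure(String taskId, Throwable ex) {
        Throwable root = ex;
        while (root.getCause() != null) {
            root = root.getCause();
        }
        taskErrors.put(taskId, String.valueOf(root.getMessage()));
    }

    // Reader side: remove() makes the read destructive, mirroring getTaskFailReason().
    public String consumeFailReason(String taskId) {
        String reason = taskErrors.remove(taskId);
        return reason == null ? "" : reason;
    }
}
```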
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
new file mode 100644
index 00000000000000..f063b629f90c26
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_errormsg", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_errormsg"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_errormsg"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    // Pre-create the target table with a schema the source data cannot fit
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${table1} (
+            `name` varchar(2) NOT NULL,
+            `age` int NOT NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+        // Create the source table and test data in MySQL
+        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                `name` varchar(200) NOT NULL,
+                `age` varchar(8) NOT NULL,
+                PRIMARY KEY (`name`)
+            ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('ABCDEFG', 'abc');"""
+        }
+
+        sql """CREATE JOB ${jobName}
+            ON STREAMING
+            FROM MYSQL (
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "include_tables" = "${table1}",
+                "offset" = "initial"
+            )
+            TO DATABASE ${currentDb} (
+                "table.create.properties.replication_num" = "1"
+            )
+        """
+
+        // wait until the job reports failed or canceled tasks
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                    {
+                        def jobFailCount = sql """ select FailedTaskCount, CanceledTaskCount, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobFailCount: " + jobFailCount)
+                        // job row exists and failed or canceled task count is at least 1
+                        jobFailCount.size() == 1 && ('1' <= jobFailCount.get(0).get(0) || '1' <= jobFailCount.get(0).get(1))
+                    }
+                )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def jobFailMsg = sql """select errorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+        log.info("jobFailMsg: " + jobFailMsg)
+        assert jobFailMsg.get(0).get(0).contains("stream load error")
+
+        sql """
+            DROP JOB IF EXISTS where jobname = '${jobName}'
+        """
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
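The suite above waits for the failure with Awaitility rather than sleeping. The same polling pattern in plain Java, with checkFailedTaskCount as a hypothetical stand-in for the jobs() table-valued-function query:

```java
import static java.util.concurrent.TimeUnit.SECONDS;

import org.awaitility.Awaitility;

public class PollUntilTaskFails {
    // Placeholder for: select FailedTaskCount from jobs("type"="insert") ...
    static long checkFailedTaskCount() {
        return 1L;
    }

    public static void main(String[] args) {
        // Poll every second, give up after 300 seconds, succeed on first failure seen.
        Awaitility.await().atMost(300, SECONDS)
                .pollInterval(1, SECONDS)
                .until(() -> checkFailedTaskCount() >= 1);
    }
}
```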