@@ -1076,7 +1076,11 @@ public void processTimeoutTasks() throws JobException {
StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) this.runningStreamTask;
if (TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
&& runningMultiTask.isTimeout()) {
runningMultiTask.onFail("task failed cause timeout");
String timeoutReason = runningMultiTask.getTimeoutReason();
if (StringUtils.isEmpty(timeoutReason)) {
timeoutReason = "task failed cause timeout";
}
runningMultiTask.onFail(timeoutReason);
// the streaming task will be renewed by auto-resume
}
} finally {
@@ -69,6 +69,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
private long scannedRows = 0L;
private long scannedBytes = 0L;
private long timeoutMs;
private long runningBackendId;

public StreamingMultiTblTask(Long jobId,
long taskId,
@@ -113,6 +114,7 @@ public void run() throws JobException {

private void sendWriteRequest() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
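// remember which BE runs this task so getTimeoutReason() can query it later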
this.runningBackendId = backend.getId();
WriteRecordRequest params = buildRequestParams();
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/writeRecords")
@@ -264,6 +266,50 @@ public boolean isTimeout() {
return (System.currentTimeMillis() - startTimeMs) > timeoutMs;
}

/**
* When a task encounters a write error, it eventually times out.
* The job then needs to obtain the underlying reason for the timeout,
* such as a data quality error, and expose it to the user.
*/
public String getTimeoutReason() {
try {
if (runningBackendId <= 0) {
log.info("No running backend recorded for task {}", getTaskId());
return "";
}
Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
if (backend == null) {
log.warn("Backend {} not found when fetching timeout reason for task {}", runningBackendId, getTaskId());
return "";
}
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/getFailReason/" + getTaskId())
.build();
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
InternalService.PRequestCdcClientResult result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to get task timeout reason, {}", result.getStatus().getErrorMsgs(0));
return "";
}
String response = result.getResponse();
try {
ResponseBody<String> responseObj = objectMapper.readValue(
response,
new TypeReference<ResponseBody<String>>() {
}
);
if (responseObj.getCode() == RestApiStatusCode.OK.code) {
return responseObj.getData();
}
} catch (JsonProcessingException e) {
log.warn("Failed to parse task timeout reason, response: {}", response, e);
}
} catch (ExecutionException | InterruptedException ex) {
log.error("Send get fail reason request failed: ", ex);
}
return "";
}

@Override
public TRow getTvfInfo(String jobName) {
TRow trow = super.getTvfInfo(jobName);
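Note on the parse step above: getTimeoutReason deserializes the CDC client's JSON reply into a ResponseBody<String>. The following is a minimal, self-contained sketch of that envelope, assuming field names code, msg, and data (inferred from the getCode()/getData() accessors in this diff); the real ResponseBody class is defined elsewhere in the codebase.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical stand-in for the ResponseBody<T> envelope parsed above;
// the field names are assumptions inferred from this diff's accessors.
class ResponseBodySketch<T> {
    public int code;   // 0 is assumed to correspond to RestApiStatusCode.OK
    public String msg; // human-readable status text (assumed)
    public T data;     // payload: the recorded write-fail reason, may be empty

    public int getCode() { return code; }
    public T getData() { return data; }
}

class EnvelopeDemo {
    public static void main(String[] args) throws Exception {
        // sample reply shape; the "data" text is illustrative only
        String response = "{\"code\":0,\"msg\":\"success\",\"data\":\"stream load error: ...\"}";
        ResponseBodySketch<String> body = new ObjectMapper().readValue(
                response, new TypeReference<ResponseBodySketch<String>>() {});
        System.out.println(body.getCode() == 0 ? body.getData() : "");
    }
}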
@@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -105,4 +106,10 @@ public Object close(@RequestBody JobBaseConfig jobConfig) {
pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
return RestResponse.success(true);
}

/** Get the fail reason recorded for a task. */
@RequestMapping(path = "/api/getFailReason/{taskId}", method = RequestMethod.POST)
public Object getFailReason(@PathVariable("taskId") String taskId) {
return RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId));
}
}
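In the real flow the FE reaches this handler indirectly, through BackendServiceProxy.requestCdcClient with api "/api/getFailReason/" + taskId, as shown in getTimeoutReason above. Purely as a hypothetical illustration of the route itself, assuming the CDC client's embedded HTTP server were reachable directly (the address and task id below are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class GetFailReasonDemo {
    public static void main(String[] args) throws Exception {
        String cdcClientAddress = "127.0.0.1:9096"; // placeholder, not a documented port
        String taskId = "12345";                    // placeholder task id
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://" + cdcClientAddress + "/api/getFailReason/" + taskId))
                .POST(HttpRequest.BodyPublishers.noBody()) // the mapping above is POST-only
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // expected shape: {"code":0,"msg":"...","data":"<recorded fail reason or empty>"}
        System.out.println(response.body());
    }
}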
@@ -28,6 +28,7 @@
import org.apache.doris.job.cdc.split.SnapshotSplit;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -56,6 +57,8 @@ public class PipelineCoordinator {
private static final String SPLIT_ID = "splitId";
// jobId -> DorisBatchStreamLoad
private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new ConcurrentHashMap<>();
// taskId -> writeFailReason
private final Map<String, String> taskErrorMaps = new ConcurrentHashMap<>();
private final ThreadPoolExecutor executor;
private static final int MAX_CONCURRENT_TASKS = 10;
private static final int QUEUE_CAPACITY = 128;
@@ -173,6 +176,8 @@ public CompletableFuture<Void> writeRecordsAsync(WriteRecordRequest writeRecordR
writeRecordRequest.getTaskId());
} catch (Exception ex) {
closeJobStreamLoad(writeRecordRequest.getJobId());
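// record the root cause keyed by taskId so the FE can fetch it via /api/getFailReason/{taskId}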
String rootCauseMessage = ExceptionUtils.getRootCauseMessage(ex);
taskErrorMaps.put(writeRecordRequest.getTaskId(), rootCauseMessage);
LOG.error(
"Failed to process async write record, jobId={} taskId={}",
writeRecordRequest.getJobId(),
@@ -300,4 +305,9 @@ private String extractTable(SourceRecord record) {
Struct value = (Struct) record.value();
return value.getStruct(Envelope.FieldName.SOURCE).getString("table");
}

public String getTaskFailReason(String taskId) {
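// consume-once: remove the entry so a retried task never sees a stale reason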
String taskReason = taskErrorMaps.remove(taskId);
return taskReason == null ? "" : taskReason;
}
}
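A standalone sketch of the record-once/consume-once pattern used by taskErrorMaps (class and method names here are illustrative, not from the PR):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative reduction of PipelineCoordinator's taskErrorMaps:
// the write path records the root cause, the FE-facing read path consumes it.
class TaskErrorMapSketch {
    private final ConcurrentMap<String, String> errors = new ConcurrentHashMap<>();

    void recordFailure(String taskId, String rootCauseMessage) {
        errors.put(taskId, rootCauseMessage);
    }

    String consumeFailReason(String taskId) {
        String reason = errors.remove(taskId); // a second read returns ""
        return reason == null ? "" : reason;
    }

    public static void main(String[] args) {
        TaskErrorMapSketch sketch = new TaskErrorMapSketch();
        sketch.recordFailure("t-1", "stream load error: ...");
        System.out.println(sketch.consumeFailReason("t-1")); // prints the reason
        System.out.println(sketch.consumeFailReason("t-1")); // prints "" (consumed)
    }
}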
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_mysql_job_errormsg", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_errormsg"
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_errormsg"
def mysqlDb = "test_cdc_db"

sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
sql """drop table if exists ${currentDb}.${table1} force"""

// Pre-create the target Doris table with a narrow varchar(2) column so the CDC write fails with a data quality error
sql """
CREATE TABLE IF NOT EXISTS ${currentDb}.${table1} (
`name` varchar(2) NOT NULL,
`age` int NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`name`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""

String enabled = context.config.otherConfigs.get("enableJdbcTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String mysql_port = context.config.otherConfigs.get("mysql_57_port");
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String s3_endpoint = getS3Endpoint()
String bucket = getS3BucketName()
String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"

// create the source table and test data in MySQL
connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
sql """CREATE TABLE ${mysqlDb}.${table1} (
`name` varchar(200) NOT NULL,
`age` varchar(8) NOT NULL,
PRIMARY KEY (`name`)
) ENGINE=InnoDB"""
sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('ABCDEFG', 'abc');"""
}

sql """CREATE JOB ${jobName}
ON STREAMING
FROM MYSQL (
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
"driver_url" = "${driver_url}",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"user" = "root",
"password" = "123456",
"database" = "${mysqlDb}",
"include_tables" = "${table1}",
"offset" = "initial"
)
TO DATABASE ${currentDb} (
"table.create.properties.replication_num" = "1"
)
"""

// wait for the job to report at least one failed or canceled task
try {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(1, SECONDS).until(
{
def jobFailCount = sql """ select FailedTaskCount, CanceledTaskCount, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobFailCount: " + jobFailCount)
// check that the failed or canceled task count is at least 1
jobFailCount.size() == 1 && ('1' <= jobFailCount.get(0).get(0) || '1' <= jobFailCount.get(0).get(1))
}
)
} catch (Exception ex){
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
log.info("show job: " + showjob)
log.info("show task: " + showtask)
throw ex;
}

def jobFailMsg = sql """select errorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
log.info("jobFailMsg: " + jobFailMsg)
assert jobFailMsg.get(0).get(0).contains("stream load error")

sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""

def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
assert jobCountRsp.get(0).get(0) == 0
}
}