From aaaba135714fb9c3521727876c3b020a2c57732f Mon Sep 17 00:00:00 2001
From: hui lai
Date: Thu, 24 Jul 2025 11:06:02 +0800
Subject: [PATCH] [fix](load) fix multi table load plan fail after restart master Fe or leader change (#53799)

Multi-table load plans fail after the master FE restarts or the leader changes:

```
mysql> show routine load for test_multi_table\G
*************************** 1. row ***************************
                  Id: 1753247186255
                Name: test2
          CreateTime: 2025-07-23 13:06:53
           PauseTime: NULL
             EndTime: NULL
              DbName: db
           TableName:
        IsMultiTable: true
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"max_batch_rows":"3000000","timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","column_separator":";","line_delimiter":"\n","delete":"*","current_concurrent_number":"1","partial_columns":"false","merge_type":"APPEND","exec_mem_limit":"2147483648","strict_mode":"false","max_batch_interval":"20","max_batch_size":"209715200","escape":"\u0000","enclose":"\u0000","partitions":"*","columnToColumnExpr":"","whereExpr":"*","desired_concurrent_number":"256","precedingFilter":"*","format":"csv","max_error_number":"0","max_filter_ratio":"1.0","sequence_col":"*"}
DataSourceProperties: {"topic":"my-topic","currentKafkaPartitions":"0","brokerList":"10.16.10.77:19092"}
    CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"test2_7f6143d8-f270-4667-851a-e8fb87c27d32"}
           Statistic: {"receivedBytes":89,"runningTxns":[1542060502549504],"errorRows":0,"committedTaskNum":0,"loadedRows":1,"loadRowsRate":0,"abortedTaskNum":7,"errorRowsAfterResumed":0,"totalRows":1,"unselectedRows":0,"receivedBytesRate":1,"taskExecuteTimeMs":51588}
            Progress: {"0":"0"}
                 Lag: {"0":1}
ReasonOfStateChanged:
        ErrorLogUrls:
            OtherMsg: 2025-07-23 13:08:07: [INTERNAL_ERROR]TStatus: AnalysisException: errCode = 2, detailMessage = , connect context's user is null, ComputeGroupException: CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP, you can contact the system administrator and request that they grant you the default compute group permissions, use SQL `SHOW PROPERTY LIKE 'default_compute_group'` and `GRANT USAGE_PRIV ON COMPUTE GROUP {compute_group_name} TO {user}`

 0#  doris::Status doris::Status::create(doris::TStatus const&) at /mnt/disk1/laihui/build/ldb_toolchain/bin/../lib/gcc/x86_64-pc-linux-gnu/14/include/g++-v14/bits/basic_string.h:228
 1#  doris::io::MultiTablePipe::request_and_exec_plans() at /mnt/disk1/laihui/doris/be/src/common/status.h:522
 2#  doris::RoutineLoadTaskExecutor::exec_task(std::shared_ptr<...>, doris::DataConsumerPool*, std::function<...>) at /mnt/disk1/laihui/doris/be/src/runtime/routine_load/routine_load_task_executor.cpp:0
 3#  std::_Function_handler
```

- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
    - [ ] Previous test can cover this change.
    - [ ] No code files have been changed.
    - [ ] Other reason
- Behavior changed:
    - [ ] No.
    - [ ] Yes.
- Does this need documentation?
    - [ ] No.
    - [ ] Yes.
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label
---
 .../doris/load/routineload/KafkaTaskInfo.java |   2 +-
 .../load/routineload/RoutineLoadJob.java      |   6 -
 .../test_multi_table_load_restart.groovy      | 148 ++++++++++++++++++
 3 files changed, 149 insertions(+), 7 deletions(-)
 create mode 100644 regression-test/suites/load_p0/routine_load/test_multi_table_load_restart.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index e3292dc671f8b4..0474f0d4fdc956 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -109,7 +109,7 @@ public TRoutineLoadTask createRoutineLoadTask() throws UserException {
             tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN);
         }
         tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode());
-        tRoutineLoadTask.setQualifiedUser(routineLoadJob.getQualifiedUser());
+        tRoutineLoadTask.setQualifiedUser(routineLoadJob.getUserIdentity().getQualifiedUser());
         tRoutineLoadTask.setCloudCluster(routineLoadJob.getCloudCluster());
         return tRoutineLoadTask;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 30ef3f54f1aec2..9c83bb6e9f9971 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -292,7 +292,6 @@ public boolean isFinalState() {
     protected byte escape = 0;

     // use for cloud cluster mode
-    protected String qualifiedUser;
     protected String cloudCluster;

     public void setTypeRead(boolean isTypeRead) {
@@ -343,7 +342,6 @@ public RoutineLoadJob(Long id, String name,
         SessionVariable var = ConnectContext.get().getSessionVariable();
         sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
         this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
-        this.qualifiedUser = ConnectContext.get().getQualifiedUser();
         try {
             this.cloudCluster = ConnectContext.get().getCloudCluster();
         } catch (ComputeGroupException e) {
@@ -789,10 +787,6 @@ public void setComment(String comment) {
         this.comment = comment;
     }

-    public String getQualifiedUser() {
-        return qualifiedUser;
-    }
-
     public String getCloudCluster() {
         return cloudCluster;
     }
diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_restart.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_restart.groovy
new file mode 100644
index 00000000000000..d89f513eb5d0e6
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_restart.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_multi_table_load_restart","docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    docker(options) {
+        def kafkaCsvTpoics = [
+            "test_multi_table_load_restart",
+        ]
+        String enabled = context.config.otherConfigs.get("enableKafkaTest")
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("=====try to connect Kafka========")
+                    def partitions = prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    throw new Exception("Kafka connect fail: ${e.message}".toString())
+                }
+            }
+            def producer = new KafkaProducer<>(props)
+            try {
+                logger.info("Kafka connecting: ${kafka_broker}")
+                if (!verifyKafkaConnection(producer)) {
+                    throw new Exception("can't get any kafka info")
+                }
+            } catch (Exception e) {
+                logger.error("FATAL: " + e.getMessage())
+                producer.close()
+                throw e
+            }
+            logger.info("Kafka connect success")
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                def testData = [
+                    "test_multi_table_load_restart|1,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1"
+                ]
+                testData.each { line ->
+                    logger.info("Sending data to kafka: ${line}")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+
+            def tableName = "test_multi_table_load_restart"
+            def job = "test_multi_table_load_restart"
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `k1` int(20) NULL,
+                    `k2` string NULL,
+                    `v1` date NULL,
+                    `v2` string NULL,
+                    `v3` datetime NULL,
+                    `v4` string NULL
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k1`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+            """
+
+            try {
+                sql """
+                    CREATE ROUTINE LOAD ${job}
+                    COLUMNS TERMINATED BY ","
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${kafka_broker}",
+                        "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+
+                cluster.restartFrontends()
+                sleep(30000)
+                context.reconnectFe()
+
+                for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                    def testData = [
+                        "test_multi_table_load_restart|2,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1"
+                    ]
+                    testData.each { line ->
+                        logger.info("Sending data to kafka: ${line}")
+                        def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                        producer.send(record)
+                    }
+                }
+                producer.close()
+
+                def count = 0
+                def maxWaitCount = 60
+                while (count < maxWaitCount) {
+                    def state = sql "show routine load for ${job}"
+                    def routineLoadState = state[0][8].toString()
+                    def statistic = state[0][14].toString()
+                    logger.info("Routine load state: ${routineLoadState}")
+                    logger.info("Routine load statistic: ${statistic}")
+                    def rowCount = sql "select count(*) from ${tableName}"
+                    if (routineLoadState == "RUNNING" && rowCount[0][0] == 2) {
+                        break
+                    }
+                    sleep(1000)
+                    count++
+                }
+            } catch (Exception e) {
+                logger.error("Test failed with exception: ${e.message}")
+            } finally {
+                try {
+                    sql "stop routine load for ${job}"
+                } catch (Exception e) {
+                    logger.warn("Failed to stop routine load job: ${e.message}")
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
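
For context, here is a minimal, self-contained Java sketch of the failure mode the commit title and the diff suggest: the old code cached the qualified user as a plain string taken from the creating connection's context, which is no longer available once the job is rebuilt on a new master FE, so tasks were planned with a null user; the fix derives the user from the job's persisted UserIdentity whenever a task is built. All class and field names below are illustrative assumptions, not Doris source; only the `getUserIdentity().getQualifiedUser()` call is taken from the patch.

```java
import java.util.Objects;

// Illustrative stand-in for the persisted identity attached to a job definition.
class UserIdentitySketch {
    private final String qualifiedUser;

    UserIdentitySketch(String qualifiedUser) {
        this.qualifiedUser = qualifiedUser;
    }

    String getQualifiedUser() {
        return qualifiedUser;
    }
}

// Illustrative stand-in for a routine load job after edit-log replay on a new master FE.
class RoutineLoadJobSketch {
    // Old approach: a string captured from the connect context at CREATE time only;
    // it is not part of the persisted job image, so it stays null after a restart.
    String transientQualifiedUser;

    // New approach: the user is always derived from the persisted identity.
    final UserIdentitySketch userIdentity;

    RoutineLoadJobSketch(UserIdentitySketch userIdentity) {
        this.userIdentity = userIdentity;
    }

    String userForTaskOld() {
        return transientQualifiedUser; // null after restart -> "connect context's user is null"
    }

    String userForTaskNew() {
        // Mirrors routineLoadJob.getUserIdentity().getQualifiedUser() in the patch.
        return userIdentity.getQualifiedUser();
    }
}

public class QualifiedUserAfterRestart {
    public static void main(String[] args) {
        // Simulate a job replayed on a new master FE: only persisted state is available.
        RoutineLoadJobSketch replayed =
                new RoutineLoadJobSketch(new UserIdentitySketch("some_user"));

        System.out.println("old path -> " + Objects.toString(replayed.userForTaskOld(), "<null>"));
        System.out.println("new path -> " + replayed.userForTaskNew());
    }
}
```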