From 958ab7fc65dd75465f5488d6e04f9f27f35ad0b9 Mon Sep 17 00:00:00 2001
From: laihui <1353307710@qq.com>
Date: Sat, 24 Aug 2024 11:11:08 +0800
Subject: [PATCH] fix alter consume offset not working in cloud mode

---
 cloud/src/meta-service/meta_service_txn.cpp   | 64 ++++++-
 .../load/routineload/KafkaRoutineLoadJob.java | 30 ++--
 gensrc/proto/cloud.proto                      |  1 +
 .../routine_load/test_routine_load_alter.out  | 12 ++
 .../data/test_routine_load_alter.csv          |  3 +
 .../test_routine_load_alter.groovy            | 156 ++++++++++++++++++
 6 files changed, 248 insertions(+), 18 deletions(-)
 create mode 100644 regression-test/data/load_p0/routine_load/test_routine_load_alter.out
 create mode 100644 regression-test/suites/load_p0/routine_load/data/test_routine_load_alter.csv
 create mode 100644 regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy

diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index c3c107de0defe1..d3fd5848378737 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -684,16 +684,64 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr
     std::string rl_progress_val;
     RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
     rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
-    txn->remove(rl_progress_key);
+
+    if (request->partition_to_offset().size() == 0) {
+        txn->remove(rl_progress_key);
+    }
+
+    if (request->partition_to_offset().size() > 0) {
+        bool prev_progress_existed = true;
+        RoutineLoadProgressPB prev_progress_info;
+        TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                prev_progress_existed = false;
+            } else {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get routine load progress, db_id=" << db_id << " job_id=" << job_id
+                   << " err=" << err;
+                msg = ss.str();
+                return;
+            }
+        }
+        if (prev_progress_existed) {
+            if (!prev_progress_info.ParseFromString(rl_progress_val)) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "failed to parse routine load progress, db_id=" << db_id
+                   << " job_id=" << job_id;
+                msg = ss.str();
+                return;
+            }
+        }
+
+        std::string new_progress_val;
+        RoutineLoadProgressPB new_progress_info;
+        for (auto const& elem : request->partition_to_offset()) {
+            new_progress_info.mutable_partition_to_offset()->insert(elem);
+        }
+        if (request->partition_to_offset().size() > 0) {
+            for (auto const& elem : prev_progress_info.partition_to_offset()) {
+                auto it = new_progress_info.partition_to_offset().find(elem.first);
+                if (it == new_progress_info.partition_to_offset().end()) {
+                    new_progress_info.mutable_partition_to_offset()->insert(elem);
+                }
+            }
+        }
+
+        if (!new_progress_info.SerializeToString(&new_progress_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "failed to serialize new progress val, db_id=" << db_id
+               << " job_id=" << job_id;
+            msg = ss.str();
+            return;
+        }
+        txn->put(rl_progress_key, new_progress_val);
+    }
+
     err = txn->commit();
-    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-        code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
-        ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
-        msg = ss.str();
-        return;
-    } else if (err != TxnErrorCode::TXN_OK) {
+    if (err != TxnErrorCode::TXN_OK) {
         code = cast_as<ErrCategory::COMMIT>(err);
-        ss << "failed to remove progress info, db_id=" << db_id << " job_id=" << job_id
+        ss << "failed to commit progress info, db_id=" << db_id << " job_id=" << job_id
            << " err=" << err;
         msg = ss.str();
         return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 7f464f3cbec6fb..292f87f8a22924 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -723,12 +723,28 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
             ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
         }
 
+        if (Config.isCloudMode()) {
+            Cloud.ResetRLProgressRequest.Builder builder = Cloud.ResetRLProgressRequest.newBuilder();
+            builder.setCloudUniqueId(Config.cloud_unique_id);
+            builder.setDbId(dbId);
+            builder.setJobId(id);
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                Map<Integer, Long> partitionOffsetMap = new HashMap<>();
+                for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
+                    // The offset recorded in MS in cloud mode needs to be decremented by one
+                    // because it will be incremented again
+                    // when the persisted progress is pulled from MS and applied to memory
+                    // in routineLoadJob.updateCloudProgress().
+                    partitionOffsetMap.put(pair.first, pair.second - 1);
+                }
+                builder.putAllPartitionToOffset(partitionOffsetMap);
+            }
+            resetCloudProgress(builder);
+        }
+
         // It is necessary to reset the Kafka progress cache if topic change,
         // and should reset cache before modifying partition offset.
         if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
-            if (Config.isCloudMode()) {
-                resetCloudProgress();
-            }
             this.topic = dataSourceProperties.getTopic();
             this.progress = new KafkaProgress();
         }
@@ -756,13 +772,7 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
                 this.id, jobProperties, dataSourceProperties);
     }
 
-    private void resetCloudProgress() throws DdlException {
-        Cloud.ResetRLProgressRequest.Builder builder =
-                Cloud.ResetRLProgressRequest.newBuilder();
-        builder.setCloudUniqueId(Config.cloud_unique_id);
-        builder.setDbId(dbId);
-        builder.setJobId(id);
-
+    private void resetCloudProgress(Cloud.ResetRLProgressRequest.Builder builder) throws DdlException {
         Cloud.ResetRLProgressResponse response;
         try {
             response = MetaServiceProxy.getInstance().resetRLProgress(builder.build());
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 1542fd18253fd7..132e649f830c0b 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1443,6 +1443,7 @@ message ResetRLProgressRequest {
     optional string cloud_unique_id = 1; // For auth
     optional int64 db_id = 2;
     optional int64 job_id = 3;
+    map<int32, int64> partition_to_offset = 4;
 }
 
 message ResetRLProgressResponse {
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_alter.out b/regression-test/data/load_p0/routine_load/test_routine_load_alter.out
new file mode 100644
index 00000000000000..427cdb2439463d
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_alter.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_before --
+3	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+
+-- !sql_alter_after --
+1	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+2	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+2	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+3	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+3	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+3	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_alter.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_alter.csv
new file mode 100644
index 00000000000000..ddc942ccfdc60c
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_alter.csv
@@ -0,0 +1,3 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
new file mode 100644
index 00000000000000..e1cfa4ae9aaefc
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_alter","p0") {
+    def kafkaCsvTpoics = [
+        "test_routine_load_alter",
+    ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def tableName = "test_routine_load_alter"
+        def jobName = "test_alter"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date NULL,
+                `v2` string NULL,
+                `v3` datetime NULL,
+                `v4` string NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "kafka_partitions" = "0",
+                    "kafka_offsets" = "2"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load data is not visible after a long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql_before "select * from ${tableName} order by k1"
+
+            // test alter offset
+            sql "pause routine load for ${jobName}"
+            sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_partitions\" = \"0\", \"kafka_offsets\" = \"1\");"
+            sql "resume routine load for ${jobName}"
+
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            // Create kafka producer
+            def producer = new KafkaProducer<>(props)
+
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                def lines = txt.readLines()
+                lines.each { line ->
+                    logger.info("=====${line}========")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+
+            count = 0
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                log.info("count: ${res[0][0]}".toString())
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] >= 6) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load data is not visible after a long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql_alter_after "select * from ${tableName} order by k1"
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
+}
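
Reviewer note (not part of the patch): the new branch in reset_rl_progress() merges the offsets supplied by ALTER ROUTINE LOAD into the previously persisted RoutineLoadProgressPB instead of simply deleting the progress key: partitions named in the request take the new offset, every other partition keeps its stored offset, and a request with no partitions still falls back to removing the key. A minimal standalone C++ sketch of that merge rule, with std::map standing in for the protobuf map and an illustrative function name:

    #include <cstdint>
    #include <iostream>
    #include <map>

    // Merge rule used by the patch: offsets from the ALTER request win,
    // partitions the request does not mention keep their persisted offsets.
    std::map<int32_t, int64_t> merge_progress(const std::map<int32_t, int64_t>& prev,
                                              const std::map<int32_t, int64_t>& request) {
        std::map<int32_t, int64_t> merged = request;    // altered partitions take the new offset
        for (const auto& [partition, offset] : prev) {  // keep untouched partitions
            merged.emplace(partition, offset);          // emplace() never overwrites request entries
        }
        return merged;
    }

    int main() {
        std::map<int32_t, int64_t> persisted {{0, 5}, {1, 9}};
        std::map<int32_t, int64_t> altered {{0, 1}};    // ALTER only touches partition 0
        for (const auto& [p, o] : merge_progress(persisted, altered)) {
            std::cout << "partition " << p << " -> offset " << o << "\n";  // 0 -> 1, 1 -> 9
        }
        return 0;
    }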
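
Reviewer note (not part of the patch): on the FE side the offset passed to ALTER is written to MS as (offset - 1), because the stored value is incremented again when the persisted progress is pulled from MS and applied to memory in routineLoadJob.updateCloudProgress(), so the job resumes at exactly the offset the user asked for. The expected output above follows from this: the job first starts at kafka offset 2 and loads one row (k1 = 3), the ALTER moves partition 0 back to offset 1 so the rows at offsets 1 and 2 (k1 = 2 and 3) are consumed again, and re-producing the three CSV rows appends offsets 3-5 (k1 = 1, 2, 3), which gives the six rows checked by qt_sql_alter_after. A tiny sketch of the round trip, using hypothetical helper names:

    #include <cassert>
    #include <cstdint>

    // Hypothetical helpers mirroring the behaviour described in the patch comment:
    // the FE persists (user_offset - 1) and the reload path adds 1 back.
    int64_t offset_to_persist(int64_t user_offset) { return user_offset - 1; }
    int64_t offset_after_reload(int64_t persisted) { return persisted + 1; }

    int main() {
        int64_t altered = 1;  // ALTER ... "kafka_offsets" = "1"
        assert(offset_after_reload(offset_to_persist(altered)) == altered);
        return 0;
    }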