diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index dc401398f68c35..e38a9144164979 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -72,6 +72,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
                                                                    "get_delete_bitmap_update_lock");
 BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
 BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
+BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
 
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index f2957e35940334..f221a76eea02be 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -173,6 +173,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
+extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
 
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h
index e2360e9e6ba2f7..6b1cfa0d11f1ce 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -265,6 +265,10 @@ class MetaServiceImpl : public cloud::MetaService {
                                    GetRLTaskCommitAttachResponse* response,
                                    ::google::protobuf::Closure* done) override;
 
+    void reset_rl_progress(::google::protobuf::RpcController* controller,
+                           const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
+                           ::google::protobuf::Closure* done) override;
+
     void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
                     GetTxnIdResponse* response, ::google::protobuf::Closure* done) override;
 
@@ -616,6 +620,12 @@ class MetaServiceProxy final : public MetaService {
                                done);
     }
 
+    void reset_rl_progress(::google::protobuf::RpcController* controller,
+                           const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
+                           ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::reset_rl_progress, controller, request, response, done);
+    }
+
     void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
                     GetTxnIdResponse* response, ::google::protobuf::Closure* done) override {
         call_impl(&cloud::MetaService::get_txn_id, controller, request, response, done);
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 4d48d7c9df4f0e..f3acd85928f82c 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -644,6 +644,58 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
     }
 }
 
+void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller,
+                                        const ResetRLProgressRequest* request,
+                                        ResetRLProgressResponse* response,
+                                        ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(reset_rl_progress);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
return; + } + RPC_RATE_LIMIT(reset_rl_progress) + + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "filed to create txn, err=" << err; + msg = ss.str(); + return; + } + + if (!request->has_db_id() || !request->has_job_id()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty db_id or job_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + + int64_t db_id = request->db_id(); + int64_t job_id = request->job_id(); + std::string rl_progress_key; + std::string rl_progress_val; + RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id}; + rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key); + txn->remove(rl_progress_key); + err = txn->commit(); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND; + ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err; + msg = ss.str(); + return; + } else if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "failed to remove progress info, db_id=" << db_id << " job_id=" << job_id + << " err=" << err; + msg = ss.str(); + return; + } +} + void scan_tmp_rowset( const std::string& instance_id, int64_t txn_id, std::shared_ptr& txn_kv, MetaServiceCode& code, std::string& msg, int64_t* db_id, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index d5cdc79eb7f7d5..a2c639f94e8fef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -352,6 +352,16 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) { return blockingStub.getRlTaskCommitAttach(request); } + public Cloud. ResetRLProgressResponse resetRLProgress(Cloud. ResetRLProgressRequest request) { + if (!request.hasCloudUniqueId()) { + Cloud. ResetRLProgressRequest.Builder builder = + Cloud. 
+    txn->remove(rl_progress_key);
+    err = txn->commit();
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
+        ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
+        msg = ss.str();
+        return;
+    } else if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to remove progress info, db_id=" << db_id << " job_id=" << job_id
+           << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+}
+
 void scan_tmp_rowset(
         const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv>& txn_kv,
         MetaServiceCode& code, std::string& msg, int64_t* db_id,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index d5cdc79eb7f7d5..a2c639f94e8fef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -352,6 +352,16 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) {
         return blockingStub.getRlTaskCommitAttach(request);
     }
 
+    public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressRequest request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.ResetRLProgressRequest.Builder builder =
+                    Cloud.ResetRLProgressRequest.newBuilder();
+            builder.mergeFrom(request);
+            return blockingStub.resetRlProgress(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.resetRlProgress(request);
+    }
+
     public Cloud.GetObjStoreInfoResponse getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) {
         if (!request.hasCloudUniqueId()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index d7ec328906775e..34649f629e9f76 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -498,6 +498,16 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo
         }
     }
 
+    public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressRequest request)
+            throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.resetRLProgress(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
     public Cloud.GetObjStoreInfoResponse getObjStoreInfo(Cloud.GetObjStoreInfoRequest request)
             throws RpcException {
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index cb6c36dc2e69bd..1952ccaf33252d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -132,15 +132,17 @@ private void getReadableProgress(ConcurrentMap<Integer, String> showPartitionIdT
         }
     }
 
-    // modify the partition offset of this progress.
-    // throw exception is the specified partition does not exist in progress.
-    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
+    public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             if (!partitionIdToOffset.containsKey(pair.first)) {
                 throw new DdlException("The specified partition " + pair.first
                         + " is not in the consumed partitions");
             }
         }
+    }
 
+    // modify the partition offset of this progress.
+    // the specified partitions should already have been validated by checkPartitions().
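+    // (this only updates the in-memory progress; in cloud mode the persisted
+    // progress is cleared separately via the reset_rl_progress RPC)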
+    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             partitionIdToOffset.put(pair.first, pair.second);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 751516b559a8c8..555ae986873060 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -713,22 +713,35 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
             customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
         }
 
-        // modify partition offset first
-        if (!kafkaPartitionOffsets.isEmpty()) {
-            // we can only modify the partition that is being consumed
-            ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-        }
-
+        // convert custom properties and check partitions before resetting progress,
+        // so that the whole modify operation stays atomic
         if (!customKafkaProperties.isEmpty()) {
             this.customProperties.putAll(customKafkaProperties);
             convertCustomProperties(true);
         }
-        // modify broker list and topic
-        if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-            this.brokerList = dataSourceProperties.getBrokerList();
+
+        if (!kafkaPartitionOffsets.isEmpty()) {
+            ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
         }
+
+        // the Kafka progress cache must be reset if the topic changes,
+        // and the cache must be reset before modifying the partition offsets.
         if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
+            if (Config.isCloudMode()) {
+                resetCloudProgress();
+            }
             this.topic = dataSourceProperties.getTopic();
+            this.progress = new KafkaProgress();
+        }
+
+        // modify partition offset
+        if (!kafkaPartitionOffsets.isEmpty()) {
+            // we can only modify the partitions that are being consumed
+            ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+        }
+
+        // modify broker list
+        if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+            this.brokerList = dataSourceProperties.getBrokerList();
         }
     }
     if (!jobProperties.isEmpty()) {
@@ -743,6 +756,31 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
                 this.id, jobProperties, dataSourceProperties);
     }
 
+    private void resetCloudProgress() throws DdlException {
+        Cloud.ResetRLProgressRequest.Builder builder =
+                Cloud.ResetRLProgressRequest.newBuilder();
+        builder.setCloudUniqueId(Config.cloud_unique_id);
+        builder.setDbId(dbId);
+        builder.setJobId(id);
+
+        Cloud.ResetRLProgressResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().resetRLProgress(builder.build());
+            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                LOG.warn("failed to reset cloud progress, response: {}", response);
+                if (response.getStatus().getCode() == Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
+                    LOG.warn("not found routine load progress, response: {}", response);
+                    return;
+                } else {
+                    throw new DdlException(response.getStatus().getMsg());
+                }
+            }
+        } catch (RpcException e) {
+            LOG.warn("failed to reset cloud progress", e);
+            throw new DdlException(e.getMessage());
+        }
+    }
+
     @Override
     public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
         try {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index f8797ea54e733a..6c3b677ce9bac6 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1423,6 +1423,16 @@ message GetRLTaskCommitAttachResponse {
     optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
 }
 
+message ResetRLProgressRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional int64 db_id = 2;
+    optional int64 job_id = 3;
+}
+
+message ResetRLProgressResponse {
+    optional MetaServiceResponseStatus status = 1;
+}
+
 message CheckKeyInfos {
     repeated int64 db_ids = 1;
     repeated int64 table_ids = 2;
@@ -1513,6 +1523,7 @@ service MetaService {
 
     // routine load progress
     rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse);
+    rpc reset_rl_progress(ResetRLProgressRequest) returns (ResetRLProgressResponse);
 
     // check KV
     rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
new file mode 100644
index 00000000000000..1f534d0a0823ec
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_topic_change --
+1	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+2	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+3	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+4	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+5	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+
+-- !sql_topic_change1 --
+1	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+2	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+3	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+4	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+5	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+6	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
new file mode 100644
index 00000000000000..de1727d2d81c9b
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
@@ -0,0 +1 @@
+6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
new file mode 100644
index 00000000000000..f1a48b1e411249
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
@@ -0,0 +1,5 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
new file mode 100644
index 00000000000000..25bf9933d112ff
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_topic_change","p0") {
+    // send data to Kafka
+    def kafkaCsvTopics = [
+                  "test_topic_before",
+                  "test_topic_after",
+                 ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka producer properties
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    // create the target table for the routine load job
+    def tableName = "test_routine_load_topic_change"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date NULL,
+            `v2` string NULL,
+            `v3` datetime NULL,
+            `v4` string NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_topic_change"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load data is not visible after a long wait")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_topic_change "select * from ${tableName} order by k1"
+
+            sql "pause routine load for ${jobName}"
+            def res = sql "show routine load for ${jobName}"
+            log.info("routine load job properties: ${res[0][11].toString()}".toString())
+            sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_topic\" = \"${kafkaCsvTopics[1]}\", \"property.kafka_default_offsets\" = \"OFFSET_BEGINNING\");"
+            sql "resume routine load for ${jobName}"
+            count = 0
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 5) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load data is not visible after a long wait")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_topic_change1 "select * from ${tableName} order by k1"
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
+}
\ No newline at end of file