diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 55b4e8990563d7..998456a500d071 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -42,6 +42,7 @@
 #include "olap/snapshot_manager.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
+#include "util/work_thread_pool.hpp"
 
 namespace doris {
 
@@ -168,8 +169,8 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
     _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
             "ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& task) { return alter_tablet_callback(engine, task); });
-    _workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
-            "CLONE", config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });
+    _workers[TTaskType::CLONE] = std::make_unique<PriorTaskWorkerPool>(
+            "CLONE", config::clone_worker_count, config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });
     _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique<TaskWorkerPool>(
             "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, [&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); });
 
@@ -198,6 +199,8 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
     _report_workers.push_back(std::make_unique<ReportWorker>(
             "REPORT_INDEX_POLICY", _cluster_info, config::report_index_policy_interval_seconds, [&cluster_info = _cluster_info] { report_index_policy_callback(cluster_info); }));
     // clang-format on
+
+    exec_env->storage_engine().to_local().workers = &_workers;
 }
 
 void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env) {
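A note on the `agent_server.cpp` hunks above: the CLONE slot moves from a plain `TaskWorkerPool` to a `PriorTaskWorkerPool` whose constructor takes separate normal and high-priority worker counts (both `config::clone_worker_count` here), and `start_workers` hands the raw `_workers` map pointer to the local `StorageEngine` so engine-side code (compaction, MoW publish) can reach the clone pool later. A self-contained sketch of that indirection, using stand-in types rather than the real Doris classes:

```cpp
// Illustrative sketch only: models how the agent's worker map is shared with
// the storage engine, which later downcasts the CLONE entry to reach the
// high-priority submit API (assert_cast<PriorTaskWorkerPool*> in the patch).
#include <cassert>
#include <cstdint>
#include <memory>
#include <unordered_map>

struct TaskWorkerPoolIf {
    virtual ~TaskWorkerPoolIf() = default;
};

struct PriorPool final : TaskWorkerPoolIf {
    void submit_high_prior_and_cancel_low(int64_t signature) { /* enqueue */ }
};

enum class TaskType { CLONE };
using WorkerMap = std::unordered_map<TaskType, std::unique_ptr<TaskWorkerPoolIf>>;

struct Engine {
    WorkerMap* workers = nullptr; // non-owning, set once at startup

    void submit_clone(int64_t signature) {
        auto* pool = dynamic_cast<PriorPool*>(workers->at(TaskType::CLONE).get());
        assert(pool != nullptr); // the CLONE slot must hold the prior pool
        pool->submit_high_prior_and_cancel_low(signature);
    }
};

int main() {
    WorkerMap workers;
    workers[TaskType::CLONE] = std::make_unique<PriorPool>();
    Engine engine;
    engine.workers = &workers; // mirrors exec_env->storage_engine().to_local().workers = &_workers
    engine.submit_clone(42);
}
```

The pointer is non-owning, so correctness relies on `AgentServer` (which owns `_workers`) outliving every engine-side caller.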
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 5bd18d62ca6856..cc59978da937e7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -17,6 +17,7 @@
 
 #include "agent/task_worker_pool.h"
 
+#include
 #include
 #include
 #include
@@ -85,6 +86,7 @@
 #include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/snapshot_loader.h"
 #include "service/backend_options.h"
+#include "util/brpc_client_cache.h"
 #include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/jni-util.h"
@@ -609,6 +611,52 @@ Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
     });
 }
 
+Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskRequest& task) {
+    const TTaskType::type task_type = task.task_type;
+    int64_t signature = task.signature;
+    std::string type_str;
+    EnumToString(TTaskType, task_type, type_str);
+    auto req = std::make_unique<TAgentTaskRequest>(task);
+
+    DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
+    do {
+        std::lock_guard lock(s_task_signatures_mtx);
+        auto& set = s_task_signatures[task_type];
+        if (!set.contains(signature)) {
+            // Not queued anywhere yet: put it directly into the high-priority queue
+            add_task_count(*req, 1);
+            set.insert(signature);
+            std::lock_guard lock(_mtx);
+            _high_prior_queue.push_back(std::move(req));
+            _high_prior_condv.notify_one();
+            _normal_condv.notify_one();
+            break;
+        } else {
+            std::lock_guard lock(_mtx);
+            bool promoted = false;
+            for (auto it = _normal_queue.begin(); it != _normal_queue.end();) {
+                if ((*it)->signature == signature) {
+                    // Queued with normal priority: cancel that copy and requeue as high priority
+                    _normal_queue.erase(it);
+                    _high_prior_queue.push_back(std::move(req));
+                    _high_prior_condv.notify_one();
+                    _normal_condv.notify_one();
+                    promoted = true;
+                    break;
+                } else {
+                    ++it; // not a match, check the next one
+                }
+            }
+            if (!promoted) {
+                // Already in the high-priority queue: nothing to do
+                LOG_INFO("task already exists in high prior queue.").tag("signature", signature);
+            }
+        }
+    } while (false);
+
+    // Record the receive time of the task so that a timeout can be detected later
+    (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
+
+    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
+    return Status::OK();
+}
+
 void PriorTaskWorkerPool::normal_loop() {
     while (true) {
         std::unique_ptr<TAgentTaskRequest> req;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 8d9be32b3dc017..b64d84fb5c4a19 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -89,6 +89,8 @@ class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
 
     Status submit_task(const TAgentTaskRequest& task) override;
 
+    Status submit_high_prior_and_cancel_low(const TAgentTaskRequest& task);
+
 private:
     void normal_loop();
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index d63570ff4dffa9..15b7dc92d83227 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1532,6 +1532,10 @@ DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
 // ATTENTION: for test only, use random segments key bounds truncation threshold every time
 DEFINE_mBool(random_segments_key_bounds_truncation, "false");
 
+DEFINE_mBool(enable_auto_clone_on_compaction_missing_version, "false");
+
+DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a867bfccfada96..71e5ae1f90ea8d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1603,6 +1603,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold);
 // ATTENTION: for test only, use random segments key bounds truncation threshold every time
 DECLARE_mBool(random_segments_key_bounds_truncation);
 
+DECLARE_mBool(enable_auto_clone_on_compaction_missing_version);
+
+DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
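Both new flags default to `false` and use the `m` (mutable) config variants, so they can be flipped on a running BE through the `/api/update_config` HTTP endpoint, which is exactly what the MoW regression test below does via its `set_be_param` helper. A minimal sketch of that read-every-time pattern (illustrative; Doris generates the real flags through its config macros, and they are plain fields rather than the `std::atomic` used here):

```cpp
// Sketch: a runtime-mutable flag is consulted at each decision point instead
// of being cached at startup, so an update_config call takes effect on the
// very next compaction or publish.
#include <atomic>
#include <iostream>

namespace config {
std::atomic<bool> enable_auto_clone_on_compaction_missing_version{false};
} // namespace config

bool should_submit_clone_on_gap() {
    return config::enable_auto_clone_on_compaction_missing_version.load();
}

int main() {
    std::cout << should_submit_clone_on_gap() << "\n"; // 0: feature off
    // what /api/update_config?enable_auto_clone_on_compaction_missing_version=true does
    config::enable_auto_clone_on_compaction_missing_version = true;
    std::cout << should_submit_clone_on_gap() << "\n"; // 1: next gap triggers a clone
}
```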
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index a0746997128e46..8cd41be388dbe6 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -18,10 +18,13 @@
 #include "olap/cumulative_compaction.h"
 
 #include
+#include
+#include
 #include
 #include
 #include
+#include
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -29,7 +32,9 @@
 #include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset_meta.h"
+#include "olap/storage_engine.h"
 #include "olap/tablet.h"
+#include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/doris_metrics.h"
 #include "util/time.h"
@@ -187,6 +192,24 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
                 << ", first missed version prev rowset version=" << missing_versions[0]
                 << ", first missed version next rowset version=" << missing_versions[1]
                 << ", tablet=" << _tablet->tablet_id();
+        if (config::enable_auto_clone_on_compaction_missing_version) {
+            LOG_INFO("cumulative compaction submits missing rowset clone task.")
+                    .tag("tablet_id", _tablet->tablet_id())
+                    .tag("version", missing_versions.back().first)
+                    .tag("replica_id", tablet()->replica_id())
+                    .tag("partition_id", _tablet->partition_id())
+                    .tag("table_id", _tablet->table_id());
+            Status st = _engine.submit_clone_task(tablet(), missing_versions.back().first);
+            if (!st) {
+                LOG_WARNING("cumulative compaction failed to submit missing rowset clone task.")
+                        .tag("st", st.to_string())
+                        .tag("tablet_id", _tablet->tablet_id())
+                        .tag("version", missing_versions.back().first)
+                        .tag("replica_id", tablet()->replica_id())
+                        .tag("partition_id", _tablet->partition_id())
+                        .tag("table_id", _tablet->table_id());
+            }
+        }
     }
 
     int64_t max_score = config::cumulative_compaction_max_deltas;
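The compaction hook above fires when `pick_rowsets_to_compact` detects a broken version chain. Judging by the adjacent log lines (`missing_versions[0]` is the rowset before the first gap, `missing_versions[1]` the rowset after it), `missing_versions` appears to hold the bounding rowset versions of each gap, so `missing_versions.back().first` is the start version of the rowset just past the last gap, i.e. the version the clone must catch up to. A self-contained sketch of that derivation, under that assumption:

```cpp
// Illustrative only: derive gap-bounding rowset versions from a sorted,
// non-overlapping list of [start, end] rowset versions.
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

using Version = std::pair<int64_t, int64_t>; // inclusive [first, second]

std::vector<Version> missing_version_brackets(const std::vector<Version>& rowsets) {
    std::vector<Version> out;
    for (size_t i = 1; i < rowsets.size(); ++i) {
        if (rowsets[i].first > rowsets[i - 1].second + 1) {
            out.push_back(rowsets[i - 1]); // rowset right before the gap
            out.push_back(rowsets[i]);     // rowset right after the gap
        }
    }
    return out;
}

int main() {
    // [0-1][2-2][6-6]: versions 3..5 are missing, as in the regression test below
    auto gaps = missing_version_brackets({{0, 1}, {2, 2}, {6, 6}});
    // back().first = 6: clone everything up to and including version 6
    std::cout << "clone target version: " << gaps.back().first << "\n";
}
```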
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index dcb296cce82db2..5388e457352a6d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -20,6 +20,8 @@
 // IWYU pragma: no_include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -37,6 +39,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -49,6 +52,7 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "gen_cpp/FrontendService.h"
 #include "io/fs/local_file_system.h"
 #include "olap/binlog.h"
 #include "olap/data_dir.h"
@@ -67,6 +71,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
 #include "olap/txn_manager.h"
+#include "runtime/client_cache.h"
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/doris_metrics.h"
 #include "util/mem_info.h"
@@ -74,8 +79,10 @@
 #include "util/stopwatch.hpp"
 #include "util/thread.h"
 #include "util/threadpool.h"
+#include "util/thrift_rpc_helper.h"
 #include "util/uid_util.h"
 #include "util/work_thread_pool.hpp"
+#include "vec/common/assert_cast.h"
 
 using std::filesystem::directory_iterator;
 using std::filesystem::path;
@@ -1500,6 +1507,79 @@ bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica,
     return false;
 }
 
+bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends) {
+    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
+    if (tablet == nullptr) {
+        LOG(WARNING) << "tablet no longer exists: tablet_id=" << tablet_id;
+        return false;
+    }
+    int64_t cur_time = UnixMillis();
+    if (cur_time - _last_get_peers_replica_backends_time_ms < 10000) {
+        LOG_WARNING("skip getting peers replica backends, last fetch was less than 10s ago.")
+                .tag("last time", _last_get_peers_replica_backends_time_ms)
+                .tag("cur time", cur_time);
+        return false;
+    }
+    LOG_INFO("start getting peers replica backends info.").tag("tablet id", tablet_id);
+    ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
+    if (cluster_info == nullptr) {
+        LOG(WARNING) << "Have not received FE Master heartbeat yet";
+        return false;
+    }
+    TNetworkAddress master_addr = cluster_info->master_fe_addr;
+    if (master_addr.hostname.empty() || master_addr.port == 0) {
+        LOG(WARNING) << "Have not received FE Master heartbeat yet";
+        return false;
+    }
+    TGetTabletReplicaInfosRequest request;
+    TGetTabletReplicaInfosResult result;
+    request.tablet_ids.emplace_back(tablet_id);
+    Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->getTabletReplicaInfos(result, request);
+            });
+    if (!rpc_st.ok()) {
+        LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, "
+                        "tablet id: "
+                     << tablet_id;
+        return false;
+    }
+    std::unique_lock lock(_peer_replica_infos_mutex);
+    if (result.tablet_replica_infos.contains(tablet_id)) {
+        std::vector<TReplicaInfo> reps = result.tablet_replica_infos[tablet_id];
+        DCHECK_NE(reps.size(), 0);
+        for (const auto& rep : reps) {
+            if (rep.replica_id != tablet->replica_id()) {
+                TBackend backend;
+                backend.__set_host(rep.host);
+                backend.__set_be_port(rep.be_port);
+                backend.__set_http_port(rep.http_port);
+                backend.__set_brpc_port(rep.brpc_port);
+                if (rep.__isset.is_alive) {
+                    backend.__set_is_alive(rep.is_alive);
+                }
+                if (rep.__isset.backend_id) {
+                    backend.__set_id(rep.backend_id);
+                }
+                backends->emplace_back(backend);
+                std::stringstream backend_string;
+                backend.printTo(backend_string);
+                LOG_INFO("got one peer replica backend.")
+                        .tag("tablet id", tablet_id)
+                        .tag("backend info", backend_string.str());
+            }
+        }
+        _last_get_peers_replica_backends_time_ms = UnixMillis();
+        LOG_INFO("succeeded in getting peers replica backends info.")
+                .tag("tablet id", tablet_id)
+                .tag("replica num", backends->size());
+        return true;
+    }
+    return false;
+}
+
 bool StorageEngine::should_fetch_from_peer(int64_t tablet_id) {
 #ifdef BE_TEST
     if (tablet_id % 2 == 0) {
@@ -1617,6 +1697,36 @@ Status StorageEngine::_persist_broken_paths() {
     return Status::OK();
 }
 
+Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) {
+    std::vector<TBackend> backends;
+    if (!get_peers_replica_backends(tablet->tablet_id(), &backends)) {
+        LOG(WARNING) << "tablet " << tablet->tablet_id() << " doesn't have peer replica backends";
+        return Status::InternalError("tablet {} has no peer replica backends", tablet->tablet_id());
+    }
+    TAgentTaskRequest task;
+    TCloneReq req;
+    req.__set_tablet_id(tablet->tablet_id());
+    req.__set_schema_hash(tablet->schema_hash());
+    req.__set_src_backends(backends);
+    req.__set_version(version);
+    req.__set_replica_id(tablet->replica_id());
+    req.__set_partition_id(tablet->partition_id());
+    req.__set_table_id(tablet->table_id());
+    task.__set_task_type(TTaskType::CLONE);
+    task.__set_clone_req(req);
+    task.__set_priority(TPriority::HIGH);
+    task.__set_signature(tablet->tablet_id());
+    LOG_INFO("BE starts to submit missing rowset clone task.")
+            .tag("tablet_id", tablet->tablet_id())
+            .tag("version", version)
+            .tag("replica_id", tablet->replica_id())
+            .tag("partition_id", tablet->partition_id())
+            .tag("table_id", tablet->table_id());
+    RETURN_IF_ERROR(assert_cast<PriorTaskWorkerPool*>(workers->at(TTaskType::CLONE).get())
+                            ->submit_high_prior_and_cancel_low(task));
+    return Status::OK();
+}
+
 int CreateTabletRRIdxCache::get_index(const std::string& key) {
     auto* lru_handle = lookup(key);
     if (lru_handle) {
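`get_peers_replica_backends` throttles the `getTabletReplicaInfos` RPC to the FE master: `_last_get_peers_replica_backends_time_ms` only advances on success, and any call within 10 seconds of it bails out, so a burst of compaction and publish failures costs at most one FE round trip per window (note the timestamp is read before `_peer_replica_infos_mutex` is taken, so the limit is best-effort). A self-contained sketch of the throttle:

```cpp
// Illustrative sketch of the 10s success-based throttle; Doris uses
// UnixMillis() and a StorageEngine member instead of this struct.
#include <chrono>
#include <cstdint>
#include <iostream>

struct RpcThrottle {
    int64_t last_success_ms = 0; // 0 means "never fetched", so the first call passes
    static constexpr int64_t kWindowMs = 10000;

    bool allow(int64_t now_ms) const { return now_ms - last_success_ms >= kWindowMs; }
    void record_success(int64_t now_ms) { last_success_ms = now_ms; }
};

int main() {
    auto now_ms = [] {
        using namespace std::chrono;
        return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
    };
    RpcThrottle throttle;
    if (throttle.allow(now_ms())) {
        // ... issue getTabletReplicaInfos to the FE master ...
        throttle.record_success(now_ms());
    }
    std::cout << "second call allowed: " << throttle.allow(now_ms()) << "\n"; // 0
}
```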
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index bf4dbd65b4ad45..f2350eedc4f802 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -37,6 +37,7 @@
 #include
 #include
 
+#include "agent/task_worker_pool.h"
 #include "common/config.h"
 #include "common/status.h"
 #include "olap/calc_delete_bitmap_executor.h"
@@ -312,6 +313,8 @@ class StorageEngine final : public BaseStorageEngine {
     bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token);
 
+    bool get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends);
+
     bool should_fetch_from_peer(int64_t tablet_id);
 
     const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
@@ -341,6 +344,10 @@ class StorageEngine final : public BaseStorageEngine {
     std::set<std::string> get_broken_paths() { return _broken_paths; }
 
+    Status submit_clone_task(Tablet* tablet, int64_t version);
+
+    std::unordered_map<TTaskType::type, std::unique_ptr<TaskWorkerPoolIf>>* workers;
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -561,6 +568,8 @@ class StorageEngine final : public BaseStorageEngine {
     // thread to check tablet delete bitmap count tasks
     scoped_refptr<Thread> _check_delete_bitmap_score_thread;
+
+    int64_t _last_get_peers_replica_backends_time_ms {0};
 };
 
 // lru cache for create tablet round robin in disks
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index b840714633b8d9..db0ad79f51f05b 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -39,6 +39,7 @@
 #include
 #include
 #include
+#include
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -986,5 +987,4 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
     return tablet->revise_tablet_meta(to_add, to_delete, false);
     // TODO(plat1ko): write cooldown meta to remote if this replica is cooldown replica
 }
-
 } // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 2dcc1723b71005..6c37e55da757e4 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -32,6 +32,7 @@
 #include
 #include
 
+#include "cloud/config.h"
 #include "common/logging.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
@@ -216,6 +217,25 @@ Status EnginePublishVersionTask::execute() {
                 continue;
             }
             auto handle_version_not_continuous = [&]() {
+                if (config::enable_auto_clone_on_mow_publish_missing_version) {
+                    LOG_INFO("mow publish submits missing rowset clone task.")
+                            .tag("tablet_id", tablet->tablet_id())
+                            .tag("version", version.first - 1)
+                            .tag("replica_id", tablet->replica_id())
+                            .tag("partition_id", tablet->partition_id())
+                            .tag("table_id", tablet->table_id());
+                    Status st = _engine.submit_clone_task(tablet.get(), version.first - 1);
+                    if (!st) {
+                        LOG_WARNING(
+                                "mow publish failed to submit missing rowset clone task.")
+                                .tag("st", st.to_string())
+                                .tag("tablet_id", tablet->tablet_id())
+                                .tag("version", version.first - 1)
+                                .tag("replica_id", tablet->replica_id())
+                                .tag("partition_id", tablet->partition_id())
+                                .tag("table_id", tablet->table_id());
+                    }
+                }
                 add_error_tablet_id(tablet_info.tablet_id);
                 // When there are too many missing versions, do not directly retry the
                 // publish and handle it through async publish.
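The MoW publish hook requests a clone up to `version.first - 1`: publishing version `v` requires the tablet to already hold `v - 1`, so when `handle_version_not_continuous` runs, catching up to the predecessor is exactly what unblocks the retried (or async) publish. The submit is best-effort; a failure is only logged, and the tablet is still reported as errored so async publish takes over. A small sketch of that decision:

```cpp
// Illustrative sketch: choose the clone target when a publish finds a version
// gap. Not the Doris implementation, just the arithmetic it relies on.
#include <cstdint>
#include <iostream>

// returns the version to clone up to, or -1 when no clone should be submitted
int64_t clone_target_on_publish_gap(int64_t tablet_max_version, int64_t publish_version,
                                    bool auto_clone_enabled) {
    if (auto_clone_enabled && tablet_max_version + 1 < publish_version) {
        return publish_version - 1; // catch up to the predecessor version
    }
    return -1;
}

int main() {
    // tablet holds up to version 2, txn wants to publish version 6:
    // versions 3..5 are missing, so clone up to 5 and let publish retry later
    std::cout << clone_target_on_publish_gap(2, 6, true) << "\n"; // 5
}
```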
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index bfc1c34e8f16d7..43fe1e4df473d8 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -22,6 +22,7 @@
 #include
+#include
 #include
 #include
 #include
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9610afe22dd5e5..adfa8237cb3020 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2741,6 +2741,8 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos
                 replicaInfo.setBePort(backend.getBePort());
                 replicaInfo.setHttpPort(backend.getHttpPort());
                 replicaInfo.setBrpcPort(backend.getBrpcPort());
+                replicaInfo.setIsAlive(backend.isAlive());
+                replicaInfo.setBackendId(backend.getId());
                 replicaInfo.setReplicaId(replica.getId());
                 replicaInfos.add(replicaInfo);
             }
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 4fe2e1ab0825b1..54436bbc130969 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -670,6 +670,8 @@ struct TReplicaInfo {
     3: required TPort http_port
     4: required TPort brpc_port
     5: required TReplicaId replica_id
+    6: optional bool is_alive
+    7: optional i64 backend_id
 }
 
 struct TResourceInfo {
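The two `TReplicaInfo` fields are added as `optional` rather than `required`, which keeps the RPC compatible in both directions during a rolling upgrade: an old FE simply never sets them, and the BE code above checks `__isset` before reading. A stand-in sketch of that guard (the real `__isset` structs are generated by the thrift compiler):

```cpp
// Illustrative only: why the BE guards rep.__isset.is_alive / __isset.backend_id.
#include <cstdint>
#include <iostream>

struct TReplicaInfoIsset {
    bool is_alive = false;
    bool backend_id = false;
};

struct TReplicaInfoStub {
    bool is_alive = false;
    int64_t backend_id = 0;
    TReplicaInfoIsset __isset; // thrift marks which optional fields were actually sent
};

int main() {
    TReplicaInfoStub rep; // as deserialized from an old FE: neither field sent
    if (rep.__isset.is_alive) {
        std::cout << "is_alive=" << rep.is_alive << "\n";
    } else {
        std::cout << "old FE: is_alive not sent, fall back to default\n"; // taken
    }
}
```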
diff --git a/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy
new file mode 100644
index 00000000000000..a7f060a110888b
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_compaction_clone_missing_rowset_fault_injection', 'docker') {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.enableDebugPoints()
+    options.feConfigs += [ "disable_tablet_scheduler=true" ]
+    options.beConfigs += [ "enable_auto_clone_on_compaction_missing_version=true" ]
+    options.beNum = 3
+    docker(options) {
+
+        def injectBe = null
+        def normalBe = null
+        def backends = sql_return_maparray('show backends')
+
+        injectBe = backends[0]
+        assertNotNull(injectBe)
+        normalBe = backends[1]
+        assertNotNull(normalBe)
+
+        try {
+            def tableName = "test_compaction_clone_missing_rowset"
+            sql """ DROP TABLE IF EXISTS ${tableName} force"""
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `k` int,
+                    `v` int,
+                ) engine=olap
+                DUPLICATE KEY(k)
+                DISTRIBUTED BY HASH(k)
+                BUCKETS 1
+                properties(
+                    "replication_num" = "3",
+                    "disable_auto_compaction" = "true")
+                """
+            sql """ INSERT INTO ${tableName} VALUES (1,0)"""
+            DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"])
+            sql """ INSERT INTO ${tableName} VALUES (2,0)"""
+            sql """ INSERT INTO ${tableName} VALUES (3,0)"""
+            sql """ INSERT INTO ${tableName} VALUES (4,0)"""
+            DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
+            sql """ INSERT INTO ${tableName} VALUES (5,0)"""
+
+            def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+            def tabletId = array[0].TabletId
+
+            // 1st check rowsets
+            logger.info("1st show:" + tabletId)
+            def (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+            logger.info("1st show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            // versions [3-5] are missing on the injected BE
+            assertTrue(out.contains("[3-5]"))
+            assertTrue(out.contains("[6-6]"))
+
+            logger.info("1st run cumu compaction:" + tabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
+            logger.info("1st run cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+            sleep(10000)
+
+            // 2nd check rowsets
+            logger.info("2nd show:" + tabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+            logger.info("2nd show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[3-3]"))
+            assertTrue(out.contains("[4-4]"))
+            assertTrue(out.contains("[5-5]"))
+            assertTrue(out.contains("[6-6]"))
+
+            logger.info("2nd cumu compaction:" + tabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
+            logger.info("2nd cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+            // 3rd check rowsets
+            logger.info("3rd show:" + tabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+            logger.info("3rd show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[3-6]"))
+
+        } finally {
+            if (injectBe != null) {
+                DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
+            }
+        }
+    }
+}
"EnginePublishVersionTask.finish.random") + } + } + } +} diff --git a/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy new file mode 100644 index 00000000000000..14f0073f5c88ec --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_mow_publish_clone_missing_rowset_fault_injection', 'docker') { + + def set_be_param = { paramName, paramValue, beIp, bePort -> + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + + def options = new ClusterOptions() + options.cloudMode = false + options.enableDebugPoints() + options.feConfigs += [ "disable_tablet_scheduler=true" ] + options.beConfigs += [ "enable_auto_clone_on_mow_publish_missing_version=false" ] + options.beNum = 3 + docker(options) { + + def injectBe = null + def normalBe = null + def backends = sql_return_maparray('show backends') + + injectBe = backends[0] + assertNotNull(injectBe) + normalBe = backends[1] + assertNotNull(normalBe) + + try { + def tableName = "test_mow_publish_clone_missing_rowset" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) + BUCKETS 1 + properties( + "replication_num" = "3", + "disable_auto_compaction" = "true") + """ + sql """ INSERT INTO ${tableName} VALUES (1,0)""" + DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"]) + sql """ INSERT INTO ${tableName} VALUES (2,0)""" + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + sql """ INSERT INTO ${tableName} VALUES (3,0)""" + sql """ INSERT INTO ${tableName} VALUES (4,0)""" + sql """ INSERT INTO ${tableName} VALUES (5,0)""" + + def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}") + def tabletId = array[0].TabletId + + // normal be check rowsets + logger.info("normal be show:" + tabletId) + def (code, out, err) = be_show_tablet_status(normalBe.Host, normalBe.HttpPort, tabletId) + logger.info("normal be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + 
assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + // 1st inject be check rowsets + logger.info("1st inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("1st inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertFalse(out.contains("[3-3]")) + assertFalse(out.contains("[4-4]")) + assertFalse(out.contains("[5-5]")) + assertFalse(out.contains("[6-6]")) + + set_be_param("enable_auto_clone_on_mow_publish_missing_version", "true", injectBe.Host, injectBe.HttpPort); + Thread.sleep(10000) + // submit clone task + sql """ INSERT INTO ${tableName} VALUES (6,0)""" + + sleep(10000) + + // 2nd inject be check rowsets + logger.info("2nd inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("2nd inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + assertTrue(out.contains("[7-7]")) + + sql """ INSERT INTO ${tableName} VALUES (7,0)""" + + // 3rd inject be check rowsets + logger.info("3rd inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("3rd inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + assertTrue(out.contains("[7-7]")) + assertTrue(out.contains("[8-8]")) + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + } + } + } +} \ No newline at end of file