From 502e03c5ae32d5475c5f0b071eb1067ca9dea337 Mon Sep 17 00:00:00 2001 From: Vladimir Cherkasov Date: Wed, 22 Jan 2025 08:22:32 +0000 Subject: [PATCH 1/7] Merge pull request #74521 from RinChanNOWWW/fix-watch-leak Resolve the issue of leaking keeper watches. --- src/Interpreters/ClusterDiscovery.cpp | 12 +++++++++--- src/Interpreters/ClusterDiscovery.h | 4 ++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index ab2ec886e76d..90abb09d1913 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -166,6 +166,14 @@ ClusterDiscovery::ClusterDiscovery( LOG_TRACE(log, "Clusters in discovery mode: {}", fmt::join(clusters_info_names, ", ")); clusters_to_update = std::make_shared(clusters_info_names.begin(), clusters_info_names.end()); + + /// Init get_nodes_callbacks after init clusters_to_update. + for (const auto & e : clusters_info) + get_nodes_callbacks[e.first] = std::make_shared( + [cluster_name = e.first, my_clusters_to_update = clusters_to_update](auto) + { + my_clusters_to_update->set(cluster_name); + }); } /// List node in zookeper for cluster @@ -175,10 +183,8 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, int * version, bool set_callback) { - auto watch_callback = [cluster_name, my_clusters_to_update = clusters_to_update](auto) { my_clusters_to_update->set(cluster_name); }; - Coordination::Stat stat; - Strings nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? watch_callback : Coordination::WatchCallback{}); + Strings nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? get_nodes_callbacks[cluster_name] : Coordination::WatchCallbackPtr{}); if (version) *version = stat.cversion; return nodes; diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index b253473ce3e4..92d699656374 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -149,6 +149,10 @@ class ClusterDiscovery /// It prevents accessing to invalid object after ClusterDiscovery is destroyed. std::shared_ptr clusters_to_update; + /// Hold the callback pointers of each cluster. + /// To avoid registering callbacks for the same path multiple times. + std::unordered_map get_nodes_callbacks; + mutable std::mutex mutex; std::unordered_map cluster_impls; From 6e68a611dc2c5152660d66fee07b0156bfe29e85 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 11 Feb 2025 14:16:39 +0100 Subject: [PATCH 2/7] Autodiscovery dynamic clusters --- src/Interpreters/ClusterDiscovery.cpp | 407 ++++++++++++++++-- src/Interpreters/ClusterDiscovery.h | 61 ++- .../config/config_dynamic_cluster1.xml | 11 + .../config/config_dynamic_cluster2.xml | 11 + .../config/config_dynamic_cluster3.xml | 11 + .../config_dynamic_cluster_observer.xml | 17 + .../test_dynamic_clusters.py | 95 ++++ 7 files changed, 563 insertions(+), 50 deletions(-) create mode 100644 tests/integration/test_cluster_discovery/config/config_dynamic_cluster1.xml create mode 100644 tests/integration/test_cluster_discovery/config/config_dynamic_cluster2.xml create mode 100644 tests/integration/test_cluster_discovery/config/config_dynamic_cluster3.xml create mode 100644 tests/integration/test_cluster_discovery/config/config_dynamic_cluster_observer.xml create mode 100644 tests/integration/test_cluster_discovery/test_dynamic_clusters.py diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index 90abb09d1913..e5e0088288d7 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -36,6 +36,7 @@ namespace ErrorCodes extern const int KEEPER_EXCEPTION; extern const int LOGICAL_ERROR; extern const int NO_ELEMENTS_IN_CONFIG; + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; } namespace FailPoints @@ -110,6 +111,54 @@ class ClusterDiscovery::ConcurrentFlags bool stop_flag = false; }; +template +class ClusterDiscovery::Flags +{ +public: + void set(const T & key, bool value = true) + { + std::unique_lock lk(mu); + flags[key] = value; + any_need_update |= value; + cv.notify_one(); + } + + void remove(const T & key) + { + std::unique_lock lk(mu); + flags.erase(key); + } + + std::unordered_map wait(std::chrono::milliseconds timeout, bool & finished) + { + std::unique_lock lk(mu); + cv.wait_for(lk, timeout, [this]() -> bool { return any_need_update || stop_flag; }); + finished = stop_flag; + + any_need_update = false; + auto res = flags; + for (auto & f : flags) + f.second = false; + return res; + } + + void stop() + { + std::unique_lock lk(mu); + stop_flag = true; + cv.notify_one(); + } + +private: + std::condition_variable cv; + std::mutex mu; + + /// flag indicates that update is required + std::unordered_map flags; + bool any_need_update = true; + bool stop_flag = false; +}; + ClusterDiscovery::ClusterDiscovery( const Poco::Util::AbstractConfiguration & config, ContextPtr context_, @@ -123,23 +172,60 @@ ClusterDiscovery::ClusterDiscovery( Poco::Util::AbstractConfiguration::Keys config_keys; config.keys(config_prefix, config_keys); + multicluster_discovery_paths = std::make_shared>>(); + for (const auto & key : config_keys) { String cluster_config_prefix = config_prefix + "." + key + ".discovery"; if (!config.has(cluster_config_prefix)) continue; - String zk_name_and_root = config.getString(cluster_config_prefix + ".path"); - if (zk_name_and_root.empty()) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "ZooKeeper path for cluster '{}' is empty", key); - String zk_root = zkutil::extractZooKeeperPath(zk_name_and_root, true); - String zk_name = zkutil::extractZooKeeperName(zk_name_and_root); + String zk_name_and_root = config.getString(cluster_config_prefix + ".path", ""); + String zk_multicluster_name_and_root = config.getString(cluster_config_prefix + ".multicluster_root_path", ""); + bool is_observer = ConfigHelper::getBool(config, cluster_config_prefix + ".observer"); const auto & password = config.getString(cluster_config_prefix + ".password", ""); const auto & cluster_secret = config.getString(cluster_config_prefix + ".secret", ""); if (!password.empty() && !cluster_secret.empty()) throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Both 'password' and 'secret' are specified for cluster '{}', only one option can be used at the same time", key); + if (!zk_multicluster_name_and_root.empty()) + { + if (!zk_name_and_root.empty()) + throw Exception( + ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, + "Autodiscovery cluster node {} has 'path' and 'multicluster_root_path' subnodes simultaneously", + key); + if (!is_observer) + throw Exception( + ErrorCodes::NO_ELEMENTS_IN_CONFIG, + "Autodiscovery cluster node {} must be in observer mode", + key); + + String zk_root = zkutil::extractZooKeeperPath(zk_multicluster_name_and_root, true); + String zk_name = zkutil::extractZooKeeperName(zk_multicluster_name_and_root); + + auto mcd = std::make_shared( + /* zk_name */ zk_name, + /* zk_path */ zk_root, + /* is_secure_connection */ config.getBool(cluster_config_prefix + ".secure", false), + /* username */ config.getString(cluster_config_prefix + ".user", context->getUserName()), + /* password */ password, + /* cluster_secret */ cluster_secret + ); + + multicluster_discovery_paths->push_back( + mcd + ); + continue; + } + + if (zk_name_and_root.empty()) + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "ZooKeeper path for cluster '{}' is empty", key); + + String zk_root = zkutil::extractZooKeeperPath(zk_name_and_root, true); + String zk_name = zkutil::extractZooKeeperName(zk_name_and_root); + clusters_info.emplace( key, ClusterInfo( @@ -153,7 +239,7 @@ ClusterDiscovery::ClusterDiscovery( /* port= */ context->getTCPPort(), /* secure= */ config.getBool(cluster_config_prefix + ".secure", false), /* shard_id= */ config.getUInt(cluster_config_prefix + ".shard", 0), - /* observer_mode= */ ConfigHelper::getBool(config, cluster_config_prefix + ".observer"), + /* observer_mode= */ is_observer, /* invisible= */ ConfigHelper::getBool(config, cluster_config_prefix + ".invisible") ) ); @@ -165,7 +251,8 @@ ClusterDiscovery::ClusterDiscovery( clusters_info_names.emplace_back(e.first); LOG_TRACE(log, "Clusters in discovery mode: {}", fmt::join(clusters_info_names, ", ")); - clusters_to_update = std::make_shared(clusters_info_names.begin(), clusters_info_names.end()); + clusters_to_update = std::make_shared(clusters_info_names.begin(), clusters_info_names.end()); + dynamic_clusters_to_update = std::make_shared(); /// Init get_nodes_callbacks after init clusters_to_update. for (const auto & e : clusters_info) @@ -181,10 +268,33 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name, int * version, - bool set_callback) + bool set_callback, + size_t zk_root_index) { Coordination::Stat stat; - Strings nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? get_nodes_callbacks[cluster_name] : Coordination::WatchCallbackPtr{}); + Strings nodes; + if (zk_root_index != 0) + { + auto dynamic_callback = get_dynamic_nodes_callbacks.find(cluster_name); + if (dynamic_callback == get_dynamic_nodes_callbacks.end()) + { + auto watch_dynamic_callback = std::make_shared([ + cluster_name, + zk_root_index, + my_clusters_to_update = dynamic_clusters_to_update, + my_discovery_paths = multicluster_discovery_paths + ](auto) + { + (*my_discovery_paths)[zk_root_index - 1]->need_update = true; + my_clusters_to_update->set(cluster_name); + }); + auto res = get_dynamic_nodes_callbacks.insert(std::make_pair(cluster_name, watch_dynamic_callback)); + dynamic_callback = res.first; + } + nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? *(dynamic_callback->second) : Coordination::WatchCallback{}); + } + else + nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? get_nodes_callbacks[cluster_name] : Coordination::WatchCallbackPtr{}); if (version) *version = stat.cversion; return nodes; @@ -293,21 +403,20 @@ static bool contains(const Strings & list, const String & value) /// Reads data from zookeeper and tries to update cluster. /// Returns true on success (or no update required). -bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info) +bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info) { LOG_DEBUG(log, "Updating cluster '{}'", cluster_info.name); auto zk = context->getDefaultOrAuxiliaryZooKeeper(cluster_info.zk_name); int start_version; - Strings node_uuids = getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &start_version, false); + Strings node_uuids = getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &start_version, false, cluster_info.zk_root_index); auto & nodes_info = cluster_info.nodes_info; - auto on_exit = [this, start_version, &zk, &cluster_info, &nodes_info]() { /// in case of successful update we still need to check if configuration of cluster still valid and also set watch callback int current_version; - getNodeNames(zk, cluster_info.zk_root, cluster_info.name, ¤t_version, true); + getNodeNames(zk, cluster_info.zk_root, cluster_info.name, ¤t_version, true, cluster_info.zk_root_index); if (current_version != start_version) { @@ -339,24 +448,36 @@ bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info) } nodes_info = getNodes(zk, cluster_info.zk_root, node_uuids); - if (nodes_info.empty()) - { - LOG_WARNING(log, "Can't get nodes info for '{}'", cluster_info.name); - return false; - } if (bool ok = on_exit(); !ok) return false; LOG_DEBUG(log, "Updating system.clusters record for '{}' with {} nodes", cluster_info.name, cluster_info.nodes_info.size()); - auto cluster = makeCluster(cluster_info); - - std::lock_guard lock(mutex); - cluster_impls[cluster_info.name] = cluster; + if (nodes_info.empty()) + { + removeCluster(cluster_info.name); + } + else + { + auto cluster = makeCluster(cluster_info); + std::lock_guard lock(mutex); + cluster_impls[cluster_info.name] = cluster; + } return true; } +void ClusterDiscovery::removeCluster(const String & name) +{ + { + std::lock_guard lock(mutex); + cluster_impls.erase(name); + } + dynamic_clusters_info.erase(name); + dynamic_clusters_to_update->remove(name); + LOG_DEBUG(log, "Dynamic cluster '{}' removed successfully", name); +} + void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info) { /// Create root node in observer mode not to get 'No node' error @@ -394,19 +515,123 @@ void ClusterDiscovery::initialUpdate() { auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); registerInZk(zk, info); - if (!updateCluster(info)) + if (!upsertCluster(info)) { LOG_WARNING(log, "Error on initial cluster '{}' update, will retry in background", info.name); clusters_to_update->set(info.name); } } + + for (auto & path : (*multicluster_discovery_paths)) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(path->zk_name); + + zk->createAncestors(path->zk_path); + zk->createIfNotExists(path->zk_path, ""); + + auto watch_callback = [&path](auto) { path->need_update = true; }; + zk->getChildrenWatch(path->zk_path, nullptr, watch_callback); + } + + findDynamicClusters(dynamic_clusters_info); + + for (auto & [_, info] : dynamic_clusters_info) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); + if (!upsertCluster(info)) + { + LOG_WARNING(log, "Error on initial dynamic cluster '{}' update, will retry in background", info.name); + dynamic_clusters_to_update->set(info.name); + } + else + dynamic_clusters_to_update->set(info.name, false); + } + LOG_DEBUG(log, "Initialized"); is_initialized = true; } +void ClusterDiscovery::findDynamicClusters( + std::unordered_map & info, + std::unordered_set * unchanged_roots) +{ + using namespace std::chrono_literals; + + constexpr auto force_update_interval = 2min; + + size_t zk_root_index = 0; + + for (auto & path : (*multicluster_discovery_paths)) + { + ++zk_root_index; + + if (unchanged_roots) + { + if (!path->need_update.exchange(false)) + { + /// force updating periodically + bool force_update = path->watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); + if (!force_update) + { + unchanged_roots->insert(zk_root_index); + continue; + } + } + } + + auto zk = context->getDefaultOrAuxiliaryZooKeeper(path->zk_name); + + auto clusters = zk->getChildren(path->zk_path); + + for (const auto & cluster : clusters) + { + if (clusters_info.count(cluster)) + { /// Not a warning - node can register itsefs in one cluster and discover other clusters + LOG_TRACE(log, "Found dynamic duplicate of cluster '{}' in config and Keeper, skipped", cluster); + continue; + } + + if (info.count(cluster)) + { /// Possible with several root paths, it's a configuration error + LOG_WARNING(log, "Found dynamic duplicate of cluster '{}' in Keeper, skipped record by path {}:{}", + cluster, path->zk_name, path->zk_path); + continue; + } + + auto shards = zk->getChildren(getShardsListPath(path->zk_path + "/" + cluster)); + if (shards.empty()) + { /// When node suddenly goes off (crush, etc), ephemeral record in Keeper is removed, but root for cluster is not + LOG_TRACE(log, "Empty cluster '{}' in Keeper, skipped", cluster); + continue; + } + + info.emplace( + cluster, + ClusterInfo( + /* name_= */ cluster, + /* zk_name_= */ path->zk_name, + /* zk_root_= */ path->zk_path + "/" + cluster, + /* host_name= */ "", + /* username= */ path->username, + /* password= */ path->password, + /* cluster_secret= */ path->cluster_secret, + /* port= */ context->getTCPPort(), + /* secure= */ path->is_secure_connection, + /* shard_id= */ 0, + /* observer_mode= */ true, + /* invisible= */ false, + /* zk_root_index= */ zk_root_index + ) + ); + } + + path->watch.restart(); + } +} + void ClusterDiscovery::start() { - if (clusters_info.empty()) + if (clusters_info.empty() && multicluster_discovery_paths->empty()) { LOG_DEBUG(log, "No defined clusters for discovery"); return; @@ -467,36 +692,130 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) while (!finished) { bool all_up_to_date = true; - auto & clusters = clusters_to_update->wait(5s, finished); - for (auto & [cluster_name, need_update] : clusters) + + if (!multicluster_discovery_paths->empty()) { - auto cluster_info_it = clusters_info.find(cluster_name); - if (cluster_info_it == clusters_info.end()) + std::unordered_map new_dynamic_clusters_info; + std::unordered_set unchanged_roots; + findDynamicClusters(new_dynamic_clusters_info, &unchanged_roots); + + std::unordered_set clusters_to_insert; + std::unordered_set clusters_to_remove; + + for (const auto & [cluster_name, info] : dynamic_clusters_info) { - LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); - continue; + auto p = new_dynamic_clusters_info.find(cluster_name); + if (p != new_dynamic_clusters_info.end()) + new_dynamic_clusters_info.erase(p); + else + { + if (!unchanged_roots.count(info.zk_root_index)) + clusters_to_remove.insert(cluster_name); + } } - auto & cluster_info = cluster_info_it->second; + /// new_dynamic_clusters_info now contains only new clusters + + for (const auto & [cluster_name, _] : new_dynamic_clusters_info) + clusters_to_insert.insert(cluster_name); + + for (const auto & cluster_name : clusters_to_remove) + removeCluster(cluster_name); - if (!need_update.exchange(false)) + dynamic_clusters_info.merge(new_dynamic_clusters_info); + + auto clusters = dynamic_clusters_to_update->wait(5s, finished); + for (auto & [cluster_name, need_update] : clusters) { - /// force updating periodically - bool force_update = cluster_info.watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); - if (!force_update) - continue; + auto cluster_info_it = clusters_info.find(cluster_name); + if (cluster_info_it == clusters_info.end()) + { + cluster_info_it = dynamic_clusters_info.find(cluster_name); + if (cluster_info_it == dynamic_clusters_info.end()) + { + LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); + continue; + } + } + auto & cluster_info = cluster_info_it->second; + + if (!need_update) + { + /// force updating periodically + bool force_update = cluster_info.watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); + if (!force_update) + continue; + } + + if (upsertCluster(cluster_info)) + { + cluster_info.watch.restart(); + } + else + { + all_up_to_date = false; + } } - if (updateCluster(cluster_info)) + for (const auto & cluster_name : clusters_to_insert) { - cluster_info.watch.restart(); - LOG_DEBUG(log, "Cluster '{}' updated successfully", cluster_name); + auto cluster_info_it = dynamic_clusters_info.find(cluster_name); + if (cluster_info_it == dynamic_clusters_info.end()) + { + LOG_ERROR(log, "Unknown dynamic cluster '{}'", cluster_name); + continue; + } + auto & cluster_info = cluster_info_it->second; + if (upsertCluster(cluster_info)) + { + cluster_info.watch.restart(); + LOG_DEBUG(log, "Dynamic cluster '{}' inserted successfully", cluster_name); + } + else + { + all_up_to_date = false; + /// no need to trigger convar, will retry after timeout in `wait` + clusters_to_update->set(cluster_name); + LOG_WARNING(log, "Dynamic cluster '{}' wasn't inserted, will retry", cluster_name); + } } - else + } + + { + auto & clusters = clusters_to_update->wait(5s, finished); + for (auto & [cluster_name, need_update] : clusters) { - all_up_to_date = false; - /// no need to trigger convar, will retry after timeout in `wait` - need_update = true; - LOG_WARNING(log, "Cluster '{}' wasn't updated, will retry", cluster_name); + auto cluster_info_it = clusters_info.find(cluster_name); + if (cluster_info_it == clusters_info.end()) + { + cluster_info_it = dynamic_clusters_info.find(cluster_name); + if (cluster_info_it == dynamic_clusters_info.end()) + { + LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); + continue; + } + } + auto & cluster_info = cluster_info_it->second; + + if (!need_update.exchange(false)) + { + /// force updating periodically + bool force_update = cluster_info.watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); + if (!force_update) + continue; + } + + if (upsertCluster(cluster_info)) + { + cluster_info.watch.restart(); + LOG_DEBUG(log, "Cluster '{}' updated successfully", cluster_name); + } + else + { + all_up_to_date = false; + /// no need to trigger convar, will retry after timeout in `wait` + need_update = true; + LOG_WARNING(log, "Cluster '{}' wasn't updated, will retry", cluster_name); + } } } diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index 92d699656374..df893ad956bf 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -88,6 +88,9 @@ class ClusterDiscovery String password; String cluster_secret; + /// For dynamic clusters, index+1 in multicluster_discovery_paths where cluster was found + size_t zk_root_index; + ClusterInfo(const String & name_, const String & zk_name_, const String & zk_root_, @@ -99,7 +102,9 @@ class ClusterDiscovery bool secure, size_t shard_id, bool observer_mode, - bool invisible) + bool invisible, + size_t zk_root_index_ = 0 + ) : name(name_) , zk_name(zk_name_) , zk_root(zk_root_) @@ -110,6 +115,7 @@ class ClusterDiscovery , username(username_) , password(password_) , cluster_secret(cluster_secret_) + , zk_root_index(zk_root_index_) { } }; @@ -121,19 +127,23 @@ class ClusterDiscovery Strings getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name, - int * version = nullptr, - bool set_callback = true); + int * version, + bool set_callback, + size_t zk_root_index); NodesInfo getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & node_uuids); ClusterPtr makeCluster(const ClusterInfo & cluster_info); bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes); - bool updateCluster(ClusterInfo & cluster_info); + bool upsertCluster(ClusterInfo & cluster_info); + void removeCluster(const String & key); bool runMainThread(std::function up_to_date_callback); void shutdown(); + void findDynamicClusters(std::unordered_map & info, std::unordered_set * unchanged_roots = nullptr); + /// cluster name -> cluster info (zk root, set of nodes) std::unordered_map clusters_info; @@ -142,16 +152,20 @@ class ClusterDiscovery String current_node_name; template class ConcurrentFlags; - using UpdateFlags = ConcurrentFlags; + using UpdateConcurrentFlags = ConcurrentFlags; + template class Flags; + using UpdateFlags = Flags; /// Cluster names to update. /// The `shared_ptr` is used because it's passed to watch callback. /// It prevents accessing to invalid object after ClusterDiscovery is destroyed. - std::shared_ptr clusters_to_update; + std::shared_ptr clusters_to_update; + std::shared_ptr dynamic_clusters_to_update; /// Hold the callback pointers of each cluster. /// To avoid registering callbacks for the same path multiple times. std::unordered_map get_nodes_callbacks; + std::unordered_map get_dynamic_nodes_callbacks; mutable std::mutex mutex; std::unordered_map cluster_impls; @@ -160,6 +174,41 @@ class ClusterDiscovery ThreadFromGlobalPool main_thread; LoggerPtr log; + + struct MulticlusterDiscovery + { + const String zk_name; + const String zk_path; + bool is_secure_connection; + String username; + String password; + String cluster_secret; + + Stopwatch watch; + mutable std::atomic_bool need_update; + + MulticlusterDiscovery(const String & zk_name_, + const String & zk_path_, + bool is_secure_connection_, + const String & username_, + const String & password_, + const String & cluster_secret_) + : zk_name(zk_name_) + , zk_path(zk_path_) + , is_secure_connection(is_secure_connection_) + , username(username_) + , password(password_) + , cluster_secret(cluster_secret_) + { + need_update = true; + } + + String getFullPath() const { return zk_name + ":" + zk_path; } + }; + + std::shared_ptr>> multicluster_discovery_paths; + mutable std::mutex dynamic_clusters_mutex; + std::unordered_map dynamic_clusters_info; }; } diff --git a/tests/integration/test_cluster_discovery/config/config_dynamic_cluster1.xml b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster1.xml new file mode 100644 index 000000000000..451a43b08a83 --- /dev/null +++ b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster1.xml @@ -0,0 +1,11 @@ + + 1 + + + + /clickhouse/discovery/test_auto_cluster1 + 1 + + + + diff --git a/tests/integration/test_cluster_discovery/config/config_dynamic_cluster2.xml b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster2.xml new file mode 100644 index 000000000000..8b51241477e6 --- /dev/null +++ b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster2.xml @@ -0,0 +1,11 @@ + + 1 + + + + /clickhouse/discovery/test_auto_cluster2 + 1 + + + + diff --git a/tests/integration/test_cluster_discovery/config/config_dynamic_cluster3.xml b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster3.xml new file mode 100644 index 000000000000..35ccf5cadc74 --- /dev/null +++ b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster3.xml @@ -0,0 +1,11 @@ + + 1 + + + + /clickhouse/discovery2/test_auto_cluster3 + 1 + + + + diff --git a/tests/integration/test_cluster_discovery/config/config_dynamic_cluster_observer.xml b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster_observer.xml new file mode 100644 index 000000000000..224ce1ce66bc --- /dev/null +++ b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster_observer.xml @@ -0,0 +1,17 @@ + + 1 + + + + + /clickhouse/discovery + + + + + + /clickhouse/discovery2 + + + + diff --git a/tests/integration/test_cluster_discovery/test_dynamic_clusters.py b/tests/integration/test_cluster_discovery/test_dynamic_clusters.py new file mode 100644 index 000000000000..ea16d8347605 --- /dev/null +++ b/tests/integration/test_cluster_discovery/test_dynamic_clusters.py @@ -0,0 +1,95 @@ +import time +import pytest +import json + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +shard_configs = { + "node0": ["config/config_dynamic_cluster1.xml"], + "node1": ["config/config_dynamic_cluster1.xml"], + "node2": ["config/config_dynamic_cluster2.xml"], + "node3": ["config/config_dynamic_cluster3.xml"], + "node_observer": [], +} + +nodes = { + node_name: cluster.add_instance( + node_name, + main_configs=shard_config + ["config/config_dynamic_cluster_observer.xml"], + stay_alive=True, + with_zookeeper=True, + ) + for node_name, shard_config in shard_configs.items() +} + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def get_clusters_hosts(node, expected): + count = 120 + while True: + resp = node.query("SELECT cluster, host_name FROM system.clusters ORDER BY cluster, host_name FORMAT JSONCompact") + hosts = json.loads(resp)["data"] + if count <= 0 or len(hosts) == expected: + break + time.sleep(1) + count -= 1 + return hosts + + +def test_cluster_discovery_startup_and_stop(start_cluster): + """ + Start cluster, check nodes count in system.clusters, + then stop/start some nodes and check that it (dis)appeared in cluster. + """ + + for node in ["node0", "node1", "node2", "node3", "node_observer"]: + nodes[node].stop_clickhouse() + + for node in ["node0", "node1", "node2", "node_observer"]: + nodes[node].start_clickhouse() + + expect1 = [["test_auto_cluster1", "node0"], ["test_auto_cluster1", "node1"], ["test_auto_cluster2", "node2"]] + for node in ["node0", "node1", "node2", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 3) + assert clusters == expect1 + + # Kill cluster test_auto_cluster2 + nodes["node2"].stop_clickhouse(kill=True) + + expect2 = [["test_auto_cluster1", "node0"], ["test_auto_cluster1", "node1"]] + for node in ["node0", "node1", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 2) + assert clusters == expect2 + + # Kill node in cluster test_auto_cluster1 + nodes["node1"].stop_clickhouse(kill=True) + + expect3 = [["test_auto_cluster1", "node0"]] + for node in ["node0", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 1) + assert clusters == expect3 + + # Restore cluster test_auto_cluster2 + nodes["node2"].start_clickhouse() + + expect4 = [["test_auto_cluster1", "node0"], ["test_auto_cluster2", "node2"]] + for node in ["node0", "node2", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 2) + assert clusters == expect4 + + nodes["node3"].start_clickhouse() + + expect5 = [["test_auto_cluster1", "node0"], ["test_auto_cluster2", "node2"], ["test_auto_cluster3", "node3"]] + for node in ["node0", "node2", "node3", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 3) + assert clusters == expect5 From 6c54b5a5ea551c63ce0ab0d33ad61025eaed3dcf Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 21 Feb 2025 11:30:41 +0100 Subject: [PATCH 3/7] More simple code --- src/Interpreters/ClusterDiscovery.cpp | 74 ++++++++++++--------------- src/Interpreters/ClusterDiscovery.h | 4 +- 2 files changed, 33 insertions(+), 45 deletions(-) diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index e5e0088288d7..bea33c83bf8d 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -273,10 +273,11 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, { Coordination::Stat stat; Strings nodes; - if (zk_root_index != 0) + + if (set_callback) { - auto dynamic_callback = get_dynamic_nodes_callbacks.find(cluster_name); - if (dynamic_callback == get_dynamic_nodes_callbacks.end()) + auto callback = get_nodes_callbacks.find(cluster_name); + if (callback == get_nodes_callbacks.end()) { auto watch_dynamic_callback = std::make_shared([ cluster_name, @@ -288,13 +289,14 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, (*my_discovery_paths)[zk_root_index - 1]->need_update = true; my_clusters_to_update->set(cluster_name); }); - auto res = get_dynamic_nodes_callbacks.insert(std::make_pair(cluster_name, watch_dynamic_callback)); - dynamic_callback = res.first; + auto res = get_nodes_callbacks.insert(std::make_pair(cluster_name, watch_dynamic_callback)); + callback = res.first; } - nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? *(dynamic_callback->second) : Coordination::WatchCallback{}); + nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, *(callback->second)); } else - nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? get_nodes_callbacks[cluster_name] : Coordination::WatchCallbackPtr{}); + nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, Coordination::WatchCallback{}); + if (version) *version = stat.cversion; return nodes; @@ -473,7 +475,7 @@ void ClusterDiscovery::removeCluster(const String & name) std::lock_guard lock(mutex); cluster_impls.erase(name); } - dynamic_clusters_info.erase(name); + clusters_info.erase(name); dynamic_clusters_to_update->remove(name); LOG_DEBUG(log, "Dynamic cluster '{}' removed successfully", name); } @@ -511,17 +513,6 @@ void ClusterDiscovery::initialUpdate() throw Exception(ErrorCodes::KEEPER_EXCEPTION, "Failpoint cluster_discovery_faults is triggered"); }); - for (auto & [_, info] : clusters_info) - { - auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); - registerInZk(zk, info); - if (!upsertCluster(info)) - { - LOG_WARNING(log, "Error on initial cluster '{}' update, will retry in background", info.name); - clusters_to_update->set(info.name); - } - } - for (auto & path : (*multicluster_discovery_paths)) { auto zk = context->getDefaultOrAuxiliaryZooKeeper(path->zk_name); @@ -533,17 +524,21 @@ void ClusterDiscovery::initialUpdate() zk->getChildrenWatch(path->zk_path, nullptr, watch_callback); } - findDynamicClusters(dynamic_clusters_info); + findDynamicClusters(clusters_info); - for (auto & [_, info] : dynamic_clusters_info) + for (auto & [_, info] : clusters_info) { auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); + registerInZk(zk, info); if (!upsertCluster(info)) { - LOG_WARNING(log, "Error on initial dynamic cluster '{}' update, will retry in background", info.name); - dynamic_clusters_to_update->set(info.name); + LOG_WARNING(log, "Error on initial cluster '{}' update, will retry in background", info.name); + if (info.zk_root_index) + dynamic_clusters_to_update->set(info.name); + else + clusters_to_update->set(info.name); } - else + else if (info.zk_root_index) dynamic_clusters_to_update->set(info.name, false); } @@ -585,7 +580,8 @@ void ClusterDiscovery::findDynamicClusters( for (const auto & cluster : clusters) { - if (clusters_info.count(cluster)) + auto p = clusters_info.find(cluster); + if (p != clusters_info.end() && !p->second.zk_root_index) { /// Not a warning - node can register itsefs in one cluster and discover other clusters LOG_TRACE(log, "Found dynamic duplicate of cluster '{}' in config and Keeper, skipped", cluster); continue; @@ -702,8 +698,10 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) std::unordered_set clusters_to_insert; std::unordered_set clusters_to_remove; - for (const auto & [cluster_name, info] : dynamic_clusters_info) + for (const auto & [cluster_name, info] : clusters_info) { + if (!info.zk_root_index) + continue; auto p = new_dynamic_clusters_info.find(cluster_name); if (p != new_dynamic_clusters_info.end()) new_dynamic_clusters_info.erase(p); @@ -721,7 +719,7 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) for (const auto & cluster_name : clusters_to_remove) removeCluster(cluster_name); - dynamic_clusters_info.merge(new_dynamic_clusters_info); + clusters_info.merge(new_dynamic_clusters_info); auto clusters = dynamic_clusters_to_update->wait(5s, finished); for (auto & [cluster_name, need_update] : clusters) @@ -729,15 +727,11 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) auto cluster_info_it = clusters_info.find(cluster_name); if (cluster_info_it == clusters_info.end()) { - cluster_info_it = dynamic_clusters_info.find(cluster_name); - if (cluster_info_it == dynamic_clusters_info.end()) - { - LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); - continue; - } + LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); + continue; } - auto & cluster_info = cluster_info_it->second; + auto & cluster_info = cluster_info_it->second; if (!need_update) { /// force updating periodically @@ -758,8 +752,8 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) for (const auto & cluster_name : clusters_to_insert) { - auto cluster_info_it = dynamic_clusters_info.find(cluster_name); - if (cluster_info_it == dynamic_clusters_info.end()) + auto cluster_info_it = clusters_info.find(cluster_name); + if (cluster_info_it == clusters_info.end()) { LOG_ERROR(log, "Unknown dynamic cluster '{}'", cluster_name); continue; @@ -787,12 +781,8 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) auto cluster_info_it = clusters_info.find(cluster_name); if (cluster_info_it == clusters_info.end()) { - cluster_info_it = dynamic_clusters_info.find(cluster_name); - if (cluster_info_it == dynamic_clusters_info.end()) - { - LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); - continue; - } + LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); + continue; } auto & cluster_info = cluster_info_it->second; diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index df893ad956bf..3f1171f8fc13 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -89,6 +89,7 @@ class ClusterDiscovery String cluster_secret; /// For dynamic clusters, index+1 in multicluster_discovery_paths where cluster was found + /// 0 for static ckusters size_t zk_root_index; ClusterInfo(const String & name_, @@ -165,7 +166,6 @@ class ClusterDiscovery /// Hold the callback pointers of each cluster. /// To avoid registering callbacks for the same path multiple times. std::unordered_map get_nodes_callbacks; - std::unordered_map get_dynamic_nodes_callbacks; mutable std::mutex mutex; std::unordered_map cluster_impls; @@ -207,8 +207,6 @@ class ClusterDiscovery }; std::shared_ptr>> multicluster_discovery_paths; - mutable std::mutex dynamic_clusters_mutex; - std::unordered_map dynamic_clusters_info; }; } From 0575462b2521a0f6ce1bbb66dd0dff5637d93c67 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 21 Feb 2025 13:29:47 +0100 Subject: [PATCH 4/7] Fix watching for empty cluster nodes --- src/Interpreters/ClusterDiscovery.cpp | 19 ++++++------------- .../test_dynamic_clusters.py | 2 +- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index bea33c83bf8d..b879051f4f28 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -459,13 +459,13 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info) if (nodes_info.empty()) { removeCluster(cluster_info.name); + return on_exit(); } - else - { - auto cluster = makeCluster(cluster_info); - std::lock_guard lock(mutex); - cluster_impls[cluster_info.name] = cluster; - } + + auto cluster = makeCluster(cluster_info); + std::lock_guard lock(mutex); + cluster_impls[cluster_info.name] = cluster; + return true; } @@ -594,13 +594,6 @@ void ClusterDiscovery::findDynamicClusters( continue; } - auto shards = zk->getChildren(getShardsListPath(path->zk_path + "/" + cluster)); - if (shards.empty()) - { /// When node suddenly goes off (crush, etc), ephemeral record in Keeper is removed, but root for cluster is not - LOG_TRACE(log, "Empty cluster '{}' in Keeper, skipped", cluster); - continue; - } - info.emplace( cluster, ClusterInfo( diff --git a/tests/integration/test_cluster_discovery/test_dynamic_clusters.py b/tests/integration/test_cluster_discovery/test_dynamic_clusters.py index ea16d8347605..ece8012da19a 100644 --- a/tests/integration/test_cluster_discovery/test_dynamic_clusters.py +++ b/tests/integration/test_cluster_discovery/test_dynamic_clusters.py @@ -35,7 +35,7 @@ def start_cluster(): def get_clusters_hosts(node, expected): - count = 120 + count = 30 while True: resp = node.query("SELECT cluster, host_name FROM system.clusters ORDER BY cluster, host_name FORMAT JSONCompact") hosts = json.loads(resp)["data"] From 8fb51c7776f4fe9470814ea6d02366e431eccb7d Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 21 Feb 2025 15:22:05 +0100 Subject: [PATCH 5/7] Fix style and tidy build --- src/Interpreters/ClusterDiscovery.cpp | 14 +++++++++----- src/Interpreters/ClusterDiscovery.h | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index b879051f4f28..b0b3b70edf6a 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -118,6 +118,8 @@ class ClusterDiscovery::Flags void set(const T & key, bool value = true) { std::unique_lock lk(mu); + if (stop_flag) + return; flags[key] = value; any_need_update |= value; cv.notify_one(); @@ -126,7 +128,8 @@ class ClusterDiscovery::Flags void remove(const T & key) { std::unique_lock lk(mu); - flags.erase(key); + if (!stop_flag) + flags.erase(key); } std::unordered_map wait(std::chrono::milliseconds timeout, bool & finished) @@ -210,7 +213,7 @@ ClusterDiscovery::ClusterDiscovery( /* zk_path */ zk_root, /* is_secure_connection */ config.getBool(cluster_config_prefix + ".secure", false), /* username */ config.getString(cluster_config_prefix + ".user", context->getUserName()), - /* password */ password, + /* password */ password, /* cluster_secret */ cluster_secret ); @@ -555,7 +558,7 @@ void ClusterDiscovery::findDynamicClusters( constexpr auto force_update_interval = 2min; size_t zk_root_index = 0; - + for (auto & path : (*multicluster_discovery_paths)) { ++zk_root_index; @@ -587,7 +590,7 @@ void ClusterDiscovery::findDynamicClusters( continue; } - if (info.count(cluster)) + if (info.contains(cluster)) { /// Possible with several root paths, it's a configuration error LOG_WARNING(log, "Found dynamic duplicate of cluster '{}' in Keeper, skipped record by path {}:{}", cluster, path->zk_name, path->zk_path); @@ -700,7 +703,7 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) new_dynamic_clusters_info.erase(p); else { - if (!unchanged_roots.count(info.zk_root_index)) + if (!unchanged_roots.contains(info.zk_root_index)) clusters_to_remove.insert(cluster_name); } } @@ -830,6 +833,7 @@ void ClusterDiscovery::shutdown() { LOG_DEBUG(log, "Shutting down"); clusters_to_update->stop(); + dynamic_clusters_to_update->stop(); if (main_thread.joinable()) main_thread.join(); diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index 3f1171f8fc13..a48f93d9c803 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -138,7 +138,7 @@ class ClusterDiscovery bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes); bool upsertCluster(ClusterInfo & cluster_info); - void removeCluster(const String & key); + void removeCluster(const String & name); bool runMainThread(std::function up_to_date_callback); void shutdown(); From 5af952d21bff17e99a0206c866b06994f70f3129 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 21 Feb 2025 21:34:13 +0100 Subject: [PATCH 6/7] Use single structure for update flags --- src/Interpreters/ClusterDiscovery.cpp | 231 ++++++++------------------ src/Interpreters/ClusterDiscovery.h | 3 +- 2 files changed, 70 insertions(+), 164 deletions(-) diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index b0b3b70edf6a..47dc3ead3191 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -34,7 +34,6 @@ namespace DB namespace ErrorCodes { extern const int KEEPER_EXCEPTION; - extern const int LOGICAL_ERROR; extern const int NO_ELEMENTS_IN_CONFIG; extern const int EXCESSIVE_ELEMENT_IN_CONFIG; } @@ -54,67 +53,17 @@ fs::path getShardsListPath(const String & zk_root) } -/* - * Holds boolean flags for fixed set of keys. - * Flags can be concurrently set from different threads, and consumer can wait for it. - */ template -class ClusterDiscovery::ConcurrentFlags +class ClusterDiscovery::Flags { public: template - ConcurrentFlags(It begin, It end) + Flags(It begin, It end) { for (auto it = begin; it != end; ++it) flags.emplace(*it, false); } - void set(const T & key) - { - auto it = flags.find(key); - if (it == flags.end()) - throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unknown value '{}'", key); - it->second = true; - any_need_update = true; - cv.notify_one(); - } - - /// waits unit at least one flag is set - /// caller should handle all set flags (or set it again manually) - /// note: keys of returen map should not be changed! - /// @param finished - output parameter indicates that stop() was called - std::unordered_map & wait(std::chrono::milliseconds timeout, bool & finished) - { - std::unique_lock lk(mu); - cv.wait_for(lk, timeout, [this]() -> bool { return any_need_update || stop_flag; }); - finished = stop_flag; - - /// all set flags expected to be handled by caller - any_need_update = false; - return flags; - } - - void stop() - { - std::unique_lock lk(mu); - stop_flag = true; - cv.notify_one(); - } - -private: - std::condition_variable cv; - std::mutex mu; - - /// flag indicates that update is required - std::unordered_map flags; - std::atomic_bool any_need_update = true; - bool stop_flag = false; -}; - -template -class ClusterDiscovery::Flags -{ -public: void set(const T & key, bool value = true) { std::unique_lock lk(mu); @@ -254,8 +203,7 @@ ClusterDiscovery::ClusterDiscovery( clusters_info_names.emplace_back(e.first); LOG_TRACE(log, "Clusters in discovery mode: {}", fmt::join(clusters_info_names, ", ")); - clusters_to_update = std::make_shared(clusters_info_names.begin(), clusters_info_names.end()); - dynamic_clusters_to_update = std::make_shared(); + clusters_to_update = std::make_shared(clusters_info_names.begin(), clusters_info_names.end()); /// Init get_nodes_callbacks after init clusters_to_update. for (const auto & e : clusters_info) @@ -285,7 +233,7 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, auto watch_dynamic_callback = std::make_shared([ cluster_name, zk_root_index, - my_clusters_to_update = dynamic_clusters_to_update, + my_clusters_to_update = clusters_to_update, my_discovery_paths = multicluster_discovery_paths ](auto) { @@ -479,7 +427,7 @@ void ClusterDiscovery::removeCluster(const String & name) cluster_impls.erase(name); } clusters_info.erase(name); - dynamic_clusters_to_update->remove(name); + clusters_to_update->remove(name); LOG_DEBUG(log, "Dynamic cluster '{}' removed successfully", name); } @@ -536,13 +484,10 @@ void ClusterDiscovery::initialUpdate() if (!upsertCluster(info)) { LOG_WARNING(log, "Error on initial cluster '{}' update, will retry in background", info.name); - if (info.zk_root_index) - dynamic_clusters_to_update->set(info.name); - else - clusters_to_update->set(info.name); + clusters_to_update->set(info.name); } else if (info.zk_root_index) - dynamic_clusters_to_update->set(info.name, false); + clusters_to_update->set(info.name, false); } LOG_DEBUG(log, "Initialized"); @@ -685,123 +630,86 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) { bool all_up_to_date = true; - if (!multicluster_discovery_paths->empty()) - { - std::unordered_map new_dynamic_clusters_info; - std::unordered_set unchanged_roots; - findDynamicClusters(new_dynamic_clusters_info, &unchanged_roots); + std::unordered_map new_dynamic_clusters_info; + std::unordered_set unchanged_roots; + findDynamicClusters(new_dynamic_clusters_info, &unchanged_roots); - std::unordered_set clusters_to_insert; - std::unordered_set clusters_to_remove; + std::unordered_set clusters_to_insert; + std::unordered_set clusters_to_remove; - for (const auto & [cluster_name, info] : clusters_info) + for (const auto & [cluster_name, info] : clusters_info) + { + if (!info.zk_root_index) + continue; + if (!new_dynamic_clusters_info.erase(cluster_name)) { - if (!info.zk_root_index) - continue; - auto p = new_dynamic_clusters_info.find(cluster_name); - if (p != new_dynamic_clusters_info.end()) - new_dynamic_clusters_info.erase(p); - else - { - if (!unchanged_roots.contains(info.zk_root_index)) - clusters_to_remove.insert(cluster_name); - } + if (!unchanged_roots.contains(info.zk_root_index)) + clusters_to_remove.insert(cluster_name); } - /// new_dynamic_clusters_info now contains only new clusters + } + /// new_dynamic_clusters_info now contains only new clusters - for (const auto & [cluster_name, _] : new_dynamic_clusters_info) - clusters_to_insert.insert(cluster_name); + for (const auto & [cluster_name, _] : new_dynamic_clusters_info) + clusters_to_insert.insert(cluster_name); - for (const auto & cluster_name : clusters_to_remove) - removeCluster(cluster_name); + for (const auto & cluster_name : clusters_to_remove) + removeCluster(cluster_name); - clusters_info.merge(new_dynamic_clusters_info); + clusters_info.merge(new_dynamic_clusters_info); - auto clusters = dynamic_clusters_to_update->wait(5s, finished); - for (auto & [cluster_name, need_update] : clusters) + auto clusters = clusters_to_update->wait(5s, finished); + for (auto & [cluster_name, need_update] : clusters) + { + auto cluster_info_it = clusters_info.find(cluster_name); + if (cluster_info_it == clusters_info.end()) { - auto cluster_info_it = clusters_info.find(cluster_name); - if (cluster_info_it == clusters_info.end()) - { - LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); - continue; - } - - auto & cluster_info = cluster_info_it->second; - if (!need_update) - { - /// force updating periodically - bool force_update = cluster_info.watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); - if (!force_update) - continue; - } - - if (upsertCluster(cluster_info)) - { - cluster_info.watch.restart(); - } - else - { - all_up_to_date = false; - } + LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); + continue; } - for (const auto & cluster_name : clusters_to_insert) + auto & cluster_info = cluster_info_it->second; + if (!need_update) { - auto cluster_info_it = clusters_info.find(cluster_name); - if (cluster_info_it == clusters_info.end()) - { - LOG_ERROR(log, "Unknown dynamic cluster '{}'", cluster_name); + /// force updating periodically + bool force_update = cluster_info.watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); + if (!force_update) continue; - } - auto & cluster_info = cluster_info_it->second; - if (upsertCluster(cluster_info)) - { - cluster_info.watch.restart(); - LOG_DEBUG(log, "Dynamic cluster '{}' inserted successfully", cluster_name); - } - else - { - all_up_to_date = false; - /// no need to trigger convar, will retry after timeout in `wait` - clusters_to_update->set(cluster_name); - LOG_WARNING(log, "Dynamic cluster '{}' wasn't inserted, will retry", cluster_name); - } + } + + if (upsertCluster(cluster_info)) + { + cluster_info.watch.restart(); + LOG_DEBUG(log, "Cluster '{}' updated successfully", cluster_name); + } + else + { + all_up_to_date = false; + /// no need to trigger convar, will retry after timeout in `wait` + need_update = true; + LOG_WARNING(log, "Cluster '{}' wasn't updated, will retry", cluster_name); } } + for (const auto & cluster_name : clusters_to_insert) { - auto & clusters = clusters_to_update->wait(5s, finished); - for (auto & [cluster_name, need_update] : clusters) + auto cluster_info_it = clusters_info.find(cluster_name); + if (cluster_info_it == clusters_info.end()) { - auto cluster_info_it = clusters_info.find(cluster_name); - if (cluster_info_it == clusters_info.end()) - { - LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); - continue; - } - auto & cluster_info = cluster_info_it->second; - - if (!need_update.exchange(false)) - { - /// force updating periodically - bool force_update = cluster_info.watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); - if (!force_update) - continue; - } - - if (upsertCluster(cluster_info)) - { - cluster_info.watch.restart(); - LOG_DEBUG(log, "Cluster '{}' updated successfully", cluster_name); - } - else - { - all_up_to_date = false; - /// no need to trigger convar, will retry after timeout in `wait` - need_update = true; - LOG_WARNING(log, "Cluster '{}' wasn't updated, will retry", cluster_name); - } + LOG_ERROR(log, "Unknown dynamic cluster '{}'", cluster_name); + continue; + } + auto & cluster_info = cluster_info_it->second; + if (upsertCluster(cluster_info)) + { + cluster_info.watch.restart(); + LOG_DEBUG(log, "Dynamic cluster '{}' inserted successfully", cluster_name); + } + else + { + all_up_to_date = false; + /// no need to trigger convar, will retry after timeout in `wait` + clusters_to_update->set(cluster_name); + LOG_WARNING(log, "Dynamic cluster '{}' wasn't inserted, will retry", cluster_name); } } @@ -833,7 +741,6 @@ void ClusterDiscovery::shutdown() { LOG_DEBUG(log, "Shutting down"); clusters_to_update->stop(); - dynamic_clusters_to_update->stop(); if (main_thread.joinable()) main_thread.join(); diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index a48f93d9c803..0b3ac2298c94 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -160,8 +160,7 @@ class ClusterDiscovery /// Cluster names to update. /// The `shared_ptr` is used because it's passed to watch callback. /// It prevents accessing to invalid object after ClusterDiscovery is destroyed. - std::shared_ptr clusters_to_update; - std::shared_ptr dynamic_clusters_to_update; + std::shared_ptr clusters_to_update; /// Hold the callback pointers of each cluster. /// To avoid registering callbacks for the same path multiple times. From 7a6921f9ec15d7ecdcdc3ced18f8931b35da1352 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 21 Feb 2025 23:32:32 +0100 Subject: [PATCH 7/7] Fix use string after remove --- src/Interpreters/ClusterDiscovery.cpp | 11 ++++++----- src/Interpreters/ClusterDiscovery.h | 2 -- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index 47dc3ead3191..75a5fa8f44fc 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -409,8 +409,10 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info) if (nodes_info.empty()) { - removeCluster(cluster_info.name); - return on_exit(); + String name = cluster_info.name; + /// cluster_info removed inside removeCluster, can't use reference to name. + removeCluster(name); + return true; } auto cluster = makeCluster(cluster_info); @@ -426,7 +428,6 @@ void ClusterDiscovery::removeCluster(const String & name) std::lock_guard lock(mutex); cluster_impls.erase(name); } - clusters_info.erase(name); clusters_to_update->remove(name); LOG_DEBUG(log, "Dynamic cluster '{}' removed successfully", name); } @@ -658,7 +659,7 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) clusters_info.merge(new_dynamic_clusters_info); auto clusters = clusters_to_update->wait(5s, finished); - for (auto & [cluster_name, need_update] : clusters) + for (const auto & [cluster_name, need_update] : clusters) { auto cluster_info_it = clusters_info.find(cluster_name); if (cluster_info_it == clusters_info.end()) @@ -685,7 +686,7 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) { all_up_to_date = false; /// no need to trigger convar, will retry after timeout in `wait` - need_update = true; + clusters_to_update->set(cluster_name); LOG_WARNING(log, "Cluster '{}' wasn't updated, will retry", cluster_name); } } diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index 0b3ac2298c94..b8f8176ed47c 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -152,8 +152,6 @@ class ClusterDiscovery String current_node_name; - template class ConcurrentFlags; - using UpdateConcurrentFlags = ConcurrentFlags; template class Flags; using UpdateFlags = Flags;