From 0bfcd65287fd345885f8147ee3c8498113213790 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Sun, 9 Mar 2025 17:45:43 +0100 Subject: [PATCH 1/2] Setting object_storage_max_nodes --- src/Interpreters/Cluster.cpp | 17 +++- src/Interpreters/Cluster.h | 4 +- src/Server/TCPHandler.cpp | 1 - src/Storages/IStorageCluster.cpp | 7 +- src/Storages/IStorageCluster.h | 3 +- tests/integration/test_s3_cluster/test.py | 97 ++++++++++++++++++++++- 6 files changed, 118 insertions(+), 11 deletions(-) diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 00c67cb8a37c..540ed670384c 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -717,9 +717,9 @@ void Cluster::initMisc() } } -std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const +std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const { - return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)}; + return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)}; } std::unique_ptr Cluster::getClusterWithSingleShard(size_t index) const @@ -768,7 +768,7 @@ void shuffleReplicas(std::vector & replicas, const Settings & } -Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard) +Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) { if (from.addresses_with_failover.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty"); @@ -835,6 +835,17 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti secret = from.secret; name = from.name; + if (max_hosts > 0 && shards_info.size() > max_hosts) + { + pcg64_fast gen{randomSeed()}; + std::shuffle(shards_info.begin(), shards_info.end(), gen); + shards_info.resize(max_hosts); + + shard_num = 0; + for (auto & shard_info : shards_info) + shard_info.shard_num = ++shard_num; + } + initMisc(); } diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index e1868a18ad4f..b581963b08dc 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -266,7 +266,7 @@ class Cluster std::unique_ptr getClusterWithMultipleShards(const std::vector & indices) const; /// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards. - std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const; + std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const; /// Returns false if cluster configuration doesn't allow to use it for cross-replication. /// NOTE: true does not mean, that it's actually a cross-replication cluster. @@ -292,7 +292,7 @@ class Cluster /// For getClusterWithReplicasAsShards implementation struct ReplicasAsShardsTag {}; - Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard); + Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts); void addShard( const Settings & settings, diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 526acc140c9c..acf615f7704b 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -35,7 +35,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 6b5e9f0e49ba..d7756162fd7f 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -34,6 +34,7 @@ namespace Setting extern const SettingsBool async_query_sending_for_remote; extern const SettingsBool async_socket_for_remote; extern const SettingsBool skip_unavailable_shards; + extern const SettingsUInt64 object_storage_max_nodes; } namespace ErrorCodes @@ -92,7 +93,7 @@ void IStorageCluster::read( storage_snapshot->check(column_names); updateBeforeRead(context); - auto cluster = getClusterImpl(context, cluster_name_from_settings); + auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) @@ -223,9 +224,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings) return new_context; } -ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_) +ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts) { - return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef()); + return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef(), /* max_replicas_from_shard */ 0, max_hosts); } } diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2992c3bc2497..9b53cd156a68 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -39,6 +39,7 @@ class IStorageCluster : public IStorage bool async_insert) override; ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); } + /// Query is needed for pruning by virtual columns (_file, _path) virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0; @@ -79,7 +80,7 @@ class IStorageCluster : public IStorage } private: - static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_); + static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0); LoggerPtr log; String cluster_name; diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 837be9cc6a0f..489fe100bbeb 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -72,7 +72,7 @@ def started_cluster(): "s0_0_0", main_configs=["configs/cluster.xml", "configs/named_collections.xml"], user_configs=["configs/users.xml"], - macros={"replica": "node1", "shard": "shard1"}, + macros={"replica": "replica1", "shard": "shard1"}, with_minio=True, with_zookeeper=True, ) @@ -912,3 +912,98 @@ def test_distributed_s3_table_engine(started_cluster): """ ) assert int(hosts_engine_distributed) == 3 + + +def test_cluster_hosts_limit(started_cluster): + node = started_cluster.instances["s0_0_0"] + + query_id_def = str(uuid.uuid4()) + resp_def = node.query( + """ + SELECT * from s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + """, + query_id = query_id_def + ) + + # object_storage_max_nodes is greater than number of hosts in cluster + query_id_4_hosts = str(uuid.uuid4()) + resp_4_hosts = node.query( + """ + SELECT * from s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS object_storage_max_nodes=4 + """, + query_id = query_id_4_hosts + ) + assert resp_def == resp_4_hosts + + # object_storage_max_nodes is equal number of hosts in cluster + query_id_3_hosts = str(uuid.uuid4()) + resp_3_hosts = node.query( + """ + SELECT * from s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS object_storage_max_nodes=3 + """, + query_id = query_id_3_hosts + ) + assert resp_def == resp_3_hosts + + # object_storage_max_nodes is less than number of hosts in cluster + query_id_2_hosts = str(uuid.uuid4()) + resp_2_hosts = node.query( + """ + SELECT * from s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS object_storage_max_nodes=2 + """, + query_id = query_id_2_hosts + ) + assert resp_def == resp_2_hosts + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + + hosts_def = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_def}' AND query_id!='{query_id_def}' + """ + ) + assert int(hosts_def) == 3 + + hosts_4 = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_4_hosts}' AND query_id!='{query_id_4_hosts}' + """ + ) + assert int(hosts_4) == 3 + + hosts_3 = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_3_hosts}' AND query_id!='{query_id_3_hosts}' + """ + ) + assert int(hosts_3) == 3 + + hosts_2 = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_2_hosts}' AND query_id!='{query_id_2_hosts}' + """ + ) + assert int(hosts_2) == 2 From 5b70ae4285d1a1f5d1cf0153bdbd1d0a3ad8b000 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 28 Mar 2025 12:16:07 +0100 Subject: [PATCH 2/2] Fix addresses_with_failover and slot_to_shard --- src/Interpreters/Cluster.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 540ed670384c..7366efa37642 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -811,9 +811,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings[Setting::load_balancing]); info.per_replica_pools = {std::move(pool)}; - addresses_with_failover.emplace_back(Addresses{address}); - - slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size()); shards_info.emplace_back(std::move(info)); } }; @@ -846,6 +843,12 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti shard_info.shard_num = ++shard_num; } + for (size_t i = 0; i < shards_info.size(); ++i) + { + addresses_with_failover.emplace_back(shards_info[i].local_addresses); + slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i); + } + initMisc(); }