From 15148db01faef6f30b87f365370a6f5be82f4dda Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Thu, 29 May 2025 10:05:43 +0200 Subject: [PATCH 1/6] Merge pull request #797 from Altinity/feature/antalya-25.3/rendezvous_hashing 25.3 Antalya port of #709, #760 - Rendezvous hashing --- src/Storages/IStorageCluster.cpp | 4 +-- src/Storages/IStorageCluster.h | 5 ++- .../StorageObjectStorageCluster.cpp | 20 +++++++++-- .../StorageObjectStorageCluster.h | 4 ++- ...rageObjectStorageStableTaskDistributor.cpp | 33 +++++++++++++++++-- ...torageObjectStorageStableTaskDistributor.h | 5 ++- src/Storages/StorageDistributed.cpp | 3 +- src/Storages/StorageFileCluster.cpp | 5 ++- src/Storages/StorageFileCluster.h | 5 ++- src/Storages/StorageReplicatedMergeTree.cpp | 3 +- src/Storages/StorageURLCluster.cpp | 5 ++- src/Storages/StorageURLCluster.h | 5 ++- 12 files changed, 78 insertions(+), 19 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 7c8d410cee53..12c503fa7570 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -117,7 +117,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t if (extension) return; - extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas); + extension = storage->getTaskIteratorExtension(predicate, context, cluster); } /// The code executes on initiator @@ -196,7 +196,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const if (current_settings[Setting::max_parallel_replicas] > 1) max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value); - createExtension(nullptr, max_replicas_to_use); + createExtension(nullptr); for (const auto & shard_info : cluster->getShardsInfo()) { diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 6017613c7bea..e4ff87f9b5e4 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -35,7 +35,10 @@ class IStorageCluster : public IStorage ClusterPtr getCluster(ContextPtr context) const; /// Query is needed for pruning by virtual columns (_file, _path) - virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0; + virtual RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr cluster) const = 0; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 424b0d5bfb52..46aef403ff85 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -18,7 +18,6 @@ #include #include - namespace DB { namespace Setting @@ -214,13 +213,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const + const ActionsDAG::Node * predicate, + const ContextPtr & local_context, + ClusterPtr cluster) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, 
configuration->getQuerySettings(local_context),
        object_storage,
        /* distributed_processing */ false,
        local_context,
        predicate,
        {},
        virtual_columns,
        hive_partition_columns_to_read_from_file_path,
        nullptr,
        local_context->getFileProgressCallback(),
        /*ignore_archive_globs=*/true,
        /*skip_object_metadata=*/true);

-    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
+    std::vector<std::string> ids_of_hosts;
+    for (const auto & shard : cluster->getShardsInfo())
+    {
+        if (shard.per_replica_pools.empty())
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
+        for (const auto & replica : shard.per_replica_pools)
+        {
+            if (!replica)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
+            ids_of_hosts.push_back(replica->getAddress());
+        }
+    }
+
+    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);

     auto callback = std::make_shared<TaskIterator>(
         [task_distributor](size_t number_of_current_replica) mutable -> String
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 1a557143076a..85a584c5e0d6 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -25,7 +25,9 @@ class StorageObjectStorageCluster : public IStorageCluster
     std::string getName() const override;

     RemoteQueryExecutor::Extension getTaskIteratorExtension(
-        const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        ClusterPtr cluster) const override;

     String getPathSample(ContextPtr context);

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
index c5c86a47babb..a5b85454d69d 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -13,9 +13,10 @@ namespace ErrorCodes

 StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
     std::shared_ptr<IObjectIterator> iterator_,
-    size_t number_of_replicas_)
+    std::vector<std::string> ids_of_nodes_)
     : iterator(std::move(iterator_))
-    , connection_to_files(number_of_replicas_)
+    , connection_to_files(ids_of_nodes_.size())
+    , ids_of_nodes(ids_of_nodes_)
     , iterator_exhausted(false)
 {
 }
@@ -45,13 +46,39 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz

 size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
 {
-    return ConsistentHashing(sipHash64(file_path), connection_to_files.size());
+    size_t nodes_count = ids_of_nodes.size();
+
+    /// Trivial case
+    if (nodes_count < 2)
+        return 0;
+
+    /// Rendezvous hashing
+    size_t best_id = 0;
+    UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
+    for (size_t id = 1; id < nodes_count; ++id)
+    {
+        UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
+        if (weight > best_weight)
+        {
+            best_weight = weight;
+            best_id = id;
+        }
+    }
+    return best_id;
 }

 std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
 {
     std::lock_guard lock(mutex);

+    if (connection_to_files.size() <= number_of_current_replica)
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Replica number {} is out of range. Expected range: [0, {})",
+            number_of_current_replica,
+            connection_to_files.size()
+        );
+
     auto & files = connection_to_files[number_of_current_replica];

     while (!files.empty())
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
index 46e805a59603..678ff4372f5f 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
@@ -4,6 +4,7 @@
 #include 
 #include 
 #include 
+#include <string>
 #include 
 #include 
 #include 
@@ -17,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor
 public:
     StorageObjectStorageStableTaskDistributor(
         std::shared_ptr<IObjectIterator> iterator_,
-        size_t number_of_replicas_);
+        std::vector<std::string> ids_of_nodes_);

     std::optional<String> getNextTask(size_t number_of_current_replica);

@@ -32,6 +33,8 @@ class StorageObjectStorageStableTaskDistributor
     std::vector<std::vector<String>> connection_to_files;
     std::unordered_set<String> unprocessed_files;

+    std::vector<std::string> ids_of_nodes;
+
     std::mutex mutex;

     bool iterator_exhausted = false;
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index d3f6407c88b1..0ae57212024d 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -1317,8 +1317,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
     const auto cluster = getCluster();

     /// Select query is needed for pruning on virtual columns
-    auto number_of_replicas = static_cast<size_t>(cluster->getShardsInfo().size());
-    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas);
+    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);

     /// Here we take addresses from the destination cluster and assume the source table exists on these nodes
     size_t replica_index = 0;
diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp
index 1946fdc8c77b..0155c3a08cec 100644
--- a/src/Storages/StorageFileCluster.cpp
+++ b/src/Storages/StorageFileCluster.cpp
@@ -100,7 +100,10 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
     );
 }

-RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const
+RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & context,
+    ClusterPtr) const
 {
     auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context);
     auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h
index 2cbd82ba4000..5fb08a48eec6 100644
--- a/src/Storages/StorageFileCluster.h
+++ b/src/Storages/StorageFileCluster.h
@@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster
         const ConstraintsDescription & constraints_);

     std::string getName() const override { return "FileCluster"; }

-    RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
+    RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        ClusterPtr) const override;

 private:
     void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;
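[Editor's note: an illustrative, self-contained sketch of the rendezvous (highest-random-weight) rule that getReplicaForFile() implements above; it is not part of the patch. std::hash stands in for ClickHouse's sipHash64, and the node ids are hypothetical.]

#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Owner of `path` = node whose hash of (node_id + path) is the largest.
size_t rendezvousOwner(const std::vector<std::string> & node_ids, const std::string & path)
{
    size_t best_id = 0;
    uint64_t best_weight = 0;
    for (size_t id = 0; id < node_ids.size(); ++id)
    {
        uint64_t weight = std::hash<std::string>{}(node_ids[id] + path);
        if (id == 0 || weight > best_weight)
        {
            best_weight = weight;
            best_id = id;
        }
    }
    return best_id;
}

[Removing a node leaves every other node's weight unchanged, so only the removed node's files are reassigned; and because the rule keys on node addresses rather than list positions, reordering the cluster changes nothing. The old ConsistentHashing(sipHash64(file_path), n) call only behaved well when the node list shrank at the tail and was sensitive to node order, which is why the patch threads the ClusterPtr down instead of a bare replica count.]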
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index f349e9317db0..9ec0231a8c71 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -6086,8 +6086,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
     ContextMutablePtr query_context = Context::createCopy(local_context);
     query_context->increaseDistributedDepth();

-    auto number_of_replicas = static_cast<size_t>(src_cluster->getShardsAddresses().size());
-    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, number_of_replicas);
+    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);

     size_t replica_index = 0;
     for (const auto & replicas : src_cluster->getShardsAddresses())
diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp
index fff85117b37a..12ae6b32e018 100644
--- a/src/Storages/StorageURLCluster.cpp
+++ b/src/Storages/StorageURLCluster.cpp
@@ -129,7 +129,10 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS
     );
 }

-RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t) const
+RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & context,
+    ClusterPtr) const
 {
     auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(
         uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context);
diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h
index 8349f7594294..e360eb22d701 100644
--- a/src/Storages/StorageURLCluster.h
+++ b/src/Storages/StorageURLCluster.h
@@ -30,7 +30,10 @@ class StorageURLCluster : public IStorageCluster
         const StorageURL::Configuration & configuration_);

     std::string getName() const override { return "URLCluster"; }

-    RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
+    RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        ClusterPtr) const override;

 private:
     void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;

From 83bf85cd8f956cd4136f12b92213fcd5b338d0d4 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Thu, 17 Jul 2025 18:46:25 +0200
Subject: [PATCH 2/6] Fix tests

---
 src/Storages/IStorageCluster.cpp                 | 10 +++-------
 .../configs/named_collections.xml                |  2 +-
 tests/integration/test_s3_cache_locality/test.py | 10 ++++++----
 .../configs/config.d/named_collections.xml       |  2 +-
 4 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 12c503fa7570..b7f1c9760322 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -93,7 +93,7 @@ class ReadFromCluster : public SourceStepWithFilter

     std::optional<RemoteQueryExecutor::Extension> extension;

-    void createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas);
+    void createExtension(const ActionsDAG::Node * predicate);
     ContextPtr updateSettings(const Settings & settings);
 };

@@ -105,14 +105,10 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
     if (filter_actions_dag)
         predicate = filter_actions_dag->getOutputs().at(0);

-    auto max_replicas_to_use = static_cast<size_t>(cluster->getShardsInfo().size());
-    if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
-        max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);
-
-    createExtension(predicate, max_replicas_to_use);
+    createExtension(predicate);
 }

-void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas)
+void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
 {
     if (extension)
         return;
diff --git a/tests/integration/test_s3_cache_locality/configs/named_collections.xml b/tests/integration/test_s3_cache_locality/configs/named_collections.xml
index 511078d6f0d9..6994aa3f5e77 100644
--- a/tests/integration/test_s3_cache_locality/configs/named_collections.xml
+++ b/tests/integration/test_s3_cache_locality/configs/named_collections.xml
@@ -3,7 +3,7 @@
         <url>http://minio1:9001/root/data/*</url>
         <access_key_id>minio</access_key_id>
-        <secret_access_key>minio123</secret_access_key>
+        <secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
         <format>CSV</format>
diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py
index a2020d7e0568..c72755a90965 100644
--- a/tests/integration/test_s3_cache_locality/test.py
+++ b/tests/integration/test_s3_cache_locality/test.py
@@ -7,6 +7,8 @@
 import pytest

 from helpers.cluster import ClickHouseCluster
+from helpers.config_cluster import minio_secret_key
+

 logging.getLogger().setLevel(logging.INFO)
 logging.getLogger().addHandler(logging.StreamHandler())
@@ -81,7 +83,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
     result_first = node.query(
         f"""
         SELECT count(*)
-        FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
         SETTINGS
             enable_filesystem_cache={enable_filesystem_cache},
@@ -95,7 +97,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
     result_second = node.query(
         f"""
         SELECT count(*)
-        FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
         SETTINGS
             enable_filesystem_cache={enable_filesystem_cache},
@@ -148,9 +150,9 @@ def test_cache_locality(started_cluster):
     node = started_cluster.instances["clickhouse0"]

     expected_result = node.query(
-        """
+        f"""
         SELECT count(*)
-        FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        FROM s3('http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
         """
     )
diff --git a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
index 892665d3934d..77f9e7e4b17b 100644
--- a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
+++ b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
@@ -14,7 +14,7 @@
             <url>http://minio1:9001/root/</url>
             <access_key_id>minio</access_key_id>
-            <secret_access_key>minio123</secret_access_key>
+            <secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
             <type>s3</type>
From d4da5f33bd8f63605a7768c0caa6ac8f459e1304 Mon Sep 17 00:00:00 2001
From: Vasily Nemkov
Date: Thu, 3 Jul 2025 02:10:48 +0200
Subject: [PATCH 3/6] Merge pull request #866 from Altinity/feature/cache_locality_lock

Antalya 25.3: lock_object_storage_task_distribution_ms setting
---
 src/Core/Settings.cpp                            |  3 +
 src/Core/SettingsChangesHistory.cpp              |  5 ++
 src/Disks/ObjectStorages/IObjectStorage.cpp      | 36 +++++++++
 src/Disks/ObjectStorages/IObjectStorage.h        | 32 +++++++-
 .../StorageObjectStorageCluster.cpp              | 19 ++++-
 .../StorageObjectStorageSource.cpp               | 25 ++++++-
 ...rageObjectStorageStableTaskDistributor.cpp    | 59 +++++++++++++--
 ...torageObjectStorageStableTaskDistributor.h    | 14 +++-
 .../test_s3_cache_locality/test.py               | 75 ++++++++++---------
 9 files changed, 215 insertions(+), 53 deletions(-)

diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 77db44fe35d7..087739f7cc51 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6860,6 +6860,9 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
     DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
 Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
 )", 0) \
+    DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
+In object storage distributed queries, do not distribute tasks to non-prefetched nodes while the node that prefetched the data is still active.
+)", EXPERIMENTAL) \
     DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
 Force specified kind of Exchange operators between distributed query stages.

diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 6e6bfe580117..b3c5dce8751b 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -67,6 +67,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     /// controls new feature and it's 'true' by default, use 'false' as previous_value).
     /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
     /// Note: please check if the key already exists to prevent duplicate entries.
+    addSettingsChanges(settings_changes_history, "25.6.5.20000",
+    {
+        // Altinity Antalya modifications atop of 25.6
+        {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
+    });
     addSettingsChanges(settings_changes_history, "25.6",
     {
         {"output_format_native_use_flattened_dynamic_and_json_serialization", false, false, "Add flattened Dynamic/JSON serializations to Native format"},
diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp
index da10528bbedf..bdaa391dd729 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -8,6 +8,10 @@
 #include 
 #include 

+#include <Poco/JSON/Object.h>
+#include <Poco/JSON/Parser.h>
+#include <Poco/JSON/Stringifier.h>
+
 namespace DB
 {

@@ -97,4 +101,36 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
     return write_settings;
 }

+RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
+{
+    Poco::JSON::Parser parser;
+    try
+    {
+        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
+        if (!json)
+            return;
+
+        successfully_parsed = true;
+
+        if (json->has("retry_after_us"))
+            retry_after_us = json->getValue<Poco::Timestamp::TimeDiff>("retry_after_us");
+    }
+    catch (const Poco::JSON::JSONException &)
+    { /// Not a JSON
+        return;
+    }
+}
+
+std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
+{
+    Poco::JSON::Object json;
+    if (retry_after_us.has_value())
+        json.set("retry_after_us", retry_after_us.value());
+
+    std::ostringstream oss;
+    oss.exceptions(std::ios::failbit);
+    Poco::JSON::Stringifier::stringify(json, oss);
+    return oss.str();
+}
+
 }
diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h
index 0d9464b1ad7e..5754c9ec7af6 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.h
+++ b/src/Disks/ObjectStorages/IObjectStorage.h
@@ -83,15 +83,37 @@ struct ObjectMetadata

 struct RelativePathWithMetadata
 {
+    class CommandInTaskResponse
+    {
+    public:
+        CommandInTaskResponse() {}
+        CommandInTaskResponse(const std::string & task);
+
+        bool is_parsed() const { return successfully_parsed; }
+        void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
+
+        std::string to_string() const;
+
+        std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
+
+    private:
+        bool successfully_parsed = false;
+        std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
+    };
+
     String relative_path;
     std::optional<ObjectMetadata> metadata;
+    CommandInTaskResponse command;

     RelativePathWithMetadata() = default;

-    explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
-        : relative_path(std::move(relative_path_))
-        , metadata(std::move(metadata_))
-    {}
+    explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
+        : metadata(std::move(metadata_))
+        , command(task_string)
+    {
+        if (!command.is_parsed())
+            relative_path = task_string;
+    }

     virtual ~RelativePathWithMetadata() = default;

@@ -100,6 +122,8 @@ struct RelativePathWithMetadata
     virtual bool isArchive() const { return false; }
     virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
     virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
+
+    const CommandInTaskResponse & getCommand() const { return command; }
 };

 struct ObjectKeyWithMetadata
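[Editor's note: a minimal round-trip of the in-band command protocol defined above, not part of the patch. A task string sent to a worker is either a plain object path or a JSON command; this sketch assumes only the Poco JSON API that the patch itself uses.]

#include <iostream>
#include <optional>
#include <sstream>
#include <string>
#include <Poco/Exception.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/Stringifier.h>

// Mirror of CommandInTaskResponse::to_string(): produces {"retry_after_us":<n>}
std::string makeRetryCommand(long long retry_after_us)
{
    Poco::JSON::Object json;
    json.set("retry_after_us", retry_after_us);
    std::ostringstream oss;
    Poco::JSON::Stringifier::stringify(json, oss);
    return oss.str();
}

// Mirror of the parsing constructor: a plain path fails to parse and is not a command.
std::optional<long long> parseRetryCommand(const std::string & task)
{
    try
    {
        Poco::JSON::Parser parser;
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (json && json->has("retry_after_us"))
            return json->getValue<long long>("retry_after_us");
    }
    catch (const Poco::Exception &) { /* not JSON: treat as an ordinary path */ }
    return std::nullopt;
}

int main()
{
    std::cout << makeRetryCommand(100000) << "\n";                                    // {"retry_after_us":100000}
    std::cout << parseRetryCommand("data/generated/file_1.csv").has_value() << "\n";  // 0
}

[One design consequence worth noting: the command travels in the same string channel as file paths, so the scheme relies on object storage paths never being valid JSON objects with a retry_after_us key.]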
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 46aef403ff85..51b40b55c72d 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -23,12 +23,14 @@ namespace DB
 namespace Setting
 {
     extern const SettingsBool use_hive_partitioning;
+    extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
 }

 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
     extern const int INCORRECT_DATA;
+    extern const int INVALID_SETTING_VALUE;
 }

 String StorageObjectStorageCluster::getPathSample(ContextPtr context)
@@ -234,7 +236,22 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
         }
     }

-    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
+    uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];
+
+    /// Check the value to avoid a negative result after the conversion to microseconds.
+    /// Poco::Timestamp::TimeDiff is a signed 64-bit integer.
+    static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
+    if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
+        throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
+            "Value of lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
+            lock_object_storage_task_distribution_ms,
+            lock_object_storage_task_distribution_ms_max
+        );
+
+    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
+        iterator,
+        ids_of_hosts,
+        lock_object_storage_task_distribution_ms);

     auto callback = std::make_shared<TaskIterator>(
         [task_distributor](size_t number_of_current_replica) mutable -> String
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index b4ae724abd03..28f61c2e2fa6 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -19,6 +19,7 @@
 #include 
 #include 
 #include 
+#include <base/sleep.h>
 #include 
 #include 
 #include 
@@ -432,11 +433,31 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     ObjectInfoPtr object_info;
     auto query_settings = configuration->getQuerySettings(context_);

+    bool not_a_path = false;
+
     do
     {
+        not_a_path = false;
         object_info = file_iterator->next(processor);

-        if (!object_info || object_info->getPath().empty())
+        if (!object_info)
+            return {};
+
+        if (object_info->getCommand().is_parsed())
+        {
+            auto retry_after_us = object_info->getCommand().get_retry_after_us();
+            if (retry_after_us.has_value())
+            {
+                not_a_path = true;
+                /// TODO: Make the waiting asynchronous, without a sleep in the thread.
+                /// For now the sleep happens on an executor node, in a worker thread,
+                /// and does not block the query initiator.
+                sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
+                continue;
+            }
+        }
+
+        if (object_info->getPath().empty())
             return {};

         if (!object_info->metadata)
@@ -455,7 +476,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
             object_info->metadata = object_storage->getObjectMetadata(path);
         }
     }
-    while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
+    while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));

     QueryPipelineBuilder builder;
     std::shared_ptr<ISource> source;
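[Editor's note: a worked check of the 0x0020000000000000 (2^53) cap introduced in StorageObjectStorageCluster.cpp above, not part of the patch. The setting is multiplied by 1000 on its way into the distributor, and the product must still fit the signed 64-bit Poco::Timestamp::TimeDiff:]

#include <cstdint>
#include <limits>

// 2^53 ms * 1000 is about 9.007e18 us; int64 max is about 9.223e18, so the
// conversion cannot overflow. A cap of 2^54 would already exceed it.
static_assert(0x0020000000000000ULL * 1000ULL
                  <= static_cast<uint64_t>(std::numeric_limits<int64_t>::max()),
              "ms -> us conversion fits a signed 64-bit TimeDiff");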
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
index a5b85454d69d..5f709f2ec1d9 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -13,10 +13,12 @@ namespace ErrorCodes

 StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
     std::shared_ptr<IObjectIterator> iterator_,
-    std::vector<std::string> ids_of_nodes_)
+    std::vector<std::string> ids_of_nodes_,
+    uint64_t lock_object_storage_task_distribution_ms_)
     : iterator(std::move(iterator_))
     , connection_to_files(ids_of_nodes_.size())
     , ids_of_nodes(ids_of_nodes_)
+    , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
     , iterator_exhausted(false)
 {
 }
@@ -32,6 +34,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
         number_of_current_replica,
         connection_to_files.size() - 1);

+    saveLastNodeActivity(number_of_current_replica);
+
     // 1. Check pre-queued files first
     if (auto file = getPreQueuedFile(number_of_current_replica))
         return file;
@@ -156,7 +160,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
             // Queue file for its assigned replica
             {
                 std::lock_guard lock(mutex);
-                unprocessed_files.insert(file_path);
+                unprocessed_files[file_path] = number_of_current_replica;
                 connection_to_files[file_replica_idx].push_back(file_path);
             }
         }
@@ -166,25 +170,64 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile

 std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
 {
+    /// Time window of node activity that keeps the node's queued tasks locked
+    Poco::Timestamp activity_limit;
+    Poco::Timestamp oldest_activity;
+    if (lock_object_storage_task_distribution_us > 0)
+        activity_limit -= lock_object_storage_task_distribution_us;
+
     std::lock_guard lock(mutex);

     if (!unprocessed_files.empty())
     {
         auto it = unprocessed_files.begin();
-        String next_file = *it;
-        unprocessed_files.erase(it);
+
+        while (it != unprocessed_files.end())
+        {
+            auto last_activity = last_node_activity.find(it->second);
+            if (lock_object_storage_task_distribution_us <= 0
+                || last_activity == last_node_activity.end()
+                || activity_limit > last_activity->second)
+            {
+                String next_file = it->first;
+                unprocessed_files.erase(it);
+
+                LOG_TRACE(
+                    log,
+                    "Iterator exhausted. Assigning unprocessed file {} to replica {}",
+                    next_file,
+                    number_of_current_replica
+                );
+
+                return next_file;
+            }
+
+            oldest_activity = std::min(oldest_activity, last_activity->second);
+            ++it;
+        }

         LOG_TRACE(
             log,
-            "Iterator exhausted. Assigning unprocessed file {} to replica {}",
-            next_file,
-            number_of_current_replica
+            "No unprocessed file for replica {}, need to retry after {} us",
+            number_of_current_replica,
+            oldest_activity - activity_limit
         );

-        return next_file;
+        /// All unprocessed files are owned by replicas with recent activity.
+        /// Need to retry after (oldest_activity - activity_limit) microseconds.
+        RelativePathWithMetadata::CommandInTaskResponse response;
+        response.set_retry_after_us(oldest_activity - activity_limit);
+        return response.to_string();
     }

     return std::nullopt;
 }

+void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
+{
+    Poco::Timestamp now;
+    std::lock_guard lock(mutex);
+    last_node_activity[number_of_current_replica] = now;
+}
+
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
index 678ff4372f5f..2132ba95a752 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
@@ -5,7 +5,11 @@
 #include 
 #include 
 #include 
+
+#include <Poco/Timestamp.h>
+
 #include 
+#include <unordered_map>
 #include 
 #include 
 #include 
@@ -17,7 +22,8 @@ class StorageObjectStorageStableTaskDistributor
 public:
     StorageObjectStorageStableTaskDistributor(
         std::shared_ptr<IObjectIterator> iterator_,
-        std::vector<std::string> ids_of_nodes_);
+        std::vector<std::string> ids_of_nodes_,
+        uint64_t lock_object_storage_task_distribution_ms_);

     std::optional<String> getNextTask(size_t number_of_current_replica);

@@ -28,12 +33,17 @@ class StorageObjectStorageStableTaskDistributor
     std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
     std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);

+    void saveLastNodeActivity(size_t number_of_current_replica);
+
     std::shared_ptr<IObjectIterator> iterator;

     std::vector<std::vector<String>> connection_to_files;
-    std::unordered_set<String> unprocessed_files;
+    /// Map of unprocessed files: file name => index of the replica that prefetched it
+    std::unordered_map<String, size_t> unprocessed_files;

     std::vector<std::string> ids_of_nodes;
+    std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
+    Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;

     std::mutex mutex;

     bool iterator_exhausted = false;
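[Editor's note: a compact restatement of the stealing rule from getAnyUnprocessedFile(), not part of the patch; std::chrono stands in for Poco::Timestamp and all names are hypothetical. A file queued for its prefetched replica may be stolen only once that replica has been silent for longer than the lock window; otherwise the caller is told how long to wait before retrying.]

#include <chrono>
#include <optional>

using Clock = std::chrono::steady_clock;

// Returns std::nullopt if stealing is allowed now; otherwise the delay to retry after.
std::optional<std::chrono::microseconds> retryDelay(
    Clock::time_point owner_last_activity, std::chrono::microseconds lock_window)
{
    if (lock_window.count() <= 0)
        return std::nullopt; // locking disabled: steal immediately
    auto activity_limit = Clock::now() - lock_window;
    if (owner_last_activity < activity_limit)
        return std::nullopt; // owner has been silent long enough: steal
    // Owner asked for work recently: its lock expires after this delay.
    return std::chrono::duration_cast<std::chrono::microseconds>(
        owner_last_activity - activity_limit);
}

[The worker that receives the serialized retry_after_us command sleeps in StorageObjectStorageSource::createReader, capped at 100 ms per iteration as shown earlier in this patch, and then asks again; a locked file is therefore eventually stolen once its owner stops polling.]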
diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py
index c72755a90965..da85e78a5643 100644
--- a/tests/integration/test_s3_cache_locality/test.py
+++ b/tests/integration/test_s3_cache_locality/test.py
@@ -16,12 +16,12 @@
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


-def create_buckets_s3(cluster):
+def create_buckets_s3(cluster, files=1000):
     minio = cluster.minio_client

     s3_data = []

-    for file_number in range(1000):
+    for file_number in range(files):
         file_name = f"data/generated/file_{file_number}.csv"
         os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
         s3_data.append(file_name)
@@ -61,6 +61,7 @@ def started_cluster():
             macros={"replica": f"clickhouse{i}"},
             with_minio=True,
             with_zookeeper=True,
+            stay_alive=True,
         )

         logging.info("Starting cluster...")
@@ -71,13 +72,22 @@ def started_cluster():
         yield cluster
     finally:
-        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"))
+        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True)
         cluster.shutdown()


-def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache):
+def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
+                  lock_object_storage_task_distribution_ms):
     for host in list(cluster.instances.values()):
-        host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", timeout=30)
+        host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True)
+
+    settings = {
+        "enable_filesystem_cache": enable_filesystem_cache,
+        "filesystem_cache_name": "'raw_s3_cache'",
+    }
+
+    if lock_object_storage_task_distribution_ms > 0:
+        settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms

     query_id_first = str(uuid.uuid4())
     result_first = node.query(
@@ -85,12 +95,9 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
         SELECT count(*)
         FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
-        SETTINGS
-            enable_filesystem_cache={enable_filesystem_cache},
-            filesystem_cache_name='raw_s3_cache'
+        SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
         """,
         query_id=query_id_first,
-        timeout=30,
     )
     assert result_first == expected_result
     query_id_second = str(uuid.uuid4())
     result_second = node.query(
         f"""
         SELECT count(*)
         FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
-        SETTINGS
-            enable_filesystem_cache={enable_filesystem_cache},
-            filesystem_cache_name='raw_s3_cache'
+        SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
         """,
         query_id=query_id_second,
-        timeout=30,
     )
     assert result_second == expected_result

-    node.query("SYSTEM FLUSH LOGS", timeout=30)
-    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}", timeout=30)
-    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}", timeout=30)
+    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}")
+    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}")

     s3_get_first = node.query(
         f"""
@@ -119,7 +122,6 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
         WHERE type='QueryFinish'
           AND initial_query_id='{query_id_first}'
         """,
-        timeout=30,
     )
     s3_get_second = node.query(
         f"""
@@ -128,25 +130,26 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
         WHERE type='QueryFinish'
           AND initial_query_id='{query_id_second}'
         """,
-        timeout=30,
     )

     return int(s3_get_first), int(s3_get_second)


-def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache):
+def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
+                         lock_object_storage_task_distribution_ms):
     # Repeat the test several times to get an average result
-    iterations = 10
+    iterations = 1 if lock_object_storage_task_distribution_ms > 0 else 10
     s3_get_first_sum = 0
     s3_get_second_sum = 0
     for _ in range(iterations):
         (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock_object_storage_task_distribution_ms)
         s3_get_first_sum += s3_get_first
         s3_get_second_sum += s3_get_second
     return s3_get_first_sum, s3_get_second_sum


-def test_cache_locality(started_cluster):
+@pytest.mark.parametrize("lock_object_storage_task_distribution_ms", [0, 30000])
+def test_cache_locality(started_cluster, lock_object_storage_task_distribution_ms):
     node = started_cluster.instances["clickhouse0"]

     expected_result = node.query(
@@ -158,36 +161,36 @@ def test_cache_locality(started_cluster):
     )

     # The algorithm does not give a 100% guarantee, so allow 10% dispersion
-    dispersion = 0.1
+    dispersion = 0.0 if lock_object_storage_task_distribution_ms > 0 else 0.1

     # No cache
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock_object_storage_task_distribution_ms)
     assert s3_get_second == s3_get_first

     # With cache
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * dispersion

     # Different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * dispersion

     # No last node
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1)
-    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.211 + dispersion)  # actual value - 24 for 100 files, 211 for 1000

     # No first node
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1)
-    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.189 + dispersion)  # actual value - 12 for 100 files, 189 for 1000

     # No first node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1)
-    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.189 + dispersion)

     # Add new node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1)
-    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.189 + dispersion)

     # New node and old node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1)
-    assert s3_get_second <= s3_get_first * (0.4375 + dispersion)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
+    assert s3_get_second <= s3_get_first * (0.400 + dispersion)  # actual value - 36 for 100 files, 400 for 1000

From 234fda45b1b208e0f7a288bf97e98267e34a4460 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 6 Aug 2025 12:15:39 +0200
Subject: [PATCH 4/6] Fix tidy build

---
 src/Disks/ObjectStorages/IObjectStorage.h | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h
index 5754c9ec7af6..abe3f5210953 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.h
+++ b/src/Disks/ObjectStorages/IObjectStorage.h
@@ -86,8 +86,8 @@ struct RelativePathWithMetadata
     class CommandInTaskResponse
     {
     public:
-        CommandInTaskResponse() {}
-        CommandInTaskResponse(const std::string & task);
+        CommandInTaskResponse() = default;
+        explicit CommandInTaskResponse(const std::string & task);

         bool is_parsed() const { return successfully_parsed; }
         void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }

From d18f5bb739c5d9b3d2d98c2f2b5d0280a89c71a5 Mon Sep 17 00:00:00 2001
From: Vasily Nemkov
Date: Tue, 9 Sep 2025 15:40:33 +0200
Subject: [PATCH 5/6] Fixed merge hiccup

---
 src/Storages/IStorageCluster.cpp | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index a5f471fbed25..0155014729c1 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -65,15 +65,19 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
     if (filter_actions_dag)
         predicate = filter_actions_dag->getOutputs().at(0);

-    createExtension(predicate);
+    auto max_replicas_to_use = static_cast<size_t>(cluster->getShardsInfo().size());
+    if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
+        max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);
+
+    createExtension(predicate, max_replicas_to_use);
 }

-void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
+void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas)
 {
     if (extension)
         return;

-    extension = storage->getTaskIteratorExtension(predicate, context, cluster);
+    extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
 }

 /// The code executes on initiator
@@ -174,7 +178,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
     if (current_settings[Setting::max_parallel_replicas] > 1)
         max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);

-    createExtension(nullptr);
+    createExtension(nullptr, max_replicas_to_use);

     for (const auto & shard_info : cluster->getShardsInfo())
     {

From 7152388ba3f466052d14382e37e9c615e26ce1b6 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 9 Sep 2025 16:47:31 +0200
Subject: [PATCH 6/6] Fix build after merge

---
 src/Storages/IStorageCluster.cpp | 12 ++++--------
 src/Storages/IStorageCluster.h   |  2 +-
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 0155014729c1..a5f471fbed25 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -65,19 +65,15 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
     if (filter_actions_dag)
         predicate = filter_actions_dag->getOutputs().at(0);

-    auto max_replicas_to_use = static_cast<size_t>(cluster->getShardsInfo().size());
-    if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
-        max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);
-
-    createExtension(predicate, max_replicas_to_use);
+    createExtension(predicate);
 }

-void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas)
+void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
 {
     if (extension)
         return;

-    extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
+    extension = storage->getTaskIteratorExtension(predicate, context, cluster);
 }

 /// The code executes on initiator
@@ -178,7 +174,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
     if (current_settings[Setting::max_parallel_replicas] > 1)
         max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);

-    createExtension(nullptr, max_replicas_to_use);
+    createExtension(nullptr);

     for (const auto & shard_info : cluster->getShardsInfo())
     {
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 05ed7dc0c301..ad5c41d17eec 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -133,7 +133,7 @@ class ReadFromCluster : public SourceStepWithFilter

     std::optional<RemoteQueryExecutor::Extension> extension;

-    void createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas);
+    void createExtension(const ActionsDAG::Node * predicate);
     ContextPtr updateSettings(const Settings & settings);
 };