From 76c11231cc78a110a4fbcd56b31504e2384eec3a Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Apr 2025 21:17:11 +0200 Subject: [PATCH 1/2] Merge pull request #709 from Altinity/feature/rendezvous-hashing-filesystem-cache Rendezvous hashing filesystem cache --- src/QueryPipeline/RemoteQueryExecutor.cpp | 6 +- src/QueryPipeline/RemoteQueryExecutor.h | 2 +- src/Storages/IStorageCluster.cpp | 22 ++- src/Storages/IStorageCluster.h | 5 +- .../StorageObjectStorageCluster.cpp | 20 +- .../StorageObjectStorageCluster.h | 4 +- ...rageObjectStorageStableTaskDistributor.cpp | 178 ++++++++++++++++++ ...torageObjectStorageStableTaskDistributor.h | 44 +++++ src/Storages/StorageDistributed.cpp | 14 +- src/Storages/StorageFileCluster.cpp | 7 +- src/Storages/StorageFileCluster.h | 5 +- src/Storages/StorageReplicatedMergeTree.cpp | 8 +- src/Storages/StorageURLCluster.cpp | 7 +- src/Storages/StorageURLCluster.h | 5 +- 14 files changed, 295 insertions(+), 32 deletions(-) create mode 100644 src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp create mode 100644 src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 80ee8d86040a..1b7df321b7f0 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -738,8 +738,12 @@ void RemoteQueryExecutor::processReadTaskRequest() if (!extension || !extension->task_iterator) throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized"); + if (!extension->replica_info) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized"); + ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived); - auto response = (*extension->task_iterator)(); + + auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica); connections->sendReadTaskResponse(response); } diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 2077990da946..f2d7f8ca8823 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext; class ParallelReplicasReadingCoordinator; /// This is the same type as StorageS3Source::IteratorWrapper -using TaskIterator = std::function; +using TaskIterator = std::function; /// This class allows one to launch queries on remote replicas of one shard and get results class RemoteQueryExecutor diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index d304eeea4bf2..5741a0ea57b9 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -112,7 +112,17 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) if (extension) return; - extension = storage->getTaskIteratorExtension(predicate, context); + std::vector ids_of_hosts; + for (const auto & shard : cluster->getShardsInfo()) + { + if (shard.per_replica_pools.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num); + if (!shard.per_replica_pools[0]) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num); + ids_of_hosts.push_back(shard.per_replica_pools[0]->getAddress()); + } + + extension = storage->getTaskIteratorExtension(predicate, context, ids_of_hosts); } /// The code executes on initiator @@ -178,8 +188,6 @@ void IStorageCluster::read( void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - createExtension(nullptr); - const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{}; const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; @@ -192,6 +200,10 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const if (current_settings[Setting::max_parallel_replicas] > 1) max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value); + size_t replica_index = 0; + + createExtension(nullptr); + for (const auto & shard_info : cluster->getShardsInfo()) { if (pipes.size() >= max_replicas_to_use) @@ -209,6 +221,8 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const if (try_results.empty()) continue; + IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ }; + auto remote_query_executor = std::make_shared( std::vector{try_results.front()}, query_to_send->formatWithSecretsOneLine(), @@ -218,7 +232,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const scalars, Tables(), processed_stage, - extension); + RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)}); remote_query_executor->setLogger(log); pipes.emplace_back(std::make_shared( diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2c81539e669e..d10ef28e556d 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -33,7 +33,10 @@ class IStorageCluster : public IStorage ClusterPtr getCluster(ContextPtr context) const; /// Query is needed for pruning by virtual columns (_file, _path) - virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0; + virtual RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + std::optional> ids_of_hosts = std::nullopt) const = 0; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 8fabcc368a5e..291f206ab2e6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB @@ -144,24 +145,21 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & local_context) const + const ActionsDAG::Node * predicate, + const ContextPtr & local_context, + std::optional> ids_of_replicas) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true); - auto callback = std::make_shared>([iterator]() mutable -> String - { - auto object_info = iterator->next(0); - if (!object_info) - return ""; + auto task_distributor = std::make_shared(iterator, ids_of_replicas); - auto archive_object_info = std::dynamic_pointer_cast(object_info); - if (archive_object_info) - return archive_object_info->getPathToArchive(); + auto callback = std::make_shared( + [task_distributor](size_t number_of_current_replica) mutable -> String { + return task_distributor->getNextTask(number_of_current_replica).value_or(""); + }); - return object_info->getPath(); - }); return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 0088ff28fc22..5fc02d5bf1bf 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -25,7 +25,9 @@ class StorageObjectStorageCluster : public IStorageCluster std::string getName() const override; RemoteQueryExecutor::Extension getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + const ActionsDAG::Node * predicate, + const ContextPtr & context, + std::optional> ids_of_replicas) const override; String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp new file mode 100644 index 000000000000..d2127a7f45c4 --- /dev/null +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -0,0 +1,178 @@ +#include "StorageObjectStorageStableTaskDistributor.h" +#include +#include +#include + +namespace DB +{ + +StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( + std::shared_ptr iterator_, + std::optional> ids_of_nodes_) + : iterator(std::move(iterator_)) + , connection_to_files(ids_of_nodes_.has_value() ? ids_of_nodes_.value().size() : 1) + , ids_of_nodes(ids_of_nodes_) + , iterator_exhausted(false) +{ +} + +std::optional StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica) +{ + LOG_TRACE( + log, + "Received a new connection from replica {} looking for a file", + number_of_current_replica + ); + + // 1. Check pre-queued files first + if (auto file = getPreQueuedFile(number_of_current_replica)) + return file; + + // 2. Try to find a matching file from the iterator + if (auto file = getMatchingFileFromIterator(number_of_current_replica)) + return file; + + // 3. Process unprocessed files if iterator is exhausted + return getAnyUnprocessedFile(number_of_current_replica); +} + +size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) +{ + if (!ids_of_nodes.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No list of nodes inside Task Distributer."); + + const auto & ids_of_nodes_value = ids_of_nodes.value(); + size_t nodes_count = ids_of_nodes_value.size(); + + /// Trivial case + if (nodes_count < 2) + return 0; + + /// Rendezvous hashing + size_t best_id = 0; + UInt64 best_weight = sipHash64(ids_of_nodes_value[0] + file_path); + for (size_t id = 1; id < nodes_count; ++id) + { + UInt64 weight = sipHash64(ids_of_nodes_value[id] + file_path); + if (weight > best_weight) + { + best_weight = weight; + best_id = id; + } + } + return best_id; +} + +std::optional StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica) +{ + std::lock_guard lock(mutex); + + auto & files = connection_to_files[number_of_current_replica]; + + while (!files.empty()) + { + String next_file = files.back(); + files.pop_back(); + + auto it = unprocessed_files.find(next_file); + if (it == unprocessed_files.end()) + continue; + + unprocessed_files.erase(it); + + LOG_TRACE( + log, + "Assigning pre-queued file {} to replica {}", + next_file, + number_of_current_replica + ); + + return next_file; + } + + return std::nullopt; +} + +std::optional StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica) +{ + { + std::lock_guard lock(mutex); + if (iterator_exhausted) + return std::nullopt; + } + + while (true) + { + ObjectInfoPtr object_info; + + { + std::lock_guard lock(mutex); + object_info = iterator->next(0); + + if (!object_info) + { + iterator_exhausted = true; + break; + } + } + + String file_path; + + auto archive_object_info = std::dynamic_pointer_cast(object_info); + if (archive_object_info) + { + file_path = archive_object_info->getPathToArchive(); + } + else + { + file_path = object_info->getPath(); + } + + size_t file_replica_idx = getReplicaForFile(file_path); + if (file_replica_idx == number_of_current_replica) + { + LOG_TRACE( + log, + "Found file {} for replica {}", + file_path, + number_of_current_replica + ); + + return file_path; + } + + // Queue file for its assigned replica + { + std::lock_guard lock(mutex); + unprocessed_files.insert(file_path); + connection_to_files[file_replica_idx].push_back(file_path); + } + } + + return std::nullopt; +} + +std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica) +{ + std::lock_guard lock(mutex); + + if (!unprocessed_files.empty()) + { + auto it = unprocessed_files.begin(); + String next_file = *it; + unprocessed_files.erase(it); + + LOG_TRACE( + log, + "Iterator exhausted. Assigning unprocessed file {} to replica {}", + next_file, + number_of_current_replica + ); + + return next_file; + } + + return std::nullopt; +} + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h new file mode 100644 index 000000000000..a87884885a45 --- /dev/null +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class StorageObjectStorageStableTaskDistributor +{ +public: + StorageObjectStorageStableTaskDistributor( + std::shared_ptr iterator_, + std::optional> ids_of_nodes_); + + std::optional getNextTask(size_t number_of_current_replica); + +private: + size_t getReplicaForFile(const String & file_path); + std::optional getPreQueuedFile(size_t number_of_current_replica); + std::optional getMatchingFileFromIterator(size_t number_of_current_replica); + std::optional getAnyUnprocessedFile(size_t number_of_current_replica); + + std::shared_ptr iterator; + + std::vector> connection_to_files; + std::unordered_set unprocessed_files; + + std::optional> ids_of_nodes; + + std::mutex mutex; + bool iterator_exhausted = false; + + LoggerPtr log = getLogger("StorageClusterTaskDistributor"); +}; + +} diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index ca4b7206755d..e8aaa2cce8aa 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1135,9 +1135,6 @@ std::optional StorageDistributed::distributedWriteFromClusterStor if (filter) predicate = filter->getOutputs().at(0); - /// Select query is needed for pruining on virtual columns - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context); - auto dst_cluster = getCluster(); auto new_query = std::dynamic_pointer_cast(query.clone()); @@ -1164,8 +1161,13 @@ std::optional StorageDistributed::distributedWriteFromClusterStor const auto & current_settings = query_context->getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); - /// Here we take addresses from destination cluster and assume source table exists on these nodes const auto cluster = getCluster(); + + /// Select query is needed for pruining on virtual columns + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context); + + /// Here we take addresses from destination cluster and assume source table exists on these nodes + size_t replica_index = 0; for (const auto & replicas : cluster->getShardsInfo()) { /// Skip unavailable hosts if necessary @@ -1174,6 +1176,8 @@ std::optional StorageDistributed::distributedWriteFromClusterStor /// There will be only one replica, because we consider each replica as a shard for (const auto & try_result : try_results) { + IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ }; + auto remote_query_executor = std::make_shared( std::vector{try_result}, new_query_str, @@ -1183,7 +1187,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor Scalars{}, Tables{}, QueryProcessingStage::Complete, - extension); + RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)}); QueryPipeline remote_pipeline(std::make_shared( remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote])); diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 269bc61a0d11..075ee910a3dc 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -77,10 +77,13 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto ); } -RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const +RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + std::optional>) const { auto iterator = std::make_shared(paths, std::nullopt, predicate, getVirtualsList(), context); - auto callback = std::make_shared([iter = std::move(iterator)]() mutable -> String { return iter->next(); }); + auto callback = std::make_shared([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index 9549f3a035c3..3329223739ae 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster const ConstraintsDescription & constraints_); std::string getName() const override { return "FileCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + std::optional> ids_of_nodes) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 1e466d2fc699..405a8c45174c 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5993,7 +5993,6 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con std::optional StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) { const auto & settings = local_context->getSettingsRef(); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context); /// Here we won't check that the cluster formed from table replicas is a subset of a cluster specified in s3Cluster/hdfsCluster table function auto src_cluster = src_storage_cluster->getCluster(local_context); @@ -6012,6 +6011,9 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu ContextMutablePtr query_context = Context::createCopy(local_context); query_context->increaseDistributedDepth(); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context); + + size_t replica_index = 0; for (const auto & replicas : src_cluster->getShardsAddresses()) { /// There will be only one replica, because we consider each replica as a shard @@ -6026,6 +6028,8 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu node.secure ); + IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ }; + auto remote_query_executor = std::make_shared( connection, query_str, @@ -6035,7 +6039,7 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu Scalars{}, Tables{}, QueryProcessingStage::Complete, - extension); + RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)}); QueryPipeline remote_pipeline(std::make_shared( remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote])); diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 2600f8105ea3..3122f5d190f0 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -114,11 +114,14 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS ); } -RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const +RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + std::optional>) const { auto iterator = std::make_shared( uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context); - auto callback = std::make_shared([iter = std::move(iterator)]() mutable -> String { return iter->next(); }); + auto callback = std::make_shared([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index 31bffa062104..d09d2a36bd7e 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -30,7 +30,10 @@ class StorageURLCluster : public IStorageCluster const StorageURL::Configuration & configuration_); std::string getName() const override { return "URLCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + std::optional> ids_of_replicas) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; From 0ece70ee16c94568b7c6e31a52aa3c1154fe543e Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Sat, 3 May 2025 18:36:27 +0200 Subject: [PATCH 2/2] Merge pull request #760 from Altinity/feature/fix_rendezvous_hashing Fix rendezvous hashing in complex queries --- src/Storages/IStorageCluster.cpp | 12 +--------- src/Storages/IStorageCluster.h | 2 +- .../StorageObjectStorageCluster.cpp | 17 ++++++++++++-- .../StorageObjectStorageCluster.h | 2 +- ...rageObjectStorageStableTaskDistributor.cpp | 22 +++++++++++-------- ...torageObjectStorageStableTaskDistributor.h | 4 ++-- src/Storages/StorageDistributed.cpp | 2 +- src/Storages/StorageFileCluster.cpp | 2 +- src/Storages/StorageFileCluster.h | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- src/Storages/StorageURLCluster.cpp | 2 +- src/Storages/StorageURLCluster.h | 2 +- 12 files changed, 39 insertions(+), 32 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 5741a0ea57b9..0d873249c472 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -112,17 +112,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) if (extension) return; - std::vector ids_of_hosts; - for (const auto & shard : cluster->getShardsInfo()) - { - if (shard.per_replica_pools.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num); - if (!shard.per_replica_pools[0]) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num); - ids_of_hosts.push_back(shard.per_replica_pools[0]->getAddress()); - } - - extension = storage->getTaskIteratorExtension(predicate, context, ids_of_hosts); + extension = storage->getTaskIteratorExtension(predicate, context, cluster); } /// The code executes on initiator diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index d10ef28e556d..4487b2d85636 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -36,7 +36,7 @@ class IStorageCluster : public IStorage virtual RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional> ids_of_hosts = std::nullopt) const = 0; + ClusterPtr cluster) const = 0; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 291f206ab2e6..f89edf6ae28f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -147,13 +147,26 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & local_context, - std::optional> ids_of_replicas) const + ClusterPtr cluster) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true); - auto task_distributor = std::make_shared(iterator, ids_of_replicas); + std::vector ids_of_hosts; + for (const auto & shard : cluster->getShardsInfo()) + { + if (shard.per_replica_pools.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num); + for (const auto & replica : shard.per_replica_pools) + { + if (!replica) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num); + ids_of_hosts.push_back(replica->getAddress()); + } + } + + auto task_distributor = std::make_shared(iterator, ids_of_hosts); auto callback = std::make_shared( [task_distributor](size_t number_of_current_replica) mutable -> String { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 5fc02d5bf1bf..5b979f3dcf00 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -27,7 +27,7 @@ class StorageObjectStorageCluster : public IStorageCluster RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional> ids_of_replicas) const override; + ClusterPtr cluster) const override; String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index d2127a7f45c4..d9ca7b344637 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -8,9 +8,9 @@ namespace DB StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - std::optional> ids_of_nodes_) + std::vector ids_of_nodes_) : iterator(std::move(iterator_)) - , connection_to_files(ids_of_nodes_.has_value() ? ids_of_nodes_.value().size() : 1) + , connection_to_files(ids_of_nodes_.size()) , ids_of_nodes(ids_of_nodes_) , iterator_exhausted(false) { @@ -38,11 +38,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getNextTask(siz size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) { - if (!ids_of_nodes.has_value()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No list of nodes inside Task Distributer."); - - const auto & ids_of_nodes_value = ids_of_nodes.value(); - size_t nodes_count = ids_of_nodes_value.size(); + size_t nodes_count = ids_of_nodes.size(); /// Trivial case if (nodes_count < 2) @@ -50,10 +46,10 @@ size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String /// Rendezvous hashing size_t best_id = 0; - UInt64 best_weight = sipHash64(ids_of_nodes_value[0] + file_path); + UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path); for (size_t id = 1; id < nodes_count; ++id) { - UInt64 weight = sipHash64(ids_of_nodes_value[id] + file_path); + UInt64 weight = sipHash64(ids_of_nodes[id] + file_path); if (weight > best_weight) { best_weight = weight; @@ -67,6 +63,14 @@ std::optional StorageObjectStorageStableTaskDistributor::getPreQueuedFil { std::lock_guard lock(mutex); + if (connection_to_files.size() <= number_of_current_replica) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} is out of range. Expected range: [0, {})", + number_of_current_replica, + connection_to_files.size() + ); + auto & files = connection_to_files[number_of_current_replica]; while (!files.empty()) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index a87884885a45..678ff4372f5f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -18,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor public: StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - std::optional> ids_of_nodes_); + std::vector ids_of_nodes_); std::optional getNextTask(size_t number_of_current_replica); @@ -33,7 +33,7 @@ class StorageObjectStorageStableTaskDistributor std::vector> connection_to_files; std::unordered_set unprocessed_files; - std::optional> ids_of_nodes; + std::vector ids_of_nodes; std::mutex mutex; bool iterator_exhausted = false; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index e8aaa2cce8aa..571098cafb04 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1164,7 +1164,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor const auto cluster = getCluster(); /// Select query is needed for pruining on virtual columns - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context); + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster); /// Here we take addresses from destination cluster and assume source table exists on these nodes size_t replica_index = 0; diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 075ee910a3dc..d70e92fa3b45 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -80,7 +80,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional>) const + ClusterPtr) const { auto iterator = std::make_shared(paths, std::nullopt, predicate, getVirtualsList(), context); auto callback = std::make_shared([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index 3329223739ae..49d39a24ceba 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -30,7 +30,7 @@ class StorageFileCluster : public IStorageCluster RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional> ids_of_nodes) const override; + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 405a8c45174c..efd998aa6d54 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6011,7 +6011,7 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu ContextMutablePtr query_context = Context::createCopy(local_context); query_context->increaseDistributedDepth(); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster); size_t replica_index = 0; for (const auto & replicas : src_cluster->getShardsAddresses()) diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 3122f5d190f0..7ba7f22c62e8 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -117,7 +117,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional>) const + ClusterPtr) const { auto iterator = std::make_shared( uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context); diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index d09d2a36bd7e..9bfbaffe30f8 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -33,7 +33,7 @@ class StorageURLCluster : public IStorageCluster RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional> ids_of_replicas) const override; + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;