diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 80ee8d86040a..1b7df321b7f0 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -738,8 +738,12 @@ void RemoteQueryExecutor::processReadTaskRequest() if (!extension || !extension->task_iterator) throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized"); + if (!extension->replica_info) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized"); + ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived); - auto response = (*extension->task_iterator)(); + + auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica); connections->sendReadTaskResponse(response); } diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 2077990da946..f2d7f8ca8823 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext; class ParallelReplicasReadingCoordinator; /// This is the same type as StorageS3Source::IteratorWrapper -using TaskIterator = std::function<String()>; +using TaskIterator = std::function<String(size_t)>; /// This class allows one to launch queries on remote replicas of one shard and get results class RemoteQueryExecutor diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index d304eeea4bf2..0d873249c472 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -112,7 +112,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) if (extension) return; - extension = storage->getTaskIteratorExtension(predicate, context); + extension = storage->getTaskIteratorExtension(predicate, context, cluster); } /// The code executes on initiator @@ -178,8 +178,6 @@ void IStorageCluster::read( void 
ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - createExtension(nullptr); - const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{}; const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; @@ -192,6 +190,10 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const if (current_settings[Setting::max_parallel_replicas] > 1) max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value); + size_t replica_index = 0; + + createExtension(nullptr); + for (const auto & shard_info : cluster->getShardsInfo()) { if (pipes.size() >= max_replicas_to_use) @@ -209,6 +211,8 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const if (try_results.empty()) continue; + IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ }; + auto remote_query_executor = std::make_shared<RemoteQueryExecutor>( std::vector<IConnectionPool::Entry>{try_results.front()}, query_to_send->formatWithSecretsOneLine(), @@ -218,7 +222,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const scalars, Tables(), processed_stage, - extension); + RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)}); remote_query_executor->setLogger(log); pipes.emplace_back(std::make_shared<RemoteSource>( diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2c81539e669e..4487b2d85636 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -33,7 +33,10 @@ class IStorageCluster : public IStorage ClusterPtr getCluster(ContextPtr context) const; /// Query is needed for pruning by virtual columns (_file, _path) - virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0; + virtual 
RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr cluster) const = 0; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 8fabcc368a5e..f89edf6ae28f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -13,6 +13,7 @@ #include #include #include +#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h> namespace DB @@ -144,24 +145,34 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & local_context) const + const ActionsDAG::Node * predicate, + const ContextPtr & local_context, + ClusterPtr cluster) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true); - auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String + std::vector<std::string> ids_of_hosts; + for (const auto & shard : cluster->getShardsInfo()) { - auto object_info = iterator->next(0); - if (!object_info) - return ""; + if (shard.per_replica_pools.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num); + for (const auto & replica : shard.per_replica_pools) + { + if (!replica) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num); + ids_of_hosts.push_back(replica->getAddress()); + } + } + + auto 
task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts); - auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info); - if (archive_object_info) - return archive_object_info->getPathToArchive(); + auto callback = std::make_shared<TaskIterator>( + [task_distributor](size_t number_of_current_replica) mutable -> String { + return task_distributor->getNextTask(number_of_current_replica).value_or(""); + }); - return object_info->getPath(); - }); return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 0088ff28fc22..5b979f3dcf00 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -25,7 +25,9 @@ class StorageObjectStorageCluster : public IStorageCluster std::string getName() const override; RemoteQueryExecutor::Extension getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr cluster) const override; String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp new file mode 100644 index 000000000000..d9ca7b344637 --- /dev/null +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -0,0 +1,182 @@ +#include "StorageObjectStorageStableTaskDistributor.h" +#include <Common/Exception.h> +#include <Common/SipHash.h> +#include <Common/logger_useful.h> + +namespace DB +{ + +StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( + std::shared_ptr<StorageObjectStorageSource::IIterator> iterator_, + std::vector<std::string> ids_of_nodes_) + : iterator(std::move(iterator_)) + , connection_to_files(ids_of_nodes_.size()) + , ids_of_nodes(ids_of_nodes_) + , iterator_exhausted(false) +{ +} + +std::optional<String> 
StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica) +{ + LOG_TRACE( + log, + "Received a new connection from replica {} looking for a file", + number_of_current_replica + ); + + // 1. Check pre-queued files first + if (auto file = getPreQueuedFile(number_of_current_replica)) + return file; + + // 2. Try to find a matching file from the iterator + if (auto file = getMatchingFileFromIterator(number_of_current_replica)) + return file; + + // 3. Process unprocessed files if iterator is exhausted + return getAnyUnprocessedFile(number_of_current_replica); +} + +size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) +{ + size_t nodes_count = ids_of_nodes.size(); + + /// Trivial case + if (nodes_count < 2) + return 0; + + /// Rendezvous hashing + size_t best_id = 0; + UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path); + for (size_t id = 1; id < nodes_count; ++id) + { + UInt64 weight = sipHash64(ids_of_nodes[id] + file_path); + if (weight > best_weight) + { + best_weight = weight; + best_id = id; + } + } + return best_id; +} + +std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica) +{ + std::lock_guard lock(mutex); + + if (connection_to_files.size() <= number_of_current_replica) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} is out of range. 
Expected range: [0, {})", + number_of_current_replica, + connection_to_files.size() + ); + + auto & files = connection_to_files[number_of_current_replica]; + + while (!files.empty()) + { + String next_file = files.back(); + files.pop_back(); + + auto it = unprocessed_files.find(next_file); + if (it == unprocessed_files.end()) + continue; + + unprocessed_files.erase(it); + + LOG_TRACE( + log, + "Assigning pre-queued file {} to replica {}", + next_file, + number_of_current_replica + ); + + return next_file; + } + + return std::nullopt; +} + +std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica) +{ + { + std::lock_guard lock(mutex); + if (iterator_exhausted) + return std::nullopt; + } + + while (true) + { + ObjectInfoPtr object_info; + + { + std::lock_guard lock(mutex); + object_info = iterator->next(0); + + if (!object_info) + { + iterator_exhausted = true; + break; + } + } + + String file_path; + + auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info); + if (archive_object_info) + { + file_path = archive_object_info->getPathToArchive(); + } + else + { + file_path = object_info->getPath(); + } + + size_t file_replica_idx = getReplicaForFile(file_path); + if (file_replica_idx == number_of_current_replica) + { + LOG_TRACE( + log, + "Found file {} for replica {}", + file_path, + number_of_current_replica + ); + + return file_path; + } + + // Queue file for its assigned replica + { + std::lock_guard lock(mutex); + unprocessed_files.insert(file_path); + connection_to_files[file_replica_idx].push_back(file_path); + } + } + + return std::nullopt; +} + +std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica) +{ + std::lock_guard lock(mutex); + + if (!unprocessed_files.empty()) + { + auto it = unprocessed_files.begin(); + String next_file = *it; + unprocessed_files.erase(it); + + LOG_TRACE( + log, + "Iterator exhausted. 
Assigning unprocessed file {} to replica {}", + next_file, + number_of_current_replica + ); + + return next_file; + } + + return std::nullopt; +} + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h new file mode 100644 index 000000000000..678ff4372f5f --- /dev/null +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -0,0 +1,44 @@ +#pragma once + +#include <Storages/ObjectStorage/StorageObjectStorageSource.h> +#include <Common/Logger.h> +#include <base/types.h> +#include <memory> +#include <mutex> +#include <optional> +#include <string> +#include <unordered_set> +#include <vector> + +namespace DB +{ + +class StorageObjectStorageStableTaskDistributor +{ +public: + StorageObjectStorageStableTaskDistributor( + std::shared_ptr<StorageObjectStorageSource::IIterator> iterator_, + std::vector<std::string> ids_of_nodes_); + + std::optional<String> getNextTask(size_t number_of_current_replica); + +private: + size_t getReplicaForFile(const String & file_path); + std::optional<String> getPreQueuedFile(size_t number_of_current_replica); + std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica); + std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica); + + std::shared_ptr<StorageObjectStorageSource::IIterator> iterator; + + std::vector<std::vector<String>> connection_to_files; + std::unordered_set<String> unprocessed_files; + + std::vector<std::string> ids_of_nodes; + + std::mutex mutex; + bool iterator_exhausted = false; + + LoggerPtr log = getLogger("StorageClusterTaskDistributor"); +}; + +} diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index ca4b7206755d..571098cafb04 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1135,9 +1135,6 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor if (filter) predicate = filter->getOutputs().at(0); - /// Select query is needed for pruining on virtual columns - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context); - auto dst_cluster = getCluster(); auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone()); @@ -1164,8 +1161,13 
@@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor const auto & current_settings = query_context->getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); - /// Here we take addresses from destination cluster and assume source table exists on these nodes const auto cluster = getCluster(); + + /// Select query is needed for pruning on virtual columns + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster); + + /// Here we take addresses from destination cluster and assume source table exists on these nodes + size_t replica_index = 0; for (const auto & replicas : cluster->getShardsInfo()) { /// Skip unavailable hosts if necessary @@ -1174,6 +1176,8 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor /// There will be only one replica, because we consider each replica as a shard for (const auto & try_result : try_results) { + IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ }; + auto remote_query_executor = std::make_shared<RemoteQueryExecutor>( std::vector<IConnectionPool::Entry>{try_result}, new_query_str, @@ -1183,7 +1187,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor Scalars{}, Tables{}, QueryProcessingStage::Complete, - extension); + RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)}); QueryPipeline remote_pipeline(std::make_shared<RemoteSource>( remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote])); diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 269bc61a0d11..d70e92fa3b45 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -77,10 +77,13 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto ); } -RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node 
* predicate, const ContextPtr & context) const +RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const { auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), context); - auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); }); + auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index 9549f3a035c3..49d39a24ceba 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster const ConstraintsDescription & constraints_); std::string getName() const override { return "FileCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 1e466d2fc699..efd998aa6d54 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5993,7 +5993,6 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) { const auto & settings = 
local_context->getSettingsRef(); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context); /// Here we won't check that the cluster formed from table replicas is a subset of a cluster specified in s3Cluster/hdfsCluster table function auto src_cluster = src_storage_cluster->getCluster(local_context); @@ -6012,6 +6011,9 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu ContextMutablePtr query_context = Context::createCopy(local_context); query_context->increaseDistributedDepth(); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster); + + size_t replica_index = 0; for (const auto & replicas : src_cluster->getShardsAddresses()) { /// There will be only one replica, because we consider each replica as a shard @@ -6026,6 +6028,8 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu node.secure ); + + IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ }; + auto remote_query_executor = std::make_shared<RemoteQueryExecutor>( connection, query_str, @@ -6035,7 +6039,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu Scalars{}, Tables{}, QueryProcessingStage::Complete, - extension); + RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)}); QueryPipeline remote_pipeline(std::make_shared<RemoteSource>( remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote])); diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 2600f8105ea3..7ba7f22c62e8 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -114,11 +114,14 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS ); } -RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const 
+RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const { auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>( uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context); - auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); }); + auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index 31bffa062104..9bfbaffe30f8 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -30,7 +30,10 @@ class StorageURLCluster : public IStorageCluster const StorageURL::Configuration & configuration_); std::string getName() const override { return "URLCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;