diff --git a/src/Core/Protocol.h b/src/Core/Protocol.h
index e197db70f04e..adfef5f48c8e 100644
--- a/src/Core/Protocol.h
+++ b/src/Core/Protocol.h
@@ -96,8 +96,10 @@ namespace Protocol
         MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
         TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
         SSHChallenge = 18, /// Return challenge for SSH signature signing
+        MAX = SSHChallenge,
+        ConnectionLost = 255, /// Generated on the client side when the connection to a replica is lost.
     };

     /// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 9f3ac9367df0..76fdaa95c68d 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6883,6 +6883,9 @@ Possible values:
 - '' - do not force any kind of Exchange operators, let the optimizer choose,
 - 'Persisted' - use temporary files in object storage,
 - 'Streaming' - stream exchange data over network.
+)", EXPERIMENTAL) \
+    DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
+Allow retries in cluster requests when one node goes offline.
 )", EXPERIMENTAL) \
     DECLARE(Bool, object_storage_remote_initiator, false, R"(
 Execute request to object storage as remote on one of object_storage_cluster nodes.
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 766bb8fe5c9f..85a3965af357 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -196,6 +196,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"parallel_replicas_for_cluster_engines", false, true, "New setting."},
         {"parallel_hash_join_threshold", 0, 0, "New setting"},
         /// Release closed. Please use 25.4
+        {"use_object_storage_list_objects_cache", true, false, "New setting."},
+        {"allow_retries_in_cluster_requests", false, false, "New setting."},
     });
     addSettingsChanges(settings_changes_history, "24.12.2.20000",
     {
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index eb709d2c49cb..a11bfb64cca1 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -52,6 +52,7 @@ namespace Setting
     extern const SettingsBool use_hedged_requests;
     extern const SettingsBool push_external_roles_in_interserver_queries;
     extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
+    extern const SettingsBool allow_retries_in_cluster_requests;
 }

 namespace ErrorCodes
@@ -82,6 +83,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
     , extension(extension_)
     , priority_func(priority_func_)
     , read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
+    , allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
 {
     if (stage == QueryProcessingStage::QueryPlan && !query_plan)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
@@ -484,7 +486,8 @@ int RemoteQueryExecutor::sendQueryAsync()
         read_context = std::make_unique<RemoteQueryExecutorReadContext>(
             *this,
             /*suspend_when_query_sent*/ true,
-            read_packet_type_separately);
+            read_packet_type_separately,
+            allow_retries_in_cluster_requests);

     /// If query already sent, do nothing. Note that we cannot use sent_query flag here,
     /// because we can still be in process of sending scalars or external tables.
@@ -557,7 +560,8 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
         read_context = std::make_unique<RemoteQueryExecutorReadContext>(
             *this,
             /*suspend_when_query_sent*/ false,
-            read_packet_type_separately);
+            read_packet_type_separately,
+            allow_retries_in_cluster_requests);
         recreate_read_context = false;
     }

@@ -681,7 +685,11 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
             /// We can actually return it, and the first call to RemoteQueryExecutor::read
             /// will return earlier. We should consider doing it.
             if (packet.block && (packet.block.rows() > 0))
+            {
+                if (extension && extension->replica_info)
+                    replica_has_processed_data.insert(extension->replica_info->number_of_current_replica);
                 return ReadResult(adaptBlockStructure(packet.block, header));
+            }
             break; /// If the block is empty - we will receive other packets before EndOfStream.

         case Protocol::Server::Exception:
@@ -743,6 +751,22 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
         case Protocol::Server::TimezoneUpdate:
             break;

+        case Protocol::Server::ConnectionLost:
+            if (allow_retries_in_cluster_requests)
+            {
+                if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
+                {
+                    if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
+                    {
+                        finished = true;
+                        extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
+                        return ReadResult(Block{});
+                    }
+                }
+            }
+            packet.exception->rethrow();
+            break;
+
         default:
             got_unknown_packet_from_replica = true;
             throw Exception(
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index c32d2fbce19e..cc4201ae7f01 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -28,8 +28,22 @@ class RemoteQueryExecutorReadContext;
 class ParallelReplicasReadingCoordinator;

-/// This is the same type as StorageS3Source::IteratorWrapper
-using TaskIterator = std::function<String(size_t)>;
+namespace ErrorCodes
+{
+    extern const int NOT_IMPLEMENTED;
+};
+
+class TaskIterator
+{
+public:
+    virtual ~TaskIterator() = default;
+    virtual bool supportRerunTask() const { return false; }
+    virtual void rescheduleTasksFromReplica(size_t /* number_of_current_replica */)
+    {
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rescheduleTasksFromReplica is not implemented");
+    }
+    virtual std::string operator()(size_t number_of_current_replica) const = 0;
+};

 /// This class allows one to launch queries on remote replicas of one shard and get results
 class RemoteQueryExecutor
@@ -331,6 +345,10 @@ class RemoteQueryExecutor
     const bool read_packet_type_separately = false;

+    const bool allow_retries_in_cluster_requests = false;
+
+    std::unordered_set<size_t> replica_has_processed_data;
+
     /// Send all scalars to remote servers
     void sendScalars();
diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
index 9090d045daae..0e3fb4952eb4 100644
--- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
@@ -19,11 +19,15 @@ namespace ErrorCodes
 }

 RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
-    RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_)
+    RemoteQueryExecutor & executor_,
+    bool suspend_when_query_sent_,
+    bool read_packet_type_separately_,
+    bool allow_retries_in_cluster_requests_)
     : AsyncTaskExecutor(std::make_unique<Task>(*this))
     , executor(executor_)
     , suspend_when_query_sent(suspend_when_query_sent_)
     , read_packet_type_separately(read_packet_type_separately_)
+    , allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
 {
     if (-1 == pipe2(pipe_fd, O_NONBLOCK))
         throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -54,18 +58,29 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
     if (read_context.executor.needToSkipUnavailableShard())
         return;

-    while (true)
+    try
     {
-        read_context.has_read_packet_part = PacketPart::None;
-
-        if (read_context.read_packet_type_separately)
+        while (true)
         {
-            read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
-            read_context.has_read_packet_part = PacketPart::Type;
+            read_context.has_read_packet_part = PacketPart::None;
+
+            if (read_context.read_packet_type_separately)
+            {
+                read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
+                read_context.has_read_packet_part = PacketPart::Type;
+                suspend_callback();
+            }
+            read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
+            read_context.has_read_packet_part = PacketPart::Body;
             suspend_callback();
         }
-        read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
-        read_context.has_read_packet_part = PacketPart::Body;
+    }
+    catch (const Exception &)
+    {
+        if (!read_context.allow_retries_in_cluster_requests)
+            throw;
+        read_context.packet.type = Protocol::Server::ConnectionLost;
+        read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
         suspend_callback();
     }
 }
diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h
index abde6cb93ef3..d850244bed6d 100644
--- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h
+++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h
@@ -26,7 +26,10 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
 {
 public:
     explicit RemoteQueryExecutorReadContext(
-        RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_);
+        RemoteQueryExecutor & executor_,
+        bool suspend_when_query_sent_,
+        bool read_packet_type_separately_,
+        bool allow_retries_in_cluster_requests_);

     ~RemoteQueryExecutorReadContext() override;

@@ -108,6 +111,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
     bool suspend_when_query_sent = false;
     bool is_query_sent = false;
     const bool read_packet_type_separately = false;
+    const bool allow_retries_in_cluster_requests = false;
 };

 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index f2a9615fb86f..ed34199adb07 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -439,6 +439,30 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
     args.insert(args.end(), object_storage_type_arg);
 }

+class TaskDistributor : public TaskIterator
+{
+public:
+    TaskDistributor(std::shared_ptr<IObjectIterator> iterator,
+        const std::vector<std::string> & ids_of_hosts,
+        uint64_t lock_object_storage_task_distribution_ms
+    )
+        : task_distributor(iterator, ids_of_hosts, lock_object_storage_task_distribution_ms) {}
+    ~TaskDistributor() override = default;
+    bool supportRerunTask() const override { return true; }
+    void rescheduleTasksFromReplica(size_t number_of_current_replica) override
+    {
+        task_distributor.rescheduleTasksFromReplica(number_of_current_replica);
+    }
+
+    std::string operator()(size_t number_of_current_replica) const override
+    {
+        return task_distributor.getNextTask(number_of_current_replica).value_or("");
+    }
+
+private:
+    mutable StorageObjectStorageStableTaskDistributor task_distributor;
+};
+
 RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
     const std::optional<ActionsDAG> & filter_actions_dag,
@@ -474,14 +498,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
         lock_object_storage_task_distribution_ms_max
     );

-    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
-        iterator,
-        ids_of_hosts,
-        lock_object_storage_task_distribution_ms);
-
-    auto callback = std::make_shared<TaskIterator>(
-        [task_distributor](size_t number_of_current_replica) mutable -> String
-        { return task_distributor->getNextTask(number_of_current_replica).value_or(""); });
+    auto callback = std::make_shared<TaskDistributor>(iterator, ids_of_hosts, lock_object_storage_task_distribution_ms);

     return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
index 5f709f2ec1d9..78652fd803e3 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -9,7 +9,8 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
-}
+    extern const int CANNOT_READ_ALL_DATA;
+};

 StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
     std::shared_ptr<IObjectIterator> iterator_,
@@ -21,6 +22,9 @@ StorageObjectStorageStableTaskDistrib
     , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
     , iterator_exhausted(false)
 {
+    size_t nodes = ids_of_nodes.size();
+    for (size_t i = 0; i < nodes; ++i)
+        replica_to_files_to_be_processed[i] = std::list<String>{};
 }

 std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
@@ -36,16 +40,27 @@
     saveLastNodeActivity(number_of_current_replica);

-    // 1. Check pre-queued files first
-    if (auto file = getPreQueuedFile(number_of_current_replica))
-        return file;
+    auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
+    if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Replica number {} was marked as lost, can't set task for it anymore",
+            number_of_current_replica
+        );
+
+    // 1. Check pre-queued files first
+    std::optional<String> file = getPreQueuedFile(number_of_current_replica);

     // 2. Try to find a matching file from the iterator
-    if (auto file = getMatchingFileFromIterator(number_of_current_replica))
-        return file;
-
+    if (!file.has_value())
+        file = getMatchingFileFromIterator(number_of_current_replica);
     // 3. Process unprocessed files if iterator is exhausted
-    return getAnyUnprocessedFile(number_of_current_replica);
+    if (!file.has_value())
+        file = getAnyUnprocessedFile(number_of_current_replica);
+
+    if (file.has_value())
+        processed_file_list_ptr->second.push_back(*file);
+
+    return file;
 }

 size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
@@ -57,16 +72,27 @@
         return 0;

     /// Rendezvous hashing
-    size_t best_id = 0;
-    UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
-    for (size_t id = 1; id < nodes_count; ++id)
+    auto replica = replica_to_files_to_be_processed.begin();
+    if (replica == replica_to_files_to_be_processed.end())
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "No active replicas, can't find best replica for file {}",
+            file_path
+        );
+
+    size_t best_id = replica->first;
+    UInt64 best_weight = sipHash64(ids_of_nodes[best_id] + file_path);
+    ++replica;
+    while (replica != replica_to_files_to_be_processed.end())
     {
+        size_t id = replica->first;
         UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
         if (weight > best_weight)
         {
             best_weight = weight;
             best_id = id;
         }
+        ++replica;
     }
     return best_id;
 }
@@ -230,4 +256,28 @@ void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t numb
     last_node_activity[number_of_current_replica] = now;
 }

+void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_t number_of_current_replica)
+{
+    LOG_INFO(log, "Replica {} is marked as lost, its tasks are returned to the queue", number_of_current_replica);
+    std::lock_guard lock(mutex);
+
+    auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
+    if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Replica number {} was marked as lost already",
+            number_of_current_replica
+        );
+
+    if (replica_to_files_to_be_processed.size() < 2)
+        throw Exception(
+            ErrorCodes::CANNOT_READ_ALL_DATA,
+            "All replicas were marked as lost"
+        );
+
+    /// Detach the list before erasing its owner, then re-hash the files over the remaining replicas only.
+    auto files_to_reschedule = std::move(processed_file_list_ptr->second);
+    replica_to_files_to_be_processed.erase(number_of_current_replica);
+    for (const auto & file_path : files_to_reschedule)
+        unprocessed_files[file_path] = getReplicaForFile(file_path);
+}
+
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
index 2132ba95a752..49b4c811c6ae 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
@@ -10,6 +10,7 @@
 #include
 #include
+#include <list>
 #include
 #include
 #include
@@ -27,6 +28,9 @@ class StorageObjectStorageStableTaskDistributor
     std::optional<String> getNextTask(size_t number_of_current_replica);

+    /// Put a lost replica's pending objects back into the unprocessed files queue
+    void rescheduleTasksFromReplica(size_t number_of_current_replica);
+
 private:
     size_t getReplicaForFile(const String & file_path);
     std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
@@ -44,6 +48,7 @@ class StorageObjectStorageStableTaskDistributor
     std::vector<std::string> ids_of_nodes;
     std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
     Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;
+    std::unordered_map<size_t, std::list<String>> replica_to_files_to_be_processed;
     std::mutex mutex;

     bool iterator_exhausted = false;
diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp
index de825df3a255..93abd607ae16 100644
--- a/src/Storages/StorageFileCluster.cpp
+++ b/src/Storages/StorageFileCluster.cpp
@@ -94,14 +94,49 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
     );
 }

+class FileTaskIterator : public TaskIterator
+{
+public:
+    FileTaskIterator(const Strings & files,
+        std::optional<StorageFile::ArchiveInfo> archive_info,
+        const ActionsDAG::Node * predicate,
+        const NamesAndTypesList & virtual_columns,
+        const NamesAndTypesList & hive_partition_columns_to_read_from_file_path,
+        const ContextPtr & context,
+        bool distributed_processing = false)
+        : iterator(files
+            , archive_info
+            , predicate
+            , virtual_columns
+            , hive_partition_columns_to_read_from_file_path
+            , context
+            , distributed_processing) {}
+
+    ~FileTaskIterator() override = default;
+
+    std::string operator()(size_t /* number_of_current_replica */) const override
+    {
+        return iterator.next();
+    }
+
+private:
+    mutable StorageFileSource::FilesIterator iterator;
+};
+
 RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
     const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
-    auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context);
-    auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
+    auto callback = std::make_shared<FileTaskIterator>(
+        paths,
+        std::nullopt,
+        predicate,
+        getVirtualsList(),
+        hive_partition_columns_to_read_from_file_path,
+        context
+    );
     return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
 }
diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp
index 5570392477e8..ce040d45aab0 100644
--- a/src/Storages/StorageURLCluster.cpp
+++ b/src/Storages/StorageURLCluster.cpp
@@ -126,15 +126,42 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS
     );
 }

+class UrlTaskIterator : public TaskIterator
+{
+public:
+    UrlTaskIterator(const String & uri,
+        size_t max_addresses,
+        const ActionsDAG::Node * predicate,
+        const NamesAndTypesList & virtual_columns,
+        const NamesAndTypesList & hive_partition_columns_to_read_from_file_path,
+        const ContextPtr & context)
+        : iterator(uri, max_addresses, predicate, virtual_columns, hive_partition_columns_to_read_from_file_path, context) {}
+
+    ~UrlTaskIterator() override = default;
+
+    std::string operator()(size_t /* number_of_current_replica */) const override
+    {
+        return iterator.next();
+    }
+
+private:
+    mutable StorageURLSource::DisclosedGlobIterator iterator;
+};
+
 RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
     const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
-    auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(
-        uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context);
-    auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
+    auto callback = std::make_shared<UrlTaskIterator>(
+        uri,
+        context->getSettingsRef()[Setting::glob_expansion_max_elements],
+        predicate,
+        getVirtualsList(),
+        hive_partition_columns_to_read_from_file_path,
+        context
+    );
     return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
 }
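
Note on the distribution scheme used above: StorageObjectStorageStableTaskDistributor keeps one map entry per live replica in replica_to_files_to_be_processed and assigns each file to the live replica with the highest rendezvous (highest-random-weight) hash, so when rescheduleTasksFromReplica() erases a lost replica, only the files that replica had claimed get new owners. Below is a minimal standalone sketch of that selection step; it uses std::hash as a stand-in for ClickHouse's sipHash64 and a plain map of replica numbers to hypothetical host ids, so it is an illustration of the technique rather than the actual ClickHouse API.

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <string>

// Hypothetical stand-in for sipHash64(node_id + file_path); any stable 64-bit hash works.
static uint64_t weightOf(const std::string & node_id, const std::string & file_path)
{
    return std::hash<std::string>{}(node_id + file_path);
}

// Pick the live replica with the highest weight for the file (rendezvous hashing).
// `active` maps replica number -> node id, mirroring the keys of replica_to_files_to_be_processed.
static size_t pickReplica(const std::map<size_t, std::string> & active, const std::string & file_path)
{
    size_t best_id = 0;
    uint64_t best_weight = 0;
    bool first = true;
    for (const auto & [id, node] : active)
    {
        uint64_t w = weightOf(node, file_path);
        if (first || w > best_weight)
        {
            best_weight = w;
            best_id = id;
            first = false;
        }
    }
    return best_id;
}

int main()
{
    std::map<size_t, std::string> active = {{0, "host-a"}, {1, "host-b"}, {2, "host-c"}};
    std::cout << pickReplica(active, "data/part-0001.parquet") << '\n';

    // Dropping a replica only remaps the files it owned; every other file keeps its owner.
    active.erase(1);
    std::cout << pickReplica(active, "data/part-0001.parquet") << '\n';
}

This minimal-reshuffling property is what keeps the ConnectionLost retry path cheap: only the lost replica's files are pushed back into unprocessed_files and re-hashed over the remaining nodes.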