From 628904a71afc0f0921f8b7ff73699b7ee368f7f2 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 9 Dec 2025 13:33:08 +0100 Subject: [PATCH 1/2] Remove useless allow_retries_in_cluster_requests setting --- src/Core/Settings.cpp | 4 +- src/QueryPipeline/RemoteQueryExecutor.cpp | 25 +-------- src/QueryPipeline/RemoteQueryExecutor.h | 4 -- .../RemoteQueryExecutorReadContext.cpp | 52 ++++--------------- .../RemoteQueryExecutorReadContext.h | 4 +- 5 files changed, 15 insertions(+), 74 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 30728305319c..79ff0d5adeff 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7103,9 +7103,6 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri )", EXPERIMENTAL) \ DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"( Allow Iceberg read optimization based on Iceberg metadata. -)", EXPERIMENTAL) \ - DECLARE(Bool, allow_retries_in_cluster_requests, false, R"( -Allow retries in cluster request, when one node goes offline )", EXPERIMENTAL) \ DECLARE(Bool, object_storage_remote_initiator, false, R"( Execute request to object storage as remote on one of object_storage_cluster nodes. @@ -7227,6 +7224,7 @@ Sets the evaluation time to be used with promql dialect. 'auto' means the curren MAKE_OBSOLETE(M, Bool, allow_experimental_shared_set_join, true) \ MAKE_OBSOLETE(M, UInt64, min_external_sort_block_bytes, 100_MiB) \ MAKE_OBSOLETE(M, UInt64, distributed_cache_read_alignment, 0) \ + MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \ /** The section above is for obsolete settings. Do not add anything there. */ #endif /// __CLION_IDE__ diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 83f33a52de41..887710e46b63 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -52,7 +52,6 @@ namespace Setting extern const SettingsBool use_hedged_requests; extern const SettingsBool push_external_roles_in_interserver_queries; extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms; - extern const SettingsBool allow_retries_in_cluster_requests; } namespace ErrorCodes @@ -83,7 +82,6 @@ RemoteQueryExecutor::RemoteQueryExecutor( , extension(extension_) , priority_func(priority_func_) , read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests]) - , allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests]) { if (stage == QueryProcessingStage::QueryPlan && !query_plan) throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage"); @@ -468,8 +466,7 @@ int RemoteQueryExecutor::sendQueryAsync() read_context = std::make_unique( *this, /*suspend_when_query_sent*/ true, - read_packet_type_separately, - allow_retries_in_cluster_requests); + read_packet_type_separately); /// If query already sent, do nothing. Note that we cannot use sent_query flag here, /// because we can still be in process of sending scalars or external tables. @@ -542,8 +539,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() read_context = std::make_unique( *this, /*suspend_when_query_sent*/ false, - read_packet_type_separately, - allow_retries_in_cluster_requests); + read_packet_type_separately); recreate_read_context = false; } @@ -734,18 +730,6 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet break; case Protocol::Server::ConnectionLost: - if (allow_retries_in_cluster_requests) - { - if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info) - { - if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica)) - { - finished = true; - extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica); - return ReadResult(Block{}); - } - } - } packet.exception->rethrow(); break; @@ -1016,11 +1000,6 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback) profile_info_callback = std::move(callback); } -bool RemoteQueryExecutor::skipUnavailableShards() const -{ - return context->getSettingsRef()[Setting::skip_unavailable_shards]; -} - bool RemoteQueryExecutor::needToSkipUnavailableShard() const { return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size()); diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 7ef8be9e27cc..525447a96178 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -233,8 +233,6 @@ class RemoteQueryExecutor IConnections & getConnections() { return *connections; } - bool skipUnavailableShards() const; - bool needToSkipUnavailableShard() const; bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; } @@ -339,8 +337,6 @@ class RemoteQueryExecutor const bool read_packet_type_separately = false; - const bool allow_retries_in_cluster_requests = false; - std::unordered_set replica_has_processed_data; /// Send all scalars to remote servers diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp index bd9c0f4966e4..9281403224dd 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp @@ -22,13 +22,11 @@ namespace ErrorCodes RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext( RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, - bool read_packet_type_separately_, - bool allow_retries_in_cluster_requests_) + bool read_packet_type_separately_) : AsyncTaskExecutor(std::make_unique(*this)) , executor(executor_) , suspend_when_query_sent(suspend_when_query_sent_) , read_packet_type_separately(read_packet_type_separately_) - , allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_) { if (-1 == pipe2(pipe_fd, O_NONBLOCK)) throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe"); @@ -59,49 +57,21 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus if (read_context.executor.needToSkipUnavailableShard()) return; - try + while (true) { - while (true) - { - try - { - read_context.has_read_packet_part = PacketPart::None; - - if (read_context.read_packet_type_separately) - { - read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Type; - suspend_callback(); - } - read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Body; - if (read_context.packet.type == Protocol::Server::Data) - read_context.has_data_packets = true; - } - catch (const Exception & e) - { - /// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes. - /// If initiator did not process any data packets before, this fact can be ignored. - /// Unprocessed tasks will be executed on other nodes. - if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF - && !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards()) - { - read_context.has_read_packet_part = PacketPart::None; - } - else - throw; - } + read_context.has_read_packet_part = PacketPart::None; + if (read_context.read_packet_type_separately) + { + read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Type; suspend_callback(); } - } - catch (const Exception &) - { - if (!read_context.allow_retries_in_cluster_requests) - throw; - read_context.packet.type = Protocol::Server::ConnectionLost; - read_context.packet.exception = std::make_unique(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode()); + read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); read_context.has_read_packet_part = PacketPart::Body; + if (read_context.packet.type == Protocol::Server::Data) + read_context.has_data_packets = true; + suspend_callback(); } } diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h index 82bb28f81264..16ff5dc67a69 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h @@ -28,8 +28,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor explicit RemoteQueryExecutorReadContext( RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, - bool read_packet_type_separately_, - bool allow_retries_in_cluster_requests_); + bool read_packet_type_separately_); ~RemoteQueryExecutorReadContext() override; @@ -112,7 +111,6 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor bool suspend_when_query_sent = false; bool is_query_sent = false; const bool read_packet_type_separately = false; - const bool allow_retries_in_cluster_requests = false; }; } From be618673459aff412613e08e0cd9f405c616189f Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 9 Dec 2025 15:34:30 +0100 Subject: [PATCH 2/2] Return try-catch for unexpected EOF --- src/QueryPipeline/RemoteQueryExecutor.cpp | 14 +++++++ src/QueryPipeline/RemoteQueryExecutor.h | 2 + .../RemoteQueryExecutorReadContext.cpp | 42 ++++++++++++++----- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 887710e46b63..d51a5aecf8b5 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -730,6 +730,15 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet break; case Protocol::Server::ConnectionLost: + if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info) + { + if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica)) + { + finished = true; + extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica); + return ReadResult(Block{}); + } + } packet.exception->rethrow(); break; @@ -1000,6 +1009,11 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback) profile_info_callback = std::move(callback); } +bool RemoteQueryExecutor::skipUnavailableShards() const +{ + return context->getSettingsRef()[Setting::skip_unavailable_shards]; +} + bool RemoteQueryExecutor::needToSkipUnavailableShard() const { return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size()); diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 525447a96178..17bff1573ee4 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -233,6 +233,8 @@ class RemoteQueryExecutor IConnections & getConnections() { return *connections; } + bool skipUnavailableShards() const; + bool needToSkipUnavailableShard() const; bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; } diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp index 9281403224dd..2fe4cab35003 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp @@ -57,22 +57,42 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus if (read_context.executor.needToSkipUnavailableShard()) return; - while (true) + try { - read_context.has_read_packet_part = PacketPart::None; + while (true) + { + read_context.has_read_packet_part = PacketPart::None; + + if (read_context.read_packet_type_separately) + { + read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Type; + suspend_callback(); + } + read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Body; + if (read_context.packet.type == Protocol::Server::Data) + read_context.has_data_packets = true; - if (read_context.read_packet_type_separately) + suspend_callback(); + } + } + catch (const Exception & e) + { + /// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes. + /// If initiator did not process any data packets before, this fact can be ignored. + /// Unprocessed tasks will be executed on other nodes. + if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF + && !read_context.has_data_packets.load() + && read_context.executor.skipUnavailableShards()) { - read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Type; + read_context.packet.type = Protocol::Server::ConnectionLost; + read_context.packet.exception = std::make_unique(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode()); + read_context.has_read_packet_part = PacketPart::Body; suspend_callback(); } - read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Body; - if (read_context.packet.type == Protocol::Server::Data) - read_context.has_data_packets = true; - - suspend_callback(); + else + throw; } }