From ed68e8b821c16ce2af68d76302b40c1a51924f0d Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Thu, 25 Sep 2025 19:13:36 +0200 Subject: [PATCH 1/2] Changed lock_object_storage_task_distribution_ms value to 500 Also updated settings description, and minor fix in settings history --- src/Core/Settings.cpp | 14 ++++++++++++-- src/Core/SettingsChangesHistory.cpp | 6 +++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 44141270985f..b2c69047ee6d 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6888,8 +6888,18 @@ Cache the list of objects returned by list objects calls in object storage DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"( Removes unnecessary exchanges in distributed query plan. Disable it for debugging. )", 0) \ - DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"( -In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active. + DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"( +In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active. +Determines how long the free executor node (one that finished processing all of it assigned tasks) should wait before "stealing" tasks from queue of currently busy executor nodes. + +Possible values: + +- 0 - steal tasks immediately after freeing up. +- >0 - wait for specified period of time before stealing tasks. + +Having this `>0` helps with cache reuse and might improve overall query time. +Because busy node might have warmed-up caches for this specific task, while free node needs to fetch lots of data from S3. +Which might take longer than just waiting for the busy node and generate extra traffic. )", EXPERIMENTAL) \ DECLARE(String, distributed_plan_force_exchange_kind, "", R"( Force specified kind of Exchange operators between distributed query stages. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 8dbfaff5c02f..e547fbb23e18 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -67,7 +67,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// controls new feature and it's 'true' by default, use 'false' as previous_value). /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972) /// Note: please check if the key already exists to prevent duplicate entries. - addSettingsChanges(settings_changes_history, "25.6.5.2000", + addSettingsChanges(settings_changes_history, "25.6.5.20364", + { + {"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."}, + }); + addSettingsChanges(settings_changes_history, "25.6.5.20000", { {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"}, {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"}, From 45acc4d47b7fe0fe8403f3509d492af743c79e1f Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 29 Sep 2025 16:37:04 +0200 Subject: [PATCH 2/2] Fix lock_object_storage_task_distribution_ms with lost host --- .../StorageObjectStorageStableTaskDistributor.cpp | 8 ++++++-- tests/integration/test_s3_cache_locality/test.py | 3 +-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 78652fd803e3..cb9236f879ee 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -186,7 +186,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile // Queue file for its assigned replica { std::lock_guard lock(mutex); - unprocessed_files[file_path] = number_of_current_replica; + unprocessed_files[file_path] = file_replica_idx; connection_to_files[file_replica_idx].push_back(file_path); } } @@ -277,7 +277,11 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ replica_to_files_to_be_processed.erase(number_of_current_replica); for (const auto & file_path : processed_file_list_ptr->second) - unprocessed_files[file_path] = getReplicaForFile(file_path); + { + auto file_replica_idx = getReplicaForFile(file_path); + unprocessed_files[file_path] = file_replica_idx; + connection_to_files[file_replica_idx].push_back(file_path); + } } } diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py index da85e78a5643..7d2fc2a2ada4 100644 --- a/tests/integration/test_s3_cache_locality/test.py +++ b/tests/integration/test_s3_cache_locality/test.py @@ -86,8 +86,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, "filesystem_cache_name": "'raw_s3_cache'", } - if lock_object_storage_task_distribution_ms > 0: - settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms + settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms query_id_first = str(uuid.uuid4()) result_first = node.query(