From eb9bfbeccd1a540c7899133358cf2a3d93b3d247 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 28 Nov 2025 16:39:08 +0100
Subject: [PATCH 1/6] Initialize last_node_activity for all nodes

---
 .../StorageObjectStorageStableTaskDistributor.cpp | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
index d84eb6dbea28..3125fe663572 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -24,9 +24,13 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
     , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
     , iterator_exhausted(false)
 {
+    Poco::Timestamp now;
     size_t nodes = ids_of_nodes.size();
     for (size_t i = 0; i < nodes; ++i)
+    {
         replica_to_files_to_be_processed[i] = std::list{};
+        last_node_activity[i] = now;
+    }
 }
 
 ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)

From 6f6ef4bb56beed6756733606399c4f1b045064f4 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 2 Dec 2025 11:16:06 +0100
Subject: [PATCH 2/6] Fix lock_object_storage_task_distribution_ms with a few
 files

---
 src/Disks/ObjectStorages/IObjectStorage.cpp   |  4 +-
 src/Disks/ObjectStorages/IObjectStorage.h     | 16 +--
 src/Interpreters/ClusterFunctionReadTask.cpp  | 11 ++-
 .../StorageObjectStorageSource.cpp            |  4 +-
 ...rageObjectStorageStableTaskDistributor.cpp |  4 +-
 .../test_s3_cache_locality/test.py            | 99 ++++++++++++++++---
 6 files changed, 107 insertions(+), 31 deletions(-)

diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp
index 9b0137af7f28..2541d71c49f2 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -118,7 +118,7 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string
     if (!json)
         return;
 
-    successfully_parsed = true;
+    is_valid = true;
 
     if (json->has("retry_after_us"))
         retry_after_us = json->getValue<Poco::Timestamp::TimeDiff>("retry_after_us");
     }
 }
 
-std::string PathWithMetadata::CommandInTaskResponse::to_string() const
+std::string PathWithMetadata::CommandInTaskResponse::toString() const
 {
     Poco::JSON::Object json;
     if (retry_after_us.has_value())

diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h
index 26017f46f57b..292a8de13537 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.h
+++ b/src/Disks/ObjectStorages/IObjectStorage.h
@@ -121,15 +121,19 @@ struct PathWithMetadata
         CommandInTaskResponse() = default;
         explicit CommandInTaskResponse(const std::string & task);
 
-        bool is_parsed() const { return successfully_parsed; }
-        void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
+        bool isValid() const { return is_valid; }
+        void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us)
+        {
+            retry_after_us = time_us;
+            is_valid = true;
+        }
 
-        std::string to_string() const;
+        std::string toString() const;
 
-        std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
+        std::optional<Poco::Timestamp::TimeDiff> getRetryAfterUs() const { return retry_after_us; }
 
     private:
-        bool successfully_parsed = false;
+        bool is_valid = false;
         std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
     };
@@ -158,7 +162,7 @@ struct PathWithMetadata
     , absolute_path((absolute_path_.has_value() && !absolute_path_.value().empty()) ? absolute_path_ : std::nullopt)
     , object_storage_to_use(object_storage_to_use_)
 {
-    if (command.is_parsed())
+    if (command.isValid())
         relative_path = "";
 }

diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp
index adef5b74b98d..319505917f2f 100644
--- a/src/Interpreters/ClusterFunctionReadTask.cpp
+++ b/src/Interpreters/ClusterFunctionReadTask.cpp
@@ -33,9 +33,14 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
 
     file_meta_info = object->file_meta_info;
 
-    const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
-    path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
-    absolute_path = object->getAbsolutePath();
+    if (object->getCommand().isValid())
+        path = object->getCommand().toString();
+    else
+    {
+        const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
+        path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
+        absolute_path = object->getAbsolutePath();
+    }
 }
 
 ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(const std::string & path_)

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index 9d9191daf871..358f9a2e6fea 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -498,9 +498,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     if (!object_info)
         return {};
 
-    if (object_info->getCommand().is_parsed())
+    if (object_info->getCommand().isValid())
     {
-        auto retry_after_us = object_info->getCommand().get_retry_after_us();
+        auto retry_after_us = object_info->getCommand().getRetryAfterUs();
         if (retry_after_us.has_value())
         {
             not_a_path = true;

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
index 3125fe663572..651521e92a0f 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -250,8 +250,8 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
         /// All unprocessed files owned by alive replicas with recent activity
         /// Need to retry after (oldest_activity - activity_limit) microseconds
         PathWithMetadata::CommandInTaskResponse response;
-        response.set_retry_after_us(oldest_activity - activity_limit);
-        return std::make_shared(response.to_string());
+        response.setRetryAfterUs(oldest_activity - activity_limit);
+        return std::make_shared(response.toString());
     }
 
     return {};

diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py
index 7d2fc2a2ada4..60a414fe0014 100644
--- a/tests/integration/test_s3_cache_locality/test.py
+++ b/tests/integration/test_s3_cache_locality/test.py
@@ -22,8 +22,8 @@ def create_buckets_s3(cluster, files=1000):
     s3_data = []
 
     for file_number in range(files):
-        file_name = f"data/generated/file_{file_number}.csv"
-        os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
+        file_name = f"data/generated_{files}/file_{file_number}.csv"
+        os.makedirs(os.path.join(SCRIPT_DIR, f"data/generated_{files}/"), exist_ok=True)
         s3_data.append(file_name)
         with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
             # a String, b UInt64
@@ -69,15 +69,17 @@ def started_cluster():
         logging.info("Cluster started")
 
         create_buckets_s3(cluster)
+        create_buckets_s3(cluster, files=3)
         yield cluster
     finally:
-        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True)
+        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated_1000/"), ignore_errors=True)
+        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated_3/"), ignore_errors=True)
         cluster.shutdown()
 
 
 def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
-                  lock_object_storage_task_distribution_ms):
+                  lock_object_storage_task_distribution_ms, files=1000):
     for host in list(cluster.instances.values()):
         host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True)
 
@@ -92,7 +94,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
     result_first = node.query(
         f"""
        SELECT count(*)
-        FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
+        FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
        WHERE b=42
        SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
        """,
@@ -103,7 +105,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
     result_second = node.query(
         f"""
        SELECT count(*)
-        FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
+        FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
        WHERE b=42
        SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
        """,
@@ -134,6 +136,40 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
     return int(s3_get_first), int(s3_get_second)
 
 
+def check_s3_gets_by_hosts(cluster, node, expected_result,
+                           lock_object_storage_task_distribution_ms, files=1000):
+    settings = {
+        "enable_filesystem_cache": False,
+    }
+
+    settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
+    query_id = str(uuid.uuid4())
+    result = node.query(
+        f"""
+        SELECT count(*)
+        FROM s3Cluster('{cluster}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
+        WHERE b=42
+        SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
+        """,
+        query_id=query_id,
+    )
+    assert result == expected_result
+
+    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster}")
+
+    s3_get = node.query(
+        f"""
+        SELECT ProfileEvents['S3GetObject']
+        FROM clusterAllReplicas('{cluster}', system.query_log)
+        WHERE type='QueryFinish'
+          AND initial_query_id='{query_id}'
+        ORDER BY hostname
+        """,
+    )
+
+    return [int(events) for events in s3_get.strip().split("\n")]
+
+
 def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
                          lock_object_storage_task_distribution_ms):
     # Repeat test several times to get average result
@@ -154,7 +190,7 @@ def test_cache_locality(started_cluster, lock_object_storage_task_distribution_m
     expected_result = node.query(
         f"""
        SELECT count(*)
-        FROM s3('http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
+        FROM s3('http://minio1:9001/root/data/generated_1000/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
        WHERE b=42
        """
     )
@@ -170,26 +206,57 @@ def test_cache_locality(started_cluster, lock_object_storage_task_distribution_m
     (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * dispersion
 
-    # Different nodes order
+    # Different replicas order
     (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * dispersion
 
-    # No last node
+    # No last replica
     (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms)
-    assert s3_get_second <= s3_get_first * (0.211 + dispersion) # actual value - 24 for 100 files, 211 for 1000
+    assert s3_get_second <= s3_get_first * (0.179 + dispersion) # actual value - 179 of 1000 files changed replica
 
-    # No first node
+    # No first replica
     (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms)
-    assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 12 for 100 files, 189 for 1000
+    assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 189 of 1000 files changed replica
 
-    # No first node, different nodes order
+    # No first replica, different replicas order
     (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * (0.189 + dispersion)
 
-    # Add new node, different nodes order
+    # Add new replica, different replicas order
     (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * (0.189 + dispersion)
 
-    # New node and old node, different nodes order
+    # New replica and old replica, different replicas order
+    # All files form removed replica changed replica
+    # Some files form existed replicas changed replica on the new replica
     (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
-    assert s3_get_second <= s3_get_first * (0.400 + dispersion) # actual value - 36 for 100 files, 400 for 1000
+    assert s3_get_second <= s3_get_first * (0.368 + dispersion) # actual value - 368 of 1000 changed replica
+
+    if (lock_object_storage_task_distribution_ms > 0):
+        s3_get = check_s3_gets_by_hosts('cluster_12345', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
+        assert s3_get == [189,210,220,202,179]
+        s3_get = check_s3_gets_by_hosts('cluster_1234', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
+        assert s3_get == [247,243,264,246]
+        s3_get = check_s3_gets_by_hosts('cluster_2345', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
+        assert s3_get == [251,280,248,221]
+
+
+def test_cache_locality_few_files(started_cluster):
+    node = started_cluster.instances["clickhouse0"]
+
+    expected_result = node.query(
+        f"""
+        SELECT count(*)
+        FROM s3('http://minio1:9001/root/data/generated_3/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
+        WHERE b=42
+        """
+    )
+
+    # Rendezvous hashing produces the following distribution:
+    # file_0 - clickhouse1
+    # file_1 - clickhouse4
+    # file_2 - clickhouse3
+    # The same distribution must hold for every query
+    for _ in range(10):
+        s3_get = check_s3_gets_by_hosts('cluster_12345', node, expected_result, lock_object_storage_task_distribution_ms=30000, files=3)
+        assert s3_get == [1,0,1,1,0]

From bdd9988914ce7e66360b01fe2e80cb79f34ede6e Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 3 Dec 2025 12:26:16 +0100
Subject: [PATCH 3/6] Fix for case with matched file in unprocessed list

---
 .../StorageObjectStorageStableTaskDistributor.cpp | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
index 651521e92a0f..d2639462abab 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -218,9 +218,10 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
     while (it != unprocessed_files.end())
     {
         auto last_activity = last_node_activity.find(it->second.second);
-        if (lock_object_storage_task_distribution_us <= 0
-            || last_activity == last_node_activity.end()
-            || activity_limit > last_activity->second)
+        if (lock_object_storage_task_distribution_us <= 0 // file deferring is turned off
+            || it->second.second == number_of_current_replica // file matches the current replica
+            || last_activity == last_node_activity.end() // must never happen: last_activity is filled for each replica on start
+            || activity_limit > last_activity->second) // matched replica did not ask for new files for a while
         {
             auto next_file = it->second.first;
             unprocessed_files.erase(it);
 
             auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
             LOG_TRACE(
                 log,
-                "Iterator exhausted. Assigning unprocessed file {} to replica {}",
+                "Iterator exhausted. Assigning unprocessed file {} to replica {}, original matching replica {}",
                 file_path,
-                number_of_current_replica
+                number_of_current_replica,
+                it->second.second
             );
 
             return next_file;

From 3cb419930e585736973a17cf60ef91ce84d51d65 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 3 Dec 2025 12:44:58 +0100
Subject: [PATCH 4/6] Fix iterator use after remove

---
 .../StorageObjectStorageStableTaskDistributor.cpp | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
index d2639462abab..e665d99994e9 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -217,7 +217,8 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
 
     while (it != unprocessed_files.end())
     {
-        auto last_activity = last_node_activity.find(it->second.second);
+        auto number_of_matched_replica = it->second.second;
+        auto last_activity = last_node_activity.find(number_of_matched_replica);
         if (lock_object_storage_task_distribution_us <= 0 // file deferring is turned off
             || it->second.second == number_of_current_replica // file matches the current replica
             || last_activity == last_node_activity.end() // must never happen: last_activity is filled for each replica on start
             || activity_limit > last_activity->second) // matched replica did not ask for new files for a while
         {
             auto next_file = it->second.first;
             unprocessed_files.erase(it);
 
             auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
             LOG_TRACE(
                 log,
-                "Iterator exhausted. Assigning unprocessed file {} to replica {}, original matching replica {}",
+                "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}",
                 file_path,
                 number_of_current_replica,
-                it->second.second
+                number_of_matched_replica
             );
 
             return next_file;

From 249feb8a3eea5097e57df6d1e8e262351f59015d Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 3 Dec 2025 14:15:52 +0100
Subject: [PATCH 5/6] Fix typo in comment

---
 tests/integration/test_s3_cache_locality/test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py
index 60a414fe0014..68993d85aeed 100644
--- a/tests/integration/test_s3_cache_locality/test.py
+++ b/tests/integration/test_s3_cache_locality/test.py
@@ -227,8 +227,8 @@ def test_cache_locality(started_cluster, lock_object_storage_task_distribution_m
     assert s3_get_second <= s3_get_first * (0.189 + dispersion)
 
     # New replica and old replica, different replicas order
-    # All files form removed replica changed replica
-    # Some files form existed replicas changed replica on the new replica
+    # All files from the removed replica changed replica
+    # Some files from existing replicas moved to the new replica
     (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
     assert s3_get_second <= s3_get_first * (0.368 + dispersion) # actual value - 368 of 1000 changed replica

From cb8916c0fb33fa727b41d7600683069013801612 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 3 Dec 2025 17:31:39 +0100
Subject: [PATCH 6/6] Fix test_storage_s3/test.py::test_archive

---
 .../configs/lock_object_storage_task_distribution_ms.xml | 7 +++++++
 tests/integration/test_storage_s3/test.py                | 2 ++
 2 files changed, 9 insertions(+)
 create mode 100644 tests/integration/test_storage_s3/configs/lock_object_storage_task_distribution_ms.xml

diff --git a/tests/integration/test_storage_s3/configs/lock_object_storage_task_distribution_ms.xml b/tests/integration/test_storage_s3/configs/lock_object_storage_task_distribution_ms.xml
new file mode 100644
index 000000000000..a8239a28293c
--- /dev/null
+++ b/tests/integration/test_storage_s3/configs/lock_object_storage_task_distribution_ms.xml
@@ -0,0 +1,7 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <lock_object_storage_task_distribution_ms>0</lock_object_storage_task_distribution_ms>
+        </default>
+    </profiles>
+</clickhouse>
diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py
index 53c68fa2e5fb..65ec8d1af5d9 100644
--- a/tests/integration/test_storage_s3/test.py
+++ b/tests/integration/test_storage_s3/test.py
@@ -74,6 +74,7 @@ def started_cluster():
             "configs/access.xml",
             "configs/users.xml",
             "configs/s3_retry.xml",
+            "configs/lock_object_storage_task_distribution_ms.xml",
         ],
     )
     cluster.add_instance(
@@ -122,6 +123,7 @@ def started_cluster():
             "configs/users.xml",
             "configs/s3_retry.xml",
             "configs/process_archives_as_whole_with_cluster.xml",
+            "configs/lock_object_storage_task_distribution_ms.xml",
         ],
     )
     cluster.add_instance(