From 39faf6e59c15a154926ea8956dbbf450f676d8eb Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 26 Nov 2025 18:36:19 +0100 Subject: [PATCH 1/3] Profile events for task distribution in ObjectStorageCluster requests --- src/Common/ProfileEvents.cpp | 5 +++++ .../ObjectStorage/StorageObjectStorageSource.cpp | 5 +++++ .../StorageObjectStorageStableTaskDistributor.cpp | 9 +++++++++ 3 files changed, 19 insertions(+) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 41790c8adf90..a13a92478e15 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -1159,6 +1159,11 @@ The server successfully detected this situation and will download merged part fr M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation miss the cache using prefix matching.", ValueType::Number) \ M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \ M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \ + \ + M(ObjectStorageClusterSentToMatchedReplica, "Number of tasks in ObjectStorageCluster request sent on matched replica.", ValueType::Number) \ + M(ObjectStorageClusterSentToNonMatchedReplica, "Number of tasks in ObjectStorageCluster request sent on non-matched replica.", ValueType::Number) \ + M(ObjectStorageClusterProcessedTasks, "Number of processed tasks in ObjectStorageCluster request.", ValueType::Number) \ + M(ObjectStorageClusterWaitingMicroseconds, "Time of waiting for tasks in ObjectStorageCluster request.", ValueType::Microseconds) \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 9d9191daf871..dc907b1f2823 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -45,6 +45,8 @@ namespace fs = std::filesystem; namespace ProfileEvents { extern const Event EngineFileLikeReadFiles; + extern const Event ObjectStorageClusterProcessedTasks; + extern const Event ObjectStorageClusterWaitingMicroseconds; } namespace CurrentMetrics @@ -507,6 +509,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade /// TODO: Make asyncronous waiting without sleep in thread /// Now this sleep is on executor node in worker thread /// Does not block query initiator + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterWaitingMicroseconds, retry_after_us.value()); sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value())); continue; } @@ -519,6 +522,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade } while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterProcessedTasks); + ObjectStoragePtr storage_to_use = object_info->getObjectStorage(); if (!storage_to_use) storage_to_use = object_storage; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index d84eb6dbea28..99d656acc419 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -3,6 +3,12 @@ #include #include +namespace ProfileEvents +{ + extern const Event ObjectStorageClusterSentToMatchedReplica; + extern const Event ObjectStorageClusterSentToNonMatchedReplica; +}; + namespace DB { @@ -125,6 +131,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica ); + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterSentToMatchedReplica); return next_file; } @@ -176,6 +183,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter file_path, number_of_current_replica ); + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterSentToMatchedReplica); return object_info; } LOG_TEST( @@ -229,6 +237,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s number_of_current_replica ); + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterSentToNonMatchedReplica); return next_file; } From 1f8c29838e756d622e340f8d1464cae76e6c137b Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 26 Nov 2025 19:59:26 +0100 Subject: [PATCH 2/3] Fix ObjectStorageClusterWaitingMicroseconds --- src/Storages/ObjectStorage/StorageObjectStorageSource.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index dc907b1f2823..73d6b5af2d39 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -509,8 +509,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade /// TODO: Make asyncronous waiting without sleep in thread /// Now this sleep is on executor node in worker thread /// Does not block query initiator - ProfileEvents::increment(ProfileEvents::ObjectStorageClusterWaitingMicroseconds, retry_after_us.value()); - sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value())); + auto wait_time = std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()); + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterWaitingMicroseconds, wait_time); + sleepForMicroseconds(wait_time); continue; } } From 20920777808d3f20d769cbfabd2b895328030e85 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 26 Nov 2025 20:02:25 +0100 Subject: [PATCH 3/3] Fix spelling --- src/Common/ProfileEvents.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index a13a92478e15..add2836b2dc3 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -1160,8 +1160,8 @@ The server successfully detected this situation and will download merged part fr M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \ M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \ \ - M(ObjectStorageClusterSentToMatchedReplica, "Number of tasks in ObjectStorageCluster request sent on matched replica.", ValueType::Number) \ - M(ObjectStorageClusterSentToNonMatchedReplica, "Number of tasks in ObjectStorageCluster request sent on non-matched replica.", ValueType::Number) \ + M(ObjectStorageClusterSentToMatchedReplica, "Number of tasks in ObjectStorageCluster request sent to matched replica.", ValueType::Number) \ + M(ObjectStorageClusterSentToNonMatchedReplica, "Number of tasks in ObjectStorageCluster request sent to non-matched replica.", ValueType::Number) \ M(ObjectStorageClusterProcessedTasks, "Number of processed tasks in ObjectStorageCluster request.", ValueType::Number) \ M(ObjectStorageClusterWaitingMicroseconds, "Time of waiting for tasks in ObjectStorageCluster request.", ValueType::Microseconds) \