From bb58490c95594873023e0b29d82773d8691d5076 Mon Sep 17 00:00:00 2001 From: Kishore K G Date: Tue, 6 Jan 2026 10:52:37 +0000 Subject: [PATCH 1/6] Fix scheduler deadlock when reducing max_outstanding_requests_per_tenant This commit fixes a critical deadlock in the query scheduler that occurs when a tenant's max_outstanding_requests_per_tenant limit is dynamically reduced via runtime configuration. When MaxOutstandingPerTenant is reduced while a user's FIFORequestQueue is full, the getOrAddQueue method attempts to migrate requests to a smaller queue. Previously, this loop blocked indefinitely when the new queue capacity was reached, causing the scheduler to freeze. The fix ensures the migration loop breaks when the new queue is full, effectively dropping excess requests instead of blocking. Verification: Added regression test TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced Signed-off-by: Kishore K G --- pkg/scheduler/queue/queue_test.go | 54 ++++++++++++++++++++++++++++++ pkg/scheduler/queue/user_queues.go | 2 +- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 47bd5295271..4b9561fc76d 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -285,6 +285,60 @@ func TestQueriersShouldGetHighPriorityQueryFirst(t *testing.T) { assert.Equal(t, highPriorityRequest, nextRequest) // high priority request returned, although it was enqueued the last } +func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) { + // Setup: Large initial limit + initialLimit := 100 + newLimit := 50 + + limits := MockLimits{ + MaxOutstanding: initialLimit, + } + + // Initialize queues + q := newUserQueues(0, limits, nil) + + // Create user queue + userID := "test-user-deadlock" + queue := q.getOrAddQueue(userID, 1) + + // Fill queue to capacity (near initialLimit) + // We fill it more than newLimit + itemsToFill := 80 + for i := 0; i < itemsToFill; i++ { + queue.enqueueRequest(MockRequest{priority: 1}) + } + + require.Equal(t, itemsToFill, queue.length()) + + // Reduce limit below current size + // We change the mock limits return value. + // In real app this comes from runtime config reload. + limits.MaxOutstanding = newLimit + q.limits = limits // Update strict reference in queues struct (mocking the reload effect) + + // Now call getOrAddQueue again. + // This triggers the migration logic: existing queue (80 items) -> new queue (cap 50). + done := make(chan struct{}) + go func() { + _ = q.getOrAddQueue(userID, 1) + close(done) + }() + + select { + case <-done: + // Success: no deadlock + case <-time.After(2 * time.Second): + t.Fatal("Deadlock detected! getOrAddQueue timed out while migrating queue with reduced limits.") + } + + // The new queue should be capped at newLimit or contain what managed to fit. + // Logic: it breaks when full. So new queue should be full (length == newLimit). + newQueue := q.getOrAddQueue(userID, 1) // Should be fast now + + // Note: The actual items in queue should be newLimit (50). The rest (30) are dropped. 
+ assert.Equal(t, newLimit, newQueue.length()) +} + func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { queue := NewRequestQueue(0, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index c7e30d87375..eaed52ad2e4 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -169,7 +169,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue tmpQueue := q.createUserRequestQueue(userID) // flush to new queue - for uq.queue.length() > 0 { + for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) { tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false)) } From f9f7d361d24b5ee7573a3e7b141729447097fc05 Mon Sep 17 00:00:00 2001 From: Kishore K G Date: Tue, 6 Jan 2026 11:53:53 +0000 Subject: [PATCH 2/6] fix lint Signed-off-by: Kishore K G --- pkg/scheduler/queue/queue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 4b9561fc76d..665ead376e5 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -304,7 +304,7 @@ func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) { // Fill queue to capacity (near initialLimit) // We fill it more than newLimit itemsToFill := 80 - for i := 0; i < itemsToFill; i++ { + for range itemsToFill { queue.enqueueRequest(MockRequest{priority: 1}) } From 4b823f31bf2971cae3ec99243daea6690bb1aa4b Mon Sep 17 00:00:00 2001 From: kishorekg1999 Date: Mon, 5 Jan 2026 03:41:25 +0530 Subject: [PATCH 3/6] Update Ruler frontend_address comments to mention Store Gateway (#7179) * Update Ruler frontend_address comments to mention Store Gateway Signed-off-by: Kishore K G * Update pkg/ruler/ruler.go Co-authored-by: SungJin1212 Signed-off-by: kishorekg1999 * Update Ruler frontend_address comments to mention Store Gateway Signed-off-by: Kishore K G * empty commit Signed-off-by: Kishore K G --------- Signed-off-by: Kishore K G Signed-off-by: kishorekg1999 Co-authored-by: SungJin1212 --- docs/configuration/config-file-reference.md | 7 ++++--- pkg/ruler/ruler.go | 2 +- schemas/cortex-config-schema.json | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 260f9a40a69..f7deb13faae 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5145,9 +5145,10 @@ The `redis_config` configures the Redis backend cache. The `ruler_config` configures the Cortex ruler. ```yaml -# [Experimental] GRPC listen address of the Query Frontend, in host:port format. -# If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries -# to Ingesters directly. +# [Experimental] gRPC address of the Query Frontend (host:port). If set, the +# Ruler sends queries to the Query Frontend to utilize splitting and caching, at +# the cost of additional network hops compared to direct querying to Ingesters +# and Store Gateway.
# CLI flag: -ruler.frontend-address [frontend_address: | default = ""] diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 7c9cf64ac74..a965b33f39d 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -231,7 +231,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { //lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods flagext.DeprecatedFlag(f, "ruler.alertmanager-use-v2", "This flag is no longer functional. V1 API is deprecated and removed", util_log.Logger) - f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "[Experimental] GRPC listen address of the Query Frontend, in host:port format. If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries to Ingesters directly.") + f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "[Experimental] gRPC address of the Query Frontend (host:port). If set, the Ruler sends queries to the Query Frontend to utilize splitting and caching, at the cost of additional network hops compared to direct querying to Ingesters and Store Gateway.") f.StringVar(&cfg.QueryResponseFormat, "ruler.query-response-format", queryResponseFormatProtobuf, fmt.Sprintf("[Experimental] Query response format to get query results from Query Frontend when the rule evaluation. It will only take effect when `-ruler.frontend-address` is configured. Supported values: %s", strings.Join(supportedQueryResponseFormats, ","))) cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.") diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 3cdc6290259..228617db42b 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -6587,7 +6587,7 @@ "x-format": "duration" }, "frontend_address": { - "description": "[Experimental] GRPC listen address of the Query Frontend, in host:port format. If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries to Ingesters directly.", + "description": "[Experimental] gRPC address of the Query Frontend (host:port).
If set, the Ruler sends queries to the Query Frontend to utilize splitting and caching, at the cost of additional network hops compared to direct querying to Ingesters and Store Gateway.", "type": "string", "x-cli-flag": "ruler.frontend-address" }, From 36c090772d44f7e8be4de3e3d788274105b2b68a Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 5 Jan 2026 10:40:03 +0900 Subject: [PATCH 4/6] Rename user index update config (#7180) * Rename user index update config Signed-off-by: SungJin1212 * fix lint Signed-off-by: SungJin1212 --------- Signed-off-by: SungJin1212 --- CHANGELOG.md | 4 +- docs/blocks-storage/querier.md | 4 +- docs/blocks-storage/store-gateway.md | 4 +- docs/configuration/config-file-reference.md | 12 +++--- integration/alertmanager_test.go | 6 +-- integration/ruler_test.go | 8 ++-- .../alertstore/bucketclient/bucket_client.go | 2 +- pkg/compactor/compactor.go | 4 +- pkg/compactor/compactor_test.go | 2 +- .../rulestore/bucketclient/bucket_client.go | 2 +- pkg/util/users/index_updater_test.go | 4 +- pkg/util/users/scanner_config.go | 12 +++--- schemas/cortex-config-schema.json | 42 +++++++++---------- 13 files changed, 54 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17b10223a33..44ddb10a605 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,9 @@ # Changelog ## master / unreleased - +* [CHANGE] Users Scanner: Rename user index update configuration. #7180 + * Flag: Renamed `-*.users-scanner.user-index.cleanup-interval` to `-*.users-scanner.user-index.update-interval`. + * Config: Renamed `clean_up_interval` to `update_interval` within the `users_scanner` configuration block. * [CHANGE] Querier: Refactored parquet cache configuration naming. #7146 * Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`. * Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`. diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 915bf258bd5..3eb44525d6e 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1948,8 +1948,8 @@ blocks_storage: # How frequently user index file is updated. It only takes effect when user # scan strategy is user_index. - # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval - [clean_up_interval: | default = 15m] + # CLI flag: -blocks-storage.users-scanner.user-index.update-interval + [update_interval: | default = 15m] # TTL of the cached users. 0 disables caching and relies on caching at # bucket client level. diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 868f534b2a0..56dc288dad4 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -2019,8 +2019,8 @@ blocks_storage: # How frequently user index file is updated. It only takes effect when user # scan strategy is user_index. - # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval - [clean_up_interval: | default = 15m] + # CLI flag: -blocks-storage.users-scanner.user-index.update-interval + [update_interval: | default = 15m] # TTL of the cached users. 0 disables caching and relies on caching at # bucket client level.
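For readers following the rename in this patch, here is a minimal, stdlib-only sketch of how the renamed YAML key and CLI flag map onto a single Go field, in the spirit of the pkg/util/users/scanner_config.go change later in this series (demoScannerConfig and registerFlags are illustrative names, not code from the patch):

package main

import (
	"flag"
	"fmt"
	"time"
)

// demoScannerConfig mirrors the shape of UsersScannerConfig after this patch:
// the YAML key is now update_interval (was clean_up_interval) and the CLI flag
// is now -<prefix>users-scanner.user-index.update-interval, both backed by the
// same UpdateInterval field.
type demoScannerConfig struct {
	Strategy       string        `yaml:"strategy"`
	UpdateInterval time.Duration `yaml:"update_interval"` // previously `yaml:"clean_up_interval"`
}

// registerFlags registers the renamed flag with the 15m default used by the patch.
func (c *demoScannerConfig) registerFlags(prefix string, f *flag.FlagSet) {
	f.StringVar(&c.Strategy, prefix+"users-scanner.strategy", "list", "Strategy to use to scan users.")
	f.DurationVar(&c.UpdateInterval, prefix+"users-scanner.user-index.update-interval", 15*time.Minute, "How frequently the user index file is updated.")
}

func main() {
	var cfg demoScannerConfig
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	cfg.registerFlags("blocks-storage.", fs)

	// Operators previously passing -blocks-storage.users-scanner.user-index.cleanup-interval
	// now pass the renamed flag instead.
	_ = fs.Parse([]string{"-blocks-storage.users-scanner.user-index.update-interval=10m"})
	fmt.Println(cfg.UpdateInterval) // 10m0s
}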
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f7deb13faae..d3de1ce279a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -999,8 +999,8 @@ users_scanner: # How frequently user index file is updated. It only takes effect when user # scan strategy is user_index. - # CLI flag: -alertmanager-storage.users-scanner.user-index.cleanup-interval - [clean_up_interval: | default = 15m] + # CLI flag: -alertmanager-storage.users-scanner.user-index.update-interval + [update_interval: | default = 15m] # TTL of the cached users. 0 disables caching and relies on caching at bucket # client level. @@ -2624,8 +2624,8 @@ users_scanner: # How frequently user index file is updated. It only takes effect when user # scan strategy is user_index. - # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval - [clean_up_interval: | default = 15m] + # CLI flag: -blocks-storage.users-scanner.user-index.update-interval + [update_interval: | default = 15m] # TTL of the cached users. 0 disables caching and relies on caching at bucket # client level. @@ -5905,8 +5905,8 @@ users_scanner: # How frequently user index file is updated. It only takes effect when user # scan strategy is user_index. - # CLI flag: -ruler-storage.users-scanner.user-index.cleanup-interval - [clean_up_interval: | default = 15m] + # CLI flag: -ruler-storage.users-scanner.user-index.update-interval + [update_interval: | default = 15m] # TTL of the cached users. 0 disables caching and relies on caching at bucket # client level. diff --git a/integration/alertmanager_test.go b/integration/alertmanager_test.go index 7eb2be2d47f..cbbdccdce15 100644 --- a/integration/alertmanager_test.go +++ b/integration/alertmanager_test.go @@ -82,9 +82,9 @@ func TestAlertmanagerWithUserIndexUpdater(t *testing.T) { baseFlags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags()) flags := mergeFlags(baseFlags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 1), map[string]string{ - "-alertmanager-storage.users-scanner.strategy": "user_index", - "-alertmanager-storage.users-scanner.user-index.cleanup-interval": "15s", - "-alertmanager.configs.poll-interval": "5s", + "-alertmanager-storage.users-scanner.strategy": "user_index", + "-alertmanager-storage.users-scanner.user-index.update-interval": "15s", + "-alertmanager.configs.poll-interval": "5s", }) am := e2ecortex.NewAlertmanager( diff --git a/integration/ruler_test.go b/integration/ruler_test.go index e34c3efa5a6..56f29295aae 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -158,10 +158,10 @@ func TestRulerWithUserIndexUpdater(t *testing.T) { RulerFlags(), RulerShardingFlags(consul.NetworkHTTPEndpoint()), map[string]string{ - "-ruler.sharding-strategy": "shuffle-sharding", - "-ruler-storage.users-scanner.strategy": "user_index", - "-ruler-storage.users-scanner.user-index.cleanup-interval": "15s", - "-ruler.tenant-shard-size": "1", + "-ruler.sharding-strategy": "shuffle-sharding", + "-ruler-storage.users-scanner.strategy": "user_index", + "-ruler-storage.users-scanner.user-index.update-interval": "15s", + "-ruler.tenant-shard-size": "1", // Since we're not going to run any rule, we don't need the // store-gateway to be configured to a valid address. 
"-querier.store-gateway-addresses": "localhost:12345", diff --git a/pkg/alertmanager/alertstore/bucketclient/bucket_client.go b/pkg/alertmanager/alertstore/bucketclient/bucket_client.go index 989df29ebe0..ba3a3639e71 100644 --- a/pkg/alertmanager/alertstore/bucketclient/bucket_client.go +++ b/pkg/alertmanager/alertstore/bucketclient/bucket_client.go @@ -67,7 +67,7 @@ func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg users.U baseScanner, _ := users.NewScanner(users.UsersScannerConfig{ Strategy: users.UserScanStrategyList, }, alertBucket, logger, regWithComponent) - userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent) + userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.UpdateInterval, baseScanner, regWithComponent) } return &BucketAlertStore{ diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 6506bea1f94..d66797c6e13 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -762,7 +762,7 @@ func (c *Compactor) starting(ctx context.Context) error { baseScanner, _ := users.NewScanner(users.UsersScannerConfig{ Strategy: users.UserScanStrategyList, }, c.bucketClient, c.logger, c.registerer) - c.userIndexUpdater = users.NewUserIndexUpdater(c.bucketClient, c.storageCfg.UsersScanner.CleanUpInterval, baseScanner, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) + c.userIndexUpdater = users.NewUserIndexUpdater(c.bucketClient, c.storageCfg.UsersScanner.UpdateInterval, baseScanner, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) } return nil @@ -1212,7 +1212,7 @@ func (c *Compactor) userIndexUpdateLoop(ctx context.Context) { // Hardcode ID to check which compactor owns updating user index. userID := users.UserIndexCompressedFilename // Align with clean up interval. - ticker := time.NewTicker(util.DurationWithJitter(c.storageCfg.UsersScanner.CleanUpInterval, 0.1)) + ticker := time.NewTicker(util.DurationWithJitter(c.storageCfg.UsersScanner.UpdateInterval, 0.1)) defer ticker.Stop() for { diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index c8c617d4279..0538be32400 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1580,7 +1580,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Instrument flagext.DefaultValues(&storageCfg) storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery) storageCfg.UsersScanner.Strategy = users.UserScanStrategyUserIndex - storageCfg.UsersScanner.CleanUpInterval = 100 * time.Millisecond // Short interval for testing + storageCfg.UsersScanner.UpdateInterval = 100 * time.Millisecond // Short interval for testing // Create a temporary directory for compactor data. 
compactorCfg.DataDir = t.TempDir() diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go index 00128672be7..62f6903275f 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go @@ -65,7 +65,7 @@ func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg users.UsersScannerCo baseScanner, _ := users.NewScanner(users.UsersScannerConfig{ Strategy: users.UserScanStrategyList, }, rulesBucket, logger, regWithComponent) - userIndexUpdater = users.NewUserIndexUpdater(rulesBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent) + userIndexUpdater = users.NewUserIndexUpdater(rulesBucket, userScannerCfg.UpdateInterval, baseScanner, regWithComponent) } return &BucketRuleStore{ diff --git a/pkg/util/users/index_updater_test.go b/pkg/util/users/index_updater_test.go index 0b4a6a68e9c..8cdef00bfdd 100644 --- a/pkg/util/users/index_updater_test.go +++ b/pkg/util/users/index_updater_test.go @@ -77,7 +77,7 @@ func TestUserIndexUpdater_UpdateUserIndex(t *testing.T) { t.Parallel() bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) - updater := NewUserIndexUpdater(bkt, defaultCleanUpInterval, testData.scanner, nil) + updater := NewUserIndexUpdater(bkt, defaultUpdateInterval, testData.scanner, nil) err := updater.UpdateUserIndex(ctx) if testData.expectErr { @@ -118,7 +118,7 @@ func TestUserIndexUpdater_UpdateUserIndex_WriteError(t *testing.T) { // Mock the bucket to return an error on upload bkt.MockUpload(UserIndexCompressedFilename, assert.AnError) - updater := NewUserIndexUpdater(bkt, defaultCleanUpInterval, scanner, nil) + updater := NewUserIndexUpdater(bkt, defaultUpdateInterval, scanner, nil) err := updater.UpdateUserIndex(ctx) require.Error(t, err) assert.Contains(t, err.Error(), "upload user index") diff --git a/pkg/util/users/scanner_config.go b/pkg/util/users/scanner_config.go index 9ac6194c3ee..7cd08867de1 100644 --- a/pkg/util/users/scanner_config.go +++ b/pkg/util/users/scanner_config.go @@ -10,17 +10,17 @@ import ( ) type UsersScannerConfig struct { - Strategy string `yaml:"strategy"` - MaxStalePeriod time.Duration `yaml:"max_stale_period"` - CleanUpInterval time.Duration `yaml:"clean_up_interval"` - CacheTTL time.Duration `yaml:"cache_ttl"` + Strategy string `yaml:"strategy"` + MaxStalePeriod time.Duration `yaml:"max_stale_period"` + UpdateInterval time.Duration `yaml:"update_interval"` + CacheTTL time.Duration `yaml:"cache_ttl"` } const ( UserScanStrategyList = "list" UserScanStrategyUserIndex = "user_index" - defaultCleanUpInterval = time.Minute * 15 + defaultUpdateInterval = time.Minute * 15 ) var ( @@ -46,6 +46,6 @@ func (c *UsersScannerConfig) Validate() error { func (c *UsersScannerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&c.Strategy, prefix+"users-scanner.strategy", UserScanStrategyList, fmt.Sprintf("Strategy to use to scan users. Supported values are: %s.", strings.Join(supportedStrategies, ", "))) f.DurationVar(&c.MaxStalePeriod, prefix+"users-scanner.user-index.max-stale-period", time.Hour, "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. Only valid when strategy is user_index.") - f.DurationVar(&c.CleanUpInterval, prefix+"users-scanner.user-index.cleanup-interval", defaultCleanUpInterval, fmt.Sprintf("How frequently user index file is updated. 
It only takes effect when user scan strategy is %s.", UserScanStrategyUserIndex)) + f.DurationVar(&c.UpdateInterval, prefix+"users-scanner.user-index.update-interval", defaultUpdateInterval, fmt.Sprintf("How frequently user index file is updated. It only takes effect when user scan strategy is %s.", UserScanStrategyUserIndex)) f.DurationVar(&c.CacheTTL, prefix+"users-scanner.cache-ttl", 0, "TTL of the cached users. 0 disables caching and relies on caching at bucket client level.") } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 228617db42b..59df63212ac 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -912,13 +912,6 @@ "x-cli-flag": "alertmanager-storage.users-scanner.cache-ttl", "x-format": "duration" }, - "clean_up_interval": { - "default": "15m0s", - "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.", - "type": "string", - "x-cli-flag": "alertmanager-storage.users-scanner.user-index.cleanup-interval", - "x-format": "duration" - }, "max_stale_period": { "default": "1h0m0s", "description": "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. Only valid when strategy is user_index.", @@ -931,6 +924,13 @@ "description": "Strategy to use to scan users. Supported values are: list, user_index.", "type": "string", "x-cli-flag": "alertmanager-storage.users-scanner.strategy" + }, + "update_interval": { + "default": "15m0s", + "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.", + "type": "string", + "x-cli-flag": "alertmanager-storage.users-scanner.user-index.update-interval", + "x-format": "duration" } }, "type": "object" @@ -3114,13 +3114,6 @@ "x-cli-flag": "blocks-storage.users-scanner.cache-ttl", "x-format": "duration" }, - "clean_up_interval": { - "default": "15m0s", - "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.", - "type": "string", - "x-cli-flag": "blocks-storage.users-scanner.user-index.cleanup-interval", - "x-format": "duration" - }, "max_stale_period": { "default": "1h0m0s", "description": "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. Only valid when strategy is user_index.", @@ -3133,6 +3126,13 @@ "description": "Strategy to use to scan users. Supported values are: list, user_index.", "type": "string", "x-cli-flag": "blocks-storage.users-scanner.strategy" + }, + "update_interval": { + "default": "15m0s", + "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.", + "type": "string", + "x-cli-flag": "blocks-storage.users-scanner.user-index.update-interval", + "x-format": "duration" } }, "type": "object" @@ -7472,13 +7472,6 @@ "x-cli-flag": "ruler-storage.users-scanner.cache-ttl", "x-format": "duration" }, - "clean_up_interval": { - "default": "15m0s", - "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.", - "type": "string", - "x-cli-flag": "ruler-storage.users-scanner.user-index.cleanup-interval", - "x-format": "duration" - }, "max_stale_period": { "default": "1h0m0s", "description": "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. 
Only valid when strategy is user_index.", @@ -7491,6 +7484,13 @@ "description": "Strategy to use to scan users. Supported values are: list, user_index.", "type": "string", "x-cli-flag": "ruler-storage.users-scanner.strategy" + }, + "update_interval": { + "default": "15m0s", + "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.", + "type": "string", + "x-cli-flag": "ruler-storage.users-scanner.user-index.update-interval", + "x-format": "duration" } }, "type": "object" From fdf96a156917f077db7c764666a1487b99047d8f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 5 Jan 2026 11:33:55 -0800 Subject: [PATCH 5/6] instrument ingester query path with pprof user label (#7184) * instrument ingester query path with pprof user label Signed-off-by: yeya24 * update changelog Signed-off-by: yeya24 --------- Signed-off-by: yeya24 --- CHANGELOG.md | 1 + pkg/ingester/ingester.go | 217 +++++++++++++++++++++++++++------------ 2 files changed, 154 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44ddb10a605..f110486d740 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ * [ENHANCEMENT] Compactor: Avoid double compaction by not filtering delete blocks on real time when using bucketIndex lister. #7156 * [ENHANCEMENT] Upgrade to go 1.25. #7164 * [ENHANCEMENT] Upgraded container base images to `alpine:3.23`. #7163 +* [ENHANCEMENT] Instrument Ingester CPU profile with userID for read APIs. #7184 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 * [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index acb8bac27a8..adbb136f155 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "runtime" + "runtime/pprof" "slices" "strings" "sync" @@ -1780,6 +1781,14 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery return nil, err } + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + resp, err = i.queryExemplars(ctx, userID, req) + }) + return resp, err +} + +func (i *Ingester) queryExemplars(ctx context.Context, userID string, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) { from, through, matchers, err := client.FromExemplarQueryRequest(i.matchersCache, req) if err != nil { return nil, err @@ -1836,33 +1845,55 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery // LabelValues returns all label values that are associated with a given label name. func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error) { defer recoverIngester(i.logger, &err) - resp, cleanup, err := i.labelsValuesCommon(ctx, req) - defer cleanup() + + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return nil, userErr + } + + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + var cleanup func() + resp, cleanup, err = i.labelsValuesCommon(ctx, req) + defer cleanup() + }) return resp, err } // LabelValuesStream returns all label values that are associated with a given label name. 
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) (err error) { defer recoverIngester(i.logger, &err) - resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req) - defer cleanup() - if err != nil { - return err + ctx := stream.Context() + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return userErr } - for i := 0; i < len(resp.LabelValues); i += metadataStreamBatchSize { - j := min(i+metadataStreamBatchSize, len(resp.LabelValues)) - resp := &client.LabelValuesStreamResponse{ - LabelValues: resp.LabelValues[i:j], - } - err := client.SendLabelValuesStream(stream, resp) + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + var resp *client.LabelValuesResponse + var cleanup func() + resp, cleanup, err = i.labelsValuesCommon(ctx, req) + defer cleanup() + if err != nil { - return err + return } - } - return nil + for i := 0; i < len(resp.LabelValues); i += metadataStreamBatchSize { + j := min(i+metadataStreamBatchSize, len(resp.LabelValues)) + resp := &client.LabelValuesStreamResponse{ + LabelValues: resp.LabelValues[i:j], + } + err = client.SendLabelValuesStream(stream, resp) + if err != nil { + return + } + } + }) + + return err } // labelsValuesCommon returns all label values that are associated with a given label name. @@ -1930,33 +1961,55 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu // LabelNames return all the label names. func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error) { defer recoverIngester(i.logger, &err) - resp, cleanup, err := i.labelNamesCommon(ctx, req) - defer cleanup() + + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return nil, userErr + } + + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + var cleanup func() + resp, cleanup, err = i.labelNamesCommon(ctx, req) + defer cleanup() + }) return resp, err } // LabelNamesStream return all the label names. func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) (err error) { defer recoverIngester(i.logger, &err) - resp, cleanup, err := i.labelNamesCommon(stream.Context(), req) - defer cleanup() - if err != nil { - return err + ctx := stream.Context() + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return userErr } - for i := 0; i < len(resp.LabelNames); i += metadataStreamBatchSize { - j := min(i+metadataStreamBatchSize, len(resp.LabelNames)) - resp := &client.LabelNamesStreamResponse{ - LabelNames: resp.LabelNames[i:j], - } - err = client.SendLabelNamesStream(stream, resp) + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + var resp *client.LabelNamesResponse + var cleanup func() + resp, cleanup, err = i.labelNamesCommon(ctx, req) + defer cleanup() + if err != nil { - return err + return } - } - return nil + for i := 0; i < len(resp.LabelNames); i += metadataStreamBatchSize { + j := min(i+metadataStreamBatchSize, len(resp.LabelNames)) + resp := &client.LabelNamesStreamResponse{ + LabelNames: resp.LabelNames[i:j], + } + err = client.SendLabelNamesStream(stream, resp) + if err != nil { + return + } + } + }) + + return err } // labelNamesCommon return all the label names. 
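As background for the wrapping pattern repeated across these hunks: runtime/pprof attaches the given labels to every CPU sample taken while the wrapped function runs, which is what makes per-tenant CPU attribution possible in the resulting profiles. A minimal standalone sketch of the pattern, using hypothetical stand-ins (handleQuery, serveForTenant) rather than functions from this patch:

package main

import (
	"context"
	"fmt"
	"runtime/pprof"
)

// handleQuery stands in for an expensive per-tenant read-path handler.
func handleQuery(ctx context.Context) (string, error) {
	return "ok", nil
}

// serveForTenant mirrors the pattern this patch applies to LabelValues,
// LabelNames, QueryStream, etc.: run the handler inside pprof.Do so that
// CPU samples collected during the call carry a user=<tenant> label.
func serveForTenant(ctx context.Context, userID string) (resp string, err error) {
	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
		resp, err = handleQuery(ctx)
	})
	return resp, err
}

func main() {
	resp, err := serveForTenant(context.Background(), "tenant-1")
	fmt.Println(resp, err)
}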
@@ -2024,49 +2077,70 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR // MetricsForLabelMatchers returns all the metrics which match a set of matchers. func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (result *client.MetricsForLabelMatchersResponse, err error) { defer recoverIngester(i.logger, &err) - result = &client.MetricsForLabelMatchersResponse{} - cleanup, err := i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error { - result.Metric = append(result.Metric, &cortexpb.Metric{ - Labels: cortexpb.FromLabelsToLabelAdapters(l), + + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return nil, userErr + } + + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + result = &client.MetricsForLabelMatchersResponse{} + var cleanup func() + cleanup, err = i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error { + result.Metric = append(result.Metric, &cortexpb.Metric{ + Labels: cortexpb.FromLabelsToLabelAdapters(l), + }) + return nil }) - return nil + defer cleanup() }) - defer cleanup() return result, err } func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) (err error) { defer recoverIngester(i.logger, &err) - result := &client.MetricsForLabelMatchersStreamResponse{} - cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req, func(l labels.Labels) error { - result.Metric = append(result.Metric, &cortexpb.Metric{ - Labels: cortexpb.FromLabelsToLabelAdapters(l), + ctx := stream.Context() + userID, userErr := users.TenantID(ctx) + if userErr != nil { + return userErr + } + + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + result := &client.MetricsForLabelMatchersStreamResponse{} + + var cleanup func() + cleanup, err = i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error { + result.Metric = append(result.Metric, &cortexpb.Metric{ + Labels: cortexpb.FromLabelsToLabelAdapters(l), + }) + + if len(result.Metric) >= metadataStreamBatchSize { + err := client.SendMetricsForLabelMatchersStream(stream, result) + if err != nil { + return err + } + result.Metric = result.Metric[:0] + } + return nil }) + defer cleanup() + if err != nil { + return + } - if len(result.Metric) >= metadataStreamBatchSize { - err := client.SendMetricsForLabelMatchersStream(stream, result) + // Send last batch + if len(result.Metric) > 0 { + err = client.SendMetricsForLabelMatchersStream(stream, result) if err != nil { - return err + return } - result.Metric = result.Metric[:0] } - return nil }) - defer cleanup() - if err != nil { - return err - } - - // Send last batch - if len(result.Metric) > 0 { - err = client.SendMetricsForLabelMatchersStream(stream, result) - if err != nil { - return err - } - } - return nil + return err } // metricsForLabelMatchersCommon returns all the metrics which match a set of matchers. @@ -2160,7 +2234,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien } // MetricsMetadata returns all the metric metadata of a user. 
-func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (*client.MetricsMetadataResponse, error) { +func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error) { i.stoppedMtx.RLock() if err := i.checkRunningOrStopping(); err != nil { i.stoppedMtx.RUnlock() @@ -2173,13 +2247,19 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad return nil, err } - userMetadata := i.getUserMetadata(userID) + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + userMetadata := i.getUserMetadata(userID) - if userMetadata == nil { - return &client.MetricsMetadataResponse{}, nil - } + if userMetadata == nil { + resp = &client.MetricsMetadataResponse{} + return + } + + resp = &client.MetricsMetadataResponse{Metadata: userMetadata.toClientMetadata(req)} + }) - return &client.MetricsMetadataResponse{Metadata: userMetadata.toClientMetadata(req)}, nil + return resp, nil } // CheckReady is the readiness handler used to indicate to k8s when the ingesters @@ -2309,6 +2389,15 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return err } + // Set pprof labels for profiling + pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) { + err = i.queryStream(ctx, userID, req, stream, spanlog) + }) + + return err +} + +func (i *Ingester) queryStream(ctx context.Context, userID string, req *client.QueryRequest, stream client.Ingester_QueryStreamServer, spanlog *spanlogger.SpanLogger) error { from, through, matchers, err := client.FromQueryRequest(i.matchersCache, req) if err != nil { return err From 965dc5f816f0ae9511a8b5a60eff22e90977c8cb Mon Sep 17 00:00:00 2001 From: Alex Le Date: Mon, 5 Jan 2026 12:46:59 -0800 Subject: [PATCH 6/6] Add partitioned group creation time to logs where partitioned group id got logged and add performance logs for sharded block populator (#7181) Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 24 +++++---- pkg/compactor/blocks_cleaner_test.go | 2 +- pkg/compactor/partition_compaction_grouper.go | 16 +++--- pkg/compactor/partitioned_group_info.go | 50 +++++++++++-------- pkg/compactor/sharded_block_populator.go | 10 +++- pkg/storage/tsdb/meta_extensions.go | 5 ++ 6 files changed, 64 insertions(+), 43 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index b2b9cf02777..22fe7abf915 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -319,7 +319,7 @@ func (c *BlocksCleaner) runEmitPartitionMetricsWorker(ctx context.Context, jobCh err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) - c.emitUserParititionMetrics(ctx, userLogger, userBucket, userID) + c.emitUserPartitionMetrics(ctx, userLogger, userBucket, userID) return nil }) @@ -789,13 +789,14 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke isPartitionGroupInfoDeleted := false partitionedGroupInfoFile := extraInfo.path deletedBlocksCount := 0 + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) if extraInfo.status.CanDelete { if 
extraInfo.status.IsCompleted { // Try to remove all blocks included in partitioned group info deletedBlocksCount, err = partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID) if err != nil { - level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Warn(partitionedGroupLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion") // if one block can not be marked for deletion, we should // skip delete this partitioned group. next iteration // would try it again. @@ -804,13 +805,13 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke } if deletedBlocksCount > 0 { - level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Info(partitionedGroupLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle") } else { - level.Info(userLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Info(partitionedGroupLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid") if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil { - level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_file", partitionedGroupInfoFile, "err", err) } else { - level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile) + level.Info(partitionedGroupLogger).Log("msg", "deleted partitioned group info", "partitioned_group_file", partitionedGroupInfoFile) isPartitionGroupInfoDeleted = true } } @@ -819,15 +820,15 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) { // Remove partition visit markers if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil { - level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "err", err) } else { - level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile) + level.Info(partitionedGroupLogger).Log("msg", "deleted partition visit markers for partitioned group") } } } } -func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) { +func (c *BlocksCleaner) emitUserPartitionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) { existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger) if err 
!= nil { level.Warn(userLogger).Log("msg", "error listing partitioned group directory to emit metrics", "err", err) @@ -842,7 +843,7 @@ func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogge c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions)) if oldestPartitionGroup != nil { c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime)) - level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime) + level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTimeString()) } else { c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0) } @@ -874,8 +875,9 @@ func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objs return nil } + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger) - level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) + level.Debug(partitionedGroupLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) existentPartitionedGroupInfo[partitionedGroupInfo] = struct { path string status PartitionedGroupStatus diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 9b317474fa8..e5e5037db80 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -1256,7 +1256,7 @@ func TestBlocksCleaner_EmitUserMetrics(t *testing.T) { err = v4Manager.updateVisitMarker(ctx) require.NoError(t, err) - cleaner.emitUserParititionMetrics(ctx, logger, userBucket, userID) + cleaner.emitUserPartitionMetrics(ctx, logger, userBucket, userID) metricNames := []string{ "cortex_compactor_remaining_planned_compactions", diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 4f1d955bfc6..839cd74e2a9 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -171,7 +171,7 @@ func (g *PartitionCompactionGrouper) generateCompactionJobs(blocks map[ulid.ULID var blockIDs []string for _, p := range existingPartitionedGroups { blockIDs = p.getAllBlockIDs() - level.Info(g.logger).Log("msg", "existing partitioned group", "partitioned_group_id", p.PartitionedGroupID, "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", "existing partitioned group", "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString(), "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } allPartitionedGroup, err := g.generatePartitionedGroups(blocks, groups, existingPartitionedGroups, timeRanges) @@ -181,13 +181,13 @@ func (g *PartitionCompactionGrouper) generateCompactionJobs(blocks map[ulid.ULID 
g.sortPartitionedGroups(allPartitionedGroup) for _, p := range allPartitionedGroup { blockIDs = p.getAllBlockIDs() - level.Info(g.logger).Log("msg", "partitioned group ready for compaction", "partitioned_group_id", p.PartitionedGroupID, "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", "partitioned group ready for compaction", "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString(), "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } partitionCompactionJobs := g.generatePartitionCompactionJobs(blocks, allPartitionedGroup, g.doRandomPick) for _, p := range partitionCompactionJobs { blockIDs = p.getBlockIDs() - level.Info(g.logger).Log("msg", "partitioned compaction job", "partitioned_group_id", p.partitionedGroupInfo.PartitionedGroupID, "partition_id", p.partition.PartitionID, "partition_count", p.partitionedGroupInfo.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", "partitioned compaction job", "partitioned_group_id", p.partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", p.partitionedGroupInfo.CreationTimeString(), "partition_id", p.partition.PartitionID, "partition_count", p.partitionedGroupInfo.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } return partitionCompactionJobs, nil } @@ -582,10 +582,10 @@ func (g *PartitionCompactionGrouper) generatePartitionCompactionJobs(blocks map[ partition := partitionedGroupInfo.Partitions[i] if len(partition.Blocks) == 1 { partition.Blocks = append(partition.Blocks, DUMMY_BLOCK_ID) - level.Info(g.logger).Log("msg", "handled single block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) + level.Info(g.logger).Log("msg", "handled single block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) } else if len(partition.Blocks) < 1 { if err := g.handleEmptyPartition(partitionedGroupInfo, partition); err != nil { - level.Warn(g.logger).Log("msg", "failed to handle empty partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID, "err", err) + level.Warn(g.logger).Log("msg", "failed to handle empty partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID, "err", err) } continue } @@ -609,7 +609,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo * return nil } - level.Info(g.logger).Log("msg", "handling empty block partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) + 
level.Info(g.logger).Log("msg", "handling empty block partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) visitMarker := &partitionVisitMarker{ PartitionedGroupID: partitionedGroupInfo.PartitionedGroupID, PartitionID: partition.PartitionID, @@ -618,7 +618,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo * visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker) visitMarkerManager.MarkWithStatus(g.ctx, Completed) - level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) + level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) return nil } @@ -720,7 +720,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact for _, p := range outGroups { partitionInfo, err := tsdb.ConvertToPartitionInfo(p.Extensions()) if err == nil && partitionInfo != nil { - level.Info(g.logger).Log("msg", "picked compaction job", "partitioned_group_id", partitionInfo.PartitionedGroupID, "partition_count", partitionInfo.PartitionCount) + level.Info(g.logger).Log("msg", "picked compaction job", "partitioned_group_id", partitionInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionInfo.CreationTimeString(), "partition_count", partitionInfo.PartitionCount) } } return outGroups diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index 9d9d1fd7859..a43bb262301 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -120,6 +120,7 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( partitionVisitMarkerTimeout time.Duration, userLogger log.Logger, ) PartitionedGroupStatus { + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString()) status := PartitionedGroupStatus{ PartitionedGroupID: p.PartitionedGroupID, CanDelete: false, @@ -136,13 +137,13 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( PartitionedGroupID: p.PartitionedGroupID, PartitionID: partition.PartitionID, } - visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker) + visitMarkerManager := NewVisitMarkerManager(userBucket, partitionedGroupLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker) partitionVisitMarkerExists := true if err := visitMarkerManager.ReadVisitMarker(ctx, visitMarker); err != nil { if errors.Is(err, errorVisitMarkerNotFound) { partitionVisitMarkerExists = false } else { - level.Warn(userLogger).Log("msg", "unable to read partition visit marker", "path", visitMarker.GetVisitMarkerFilePath(), "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "unable to read partition visit marker", "path", visitMarker.GetVisitMarkerFilePath(), "err", err) return status } } @@ -183,20 +184,20 @@ func (p *PartitionedGroupInfo) 
getPartitionedGroupStatus(
 		if _, ok := checkedBlocks[blockID]; ok {
 			continue
 		}
-		if !p.doesBlockExist(ctx, userBucket, userLogger, blockID) {
-			level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is physically deleted", "block", blockID)
+		if !p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) {
+			level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is physically deleted", "block", blockID)
 			status.CanDelete = true
 			status.DeleteVisitMarker = true
 			return status
 		}
-		if p.isBlockDeleted(ctx, userBucket, userLogger, blockID) {
-			level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is marked for deletion", "block", blockID)
+		if p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) {
+			level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for deletion", "block", blockID)
 			status.CanDelete = true
 			status.DeleteVisitMarker = true
 			return status
 		}
-		if p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) {
-			level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is marked for no compact", "block", blockID)
+		if p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) {
+			level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for no compact", "block", blockID)
 			status.CanDelete = true
 			status.DeleteVisitMarker = true
 			return status
@@ -207,28 +208,28 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus(
 	return status
 }
 
-func (p *PartitionedGroupInfo) doesBlockExist(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool {
+func (p *PartitionedGroupInfo) doesBlockExist(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool {
 	metaExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.MetaFilename))
 	if err != nil {
-		level.Warn(userLogger).Log("msg", "unable to get stats of meta.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
+		level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of meta.json for block", "block", blockID.String())
 		return true
 	}
 	return metaExists
 }
 
-func (p *PartitionedGroupInfo) isBlockDeleted(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool {
+func (p *PartitionedGroupInfo) isBlockDeleted(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool {
 	deletionMarkerExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.DeletionMarkFilename))
 	if err != nil {
-		level.Warn(userLogger).Log("msg", "unable to get stats of deletion-mark.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
+		level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of deletion-mark.json for block", "block", blockID.String())
 		return false
 	}
 	return deletionMarkerExists
 }
 
-func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool {
+func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool {
 	noCompactMarkerExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.NoCompactMarkFilename))
 	if err != nil {
-		level.Warn(userLogger).Log("msg", "unable to get stats of no-compact-mark.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
+		level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of no-compact-mark.json for block", "block", blockID.String())
 		return false
 	}
 	return noCompactMarkerExists
@@ -237,17 +238,18 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket
 func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) (int, error) {
 	blocks := p.getAllBlocks()
 	deleteBlocksCount := 0
+	partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString())
 	defer func() {
-		level.Info(userLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount)
+		level.Info(partitionedGroupLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount)
 	}()
 	for _, blockID := range blocks {
-		if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) {
-			if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
-				level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
+		if p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) {
+			if err := block.MarkForDeletion(ctx, partitionedGroupLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
+				level.Warn(partitionedGroupLogger).Log("msg", "unable to mark block for deletion", "block", blockID.String())
 				return deleteBlocksCount, err
 			}
 			deleteBlocksCount++
-			level.Debug(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
+			level.Debug(partitionedGroupLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "block", blockID.String())
 		}
 	}
 	return deleteBlocksCount, nil
@@ -258,7 +260,11 @@ func (p *PartitionedGroupInfo) String() string {
 	for _, partition := range p.Partitions {
 		partitions = append(partitions, fmt.Sprintf("(PartitionID: %d, Blocks: %s)", partition.PartitionID, partition.Blocks))
 	}
-	return fmt.Sprintf("{PartitionedGroupID: %d, PartitionCount: %d, Partitions: %s}", p.PartitionedGroupID, p.PartitionCount, strings.Join(partitions, ", "))
+	return fmt.Sprintf("{PartitionedGroupID: %d, CreationTime: %s, PartitionCount: %d, Partitions: %s}", p.PartitionedGroupID, p.CreationTimeString(), p.PartitionCount, strings.Join(partitions, ", "))
+}
+
+func (p *PartitionedGroupInfo) CreationTimeString() string {
+	return time.Unix(p.CreationTime, 0).Format(time.RFC3339)
 }
 
 func GetPartitionedGroupFile(partitionedGroupID uint32) string {
@@ -304,7 +310,7 @@ func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBu
 	// partitioned group info which is supposed to be the correct grouping based on latest bucket store.
 	existingPartitionedGroup, _ := ReadPartitionedGroupInfo(ctx, bkt, logger, partitionedGroupInfo.PartitionedGroupID)
 	if existingPartitionedGroup != nil {
-		level.Warn(logger).Log("msg", "partitioned group info already exists", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
+		level.Warn(logger).Log("msg", "partitioned group info already exists", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString())
 		return existingPartitionedGroup, nil
 	}
 	if partitionedGroupInfo.CreationTime <= 0 {
@@ -319,6 +325,6 @@ func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBu
 	if err := bkt.Upload(ctx, partitionedGroupFile, reader); err != nil {
 		return nil, err
 	}
-	level.Info(logger).Log("msg", "created new partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
+	level.Info(logger).Log("msg", "created new partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString())
 	return &partitionedGroupInfo, nil
 }
diff --git a/pkg/compactor/sharded_block_populator.go b/pkg/compactor/sharded_block_populator.go
index a8d4228d13b..c7f8932367e 100644
--- a/pkg/compactor/sharded_block_populator.go
+++ b/pkg/compactor/sharded_block_populator.go
@@ -52,6 +52,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.
 	}()
 	metrics.PopulatingBlocks.Set(1)
 
+	begin := time.Now()
 	globalMaxt := blocks[0].Meta().MaxTime
 	g, gCtx := errgroup.WithContext(ctx)
 	g.SetLimit(8)
@@ -91,7 +92,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.
 			if err != nil {
 				return err
 			}
-			level.Debug(c.logger).Log("msg", "finished sharding", "duration", time.Since(shardStart))
+			level.Debug(c.logger).Log("msg", "finished sharding", "duration", time.Since(shardStart), "duration_ms", time.Since(shardStart).Milliseconds())
 			// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
 			setsMtx.Lock()
 			sets = append(sets, tsdb.NewBlockChunkSeriesSet(meta.ULID, indexr, chunkr, tombsr, shardedPosting, meta.MinTime, meta.MaxTime-1, false))
@@ -103,7 +104,9 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.
 	if err := g.Wait(); err != nil {
 		return err
 	}
+	level.Info(c.logger).Log("msg", "finished sharding all blocks and created series sets", "series_sets_count", len(sets), "symbols_count", len(symbols), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
 
+	begin = time.Now()
 	symbolsList := make([]string, len(symbols))
 	symbolIdx := 0
 	for symbol := range symbols {
@@ -116,6 +119,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.
 			return errors.Wrap(err, "add symbol")
 		}
 	}
+	level.Info(c.logger).Log("msg", "finished sorting symbols and added to index", "symbols_count", len(symbols), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
 
 	var (
 		ref = storage.SeriesRef(0)
@@ -131,6 +135,8 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.
 		defer cancel()
 	}
 
+	begin = time.Now()
+	seriesCount := 0
 	go func() {
 		// Iterate over all sorted chunk series.
 		for set.Next() {
@@ -173,6 +179,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.
 		if err := indexw.AddSeries(r, s.Labels(), chks...); err != nil {
 			return errors.Wrap(err, "add series")
 		}
+		seriesCount++
 
 		meta.Stats.NumChunks += uint64(len(chks))
 		meta.Stats.NumSeries++
@@ -209,6 +216,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.
 			return err
 		}
 	}
+	level.Info(c.logger).Log("msg", "finished iterating all series sets", "series_sets_count", len(sets), "series_count", seriesCount, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
 
 	if set.Err() != nil {
 		return errors.Wrap(set.Err(), "iterate compaction set")
diff --git a/pkg/storage/tsdb/meta_extensions.go b/pkg/storage/tsdb/meta_extensions.go
index b6b8a7acf07..970c632839d 100644
--- a/pkg/storage/tsdb/meta_extensions.go
+++ b/pkg/storage/tsdb/meta_extensions.go
@@ -3,6 +3,7 @@ package tsdb
 import (
 	"fmt"
 	"strconv"
+	"time"
 
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 )
@@ -19,6 +20,10 @@ type PartitionInfo struct {
 	PartitionedGroupCreationTime int64 `json:"partitioned_group_creation_time"`
 }
 
+func (p *PartitionInfo) CreationTimeString() string {
+	return time.Unix(p.PartitionedGroupCreationTime, 0).Format(time.RFC3339)
+}
+
 var (
 	DefaultPartitionInfo = PartitionInfo{
 		PartitionedGroupID: 0,