diff --git a/CHANGELOG.md b/CHANGELOG.md
index 17b10223a33..f110486d740 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,9 @@
 # Changelog
 
 ## master / unreleased
-
+* [CHANGE] Users Scanner: Rename user index update configuration. #7180
+  * Flag: Renamed `-*.users-scanner.user-index.cleanup-interval` to `-*.users-scanner.user-index.update-interval`.
+  * Config: Renamed `clean_up_interval` to `update_interval` within the `users_scanner` configuration block.
 * [CHANGE] Querier: Refactored parquet cache configuration naming. #7146
   * Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`.
   * Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`.
@@ -25,6 +27,7 @@
 * [ENHANCEMENT] Compactor: Avoid double compaction by not filtering delete blocks on real time when using bucketIndex lister. #7156
 * [ENHANCEMENT] Upgrade to go 1.25. #7164
 * [ENHANCEMENT] Upgraded container base images to `alpine:3.23`. #7163
+* [ENHANCEMENT] Ingester: Instrument CPU profiles with the user ID for read APIs. #7184
 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
 * [BUGFIX] Ruler: Add XFunctions validation support. #7111
 * [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 915bf258bd5..3eb44525d6e 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -1948,8 +1948,8 @@ blocks_storage:
 
     # How frequently user index file is updated. It only takes effect when user
     # scan strategy is user_index.
-    # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
-    [clean_up_interval: <duration> | default = 15m]
+    # CLI flag: -blocks-storage.users-scanner.user-index.update-interval
+    [update_interval: <duration> | default = 15m]
 
     # TTL of the cached users. 0 disables caching and relies on caching at
     # bucket client level.
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index 868f534b2a0..56dc288dad4 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -2019,8 +2019,8 @@ blocks_storage:
 
     # How frequently user index file is updated. It only takes effect when user
     # scan strategy is user_index.
-    # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
-    [clean_up_interval: <duration> | default = 15m]
+    # CLI flag: -blocks-storage.users-scanner.user-index.update-interval
+    [update_interval: <duration> | default = 15m]
 
     # TTL of the cached users. 0 disables caching and relies on caching at
     # bucket client level.
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 260f9a40a69..d3de1ce279a 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -999,8 +999,8 @@ users_scanner:
 
   # How frequently user index file is updated. It only takes effect when user
   # scan strategy is user_index.
-  # CLI flag: -alertmanager-storage.users-scanner.user-index.cleanup-interval
-  [clean_up_interval: <duration> | default = 15m]
+  # CLI flag: -alertmanager-storage.users-scanner.user-index.update-interval
+  [update_interval: <duration> | default = 15m]
 
   # TTL of the cached users. 0 disables caching and relies on caching at bucket
   # client level.
@@ -2624,8 +2624,8 @@ users_scanner:
 
   # How frequently user index file is updated. It only takes effect when user
   # scan strategy is user_index.
-  # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
-  [clean_up_interval: <duration> | default = 15m]
+  # CLI flag: -blocks-storage.users-scanner.user-index.update-interval
+  [update_interval: <duration> | default = 15m]
 
   # TTL of the cached users. 0 disables caching and relies on caching at bucket
   # client level.
@@ -5145,9 +5145,10 @@ The `redis_config` configures the Redis backend cache.
 
 The `ruler_config` configures the Cortex ruler.
 
 ```yaml
-# [Experimental] GRPC listen address of the Query Frontend, in host:port format.
-# If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries
-# to Ingesters directly.
+# [Experimental] gRPC address of the Query Frontend (host:port). If set, the
+# Ruler sends queries to the Query Frontend to benefit from query splitting and
+# caching, at the cost of additional network hops compared to querying the
+# Ingesters and Store Gateways directly.
 # CLI flag: -ruler.frontend-address
 [frontend_address: <string> | default = ""]
 
@@ -5904,8 +5905,8 @@ users_scanner:
 
   # How frequently user index file is updated. It only takes effect when user
   # scan strategy is user_index.
-  # CLI flag: -ruler-storage.users-scanner.user-index.cleanup-interval
-  [clean_up_interval: <duration> | default = 15m]
+  # CLI flag: -ruler-storage.users-scanner.user-index.update-interval
+  [update_interval: <duration> | default = 15m]
 
   # TTL of the cached users. 0 disables caching and relies on caching at bucket
   # client level.
diff --git a/integration/alertmanager_test.go b/integration/alertmanager_test.go
index 7eb2be2d47f..cbbdccdce15 100644
--- a/integration/alertmanager_test.go
+++ b/integration/alertmanager_test.go
@@ -82,9 +82,9 @@ func TestAlertmanagerWithUserIndexUpdater(t *testing.T) {
 	baseFlags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())
 	flags := mergeFlags(baseFlags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 1), map[string]string{
-		"-alertmanager-storage.users-scanner.strategy":                     "user_index",
-		"-alertmanager-storage.users-scanner.user-index.cleanup-interval":  "15s",
-		"-alertmanager.configs.poll-interval":                              "5s",
+		"-alertmanager-storage.users-scanner.strategy":                    "user_index",
+		"-alertmanager-storage.users-scanner.user-index.update-interval":  "15s",
+		"-alertmanager.configs.poll-interval":                             "5s",
 	})
 
 	am := e2ecortex.NewAlertmanager(
diff --git a/integration/ruler_test.go b/integration/ruler_test.go
index e34c3efa5a6..56f29295aae 100644
--- a/integration/ruler_test.go
+++ b/integration/ruler_test.go
@@ -158,10 +158,10 @@ func TestRulerWithUserIndexUpdater(t *testing.T) {
 		RulerFlags(),
 		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
 		map[string]string{
-			"-ruler.sharding-strategy":                                  "shuffle-sharding",
-			"-ruler-storage.users-scanner.strategy":                     "user_index",
-			"-ruler-storage.users-scanner.user-index.cleanup-interval":  "15s",
-			"-ruler.tenant-shard-size":                                  "1",
+			"-ruler.sharding-strategy":                                 "shuffle-sharding",
+			"-ruler-storage.users-scanner.strategy":                    "user_index",
+			"-ruler-storage.users-scanner.user-index.update-interval":  "15s",
+			"-ruler.tenant-shard-size":                                 "1",
 			// Since we're not going to run any rule, we don't need the
 			// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345", diff --git a/pkg/alertmanager/alertstore/bucketclient/bucket_client.go b/pkg/alertmanager/alertstore/bucketclient/bucket_client.go index 989df29ebe0..ba3a3639e71 100644 --- a/pkg/alertmanager/alertstore/bucketclient/bucket_client.go +++ b/pkg/alertmanager/alertstore/bucketclient/bucket_client.go @@ -67,7 +67,7 @@ func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg users.U baseScanner, _ := users.NewScanner(users.UsersScannerConfig{ Strategy: users.UserScanStrategyList, }, alertBucket, logger, regWithComponent) - userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent) + userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.UpdateInterval, baseScanner, regWithComponent) } return &BucketAlertStore{ diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index b2b9cf02777..22fe7abf915 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -319,7 +319,7 @@ func (c *BlocksCleaner) runEmitPartitionMetricsWorker(ctx context.Context, jobCh err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) - c.emitUserParititionMetrics(ctx, userLogger, userBucket, userID) + c.emitUserPartitionMetrics(ctx, userLogger, userBucket, userID) return nil }) @@ -789,13 +789,14 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke isPartitionGroupInfoDeleted := false partitionedGroupInfoFile := extraInfo.path deletedBlocksCount := 0 + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) if extraInfo.status.CanDelete { if extraInfo.status.IsCompleted { // Try to remove all blocks included in partitioned group info deletedBlocksCount, err = partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID) if err != nil { - level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Warn(partitionedGroupLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion") // if one block can not be marked for deletion, we should // skip delete this partitioned group. next iteration // would try it again. 
@@ -804,13 +805,13 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke } if deletedBlocksCount > 0 { - level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Info(partitionedGroupLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle") } else { - level.Info(userLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Info(partitionedGroupLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid") if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil { - level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_file", partitionedGroupInfoFile, "err", err) } else { - level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile) + level.Info(partitionedGroupLogger).Log("msg", "deleted partitioned group info", "partitioned_group_file", partitionedGroupInfoFile) isPartitionGroupInfoDeleted = true } } @@ -819,15 +820,15 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) { // Remove partition visit markers if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil { - level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "err", err) } else { - level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile) + level.Info(partitionedGroupLogger).Log("msg", "deleted partition visit markers for partitioned group") } } } } -func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) { +func (c *BlocksCleaner) emitUserPartitionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) { existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger) if err != nil { level.Warn(userLogger).Log("msg", "error listing partitioned group directory to emit metrics", "err", err) @@ -842,7 +843,7 @@ func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogge c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions)) if oldestPartitionGroup != nil { c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime)) - level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime) + 
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTimeString()) } else { c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0) } @@ -874,8 +875,9 @@ func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objs return nil } + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger) - level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) + level.Debug(partitionedGroupLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) existentPartitionedGroupInfo[partitionedGroupInfo] = struct { path string status PartitionedGroupStatus diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 9b317474fa8..e5e5037db80 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -1256,7 +1256,7 @@ func TestBlocksCleaner_EmitUserMetrics(t *testing.T) { err = v4Manager.updateVisitMarker(ctx) require.NoError(t, err) - cleaner.emitUserParititionMetrics(ctx, logger, userBucket, userID) + cleaner.emitUserPartitionMetrics(ctx, logger, userBucket, userID) metricNames := []string{ "cortex_compactor_remaining_planned_compactions", diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 6506bea1f94..d66797c6e13 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -762,7 +762,7 @@ func (c *Compactor) starting(ctx context.Context) error { baseScanner, _ := users.NewScanner(users.UsersScannerConfig{ Strategy: users.UserScanStrategyList, }, c.bucketClient, c.logger, c.registerer) - c.userIndexUpdater = users.NewUserIndexUpdater(c.bucketClient, c.storageCfg.UsersScanner.CleanUpInterval, baseScanner, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) + c.userIndexUpdater = users.NewUserIndexUpdater(c.bucketClient, c.storageCfg.UsersScanner.UpdateInterval, baseScanner, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) } return nil @@ -1212,7 +1212,7 @@ func (c *Compactor) userIndexUpdateLoop(ctx context.Context) { // Hardcode ID to check which compactor owns updating user index. userID := users.UserIndexCompressedFilename // Align with clean up interval. 
- ticker := time.NewTicker(util.DurationWithJitter(c.storageCfg.UsersScanner.CleanUpInterval, 0.1)) + ticker := time.NewTicker(util.DurationWithJitter(c.storageCfg.UsersScanner.UpdateInterval, 0.1)) defer ticker.Stop() for { diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index c8c617d4279..0538be32400 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1580,7 +1580,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Instrument flagext.DefaultValues(&storageCfg) storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery) storageCfg.UsersScanner.Strategy = users.UserScanStrategyUserIndex - storageCfg.UsersScanner.CleanUpInterval = 100 * time.Millisecond // Short interval for testing + storageCfg.UsersScanner.UpdateInterval = 100 * time.Millisecond // Short interval for testing // Create a temporary directory for compactor data. compactorCfg.DataDir = t.TempDir() diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 4f1d955bfc6..839cd74e2a9 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -171,7 +171,7 @@ func (g *PartitionCompactionGrouper) generateCompactionJobs(blocks map[ulid.ULID var blockIDs []string for _, p := range existingPartitionedGroups { blockIDs = p.getAllBlockIDs() - level.Info(g.logger).Log("msg", "existing partitioned group", "partitioned_group_id", p.PartitionedGroupID, "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", "existing partitioned group", "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString(), "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } allPartitionedGroup, err := g.generatePartitionedGroups(blocks, groups, existingPartitionedGroups, timeRanges) @@ -181,13 +181,13 @@ func (g *PartitionCompactionGrouper) generateCompactionJobs(blocks map[ulid.ULID g.sortPartitionedGroups(allPartitionedGroup) for _, p := range allPartitionedGroup { blockIDs = p.getAllBlockIDs() - level.Info(g.logger).Log("msg", "partitioned group ready for compaction", "partitioned_group_id", p.PartitionedGroupID, "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", "partitioned group ready for compaction", "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString(), "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } partitionCompactionJobs := g.generatePartitionCompactionJobs(blocks, allPartitionedGroup, g.doRandomPick) for _, p := range partitionCompactionJobs { blockIDs = p.getBlockIDs() - level.Info(g.logger).Log("msg", "partitioned compaction job", "partitioned_group_id", p.partitionedGroupInfo.PartitionedGroupID, "partition_id", p.partition.PartitionID, "partition_count", p.partitionedGroupInfo.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", 
"partitioned compaction job", "partitioned_group_id", p.partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", p.partitionedGroupInfo.CreationTimeString(), "partition_id", p.partition.PartitionID, "partition_count", p.partitionedGroupInfo.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } return partitionCompactionJobs, nil } @@ -582,10 +582,10 @@ func (g *PartitionCompactionGrouper) generatePartitionCompactionJobs(blocks map[ partition := partitionedGroupInfo.Partitions[i] if len(partition.Blocks) == 1 { partition.Blocks = append(partition.Blocks, DUMMY_BLOCK_ID) - level.Info(g.logger).Log("msg", "handled single block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) + level.Info(g.logger).Log("msg", "handled single block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) } else if len(partition.Blocks) < 1 { if err := g.handleEmptyPartition(partitionedGroupInfo, partition); err != nil { - level.Warn(g.logger).Log("msg", "failed to handle empty partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID, "err", err) + level.Warn(g.logger).Log("msg", "failed to handle empty partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID, "err", err) } continue } @@ -609,7 +609,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo * return nil } - level.Info(g.logger).Log("msg", "handling empty block partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) + level.Info(g.logger).Log("msg", "handling empty block partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) visitMarker := &partitionVisitMarker{ PartitionedGroupID: partitionedGroupInfo.PartitionedGroupID, PartitionID: partition.PartitionID, @@ -618,7 +618,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo * visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker) visitMarkerManager.MarkWithStatus(g.ctx, Completed) - level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) + level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) return nil } @@ -720,7 +720,7 @@ func (g *PartitionCompactionGrouper) 
pickPartitionCompactionJob(partitionCompact for _, p := range outGroups { partitionInfo, err := tsdb.ConvertToPartitionInfo(p.Extensions()) if err == nil && partitionInfo != nil { - level.Info(g.logger).Log("msg", "picked compaction job", "partitioned_group_id", partitionInfo.PartitionedGroupID, "partition_count", partitionInfo.PartitionCount) + level.Info(g.logger).Log("msg", "picked compaction job", "partitioned_group_id", partitionInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionInfo.CreationTimeString(), "partition_count", partitionInfo.PartitionCount) } } return outGroups diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index 9d9d1fd7859..a43bb262301 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -120,6 +120,7 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( partitionVisitMarkerTimeout time.Duration, userLogger log.Logger, ) PartitionedGroupStatus { + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString()) status := PartitionedGroupStatus{ PartitionedGroupID: p.PartitionedGroupID, CanDelete: false, @@ -136,13 +137,13 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( PartitionedGroupID: p.PartitionedGroupID, PartitionID: partition.PartitionID, } - visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker) + visitMarkerManager := NewVisitMarkerManager(userBucket, partitionedGroupLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker) partitionVisitMarkerExists := true if err := visitMarkerManager.ReadVisitMarker(ctx, visitMarker); err != nil { if errors.Is(err, errorVisitMarkerNotFound) { partitionVisitMarkerExists = false } else { - level.Warn(userLogger).Log("msg", "unable to read partition visit marker", "path", visitMarker.GetVisitMarkerFilePath(), "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "unable to read partition visit marker", "path", visitMarker.GetVisitMarkerFilePath(), "err", err) return status } } @@ -183,20 +184,20 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( if _, ok := checkedBlocks[blockID]; ok { continue } - if !p.doesBlockExist(ctx, userBucket, userLogger, blockID) { - level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is physically deleted", "block", blockID) + if !p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) { + level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is physically deleted", "block", blockID) status.CanDelete = true status.DeleteVisitMarker = true return status } - if p.isBlockDeleted(ctx, userBucket, userLogger, blockID) { - level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is marked for deletion", "block", blockID) + if p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) { + level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for deletion", "block", blockID) status.CanDelete = true status.DeleteVisitMarker = true return status } - if p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) { - level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is marked for no compact", "block", blockID) + if p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) { + 
level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for no compact", "block", blockID) status.CanDelete = true status.DeleteVisitMarker = true return status @@ -207,28 +208,28 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( return status } -func (p *PartitionedGroupInfo) doesBlockExist(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool { +func (p *PartitionedGroupInfo) doesBlockExist(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool { metaExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.MetaFilename)) if err != nil { - level.Warn(userLogger).Log("msg", "unable to get stats of meta.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of meta.json for block", "block", blockID.String()) return true } return metaExists } -func (p *PartitionedGroupInfo) isBlockDeleted(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool { +func (p *PartitionedGroupInfo) isBlockDeleted(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool { deletionMarkerExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.DeletionMarkFilename)) if err != nil { - level.Warn(userLogger).Log("msg", "unable to get stats of deletion-mark.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of deletion-mark.json for block", "block", blockID.String()) return false } return deletionMarkerExists } -func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool { +func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool { noCompactMarkerExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.NoCompactMarkFilename)) if err != nil { - level.Warn(userLogger).Log("msg", "unable to get stats of no-compact-mark.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of no-compact-mark.json for block", "block", blockID.String()) return false } return noCompactMarkerExists @@ -237,17 +238,18 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) (int, error) { blocks := p.getAllBlocks() deleteBlocksCount := 0 + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString()) defer func() { - level.Info(userLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount) + level.Info(partitionedGroupLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount) }() for _, blockID := range blocks { - if 
p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) { - if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { - level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + if p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) { + if err := block.MarkForDeletion(ctx, partitionedGroupLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { + level.Warn(partitionedGroupLogger).Log("msg", "unable to mark block for deletion", "block", blockID.String()) return deleteBlocksCount, err } deleteBlocksCount++ - level.Debug(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + level.Debug(partitionedGroupLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "block", blockID.String()) } } return deleteBlocksCount, nil @@ -258,7 +260,11 @@ func (p *PartitionedGroupInfo) String() string { for _, partition := range p.Partitions { partitions = append(partitions, fmt.Sprintf("(PartitionID: %d, Blocks: %s)", partition.PartitionID, partition.Blocks)) } - return fmt.Sprintf("{PartitionedGroupID: %d, PartitionCount: %d, Partitions: %s}", p.PartitionedGroupID, p.PartitionCount, strings.Join(partitions, ", ")) + return fmt.Sprintf("{PartitionedGroupID: %d, CreationTime: %s, PartitionCount: %d, Partitions: %s}", p.PartitionedGroupID, p.CreationTimeString(), p.PartitionCount, strings.Join(partitions, ", ")) +} + +func (p *PartitionedGroupInfo) CreationTimeString() string { + return time.Unix(p.CreationTime, 0).Format(time.RFC3339) } func GetPartitionedGroupFile(partitionedGroupID uint32) string { @@ -304,7 +310,7 @@ func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBu // partitioned group info which is supposed to be the correct grouping based on latest bucket store. 
existingPartitionedGroup, _ := ReadPartitionedGroupInfo(ctx, bkt, logger, partitionedGroupInfo.PartitionedGroupID) if existingPartitionedGroup != nil { - level.Warn(logger).Log("msg", "partitioned group info already exists", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Warn(logger).Log("msg", "partitioned group info already exists", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) return existingPartitionedGroup, nil } if partitionedGroupInfo.CreationTime <= 0 { @@ -319,6 +325,6 @@ func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBu if err := bkt.Upload(ctx, partitionedGroupFile, reader); err != nil { return nil, err } - level.Info(logger).Log("msg", "created new partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Info(logger).Log("msg", "created new partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) return &partitionedGroupInfo, nil } diff --git a/pkg/compactor/sharded_block_populator.go b/pkg/compactor/sharded_block_populator.go index a8d4228d13b..c7f8932367e 100644 --- a/pkg/compactor/sharded_block_populator.go +++ b/pkg/compactor/sharded_block_populator.go @@ -52,6 +52,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. }() metrics.PopulatingBlocks.Set(1) + begin := time.Now() globalMaxt := blocks[0].Meta().MaxTime g, gCtx := errgroup.WithContext(ctx) g.SetLimit(8) @@ -91,7 +92,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. if err != nil { return err } - level.Debug(c.logger).Log("msg", "finished sharding", "duration", time.Since(shardStart)) + level.Debug(c.logger).Log("msg", "finished sharding", "duration", time.Since(shardStart), "duration_ms", time.Since(shardStart).Milliseconds()) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. setsMtx.Lock() sets = append(sets, tsdb.NewBlockChunkSeriesSet(meta.ULID, indexr, chunkr, tombsr, shardedPosting, meta.MinTime, meta.MaxTime-1, false)) @@ -103,7 +104,9 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. if err := g.Wait(); err != nil { return err } + level.Info(c.logger).Log("msg", "finished sharding all blocks and created series sets", "series_sets_count", len(sets), "symbols_count", len(symbols), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + begin = time.Now() symbolsList := make([]string, len(symbols)) symbolIdx := 0 for symbol := range symbols { @@ -116,6 +119,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. return errors.Wrap(err, "add symbol") } } + level.Info(c.logger).Log("msg", "finished sorting symbols and added to index", "symbols_count", len(symbols), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) var ( ref = storage.SeriesRef(0) @@ -131,6 +135,8 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. defer cancel() } + begin = time.Now() + seriesCount := 0 go func() { // Iterate over all sorted chunk series. for set.Next() { @@ -173,6 +179,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. 
 			if err := indexw.AddSeries(r, s.Labels(), chks...); err != nil {
 				return errors.Wrap(err, "add series")
 			}
+			seriesCount++
 
 			meta.Stats.NumChunks += uint64(len(chks))
 			meta.Stats.NumSeries++
@@ -209,6 +216,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.
 			return err
 		}
 	}
+	level.Info(c.logger).Log("msg", "finished iterating all series sets", "series_sets_count", len(sets), "series_count", seriesCount, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
 
 	if set.Err() != nil {
 		return errors.Wrap(set.Err(), "iterate compaction set")
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index acb8bac27a8..adbb136f155 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -11,6 +11,7 @@ import (
 	"os"
 	"path/filepath"
 	"runtime"
+	"runtime/pprof"
 	"slices"
 	"strings"
 	"sync"
@@ -1780,6 +1781,14 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
 		return nil, err
 	}
 
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		resp, err = i.queryExemplars(ctx, userID, req)
+	})
+	return resp, err
+}
+
+func (i *Ingester) queryExemplars(ctx context.Context, userID string, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) {
 	from, through, matchers, err := client.FromExemplarQueryRequest(i.matchersCache, req)
 	if err != nil {
 		return nil, err
@@ -1836,33 +1845,55 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
 
 // LabelValues returns all label values that are associated with a given label name.
 func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error) {
 	defer recoverIngester(i.logger, &err)
-	resp, cleanup, err := i.labelsValuesCommon(ctx, req)
-	defer cleanup()
+
+	userID, userErr := users.TenantID(ctx)
+	if userErr != nil {
+		return nil, userErr
+	}
+
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		var cleanup func()
+		resp, cleanup, err = i.labelsValuesCommon(ctx, req)
+		defer cleanup()
+	})
 
 	return resp, err
 }
 
 // LabelValuesStream returns all label values that are associated with a given label name.
 func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) (err error) {
 	defer recoverIngester(i.logger, &err)
-	resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
-	defer cleanup()
-	if err != nil {
-		return err
+	ctx := stream.Context()
+	userID, userErr := users.TenantID(ctx)
+	if userErr != nil {
+		return userErr
 	}
 
-	for i := 0; i < len(resp.LabelValues); i += metadataStreamBatchSize {
-		j := min(i+metadataStreamBatchSize, len(resp.LabelValues))
-		resp := &client.LabelValuesStreamResponse{
-			LabelValues: resp.LabelValues[i:j],
-		}
-		err := client.SendLabelValuesStream(stream, resp)
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		var resp *client.LabelValuesResponse
+		var cleanup func()
+		resp, cleanup, err = i.labelsValuesCommon(ctx, req)
+		defer cleanup()
+
 		if err != nil {
-			return err
+			return
 		}
-	}
 
-	return nil
+		for i := 0; i < len(resp.LabelValues); i += metadataStreamBatchSize {
+			j := min(i+metadataStreamBatchSize, len(resp.LabelValues))
+			resp := &client.LabelValuesStreamResponse{
+				LabelValues: resp.LabelValues[i:j],
+			}
+			err = client.SendLabelValuesStream(stream, resp)
+			if err != nil {
+				return
+			}
+		}
+	})
+
+	return err
 }
 
 // labelsValuesCommon returns all label values that are associated with a given label name.
@@ -1930,33 +1961,55 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
 
 // LabelNames return all the label names.
 func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error) {
 	defer recoverIngester(i.logger, &err)
-	resp, cleanup, err := i.labelNamesCommon(ctx, req)
-	defer cleanup()
+
+	userID, userErr := users.TenantID(ctx)
+	if userErr != nil {
+		return nil, userErr
+	}
+
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		var cleanup func()
+		resp, cleanup, err = i.labelNamesCommon(ctx, req)
+		defer cleanup()
+	})
 
 	return resp, err
 }
 
 // LabelNamesStream return all the label names.
 func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) (err error) {
 	defer recoverIngester(i.logger, &err)
-	resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
-	defer cleanup()
-	if err != nil {
-		return err
+	ctx := stream.Context()
+	userID, userErr := users.TenantID(ctx)
+	if userErr != nil {
+		return userErr
 	}
 
-	for i := 0; i < len(resp.LabelNames); i += metadataStreamBatchSize {
-		j := min(i+metadataStreamBatchSize, len(resp.LabelNames))
-		resp := &client.LabelNamesStreamResponse{
-			LabelNames: resp.LabelNames[i:j],
-		}
-		err = client.SendLabelNamesStream(stream, resp)
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		var resp *client.LabelNamesResponse
+		var cleanup func()
+		resp, cleanup, err = i.labelNamesCommon(ctx, req)
+		defer cleanup()
+
 		if err != nil {
-			return err
+			return
 		}
-	}
 
-	return nil
+		for i := 0; i < len(resp.LabelNames); i += metadataStreamBatchSize {
+			j := min(i+metadataStreamBatchSize, len(resp.LabelNames))
+			resp := &client.LabelNamesStreamResponse{
+				LabelNames: resp.LabelNames[i:j],
+			}
+			err = client.SendLabelNamesStream(stream, resp)
+			if err != nil {
+				return
+			}
+		}
+	})
+
+	return err
 }
 
 // labelNamesCommon return all the label names.
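The ingester hunks above, and the `MetricsForLabelMatchers*`, `MetricsMetadata`, and `QueryStream` hunks that follow, all apply one pattern: resolve the tenant ID, then run the handler body inside `runtime/pprof.Do` so CPU samples taken while it executes carry a `user` label. Below is a minimal, self-contained sketch of that pattern; `tenantFromContext` and `handleQuery` are hypothetical stand-ins for `users.TenantID` and the per-API handlers, not the actual Cortex code.

```go
package main

import (
	"context"
	"fmt"
	"runtime/pprof"
)

// tenantFromContext is a hypothetical stand-in for users.TenantID.
func tenantFromContext(ctx context.Context) (string, error) {
	return "tenant-1", nil
}

// handleQuery is a hypothetical stand-in for a read-path handler such as
// labelNamesCommon or queryStream.
func handleQuery(ctx context.Context, q string) (string, error) {
	return "result for " + q, nil
}

// queryWithProfileLabels mirrors the pattern in the hunks above: run the
// handler inside pprof.Do so that CPU samples taken while it executes carry
// a "user" label, and capture the results through outer variables because
// the wrapped closure has no return values.
func queryWithProfileLabels(ctx context.Context, q string) (resp string, err error) {
	userID, userErr := tenantFromContext(ctx)
	if userErr != nil {
		return "", userErr
	}
	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
		resp, err = handleQuery(ctx, q)
	})
	return resp, err
}

func main() {
	resp, err := queryWithProfileLabels(context.Background(), "up")
	fmt.Println(resp, err)
}
```

Profiler labels are attached only to samples taken while the labeled closure is on the stack, which is presumably why each read API wraps its whole body rather than setting labels once up front.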
@@ -2024,49 +2077,70 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
 
 // MetricsForLabelMatchers returns all the metrics which match a set of matchers.
 func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (result *client.MetricsForLabelMatchersResponse, err error) {
 	defer recoverIngester(i.logger, &err)
-	result = &client.MetricsForLabelMatchersResponse{}
-	cleanup, err := i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error {
-		result.Metric = append(result.Metric, &cortexpb.Metric{
-			Labels: cortexpb.FromLabelsToLabelAdapters(l),
+
+	userID, userErr := users.TenantID(ctx)
+	if userErr != nil {
+		return nil, userErr
+	}
+
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		result = &client.MetricsForLabelMatchersResponse{}
+		var cleanup func()
+		cleanup, err = i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error {
+			result.Metric = append(result.Metric, &cortexpb.Metric{
+				Labels: cortexpb.FromLabelsToLabelAdapters(l),
+			})
+			return nil
 		})
-		return nil
+		defer cleanup()
 	})
-	defer cleanup()
 
 	return result, err
 }
 
 func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) (err error) {
 	defer recoverIngester(i.logger, &err)
-	result := &client.MetricsForLabelMatchersStreamResponse{}
-	cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req, func(l labels.Labels) error {
-		result.Metric = append(result.Metric, &cortexpb.Metric{
-			Labels: cortexpb.FromLabelsToLabelAdapters(l),
+	ctx := stream.Context()
+	userID, userErr := users.TenantID(ctx)
+	if userErr != nil {
+		return userErr
+	}
+
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		result := &client.MetricsForLabelMatchersStreamResponse{}
+
+		var cleanup func()
+		cleanup, err = i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error {
+			result.Metric = append(result.Metric, &cortexpb.Metric{
+				Labels: cortexpb.FromLabelsToLabelAdapters(l),
+			})
+
+			if len(result.Metric) >= metadataStreamBatchSize {
+				err := client.SendMetricsForLabelMatchersStream(stream, result)
+				if err != nil {
+					return err
+				}
+				result.Metric = result.Metric[:0]
+			}
+			return nil
 		})
+		defer cleanup()
+		if err != nil {
+			return
+		}
 
-		if len(result.Metric) >= metadataStreamBatchSize {
-			err := client.SendMetricsForLabelMatchersStream(stream, result)
+		// Send last batch
+		if len(result.Metric) > 0 {
+			err = client.SendMetricsForLabelMatchersStream(stream, result)
 			if err != nil {
-				return err
+				return
 			}
-			result.Metric = result.Metric[:0]
 		}
-		return nil
 	})
-	defer cleanup()
-	if err != nil {
-		return err
-	}
-
-	// Send last batch
-	if len(result.Metric) > 0 {
-		err = client.SendMetricsForLabelMatchersStream(stream, result)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return err
 }
 
 // metricsForLabelMatchersCommon returns all the metrics which match a set of matchers.
@@ -2160,7 +2234,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
 }
 
 // MetricsMetadata returns all the metric metadata of a user.
-func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (*client.MetricsMetadataResponse, error) {
+func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error) {
 	i.stoppedMtx.RLock()
 	if err := i.checkRunningOrStopping(); err != nil {
 		i.stoppedMtx.RUnlock()
@@ -2173,13 +2247,19 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad
 		return nil, err
 	}
 
-	userMetadata := i.getUserMetadata(userID)
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		userMetadata := i.getUserMetadata(userID)
 
-	if userMetadata == nil {
-		return &client.MetricsMetadataResponse{}, nil
-	}
+		if userMetadata == nil {
+			resp = &client.MetricsMetadataResponse{}
+			return
+		}
+
+		resp = &client.MetricsMetadataResponse{Metadata: userMetadata.toClientMetadata(req)}
+	})
 
-	return &client.MetricsMetadataResponse{Metadata: userMetadata.toClientMetadata(req)}, nil
+	return resp, nil
 }
 
 // CheckReady is the readiness handler used to indicate to k8s when the ingesters
@@ -2309,6 +2389,15 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
 		return err
 	}
 
+	// Set pprof labels for profiling
+	pprof.Do(ctx, pprof.Labels("user", userID), func(ctx context.Context) {
+		err = i.queryStream(ctx, userID, req, stream, spanlog)
+	})
+
+	return err
+}
+
+func (i *Ingester) queryStream(ctx context.Context, userID string, req *client.QueryRequest, stream client.Ingester_QueryStreamServer, spanlog *spanlogger.SpanLogger) error {
 	from, through, matchers, err := client.FromQueryRequest(i.matchersCache, req)
 	if err != nil {
 		return err
diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index 7c9cf64ac74..a965b33f39d 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -231,7 +231,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	//lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
 	flagext.DeprecatedFlag(f, "ruler.alertmanager-use-v2", "This flag is no longer functional. V1 API is deprecated and removed", util_log.Logger)
 
-	f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "[Experimental] GRPC listen address of the Query Frontend, in host:port format. If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries to Ingesters directly.")
+	f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "[Experimental] gRPC address of the Query Frontend (host:port). If set, the Ruler sends queries to the Query Frontend to benefit from query splitting and caching, at the cost of additional network hops compared to querying the Ingesters and Store Gateways directly.")
 	f.StringVar(&cfg.QueryResponseFormat, "ruler.query-response-format", queryResponseFormatProtobuf, fmt.Sprintf("[Experimental] Query response format to get query results from Query Frontend when the rule evaluation. It will only take effect when `-ruler.frontend-address` is configured. Supported values: %s", strings.Join(supportedQueryResponseFormats, ",")))
 	cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil
 	f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.")
diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go
index 00128672be7..62f6903275f 100644
--- a/pkg/ruler/rulestore/bucketclient/bucket_client.go
+++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go
@@ -65,7 +65,7 @@ func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg users.UsersScannerCo
 		baseScanner, _ := users.NewScanner(users.UsersScannerConfig{
 			Strategy: users.UserScanStrategyList,
 		}, rulesBucket, logger, regWithComponent)
-		userIndexUpdater = users.NewUserIndexUpdater(rulesBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent)
+		userIndexUpdater = users.NewUserIndexUpdater(rulesBucket, userScannerCfg.UpdateInterval, baseScanner, regWithComponent)
 	}
 
 	return &BucketRuleStore{
diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go
index 47bd5295271..665ead376e5 100644
--- a/pkg/scheduler/queue/queue_test.go
+++ b/pkg/scheduler/queue/queue_test.go
@@ -285,6 +285,60 @@ func TestQueriersShouldGetHighPriorityQueryFirst(t *testing.T) {
 	assert.Equal(t, highPriorityRequest, nextRequest) // high priority request returned, although it was enqueued the last
 }
 
+func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) {
+	// Setup: Large initial limit
+	initialLimit := 100
+	newLimit := 50
+
+	limits := MockLimits{
+		MaxOutstanding: initialLimit,
+	}
+
+	// Initialize queues
+	q := newUserQueues(0, limits, nil)
+
+	// Create user queue
+	userID := "test-user-deadlock"
+	queue := q.getOrAddQueue(userID, 1)
+
+	// Fill the queue beyond newLimit
+	// (but still below the initial capacity)
+	itemsToFill := 80
+	for range itemsToFill {
+		queue.enqueueRequest(MockRequest{priority: 1})
+	}
+
+	require.Equal(t, itemsToFill, queue.length())
+
+	// Reduce limit below current size
+	// We change the mock limits return value.
+	// In the real application this comes from a runtime config reload.
+	limits.MaxOutstanding = newLimit
+	q.limits = limits // Update the limits struct in queues (mocking the reload effect)
+
+	// Now call getOrAddQueue again.
+	// This triggers the migration logic: existing queue (80 items) -> new queue (cap 50).
+	done := make(chan struct{})
+	go func() {
+		_ = q.getOrAddQueue(userID, 1)
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// Success: no deadlock
+	case <-time.After(2 * time.Second):
+		t.Fatal("Deadlock detected! getOrAddQueue timed out while migrating queue with reduced limits.")
+	}
+
+	// The new queue should be capped at newLimit.
+	// The flush loop stops once it is full, so its length should equal newLimit.
+	newQueue := q.getOrAddQueue(userID, 1) // Should be fast now
+
+	// Note: the new queue should hold newLimit (50) items; the remaining 30 are dropped.
+	assert.Equal(t, newLimit, newQueue.length())
+}
+
 func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
 	queue := NewRequestQueue(0,
 		prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}),
diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go
index c7e30d87375..eaed52ad2e4 100644
--- a/pkg/scheduler/queue/user_queues.go
+++ b/pkg/scheduler/queue/user_queues.go
@@ -169,7 +169,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue
 		tmpQueue := q.createUserRequestQueue(userID)
 
 		// flush to new queue
-		for uq.queue.length() > 0 {
+		for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) {
 			tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false))
 		}
 
diff --git a/pkg/storage/tsdb/meta_extensions.go b/pkg/storage/tsdb/meta_extensions.go
index b6b8a7acf07..970c632839d 100644
--- a/pkg/storage/tsdb/meta_extensions.go
+++ b/pkg/storage/tsdb/meta_extensions.go
@@ -3,6 +3,7 @@ package tsdb
 import (
 	"fmt"
 	"strconv"
+	"time"
 
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 )
@@ -19,6 +20,10 @@ type PartitionInfo struct {
 	PartitionedGroupCreationTime int64 `json:"partitioned_group_creation_time"`
 }
 
+func (p *PartitionInfo) CreationTimeString() string {
+	return time.Unix(p.PartitionedGroupCreationTime, 0).Format(time.RFC3339)
+}
+
 var (
 	DefaultPartitionInfo = PartitionInfo{
 		PartitionedGroupID: 0,
diff --git a/pkg/util/users/index_updater_test.go b/pkg/util/users/index_updater_test.go
index 0b4a6a68e9c..8cdef00bfdd 100644
--- a/pkg/util/users/index_updater_test.go
+++ b/pkg/util/users/index_updater_test.go
@@ -77,7 +77,7 @@ func TestUserIndexUpdater_UpdateUserIndex(t *testing.T) {
 			t.Parallel()
 			bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
 
-			updater := NewUserIndexUpdater(bkt, defaultCleanUpInterval, testData.scanner, nil)
+			updater := NewUserIndexUpdater(bkt, defaultUpdateInterval, testData.scanner, nil)
 			err := updater.UpdateUserIndex(ctx)
 
 			if testData.expectErr {
@@ -118,7 +118,7 @@ func TestUserIndexUpdater_UpdateUserIndex_WriteError(t *testing.T) {
 	// Mock the bucket to return an error on upload
 	bkt.MockUpload(UserIndexCompressedFilename, assert.AnError)
 
-	updater := NewUserIndexUpdater(bkt, defaultCleanUpInterval, scanner, nil)
+	updater := NewUserIndexUpdater(bkt, defaultUpdateInterval, scanner, nil)
 	err := updater.UpdateUserIndex(ctx)
 	require.Error(t, err)
 	assert.Contains(t, err.Error(), "upload user index")
diff --git a/pkg/util/users/scanner_config.go b/pkg/util/users/scanner_config.go
index 9ac6194c3ee..7cd08867de1 100644
--- a/pkg/util/users/scanner_config.go
+++ b/pkg/util/users/scanner_config.go
@@ -10,17 +10,17 @@ import (
 )
 
 type UsersScannerConfig struct {
-	Strategy        string        `yaml:"strategy"`
-	MaxStalePeriod  time.Duration `yaml:"max_stale_period"`
-	CleanUpInterval time.Duration `yaml:"clean_up_interval"`
-	CacheTTL        time.Duration `yaml:"cache_ttl"`
+	Strategy       string        `yaml:"strategy"`
+	MaxStalePeriod time.Duration `yaml:"max_stale_period"`
+	UpdateInterval time.Duration `yaml:"update_interval"`
+	CacheTTL       time.Duration `yaml:"cache_ttl"`
 }
 
 const (
 	UserScanStrategyList      = "list"
 	UserScanStrategyUserIndex = "user_index"
 
-	defaultCleanUpInterval = time.Minute * 15
+	defaultUpdateInterval = time.Minute * 15
 )
 
 var (
@@ -46,6 +46,6 @@ func (c *UsersScannerConfig) Validate() error {
 func (c *UsersScannerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.StringVar(&c.Strategy, prefix+"users-scanner.strategy", UserScanStrategyList, fmt.Sprintf("Strategy to use to scan users. Supported values are: %s.", strings.Join(supportedStrategies, ", ")))
 	f.DurationVar(&c.MaxStalePeriod, prefix+"users-scanner.user-index.max-stale-period", time.Hour, "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. Only valid when strategy is user_index.")
-	f.DurationVar(&c.CleanUpInterval, prefix+"users-scanner.user-index.cleanup-interval", defaultCleanUpInterval, fmt.Sprintf("How frequently user index file is updated. It only takes effect when user scan strategy is %s.", UserScanStrategyUserIndex))
+	f.DurationVar(&c.UpdateInterval, prefix+"users-scanner.user-index.update-interval", defaultUpdateInterval, fmt.Sprintf("How frequently user index file is updated. It only takes effect when user scan strategy is %s.", UserScanStrategyUserIndex))
 	f.DurationVar(&c.CacheTTL, prefix+"users-scanner.cache-ttl", 0, "TTL of the cached users. 0 disables caching and relies on caching at bucket client level.")
 }
diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json
index 3cdc6290259..59df63212ac 100644
--- a/schemas/cortex-config-schema.json
+++ b/schemas/cortex-config-schema.json
@@ -912,13 +912,6 @@
           "x-cli-flag": "alertmanager-storage.users-scanner.cache-ttl",
           "x-format": "duration"
         },
-        "clean_up_interval": {
-          "default": "15m0s",
-          "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.",
-          "type": "string",
-          "x-cli-flag": "alertmanager-storage.users-scanner.user-index.cleanup-interval",
-          "x-format": "duration"
-        },
         "max_stale_period": {
           "default": "1h0m0s",
           "description": "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. Only valid when strategy is user_index.",
@@ -931,6 +924,13 @@
           "description": "Strategy to use to scan users. Supported values are: list, user_index.",
           "type": "string",
           "x-cli-flag": "alertmanager-storage.users-scanner.strategy"
+        },
+        "update_interval": {
+          "default": "15m0s",
+          "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.",
+          "type": "string",
+          "x-cli-flag": "alertmanager-storage.users-scanner.user-index.update-interval",
+          "x-format": "duration"
         }
       },
       "type": "object"
@@ -3114,13 +3114,6 @@
           "x-cli-flag": "blocks-storage.users-scanner.cache-ttl",
           "x-format": "duration"
         },
-        "clean_up_interval": {
-          "default": "15m0s",
-          "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.",
-          "type": "string",
-          "x-cli-flag": "blocks-storage.users-scanner.user-index.cleanup-interval",
-          "x-format": "duration"
-        },
         "max_stale_period": {
           "default": "1h0m0s",
           "description": "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. Only valid when strategy is user_index.",
@@ -3133,6 +3126,13 @@
           "description": "Strategy to use to scan users. Supported values are: list, user_index.",
           "type": "string",
           "x-cli-flag": "blocks-storage.users-scanner.strategy"
+        },
+        "update_interval": {
+          "default": "15m0s",
+          "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.",
+          "type": "string",
+          "x-cli-flag": "blocks-storage.users-scanner.user-index.update-interval",
+          "x-format": "duration"
         }
       },
       "type": "object"
@@ -6587,7 +6587,7 @@
           "x-format": "duration"
         },
         "frontend_address": {
-          "description": "[Experimental] GRPC listen address of the Query Frontend, in host:port format. If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries to Ingesters directly.",
+          "description": "[Experimental] gRPC address of the Query Frontend (host:port). If set, the Ruler sends queries to the Query Frontend to benefit from query splitting and caching, at the cost of additional network hops compared to querying the Ingesters and Store Gateways directly.",
           "type": "string",
          "x-cli-flag": "ruler.frontend-address"
         },
@@ -7472,13 +7472,6 @@
           "x-cli-flag": "ruler-storage.users-scanner.cache-ttl",
           "x-format": "duration"
         },
-        "clean_up_interval": {
-          "default": "15m0s",
-          "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.",
-          "type": "string",
-          "x-cli-flag": "ruler-storage.users-scanner.user-index.cleanup-interval",
-          "x-format": "duration"
-        },
         "max_stale_period": {
           "default": "1h0m0s",
           "description": "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. Only valid when strategy is user_index.",
@@ -7491,6 +7484,13 @@
           "description": "Strategy to use to scan users. Supported values are: list, user_index.",
           "type": "string",
           "x-cli-flag": "ruler-storage.users-scanner.strategy"
+        },
+        "update_interval": {
+          "default": "15m0s",
+          "description": "How frequently user index file is updated. It only takes effect when user scan strategy is user_index.",
+          "type": "string",
+          "x-cli-flag": "ruler-storage.users-scanner.user-index.update-interval",
+          "x-format": "duration"
         }
       },
      "type": "object"
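For operators, the users-scanner rename amounts to a one-key change wherever a `users_scanner` block is configured (`alertmanager-storage`, `blocks-storage`, `ruler-storage`). A before/after sketch using the defaults documented above:

```yaml
users_scanner:
  strategy: user_index
  # Before this change:
  #   clean_up_interval: 15m
  update_interval: 15m
  max_stale_period: 1h
```

The `user_queues.go` change is the substantive bug fix in this patch: when `MaxOutstanding` shrinks, `getOrAddQueue` migrates the user's existing queue into a freshly sized one, and the old flush loop only checked that the source was non-empty. With a bounded FIFO destination, the enqueue past capacity blocks and the call never returns, which is the deadlock the new test exercises. Below is a toy model of the failure and the fix, assuming a channel-backed queue rather than the real `userRequestQueue` implementation:

```go
package main

import "fmt"

// fifoQueue is a toy stand-in for the scheduler's bounded FIFO user queue;
// it is not the actual userRequestQueue implementation.
type fifoQueue struct{ ch chan int }

func newFifoQueue(capacity int) *fifoQueue { return &fifoQueue{ch: make(chan int, capacity)} }
func (q *fifoQueue) enqueue(v int)         { q.ch <- v } // blocks when the buffer is full
func (q *fifoQueue) dequeue() int          { return <-q.ch }
func (q *fifoQueue) length() int           { return len(q.ch) }

func main() {
	// A user queue sized for the old limit, holding 80 outstanding requests.
	old := newFifoQueue(100)
	for i := 0; i < 80; i++ {
		old.enqueue(i)
	}

	// The limit drops to 50, so the queue is recreated at the new size.
	maxOutstanding := 50
	tmp := newFifoQueue(maxOutstanding)

	// With the old condition (only old.length() > 0), the 51st enqueue would
	// block forever on the full destination. Also bounding the loop on the
	// destination's capacity, as the patched condition does, drops the
	// overflow instead of deadlocking.
	for old.length() > 0 && tmp.length() < maxOutstanding {
		tmp.enqueue(old.dequeue())
	}

	fmt.Println("migrated:", tmp.length(), "dropped:", old.length()) // migrated: 50 dropped: 30
}
```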