5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,7 +1,9 @@
# Changelog

## master / unreleased

* [CHANGE] Users Scanner: Rename user index update configuration. #7180
* Flag: Renamed `-*.users-scanner.user-index.cleanup-interval` to `-*.users-scanner.user-index.update-interval`.
* Config: Renamed `clean_up_interval` to `update_interval` within the `users_scanner` configuration block (the full flag/key mapping is sketched after this hunk).
* [CHANGE] Querier: Refactored parquet cache configuration naming. #7146
* Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`.
* Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`.
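For quick reference (see the mapping note in the config entry above), the renamed flags and YAML key can be summarized in a small Go snippet, matching the language of the rest of this PR; the mapping is taken from the CHANGELOG entry and the docs diffs below, and only the printed output format is illustrative.

```go
package main

import "fmt"

func main() {
	// Renamed CLI flags (one per users-scanner prefix) and the renamed YAML
	// key, as listed in the CHANGELOG entry above; the 15m default is unchanged.
	flagRenames := map[string]string{
		"-blocks-storage.users-scanner.user-index.cleanup-interval":       "-blocks-storage.users-scanner.user-index.update-interval",
		"-alertmanager-storage.users-scanner.user-index.cleanup-interval": "-alertmanager-storage.users-scanner.user-index.update-interval",
		"-ruler-storage.users-scanner.user-index.cleanup-interval":        "-ruler-storage.users-scanner.user-index.update-interval",
	}
	for oldFlag, newFlag := range flagRenames {
		fmt.Printf("%s -> %s (default 15m)\n", oldFlag, newFlag)
	}
	fmt.Println("users_scanner YAML key: clean_up_interval -> update_interval")
}
```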
@@ -25,6 +27,7 @@
* [ENHANCEMENT] Compactor: Avoid double compaction by not filtering delete blocks on real time when using bucketIndex lister. #7156
* [ENHANCEMENT] Upgrade to go 1.25. #7164
* [ENHANCEMENT] Upgraded container base images to `alpine:3.23`. #7163
* [ENHANCEMENT] Instrument Ingester CPU profile with userID for read APIs. #7184
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
4 changes: 2 additions & 2 deletions docs/blocks-storage/querier.md
@@ -1948,8 +1948,8 @@ blocks_storage:

# How frequently user index file is updated. It only takes effect when user
# scan strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]
# CLI flag: -blocks-storage.users-scanner.user-index.update-interval
[update_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at
# bucket client level.
4 changes: 2 additions & 2 deletions docs/blocks-storage/store-gateway.md
@@ -2019,8 +2019,8 @@ blocks_storage:

# How frequently user index file is updated. It only takes effect when user
# scan strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]
# CLI flag: -blocks-storage.users-scanner.user-index.update-interval
[update_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at
# bucket client level.
19 changes: 10 additions & 9 deletions docs/configuration/config-file-reference.md
@@ -999,8 +999,8 @@ users_scanner:

# How frequently user index file is updated. It only takes effect when user
# scan strategy is user_index.
# CLI flag: -alertmanager-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]
# CLI flag: -alertmanager-storage.users-scanner.user-index.update-interval
[update_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at bucket
# client level.
@@ -2624,8 +2624,8 @@ users_scanner:

# How frequently user index file is updated. It only takes effect when user
# scan strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]
# CLI flag: -blocks-storage.users-scanner.user-index.update-interval
[update_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at bucket
# client level.
@@ -5145,9 +5145,10 @@ The `redis_config` configures the Redis backend cache.
The `ruler_config` configures the Cortex ruler.

```yaml
# [Experimental] GRPC listen address of the Query Frontend, in host:port format.
# If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries
# to Ingesters directly.
# [Experimental] gRPC address of the Query Frontend (host:port). If set, the
# Ruler sends queries to the Query Frontend to take advantage of query
# splitting and caching, at the cost of additional network hops compared to
# querying the Ingesters and Store Gateways directly.
# CLI flag: -ruler.frontend-address
[frontend_address: <string> | default = ""]

@@ -5904,8 +5905,8 @@ users_scanner:

# How frequently user index file is updated. It only takes effect when user
# scan strategy is user_index.
# CLI flag: -ruler-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]
# CLI flag: -ruler-storage.users-scanner.user-index.update-interval
[update_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at bucket
# client level.
6 changes: 3 additions & 3 deletions integration/alertmanager_test.go
@@ -82,9 +82,9 @@ func TestAlertmanagerWithUserIndexUpdater(t *testing.T) {

baseFlags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())
flags := mergeFlags(baseFlags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 1), map[string]string{
"-alertmanager-storage.users-scanner.strategy": "user_index",
"-alertmanager-storage.users-scanner.user-index.cleanup-interval": "15s",
"-alertmanager.configs.poll-interval": "5s",
"-alertmanager-storage.users-scanner.strategy": "user_index",
"-alertmanager-storage.users-scanner.user-index.update-interval": "15s",
"-alertmanager.configs.poll-interval": "5s",
})

am := e2ecortex.NewAlertmanager(
8 changes: 4 additions & 4 deletions integration/ruler_test.go
@@ -158,10 +158,10 @@ func TestRulerWithUserIndexUpdater(t *testing.T) {
RulerFlags(),
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
map[string]string{
"-ruler.sharding-strategy": "shuffle-sharding",
"-ruler-storage.users-scanner.strategy": "user_index",
"-ruler-storage.users-scanner.user-index.cleanup-interval": "15s",
"-ruler.tenant-shard-size": "1",
"-ruler.sharding-strategy": "shuffle-sharding",
"-ruler-storage.users-scanner.strategy": "user_index",
"-ruler-storage.users-scanner.user-index.update-interval": "15s",
"-ruler.tenant-shard-size": "1",
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertstore/bucketclient/bucket_client.go
@@ -67,7 +67,7 @@ func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg users.U
baseScanner, _ := users.NewScanner(users.UsersScannerConfig{
Strategy: users.UserScanStrategyList,
}, alertBucket, logger, regWithComponent)
userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent)
userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.UpdateInterval, baseScanner, regWithComponent)
}

return &BucketAlertStore{
24 changes: 13 additions & 11 deletions pkg/compactor/blocks_cleaner.go
@@ -319,7 +319,7 @@ func (c *BlocksCleaner) runEmitPartitionMetricsWorker(ctx context.Context, jobCh
err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
c.emitUserParititionMetrics(ctx, userLogger, userBucket, userID)
c.emitUserPartitionMetrics(ctx, userLogger, userBucket, userID)
return nil
})

@@ -789,13 +789,14 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
isPartitionGroupInfoDeleted := false
partitionedGroupInfoFile := extraInfo.path
deletedBlocksCount := 0
partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString())

if extraInfo.status.CanDelete {
if extraInfo.status.IsCompleted {
// Try to remove all blocks included in partitioned group info
deletedBlocksCount, err = partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID)
if err != nil {
level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
level.Warn(partitionedGroupLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion")
// If one block cannot be marked for deletion, we should skip
// deleting this partitioned group; the next iteration will
// try it again.
Expand All @@ -804,13 +805,13 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
}

if deletedBlocksCount > 0 {
level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
level.Info(partitionedGroupLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle")
} else {
level.Info(userLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
level.Info(partitionedGroupLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid")
if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_file", partitionedGroupInfoFile, "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
level.Info(partitionedGroupLogger).Log("msg", "deleted partitioned group info", "partitioned_group_file", partitionedGroupInfoFile)
isPartitionGroupInfoDeleted = true
}
}
Expand All @@ -819,15 +820,15 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) {
// Remove partition visit markers
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile)
level.Info(partitionedGroupLogger).Log("msg", "deleted partition visit markers for partitioned group")
}
}
}
}

func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
func (c *BlocksCleaner) emitUserPartitionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger)
if err != nil {
level.Warn(userLogger).Log("msg", "error listing partitioned group directory to emit metrics", "err", err)
Expand All @@ -842,7 +843,7 @@ func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogge
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
if oldestPartitionGroup != nil {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTimeString())
} else {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
}
@@ -874,8 +875,9 @@ func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objs
return nil
}

partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString())
status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
level.Debug(partitionedGroupLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
path string
status PartitionedGroupStatus
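The logging changes in blocks_cleaner.go above bind the partitioned-group fields to the logger once with go-kit's log.With, rather than repeating partitioned_group_id on every call. A minimal standalone sketch of that pattern; the logger construction and field values here are illustrative, not taken from the PR.

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	// Per-user logger, as the cleaner already builds one per tenant.
	userLogger := log.With(log.NewLogfmtLogger(os.Stderr), "user", "user-1")

	// Bind the group-scoped fields once; every subsequent log line carries them.
	partitionedGroupLogger := log.With(userLogger,
		"partitioned_group_id", 12345,
		"partitioned_group_creation_time", "2024-01-01T00:00:00Z",
	)

	level.Info(partitionedGroupLogger).Log("msg", "deleted partitioned group info")
	level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partition visit markers", "err", "context canceled")
}
```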
2 changes: 1 addition & 1 deletion pkg/compactor/blocks_cleaner_test.go
@@ -1256,7 +1256,7 @@ func TestBlocksCleaner_EmitUserMetrics(t *testing.T) {
err = v4Manager.updateVisitMarker(ctx)
require.NoError(t, err)

cleaner.emitUserParititionMetrics(ctx, logger, userBucket, userID)
cleaner.emitUserPartitionMetrics(ctx, logger, userBucket, userID)

metricNames := []string{
"cortex_compactor_remaining_planned_compactions",
4 changes: 2 additions & 2 deletions pkg/compactor/compactor.go
@@ -762,7 +762,7 @@ func (c *Compactor) starting(ctx context.Context) error {
baseScanner, _ := users.NewScanner(users.UsersScannerConfig{
Strategy: users.UserScanStrategyList,
}, c.bucketClient, c.logger, c.registerer)
c.userIndexUpdater = users.NewUserIndexUpdater(c.bucketClient, c.storageCfg.UsersScanner.CleanUpInterval, baseScanner, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
c.userIndexUpdater = users.NewUserIndexUpdater(c.bucketClient, c.storageCfg.UsersScanner.UpdateInterval, baseScanner, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
}

return nil
@@ -1212,7 +1212,7 @@ func (c *Compactor) userIndexUpdateLoop(ctx context.Context) {
// Hardcode ID to check which compactor owns updating user index.
userID := users.UserIndexCompressedFilename
// Align with clean up interval.
ticker := time.NewTicker(util.DurationWithJitter(c.storageCfg.UsersScanner.CleanUpInterval, 0.1))
ticker := time.NewTicker(util.DurationWithJitter(c.storageCfg.UsersScanner.UpdateInterval, 0.1))
defer ticker.Stop()

for {
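The userIndexUpdateLoop hunk above re-runs the user index update on a ticker jittered around the configured update interval. A minimal standalone sketch of that loop shape, where withJitter stands in for Cortex's util.DurationWithJitter and the update callback is a placeholder.

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// withJitter returns d scaled by a random factor in [1-v, 1+v]; a stand-in
// for Cortex's util.DurationWithJitter.
func withJitter(d time.Duration, v float64) time.Duration {
	f := 1 + v*(2*rand.Float64()-1)
	return time.Duration(float64(d) * f)
}

// updateLoop runs update on every jittered tick until the context is done.
func updateLoop(ctx context.Context, interval time.Duration, update func(context.Context) error) {
	ticker := time.NewTicker(withJitter(interval, 0.1))
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := update(ctx); err != nil {
				fmt.Println("user index update failed:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	updateLoop(ctx, time.Second, func(context.Context) error {
		fmt.Println("updating user index")
		return nil
	})
}
```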
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_test.go
@@ -1580,7 +1580,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Instrument
flagext.DefaultValues(&storageCfg)
storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery)
storageCfg.UsersScanner.Strategy = users.UserScanStrategyUserIndex
storageCfg.UsersScanner.CleanUpInterval = 100 * time.Millisecond // Short interval for testing
storageCfg.UsersScanner.UpdateInterval = 100 * time.Millisecond // Short interval for testing

// Create a temporary directory for compactor data.
compactorCfg.DataDir = t.TempDir()