diff --git a/CHANGELOG.md b/CHANGELOG.md index b51e92c708..96a3f87af1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,8 @@ * [ENHANCEMENT] Compactor: Avoid double compaction by not filtering delete blocks on real time when using bucketIndex lister. #7156 * [ENHANCEMENT] Upgrade to go 1.25. #7164 * [ENHANCEMENT] Upgraded container base images to `alpine:3.23`. #7163 -* [ENHANCEMENT] Instrument Ingester CPU profile with userID for read APIs. #7184 +* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184 +* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 * [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index de43a4eb36..94f8767814 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1929,6 +1929,12 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + # Timeout for fetching postings from TSDB index when cache miss occurs. + # This prevents runaway queries from consuming resources when all + # callers have given up. + # CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout + [fetch_timeout: | default = 0s] + # If enabled, ingesters will cache expanded postings for the compacted # blocks. The cache is shared between all blocks. blocks: @@ -1944,6 +1950,12 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] + # Timeout for fetching postings from TSDB index when cache miss occurs. + # This prevents runaway queries from consuming resources when all + # callers have given up. 
+ # CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout + [fetch_timeout: | default = 0s] + users_scanner: # Strategy to use to scan users. Supported values are: list, user_index. # CLI flag: -blocks-storage.users-scanner.strategy diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 703437faf9..6e6a0c160a 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -2000,6 +2000,12 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + # Timeout for fetching postings from TSDB index when cache miss occurs. + # This prevents runaway queries from consuming resources when all + # callers have given up. + # CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout + [fetch_timeout: | default = 0s] + # If enabled, ingesters will cache expanded postings for the compacted # blocks. The cache is shared between all blocks. blocks: @@ -2015,6 +2021,12 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] + # Timeout for fetching postings from TSDB index when cache miss occurs. + # This prevents runaway queries from consuming resources when all + # callers have given up. + # CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout + [fetch_timeout: | default = 0s] + users_scanner: # Strategy to use to scan users. Supported values are: list, user_index. # CLI flag: -blocks-storage.users-scanner.strategy diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 519f9ec11f..6a4943c665 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2605,6 +2605,12 @@ tsdb: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + # Timeout for fetching postings from TSDB index when cache miss occurs. 
+ # This prevents runaway queries from consuming resources when all callers + # have given up. + # CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout + [fetch_timeout: | default = 0s] + # If enabled, ingesters will cache expanded postings for the compacted # blocks. The cache is shared between all blocks. blocks: @@ -2620,6 +2626,12 @@ tsdb: # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] + # Timeout for fetching postings from TSDB index when cache miss occurs. + # This prevents runaway queries from consuming resources when all callers + # have given up. + # CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout + [fetch_timeout: | default = 0s] + users_scanner: # Strategy to use to scan users. Supported values are: list, user_index. # CLI flag: -blocks-storage.users-scanner.strategy diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index ded95d975e..b4fcd23f76 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -80,9 +80,10 @@ type TSDBPostingsCacheConfig struct { } type PostingsCacheConfig struct { - Enabled bool `yaml:"enabled"` - MaxBytes int64 `yaml:"max_bytes"` - Ttl time.Duration `yaml:"ttl"` + Enabled bool `yaml:"enabled"` + MaxBytes int64 `yaml:"max_bytes"` + Ttl time.Duration `yaml:"ttl"` + FetchTimeout time.Duration `yaml:"fetch_timeout"` } func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -94,6 +95,7 @@ func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *fl func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f *flag.FlagSet) { f.Int64Var(&cfg.MaxBytes, prefix+"expanded_postings_cache."+block+".max-bytes", 10*1024*1024, "Max bytes for postings cache") f.DurationVar(&cfg.Ttl, prefix+"expanded_postings_cache."+block+".ttl", 10*time.Minute, "TTL for postings cache") + 
f.DurationVar(&cfg.FetchTimeout, prefix+"expanded_postings_cache."+block+".fetch-timeout", 0, "Timeout for fetching postings from TSDB index when cache miss occurs. This prevents runaway queries from consuming resources when all callers have given up.") f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not") } @@ -219,8 +221,18 @@ func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd c.metrics.CacheRequests.WithLabelValues(cache.name).Inc() fetch := func() ([]storage.SeriesRef, int64, error) { - // Use context.Background() as this promise is maybe shared across calls - postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...) + // The promise may be shared across calls, so no single caller's context can be used here. + // Start from context.Background() and, when FetchTimeout is positive, bound the fetch with + // that timeout so it cannot keep running after every caller has given up. + // A FetchTimeout of 0 (the flag default) leaves the fetch unbounded. + fetchCtx := context.Background() + if cache.cfg.FetchTimeout > 0 { + var cancel context.CancelFunc + fetchCtx, cancel = context.WithTimeout(fetchCtx, cache.cfg.FetchTimeout) + defer cancel() + } + + postings, err := c.postingsForMatchersFunc(fetchCtx, ix, ms...)
if err == nil { ids, err := index.ExpandPostings(postings) diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index abe0447402..f446803318 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -2,6 +2,7 @@ package tsdb import ( "bytes" + "context" "fmt" "strings" "sync" @@ -12,6 +13,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/require" "go.uber.org/atomic" )
@@ -233,3 +236,78 @@ func RepeatStringIfNeeded(seed string, length int) string {
 
 	return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
 }
+
+func TestPostingsCacheFetchTimeout(t *testing.T) {
+	// Verify that a cache-miss fetch is bounded by FetchTimeout: the context passed to the
+	// PostingsForMatchers function must be cancelled even though that function never
+	// returns on its own.
+	cfg := TSDBPostingsCacheConfig{
+		Head: PostingsCacheConfig{
+			Enabled:      true,
+			Ttl:          time.Hour,
+			MaxBytes:     10 << 20,
+			FetchTimeout: 100 * time.Millisecond,
+		},
+	}
+
+	fetchStarted := make(chan struct{})
+	fetchCancelled := make(chan struct{})
+	fetchShouldBlock := make(chan struct{})
+	fetchCompleted := atomic.Bool{}
+
+	// Release the fake fetch on every exit path, including failed assertions, so a
+	// broken timeout cannot leave it blocked after the test ends.
+	defer close(fetchShouldBlock)
+
+	cfg.PostingsForMatchers = func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
+		close(fetchStarted)
+		select {
+		case <-ctx.Done():
+			// Expected path: the fetch context expired.
+			close(fetchCancelled)
+			return nil, ctx.Err()
+		case <-fetchShouldBlock:
+			// Only reachable if the fetch context was never cancelled.
+			fetchCompleted.Store(true)
+			return index.EmptyPostings(), nil
+		}
+	}
+
+	m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
+	cache := newBlocksPostingsForMatchersCache("user1", cfg, m, newSeedByHash(seedArraySize))
+
+	// The caller's deadline (50ms) is shorter than the fetch timeout (100ms).
+	queryCtx, queryCancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	defer queryCancel()
+
+	// Run the query in its own goroutine so that the test fails with a clear message
+	// instead of hanging if the call never returns.
+	errCh := make(chan error, 1)
+	go func() {
+		_, err := cache.PostingsForMatchers(queryCtx, headULID, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_metric"))
+		errCh <- err
+	}()
+
+	// waitFor fails the test if ch is not closed within a generous upper bound.
+	waitFor := func(ch <-chan struct{}, what string) {
+		t.Helper()
+		select {
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timed out waiting for %s", what)
+		}
+	}
+
+	waitFor(fetchStarted, "fetch to start")
+	// The actual FetchTimeout check: the fetch context must expire.
+	waitFor(fetchCancelled, "fetch context to be cancelled by FetchTimeout")
+
+	select {
+	case err := <-errCh:
+		require.ErrorIs(t, err, context.DeadlineExceeded)
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out waiting for PostingsForMatchers to return")
+	}
+
+	require.False(t, fetchCompleted.Load(), "Fetch should have been cancelled by timeout, not completed")
+}
diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8b35f06e02..67cd48dc87 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -2972,6 +2972,13 @@ "type": "boolean", "x-cli-flag": "blocks-storage.expanded_postings_cache.block.enabled" }, + "fetch_timeout": { + "default": "0s", + "description": "Timeout for fetching postings from TSDB index when cache miss occurs.
This prevents runaway queries from consuming resources when all callers have given up.", + "type": "string", + "x-cli-flag": "blocks-storage.expanded_postings_cache.block.fetch-timeout", + "x-format": "duration" + }, "max_bytes": { "default": 10485760, "description": "Max bytes for postings cache", @@ -2997,6 +3004,13 @@ "type": "boolean", "x-cli-flag": "blocks-storage.expanded_postings_cache.head.enabled" }, + "fetch_timeout": { + "default": "0s", + "description": "Timeout for fetching postings from TSDB index when cache miss occurs. This prevents runaway queries from consuming resources when all callers have given up.", + "type": "string", + "x-cli-flag": "blocks-storage.expanded_postings_cache.head.fetch-timeout", + "x-format": "duration" + }, "max_bytes": { "default": 10485760, "description": "Max bytes for postings cache",