diff --git a/go.mod b/go.mod index 3a6a012cde9..096696b0968 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/spf13/afero v1.11.0 github.com/stretchr/testify v1.11.1 github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3 - github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6 + github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5 github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 diff --git a/go.sum b/go.sum index 8bfaa75fbe0..7a726b84efb 100644 --- a/go.sum +++ b/go.sum @@ -1778,8 +1778,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1 github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3 h1:P301Anc27aVL7Ls88el92j+qW3PJp8zmiDl+kOUZv3A= github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3/go.mod h1:uDHLkMKOGDAnlN75EAz8VrRzob1+VbgYSuUleatWuF0= -github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6 h1:/8TlVay3hF8LQ7iGLRm9aNRGQsyYOXFPNmtg5dsNwcM= -github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc= +github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5 h1:hIg9M9TRha/qaLDdtwsTWsTDkewGHleVZaV2JsLY1vA= +github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc= github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb h1:z/ePbn3lo/D4vdHGH8hpa2kgH9M6iLq0kOFtZwuelKM= github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb/go.mod h1:gGUG3TDEoRSjTFVs/QO6QnQIILRgNF0P9l7BiiMfmHw= github.com/tinylib/msgp v1.3.0 h1:ULuf7GPooDaIlbyvgAxBV/FI7ynli6LZ1/nVUNu+0ww= diff --git a/vendor/github.com/thanos-io/promql-engine/compute/aggregators.go b/vendor/github.com/thanos-io/promql-engine/compute/aggregators.go index 1b2fbcc8ad3..31c56527811 100644 --- a/vendor/github.com/thanos-io/promql-engine/compute/aggregators.go +++ b/vendor/github.com/thanos-io/promql-engine/compute/aggregators.go @@ -9,10 +9,7 @@ import ( "github.com/thanos-io/promql-engine/warnings" - "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/promql/parser/posrange" - "github.com/prometheus/prometheus/util/annotations" "gonum.org/v1/gonum/floats" ) @@ -24,17 +21,28 @@ const ( MixedTypeValue ) +// Accumulators map prometheus behavior for aggregations, either operators or +// "[...]_over_time" functions. The caller is responsible to add all errors +// returned by Add as annotations. +// Accumulators might ignore histograms (for example min or max), if they do +// the caller can check the HasIgnoredHistograms method and add appropriate +// annotations. +// The ValueType function can be checked to see if the aggregator encountered +// mixed values for its slot so the caller can again add the appropriate annotations. type Accumulator interface { Add(v float64, h *histogram.FloatHistogram) error Value() (float64, *histogram.FloatHistogram) ValueType() ValueType + HasIgnoredHistograms() bool Reset(float64) } +// VectorAccumulator is like Accumulator but accepts batches of values. 
type VectorAccumulator interface { AddVector(vs []float64, hs []*histogram.FloatHistogram) error Value() (float64, *histogram.FloatHistogram) ValueType() ValueType + HasIgnoredHistograms() bool Reset(float64) } @@ -43,6 +51,7 @@ type SumAcc struct { compensation float64 histSum *histogram.FloatHistogram hasFloatVal bool + hasError bool // histogram error occurred; accumulator becomes no-op } func NewSumAcc() *SumAcc { @@ -50,24 +59,38 @@ func NewSumAcc() *SumAcc { } func (s *SumAcc) AddVector(float64s []float64, histograms []*histogram.FloatHistogram) error { + if s.hasError { + return nil + } if len(float64s) > 0 { s.value, s.compensation = KahanSumInc(compensatedSum(float64s), s.value, s.compensation) s.hasFloatVal = true } - var err error if len(histograms) > 0 { + var err error s.histSum, err = histogramSum(s.histSum, histograms) + if err != nil { + s.hasError = true + return err + } } - return err + return nil } func (s *SumAcc) Add(v float64, h *histogram.FloatHistogram) error { + if s.hasError { + return nil + } if h == nil { s.hasFloatVal = true s.value, s.compensation = KahanSumInc(v, s.value, s.compensation) return nil } + return s.addHistogram(h) +} + +func (s *SumAcc) addHistogram(h *histogram.FloatHistogram) error { if s.histSum == nil { s.histSum = h.Copy() return nil @@ -76,27 +99,17 @@ func (s *SumAcc) Add(v float64, h *histogram.FloatHistogram) error { // https://github.com/prometheus/prometheus/blob/57bcbf18880f7554ae34c5b341d52fc53f059a97/promql/engine.go#L2448-L2456 var err error if h.Schema >= s.histSum.Schema { - if s.histSum, err = s.histSum.Add(h); err != nil { - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - return annotations.MixedExponentialCustomHistogramsWarning - } - if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - return annotations.IncompatibleCustomBucketsHistogramsWarning - } - return err - } + s.histSum, err = s.histSum.Add(h) } else { t := h.Copy() - if s.histSum, err = t.Add(s.histSum); err != nil { - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - return annotations.MixedExponentialCustomHistogramsWarning - } - if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - return annotations.IncompatibleCustomBucketsHistogramsWarning - } - return err + if s.histSum, err = t.Add(s.histSum); err == nil { + s.histSum = t } - s.histSum = t + } + if err != nil { + s.histSum = nil + s.hasError = true + return warnings.ConvertHistogramError(err) } return nil } @@ -118,9 +131,14 @@ func (s *SumAcc) ValueType() ValueType { return NoValue } +func (s *SumAcc) HasIgnoredHistograms() bool { + return false // Sum handles histograms; use ValueType() instead +} + func (s *SumAcc) Reset(_ float64) { s.histSum = nil s.hasFloatVal = false + s.hasError = false s.value = 0 s.compensation = 0 } @@ -130,44 +148,49 @@ func NewMaxAcc() *MaxAcc { } type MaxAcc struct { - value float64 - hasValue bool + value float64 + hasValue bool + ignoredHist bool } func (c *MaxAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error { - var warn error if len(hs) > 0 { - warn = annotations.NewHistogramIgnoredInAggregationInfo("max", posrange.PositionRange{}) + c.ignoredHist = true } if len(vs) == 0 { - return warn + return nil } fst, rem := vs[0], vs[1:] - warn = warnings.Coalesce(warn, c.Add(fst, nil)) - if len(rem) == 0 { - return warn + _ = c.Add(fst, nil) + if len(rem) > 0 { + _ = c.Add(floats.Max(rem), nil) } - return warnings.Coalesce(warn, c.Add(floats.Max(rem), nil)) + return nil } func (c *MaxAcc) Add(v float64, h 
*histogram.FloatHistogram) error { if h != nil { - if c.hasValue { - return annotations.NewHistogramIgnoredInAggregationInfo("max", posrange.PositionRange{}) - } + c.ignoredHist = true return nil } + c.addFloat(v) + return nil +} + +func (c *MaxAcc) HasIgnoredHistograms() bool { + return c.ignoredHist +} +func (c *MaxAcc) addFloat(v float64) { if !c.hasValue { c.value = v c.hasValue = true - return nil + return } if c.value < v || math.IsNaN(c.value) { c.value = v } - return nil } func (c *MaxAcc) Value() (float64, *histogram.FloatHistogram) { @@ -184,6 +207,7 @@ func (c *MaxAcc) ValueType() ValueType { func (c *MaxAcc) Reset(_ float64) { c.hasValue = false + c.ignoredHist = false c.value = 0 } @@ -192,45 +216,49 @@ func NewMinAcc() *MinAcc { } type MinAcc struct { - value float64 - hasValue bool + value float64 + hasValue bool + ignoredHist bool } func (c *MinAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error { - var warn error if len(hs) > 0 { - warn = annotations.NewHistogramIgnoredInAggregationInfo("min", posrange.PositionRange{}) + c.ignoredHist = true } if len(vs) == 0 { - return warn + return nil } fst, rem := vs[0], vs[1:] - warn = warnings.Coalesce(warn, c.Add(fst, nil)) - if len(rem) == 0 { - return warn + _ = c.Add(fst, nil) + if len(rem) > 0 { + _ = c.Add(floats.Min(rem), nil) } - - return warnings.Coalesce(warn, c.Add(floats.Min(rem), nil)) + return nil } func (c *MinAcc) Add(v float64, h *histogram.FloatHistogram) error { if h != nil { - if c.hasValue { - return annotations.NewHistogramIgnoredInAggregationInfo("min", posrange.PositionRange{}) - } + c.ignoredHist = true return nil } + c.addFloat(v) + return nil +} +func (c *MinAcc) HasIgnoredHistograms() bool { + return c.ignoredHist +} + +func (c *MinAcc) addFloat(v float64) { if !c.hasValue { c.value = v c.hasValue = true - return nil + return } if c.value > v || math.IsNaN(c.value) { c.value = v } - return nil } func (c *MinAcc) Value() (float64, *histogram.FloatHistogram) { @@ -247,6 +275,7 @@ func (c *MinAcc) ValueType() ValueType { func (c *MinAcc) Reset(_ float64) { c.hasValue = false + c.ignoredHist = false c.value = 0 } @@ -286,6 +315,10 @@ func (c *GroupAcc) ValueType() ValueType { } } +func (c *GroupAcc) HasIgnoredHistograms() bool { + return false +} + func (c *GroupAcc) Reset(_ float64) { c.hasValue = false c.value = 0 @@ -325,6 +358,10 @@ func (c *CountAcc) ValueType() ValueType { return NoValue } } +func (c *CountAcc) HasIgnoredHistograms() bool { + return false +} + func (c *CountAcc) Reset(_ float64) { c.hasValue = false c.value = 0 @@ -337,6 +374,7 @@ type AvgAcc struct { incremental bool count int64 hasValue bool + hasError bool // histogram error occurred; accumulator becomes no-op histSum *histogram.FloatHistogram histScratch *histogram.FloatHistogram @@ -349,27 +387,42 @@ func NewAvgAcc() *AvgAcc { } func (a *AvgAcc) Add(v float64, h *histogram.FloatHistogram) error { - if h != nil { - a.histCount++ - if a.histSum == nil { - a.histSum = h.Copy() - a.histScratch = &histogram.FloatHistogram{} - a.histSumScratch = &histogram.FloatHistogram{} - return nil - } + if a.hasError { + return nil + } + if h == nil { + return a.addFloat(v) + } + return a.addHistogram(h) +} - h.CopyTo(a.histScratch) - left := a.histScratch.Div(a.histCount) - a.histSum.CopyTo(a.histSumScratch) - right := a.histSumScratch.Div(a.histCount) - toAdd, err := left.Sub(right) - if err != nil { - return err - } +func (a *AvgAcc) addHistogram(h *histogram.FloatHistogram) error { + a.histCount++ + if a.histSum == nil { + 
a.histSum = h.Copy() + a.histScratch = &histogram.FloatHistogram{} + a.histSumScratch = &histogram.FloatHistogram{} + return nil + } + + h.CopyTo(a.histScratch) + left := a.histScratch.Div(a.histCount) + a.histSum.CopyTo(a.histSumScratch) + right := a.histSumScratch.Div(a.histCount) + toAdd, err := left.Sub(right) + if err == nil { a.histSum, err = a.histSum.Add(toAdd) - return err } + if err != nil { + a.histSum = nil + a.histCount = 0 + a.hasError = true + return warnings.ConvertHistogramError(err) + } + return nil +} +func (a *AvgAcc) addFloat(v float64) error { a.count++ if !a.hasValue { a.hasValue = true @@ -426,6 +479,9 @@ func (a *AvgAcc) Add(v float64, h *histogram.FloatHistogram) error { } func (a *AvgAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error { + if a.hasError { + return nil + } for _, v := range vs { if err := a.Add(v, nil); err != nil { return err @@ -433,18 +489,6 @@ func (a *AvgAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error { } for _, h := range hs { if err := a.Add(0, h); err != nil { - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - // to make valueType NoValue - a.histSum = nil - a.histCount = 0 - return annotations.MixedExponentialCustomHistogramsWarning - } - if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - // to make valueType NoValue - a.histSum = nil - a.histCount = 0 - return annotations.IncompatibleCustomBucketsHistogramsWarning - } return err } } @@ -474,8 +518,13 @@ func (a *AvgAcc) ValueType() ValueType { return NoValue } +func (a *AvgAcc) HasIgnoredHistograms() bool { + return false // Avg handles histograms; use ValueType() instead +} + func (a *AvgAcc) Reset(_ float64) { a.hasValue = false + a.hasError = false a.incremental = false a.kahanSum = 0 a.kahanC = 0 @@ -486,24 +535,55 @@ func (a *AvgAcc) Reset(_ float64) { } type statAcc struct { - count float64 - mean float64 - value float64 - hasValue bool + count float64 + mean float64 + cMean float64 + value float64 + cValue float64 + hasValue bool + hasNaN bool + ignoredHist bool } func (s *statAcc) ValueType() ValueType { if s.hasValue { return SingleTypeValue - } else { - return NoValue } + return NoValue +} + +func (s *statAcc) HasIgnoredHistograms() bool { + return s.ignoredHist } + func (s *statAcc) Reset(_ float64) { s.hasValue = false + s.hasNaN = false + s.ignoredHist = false s.count = 0 s.mean = 0 + s.cMean = 0 s.value = 0 + s.cValue = 0 +} + +func (s *statAcc) add(v float64) { + s.hasValue = true + s.count++ + if math.IsNaN(v) || math.IsInf(v, 0) { + s.hasNaN = true + return + } + delta := v - (s.mean + s.cMean) + s.mean, s.cMean = KahanSumInc(delta/s.count, s.mean, s.cMean) + s.value, s.cValue = KahanSumInc(delta*(v-(s.mean+s.cMean)), s.value, s.cValue) +} + +func (s *statAcc) variance() float64 { + if s.hasNaN { + return math.NaN() + } + return (s.value + s.cValue) / s.count } type StdDevAcc struct { @@ -516,31 +596,15 @@ func NewStdDevAcc() *StdDevAcc { func (s *StdDevAcc) Add(v float64, h *histogram.FloatHistogram) error { if h != nil { - return annotations.NewHistogramIgnoredInAggregationInfo("stddev", posrange.PositionRange{}) - } - - s.hasValue = true - s.count++ - - if math.IsNaN(v) || math.IsInf(v, 0) { - s.value = math.NaN() - } else { - delta := v - s.mean - s.mean += delta / s.count - s.value += delta * (v - s.mean) + s.ignoredHist = true + return nil } + s.add(v) return nil } func (s *StdDevAcc) Value() (float64, *histogram.FloatHistogram) { - if math.IsNaN(s.value) { - return math.NaN(), nil - } - - if s.count == 
1 { - return 0, nil - } - return math.Sqrt(s.value / s.count), nil + return math.Sqrt(s.variance()), nil } type StdVarAcc struct { @@ -553,37 +617,22 @@ func NewStdVarAcc() *StdVarAcc { func (s *StdVarAcc) Add(v float64, h *histogram.FloatHistogram) error { if h != nil { - return annotations.NewHistogramIgnoredInAggregationInfo("stdvar", posrange.PositionRange{}) - } - - s.hasValue = true - s.count++ - - if math.IsNaN(v) || math.IsInf(v, 0) { - s.value = math.NaN() - } else { - delta := v - s.mean - s.mean += delta / s.count - s.value += delta * (v - s.mean) + s.ignoredHist = true + return nil } + s.add(v) return nil } func (s *StdVarAcc) Value() (float64, *histogram.FloatHistogram) { - if math.IsNaN(s.value) { - return math.NaN(), nil - } - - if s.count == 1 { - return 0, nil - } - return s.value / s.count, nil + return s.variance(), nil } type QuantileAcc struct { - arg float64 - points []float64 - hasValue bool + arg float64 + points []float64 + hasValue bool + ignoredHist bool } func NewQuantileAcc() Accumulator { @@ -592,7 +641,8 @@ func NewQuantileAcc() Accumulator { func (q *QuantileAcc) Add(v float64, h *histogram.FloatHistogram) error { if h != nil { - return annotations.NewHistogramIgnoredInAggregationInfo("quantile", posrange.PositionRange{}) + q.ignoredHist = true + return nil } q.hasValue = true @@ -612,8 +662,13 @@ func (q *QuantileAcc) ValueType() ValueType { } } +func (q *QuantileAcc) HasIgnoredHistograms() bool { + return q.ignoredHist +} + func (q *QuantileAcc) Reset(f float64) { q.hasValue = false + q.ignoredHist = false q.arg = f q.points = q.points[:0] } @@ -664,10 +719,65 @@ func (acc *HistogramAvgAcc) ValueType() ValueType { return NoValue } +func (acc *HistogramAvgAcc) HasIgnoredHistograms() bool { + return false // HistogramAvg handles histograms; use ValueType() instead +} + func (acc *HistogramAvgAcc) Reset(f float64) { acc.count = 0 } +// LastAcc tracks the last value seen. Used for last_over_time. +type LastAcc struct { + value float64 + hist *histogram.FloatHistogram + hasValue bool +} + +func NewLastAcc() *LastAcc { + return &LastAcc{} +} + +func (l *LastAcc) Add(v float64, h *histogram.FloatHistogram) error { + l.hasValue = true + if h != nil { + l.value = 0 + if l.hist == nil { + l.hist = h.Copy() + } else { + h.CopyTo(l.hist) + } + } else { + l.value = v + l.hist = nil + } + return nil +} + +func (l *LastAcc) Value() (float64, *histogram.FloatHistogram) { + if l.hist != nil { + return 0, l.hist.Copy() + } + return l.value, nil +} + +func (l *LastAcc) ValueType() ValueType { + if l.hasValue { + return SingleTypeValue + } + return NoValue +} + +func (l *LastAcc) HasIgnoredHistograms() bool { + return false // Last handles histograms; use ValueType() instead +} + +func (l *LastAcc) Reset(_ float64) { + l.hasValue = false + l.value = 0 + l.hist = nil +} + // KahanSumInc implements kahan summation, see https://en.wikipedia.org/wiki/Kahan_summation_algorithm. 
func KahanSumInc(inc, sum, c float64) (newSum, newC float64) { t := sum + inc @@ -727,26 +837,12 @@ func histogramSum(current *histogram.FloatHistogram, histograms []*histogram.Flo for i := range histograms { if histograms[i].Schema >= histSum.Schema { histSum, err = histSum.Add(histograms[i]) - if err != nil { - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - return nil, annotations.MixedExponentialCustomHistogramsWarning - } - if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - return nil, annotations.IncompatibleCustomBucketsHistogramsWarning - } - return nil, err - } } else { t := histograms[i].Copy() - if histSum, err = t.Add(histSum); err != nil { - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - return nil, annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}) - } - if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - return nil, annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{}) - } - return nil, err - } + histSum, err = t.Add(histSum) + } + if err != nil { + return nil, warnings.ConvertHistogramError(err) } } return histSum, nil diff --git a/vendor/github.com/thanos-io/promql-engine/engine/engine.go b/vendor/github.com/thanos-io/promql-engine/engine/engine.go index ad08deb904e..3a4c47b7dd1 100644 --- a/vendor/github.com/thanos-io/promql-engine/engine/engine.go +++ b/vendor/github.com/thanos-io/promql-engine/engine/engine.go @@ -273,7 +273,7 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts } e.metrics.totalQueries.Inc() return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, + Query: &Query{exec: exec, opts: qOpts}, engine: e, plan: optimizedPlan, warns: warns, @@ -281,9 +281,6 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts t: InstantQuery, resultSort: resultSort, scanners: scanners, - start: ts, - end: ts, - step: 0, }, nil } @@ -318,7 +315,7 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab e.metrics.totalQueries.Inc() return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, + Query: &Query{exec: exec, opts: qOpts}, engine: e, plan: lplan, warns: warns, @@ -327,9 +324,6 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab // TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function. 
resultSort: noSortResultSort{}, scanners: scnrs, - start: ts, - end: ts, - step: 0, }, nil } @@ -378,15 +372,12 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * e.metrics.totalQueries.Inc() return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, + Query: &Query{exec: exec, opts: qOpts}, engine: e, plan: optimizedPlan, warns: warns, t: RangeQuery, scanners: scnrs, - start: start, - end: end, - step: step, }, nil } @@ -420,15 +411,12 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable e.metrics.totalQueries.Inc() return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, + Query: &Query{exec: exec, opts: qOpts}, engine: e, plan: lplan, warns: warns, t: RangeQuery, scanners: scnrs, - start: start, - end: end, - step: step, }, nil } @@ -498,7 +486,7 @@ func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Optio type Query struct { exec model.VectorOperator - opts promql.QueryOpts + opts *query.Options } // Explain returns human-readable explanation of the created executor. @@ -520,9 +508,6 @@ type compatibilityQuery struct { plan logicalplan.Plan ts time.Time // Empty for range queries. warns annotations.Annotations - start time.Time - end time.Time - step time.Duration t QueryType resultSort resultSorter @@ -567,6 +552,7 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { for i, s := range resultSeries { series[i].Metric = s } + totalSteps := q.opts.TotalSteps() loop: for { select { @@ -590,7 +576,7 @@ loop: for _, vector := range r { for i, s := range vector.SampleIDs { if len(series[s].Floats) == 0 { - series[s].Floats = make([]promql.FPoint, 0, 121) // Typically 1h of data. + series[s].Floats = make([]promql.FPoint, 0, totalSteps) } series[s].Floats = append(series[s].Floats, promql.FPoint{ T: vector.T, @@ -599,7 +585,7 @@ loop: } for i, s := range vector.HistogramIDs { if len(series[s].Histograms) == 0 { - series[s].Histograms = make([]promql.HPoint, 0, 121) // Typically 1h of data. + series[s].Histograms = make([]promql.HPoint, 0, totalSteps) } series[s].Histograms = append(series[s].Histograms, promql.HPoint{ T: vector.T, @@ -695,15 +681,12 @@ func (q *compatibilityQuery) Statement() parser.Statement { return nil } // Stats always returns empty query stats for now to avoid panic. 
func (q *compatibilityQuery) Stats() *stats.Statistics { - var enablePerStepStats bool - if q.opts != nil { - enablePerStepStats = q.opts.EnablePerStepStats() - } + enablePerStepStats := q.opts.EnablePerStepStats analysis := q.Analyze() samples := stats.NewQuerySamples(enablePerStepStats) if enablePerStepStats { - samples.InitStepTracking(q.start.UnixMilli(), q.end.UnixMilli(), telemetry.StepTrackingInterval(q.step)) + samples.InitStepTracking(q.opts.Start.UnixMilli(), q.opts.End.UnixMilli(), telemetry.StepTrackingInterval(q.opts.Step)) } if analysis != nil { diff --git a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/hashaggregate.go b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/hashaggregate.go index a73ccd28727..5d2477bef9f 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/hashaggregate.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/hashaggregate.go @@ -135,9 +135,7 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { a.tables[i].reset(p) } if a.lastBatch != nil { - if warn := a.aggregate(a.lastBatch); warn != nil { - warnings.AddToContext(warn, ctx) - } + a.aggregate(ctx, a.lastBatch) a.lastBatch = nil } for { @@ -151,9 +149,7 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { // Keep aggregating samples as long as timestamps of batches are equal. currentTs := a.tables[0].timestamp() if currentTs == math.MinInt64 || next[0].T == currentTs { - if warn := a.aggregate(next); warn != nil { - warnings.AddToContext(warn, ctx) - } + a.aggregate(ctx, next) continue } a.lastBatch = next @@ -174,14 +170,12 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { return result, nil } -func (a *aggregate) aggregate(in []model.StepVector) error { - var err error +func (a *aggregate) aggregate(ctx context.Context, in []model.StepVector) { for i, vector := range in { - err = warnings.Coalesce(err, a.tables[i].aggregate(vector)) + a.tables[i].aggregate(ctx, vector) a.next.GetPool().PutStepVector(vector) } a.next.GetPool().PutVectors(in) - return err } func (a *aggregate) initializeTables(ctx context.Context) error { diff --git a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go index 23021a759ec..4c9845968fd 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go @@ -14,10 +14,8 @@ import ( "github.com/thanos-io/promql-engine/warnings" "github.com/efficientgo/core/errors" - "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" - "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" ) @@ -28,7 +26,7 @@ type aggregateTable interface { // If the table is empty, it returns math.MinInt64. timestamp() int64 // aggregate aggregates the given vector into the table. - aggregate(vector model.StepVector) error + aggregate(ctx context.Context, vector model.StepVector) // toVector writes out the accumulated result to the given vector and // resets the table. 
toVector(ctx context.Context, pool *model.VectorPool) model.StepVector @@ -77,31 +75,23 @@ func newScalarTable(inputSampleIDs []uint64, outputs []*model.Series, aggregatio }, nil } -func (t *scalarTable) aggregate(vector model.StepVector) error { +func (t *scalarTable) aggregate(ctx context.Context, vector model.StepVector) { t.ts = vector.T - var err error for i := range vector.Samples { - err = warnings.Coalesce(err, t.addSample(vector.SampleIDs[i], vector.Samples[i])) + outputSampleID := t.inputs[vector.SampleIDs[i]] + output := t.outputs[outputSampleID] + if err := t.accumulators[output.ID].Add(vector.Samples[i], nil); err != nil { + warnings.AddToContext(err, ctx) + } } for i := range vector.Histograms { - err = warnings.Coalesce(err, t.addHistogram(vector.HistogramIDs[i], vector.Histograms[i])) + outputSampleID := t.inputs[vector.HistogramIDs[i]] + output := t.outputs[outputSampleID] + if err := t.accumulators[output.ID].Add(0, vector.Histograms[i]); err != nil { + warnings.AddToContext(err, ctx) + } } - return err -} - -func (t *scalarTable) addSample(sampleID uint64, sample float64) error { - outputSampleID := t.inputs[sampleID] - output := t.outputs[outputSampleID] - - return t.accumulators[output.ID].Add(sample, nil) -} - -func (t *scalarTable) addHistogram(sampleID uint64, h *histogram.FloatHistogram) error { - outputSampleID := t.inputs[sampleID] - output := t.outputs[outputSampleID] - - return t.accumulators[output.ID].Add(0, h) } func (t *scalarTable) reset(arg float64) { @@ -114,18 +104,22 @@ func (t *scalarTable) reset(arg float64) { func (t *scalarTable) toVector(ctx context.Context, pool *model.VectorPool) model.StepVector { result := pool.GetStepVector(t.ts) for i, v := range t.outputs { - switch t.accumulators[i].ValueType() { + acc := t.accumulators[i] + if acc.HasIgnoredHistograms() { + warnings.AddToContext(annotations.HistogramIgnoredInAggregationInfo, ctx) + } + switch acc.ValueType() { case compute.NoValue: continue case compute.SingleTypeValue: - f, h := t.accumulators[i].Value() + f, h := acc.Value() if h == nil { result.AppendSample(pool, v.ID, f) } else { result.AppendHistogram(pool, v.ID, h) } case compute.MixedTypeValue: - warnings.AddToContext(annotations.NewMixedFloatsHistogramsAggWarning(posrange.PositionRange{}), ctx) + warnings.AddToContext(warnings.MixedFloatsHistogramsAggWarning, ctx) } } return result diff --git a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/vector_table.go b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/vector_table.go index d81972bddea..6d7d81c237a 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/vector_table.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/vector_table.go @@ -15,7 +15,6 @@ import ( "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/promql/parser" - "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" ) @@ -48,13 +47,18 @@ func (t *vectorTable) timestamp() int64 { return t.ts } -func (t *vectorTable) aggregate(vector model.StepVector) error { +func (t *vectorTable) aggregate(ctx context.Context, vector model.StepVector) { t.ts = vector.T - return t.accumulator.AddVector(vector.Samples, vector.Histograms) + if err := t.accumulator.AddVector(vector.Samples, vector.Histograms); err != nil { + warnings.AddToContext(err, ctx) + } } func (t *vectorTable) toVector(ctx context.Context, pool *model.VectorPool) model.StepVector { result := pool.GetStepVector(t.ts) + if 
t.accumulator.HasIgnoredHistograms() { + warnings.AddToContext(annotations.HistogramIgnoredInAggregationInfo, ctx) + } switch t.accumulator.ValueType() { case compute.NoValue: return result @@ -66,7 +70,7 @@ func (t *vectorTable) toVector(ctx context.Context, pool *model.VectorPool) mode result.AppendHistogram(pool, 0, h) } case compute.MixedTypeValue: - warnings.AddToContext(annotations.NewMixedFloatsHistogramsAggWarning(posrange.PositionRange{}), ctx) + warnings.AddToContext(warnings.MixedFloatsHistogramsAggWarning, ctx) } return result } diff --git a/vendor/github.com/thanos-io/promql-engine/execution/binary/utils.go b/vendor/github.com/thanos-io/promql-engine/execution/binary/utils.go index 28bbe32b881..e5d4cf3f741 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/binary/utils.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/binary/utils.go @@ -7,6 +7,8 @@ import ( "fmt" "math" + "github.com/thanos-io/promql-engine/warnings" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -113,23 +115,13 @@ func binOp(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram.FloatHist case parser.ADD: res, err := hlhs.Copy().Add(hrhs) if err != nil { - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - return 0, nil, false, annotations.MixedExponentialCustomHistogramsWarning - } else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - return 0, nil, false, annotations.IncompatibleCustomBucketsHistogramsWarning - } - return 0, nil, false, errors.Newf("%s: %s", annotations.PromQLWarning, err) + return 0, nil, false, warnings.ConvertHistogramError(err) } return 0, res.Compact(0), true, nil case parser.SUB: res, err := hlhs.Copy().Sub(hrhs) if err != nil { - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - return 0, nil, false, annotations.MixedExponentialCustomHistogramsWarning - } else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - return 0, nil, false, annotations.IncompatibleCustomBucketsHistogramsWarning - } - return 0, nil, false, errors.Newf("%s: %s", annotations.PromQLWarning, err) + return 0, nil, false, warnings.ConvertHistogramError(err) } return 0, res.Compact(0), true, nil case parser.EQLC: diff --git a/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go b/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go index 79c62aa8478..ace2707d442 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go @@ -153,62 +153,14 @@ var instantVectorFuncs = map[string]functionCall{ return histogramStdVar(h), true }, // variants of date time functions with an argument - "days_in_month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { - if h != nil { - return 0., false - } - - return daysInMonth(dateFromSampleValue(f)), true - }, - "day_of_month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { - if h != nil { - return 0., false - } - - return dayOfMonth(dateFromSampleValue(f)), true - }, - "day_of_week": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { - if h != nil { - return 0., false - } - - return dayOfWeek(dateFromSampleValue(f)), true - }, - "day_of_year": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { - if h != nil { - return 0., false - } - - 
return dayOfYear(dateFromSampleValue(f)), true - }, - "hour": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { - if h != nil { - return 0., false - } - - return hour(dateFromSampleValue(f)), true - }, - "minute": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { - if h != nil { - return 0., false - } - - return minute(dateFromSampleValue(f)), true - }, - "month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { - if h != nil { - return 0., false - } - - return month(dateFromSampleValue(f)), true - }, - "year": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { - if h != nil { - return 0., false - } - - return year(dateFromSampleValue(f)), true - }, + "days_in_month": dateTimeFunc(daysInMonth), + "day_of_month": dateTimeFunc(dayOfMonth), + "day_of_week": dateTimeFunc(dayOfWeek), + "day_of_year": dateTimeFunc(dayOfYear), + "hour": dateTimeFunc(hour), + "minute": dateTimeFunc(minute), + "month": dateTimeFunc(month), + "year": dateTimeFunc(year), // hack we only have sort functions as argument for "timestamp" possibly so they dont actually // need to sort anything. This is only for compatibility to prometheus as this sort of query does // not make too much sense. @@ -236,30 +188,14 @@ var noArgFuncs = map[string]noArgFunctionCall{ return float64(t) / 1000 }, // variants of date time functions with no argument - "days_in_month": func(t int64) float64 { - return daysInMonth(dateFromStepTime(t)) - }, - "day_of_month": func(t int64) float64 { - return dayOfMonth(dateFromStepTime(t)) - }, - "day_of_week": func(t int64) float64 { - return dayOfWeek(dateFromStepTime(t)) - }, - "day_of_year": func(t int64) float64 { - return dayOfYear(dateFromStepTime(t)) - }, - "hour": func(t int64) float64 { - return hour(dateFromStepTime(t)) - }, - "minute": func(t int64) float64 { - return minute(dateFromStepTime(t)) - }, - "month": func(t int64) float64 { - return month(dateFromStepTime(t)) - }, - "year": func(t int64) float64 { - return year(dateFromStepTime(t)) - }, + "days_in_month": dateTimeNoArgFunc(daysInMonth), + "day_of_month": dateTimeNoArgFunc(dayOfMonth), + "day_of_week": dateTimeNoArgFunc(dayOfWeek), + "day_of_year": dateTimeNoArgFunc(dayOfYear), + "hour": dateTimeNoArgFunc(hour), + "minute": dateTimeNoArgFunc(minute), + "month": dateTimeNoArgFunc(month), + "year": dateTimeNoArgFunc(year), } func simpleFunc(f func(float64) float64) functionCall { @@ -271,6 +207,21 @@ func simpleFunc(f func(float64) float64) functionCall { } } +func dateTimeFunc(f func(time.Time) float64) functionCall { + return func(v float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return f(dateFromSampleValue(v)), true + } +} + +func dateTimeNoArgFunc(f func(time.Time) float64) noArgFunctionCall { + return func(t int64) float64 { + return f(dateFromStepTime(t)) + } +} + func dateFromSampleValue(f float64) time.Time { return time.Unix(int64(f), 0).UTC() } diff --git a/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go b/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go index 5a8027df95e..446ce5d9a03 100644 --- a/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go +++ b/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go @@ -31,18 +31,18 @@ type timeRange struct { type timeRanges []timeRange -// minOverlap returns the smallest overlap between consecutive time ranges. 
-func (trs timeRanges) minOverlap() time.Duration { +// minOverlap returns the smallest overlap between consecutive time ranges that overlap the interval [mint, maxt]. +func (trs timeRanges) minOverlap(mint, maxt int64) time.Duration { var minEngineOverlap time.Duration = math.MaxInt64 if len(trs) == 1 { return minEngineOverlap } for i := 1; i < len(trs); i++ { - overlap := trs[i-1].end.Sub(trs[i].start) - if overlap < minEngineOverlap { - minEngineOverlap = overlap + if trs[i].end.UnixMilli() < mint || trs[i].start.UnixMilli() > maxt { + continue } + minEngineOverlap = min(minEngineOverlap, trs[i-1].end.Sub(trs[i].start)) } return minEngineOverlap } @@ -53,14 +53,11 @@ func (lrs labelSetRanges) addRange(key string, tr timeRange) { lrs[key] = append(lrs[key], tr) } -// minOverlap returns the smallest overlap between all label set ranges. -func (lrs labelSetRanges) minOverlap() time.Duration { +// minOverlap returns the smallest overlap between all label set ranges that overlap the interval [mint, maxt]. +func (lrs labelSetRanges) minOverlap(mint, maxt int64) time.Duration { var minLabelsetOverlap time.Duration = math.MaxInt64 for _, lr := range lrs { - minRangeOverlap := lr.minOverlap() - if minRangeOverlap < minLabelsetOverlap { - minLabelsetOverlap = minRangeOverlap - } + minLabelsetOverlap = min(minLabelsetOverlap, lr.minOverlap(mint, maxt)) } return minLabelsetOverlap @@ -179,39 +176,8 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) }) } } - minEngineOverlap := labelRanges.minOverlap() - // Preprocess rewrite distributable averages as sum/count var warns = annotations.New() - TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) { - if !(isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(current)) { - return true - } - // If the current node is avg(), distribute the operation and - // stop the traversal. - if aggr, ok := (*current).(*Aggregation); ok { - if aggr.Op != parser.AVG { - return true - } - - sum := *(*current).(*Aggregation) - sum.Op = parser.SUM - count := *(*current).(*Aggregation) - count.Op = parser.COUNT - *current = &Binary{ - Op: parser.DIV, - LHS: &sum, - RHS: &count, - VectorMatching: &parser.VectorMatching{ - Include: aggr.Grouping, - MatchingLabels: aggr.Grouping, - On: !aggr.Without, - }, - } - return true - } - return !(isDistributive(parent, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(parent)) - }) // TODO(fpetkovski): Consider changing TraverseBottomUp to pass in a list of parents in the transform function. parents := make(map[*Node]*Node) @@ -220,21 +186,50 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) return false }) TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) { + // Handle avg() specially - it's not distributive but can be distributed as sum/count. + if isAvgAggregation(current) { + *current = m.distributeAvg(*current, engines, m.subqueryOpts(parents, current, opts), labelRanges) + return true + } + // If the current operation is not distributive, stop the traversal. if !isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) { return true } - // If the current node is an aggregation, distribute the operation and - // stop the traversal. 
+ // Handle absent functions specially + if isAbsent(current) { + *current = m.distributeAbsent(*current, engines, calculateStartOffset(current, opts.LookbackDelta), m.subqueryOpts(parents, current, opts)) + return true + } + + // If the current node is an aggregation, check if we should distribute here + // or continue traversing up. if aggr, ok := (*current).(*Aggregation); ok { + // If this aggregation preserves partition labels and there's a + // distributive aggregation ancestor, continue up to let it handle distribution. + // This enables patterns like: + // - topk(10, sum by (P, instance) (X)) + // - sum(metric_a * group by (P) (metric_b)) + // - max(sum by (P, instance) (X)) + // where P is a partition label - we can push the entire expression + // to remote engines. + // + // We need to check ancestors (not just immediate parent) because the + // aggregation might be nested inside a binary expression that is itself + // inside another aggregation: sum(A * group by (P) (B)) + if preservesPartitionLabels(*current, engineLabels) { + if hasDistributiveAncestor(parents, current, m.SkipBinaryPushdown, engineLabels, warns) { + return false + } + } localAggregation := aggr.Op if aggr.Op == parser.COUNT { localAggregation = parser.SUM } remoteAggregation := newRemoteAggregation(aggr, engines) - subQueries := m.distributeQuery(&remoteAggregation, engines, m.subqueryOpts(parents, current, opts), minEngineOverlap) + subQueries := m.distributeQuery(&remoteAggregation, engines, m.subqueryOpts(parents, current, opts), labelRanges) *current = &Aggregation{ Op: localAggregation, Expr: subQueries, @@ -244,17 +239,14 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) } return true } - if isAbsent(*current) { - *current = m.distributeAbsent(*current, engines, calculateStartOffset(current, opts.LookbackDelta), m.subqueryOpts(parents, current, opts)) - return true - } - // If the parent operation is distributive, continue the traversal. - if isDistributive(parent, m.SkipBinaryPushdown, engineLabels, warns) { + // If the parent operation is distributive or is an avg (which we handle specially), + // continue the traversal. + if isDistributive(parent, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(parent) { return false } - *current = m.distributeQuery(current, engines, m.subqueryOpts(parents, current, opts), minEngineOverlap) + *current = m.distributeQuery(current, engines, m.subqueryOpts(parents, current, opts), labelRanges) return true }) return plan, *warns @@ -310,8 +302,10 @@ func newRemoteAggregation(rootAggregation *Aggregation, engines []api.RemoteEngi // distributeQuery takes a PromQL expression in the form of *parser.Expr and a set of remote engines. // For each engine which matches the time range of the query, it creates a RemoteExecution scoped to the range of the engine. // All remote executions are wrapped in a Deduplicate logical node to make sure that results from overlapping engines are deduplicated. 
-func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api.RemoteEngine, opts *query.Options, allowedStartOffset time.Duration) Node { +func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api.RemoteEngine, opts *query.Options, labelRanges labelSetRanges) Node { startOffset := calculateStartOffset(expr, opts.LookbackDelta) + allowedStartOffset := labelRanges.minOverlap(opts.Start.UnixMilli()-startOffset.Milliseconds(), opts.End.UnixMilli()) + if allowedStartOffset < startOffset { return *expr } @@ -417,14 +411,61 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api return rootExpr } -func isAbsent(expr Node) bool { - call, ok := expr.(*FunctionCall) +func isAbsent(expr *Node) bool { + if expr == nil { + return false + } + call, ok := (*expr).(*FunctionCall) if !ok { return false } return call.Func.Name == "absent" || call.Func.Name == "absent_over_time" } +// distributeAvg distributes an avg() aggregation by rewriting it as sum()/count() +// where each side is distributed independently. This is necessary because averaging +// averages gives incorrect results - we must sum all values and count all values +// separately, then divide. +func (m DistributedExecutionOptimizer) distributeAvg(expr Node, engines []api.RemoteEngine, opts *query.Options, labelRanges labelSetRanges) Node { + aggr := expr.(*Aggregation) + + sumAggr := *aggr + sumAggr.Op = parser.SUM + sumRemote := newRemoteAggregation(&sumAggr, engines) + sumSubQueries := m.distributeQuery(&sumRemote, engines, opts, labelRanges) + distributedSum := &Aggregation{ + Op: parser.SUM, + Expr: sumSubQueries, + Param: aggr.Param, + Grouping: aggr.Grouping, + Without: aggr.Without, + } + + countAggr := *aggr + countAggr.Op = parser.COUNT + countAggr.Expr = aggr.Expr.Clone() + countRemote := newRemoteAggregation(&countAggr, engines) + countSubQueries := m.distributeQuery(&countRemote, engines, opts, labelRanges) + distributedCount := &Aggregation{ + Op: parser.SUM, + Expr: countSubQueries, + Param: aggr.Param, + Grouping: aggr.Grouping, + Without: aggr.Without, + } + + return &Binary{ + Op: parser.DIV, + LHS: distributedSum, + RHS: distributedCount, + VectorMatching: &parser.VectorMatching{ + Include: aggr.Grouping, + MatchingLabels: aggr.Grouping, + On: !aggr.Without, + }, + } +} + func getStartTimeForEngine(e api.RemoteEngine, opts *query.Options, offset time.Duration, globalMinT int64) (time.Time, bool) { if e.MinT() > opts.End.UnixMilli() { return time.Time{}, false @@ -525,6 +566,73 @@ func numSteps(start, end time.Time, step time.Duration) int64 { return (end.UnixMilli()-start.UnixMilli())/step.Milliseconds() + 1 } +// preservesPartitionLabels checks if an expression preserves all partition labels. +// An expression preserves partition labels if the output series will still have +// those labels, meaning results from different engines won't overlap and can be +// coalesced without deduplication. +// +// This enables pushing more operations to remote engines. For example: +// +// topk(10, sum by (P, instance) (X)) +// +// If P is a partition label, the sum preserves P, so topk can also be pushed +// down since each engine's top 10 won't overlap with other engines' top 10. 
+func preservesPartitionLabels(expr Node, partitionLabels map[string]struct{}) bool { + if len(partitionLabels) == 0 { + return true + } + + switch e := expr.(type) { + case *VectorSelector, *MatrixSelector, *NumberLiteral, *StringLiteral: + return true + case *Aggregation: + for lbl := range partitionLabels { + if slices.Contains(e.Grouping, lbl) == e.Without { + return false + } + } + return true + case *Binary: + if e.VectorMatching != nil { + for lbl := range partitionLabels { + inMatching := slices.Contains(e.VectorMatching.MatchingLabels, lbl) + inInclude := slices.Contains(e.VectorMatching.Include, lbl) + if !inInclude && inMatching != e.VectorMatching.On { + return false + } + } + } + return preservesPartitionLabels(e.LHS, partitionLabels) && + preservesPartitionLabels(e.RHS, partitionLabels) + case *FunctionCall: + if e.Func.Name == "label_replace" { + if _, ok := partitionLabels[UnsafeUnwrapString(e.Args[1])]; ok { + return false + } + } + for _, arg := range e.Args { + if arg.ReturnType() == parser.ValueTypeVector || arg.ReturnType() == parser.ValueTypeMatrix { + if !preservesPartitionLabels(arg, partitionLabels) { + return false + } + } + } + return true + case *Unary: + return preservesPartitionLabels(e.Expr, partitionLabels) + case *Parens: + return preservesPartitionLabels(e.Expr, partitionLabels) + case *StepInvariantExpr: + return preservesPartitionLabels(e.Expr, partitionLabels) + case *CheckDuplicateLabels: + return preservesPartitionLabels(e.Expr, partitionLabels) + case *Subquery: + return preservesPartitionLabels(e.Expr, partitionLabels) + default: + return false + } +} + func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string]struct{}, warns *annotations.Annotations) bool { if expr == nil { return false @@ -575,25 +683,106 @@ func isBinaryExpressionWithDistributableMatching(expr *Binary, engineLabels map[ if expr.VectorMatching == nil { return false } - // TODO: think about "or" but for safety we dont push it down for now. - if expr.Op == parser.LOR { + + isSetOperation := expr.Op == parser.LOR || expr.Op == parser.LUNLESS + + // For set operations (or/unless) with a constant expression on either side, + // distribution is not safe because the constant will be evaluated by each + // engine and cause duplicates. For example, `bar or on () vector(0)` would + // have vector(0) returned by every engine. + if isSetOperation && (IsConstantExpr(expr.LHS) || IsConstantExpr(expr.RHS)) { return false } - if expr.VectorMatching.On { - // on (...) - if ... contains all partition labels we can distribute - for lbl := range engineLabels { - if !slices.Contains(expr.VectorMatching.MatchingLabels, lbl) { - return false - } + // Default matching (no explicit on() or ignoring()) matches on all labels. + // For this to be safe, both sides must preserve partition labels so that the + // matching will include them. If only one side has partition labels, the matching + // behavior differs per partition. + if len(expr.VectorMatching.MatchingLabels) == 0 && !expr.VectorMatching.On { + // For or/unless with default matching, we can distribute if: + // 1. Both sides preserve partition labels (matching will include them), OR + // 2. 
Both sides have the same partition label scope (both global, or both + // filtered to the same partition values) + // + // Case 2 is important because it allows queries like: + // metric_a or metric_b (both global) + // metric_a{zone="east"} or metric_b{zone="east"} (same partition) + if isSetOperation { + lhsMatchers := getPartitionMatchers(expr.LHS, engineLabels) + rhsMatchers := getPartitionMatchers(expr.RHS, engineLabels) + return partitionMatchersEqual(lhsMatchers, rhsMatchers) } return true } - // ignoring (...) - if ... does contain any engine labels we cannot distribute + for lbl := range engineLabels { - if slices.Contains(expr.VectorMatching.MatchingLabels, lbl) { + inMatching := slices.Contains(expr.VectorMatching.MatchingLabels, lbl) + inInclude := slices.Contains(expr.VectorMatching.Include, lbl) + // If a partition label is in group_left/group_right (Include), distribution + // changes match cardinality semantics. Each partition only sees one value for + // that label, so what's many-to-many globally may become one-to-one per partition, + // producing results instead of errors (or vice versa). + return !inInclude && inMatching == expr.VectorMatching.On + } + // At this point, partition labels are in the matching set (either via on() or + // by not being in ignoring()). This means or/unless can be safely distributed + // because the matching ensures series are paired by partition. + return true +} + +// getPartitionMatchers extracts matchers for partition labels from all selectors in the expression. +// Returns a map of partition label name to a list of matchers found across all selectors. +// If a selector has no matcher for a partition label, it's considered "global" for that label. +func getPartitionMatchers(expr Node, partitionLabels map[string]struct{}) map[string][]*labels.Matcher { + result := make(map[string][]*labels.Matcher) + for lbl := range partitionLabels { + result[lbl] = nil + } + + Traverse(&expr, func(current *Node) { + vs, ok := (*current).(*VectorSelector) + if !ok { + return + } + for _, m := range vs.LabelMatchers { + if _, isPartition := partitionLabels[m.Name]; isPartition { + result[m.Name] = append(result[m.Name], m) + } + } + }) + + return result +} + +// partitionMatchersEqual checks if two sets of partition matchers are equivalent. +func partitionMatchersEqual(a, b map[string][]*labels.Matcher) bool { + for lbl := range a { + aMatchers := a[lbl] + bMatchers := b[lbl] + + // Both global (no matchers) for this label + if len(aMatchers) == 0 && len(bMatchers) == 0 { + continue + } + + // One has matchers, other doesn't - not equal + if len(aMatchers) != len(bMatchers) { return false } + + // Compare matchers - they should be identical + // Sort by name+type+value for comparison + aSet := make(map[string]struct{}) + for _, m := range aMatchers { + key := fmt.Sprintf("%s:%d:%s", m.Name, m.Type, m.Value) + aSet[key] = struct{}{} + } + for _, m := range bMatchers { + key := fmt.Sprintf("%s:%d:%s", m.Name, m.Type, m.Value) + if _, ok := aSet[key]; !ok { + return false + } + } } return true } @@ -639,6 +828,23 @@ func matchesExternalLabels(ms []*labels.Matcher, externalLabels labels.Labels) b return true } +// hasDistributiveAncestor checks if there's a distributive node somewhere up the +// parent chain from the current node that can handle distribution. +// We must have an unbroken chain of distributive nodes to the ancestor for it to +// be able to handle distribution on our behalf. 
+func hasDistributiveAncestor(parents map[*Node]*Node, current *Node, skipBinaryPushdown bool, engineLabels map[string]struct{}, warns *annotations.Annotations) bool { + for p := parents[current]; p != nil; p = parents[p] { + if !isDistributive(p, skipBinaryPushdown, engineLabels, warns) { + // We hit a non-distributive node, so we can't push through it. + // No ancestor can help us distribute. + return false + } + } + // All ancestors are distributive, so the root (or the point where we + // stop traversing) can handle distribution. + return parents[current] != nil +} + func maxTime(a, b time.Time) time.Time { if a.After(b) { return a diff --git a/vendor/github.com/thanos-io/promql-engine/logicalplan/logical_nodes.go b/vendor/github.com/thanos-io/promql-engine/logicalplan/logical_nodes.go index f47abcb66da..19be0e06316 100644 --- a/vendor/github.com/thanos-io/promql-engine/logicalplan/logical_nodes.go +++ b/vendor/github.com/thanos-io/promql-engine/logicalplan/logical_nodes.go @@ -176,8 +176,15 @@ type NumberLiteral struct { Val float64 } -func (c *NumberLiteral) Clone() Node { return &NumberLiteral{Val: c.Val} } -func (c *NumberLiteral) String() string { return fmt.Sprint(c.Val) } +func (c *NumberLiteral) Clone() Node { return &NumberLiteral{Val: c.Val} } +func (c *NumberLiteral) String() string { + // Wrap negative numbers in parentheses to preserve parsing behavior. + // Without parens, "-1.5 ^ 2" parses as "-(1.5 ^ 2)" due to operator precedence. + if c.Val < 0 { + return fmt.Sprintf("(%v)", c.Val) + } + return fmt.Sprint(c.Val) +} func (c *NumberLiteral) ReturnType() parser.ValueType { return parser.ValueTypeScalar } func (c *NumberLiteral) Type() NodeType { return NumberLiteralNode } diff --git a/vendor/github.com/thanos-io/promql-engine/query/options.go b/vendor/github.com/thanos-io/promql-engine/query/options.go index 5dbb3c09718..6bd01f6658b 100644 --- a/vendor/github.com/thanos-io/promql-engine/query/options.go +++ b/vendor/github.com/thanos-io/promql-engine/query/options.go @@ -33,6 +33,16 @@ func (o *Options) NumSteps() int { return int(totalSteps) } +// TotalSteps returns the total number of steps in the query, regardless of batching. +// This is useful for pre-allocating result slices. +func (o *Options) TotalSteps() int { + // Instant evaluation is executed as a range evaluation with one step. 
+ if o.Step.Milliseconds() == 0 { + return 1 + } + return int((o.End.UnixMilli()-o.Start.UnixMilli())/o.Step.Milliseconds() + 1) +} + func (o *Options) IsInstantQuery() bool { return o.NumSteps() == 1 } diff --git a/vendor/github.com/thanos-io/promql-engine/ringbuffer/functions.go b/vendor/github.com/thanos-io/promql-engine/ringbuffer/functions.go index 93a8175e37c..09299a452c7 100644 --- a/vendor/github.com/thanos-io/promql-engine/ringbuffer/functions.go +++ b/vendor/github.com/thanos-io/promql-engine/ringbuffer/functions.go @@ -12,7 +12,6 @@ import ( "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/warnings" - "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -116,11 +115,8 @@ func instantValue(ctx context.Context, samples []Sample, isRate bool) (float64, if !isRate || !ss[1].V.H.DetectReset(ss[0].V.H) { _, err := resultSample.V.H.Sub(ss[0].V.H) - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - warnings.AddToContext(annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}), ctx) - return 0, nil, false - } else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - warnings.AddToContext(annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{}), ctx) + if err != nil { + warnings.AddToContext(warnings.ConvertHistogramError(err), ctx) return 0, nil, false } } @@ -146,18 +142,6 @@ func instantValue(ctx context.Context, samples []Sample, isRate bool) (float64, return resultSample.V.F, resultSample.V.H, true } -func handleHistogramErr(ctx context.Context, err error) error { - if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - warnings.AddToContext(annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil - } else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - warnings.AddToContext(annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil - } - - return err -} - var rangeVectorFuncs = map[string]FunctionCall{ "sum_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { @@ -348,10 +332,7 @@ var rangeVectorFuncs = map[string]FunctionCall{ if f.MetricAppearedTs == math.MinInt64 { panic("BUG: we got some Samples but metric still hasn't appeared") } - v, h, err := extendedRate(f.ctx, f.Samples, true, true, f.StepTime, f.SelectRange, f.Offset, f.MetricAppearedTs) - if err != nil { - return 0, nil, false, err - } + v, h := extendedRate(f.ctx, f.Samples, true, true, f.StepTime, f.SelectRange, f.Offset, f.MetricAppearedTs) return v, h, true, nil }, "xdelta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { @@ -361,10 +342,7 @@ var rangeVectorFuncs = map[string]FunctionCall{ if f.MetricAppearedTs == math.MinInt64 { panic("BUG: we got some Samples but metric still hasn't appeared") } - v, h, err := extendedRate(f.ctx, f.Samples, false, false, f.StepTime, f.SelectRange, f.Offset, f.MetricAppearedTs) - if err != nil { - return 0, nil, false, err - } + v, h := extendedRate(f.ctx, f.Samples, false, false, f.StepTime, f.SelectRange, f.Offset, f.MetricAppearedTs) return v, h, true, nil }, "xincrease": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { @@ -374,10 +352,7 @@ var rangeVectorFuncs = map[string]FunctionCall{ 
if f.MetricAppearedTs == math.MinInt64 { panic("BUG: we got some Samples but metric still hasn't appeared") } - v, h, err := extendedRate(f.ctx, f.Samples, true, false, f.StepTime, f.SelectRange, f.Offset, f.MetricAppearedTs) - if err != nil { - return 0, nil, false, err - } + v, h := extendedRate(f.ctx, f.Samples, true, false, f.StepTime, f.SelectRange, f.Offset, f.MetricAppearedTs) return v, h, true, nil }, "predict_linear": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { @@ -433,10 +408,7 @@ func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isC } if samples[0].V.H != nil { - resultHistogram, err = histogramRate(ctx, samples, isCounter) - if err != nil { - return 0, nil, false, err - } + resultHistogram = histogramRate(ctx, samples, isCounter) } else { resultValue = samples[len(samples)-1].V.F - samples[0].V.F if isCounter { @@ -525,7 +497,7 @@ func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isC // It calculates the rate (allowing for counter resets if isCounter is true), // taking into account the last sample before the range start, and returns // the result as either per-second (if isRate is true) or overall. -func extendedRate(ctx context.Context, samples []Sample, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64, metricAppearedTs int64) (float64, *histogram.FloatHistogram, error) { +func extendedRate(ctx context.Context, samples []Sample, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64, metricAppearedTs int64) (float64, *histogram.FloatHistogram) { var ( rangeStart = stepTime - (selectRange + offset) rangeEnd = stepTime - offset @@ -534,14 +506,9 @@ func extendedRate(ctx context.Context, samples []Sample, isCounter, isRate bool, ) if samples[0].V.H != nil { - var err error // TODO - support extended rate for histograms - resultHistogram, err = histogramRate(ctx, samples, isCounter) - if err != nil { - return 0, nil, err - } - - return resultValue, resultHistogram, nil + resultHistogram = histogramRate(ctx, samples, isCounter) + return resultValue, resultHistogram } sameVals := true @@ -558,7 +525,7 @@ func extendedRate(ctx context.Context, samples []Sample, isCounter, isRate bool, if isCounter && !isRate && sameVals { // Make sure we are not at the end of the range. if stepTime-offset <= until { - return samples[0].V.F, nil, nil + return samples[0].V.F, nil } } @@ -571,7 +538,7 @@ func extendedRate(ctx context.Context, samples []Sample, isCounter, isRate bool, // If the point before the range is too far from rangeStart, drop it. if float64(rangeStart-samples[0].T) > averageDurationBetweenSamples { if len(samples) < 3 { - return resultValue, nil, nil + return resultValue, nil } firstPoint = 1 sampledInterval = float64(samples[len(samples)-1].T - samples[1].T) @@ -611,16 +578,16 @@ func extendedRate(ctx context.Context, samples []Sample, isCounter, isRate bool, resultValue = resultValue / float64(selectRange/1000) } - return resultValue, nil, nil + return resultValue, nil } // histogramRate is a helper function for extrapolatedRate. It requires // points[0] to be a histogram. It returns nil if any other Point in points is // not a histogram. -func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histogram.FloatHistogram, error) { +func histogramRate(ctx context.Context, points []Sample, isCounter bool) *histogram.FloatHistogram { // Calculating a rate on a single sample is not defined. 
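For context on the extendedRate change above: the x-functions differ from plain rate/increase in that they may use the last sample at or before the range start as the baseline, provided that sample is no further from the range start than the average spacing between samples. The toy, float-only sketch below shows only that baseline rule under those assumptions; names are illustrative, and counter resets, histograms, and per-second scaling are omitted.

package main

import "fmt"

type sample struct {
	T int64   // timestamp in milliseconds
	F float64 // counter value (no resets in this toy example)
}

// extendedIncrease assumes samples is sorted by time, has at least two points,
// and that samples[0] may lie before rangeStart (the look-back sample).
func extendedIncrease(samples []sample, rangeStart int64) float64 {
	first, last := 0, len(samples)-1
	sampledInterval := float64(samples[last].T - samples[first].T)
	avgSpacing := sampledInterval / float64(last-first)
	// Drop the look-back sample if it is too far from the range start to be a
	// trustworthy baseline.
	if float64(rangeStart-samples[first].T) > avgSpacing && len(samples) >= 3 {
		first = 1
	}
	return samples[last].F - samples[first].F
}

func main() {
	// Range starts at 60s; the sample at 55s is close enough to serve as the baseline.
	s := []sample{{T: 55_000, F: 10}, {T: 75_000, F: 14}, {T: 105_000, F: 20}}
	fmt.Println(extendedIncrease(s, 60_000)) // 10
}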
if len(points) < 2 { - return nil, nil + return nil } var ( prev = points[0].V.H @@ -630,7 +597,7 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo if last == nil { warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, ctx) - return nil, nil // Range contains a mix of histograms and floats. + return nil // Range contains a mix of histograms and floats. } // We check for gauge type histograms in the loop below, but the loop @@ -655,14 +622,14 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo if last.UsesCustomBuckets() != usingCustomBuckets { warnings.AddToContext(annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil, nil + return nil } minSchema := min(last.Schema, prev.Schema) if last.UsesCustomBuckets() != usingCustomBuckets { warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx) - return nil, nil + return nil } // https://github.com/prometheus/prometheus/blob/ccea61c7bf1e6bce2196ba8189a209945a204c5b/promql/functions.go#L183 @@ -674,7 +641,7 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo curr := currPoint.V.H if curr == nil { warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, ctx) - return nil, nil // Range contains a mix of histograms and floats. + return nil // Range contains a mix of histograms and floats. } if !isCounter { continue @@ -687,16 +654,14 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo } if curr.UsesCustomBuckets() != usingCustomBuckets { warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx) - return nil, nil + return nil } } h := last.CopyToSchema(minSchema) if _, err := h.Sub(prev); err != nil { - if err := handleHistogramErr(ctx, err); err != nil { - return nil, err - } - return nil, nil + warnings.AddToContext(warnings.ConvertHistogramError(err), ctx) + return nil } if isCounter { @@ -705,10 +670,8 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo curr := currPoint.V.H if curr.DetectReset(prev) { if _, err := h.Add(prev); err != nil { - if err := handleHistogramErr(ctx, err); err != nil { - return nil, err - } - return nil, nil + warnings.AddToContext(warnings.ConvertHistogramError(err), ctx) + return nil } } prev = curr @@ -718,7 +681,7 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo } h.CounterResetHint = histogram.GaugeType - return h.Compact(0), nil + return h.Compact(0) } func madOverTime(ctx context.Context, points []Sample) (float64, bool) { @@ -819,10 +782,12 @@ func avgOverTime(ctx context.Context, points []Sample) (float64, *histogram.Floa right := mean.Copy().Div(count) toAdd, err := left.Sub(right) if err != nil { - return 0, nil, false, handleHistogramErr(ctx, err) + warnings.AddToContext(warnings.ConvertHistogramError(err), ctx) + return 0, nil, false, nil } if _, err = mean.Add(toAdd); err != nil { - return 0, nil, false, handleHistogramErr(ctx, err) + warnings.AddToContext(warnings.ConvertHistogramError(err), ctx) + return 0, nil, false, nil } } return 0, mean, true, nil @@ -874,7 +839,8 @@ func sumOverTime(ctx context.Context, points []Sample) (float64, *histogram.Floa return 0, nil, false, nil } if _, err := res.Add(v.V.H); err != nil { - return 0, nil, false, handleHistogramErr(ctx, err) + warnings.AddToContext(warnings.ConvertHistogramError(err), ctx) + return 0, nil, false, nil } } return 0, res, true, nil diff --git 
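The avg_over_time hunk above keeps a running mean by adding sample/count minus mean/count on each new point, rather than summing everything and dividing at the end. The same incremental-mean recurrence is easiest to see with plain floats; this is illustrative only, the engine applies it to FloatHistograms via Div, Sub and Add.

package main

import "fmt"

func main() {
	points := []float64{10, 20, 60}
	var mean float64
	for i, x := range points {
		count := float64(i + 1)
		// mean_n = mean_{n-1} + x/n - mean_{n-1}/n equals the arithmetic mean of the
		// first n points, but avoids accumulating a potentially huge intermediate sum.
		mean += x/count - mean/count
	}
	fmt.Println(mean) // 30
}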
a/vendor/github.com/thanos-io/promql-engine/ringbuffer/overtime.go b/vendor/github.com/thanos-io/promql-engine/ringbuffer/overtime.go index ec2bb65f628..10a5f9f4ded 100644 --- a/vendor/github.com/thanos-io/promql-engine/ringbuffer/overtime.go +++ b/vendor/github.com/thanos-io/promql-engine/ringbuffer/overtime.go @@ -16,6 +16,26 @@ import ( "github.com/prometheus/prometheus/util/annotations" ) +// If we use $__interval as steps and $__rate_interval for the sliding window +// we usually have an overlap of 4 steps here. This should ensure we use the +// optimized streaming approach normally, but wont regress if a user wants a very +// high overlap. +const maxStreamingStepOverlap = 5 + +// overlapSteps calculates the number of evaluation steps that a range window overlaps. +// This is the number of steps where a single sample contributes to the result. +func overlapSteps(opts query.Options, selectRange int64) int64 { + step := max(1, opts.Step.Milliseconds()) + return min( + (selectRange-1)/step+1, + querySteps(opts), + ) +} + +func UseStreamingRingBuffers(opts query.Options, selectRange int64) bool { + return overlapSteps(opts, selectRange) <= maxStreamingStepOverlap +} + // OverTimeBuffer is a Buffer which can calculate [agg]_over_time for a series in a // streaming manner, calculating the value incrementally for each step where the sample is used. type OverTimeBuffer struct { @@ -23,7 +43,6 @@ type OverTimeBuffer struct { stepRanges []stepRange // stepStates contains the aggregation state for the corresponding stepRange stepStates []stepState - // firstTimestamps contains the timestamp of the first sample for each evaluation step. firstTimestamps []int64 @@ -41,10 +60,7 @@ type stepState struct { func newOverTimeBuffer(opts query.Options, selectRange, offset int64, accMaker func() compute.Accumulator) *OverTimeBuffer { var ( step = max(1, opts.Step.Milliseconds()) - numSteps = min( - (selectRange-1)/step+1, - querySteps(opts), - ) + numSteps = overlapSteps(opts, selectRange) current = opts.Start.UnixMilli() firstTimestamps = make([]int64, 0, numSteps) @@ -72,26 +88,42 @@ func newOverTimeBuffer(opts query.Options, selectRange, offset int64, accMaker f } } -// NewCountOverTimeBuffer creates a new OverTimeBuffer for the count_over_time function. func NewCountOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewCountAcc() }) } -// NewMaxOverTimeBuffer creates a new OverTimeBuffer for the max_over_time function. func NewMaxOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewMaxAcc() }) } -// NewMinOverTime creates a new OverTimeBuffer for the min_over_time function. func NewMinOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewMinAcc() }) } -// NewSumOverTime creates a new OverTimeBuffer for the sum_over_time function. 
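The new UseStreamingRingBuffers gate above boils down to simple arithmetic: a range selector overlaps roughly ceil(selectRange/step) evaluation steps, and the streaming _over_time buffers are used only while that overlap stays at or below maxStreamingStepOverlap (5). Below is a standalone sketch with a simplified total-step bound; the real code caps the overlap at querySteps(opts).

package main

import "fmt"

const maxStreamingStepOverlap = 5

// overlapSteps mirrors the formula above: ceil(selectRange/step), capped at the
// total number of query steps.
func overlapSteps(stepMs, selectRangeMs, totalQuerySteps int64) int64 {
	step := max(int64(1), stepMs)
	return min((selectRangeMs-1)/step+1, totalQuerySteps)
}

func main() {
	// Grafana-style defaults: step = $__interval = 1m, window = $__rate_interval = 4m.
	fmt.Println(overlapSteps(60_000, 240_000, 1_000) <= maxStreamingStepOverlap) // true  -> streaming buffer
	// A 1h window over a 1m step overlaps 60 steps and falls back to the generic path.
	fmt.Println(overlapSteps(60_000, 3_600_000, 1_000) <= maxStreamingStepOverlap) // false
}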
func NewSumOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewSumAcc() }) } +func NewAvgOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { + return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewAvgAcc() }) +} + +func NewStdDevOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { + return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewStdDevAcc() }) +} + +func NewStdVarOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { + return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewStdVarAcc() }) +} + +func NewPresentOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { + return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewGroupAcc() }) +} + +func NewLastOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { + return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewLastAcc() }) +} + func (r *OverTimeBuffer) SampleCount() int { return r.stepRanges[0].sampleCount } @@ -111,10 +143,11 @@ func (r *OverTimeBuffer) Push(t int64, v Value) { r.stepRanges[i].sampleCount++ } - // Aggregate the sample to the current step + // Aggregate the sample to the current step. + // Accumulators track error state internally and become no-ops after an error. + // Float-only accumulators skip histograms and track via HasIgnoredHistograms(). if err := r.stepStates[i].acc.Add(v.F, v.H); err != nil { r.stepStates[i].warn = err - continue } if fts := r.firstTimestamps[i]; t >= fts { @@ -165,11 +198,17 @@ func (r *OverTimeBuffer) Eval(ctx context.Context, _, _ float64, _ int64) (float return 0, nil, false, nil } - f, h := r.stepStates[0].acc.Value() + acc := r.stepStates[0].acc + f, h := acc.Value() - if r.stepStates[0].acc.ValueType() == compute.MixedTypeValue { + if acc.ValueType() == compute.MixedTypeValue { warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, ctx) return 0, nil, false, nil } - return f, h, r.stepStates[0].acc.ValueType() == compute.SingleTypeValue, nil + + // Float-only accumulators track skipped histograms; emit info-level warning + if acc.HasIgnoredHistograms() && acc.ValueType() == compute.SingleTypeValue { + warnings.AddToContext(annotations.HistogramIgnoredInMixedRangeInfo, ctx) + } + return f, h, acc.ValueType() == compute.SingleTypeValue, nil } diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/matrix_selector.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/matrix_selector.go index 3d16b79d245..aecf7921ce0 100644 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/matrix_selector.go +++ b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/matrix_selector.go @@ -272,19 +272,33 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { } func (o *matrixSelector) newBuffer(ctx context.Context) ringbuffer.Buffer { - switch o.functionName { - case "rate": - return ringbuffer.NewRateBuffer(ctx, *o.opts, true, true, o.selectRange, o.offset) - case "increase": - return ringbuffer.NewRateBuffer(ctx, *o.opts, true, false, o.selectRange, o.offset) - case "delta": - return ringbuffer.NewRateBuffer(ctx, *o.opts, false, false, o.selectRange, 
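The Push/Eval changes above extend the streaming model to more _over_time functions: each incoming sample is folded into the accumulator of every evaluation step whose window still covers it, so a sample is processed once rather than re-aggregated per step, and accumulators now carry their own error and ignored-histogram state. Below is a toy, float-only version of the per-step fan-out with a plain sum in place of an Accumulator.

package main

import "fmt"

func main() {
	const selectRange = int64(180_000)              // 3m window at a 1m step -> each sample overlaps 3 steps
	stepTimes := []int64{180_000, 240_000, 300_000} // evaluation timestamps, 1m apart
	sums := make([]float64, len(stepTimes))

	push := func(t int64, v float64) {
		for i, st := range stepTimes {
			// A sample at time t contributes to step st if it falls inside (st-selectRange, st].
			if t > st-selectRange && t <= st {
				sums[i] += v
			}
		}
	}

	push(130_000, 2) // falls inside all three step windows
	push(200_000, 5) // falls inside the 240s and 300s windows only
	fmt.Println(sums) // [2 7 7]
}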
o.offset) - case "count_over_time": - return ringbuffer.NewCountOverTimeBuffer(*o.opts, o.selectRange, o.offset) - case "max_over_time": - return ringbuffer.NewMaxOverTimeBuffer(*o.opts, o.selectRange, o.offset) - case "min_over_time": - return ringbuffer.NewMinOverTimeBuffer(*o.opts, o.selectRange, o.offset) + if ringbuffer.UseStreamingRingBuffers(*o.opts, o.selectRange) { + switch o.functionName { + case "rate": + return ringbuffer.NewRateBuffer(ctx, *o.opts, true, true, o.selectRange, o.offset) + case "increase": + return ringbuffer.NewRateBuffer(ctx, *o.opts, true, false, o.selectRange, o.offset) + case "delta": + return ringbuffer.NewRateBuffer(ctx, *o.opts, false, false, o.selectRange, o.offset) + case "count_over_time": + return ringbuffer.NewCountOverTimeBuffer(*o.opts, o.selectRange, o.offset) + case "max_over_time": + return ringbuffer.NewMaxOverTimeBuffer(*o.opts, o.selectRange, o.offset) + case "min_over_time": + return ringbuffer.NewMinOverTimeBuffer(*o.opts, o.selectRange, o.offset) + case "sum_over_time": + return ringbuffer.NewSumOverTimeBuffer(*o.opts, o.selectRange, o.offset) + case "avg_over_time": + return ringbuffer.NewAvgOverTimeBuffer(*o.opts, o.selectRange, o.offset) + case "stddev_over_time": + return ringbuffer.NewStdDevOverTimeBuffer(*o.opts, o.selectRange, o.offset) + case "stdvar_over_time": + return ringbuffer.NewStdVarOverTimeBuffer(*o.opts, o.selectRange, o.offset) + case "present_over_time": + return ringbuffer.NewPresentOverTimeBuffer(*o.opts, o.selectRange, o.offset) + case "last_over_time": + return ringbuffer.NewLastOverTimeBuffer(*o.opts, o.selectRange, o.offset) + } } if o.isExtFunction { diff --git a/vendor/github.com/thanos-io/promql-engine/warnings/context.go b/vendor/github.com/thanos-io/promql-engine/warnings/context.go index 7f2572c03ab..8409f661353 100644 --- a/vendor/github.com/thanos-io/promql-engine/warnings/context.go +++ b/vendor/github.com/thanos-io/promql-engine/warnings/context.go @@ -5,12 +5,22 @@ package warnings import ( "context" + "fmt" "maps" "sync" + "github.com/efficientgo/core/errors" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/util/annotations" ) +// MixedFloatsHistogramsAggWarning is used when an aggregation encounters both floats and histograms. +// We define this here because Prometheus's NewMixedFloatsHistogramsAggWarning requires a posrange +// which we don't have at the accumulator level. +// +//lint:ignore faillint We need fmt.Errorf to match Prometheus error format exactly. +var MixedFloatsHistogramsAggWarning = fmt.Errorf("%w aggregation", annotations.MixedFloatsHistogramsWarning) + type warningKey string const key warningKey = "promql-warnings" @@ -67,3 +77,18 @@ func FromContext(ctx context.Context) annotations.Annotations { return maps.Clone(warns) } + +// ConvertHistogramError converts histogram operation errors to appropriate annotation warnings. +// Returns nil if the error is not a histogram error. 
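One small note on MixedFloatsHistogramsAggWarning above: defining it with fmt.Errorf("%w aggregation", ...) appends an " aggregation" suffix to the message while keeping errors.Is matching against the base annotation intact. A minimal standalone illustration with a stand-in base warning follows; the real value comes from prometheus/util/annotations and its text will differ.

package main

import (
	"errors"
	"fmt"
)

// baseWarning stands in for annotations.MixedFloatsHistogramsWarning; the text is
// illustrative, not the real annotation message.
var baseWarning = errors.New("encountered a mix of histograms and floats (stand-in)")

// aggWarning follows the pattern from the diff: wrap with %w and append a suffix.
var aggWarning = fmt.Errorf("%w aggregation", baseWarning)

func main() {
	fmt.Println(aggWarning)                         // base text plus " aggregation"
	fmt.Println(errors.Is(aggWarning, baseWarning)) // true: callers can still match the base warning
}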
+func ConvertHistogramError(err error) error { + if err == nil { + return nil + } + if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { + return annotations.MixedExponentialCustomHistogramsWarning + } + if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { + return annotations.IncompatibleCustomBucketsHistogramsWarning + } + return err +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4ce4a70ba21..68a3705f8bd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1208,7 +1208,7 @@ github.com/thanos-io/objstore/providers/gcs github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing/opentracing -# github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6 +# github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5 ## explicit; go 1.24.0 github.com/thanos-io/promql-engine/api github.com/thanos-io/promql-engine/compute
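To round off the warnings change: ConvertHistogramError centralizes the mapping that several call sites previously repeated inline, turning known histogram-incompatibility errors into annotation warnings (which callers record on the query context via AddToContext) while passing every other error through as a hard failure. The standalone sketch below shows that convert-or-passthrough shape with stand-in sentinels in place of the real prometheus histogram errors and annotations.

package main

import (
	"errors"
	"fmt"
)

var (
	errIncompatibleSchema = errors.New("histograms with incompatible schemas (stand-in)")
	errIncompatibleBounds = errors.New("histograms with incompatible bounds (stand-in)")

	warnMixedExpCustom          = errors.New("warning: mixed exponential and custom histograms (stand-in)")
	warnIncompatibleCustomBkts  = errors.New("warning: incompatible custom bucket histograms (stand-in)")
)

// convertHistogramError maps known histogram errors to warning values and passes
// anything else through unchanged, mirroring the shape of ConvertHistogramError above.
func convertHistogramError(err error) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, errIncompatibleSchema):
		return warnMixedExpCustom
	case errors.Is(err, errIncompatibleBounds):
		return warnIncompatibleCustomBkts
	default:
		return err // not a histogram error: the caller treats it as a hard failure
	}
}

func main() {
	fmt.Println(convertHistogramError(errIncompatibleSchema)) // mapped to a warning
	fmt.Println(convertHistogramError(errors.New("boom")))    // passed through unchanged
}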