diff --git a/pkg/aggregator/context_sketch_test.go b/pkg/aggregator/context_sketch_test.go index d536791232ac..ebbbcec466c9 100644 --- a/pkg/aggregator/context_sketch_test.go +++ b/pkg/aggregator/context_sketch_test.go @@ -18,9 +18,9 @@ func TestContextSketchSampling(t *testing.T) { resultSeries := ctxSketch.flush(12345.0) expectedSketch := percentile.NewQSketch() - expectedSketch.Add(1) - expectedSketch.Add(5) - expectedSketch.Compress() + expectedSketch = expectedSketch.Add(1) + expectedSketch = expectedSketch.Add(5) + expectedSketch = expectedSketch.Compress() expectedSeries := &percentile.SketchSeries{ ContextKey: contextKey, Sketches: []percentile.Sketch{{Timestamp: int64(12345), Sketch: expectedSketch}}} @@ -57,15 +57,15 @@ func TestContextSketchSamplingMultiContexts(t *testing.T) { sort.Sort(orderedSketchSeries) expectedSketch1 := percentile.NewQSketch() - expectedSketch1.Add(1) - expectedSketch1.Add(3) - expectedSketch1.Compress() + expectedSketch1 = expectedSketch1.Add(1) + expectedSketch1 = expectedSketch1.Add(3) + expectedSketch1 = expectedSketch1.Compress() expectedSeries1 := &percentile.SketchSeries{ ContextKey: contextKey1, Sketches: []percentile.Sketch{{Timestamp: int64(12345), Sketch: expectedSketch1}}} expectedSketch2 := percentile.NewQSketch() - expectedSketch2.Add(1) - expectedSketch2.Compress() + expectedSketch2 = expectedSketch2.Add(1) + expectedSketch2 = expectedSketch2.Compress() expectedSeries2 := &percentile.SketchSeries{ ContextKey: contextKey2, Sketches: []percentile.Sketch{{Timestamp: int64(12345), Sketch: expectedSketch2}}} diff --git a/pkg/aggregator/dist_sampler_test.go b/pkg/aggregator/dist_sampler_test.go index 126bcfd6cd3a..8cda66bae0da 100644 --- a/pkg/aggregator/dist_sampler_test.go +++ b/pkg/aggregator/dist_sampler_test.go @@ -95,9 +95,9 @@ func TestDistSamplerBucketSampling(t *testing.T) { sketchSeries := distSampler.flush(10020.0) expectedSketch := percentile.NewQSketch() - expectedSketch.Add(1) - expectedSketch.Add(2) - expectedSketch.Compress() + expectedSketch = expectedSketch.Add(1) + expectedSketch = expectedSketch.Add(2) + expectedSketch = expectedSketch.Compress() expectedSeries := &percentile.SketchSeries{ Name: "test.metric.name", Tags: []string{"a", "b"}, @@ -140,8 +140,8 @@ func TestDistSamplerContextSampling(t *testing.T) { sketchSeries := orderedSketchSeries.sketchSeries expectedSketch := percentile.NewQSketch() - expectedSketch.Add(1) - expectedSketch.Compress() + expectedSketch = expectedSketch.Add(1) + expectedSketch = expectedSketch.Compress() expectedSeries1 := &percentile.SketchSeries{ Name: "test.metric.name1", Tags: []string{"a", "b"}, diff --git a/pkg/aggregator/distribution.go b/pkg/aggregator/distribution.go index bcae2612e621..d73a502ea5df 100644 --- a/pkg/aggregator/distribution.go +++ b/pkg/aggregator/distribution.go @@ -18,7 +18,7 @@ func NewDistribution() *Distribution { func (d *Distribution) addSample(sample *MetricSample, timestamp float64) { // Insert sample value into the sketch - d.sketch.Add(sample.Value) + d.sketch = d.sketch.Add(sample.Value) d.count++ } @@ -27,7 +27,7 @@ func (d *Distribution) flush(timestamp float64) (*percentile.SketchSeries, error return &percentile.SketchSeries{}, percentile.NoSketchError{} } // compress the sketch before flushing - d.sketch.Compress() + d.sketch = d.sketch.Compress() sketch := &percentile.SketchSeries{ Sketches: []percentile.Sketch{{Timestamp: int64(timestamp), Sketch: d.sketch}}, diff --git a/pkg/aggregator/distribution_test.go b/pkg/aggregator/distribution_test.go index a411271e99b6..d2d03ab15ff7 100644 --- a/pkg/aggregator/distribution_test.go +++ b/pkg/aggregator/distribution_test.go @@ -26,10 +26,10 @@ func TestDistributionSampling(t *testing.T) { assert.Nil(t, err) expectedSketch := percentile.NewQSketch() - expectedSketch.Add(1) - expectedSketch.Add(10) - expectedSketch.Add(5) - expectedSketch.Compress() + expectedSketch = expectedSketch.Add(1) + expectedSketch = expectedSketch.Add(10) + expectedSketch = expectedSketch.Add(5) + expectedSketch = expectedSketch.Compress() expectedSeries := &percentile.SketchSeries{ Sketches: []percentile.Sketch{{Timestamp: int64(15), Sketch: expectedSketch}}} diff --git a/pkg/aggregator/percentile/gk_array.go b/pkg/aggregator/percentile/gk_array.go index 5659f4409bb7..477bef4d4ae6 100644 --- a/pkg/aggregator/percentile/gk_array.go +++ b/pkg/aggregator/percentile/gk_array.go @@ -89,7 +89,7 @@ func NewGKArray() GKArray { } // Add a new value to the summary. -func (s *GKArray) Add(v float64) { +func (s GKArray) Add(v float64) GKArray { s.Count++ s.Sum += v s.Avg += (v - s.Avg) / float64(s.Count) @@ -101,12 +101,14 @@ func (s *GKArray) Add(v float64) { s.Max = v } if s.Count%int(1/EPSILON) == 0 { - s.compress(nil) + return s.compress(nil) } + + return s } // Quantile returns an epsilon estimate of the element at quantile q. -func (s *GKArray) Quantile(q float64) float64 { +func (s GKArray) Quantile(q float64) float64 { if q < 0 || q > 1 { panic("Quantile out of bounds") } @@ -126,7 +128,7 @@ func (s *GKArray) Quantile(q float64) float64 { } if len(s.incoming) > 0 { - s.compress(nil) + s = s.compress(nil) } rank := int(q * float64(s.Count-1)) @@ -149,7 +151,7 @@ func (s *GKArray) Quantile(q float64) float64 { // interpolatedQuantile returns an estimate of the element at quantile q, // but interpolates between the lower and higher elements when Count is // less than 1/EPSILON -func (s *GKArray) interpolatedQuantile(q float64) float64 { +func (s GKArray) interpolatedQuantile(q float64) float64 { rank := q * float64(s.Count-1) indexBelow := int(rank) indexAbove := indexBelow + 1 @@ -160,16 +162,16 @@ func (s *GKArray) interpolatedQuantile(q float64) float64 { weightBelow := 1.0 - weightAbove if len(s.incoming) > 0 { - s.compress(nil) + s = s.compress(nil) } // When Count is less than 1/EPSILON, all the entries will have G = 1, Delta = 0. return weightBelow*s.Entries[indexBelow].V + weightAbove*s.Entries[indexAbove].V } // Merge another GKArray into this in-place. -func (s *GKArray) Merge(o GKArray) { +func (s GKArray) Merge(o GKArray) GKArray { if o.Count == 0 { - return + return s } if s.Count == 0 { s.Entries = o.Entries @@ -179,9 +181,9 @@ func (s *GKArray) Merge(o GKArray) { s.Max = o.Max s.Sum = o.Sum s.Avg = o.Avg - return + return s } - o.compress(nil) + o = o.compress(nil) spread := int(EPSILON * float64(o.Count-1)) /* @@ -224,18 +226,18 @@ func (s *GKArray) Merge(o GKArray) { if o.Max > s.Max { s.Max = o.Max } - s.compress(incomingEntries) + return s.compress(incomingEntries) } // Compress merges the incoming buffer into entries and compresses it. -func (s *GKArray) Compress() { +func (s GKArray) Compress() GKArray { if len(s.incoming) == 0 { - return + return s } - s.compress(nil) + return s.compress(nil) } -func (s *GKArray) compress(incomingEntries Entries) { +func (s GKArray) compress(incomingEntries Entries) GKArray { // TODO[Charles]: use s.incoming and incomingEntries directly instead of merging them prior to compressing for _, v := range s.incoming { @@ -290,7 +292,7 @@ func (s *GKArray) compress(incomingEntries Entries) { j++ } } - s.Entries = merged s.incoming = make([]float64, 0, int(1/EPSILON)) + return s } diff --git a/pkg/aggregator/percentile/gk_array_test.go b/pkg/aggregator/percentile/gk_array_test.go index ade9ae9ac4f7..a5f6cd77f889 100644 --- a/pkg/aggregator/percentile/gk_array_test.go +++ b/pkg/aggregator/percentile/gk_array_test.go @@ -89,7 +89,7 @@ func EvaluateSketch(t *testing.T, n int, gen Generator) { d := NewDataset() for i := 0; i < n; i++ { value := gen.Generate() - s.Add(value) + s = s.Add(value) d.Add(value) } eps := float64(1.0e-6) @@ -116,7 +116,7 @@ func TestConstant(t *testing.T) { d := NewDataset() for i := 0; i < n; i++ { value := constantGenerator.Generate() - s.Add(value) + s = s.Add(value) d.Add(value) } for _, q := range testQuantiles { @@ -162,25 +162,25 @@ func TestMerge(t *testing.T) { generator1 := NewNormal(35, 1) for i := 0; i < n; i += 3 { value := generator1.Generate() - s1.Add(value) + s1 = s1.Add(value) d.Add(value) } s2 := NewGKArray() generator2 := NewNormal(50, 2) for i := 1; i < n; i += 3 { value := generator2.Generate() - s2.Add(value) + s2 = s2.Add(value) d.Add(value) } - s1.Merge(s2) + s1 = s1.Merge(s2) s3 := NewGKArray() generator3 := NewNormal(40, 0.5) for i := 2; i < n; i += 3 { value := generator3.Generate() - s3.Add(value) + s3 = s3.Add(value) d.Add(value) } - s1.Merge(s3) + s1 = s1.Merge(s3) eps := float64(1e-6) for _, q := range testQuantiles { @@ -199,7 +199,7 @@ func TestInterpolatedQuantile(t *testing.T) { if n < int(1/EPSILON) { s := NewGKArray() for i := 0; i < n; i++ { - s.Add(float64(i)) + s = s.Add(float64(i)) } for _, q := range testQuantiles { expected := q * (float64(n) - 1) diff --git a/pkg/aggregator/percentile/sketch_series.go b/pkg/aggregator/percentile/sketch_series.go index 140f19ee36dc..f0e077bc5845 100644 --- a/pkg/aggregator/percentile/sketch_series.go +++ b/pkg/aggregator/percentile/sketch_series.go @@ -36,6 +36,16 @@ func NewQSketch() QSketch { return QSketch{NewGKArray()} } +// Add a value to the qsketch +func (q QSketch) Add(v float64) QSketch { + return QSketch{GKArray: q.GKArray.Add(v)} +} + +// Compress the qsketch +func (q QSketch) Compress() QSketch { + return QSketch{GKArray: q.GKArray.Compress()} +} + // NoSketchError is the error returned when not enough samples have been //submitted to generate a sketch type NoSketchError struct{} @@ -79,7 +89,6 @@ func unmarshalSketches(summarySketches []agentpayload.SketchPayload_Summary_Sket }) } return sketches - } // MarshalSketchSeries serializes sketch series using protocol buffers