Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/aggregator/context_sketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func TestContextSketchSampling(t *testing.T) {
resultSeries := ctxSketch.flush(12345.0)

expectedSketch := percentile.NewQSketch()
expectedSketch.Add(1)
expectedSketch.Add(5)
expectedSketch.Compress()
expectedSketch = expectedSketch.Add(1)
expectedSketch = expectedSketch.Add(5)
expectedSketch = expectedSketch.Compress()
expectedSeries := &percentile.SketchSeries{
ContextKey: contextKey,
Sketches: []percentile.Sketch{{Timestamp: int64(12345), Sketch: expectedSketch}}}
Expand Down Expand Up @@ -57,15 +57,15 @@ func TestContextSketchSamplingMultiContexts(t *testing.T) {
sort.Sort(orderedSketchSeries)

expectedSketch1 := percentile.NewQSketch()
expectedSketch1.Add(1)
expectedSketch1.Add(3)
expectedSketch1.Compress()
expectedSketch1 = expectedSketch1.Add(1)
expectedSketch1 = expectedSketch1.Add(3)
expectedSketch1 = expectedSketch1.Compress()
expectedSeries1 := &percentile.SketchSeries{
ContextKey: contextKey1,
Sketches: []percentile.Sketch{{Timestamp: int64(12345), Sketch: expectedSketch1}}}
expectedSketch2 := percentile.NewQSketch()
expectedSketch2.Add(1)
expectedSketch2.Compress()
expectedSketch2 = expectedSketch2.Add(1)
expectedSketch2 = expectedSketch2.Compress()
expectedSeries2 := &percentile.SketchSeries{
ContextKey: contextKey2,
Sketches: []percentile.Sketch{{Timestamp: int64(12345), Sketch: expectedSketch2}}}
Expand Down
10 changes: 5 additions & 5 deletions pkg/aggregator/dist_sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func TestDistSamplerBucketSampling(t *testing.T) {
sketchSeries := distSampler.flush(10020.0)

expectedSketch := percentile.NewQSketch()
expectedSketch.Add(1)
expectedSketch.Add(2)
expectedSketch.Compress()
expectedSketch = expectedSketch.Add(1)
expectedSketch = expectedSketch.Add(2)
expectedSketch = expectedSketch.Compress()
expectedSeries := &percentile.SketchSeries{
Name: "test.metric.name",
Tags: []string{"a", "b"},
Expand Down Expand Up @@ -140,8 +140,8 @@ func TestDistSamplerContextSampling(t *testing.T) {
sketchSeries := orderedSketchSeries.sketchSeries

expectedSketch := percentile.NewQSketch()
expectedSketch.Add(1)
expectedSketch.Compress()
expectedSketch = expectedSketch.Add(1)
expectedSketch = expectedSketch.Compress()
expectedSeries1 := &percentile.SketchSeries{
Name: "test.metric.name1",
Tags: []string{"a", "b"},
Expand Down
4 changes: 2 additions & 2 deletions pkg/aggregator/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewDistribution() *Distribution {

func (d *Distribution) addSample(sample *MetricSample, timestamp float64) {
// Insert sample value into the sketch
d.sketch.Add(sample.Value)
d.sketch = d.sketch.Add(sample.Value)
d.count++
}

Expand All @@ -27,7 +27,7 @@ func (d *Distribution) flush(timestamp float64) (*percentile.SketchSeries, error
return &percentile.SketchSeries{}, percentile.NoSketchError{}
}
// compress the sketch before flushing
d.sketch.Compress()
d.sketch = d.sketch.Compress()
sketch := &percentile.SketchSeries{
Sketches: []percentile.Sketch{{Timestamp: int64(timestamp),
Sketch: d.sketch}},
Expand Down
8 changes: 4 additions & 4 deletions pkg/aggregator/distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func TestDistributionSampling(t *testing.T) {
assert.Nil(t, err)

expectedSketch := percentile.NewQSketch()
expectedSketch.Add(1)
expectedSketch.Add(10)
expectedSketch.Add(5)
expectedSketch.Compress()
expectedSketch = expectedSketch.Add(1)
expectedSketch = expectedSketch.Add(10)
expectedSketch = expectedSketch.Add(5)
expectedSketch = expectedSketch.Compress()
expectedSeries := &percentile.SketchSeries{
Sketches: []percentile.Sketch{{Timestamp: int64(15), Sketch: expectedSketch}}}

Expand Down
34 changes: 18 additions & 16 deletions pkg/aggregator/percentile/gk_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewGKArray() GKArray {
}

// Add a new value to the summary.
func (s *GKArray) Add(v float64) {
func (s GKArray) Add(v float64) GKArray {
s.Count++
s.Sum += v
s.Avg += (v - s.Avg) / float64(s.Count)
Expand All @@ -101,12 +101,14 @@ func (s *GKArray) Add(v float64) {
s.Max = v
}
if s.Count%int(1/EPSILON) == 0 {
s.compress(nil)
return s.compress(nil)
}

return s
}

// Quantile returns an epsilon estimate of the element at quantile q.
func (s *GKArray) Quantile(q float64) float64 {
func (s GKArray) Quantile(q float64) float64 {
if q < 0 || q > 1 {
panic("Quantile out of bounds")
}
Expand All @@ -126,7 +128,7 @@ func (s *GKArray) Quantile(q float64) float64 {
}

if len(s.incoming) > 0 {
s.compress(nil)
s = s.compress(nil)
}

rank := int(q * float64(s.Count-1))
Expand All @@ -149,7 +151,7 @@ func (s *GKArray) Quantile(q float64) float64 {
// interpolatedQuantile returns an estimate of the element at quantile q,
// but interpolates between the lower and higher elements when Count is
// less than 1/EPSILON
func (s *GKArray) interpolatedQuantile(q float64) float64 {
func (s GKArray) interpolatedQuantile(q float64) float64 {
rank := q * float64(s.Count-1)
indexBelow := int(rank)
indexAbove := indexBelow + 1
Expand All @@ -160,16 +162,16 @@ func (s *GKArray) interpolatedQuantile(q float64) float64 {
weightBelow := 1.0 - weightAbove

if len(s.incoming) > 0 {
s.compress(nil)
s = s.compress(nil)
}
// When Count is less than 1/EPSILON, all the entries will have G = 1, Delta = 0.
return weightBelow*s.Entries[indexBelow].V + weightAbove*s.Entries[indexAbove].V
}

// Merge another GKArray into this in-place.
func (s *GKArray) Merge(o GKArray) {
func (s GKArray) Merge(o GKArray) GKArray {
if o.Count == 0 {
return
return s
}
if s.Count == 0 {
s.Entries = o.Entries
Expand All @@ -179,9 +181,9 @@ func (s *GKArray) Merge(o GKArray) {
s.Max = o.Max
s.Sum = o.Sum
s.Avg = o.Avg
return
return s
}
o.compress(nil)
o = o.compress(nil)
spread := int(EPSILON * float64(o.Count-1))

/*
Expand Down Expand Up @@ -224,18 +226,18 @@ func (s *GKArray) Merge(o GKArray) {
if o.Max > s.Max {
s.Max = o.Max
}
s.compress(incomingEntries)
return s.compress(incomingEntries)
}

// Compress merges the incoming buffer into entries and compresses it.
func (s *GKArray) Compress() {
func (s GKArray) Compress() GKArray {
if len(s.incoming) == 0 {
return
return s
}
s.compress(nil)
return s.compress(nil)
}

func (s *GKArray) compress(incomingEntries Entries) {
func (s GKArray) compress(incomingEntries Entries) GKArray {

// TODO[Charles]: use s.incoming and incomingEntries directly instead of merging them prior to compressing
for _, v := range s.incoming {
Expand Down Expand Up @@ -290,7 +292,7 @@ func (s *GKArray) compress(incomingEntries Entries) {
j++
}
}

s.Entries = merged
s.incoming = make([]float64, 0, int(1/EPSILON))
return s
}
16 changes: 8 additions & 8 deletions pkg/aggregator/percentile/gk_array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func EvaluateSketch(t *testing.T, n int, gen Generator) {
d := NewDataset()
for i := 0; i < n; i++ {
value := gen.Generate()
s.Add(value)
s = s.Add(value)
d.Add(value)
}
eps := float64(1.0e-6)
Expand All @@ -116,7 +116,7 @@ func TestConstant(t *testing.T) {
d := NewDataset()
for i := 0; i < n; i++ {
value := constantGenerator.Generate()
s.Add(value)
s = s.Add(value)
d.Add(value)
}
for _, q := range testQuantiles {
Expand Down Expand Up @@ -162,25 +162,25 @@ func TestMerge(t *testing.T) {
generator1 := NewNormal(35, 1)
for i := 0; i < n; i += 3 {
value := generator1.Generate()
s1.Add(value)
s1 = s1.Add(value)
d.Add(value)
}
s2 := NewGKArray()
generator2 := NewNormal(50, 2)
for i := 1; i < n; i += 3 {
value := generator2.Generate()
s2.Add(value)
s2 = s2.Add(value)
d.Add(value)
}
s1.Merge(s2)
s1 = s1.Merge(s2)
s3 := NewGKArray()
generator3 := NewNormal(40, 0.5)
for i := 2; i < n; i += 3 {
value := generator3.Generate()
s3.Add(value)
s3 = s3.Add(value)
d.Add(value)
}
s1.Merge(s3)
s1 = s1.Merge(s3)

eps := float64(1e-6)
for _, q := range testQuantiles {
Expand All @@ -199,7 +199,7 @@ func TestInterpolatedQuantile(t *testing.T) {
if n < int(1/EPSILON) {
s := NewGKArray()
for i := 0; i < n; i++ {
s.Add(float64(i))
s = s.Add(float64(i))
}
for _, q := range testQuantiles {
expected := q * (float64(n) - 1)
Expand Down
11 changes: 10 additions & 1 deletion pkg/aggregator/percentile/sketch_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ func NewQSketch() QSketch {
return QSketch{NewGKArray()}
}

// Add a value to the qsketch
func (q QSketch) Add(v float64) QSketch {
return QSketch{GKArray: q.GKArray.Add(v)}
}

// Compress the qsketch
func (q QSketch) Compress() QSketch {
return QSketch{GKArray: q.GKArray.Compress()}
}

// NoSketchError is the error returned when not enough samples have been
//submitted to generate a sketch
type NoSketchError struct{}
Expand Down Expand Up @@ -79,7 +89,6 @@ func unmarshalSketches(summarySketches []agentpayload.SketchPayload_Summary_Sket
})
}
return sketches

}

// MarshalSketchSeries serializes sketch series using protocol buffers
Expand Down