From 648a79a3e11e8a992d48f614b54c83071d3dc3d4 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 19 Jul 2013 13:29:52 +0200 Subject: [PATCH 1/7] Synchronize Close(), fix race conditions. Close() was not synced through the main dispatcher loop, so it could close channels that were currently being written to by methods called from said dispatcher loop. This leads to a crash. Instead, Close() now writes a closeRequest, which is handled in the dispatcher. --- manager/aggregator.go | 43 +++++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/manager/aggregator.go b/manager/aggregator.go index 481d7d2491..16ae4bcbd7 100644 --- a/manager/aggregator.go +++ b/manager/aggregator.go @@ -110,7 +110,7 @@ type Aggregator struct { getAggregatesRequests chan *getAggregatesRequest removeAggregateRequests chan EventFingerprint rulesRequests chan *aggregatorResetRulesRequest - closed chan bool + closeRequests chan *closeRequest } func NewAggregator() *Aggregator { @@ -121,18 +121,24 @@ func NewAggregator() *Aggregator { getAggregatesRequests: make(chan *getAggregatesRequest), removeAggregateRequests: make(chan EventFingerprint), rulesRequests: make(chan *aggregatorResetRulesRequest), - closed: make(chan bool), + closeRequests: make(chan *closeRequest), } } func (a *Aggregator) Close() { + req := &closeRequest{ + done: make(chan bool), + } + a.closeRequests <- req + <-req.done +} + +func (a *Aggregator) closeInternal() { close(a.rulesRequests) close(a.aggRequests) close(a.getAggregatesRequests) close(a.removeAggregateRequests) - - <-a.closed - close(a.closed) + close(a.closeRequests) } type aggregateEventsResponse struct { @@ -153,6 +159,10 @@ type getAggregatesRequest struct { Response chan getAggregatesResponse } +type closeRequest struct { + done chan bool +} + func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) { if len(a.Rules) == 0 { req.Response <- &aggregateEventsResponse{ @@ -259,24 +269,14 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) { t := time.NewTicker(time.Second) defer t.Stop() - closed := 0 - - for closed < 2 { + for { select { - case req, open := <-a.aggRequests: + case req := <-a.aggRequests: a.aggregate(req, s) - if !open { - closed++ - } - - case rules, open := <-a.rulesRequests: + case rules := <-a.rulesRequests: a.replaceRules(rules) - if !open { - closed++ - } - case req := <-a.getAggregatesRequests: aggs := a.aggregates() req.Response <- getAggregatesResponse{ @@ -288,8 +288,11 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) { log.Println("Deleting expired aggregation instance", a) a.Aggregates[fp].Close() delete(a.Aggregates, fp) + + case req := <-a.closeRequests: + a.closeInternal() + req.done <- true + return } } - - a.closed <- true } From a8bd98b7e1b6f10130554fd520fdb9cf8f3e7692 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 19 Jul 2013 13:31:12 +0200 Subject: [PATCH 2/7] Fix regex filters to match complete string. If someone specifies service = "foo-service" ...they probably don't want it to match: service = "foo-servicebar" --- manager/matcher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/manager/matcher.go b/manager/matcher.go index d54bea0193..9eda71a974 100644 --- a/manager/matcher.go +++ b/manager/matcher.go @@ -33,8 +33,8 @@ func NewFilter(namePattern string, valuePattern string) *Filter { fmt.Fprintf(summer, namePattern, valuePattern) return &Filter{ - Name: regexp.MustCompile(namePattern), - Value: regexp.MustCompile(valuePattern), + Name: regexp.MustCompile("^" + namePattern + "$"), + Value: regexp.MustCompile("^" + valuePattern + "$"), fingerprint: summer.Sum64(), } } From bc57fa4936fbc957b124892abdc46126e2ba9206 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 19 Jul 2013 13:32:53 +0200 Subject: [PATCH 3/7] Add initial aggregator tests. --- manager/aggregator_test.go | 122 +++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 manager/aggregator_test.go diff --git a/manager/aggregator_test.go b/manager/aggregator_test.go new file mode 100644 index 0000000000..ffb21b694d --- /dev/null +++ b/manager/aggregator_test.go @@ -0,0 +1,122 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "testing" +) + +type dummyReceiver struct{} + +func (d *dummyReceiver) Receive(*EventSummary) RemoteError { + return nil +} + +func TestAggregator(t *testing.T) { + scenarios := []struct { + rules AggregationRules + inMatch Events + inNoMatch Events + }{ + { + // No rules, one event. + inNoMatch: Events{ + &Event{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + }, + }, + { + // One rule, two matching events, one non-matching. + rules: AggregationRules{ + &AggregationRule{ + Filters: Filters{NewFilter("service", "test(-)?service")}, + }, + }, + inMatch: Events{ + &Event{ + Labels: map[string]string{ + "service": "testservice", + "foo": "bar", + }, + }, + &Event{ + Labels: map[string]string{ + "service": "test-service", + "bar": "baz", + }, + }, + }, + inNoMatch: Events{ + &Event{ + Labels: map[string]string{ + "service": "testservice2", + "foo": "bar", + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + a := NewAggregator() + go a.Dispatch(&dummyReceiver{}) + + done := make(chan bool) + go func() { + a.SetRules(scenario.rules) + done <- true + }() + <-done + + if len(scenario.inMatch) > 0 { + err := a.Receive(scenario.inMatch) + if err != nil { + t.Fatalf("%d. Expected input %v to match, got error: %s", i, scenario.inMatch, err) + } + } + + if len(scenario.inNoMatch) > 0 { + err := a.Receive(scenario.inNoMatch) + // BUG: we need to define more clearly what should happen if a subset of + // events doesn't match. Right now we only return an error if no rules + // are configured. + if len(scenario.rules) == 0 && err == nil { + t.Fatalf("%d. Expected aggregation error when no rules are set", i) + } + } + + aggs := a.AlertAggregates() + if len(aggs) != len(scenario.inMatch) { + t.Fatalf("%d. Expected %d aggregates, got %d", i, len(scenario.inMatch), len(aggs)) + } + + for j, agg := range aggs { + ev := scenario.inMatch[j] + if len(agg.Event.Labels) != len(ev.Labels) { + t.Fatalf("%d.%d. Expected %d labels, got %d", i, j, len(ev.Labels), len(agg.Event.Labels)) + } + + for l, v := range agg.Event.Labels { + if ev.Labels[l] != v { + t.Fatalf("%d.%d. Expected label %s=%s, got %s=%s", l, ev.Labels[l], l, v) + } + } + } + + a.Close() + } +} From f9bca4ba2b60e441e765460651c2dfa863b2f7d5 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 19 Jul 2013 15:15:53 +0200 Subject: [PATCH 4/7] Remove unused timer. --- manager/aggregator.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/manager/aggregator.go b/manager/aggregator.go index 16ae4bcbd7..0508de2cb1 100644 --- a/manager/aggregator.go +++ b/manager/aggregator.go @@ -266,9 +266,6 @@ func (a *Aggregator) SetRules(r AggregationRules) error { } func (a *Aggregator) Dispatch(s SummaryReceiver) { - t := time.NewTicker(time.Second) - defer t.Stop() - for { select { case req := <-a.aggRequests: From 606d120541ae02a77c7b34a1013c32f7c97d1034 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 19 Jul 2013 15:26:51 +0200 Subject: [PATCH 5/7] Move aggregator scenario tests to separate type. --- manager/aggregator_test.go | 108 +++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 51 deletions(-) diff --git a/manager/aggregator_test.go b/manager/aggregator_test.go index ffb21b694d..fbf39a965b 100644 --- a/manager/aggregator_test.go +++ b/manager/aggregator_test.go @@ -23,12 +23,63 @@ func (d *dummyReceiver) Receive(*EventSummary) RemoteError { return nil } +type testAggregatorScenario struct { + rules AggregationRules + inMatch Events + inNoMatch Events +} + +func (s *testAggregatorScenario) test(i int, t *testing.T) { + a := NewAggregator() + go a.Dispatch(&dummyReceiver{}) + + done := make(chan bool) + go func() { + a.SetRules(s.rules) + done <- true + }() + <-done + + if len(s.inMatch) > 0 { + err := a.Receive(s.inMatch) + if err != nil { + t.Fatalf("%d. Expected input %v to match, got error: %s", i, s.inMatch, err) + } + } + + if len(s.inNoMatch) > 0 { + err := a.Receive(s.inNoMatch) + // BUG: we need to define more clearly what should happen if a subset of + // events doesn't match. Right now we only return an error if no rules + // are configured. + if len(s.rules) == 0 && err == nil { + t.Fatalf("%d. Expected aggregation error when no rules are set", i) + } + } + + aggs := a.AlertAggregates() + if len(aggs) != len(s.inMatch) { + t.Fatalf("%d. Expected %d aggregates, got %d", i, len(s.inMatch), len(aggs)) + } + + for j, agg := range aggs { + ev := s.inMatch[j] + if len(agg.Event.Labels) != len(ev.Labels) { + t.Fatalf("%d.%d. Expected %d labels, got %d", i, j, len(ev.Labels), len(agg.Event.Labels)) + } + + for l, v := range agg.Event.Labels { + if ev.Labels[l] != v { + t.Fatalf("%d.%d. Expected label %s=%s, got %s=%s", l, ev.Labels[l], l, v) + } + } + } + + a.Close() +} + func TestAggregator(t *testing.T) { - scenarios := []struct { - rules AggregationRules - inMatch Events - inNoMatch Events - }{ + scenarios := []testAggregatorScenario{ { // No rules, one event. inNoMatch: Events{ @@ -72,51 +123,6 @@ func TestAggregator(t *testing.T) { } for i, scenario := range scenarios { - a := NewAggregator() - go a.Dispatch(&dummyReceiver{}) - - done := make(chan bool) - go func() { - a.SetRules(scenario.rules) - done <- true - }() - <-done - - if len(scenario.inMatch) > 0 { - err := a.Receive(scenario.inMatch) - if err != nil { - t.Fatalf("%d. Expected input %v to match, got error: %s", i, scenario.inMatch, err) - } - } - - if len(scenario.inNoMatch) > 0 { - err := a.Receive(scenario.inNoMatch) - // BUG: we need to define more clearly what should happen if a subset of - // events doesn't match. Right now we only return an error if no rules - // are configured. - if len(scenario.rules) == 0 && err == nil { - t.Fatalf("%d. Expected aggregation error when no rules are set", i) - } - } - - aggs := a.AlertAggregates() - if len(aggs) != len(scenario.inMatch) { - t.Fatalf("%d. Expected %d aggregates, got %d", i, len(scenario.inMatch), len(aggs)) - } - - for j, agg := range aggs { - ev := scenario.inMatch[j] - if len(agg.Event.Labels) != len(ev.Labels) { - t.Fatalf("%d.%d. Expected %d labels, got %d", i, j, len(ev.Labels), len(agg.Event.Labels)) - } - - for l, v := range agg.Event.Labels { - if ev.Labels[l] != v { - t.Fatalf("%d.%d. Expected label %s=%s, got %s=%s", l, ev.Labels[l], l, v) - } - } - } - - a.Close() + scenario.test(i, t) } } From ca1eb66df41f3183f101d2bf3de71643624170c4 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 19 Jul 2013 18:10:40 +0200 Subject: [PATCH 6/7] Add BUG comment about aggregator draining. --- manager/aggregator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/manager/aggregator.go b/manager/aggregator.go index 0508de2cb1..3e0dc24b3f 100644 --- a/manager/aggregator.go +++ b/manager/aggregator.go @@ -289,6 +289,7 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) { case req := <-a.closeRequests: a.closeInternal() req.done <- true + // BUG: Simply returning here will prevent proper draining. Fix this. return } } From 24d9977c957dba704b6a8db62cfecb5ef8e42878 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 22 Jul 2013 11:02:45 +0200 Subject: [PATCH 7/7] Run go fmt. --- manager/aggregator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manager/aggregator.go b/manager/aggregator.go index 3e0dc24b3f..f158cb0152 100644 --- a/manager/aggregator.go +++ b/manager/aggregator.go @@ -289,7 +289,7 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) { case req := <-a.closeRequests: a.closeInternal() req.done <- true - // BUG: Simply returning here will prevent proper draining. Fix this. + // BUG: Simply returning here will prevent proper draining. Fix this. return } }