diff --git a/manager/aggregator.go b/manager/aggregator.go index 481d7d2491..f158cb0152 100644 --- a/manager/aggregator.go +++ b/manager/aggregator.go @@ -110,7 +110,7 @@ type Aggregator struct { getAggregatesRequests chan *getAggregatesRequest removeAggregateRequests chan EventFingerprint rulesRequests chan *aggregatorResetRulesRequest - closed chan bool + closeRequests chan *closeRequest } func NewAggregator() *Aggregator { @@ -121,18 +121,24 @@ func NewAggregator() *Aggregator { getAggregatesRequests: make(chan *getAggregatesRequest), removeAggregateRequests: make(chan EventFingerprint), rulesRequests: make(chan *aggregatorResetRulesRequest), - closed: make(chan bool), + closeRequests: make(chan *closeRequest), } } func (a *Aggregator) Close() { + req := &closeRequest{ + done: make(chan bool), + } + a.closeRequests <- req + <-req.done +} + +func (a *Aggregator) closeInternal() { close(a.rulesRequests) close(a.aggRequests) close(a.getAggregatesRequests) close(a.removeAggregateRequests) - - <-a.closed - close(a.closed) + close(a.closeRequests) } type aggregateEventsResponse struct { @@ -153,6 +159,10 @@ type getAggregatesRequest struct { Response chan getAggregatesResponse } +type closeRequest struct { + done chan bool +} + func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) { if len(a.Rules) == 0 { req.Response <- &aggregateEventsResponse{ @@ -256,27 +266,14 @@ func (a *Aggregator) SetRules(r AggregationRules) error { } func (a *Aggregator) Dispatch(s SummaryReceiver) { - t := time.NewTicker(time.Second) - defer t.Stop() - - closed := 0 - - for closed < 2 { + for { select { - case req, open := <-a.aggRequests: + case req := <-a.aggRequests: a.aggregate(req, s) - if !open { - closed++ - } - - case rules, open := <-a.rulesRequests: + case rules := <-a.rulesRequests: a.replaceRules(rules) - if !open { - closed++ - } - case req := <-a.getAggregatesRequests: aggs := a.aggregates() req.Response <- getAggregatesResponse{ @@ -288,8 +285,12 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) { log.Println("Deleting expired aggregation instance", a) a.Aggregates[fp].Close() delete(a.Aggregates, fp) + + case req := <-a.closeRequests: + a.closeInternal() + req.done <- true + // BUG: Simply returning here will prevent proper draining. Fix this. + return } } - - a.closed <- true } diff --git a/manager/aggregator_test.go b/manager/aggregator_test.go new file mode 100644 index 0000000000..fbf39a965b --- /dev/null +++ b/manager/aggregator_test.go @@ -0,0 +1,128 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "testing" +) + +type dummyReceiver struct{} + +func (d *dummyReceiver) Receive(*EventSummary) RemoteError { + return nil +} + +type testAggregatorScenario struct { + rules AggregationRules + inMatch Events + inNoMatch Events +} + +func (s *testAggregatorScenario) test(i int, t *testing.T) { + a := NewAggregator() + go a.Dispatch(&dummyReceiver{}) + + done := make(chan bool) + go func() { + a.SetRules(s.rules) + done <- true + }() + <-done + + if len(s.inMatch) > 0 { + err := a.Receive(s.inMatch) + if err != nil { + t.Fatalf("%d. Expected input %v to match, got error: %s", i, s.inMatch, err) + } + } + + if len(s.inNoMatch) > 0 { + err := a.Receive(s.inNoMatch) + // BUG: we need to define more clearly what should happen if a subset of + // events doesn't match. Right now we only return an error if no rules + // are configured. + if len(s.rules) == 0 && err == nil { + t.Fatalf("%d. Expected aggregation error when no rules are set", i) + } + } + + aggs := a.AlertAggregates() + if len(aggs) != len(s.inMatch) { + t.Fatalf("%d. Expected %d aggregates, got %d", i, len(s.inMatch), len(aggs)) + } + + for j, agg := range aggs { + ev := s.inMatch[j] + if len(agg.Event.Labels) != len(ev.Labels) { + t.Fatalf("%d.%d. Expected %d labels, got %d", i, j, len(ev.Labels), len(agg.Event.Labels)) + } + + for l, v := range agg.Event.Labels { + if ev.Labels[l] != v { + t.Fatalf("%d.%d. Expected label %s=%s, got %s=%s", l, ev.Labels[l], l, v) + } + } + } + + a.Close() +} + +func TestAggregator(t *testing.T) { + scenarios := []testAggregatorScenario{ + { + // No rules, one event. + inNoMatch: Events{ + &Event{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + }, + }, + { + // One rule, two matching events, one non-matching. + rules: AggregationRules{ + &AggregationRule{ + Filters: Filters{NewFilter("service", "test(-)?service")}, + }, + }, + inMatch: Events{ + &Event{ + Labels: map[string]string{ + "service": "testservice", + "foo": "bar", + }, + }, + &Event{ + Labels: map[string]string{ + "service": "test-service", + "bar": "baz", + }, + }, + }, + inNoMatch: Events{ + &Event{ + Labels: map[string]string{ + "service": "testservice2", + "foo": "bar", + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + scenario.test(i, t) + } +} diff --git a/manager/matcher.go b/manager/matcher.go index d54bea0193..9eda71a974 100644 --- a/manager/matcher.go +++ b/manager/matcher.go @@ -33,8 +33,8 @@ func NewFilter(namePattern string, valuePattern string) *Filter { fmt.Fprintf(summer, namePattern, valuePattern) return &Filter{ - Name: regexp.MustCompile(namePattern), - Value: regexp.MustCompile(valuePattern), + Name: regexp.MustCompile("^" + namePattern + "$"), + Value: regexp.MustCompile("^" + valuePattern + "$"), fingerprint: summer.Sum64(), } }