Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,36 @@ func main() {
aggregator := manager.NewAggregator()
defer aggregator.Close()

summarizer := new(manager.SummaryDispatcher)
summarizer := manager.NewSummaryDispatcher()
go aggregator.Dispatch(summarizer)
log.Println("Done.")

webService := &web.WebService{
AlertManagerService: &api.AlertManagerService{
Aggregator: aggregator,
},
AlertsHandler: nil,
AlertsHandler: &web.AlertsHandler{
Aggregator: aggregator,
},
}
go webService.ServeForever()

// BEGIN EXAMPLE CODE - replace with config loading later.
done := make(chan bool)
go func() {
rules := manager.AggregationRules{
&manager.AggregationRule{
Filters: manager.Filters{manager.NewFilter("service", "discovery")},
},
}

aggregator.SetRules(rules)

done <- true
}()
<-done
// END EXAMPLE CODE

log.Println("Running summary dispatcher...")
summarizer.Dispatch(suppressor)
}
176 changes: 108 additions & 68 deletions manager/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"time"
)

type aggDispatchState int

const (
aggUnsent aggDispatchState = iota
aggSent
minimumRefreshPeriod = 5 * time.Minute
notificationRetryPeriod = 1 * time.Minute
)

// AggregationRule creates and manages the scope for received events.
Expand All @@ -33,104 +31,105 @@ type AggregationRule struct {
RepeatRate time.Duration
}

type AggregationInstance struct {
Rule *AggregationRule
Events Events

EndsAt time.Time
type AggregationInstances []*AggregationInstance

state aggDispatchState
type AggregationInstance struct {
Rule *AggregationRule
Event *Event

// When was this AggregationInstance created?
Created time.Time
// When was the last refresh received into this AggregationInstance?
LastRefreshed time.Time

// When was the last successful notification sent out for this
// AggregationInstance?
lastNotificationSent time.Time
// Timer used to trigger a notification retry/resend.
notificationResendTimer *time.Timer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will need to call notificationResendTimer.Stop() on when this AI is disposed of. You will also need to close its channel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm already calling notificationResendTimer.Stop() in Dispatch(). As for the channel closing: you mean this only as a GC hint, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, it is to prevent leakage, meaning more than a hint.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, found the answer myself (https://groups.google.com/forum/#!topic/golang-nuts/-xnFsH_ZRqU). At least with NewTimer, the timer returned contains a channel on which a tick is sent. However, looking at the implementation of time.AfterFunc(), the timer returned from that doesn't get any channel assigned to it, so there's nothing to close in that case.

Compare:
http://golang.org/src/pkg/time/sleep.go#L62 (NewTimer)
http://golang.org/src/pkg/time/sleep.go#L109 (AfterFunc)

// Timer used to trigger the deletion of the AggregationInstance after it
// hasn't been refreshed for too long.
expiryTimer *time.Timer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already fired when I'm deleting the instance, otherwise I wouldn't be deleting it (so strictly no need for stopping it). Ditto for the channel closing though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I'm adding a general Close() method though, I'll just add stopping of this timer to it, too.

}

func (r *AggregationRule) Handles(e *Event) bool {
return r.Filters.Handles(e)
}

func (r *AggregationInstance) Ingest(e *Event) {
r.Events = append(r.Events, e)
}
r.Event = e
r.LastRefreshed = time.Now()

func (r *AggregationInstance) Tidy() {
// BUG(matt): Drop this in favor of having the entire AggregationInstance
// being dropped when too old.
log.Println("Tidying...")
if len(r.Events) == 0 {
return
}

events := Events{}

t := time.Now()
for _, e := range r.Events {
if t.Before(e.CreatedAt) {
events = append(events, e)
}
}

if len(events) == 0 {
r.state = aggSent
}

r.Events = events
r.expiryTimer.Reset(minimumRefreshPeriod)
}

func (r *AggregationInstance) SendNotification(s SummaryReceiver) {
if r.state == aggSent {
if time.Since(r.lastNotificationSent) < r.Rule.RepeatRate {
return
}

err := s.Receive(&EventSummary{
Rule: r.Rule,
Events: r.Events,
Rule: r.Rule,
Event: r.Event,
})
if err != nil {
if err.Retryable() {
return
}
log.Println("Unretryable error while sending notification:", err)
log.Printf("Error while sending notification: %s, retrying in %v", err, notificationRetryPeriod)
r.resendNotificationAfter(notificationRetryPeriod, s)
return
}

r.state = aggSent
r.resendNotificationAfter(r.Rule.RepeatRate, s)
r.lastNotificationSent = time.Now()
}

func (r *AggregationInstance) resendNotificationAfter(d time.Duration, s SummaryReceiver) {
// BUG: we can't just call SendNotification whenever the timer ends without
// any synchronisation. The timer should instead feed into a channel which is
// served by the main Dispatch() loop.
r.notificationResendTimer = time.AfterFunc(d, func() {
r.SendNotification(s)
})
}

func (r *AggregationInstance) Close() {
if r.notificationResendTimer != nil {
r.notificationResendTimer.Stop()
}
if r.expiryTimer != nil {
r.expiryTimer.Stop()
}
}

type AggregationRules []*AggregationRule

type Aggregator struct {
Rules AggregationRules
// Used for O(1) lookup and removal of aggregations when new ones come into the system.
Aggregates map[uint64]*AggregationInstance
// TODO: Add priority queue sorted by expiration time.Time (newest, oldest).
// When a new element comes into this queue and the last head is not equal to
// current head, cancel the existing internal timer and create a new timer for
// expiry.Sub(time.Now) and have that (<- chan time.Time) funnel into the
// event into the dispatch loop where the present tidy call is made. Delete
// tidy, and just shift the head element of the priority queue off and remove
// it from the O(1) membership index above.
Aggregates map[EventFingerprint]*AggregationInstance

// TODO?: Build a new priority queue type that uses an internal wrapper container for
// the AggregationInstance it decorates to note the last dispatch time. The
// queue uses higher-level add and remove methods.

// SHORTFALL: Needing to garbage collect aggregations across three containers?

aggRequests chan *aggregateEventsRequest
rulesRequests chan *aggregatorResetRulesRequest
closed chan bool
aggRequests chan *aggregateEventsRequest
getAggregatesRequests chan *getAggregatesRequest
removeAggregateRequests chan EventFingerprint
rulesRequests chan *aggregatorResetRulesRequest
closed chan bool
}

func NewAggregator() *Aggregator {
return &Aggregator{
Aggregates: make(map[uint64]*AggregationInstance),
Aggregates: make(map[EventFingerprint]*AggregationInstance),

aggRequests: make(chan *aggregateEventsRequest),
rulesRequests: make(chan *aggregatorResetRulesRequest),
closed: make(chan bool),
aggRequests: make(chan *aggregateEventsRequest),
getAggregatesRequests: make(chan *getAggregatesRequest),
removeAggregateRequests: make(chan EventFingerprint),
rulesRequests: make(chan *aggregatorResetRulesRequest),
closed: make(chan bool),
}
}

func (a *Aggregator) Close() {
close(a.rulesRequests)
close(a.aggRequests)
close(a.getAggregatesRequests)
close(a.removeAggregateRequests)

<-a.closed
close(a.closed)
Expand All @@ -146,6 +145,14 @@ type aggregateEventsRequest struct {
Response chan *aggregateEventsResponse
}

type getAggregatesResponse struct {
Aggregates AggregationInstances
}

type getAggregatesRequest struct {
Response chan getAggregatesResponse
}

func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) {
if len(a.Rules) == 0 {
req.Response <- &aggregateEventsResponse{
Expand All @@ -162,8 +169,14 @@ func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) {
fp := element.Fingerprint()
aggregation, ok := a.Aggregates[fp]
if !ok {
expTimer := time.AfterFunc(minimumRefreshPeriod, func() {
a.removeAggregateRequests <- fp
})

aggregation = &AggregationInstance{
Rule: r,
Rule: r,
Created: time.Now(),
expiryTimer: expTimer,
}

a.Aggregates[fp] = aggregation
Expand Down Expand Up @@ -196,6 +209,26 @@ func (a *Aggregator) replaceRules(r *aggregatorResetRulesRequest) {
close(r.Response)
}

func (a *Aggregator) AlertAggregates() AggregationInstances {
req := &getAggregatesRequest{
Response: make(chan getAggregatesResponse),
}

a.getAggregatesRequests <- req

result := <-req.Response

return result.Aggregates
}

func (a *Aggregator) aggregates() AggregationInstances {
aggs := make(AggregationInstances, 0, len(a.Aggregates))
for _, agg := range a.Aggregates {
aggs = append(aggs, agg)
}
return aggs
}

func (a *Aggregator) Receive(e Events) error {
req := &aggregateEventsRequest{
Events: e,
Expand Down Expand Up @@ -244,10 +277,17 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) {
closed++
}

case <-t.C:
for _, a := range a.Aggregates {
a.Tidy()
case req := <-a.getAggregatesRequests:
aggs := a.aggregates()
req.Response <- getAggregatesResponse{
Aggregates: aggs,
}
close(req.Response)

case fp := <-a.removeAggregateRequests:
log.Println("Deleting expired aggregation instance", a)
a.Aggregates[fp].Close()
delete(a.Aggregates, fp)
}
}

Expand Down
7 changes: 5 additions & 2 deletions manager/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func DispatcherFor(destination string) DestinationDispatcher {
type EventSummary struct {
Rule *AggregationRule

Events Events
Event *Event

Destination string
}
Expand Down Expand Up @@ -116,14 +116,17 @@ func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError {
}

func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) {
if i.IsInhibited(r.Summary.Events[0]) {
if i.IsInhibited(r.Summary.Event) {
r.Response <- &summaryDispatchResponse{
Disposition: SUPPRESSED,
}
return
}

// BUG: Perform sending of summaries.
r.Response <- &summaryDispatchResponse{
Disposition: DISPATCHED,
}
}

func (d *SummaryDispatcher) Dispatch(i IsInhibitedInterrogator) {
Expand Down
17 changes: 6 additions & 11 deletions manager/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,23 @@ import (
"fmt"
"hash/fnv"
"sort"
"time"
)

type EventFingerprint uint64

// Event models an action triggered by Prometheus.
type Event struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include a "name" label.
Labels map[string]string

// CreatedAt indicates when the event was created.
CreatedAt time.Time

// ExpiresAt is the allowed lifetime for this event before it is reaped.
ExpiresAt time.Time

// Extra key/value information which is not used for aggregation.
Payload map[string]string
}

func (e Event) Fingerprint() uint64 {
func (e Event) Fingerprint() EventFingerprint {
keys := []string{}

for k := range e.Payload {
for k := range e.Labels {
keys = append(keys, k)
}

Expand All @@ -50,7 +45,7 @@ func (e Event) Fingerprint() uint64 {
fmt.Fprintf(summer, k, e.Labels[k])
}

return summer.Sum64()
return EventFingerprint(summer.Sum64())
}

type Events []*Event
14 changes: 12 additions & 2 deletions web/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@
package web

import (
"github.com/prometheus/alert_manager/manager"
"net/http"
)

type AlertsHandler struct {}
type AlertStatus struct {
AlertAggregates []*manager.AggregationInstance
}

type AlertsHandler struct {
Aggregator *manager.Aggregator
}

func (h *AlertsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
executeTemplate(w, "alerts", nil)
alertStatus := &AlertStatus{
AlertAggregates: h.Aggregator.AlertAggregates(),
}
executeTemplate(w, "alerts", alertStatus)
}
Loading