Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
return
}

ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.marker.(types.AlertMarker), d.logger)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()
Expand Down Expand Up @@ -389,6 +389,7 @@ type aggrGroup struct {
routeKey string

alerts *store.Alerts
marker types.AlertMarker
ctx context.Context
cancel func()
done chan struct{}
Expand All @@ -400,7 +401,7 @@ type aggrGroup struct {
}

// newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger *slog.Logger) *aggrGroup {
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
Expand All @@ -411,6 +412,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
opts: &r.RouteOpts,
timeout: to,
alerts: store.NewAlerts(),
marker: marker,
done: make(chan struct{}),
}
ag.ctx, ag.cancel = context.WithCancel(ctx)
Expand Down Expand Up @@ -539,6 +541,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
// we would delete an active alert thinking it was resolved.
if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
ag.logger.Error("error on delete alerts", "err", err)
} else {
// Delete markers for resolved alerts that are not in the store.
for _, alert := range resolvedSlice {
Copy link
Collaborator

@grobinson-grafana grobinson-grafana Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just heads up this is a race condition for the same reason we had to implement DeleteIfNotModified. There is a missing check to make sure the deleted alert is the same alert returned from ag.alerts.Get(alert.Fingerprint()).

Without this check, what can happen is: we delete the alert in DeleteIfNotModified, and then, before we call Get, a new alert is received. We would then delete the new alert's marker by mistake.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed on Slack, this is safe since even if we delete the marker the alert status would then be unprocessed:

alertmanager/types/types.go

Lines 261 to 274 in 52eb1fc

// Status implements AlertMarker.
func (m *MemMarker) Status(alert model.Fingerprint) AlertStatus {
m.mtx.RLock()
defer m.mtx.RUnlock()
if s, found := m.alerts[alert]; found {
return *s
}
return AlertStatus{
State: AlertStateUnprocessed,
SilencedBy: []string{},
InhibitedBy: []string{},
}
}

_, err := ag.alerts.Get(alert.Fingerprint())
if errors.Is(err, store.ErrNotFound) {
ag.marker.Delete(alert.Fingerprint())
}
}
}
}
}
Expand Down
200 changes: 197 additions & 3 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestAggrGroup(t *testing.T) {
}

// Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
ag := newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
go ag.run(ntfy)

ag.insert(a1)
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestAggrGroup(t *testing.T) {
// immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
go ag.run(ntfy)

ag.insert(a1)
Expand Down Expand Up @@ -757,7 +757,7 @@ func TestDispatcher_DoMaintenance(t *testing.T) {

// Insert an aggregation group with no alerts.
labels := model.LabelSet{"alertname": "1"}
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, promslog.NewNopLogger())
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
dispatcher.aggrGroupsPerRoute = aggrGroups
// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
Expand All @@ -775,3 +775,197 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
require.False(t, isMuted)
require.Empty(t, mutedBy)
}

func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
t.Run("successful flush deletes markers for resolved alerts", func(t *testing.T) {
ctx := context.Background()
marker := types.NewMarker(prometheus.NewRegistry())
labels := model.LabelSet{"alertname": "TestAlert"}
route := &Route{
RouteOpts: RouteOpts{
Receiver: "test",
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: time.Minute,
RepeatInterval: time.Hour,
},
}
timeout := func(d time.Duration) time.Duration { return d }
logger := promslog.NewNopLogger()

// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)

// Create test alerts: one active and one resolved
now := time.Now()
activeAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "1",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(time.Hour), // Active alert
},
UpdatedAt: now,
}
resolvedAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "2",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute), // Resolved alert
},
UpdatedAt: now,
}

// Insert alerts into the aggregation group
ag.insert(activeAlert)
ag.insert(resolvedAlert)

// Set markers for both alerts
marker.SetActiveOrSilenced(activeAlert.Fingerprint(), 0, nil, nil)
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)

// Verify markers exist before flush
require.True(t, marker.Active(activeAlert.Fingerprint()))
require.True(t, marker.Active(resolvedAlert.Fingerprint()))

// Create a notify function that succeeds
notifyFunc := func(alerts ...*types.Alert) bool {
return true
}

// Flush the alerts
ag.flush(notifyFunc)

// Verify that the resolved alert's marker was deleted
require.True(t, marker.Active(activeAlert.Fingerprint()), "active alert marker should still exist")
require.False(t, marker.Active(resolvedAlert.Fingerprint()), "resolved alert marker should be deleted")
})

t.Run("failed flush does not delete markers", func(t *testing.T) {
ctx := context.Background()
marker := types.NewMarker(prometheus.NewRegistry())
labels := model.LabelSet{"alertname": "TestAlert"}
route := &Route{
RouteOpts: RouteOpts{
Receiver: "test",
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: time.Minute,
RepeatInterval: time.Hour,
},
}
timeout := func(d time.Duration) time.Duration { return d }
logger := promslog.NewNopLogger()

// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)

// Create a resolved alert
now := time.Now()
resolvedAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "1",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute), // Resolved alert
},
UpdatedAt: now,
}

// Insert alert into the aggregation group
ag.insert(resolvedAlert)

// Set marker for the alert
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)

// Verify marker exists before flush
require.True(t, marker.Active(resolvedAlert.Fingerprint()))

// Create a notify function that fails
notifyFunc := func(alerts ...*types.Alert) bool {
return false
}

// Flush the alerts (notify will fail)
ag.flush(notifyFunc)

// Verify that the marker was NOT deleted due to failed notification
require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when notify fails")
})

t.Run("markers not deleted when alert is modified during flush", func(t *testing.T) {
ctx := context.Background()
marker := types.NewMarker(prometheus.NewRegistry())
labels := model.LabelSet{"alertname": "TestAlert"}
route := &Route{
RouteOpts: RouteOpts{
Receiver: "test",
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: time.Minute,
RepeatInterval: time.Hour,
},
}
timeout := func(d time.Duration) time.Duration { return d }
logger := promslog.NewNopLogger()

// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)

// Create a resolved alert
now := time.Now()
resolvedAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "1",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute), // Resolved alert
},
UpdatedAt: now,
}

// Insert alert into the aggregation group
ag.insert(resolvedAlert)

// Set marker for the alert
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)

// Verify marker exists before flush
require.True(t, marker.Active(resolvedAlert.Fingerprint()))

// Create a notify function that modifies the alert before returning
notifyFunc := func(alerts ...*types.Alert) bool {
// Simulate the alert being modified (e.g., firing again) during flush
modifiedAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "1",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(time.Hour), // Active again
},
UpdatedAt: now.Add(time.Second), // More recent update
}
// Update the alert in the store
ag.alerts.Set(modifiedAlert)
return true
}

// Flush the alerts
ag.flush(notifyFunc)

// Verify that the marker was NOT deleted because the alert was modified
// during the flush (DeleteIfNotModified should have failed)
require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when alert is modified during flush")
})
}
Loading