From d7f2c924a9141bcdec9fc54c15a0846e60198cbf Mon Sep 17 00:00:00 2001 From: Siavash Safi Date: Mon, 23 Jun 2025 17:24:01 +0200 Subject: [PATCH] fix(marker): stop state leakage from aggregation groups This change makes aggregation groups delete resolved alerts from the marker, thereby avoiding the leakage of ghost states mentioned in #4402. Signed-off-by: Siavash Safi --- dispatch/dispatch.go | 14 ++- dispatch/dispatch_test.go | 200 +++++++++++++++++++++++++++++++++++++- 2 files changed, 209 insertions(+), 5 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 973f084a43..b99807fd10 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -340,7 +340,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { return } - ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger) + ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.marker.(types.AlertMarker), d.logger) routeGroups[fp] = ag d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() @@ -389,6 +389,7 @@ type aggrGroup struct { routeKey string alerts *store.Alerts + marker types.AlertMarker ctx context.Context cancel func() done chan struct{} @@ -400,7 +401,7 @@ type aggrGroup struct { } // newAggrGroup returns a new aggregation group. 
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger *slog.Logger) *aggrGroup { +func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup { if to == nil { to = func(d time.Duration) time.Duration { return d } } @@ -411,6 +412,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func( opts: &r.RouteOpts, timeout: to, alerts: store.NewAlerts(), + marker: marker, done: make(chan struct{}), } ag.ctx, ag.cancel = context.WithCancel(ctx) @@ -539,6 +541,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { // we would delete an active alert thinking it was resolved. if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil { ag.logger.Error("error on delete alerts", "err", err) + } else { + // Delete markers for resolved alerts that are not in the store. + for _, alert := range resolvedSlice { + _, err := ag.alerts.Get(alert.Fingerprint()) + if errors.Is(err, store.ErrNotFound) { + ag.marker.Delete(alert.Fingerprint()) + } + } } } } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index de3d4db393..a0875687d2 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -141,7 +141,7 @@ func TestAggrGroup(t *testing.T) { } // Test regular situation where we wait for group_wait to send out alerts. - ag := newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger()) + ag := newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger()) go ag.run(ntfy) ag.insert(a1) @@ -195,7 +195,7 @@ func TestAggrGroup(t *testing.T) { // immediate flushing. // Finally, set all alerts to be resolved. After successful notify the aggregation group // should empty itself. 
- ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger()) + ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger()) go ag.run(ntfy) ag.insert(a1) @@ -757,7 +757,7 @@ func TestDispatcher_DoMaintenance(t *testing.T) { // Insert an aggregation group with no alerts. labels := model.LabelSet{"alertname": "1"} - aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, promslog.NewNopLogger()) + aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger()) aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1 dispatcher.aggrGroupsPerRoute = aggrGroups // Must run otherwise doMaintenance blocks on aggrGroup1.stop(). @@ -775,3 +775,197 @@ func TestDispatcher_DoMaintenance(t *testing.T) { require.False(t, isMuted) require.Empty(t, mutedBy) } + +func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) { + t.Run("successful flush deletes markers for resolved alerts", func(t *testing.T) { + ctx := context.Background() + marker := types.NewMarker(prometheus.NewRegistry()) + labels := model.LabelSet{"alertname": "TestAlert"} + route := &Route{ + RouteOpts: RouteOpts{ + Receiver: "test", + GroupBy: map[model.LabelName]struct{}{"alertname": {}}, + GroupWait: 0, + GroupInterval: time.Minute, + RepeatInterval: time.Hour, + }, + } + timeout := func(d time.Duration) time.Duration { return d } + logger := promslog.NewNopLogger() + + // Create an aggregation group + ag := newAggrGroup(ctx, labels, route, timeout, marker, logger) + + // Create test alerts: one active and one resolved + now := time.Now() + activeAlert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "TestAlert", + "instance": "1", + }, + StartsAt: now.Add(-time.Hour), + EndsAt: now.Add(time.Hour), // Active alert + }, + UpdatedAt: now, + } + resolvedAlert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + 
"alertname": "TestAlert", + "instance": "2", + }, + StartsAt: now.Add(-time.Hour), + EndsAt: now.Add(-time.Minute), // Resolved alert + }, + UpdatedAt: now, + } + + // Insert alerts into the aggregation group + ag.insert(activeAlert) + ag.insert(resolvedAlert) + + // Set markers for both alerts + marker.SetActiveOrSilenced(activeAlert.Fingerprint(), 0, nil, nil) + marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil) + + // Verify markers exist before flush + require.True(t, marker.Active(activeAlert.Fingerprint())) + require.True(t, marker.Active(resolvedAlert.Fingerprint())) + + // Create a notify function that succeeds + notifyFunc := func(alerts ...*types.Alert) bool { + return true + } + + // Flush the alerts + ag.flush(notifyFunc) + + // Verify that the resolved alert's marker was deleted + require.True(t, marker.Active(activeAlert.Fingerprint()), "active alert marker should still exist") + require.False(t, marker.Active(resolvedAlert.Fingerprint()), "resolved alert marker should be deleted") + }) + + t.Run("failed flush does not delete markers", func(t *testing.T) { + ctx := context.Background() + marker := types.NewMarker(prometheus.NewRegistry()) + labels := model.LabelSet{"alertname": "TestAlert"} + route := &Route{ + RouteOpts: RouteOpts{ + Receiver: "test", + GroupBy: map[model.LabelName]struct{}{"alertname": {}}, + GroupWait: 0, + GroupInterval: time.Minute, + RepeatInterval: time.Hour, + }, + } + timeout := func(d time.Duration) time.Duration { return d } + logger := promslog.NewNopLogger() + + // Create an aggregation group + ag := newAggrGroup(ctx, labels, route, timeout, marker, logger) + + // Create a resolved alert + now := time.Now() + resolvedAlert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "TestAlert", + "instance": "1", + }, + StartsAt: now.Add(-time.Hour), + EndsAt: now.Add(-time.Minute), // Resolved alert + }, + UpdatedAt: now, + } + + // Insert alert into the aggregation group + 
ag.insert(resolvedAlert) + + // Set marker for the alert + marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil) + + // Verify marker exists before flush + require.True(t, marker.Active(resolvedAlert.Fingerprint())) + + // Create a notify function that fails + notifyFunc := func(alerts ...*types.Alert) bool { + return false + } + + // Flush the alerts (notify will fail) + ag.flush(notifyFunc) + + // Verify that the marker was NOT deleted due to failed notification + require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when notify fails") + }) + + t.Run("markers not deleted when alert is modified during flush", func(t *testing.T) { + ctx := context.Background() + marker := types.NewMarker(prometheus.NewRegistry()) + labels := model.LabelSet{"alertname": "TestAlert"} + route := &Route{ + RouteOpts: RouteOpts{ + Receiver: "test", + GroupBy: map[model.LabelName]struct{}{"alertname": {}}, + GroupWait: 0, + GroupInterval: time.Minute, + RepeatInterval: time.Hour, + }, + } + timeout := func(d time.Duration) time.Duration { return d } + logger := promslog.NewNopLogger() + + // Create an aggregation group + ag := newAggrGroup(ctx, labels, route, timeout, marker, logger) + + // Create a resolved alert + now := time.Now() + resolvedAlert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "TestAlert", + "instance": "1", + }, + StartsAt: now.Add(-time.Hour), + EndsAt: now.Add(-time.Minute), // Resolved alert + }, + UpdatedAt: now, + } + + // Insert alert into the aggregation group + ag.insert(resolvedAlert) + + // Set marker for the alert + marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil) + + // Verify marker exists before flush + require.True(t, marker.Active(resolvedAlert.Fingerprint())) + + // Create a notify function that modifies the alert before returning + notifyFunc := func(alerts ...*types.Alert) bool { + // Simulate the alert being modified (e.g., firing again) during 
flush + modifiedAlert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "TestAlert", + "instance": "1", + }, + StartsAt: now.Add(-time.Hour), + EndsAt: now.Add(time.Hour), // Active again + }, + UpdatedAt: now.Add(time.Second), // More recent update + } + // Update the alert in the store + ag.alerts.Set(modifiedAlert) + return true + } + + // Flush the alerts + ag.flush(notifyFunc) + + // Verify that the marker was NOT deleted because the alert was modified + // during the flush (DeleteIfNotModified should have failed) + require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when alert is modified during flush") + }) +}