From b8205eb2b267e6d90e1a5edb7e08a1d61364c963 Mon Sep 17 00:00:00 2001 From: Christoph Maser Date: Wed, 11 Mar 2026 21:25:07 +0100 Subject: [PATCH] feat(notify): generic handling of 429 in retrier Add generic handling of 429 in the retrier. This will allow removing any handling of 429 from the individual notifiers and relying on the retrier to handle it instead (TBD in follow-up PRs). Fixes #2205 Refs: #5048 Signed-off-by: Christoph Maser --- notify/discord/discord.go | 8 +- notify/discord/discord_test.go | 2 +- notify/incidentio/incidentio.go | 6 +- notify/incidentio/incidentio_test.go | 4 +- notify/jira/jira.go | 14 ++- notify/jira/jira_test.go | 2 +- notify/mattermost/mattermost.go | 10 +- notify/mattermost/mattermost_test.go | 2 +- notify/msteams/msteams.go | 7 +- notify/msteams/msteams_test.go | 2 +- notify/msteamsv2/msteamsv2.go | 7 +- notify/msteamsv2/msteamsv2_test.go | 2 +- notify/notify.go | 34 +++++- notify/notify_test.go | 109 ++++++++++++++++++++ notify/opsgenie/opsgenie.go | 6 +- notify/opsgenie/opsgenie_test.go | 2 +- notify/pagerduty/pagerduty.go | 9 +- notify/pagerduty/pagerduty_test.go | 4 +- notify/pushover/pushover.go | 7 +- notify/pushover/pushover_test.go | 2 +- notify/rocketchat/rocketchat.go | 13 ++- notify/rocketchat/rocketchat_test.go | 2 +- notify/slack/slack.go | 13 ++- notify/slack/slack_test.go | 2 +- notify/sns/sns.go | 18 +++- notify/telegram/telegram_test.go | 3 +- notify/test/test.go | 25 +++++ notify/util.go | 90 +++++++++++++--- notify/util_test.go | 125 +++++++++++++++++++---- notify/victorops/victorops.go | 7 +- notify/victorops/victorops_test.go | 2 +- notify/webex/webex.go | 2 +- notify/webex/webex_test.go | 2 +- notify/webhook/webhook.go | 7 +- notify/webhook/webhook_test.go | 13 ++- notify/wechat/wechat.go | 1 + test/with_api_v2/acceptance/send_test.go | 15 ++- 37 files changed, 452 insertions(+), 127 deletions(-) diff --git a/notify/discord/discord.go b/notify/discord/discord.go index 8941996a6c..b8c58d7003 100644 --- 
a/notify/discord/discord.go +++ b/notify/discord/discord.go @@ -176,10 +176,8 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) if err != nil { return true, notify.RedactURL(err) } + defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, err - } - return false, nil + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } diff --git a/notify/discord/discord_test.go b/notify/discord/discord_test.go index ca533e1208..62c2cdd5d8 100644 --- a/notify/discord/discord_test.go +++ b/notify/discord/discord_test.go @@ -52,7 +52,7 @@ func TestDiscordRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/incidentio/incidentio.go b/notify/incidentio/incidentio.go index b985853eab..9f987007ec 100644 --- a/notify/incidentio/incidentio.go +++ b/notify/incidentio/incidentio.go @@ -202,11 +202,7 @@ func (n *Notifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, er } defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + return n.retrier.Check(resp) } // errDetails extracts error details from the response for better error messages. 
diff --git a/notify/incidentio/incidentio_test.go b/notify/incidentio/incidentio_test.go index 65557a6199..708bcafbce 100644 --- a/notify/incidentio/incidentio_test.go +++ b/notify/incidentio/incidentio_test.go @@ -52,7 +52,7 @@ func TestIncidentIORetry(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusTooManyRequests) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } @@ -187,7 +187,7 @@ func TestIncidentIORetryScenarios(t *testing.T) { statusCode: http.StatusTooManyRequests, responseBody: []byte(`{"error":"rate limit exceeded","message":"Too many requests"}`), expectRetry: true, - expectErrorMsgContains: "rate limit exceeded", + expectErrorMsgContains: "unexpected status code 429: Too many requests", }, { name: "server error response", diff --git a/notify/jira/jira.go b/notify/jira/jira.go index 401ae1df6f..06e924f2a5 100644 --- a/notify/jira/jira.go +++ b/notify/jira/jira.go @@ -401,9 +401,17 @@ func (n *Notifier) doAPIRequestFullPath(ctx context.Context, method, path string return nil, false, err } - shouldRetry, err := n.retrier.Check(resp.StatusCode, bytes.NewReader(responseBody)) - if err != nil { - return nil, shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + // Pass a temporary response to the retrier so that resp.Body remains + // the original network body; defer notify.Drain(resp) will then close + // the real connection rather than the in-memory reader. 
+ checkResp := &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header, + Body: io.NopCloser(bytes.NewReader(responseBody)), + } + shouldRetry, errWithReason := n.retrier.Check(checkResp) + if errWithReason != nil { + return nil, shouldRetry, errWithReason } return responseBody, false, nil diff --git a/notify/jira/jira_test.go b/notify/jira/jira_test.go index 77bad0c7c0..eabd9d4f1f 100644 --- a/notify/jira/jira_test.go +++ b/notify/jira/jira_test.go @@ -70,7 +70,7 @@ func TestJiraRetry(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusTooManyRequests) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/mattermost/mattermost.go b/notify/mattermost/mattermost.go index 625aaa7aaa..878a3dc1f7 100644 --- a/notify/mattermost/mattermost.go +++ b/notify/mattermost/mattermost.go @@ -141,10 +141,14 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err // Use a retrier to generate an error message for non-200 responses and // classify them as retriable or not. 
- retry, err := n.retrier.Check(resp.StatusCode, resp.Body) + retry, err := n.retrier.Check(resp) if err != nil { - err = fmt.Errorf("channel %q: %w", req.Channel, err) - return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + var ewr *notify.ErrorWithReason + if errors.As(err, &ewr) { + ewr.Err = fmt.Errorf("channel %q: %w", req.Channel, ewr.Err) + return retry, ewr + } + return retry, fmt.Errorf("channel %q: %w", req.Channel, err) } n.logger.Debug("Message sent to Mattermost successfully", "status", resp.StatusCode) diff --git a/notify/mattermost/mattermost_test.go b/notify/mattermost/mattermost_test.go index 65adafb289..3d5a4c594f 100644 --- a/notify/mattermost/mattermost_test.go +++ b/notify/mattermost/mattermost_test.go @@ -51,7 +51,7 @@ func TestMattermostRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/msteams/msteams.go b/notify/msteams/msteams.go index 1f606479c5..c23c214140 100644 --- a/notify/msteams/msteams.go +++ b/notify/msteams/msteams.go @@ -150,9 +150,6 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) defer notify.Drain(resp) // https://learn.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using?tabs=cURL#rate-limiting-for-connectors - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } diff --git a/notify/msteams/msteams_test.go b/notify/msteams/msteams_test.go index 
476566c90d..ffab47fc24 100644 --- a/notify/msteams/msteams_test.go +++ b/notify/msteams/msteams_test.go @@ -52,7 +52,7 @@ func TestMSTeamsRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/msteamsv2/msteamsv2.go b/notify/msteamsv2/msteamsv2.go index dc87666238..9ff321ad2f 100644 --- a/notify/msteamsv2/msteamsv2.go +++ b/notify/msteamsv2/msteamsv2.go @@ -196,9 +196,6 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) defer notify.Drain(resp) // https://learn.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using?tabs=cURL#rate-limiting-for-connectors - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } diff --git a/notify/msteamsv2/msteamsv2_test.go b/notify/msteamsv2/msteamsv2_test.go index 5bae1ab790..aa27954830 100644 --- a/notify/msteamsv2/msteamsv2_test.go +++ b/notify/msteamsv2/msteamsv2_test.go @@ -52,7 +52,7 @@ func TestMSTeamsV2Retry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/notify.go b/notify/notify.go index 1419469f6a..2d31cc5710 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -908,8 +908,20 @@ func (r 
RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.A b := backoff.NewExponentialBackOff() b.MaxElapsedTime = 0 // Always retry. - tick := backoff.NewTicker(b) - defer tick.Stop() + stopTimer := func(timer *time.Timer) { + if timer == nil { + return + } + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + } + + attemptTimer := time.NewTimer(0) + defer stopTimer(attemptTimer) var ( i = 0 @@ -943,7 +955,7 @@ func (r RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.A } select { - case <-tick.C: + case <-attemptTimer.C: now := time.Now() retry, err := r.integration.Notify(ctx, sent...) i++ @@ -956,10 +968,24 @@ func (r RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.A return ctx, alerts, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err) } if ctx.Err() == nil { - if iErr == nil || err.Error() != iErr.Error() { + nextDelay := b.NextBackOff() + + var e *ErrorWithReason + if errors.As(err, &e) && e.Reason == TooManyRequestsReason && e.RetryAfter > 0 { + nextDelay = e.RetryAfter + l.Warn("Notify attempt failed, honoring Retry-After", "attempts", i, "retry_after", e.RetryAfter, "err", err) + } else if iErr == nil || err.Error() != iErr.Error() { // Log the error if the context isn't done and the error isn't the same as before. l.Warn("Notify attempt failed, will retry later", "attempts", i, "err", err) } + + // not really needed since the set b.MaxElapsedTime = 0, + // but just in case the backoff configuration changes in the future. + if nextDelay == backoff.Stop { + return ctx, nil, fmt.Errorf("%s/%s: notify retry stopped after %d attempts: %w", r.groupName, r.integration.String(), i, err) + } + + attemptTimer.Reset(nextDelay) // Save this error to be able to return the last seen error by an // integration upon context timeout. 
iErr = err diff --git a/notify/notify_test.go b/notify/notify_test.go index b16988aa37..b8f6503ac8 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -539,6 +539,115 @@ func TestRetryStageWithContextCanceled(t *testing.T) { require.NotNil(t, resctx) } +func TestRetryStageHonorsRetryAfter(t *testing.T) { + attempts := 0 + i := Integration{ + name: "test", + notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + attempts++ + if attempts < 4 { + err := NewErrorWithReason(TooManyRequestsReason, errors.New("received 429 Too Many Requests")) + err.RetryAfter = 10 * time.Millisecond + return true, err + } + return false, nil + }), + rs: sendResolved(false), + } + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + + alerts := []*types.Alert{{ + Alert: model.Alert{ + EndsAt: time.Now().Add(time.Hour), + }, + }} + + // The default exponential backoff starts at 500ms after the first immediate + // attempt, so 4 attempts can only complete within this timeout when + // Retry-After is actually honored. + ctx, cancel := context.WithTimeout(context.Background(), 400*time.Millisecond) + defer cancel() + ctx = WithFiringAlerts(ctx, []uint64{0}) + + start := time.Now() + _, _, err := r.Exec(ctx, promslog.NewNopLogger(), alerts...) 
+ elapsed := time.Since(start) + require.NoError(t, err) + require.Equal(t, 4, attempts) + require.GreaterOrEqual(t, elapsed, 30*time.Millisecond) + require.Less(t, elapsed, 350*time.Millisecond) +} + +func TestRetryStageRecalculatesBackoffAfterRetryAfter(t *testing.T) { + attempts := 0 + i := Integration{ + name: "test", + notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + attempts++ + switch attempts { + case 1: + err := NewErrorWithReason(TooManyRequestsReason, errors.New("received 429 Too Many Requests")) + err.RetryAfter = 10 * time.Millisecond + return true, err + case 2: + return true, errors.New("temporary failure") + default: + return false, nil + } + }), + rs: sendResolved(false), + } + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + + alerts := []*types.Alert{{ + Alert: model.Alert{ + EndsAt: time.Now().Add(time.Hour), + }, + }} + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + ctx = WithFiringAlerts(ctx, []uint64{0}) + + _, _, err := r.Exec(ctx, promslog.NewNopLogger(), alerts...) 
+ require.Error(t, err) + require.Contains(t, err.Error(), "notify retry canceled after 2 attempts") + require.Equal(t, 2, attempts) +} + +func TestRetryStageWithoutRetryAfterUsesExponentialBackoff(t *testing.T) { + attempts := 0 + i := Integration{ + name: "test", + notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + attempts++ + if attempts < 4 { + return true, NewErrorWithReason(TooManyRequestsReason, errors.New("received 429 Too Many Requests")) + } + return false, nil + }), + rs: sendResolved(false), + } + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + + alerts := []*types.Alert{{ + Alert: model.Alert{ + EndsAt: time.Now().Add(time.Hour), + }, + }} + + // Without Retry-After we should follow the default backoff ticker, whose + // first interval after the initial attempt is far larger than this timeout. + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + ctx = WithFiringAlerts(ctx, []uint64{0}) + + _, _, err := r.Exec(ctx, promslog.NewNopLogger(), alerts...) 
+ require.Error(t, err) + require.Contains(t, err.Error(), "notify retry canceled after 1 attempts") + require.Equal(t, 1, attempts) +} + func TestRetryStageNoResolved(t *testing.T) { sent := []*types.Alert{} i := Integration{ diff --git a/notify/opsgenie/opsgenie.go b/notify/opsgenie/opsgenie.go index a267b45e70..da6b58b6ea 100644 --- a/notify/opsgenie/opsgenie.go +++ b/notify/opsgenie/opsgenie.go @@ -106,10 +106,10 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) if err != nil { return true, err } - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) + shouldRetry, errWithReason := n.retrier.Check(resp) notify.Drain(resp) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + if errWithReason != nil { + return shouldRetry, errWithReason } } return true, nil diff --git a/notify/opsgenie/opsgenie_test.go b/notify/opsgenie/opsgenie_test.go index ec525adb5f..9f7802ec61 100644 --- a/notify/opsgenie/opsgenie_test.go +++ b/notify/opsgenie/opsgenie_test.go @@ -48,7 +48,7 @@ func TestOpsGenieRetry(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusTooManyRequests) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/pagerduty/pagerduty.go b/notify/pagerduty/pagerduty.go index 8532723992..dee184cbe5 100644 --- a/notify/pagerduty/pagerduty.go +++ b/notify/pagerduty/pagerduty.go @@ -200,7 +200,7 @@ func (n *Notifier) notifyV1( } defer notify.Drain(resp) - return n.retrier.Check(resp.StatusCode, resp.Body) + return n.retrier.Check(resp) } func (n *Notifier) notifyV2( @@ -293,11 +293,8 @@ func (n *Notifier) notifyV2( } defer notify.Drain(resp) - retry, err := n.retrier.Check(resp.StatusCode, 
resp.Body) - if err != nil { - return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return retry, err + retry, errWithReason := n.retrier.Check(resp) + return retry, errWithReason } // Notify implements the Notifier interface. diff --git a/notify/pagerduty/pagerduty_test.go b/notify/pagerduty/pagerduty_test.go index 302b4c3d3b..39338a60b2 100644 --- a/notify/pagerduty/pagerduty_test.go +++ b/notify/pagerduty/pagerduty_test.go @@ -53,7 +53,7 @@ func TestPagerDutyRetryV1(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusForbidden) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retryv1 - error on status %d", statusCode) } } @@ -71,7 +71,7 @@ func TestPagerDutyRetryV2(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusTooManyRequests) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retryv2 - error on status %d", statusCode) } } diff --git a/notify/pushover/pushover.go b/notify/pushover/pushover.go index 3f5155668c..02529d726d 100644 --- a/notify/pushover/pushover.go +++ b/notify/pushover/pushover.go @@ -174,9 +174,6 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) } defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } diff --git a/notify/pushover/pushover_test.go 
b/notify/pushover/pushover_test.go index 88ffa33173..aa67112e36 100644 --- a/notify/pushover/pushover_test.go +++ b/notify/pushover/pushover_test.go @@ -38,7 +38,7 @@ func TestPushoverRetry(t *testing.T) { ) require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/rocketchat/rocketchat.go b/notify/rocketchat/rocketchat.go index 8d1a6f4085..00867390cd 100644 --- a/notify/rocketchat/rocketchat.go +++ b/notify/rocketchat/rocketchat.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "log/slog" @@ -227,10 +228,14 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) // Use a retrier to generate an error message for non-200 responses and // classify them as retriable or not. - retry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - err = fmt.Errorf("channel %q: %w", body.Channel, err) - return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + retry, errWithReason := n.retrier.Check(resp) + if errWithReason != nil { + var ewr *notify.ErrorWithReason + if errors.As(errWithReason, &ewr) { + ewr.Err = fmt.Errorf("channel %q: %w", body.Channel, ewr.Err) + return retry, ewr + } + return retry, fmt.Errorf("channel %q: %w", body.Channel, errWithReason) } // Rocketchat web API might return errors with a 200 response code. 
diff --git a/notify/rocketchat/rocketchat_test.go b/notify/rocketchat/rocketchat_test.go index 6f0c36c31b..5b9f300020 100644 --- a/notify/rocketchat/rocketchat_test.go +++ b/notify/rocketchat/rocketchat_test.go @@ -42,7 +42,7 @@ func TestRocketchatRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/slack/slack.go b/notify/slack/slack.go index dbcea63dd7..bcc100e24a 100644 --- a/notify/slack/slack.go +++ b/notify/slack/slack.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "log/slog" @@ -254,10 +255,14 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) // Use a retrier to generate an error message for non-200 responses and // classify them as retriable or not. 
- retry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - err = fmt.Errorf("channel %q: %w", req.Channel, err) - return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + retry, errWithReason := n.retrier.Check(resp) + if errWithReason != nil { + var ewr *notify.ErrorWithReason + if errors.As(errWithReason, &ewr) { + ewr.Err = fmt.Errorf("channel %q: %w", req.Channel, ewr.Err) + return retry, ewr + } + return retry, fmt.Errorf("channel %q: %w", req.Channel, errWithReason) } retry, err = n.slackResponseHandler(resp, store) diff --git a/notify/slack/slack_test.go b/notify/slack/slack_test.go index 68924f7075..92817bded2 100644 --- a/notify/slack/slack_test.go +++ b/notify/slack/slack_test.go @@ -51,7 +51,7 @@ func TestSlackRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/sns/sns.go b/notify/sns/sns.go index 80b039b8b1..d8774093ae 100644 --- a/notify/sns/sns.go +++ b/notify/sns/sns.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "io" "log/slog" "net/http" "strings" @@ -80,7 +81,12 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err // To maintain compatibility with the retrier, we attempt to get an HTTP status code. var respErr *smithyhttp.ResponseError if errors.As(err, &respErr) && respErr.Response != nil { - return n.retrier.Check(respErr.Response.StatusCode, strings.NewReader(apiErr.ErrorMessage())) + resp := &http.Response{ + StatusCode: respErr.Response.StatusCode, + Header: respErr.Response.Header.Clone(), + Body: io.NopCloser(strings.NewReader(apiErr.ErrorMessage())), + } + return n.retrier.Check(resp) } // Fallback if we can't get a status code. 
return true, fmt.Errorf("failed to create SNS client: %s: %s", apiErr.ErrorCode(), apiErr.ErrorMessage()) @@ -107,9 +113,13 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err // If we got a status code, use the retrier logic. if statusCode != 0 { - retryable, checkErr := n.retrier.Check(statusCode, strings.NewReader(apiErr.ErrorMessage())) - reasonErr := notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(statusCode), checkErr) - return retryable, reasonErr + resp := &http.Response{ + StatusCode: statusCode, + Header: respErr.Response.Header.Clone(), + Body: io.NopCloser(strings.NewReader(apiErr.ErrorMessage())), + } + retryable, errWithReason := n.retrier.Check(resp) + return retryable, errWithReason } } // Fallback for non-API errors or if status code extraction fails. diff --git a/notify/telegram/telegram_test.go b/notify/telegram/telegram_test.go index 7192d493fd..3cefc7999e 100644 --- a/notify/telegram/telegram_test.go +++ b/notify/telegram/telegram_test.go @@ -83,7 +83,8 @@ func TestTelegramRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + resp := test.HTTPResponseForStatusCode(statusCode) + actual, _ := notifier.retrier.Check(resp) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/test/test.go b/notify/test/test.go index 75729b2a4f..35d40546f3 100644 --- a/notify/test/test.go +++ b/notify/test/test.go @@ -14,7 +14,10 @@ package test import ( + "bytes" "context" + "fmt" + "io" "net/http" "net/http/httptest" "net/url" @@ -123,6 +126,7 @@ func DefaultRetryCodes() []int { http.StatusLoopDetected, http.StatusNotExtended, http.StatusNetworkAuthenticationRequired, + http.StatusTooManyRequests, } } @@ -189,3 +193,24 @@ func GetContextWithCancelingURL(h ...func(w http.ResponseWriter, r *http.Request srv.Close() } } + +// HTTPResponseForStatusCode 
returns a http.Response with the given status code and a body containing the status code. +func HTTPResponseForStatusCode(statusCode int) *http.Response { + status := fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)) + + if statusCode == http.StatusBadRequest { + body := `{"status":"invalid event"}` + return &http.Response{ + Status: status, + StatusCode: statusCode, + Header: make(http.Header), + Body: io.NopCloser(bytes.NewBufferString(body)), + } + } + return &http.Response{ + Status: status, + StatusCode: statusCode, + Header: make(http.Header), + Body: io.NopCloser(bytes.NewBufferString(fmt.Sprintf("response with status %d", statusCode))), + } +} diff --git a/notify/util.go b/notify/util.go index a99e2b067c..9bd10ac3a1 100644 --- a/notify/util.go +++ b/notify/util.go @@ -23,7 +23,9 @@ import ( "net/http" "net/url" "slices" + "strconv" "strings" + "time" commoncfg "github.com/prometheus/common/config" "github.com/prometheus/common/version" @@ -233,38 +235,99 @@ type Retrier struct { RetryCodes []int } +// parseRetryAfter parses the Retry-After header value, which can be either +// a delay in seconds (integer) or an HTTP-date. Returns zero if absent or unparseable. +func parseRetryAfter(h http.Header) time.Duration { + val := h.Get("Retry-After") + if val == "" { + return 0 + } + // Try integer seconds first + if secs, err := strconv.Atoi(val); err == nil { + return time.Duration(secs) * time.Second + } + // Try HTTP-date format + if t, err := http.ParseTime(val); err == nil { + d := time.Until(t) + if d < 0 { + return 0 + } + return d + } + return 0 +} + // Check returns a boolean indicating whether the request should be retried -// and an optional error if the request has failed. If body is not nil, it will +// and an optional ErrorWithReason if the request has failed. If body is not nil, it will // be included in the error message. 
-func (r *Retrier) Check(statusCode int, body io.Reader) (bool, error) { +func (r *Retrier) Check(resp *http.Response) (bool, error) { + if resp == nil { + return false, NewErrorWithReason(DefaultReason, errors.New("nil HTTP response")) + } + // 2xx responses are considered to be always successful. - if statusCode/100 == 2 { + if resp.StatusCode/100 == 2 { return false, nil } - // 5xx responses are considered to be always retried. - retry := statusCode/100 == 5 || slices.Contains(r.RetryCodes, statusCode) - - s := fmt.Sprintf("unexpected status code %v", statusCode) + s := fmt.Sprintf("unexpected status code %v", resp.StatusCode) var details string if r.CustomDetailsFunc != nil { - details = r.CustomDetailsFunc(statusCode, body) + details = r.CustomDetailsFunc(resp.StatusCode, resp.Body) } else { - details = readAll(body) + details = readAll(resp.Body) } if details != "" { s = fmt.Sprintf("%s: %s", s, details) } - return retry, errors.New(s) + + // Status codes in the RetryCodes list are considered to be retriable regardless of their class. + if slices.Contains(r.RetryCodes, resp.StatusCode) && resp.StatusCode != http.StatusTooManyRequests { + return true, NewErrorWithReason(GetFailureReasonFromStatusCode(resp.StatusCode), errors.New(s)) + } + + if resp.StatusCode == http.StatusTooManyRequests { + e := NewErrorWithReason( + TooManyRequestsReason, + errors.New(s), + ) + retryAfter := parseRetryAfter(resp.Header) + if retryAfter > 0 { + e.RetryAfter = retryAfter + } + return true, e + } + + if resp.StatusCode/100 == 4 { + return false, NewErrorWithReason( + ClientErrorReason, errors.New(s), + ) + } + // 5xx responses are considered to be always retried. 
+ if resp.StatusCode/100 == 5 { + return true, NewErrorWithReason( + ServerErrorReason, errors.New(s), + ) + } + + return false, NewErrorWithReason(GetFailureReasonFromStatusCode(resp.StatusCode), errors.New(s)) } type ErrorWithReason struct { Err error - Reason Reason + Reason Reason + RetryAfter time.Duration } func NewErrorWithReason(reason Reason, err error) *ErrorWithReason { + if reason == TooManyRequestsReason { + return &ErrorWithReason{ + Err: err, + Reason: reason, + RetryAfter: 0, // Default 0 indicates no server indicated retry time, so use default incremental backoff for retries. + } + } return &ErrorWithReason{ Err: err, Reason: reason, @@ -284,6 +347,7 @@ const ( ServerErrorReason ContextCanceledReason ContextDeadlineExceededReason + TooManyRequestsReason ) func (s Reason) String() string { @@ -298,13 +362,15 @@ func (s Reason) String() string { return "contextCanceled" case ContextDeadlineExceededReason: return "contextDeadlineExceeded" + case TooManyRequestsReason: + return "tooManyRequests" default: panic(fmt.Sprintf("unknown Reason: %d", s)) } } // possibleFailureReasonCategory is a list of possible failure reason. -var possibleFailureReasonCategory = []string{DefaultReason.String(), ClientErrorReason.String(), ServerErrorReason.String(), ContextCanceledReason.String(), ContextDeadlineExceededReason.String()} +var possibleFailureReasonCategory = []string{DefaultReason.String(), ClientErrorReason.String(), ServerErrorReason.String(), ContextCanceledReason.String(), ContextDeadlineExceededReason.String(), TooManyRequestsReason.String()} // GetFailureReasonFromStatusCode returns the reason for the failure based on the status code provided. 
func GetFailureReasonFromStatusCode(statusCode int) Reason { diff --git a/notify/util_test.go b/notify/util_test.go index 032fc51508..f94beedaf2 100644 --- a/notify/util_test.go +++ b/notify/util_test.go @@ -22,6 +22,7 @@ import ( "reflect" "runtime" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -132,60 +133,75 @@ func (b brokenReader) Read([]byte) (int, error) { func TestRetrierCheck(t *testing.T) { for _, tc := range []struct { - retrier Retrier - status int - body io.Reader + retrier Retrier + response *http.Response retry bool expectedErr string }{ { retrier: Retrier{}, - status: http.StatusOK, - body: bytes.NewBuffer([]byte("ok")), + response: &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBuffer([]byte("ok"))), + }, retry: false, }, { retrier: Retrier{}, - status: http.StatusNoContent, + response: &http.Response{ + StatusCode: http.StatusNoContent, + Body: io.NopCloser(bytes.NewBuffer([]byte{})), + }, retry: false, }, { retrier: Retrier{}, - status: http.StatusBadRequest, + response: &http.Response{ + StatusCode: http.StatusBadRequest, + }, retry: false, expectedErr: "unexpected status code 400", }, { - retrier: Retrier{RetryCodes: []int{http.StatusTooManyRequests}}, - status: http.StatusBadRequest, - body: bytes.NewBuffer([]byte("invalid request")), + retrier: Retrier{}, + response: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewBuffer([]byte("invalid request"))), + }, retry: false, expectedErr: "unexpected status code 400: invalid request", }, { - retrier: Retrier{RetryCodes: []int{http.StatusTooManyRequests}}, - status: http.StatusTooManyRequests, + retrier: Retrier{}, + response: &http.Response{ + StatusCode: http.StatusTooManyRequests, + Body: io.NopCloser(bytes.NewBuffer([]byte("too many requests"))), + }, retry: true, - expectedErr: "unexpected status code 429", + expectedErr: "unexpected status code 429: too many requests", }, { retrier: Retrier{}, - status: 
http.StatusServiceUnavailable, - body: bytes.NewBuffer([]byte("retry later")), + response: &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Body: io.NopCloser(bytes.NewBuffer([]byte("retry later"))), + }, retry: true, expectedErr: "unexpected status code 503: retry later", }, { retrier: Retrier{}, - status: http.StatusBadGateway, - body: &brokenReader{}, + response: &http.Response{ + StatusCode: http.StatusBadGateway, + Body: io.NopCloser(&brokenReader{}), + }, retry: true, expectedErr: "unexpected status code 502", @@ -198,15 +214,17 @@ func TestRetrierCheck(t *testing.T) { bs, _ := io.ReadAll(b) return fmt.Sprintf("server response is %q", string(bs)) }}, - status: http.StatusServiceUnavailable, - body: bytes.NewBuffer([]byte("retry later")), + response: &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Body: io.NopCloser(bytes.NewBuffer([]byte("retry later"))), + }, retry: true, expectedErr: "unexpected status code 503: server response is \"retry later\"", }, } { t.Run("", func(t *testing.T) { - retry, err := tc.retrier.Check(tc.status, tc.body) + retry, err := tc.retrier.Check(tc.response) require.Equal(t, tc.retry, retry) if tc.expectedErr == "" { require.NoError(t, err) @@ -216,3 +234,70 @@ func TestRetrierCheck(t *testing.T) { }) } } + +func TestRetrierCheckTooManyRequestsRetryAfterPropagation(t *testing.T) { + for _, tc := range []struct { + name string + retrier Retrier + retryAfter string + expected time.Duration + useHTTPDate bool + expectExactRetryAfter bool + }{ + { + name: "retry-after seconds", + retrier: Retrier{}, + retryAfter: "7", + expected: 7 * time.Second, + expectExactRetryAfter: true, + }, + { + name: "retry-after seconds with retry-codes including 429", + retrier: Retrier{RetryCodes: []int{http.StatusTooManyRequests}}, + retryAfter: "10", + expected: 10 * time.Second, + expectExactRetryAfter: true, + }, + { + name: "retry-after http-date with retry-codes including 429", + retrier: Retrier{RetryCodes: 
[]int{http.StatusTooManyRequests}}, + // retryAfter is intentionally left empty; the HTTP-date value is + // generated inside the subtest to avoid it being stale. + retryAfter: "", + expected: 2 * time.Second, + useHTTPDate: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + retryAfter := tc.retryAfter + if tc.useHTTPDate { + retryAfter = time.Now().Add(tc.expected).UTC().Format(http.TimeFormat) + } + + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: make(http.Header), + Body: io.NopCloser(bytes.NewBufferString("too many requests")), + } + resp.Header.Set("Retry-After", retryAfter) + + retry, err := tc.retrier.Check(resp) + require.True(t, retry) + require.Error(t, err) + + var errWithReason *ErrorWithReason + require.ErrorAs(t, err, &errWithReason) + require.Equal(t, TooManyRequestsReason, errWithReason.Reason) + + if tc.expectExactRetryAfter { + require.Equal(t, tc.expected, errWithReason.RetryAfter) + return + } + + // HTTP-date parsing depends on wall clock timing; assert we keep a positive value + // and that it is close to what was requested. + require.Greater(t, errWithReason.RetryAfter, time.Duration(0)) + require.InDelta(t, tc.expected.Seconds(), errWithReason.RetryAfter.Seconds(), 1.0) + }) + } +} diff --git a/notify/victorops/victorops.go b/notify/victorops/victorops.go index 746a10b55d..681ad966a5 100644 --- a/notify/victorops/victorops.go +++ b/notify/victorops/victorops.go @@ -102,11 +102,8 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) } defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } // Create the JSON payload to be sent to the VictorOps API. 
diff --git a/notify/victorops/victorops_test.go b/notify/victorops/victorops_test.go index 7766c6f9cb..ca8462c145 100644 --- a/notify/victorops/victorops_test.go +++ b/notify/victorops/victorops_test.go @@ -97,7 +97,7 @@ func TestVictorOpsRetry(t *testing.T) { ) require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/webex/webex.go b/notify/webex/webex.go index 922ea543d5..539fe2d39b 100644 --- a/notify/webex/webex.go +++ b/notify/webex/webex.go @@ -106,7 +106,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) return true, notify.RedactURL(err) } - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) + shouldRetry, err := n.retrier.Check(resp) if err != nil { return shouldRetry, err } diff --git a/notify/webex/webex_test.go b/notify/webex/webex_test.go index e5b913d876..14c05cb810 100644 --- a/notify/webex/webex_test.go +++ b/notify/webex/webex_test.go @@ -50,7 +50,7 @@ func TestWebexRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/webhook/webhook.go b/notify/webhook/webhook.go index 4165e28188..5bfa4798a0 100644 --- a/notify/webhook/webhook.go +++ b/notify/webhook/webhook.go @@ -146,11 +146,8 @@ func (n *Notifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, er } defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, 
notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } func (n *Notifier) renderPayload( diff --git a/notify/webhook/webhook_test.go b/notify/webhook/webhook_test.go index dfa73d47c0..e0dd71620c 100644 --- a/notify/webhook/webhook_test.go +++ b/notify/webhook/webhook_test.go @@ -52,7 +52,7 @@ func TestWebhookRetry(t *testing.T) { t.Run("test retry status code", func(t *testing.T) { for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } }) @@ -72,14 +72,13 @@ func TestWebhookRetry(t *testing.T) { exp: fmt.Sprintf(`unexpected status code %d: {"status":"invalid event"}`, http.StatusBadRequest), }, - { - status: http.StatusBadRequest, - - exp: fmt.Sprintf(`unexpected status code %d`, http.StatusBadRequest), - }, } { t.Run("", func(t *testing.T) { - _, err = notifier.retrier.Check(tc.status, tc.body) + resp := &http.Response{ + StatusCode: tc.status, + Body: io.NopCloser(tc.body), + } + _, err = notifier.retrier.Check(resp) require.Equal(t, tc.exp, err.Error()) }) } diff --git a/notify/wechat/wechat.go b/notify/wechat/wechat.go index 35d418931d..d30c2ff47d 100644 --- a/notify/wechat/wechat.go +++ b/notify/wechat/wechat.go @@ -182,6 +182,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) if err != nil { return true, err } + logger.Debug(string(body)) var weResp weChatResponse diff --git a/test/with_api_v2/acceptance/send_test.go b/test/with_api_v2/acceptance/send_test.go index 0bce0e1f96..a787be1141 100644 --- a/test/with_api_v2/acceptance/send_test.go +++ b/test/with_api_v2/acceptance/send_test.go @@ -15,6 +15,7 @@ package test import ( "fmt" + "sync/atomic" 
"testing" "time" @@ -555,10 +556,11 @@ receivers: co := at.Collector("webhook") wh := NewWebhook(t, co) - wh.Func = func(ts float64) bool { - // Make some webhook requests slow enough to hit the webhook - // timeout, but not so slow as to hit the dispatcher timeout. - if ts < 3 { + var attempts atomic.Int32 + wh.Func = func(_ float64) bool { + // Make the first webhook request slow enough to hit the webhook + // timeout. After the timeout, the retry must succeed. + if attempts.Add(1) <= 1 { time.Sleep(time.Second) return true } @@ -569,7 +571,10 @@ receivers: am.Push(At(1), Alert("alertname", "test1")) - co.Want(Between(3, 4), Alert("alertname", "test1").Active(1)) + // Alert is dispatched at t=2 (group_wait=1s). The first attempt times out + // after 500ms. The retry fires after backoff (250ms–750ms), so success + // arrives between t≈2.75 and t≈3.25. + co.Want(Between(2, 4), Alert("alertname", "test1").Active(1)) at.Run()