diff --git a/notify/discord/discord.go b/notify/discord/discord.go index 8941996a6c..b8c58d7003 100644 --- a/notify/discord/discord.go +++ b/notify/discord/discord.go @@ -176,10 +176,8 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) if err != nil { return true, notify.RedactURL(err) } + defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, err - } - return false, nil + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } diff --git a/notify/discord/discord_test.go b/notify/discord/discord_test.go index ca533e1208..62c2cdd5d8 100644 --- a/notify/discord/discord_test.go +++ b/notify/discord/discord_test.go @@ -52,7 +52,7 @@ func TestDiscordRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/incidentio/incidentio.go b/notify/incidentio/incidentio.go index b985853eab..9f987007ec 100644 --- a/notify/incidentio/incidentio.go +++ b/notify/incidentio/incidentio.go @@ -202,11 +202,7 @@ func (n *Notifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, er } defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + return n.retrier.Check(resp) } // errDetails extracts error details from the response for better error messages. 
diff --git a/notify/incidentio/incidentio_test.go b/notify/incidentio/incidentio_test.go index 65557a6199..708bcafbce 100644 --- a/notify/incidentio/incidentio_test.go +++ b/notify/incidentio/incidentio_test.go @@ -52,7 +52,7 @@ func TestIncidentIORetry(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusTooManyRequests) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } @@ -187,7 +187,7 @@ func TestIncidentIORetryScenarios(t *testing.T) { statusCode: http.StatusTooManyRequests, responseBody: []byte(`{"error":"rate limit exceeded","message":"Too many requests"}`), expectRetry: true, - expectErrorMsgContains: "rate limit exceeded", + expectErrorMsgContains: "unexpected status code 429: Too many requests", }, { name: "server error response", diff --git a/notify/jira/jira.go b/notify/jira/jira.go index 401ae1df6f..06e924f2a5 100644 --- a/notify/jira/jira.go +++ b/notify/jira/jira.go @@ -401,9 +401,17 @@ func (n *Notifier) doAPIRequestFullPath(ctx context.Context, method, path string return nil, false, err } - shouldRetry, err := n.retrier.Check(resp.StatusCode, bytes.NewReader(responseBody)) - if err != nil { - return nil, shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + // Pass a temporary response to the retrier so that resp.Body remains + // the original network body; defer notify.Drain(resp) will then close + // the real connection rather than the in-memory reader. 
+ checkResp := &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header, + Body: io.NopCloser(bytes.NewReader(responseBody)), + } + shouldRetry, errWithReason := n.retrier.Check(checkResp) + if errWithReason != nil { + return nil, shouldRetry, errWithReason } return responseBody, false, nil diff --git a/notify/jira/jira_test.go b/notify/jira/jira_test.go index 77bad0c7c0..eabd9d4f1f 100644 --- a/notify/jira/jira_test.go +++ b/notify/jira/jira_test.go @@ -70,7 +70,7 @@ func TestJiraRetry(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusTooManyRequests) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/mattermost/mattermost.go b/notify/mattermost/mattermost.go index 625aaa7aaa..878a3dc1f7 100644 --- a/notify/mattermost/mattermost.go +++ b/notify/mattermost/mattermost.go @@ -141,10 +141,14 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err // Use a retrier to generate an error message for non-200 responses and // classify them as retriable or not. 
- retry, err := n.retrier.Check(resp.StatusCode, resp.Body) + retry, err := n.retrier.Check(resp) if err != nil { - err = fmt.Errorf("channel %q: %w", req.Channel, err) - return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + var ewr *notify.ErrorWithReason + if errors.As(err, &ewr) { + ewr.Err = fmt.Errorf("channel %q: %w", req.Channel, ewr.Err) + return retry, ewr + } + return retry, fmt.Errorf("channel %q: %w", req.Channel, err) } n.logger.Debug("Message sent to Mattermost successfully", "status", resp.StatusCode) diff --git a/notify/mattermost/mattermost_test.go b/notify/mattermost/mattermost_test.go index 65adafb289..3d5a4c594f 100644 --- a/notify/mattermost/mattermost_test.go +++ b/notify/mattermost/mattermost_test.go @@ -51,7 +51,7 @@ func TestMattermostRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/msteams/msteams.go b/notify/msteams/msteams.go index 1f606479c5..c23c214140 100644 --- a/notify/msteams/msteams.go +++ b/notify/msteams/msteams.go @@ -150,9 +150,6 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) defer notify.Drain(resp) // https://learn.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using?tabs=cURL#rate-limiting-for-connectors - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } diff --git a/notify/msteams/msteams_test.go b/notify/msteams/msteams_test.go index 
476566c90d..ffab47fc24 100644 --- a/notify/msteams/msteams_test.go +++ b/notify/msteams/msteams_test.go @@ -52,7 +52,7 @@ func TestMSTeamsRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/msteamsv2/msteamsv2.go b/notify/msteamsv2/msteamsv2.go index dc87666238..9ff321ad2f 100644 --- a/notify/msteamsv2/msteamsv2.go +++ b/notify/msteamsv2/msteamsv2.go @@ -196,9 +196,6 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) defer notify.Drain(resp) // https://learn.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using?tabs=cURL#rate-limiting-for-connectors - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } diff --git a/notify/msteamsv2/msteamsv2_test.go b/notify/msteamsv2/msteamsv2_test.go index 5bae1ab790..aa27954830 100644 --- a/notify/msteamsv2/msteamsv2_test.go +++ b/notify/msteamsv2/msteamsv2_test.go @@ -52,7 +52,7 @@ func TestMSTeamsV2Retry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retry - error on status %d", statusCode) } } diff --git a/notify/notify.go b/notify/notify.go index 1419469f6a..2d31cc5710 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -908,8 +908,20 @@ func (r 
RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.A b := backoff.NewExponentialBackOff() b.MaxElapsedTime = 0 // Always retry. - tick := backoff.NewTicker(b) - defer tick.Stop() + stopTimer := func(timer *time.Timer) { + if timer == nil { + return + } + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + } + + attemptTimer := time.NewTimer(0) + defer stopTimer(attemptTimer) var ( i = 0 @@ -943,7 +955,7 @@ func (r RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.A } select { - case <-tick.C: + case <-attemptTimer.C: now := time.Now() retry, err := r.integration.Notify(ctx, sent...) i++ @@ -956,10 +968,24 @@ func (r RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.A return ctx, alerts, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err) } if ctx.Err() == nil { - if iErr == nil || err.Error() != iErr.Error() { + nextDelay := b.NextBackOff() + + var e *ErrorWithReason + if errors.As(err, &e) && e.Reason == TooManyRequestsReason && e.RetryAfter > 0 { + nextDelay = e.RetryAfter + l.Warn("Notify attempt failed, honoring Retry-After", "attempts", i, "retry_after", e.RetryAfter, "err", err) + } else if iErr == nil || err.Error() != iErr.Error() { // Log the error if the context isn't done and the error isn't the same as before. l.Warn("Notify attempt failed, will retry later", "attempts", i, "err", err) } + + // Not strictly needed because b.MaxElapsedTime is set to 0 above, + // but guard against the backoff configuration changing in the future. + if nextDelay == backoff.Stop { + return ctx, nil, fmt.Errorf("%s/%s: notify retry stopped after %d attempts: %w", r.groupName, r.integration.String(), i, err) + } + + attemptTimer.Reset(nextDelay) // Save this error to be able to return the last seen error by an // integration upon context timeout. 
iErr = err diff --git a/notify/notify_test.go b/notify/notify_test.go index b16988aa37..b8f6503ac8 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -539,6 +539,115 @@ func TestRetryStageWithContextCanceled(t *testing.T) { require.NotNil(t, resctx) } +func TestRetryStageHonorsRetryAfter(t *testing.T) { + attempts := 0 + i := Integration{ + name: "test", + notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + attempts++ + if attempts < 4 { + err := NewErrorWithReason(TooManyRequestsReason, errors.New("received 429 Too Many Requests")) + err.RetryAfter = 10 * time.Millisecond + return true, err + } + return false, nil + }), + rs: sendResolved(false), + } + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + + alerts := []*types.Alert{{ + Alert: model.Alert{ + EndsAt: time.Now().Add(time.Hour), + }, + }} + + // The default exponential backoff starts at 500ms after the first immediate + // attempt, so 4 attempts can only complete within this timeout when + // Retry-After is actually honored. + ctx, cancel := context.WithTimeout(context.Background(), 400*time.Millisecond) + defer cancel() + ctx = WithFiringAlerts(ctx, []uint64{0}) + + start := time.Now() + _, _, err := r.Exec(ctx, promslog.NewNopLogger(), alerts...) 
+ elapsed := time.Since(start) + require.NoError(t, err) + require.Equal(t, 4, attempts) + require.GreaterOrEqual(t, elapsed, 30*time.Millisecond) + require.Less(t, elapsed, 350*time.Millisecond) +} + +func TestRetryStageRecalculatesBackoffAfterRetryAfter(t *testing.T) { + attempts := 0 + i := Integration{ + name: "test", + notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + attempts++ + switch attempts { + case 1: + err := NewErrorWithReason(TooManyRequestsReason, errors.New("received 429 Too Many Requests")) + err.RetryAfter = 10 * time.Millisecond + return true, err + case 2: + return true, errors.New("temporary failure") + default: + return false, nil + } + }), + rs: sendResolved(false), + } + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + + alerts := []*types.Alert{{ + Alert: model.Alert{ + EndsAt: time.Now().Add(time.Hour), + }, + }} + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + ctx = WithFiringAlerts(ctx, []uint64{0}) + + _, _, err := r.Exec(ctx, promslog.NewNopLogger(), alerts...) 
+ require.Error(t, err) + require.Contains(t, err.Error(), "notify retry canceled after 2 attempts") + require.Equal(t, 2, attempts) +} + +func TestRetryStageWithoutRetryAfterUsesExponentialBackoff(t *testing.T) { + attempts := 0 + i := Integration{ + name: "test", + notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + attempts++ + if attempts < 4 { + return true, NewErrorWithReason(TooManyRequestsReason, errors.New("received 429 Too Many Requests")) + } + return false, nil + }), + rs: sendResolved(false), + } + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + + alerts := []*types.Alert{{ + Alert: model.Alert{ + EndsAt: time.Now().Add(time.Hour), + }, + }} + + // Without Retry-After we should follow the default backoff ticker, whose + // first interval after the initial attempt is far larger than this timeout. + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + ctx = WithFiringAlerts(ctx, []uint64{0}) + + _, _, err := r.Exec(ctx, promslog.NewNopLogger(), alerts...) 
+ require.Error(t, err) + require.Contains(t, err.Error(), "notify retry canceled after 1 attempts") + require.Equal(t, 1, attempts) +} + func TestRetryStageNoResolved(t *testing.T) { sent := []*types.Alert{} i := Integration{ diff --git a/notify/opsgenie/opsgenie.go b/notify/opsgenie/opsgenie.go index a267b45e70..da6b58b6ea 100644 --- a/notify/opsgenie/opsgenie.go +++ b/notify/opsgenie/opsgenie.go @@ -106,10 +106,10 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) if err != nil { return true, err } - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) + shouldRetry, errWithReason := n.retrier.Check(resp) notify.Drain(resp) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + if errWithReason != nil { + return shouldRetry, errWithReason } } return true, nil diff --git a/notify/opsgenie/opsgenie_test.go b/notify/opsgenie/opsgenie_test.go index ec525adb5f..9f7802ec61 100644 --- a/notify/opsgenie/opsgenie_test.go +++ b/notify/opsgenie/opsgenie_test.go @@ -48,7 +48,7 @@ func TestOpsGenieRetry(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusTooManyRequests) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/pagerduty/pagerduty.go b/notify/pagerduty/pagerduty.go index 8532723992..dee184cbe5 100644 --- a/notify/pagerduty/pagerduty.go +++ b/notify/pagerduty/pagerduty.go @@ -200,7 +200,7 @@ func (n *Notifier) notifyV1( } defer notify.Drain(resp) - return n.retrier.Check(resp.StatusCode, resp.Body) + return n.retrier.Check(resp) } func (n *Notifier) notifyV2( @@ -293,11 +293,8 @@ func (n *Notifier) notifyV2( } defer notify.Drain(resp) - retry, err := n.retrier.Check(resp.StatusCode, 
resp.Body) - if err != nil { - return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return retry, err + retry, errWithReason := n.retrier.Check(resp) + return retry, errWithReason } // Notify implements the Notifier interface. diff --git a/notify/pagerduty/pagerduty_test.go b/notify/pagerduty/pagerduty_test.go index 302b4c3d3b..39338a60b2 100644 --- a/notify/pagerduty/pagerduty_test.go +++ b/notify/pagerduty/pagerduty_test.go @@ -53,7 +53,7 @@ func TestPagerDutyRetryV1(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusForbidden) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retryv1 - error on status %d", statusCode) } } @@ -71,7 +71,7 @@ func TestPagerDutyRetryV2(t *testing.T) { retryCodes := append(test.DefaultRetryCodes(), http.StatusTooManyRequests) for statusCode, expected := range test.RetryTests(retryCodes) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "retryv2 - error on status %d", statusCode) } } diff --git a/notify/pushover/pushover.go b/notify/pushover/pushover.go index 3f5155668c..02529d726d 100644 --- a/notify/pushover/pushover.go +++ b/notify/pushover/pushover.go @@ -174,9 +174,6 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) } defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } diff --git a/notify/pushover/pushover_test.go 
b/notify/pushover/pushover_test.go index 88ffa33173..aa67112e36 100644 --- a/notify/pushover/pushover_test.go +++ b/notify/pushover/pushover_test.go @@ -38,7 +38,7 @@ func TestPushoverRetry(t *testing.T) { ) require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/rocketchat/rocketchat.go b/notify/rocketchat/rocketchat.go index 8d1a6f4085..00867390cd 100644 --- a/notify/rocketchat/rocketchat.go +++ b/notify/rocketchat/rocketchat.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "log/slog" @@ -227,10 +228,14 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) // Use a retrier to generate an error message for non-200 responses and // classify them as retriable or not. - retry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - err = fmt.Errorf("channel %q: %w", body.Channel, err) - return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + retry, errWithReason := n.retrier.Check(resp) + if errWithReason != nil { + var ewr *notify.ErrorWithReason + if errors.As(errWithReason, &ewr) { + ewr.Err = fmt.Errorf("channel %q: %w", body.Channel, ewr.Err) + return retry, ewr + } + return retry, fmt.Errorf("channel %q: %w", body.Channel, errWithReason) } // Rocketchat web API might return errors with a 200 response code. 
diff --git a/notify/rocketchat/rocketchat_test.go b/notify/rocketchat/rocketchat_test.go index 6f0c36c31b..5b9f300020 100644 --- a/notify/rocketchat/rocketchat_test.go +++ b/notify/rocketchat/rocketchat_test.go @@ -42,7 +42,7 @@ func TestRocketchatRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/slack/slack.go b/notify/slack/slack.go index dbcea63dd7..bcc100e24a 100644 --- a/notify/slack/slack.go +++ b/notify/slack/slack.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "log/slog" @@ -254,10 +255,14 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) // Use a retrier to generate an error message for non-200 responses and // classify them as retriable or not. 
- retry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - err = fmt.Errorf("channel %q: %w", req.Channel, err) - return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) + retry, errWithReason := n.retrier.Check(resp) + if errWithReason != nil { + var ewr *notify.ErrorWithReason + if errors.As(errWithReason, &ewr) { + ewr.Err = fmt.Errorf("channel %q: %w", req.Channel, ewr.Err) + return retry, ewr + } + return retry, fmt.Errorf("channel %q: %w", req.Channel, errWithReason) } retry, err = n.slackResponseHandler(resp, store) diff --git a/notify/slack/slack_test.go b/notify/slack/slack_test.go index 68924f7075..92817bded2 100644 --- a/notify/slack/slack_test.go +++ b/notify/slack/slack_test.go @@ -51,7 +51,7 @@ func TestSlackRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/sns/sns.go b/notify/sns/sns.go index 80b039b8b1..d8774093ae 100644 --- a/notify/sns/sns.go +++ b/notify/sns/sns.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "io" "log/slog" "net/http" "strings" @@ -80,7 +81,12 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err // To maintain compatibility with the retrier, we attempt to get an HTTP status code. var respErr *smithyhttp.ResponseError if errors.As(err, &respErr) && respErr.Response != nil { - return n.retrier.Check(respErr.Response.StatusCode, strings.NewReader(apiErr.ErrorMessage())) + resp := &http.Response{ + StatusCode: respErr.Response.StatusCode, + Header: respErr.Response.Header.Clone(), + Body: io.NopCloser(strings.NewReader(apiErr.ErrorMessage())), + } + return n.retrier.Check(resp) } // Fallback if we can't get a status code. 
return true, fmt.Errorf("failed to create SNS client: %s: %s", apiErr.ErrorCode(), apiErr.ErrorMessage()) @@ -107,9 +113,13 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err // If we got a status code, use the retrier logic. if statusCode != 0 { - retryable, checkErr := n.retrier.Check(statusCode, strings.NewReader(apiErr.ErrorMessage())) - reasonErr := notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(statusCode), checkErr) - return retryable, reasonErr + resp := &http.Response{ + StatusCode: statusCode, + Header: respErr.Response.Header.Clone(), + Body: io.NopCloser(strings.NewReader(apiErr.ErrorMessage())), + } + retryable, errWithReason := n.retrier.Check(resp) + return retryable, errWithReason } } // Fallback for non-API errors or if status code extraction fails. diff --git a/notify/telegram/telegram_test.go b/notify/telegram/telegram_test.go index 7192d493fd..3cefc7999e 100644 --- a/notify/telegram/telegram_test.go +++ b/notify/telegram/telegram_test.go @@ -83,7 +83,8 @@ func TestTelegramRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + resp := test.HTTPResponseForStatusCode(statusCode) + actual, _ := notifier.retrier.Check(resp) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/test/test.go b/notify/test/test.go index 75729b2a4f..35d40546f3 100644 --- a/notify/test/test.go +++ b/notify/test/test.go @@ -14,7 +14,10 @@ package test import ( + "bytes" "context" + "fmt" + "io" "net/http" "net/http/httptest" "net/url" @@ -123,6 +126,7 @@ func DefaultRetryCodes() []int { http.StatusLoopDetected, http.StatusNotExtended, http.StatusNetworkAuthenticationRequired, + http.StatusTooManyRequests, } } @@ -189,3 +193,24 @@ func GetContextWithCancelingURL(h ...func(w http.ResponseWriter, r *http.Request srv.Close() } } + +// HTTPResponseForStatusCode 
returns an http.Response with the given status code and a placeholder body: a fixed JSON payload for 400, otherwise text containing the status code. +func HTTPResponseForStatusCode(statusCode int) *http.Response { + status := fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)) + + if statusCode == http.StatusBadRequest { + body := `{"status":"invalid event"}` + return &http.Response{ + Status: status, + StatusCode: statusCode, + Header: make(http.Header), + Body: io.NopCloser(bytes.NewBufferString(body)), + } + } + return &http.Response{ + Status: status, + StatusCode: statusCode, + Header: make(http.Header), + Body: io.NopCloser(bytes.NewBufferString(fmt.Sprintf("response with status %d", statusCode))), + } +} diff --git a/notify/util.go b/notify/util.go index a99e2b067c..9bd10ac3a1 100644 --- a/notify/util.go +++ b/notify/util.go @@ -23,7 +23,9 @@ import ( "net/http" "net/url" "slices" + "strconv" "strings" + "time" commoncfg "github.com/prometheus/common/config" "github.com/prometheus/common/version" @@ -233,38 +235,99 @@ type Retrier struct { RetryCodes []int } +// parseRetryAfter parses the Retry-After header value, which can be either +// a delay in seconds (integer) or an HTTP-date. Returns zero if the header is absent, unparseable, or an HTTP-date in the past; a negative integer yields a negative duration, so callers must check for a positive result. +func parseRetryAfter(h http.Header) time.Duration { + val := h.Get("Retry-After") + if val == "" { + return 0 + } + // Try integer seconds first + if secs, err := strconv.Atoi(val); err == nil { + return time.Duration(secs) * time.Second + } + // Try HTTP-date format + if t, err := http.ParseTime(val); err == nil { + d := time.Until(t) + if d < 0 { + return 0 + } + return d + } + return 0 +} + // Check returns a boolean indicating whether the request should be retried -// and an optional error if the request has failed. If body is not nil, it will +// and an optional ErrorWithReason if the request has failed. If the response body is not nil, it will // be included in the error message. 
-func (r *Retrier) Check(statusCode int, body io.Reader) (bool, error) { +func (r *Retrier) Check(resp *http.Response) (bool, error) { + if resp == nil { + return false, NewErrorWithReason(DefaultReason, errors.New("nil HTTP response")) + } + // 2xx responses are considered to be always successful. - if statusCode/100 == 2 { + if resp.StatusCode/100 == 2 { return false, nil } - // 5xx responses are considered to be always retried. - retry := statusCode/100 == 5 || slices.Contains(r.RetryCodes, statusCode) - - s := fmt.Sprintf("unexpected status code %v", statusCode) + s := fmt.Sprintf("unexpected status code %v", resp.StatusCode) var details string if r.CustomDetailsFunc != nil { - details = r.CustomDetailsFunc(statusCode, body) + details = r.CustomDetailsFunc(resp.StatusCode, resp.Body) } else { - details = readAll(body) + details = readAll(resp.Body) } if details != "" { s = fmt.Sprintf("%s: %s", s, details) } - return retry, errors.New(s) + + // Status codes in the RetryCodes list are considered to be retriable regardless of their class. + if slices.Contains(r.RetryCodes, resp.StatusCode) && resp.StatusCode != http.StatusTooManyRequests { + return true, NewErrorWithReason(GetFailureReasonFromStatusCode(resp.StatusCode), errors.New(s)) + } + + if resp.StatusCode == http.StatusTooManyRequests { + e := NewErrorWithReason( + TooManyRequestsReason, + errors.New(s), + ) + retryAfter := parseRetryAfter(resp.Header) + if retryAfter > 0 { + e.RetryAfter = retryAfter + } + return true, e + } + + if resp.StatusCode/100 == 4 { + return false, NewErrorWithReason( + ClientErrorReason, errors.New(s), + ) + } + // 5xx responses are considered to be always retried. 
+ if resp.StatusCode/100 == 5 { + return true, NewErrorWithReason( + ServerErrorReason, errors.New(s), + ) + } + + return false, NewErrorWithReason(GetFailureReasonFromStatusCode(resp.StatusCode), errors.New(s)) } type ErrorWithReason struct { Err error - Reason Reason + Reason Reason + RetryAfter time.Duration } func NewErrorWithReason(reason Reason, err error) *ErrorWithReason { + if reason == TooManyRequestsReason { + return &ErrorWithReason{ + Err: err, + Reason: reason, + RetryAfter: 0, // Zero means the server supplied no Retry-After hint, so retries fall back to the default exponential backoff. + } + } return &ErrorWithReason{ Err: err, Reason: reason, @@ -284,6 +347,7 @@ const ( ServerErrorReason ContextCanceledReason ContextDeadlineExceededReason + TooManyRequestsReason ) func (s Reason) String() string { @@ -298,13 +362,15 @@ func (s Reason) String() string { return "contextCanceled" case ContextDeadlineExceededReason: return "contextDeadlineExceeded" + case TooManyRequestsReason: + return "tooManyRequests" default: panic(fmt.Sprintf("unknown Reason: %d", s)) } } // possibleFailureReasonCategory is a list of possible failure reason. -var possibleFailureReasonCategory = []string{DefaultReason.String(), ClientErrorReason.String(), ServerErrorReason.String(), ContextCanceledReason.String(), ContextDeadlineExceededReason.String()} +var possibleFailureReasonCategory = []string{DefaultReason.String(), ClientErrorReason.String(), ServerErrorReason.String(), ContextCanceledReason.String(), ContextDeadlineExceededReason.String(), TooManyRequestsReason.String()} // GetFailureReasonFromStatusCode returns the reason for the failure based on the status code provided. 
func GetFailureReasonFromStatusCode(statusCode int) Reason { diff --git a/notify/util_test.go b/notify/util_test.go index 032fc51508..f94beedaf2 100644 --- a/notify/util_test.go +++ b/notify/util_test.go @@ -22,6 +22,7 @@ import ( "reflect" "runtime" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -132,60 +133,75 @@ func (b brokenReader) Read([]byte) (int, error) { func TestRetrierCheck(t *testing.T) { for _, tc := range []struct { - retrier Retrier - status int - body io.Reader + retrier Retrier + response *http.Response retry bool expectedErr string }{ { retrier: Retrier{}, - status: http.StatusOK, - body: bytes.NewBuffer([]byte("ok")), + response: &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBuffer([]byte("ok"))), + }, retry: false, }, { retrier: Retrier{}, - status: http.StatusNoContent, + response: &http.Response{ + StatusCode: http.StatusNoContent, + Body: io.NopCloser(bytes.NewBuffer([]byte{})), + }, retry: false, }, { retrier: Retrier{}, - status: http.StatusBadRequest, + response: &http.Response{ + StatusCode: http.StatusBadRequest, + }, retry: false, expectedErr: "unexpected status code 400", }, { - retrier: Retrier{RetryCodes: []int{http.StatusTooManyRequests}}, - status: http.StatusBadRequest, - body: bytes.NewBuffer([]byte("invalid request")), + retrier: Retrier{}, + response: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewBuffer([]byte("invalid request"))), + }, retry: false, expectedErr: "unexpected status code 400: invalid request", }, { - retrier: Retrier{RetryCodes: []int{http.StatusTooManyRequests}}, - status: http.StatusTooManyRequests, + retrier: Retrier{}, + response: &http.Response{ + StatusCode: http.StatusTooManyRequests, + Body: io.NopCloser(bytes.NewBuffer([]byte("too many requests"))), + }, retry: true, - expectedErr: "unexpected status code 429", + expectedErr: "unexpected status code 429: too many requests", }, { retrier: Retrier{}, - status: 
http.StatusServiceUnavailable, - body: bytes.NewBuffer([]byte("retry later")), + response: &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Body: io.NopCloser(bytes.NewBuffer([]byte("retry later"))), + }, retry: true, expectedErr: "unexpected status code 503: retry later", }, { retrier: Retrier{}, - status: http.StatusBadGateway, - body: &brokenReader{}, + response: &http.Response{ + StatusCode: http.StatusBadGateway, + Body: io.NopCloser(&brokenReader{}), + }, retry: true, expectedErr: "unexpected status code 502", @@ -198,15 +214,17 @@ func TestRetrierCheck(t *testing.T) { bs, _ := io.ReadAll(b) return fmt.Sprintf("server response is %q", string(bs)) }}, - status: http.StatusServiceUnavailable, - body: bytes.NewBuffer([]byte("retry later")), + response: &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Body: io.NopCloser(bytes.NewBuffer([]byte("retry later"))), + }, retry: true, expectedErr: "unexpected status code 503: server response is \"retry later\"", }, } { t.Run("", func(t *testing.T) { - retry, err := tc.retrier.Check(tc.status, tc.body) + retry, err := tc.retrier.Check(tc.response) require.Equal(t, tc.retry, retry) if tc.expectedErr == "" { require.NoError(t, err) @@ -216,3 +234,70 @@ func TestRetrierCheck(t *testing.T) { }) } } + +func TestRetrierCheckTooManyRequestsRetryAfterPropagation(t *testing.T) { + for _, tc := range []struct { + name string + retrier Retrier + retryAfter string + expected time.Duration + useHTTPDate bool + expectExactRetryAfter bool + }{ + { + name: "retry-after seconds", + retrier: Retrier{}, + retryAfter: "7", + expected: 7 * time.Second, + expectExactRetryAfter: true, + }, + { + name: "retry-after seconds with retry-codes including 429", + retrier: Retrier{RetryCodes: []int{http.StatusTooManyRequests}}, + retryAfter: "10", + expected: 10 * time.Second, + expectExactRetryAfter: true, + }, + { + name: "retry-after http-date with retry-codes including 429", + retrier: Retrier{RetryCodes: 
[]int{http.StatusTooManyRequests}}, + // retryAfter is intentionally left empty; the HTTP-date value is + // generated inside the subtest to avoid it being stale. + retryAfter: "", + expected: 2 * time.Second, + useHTTPDate: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + retryAfter := tc.retryAfter + if tc.useHTTPDate { + retryAfter = time.Now().Add(tc.expected).UTC().Format(http.TimeFormat) + } + + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: make(http.Header), + Body: io.NopCloser(bytes.NewBufferString("too many requests")), + } + resp.Header.Set("Retry-After", retryAfter) + + retry, err := tc.retrier.Check(resp) + require.True(t, retry) + require.Error(t, err) + + var errWithReason *ErrorWithReason + require.ErrorAs(t, err, &errWithReason) + require.Equal(t, TooManyRequestsReason, errWithReason.Reason) + + if tc.expectExactRetryAfter { + require.Equal(t, tc.expected, errWithReason.RetryAfter) + return + } + + // HTTP-date parsing depends on wall clock timing; assert we keep a positive value + // and that it is close to what was requested. + require.Greater(t, errWithReason.RetryAfter, time.Duration(0)) + require.InDelta(t, tc.expected.Seconds(), errWithReason.RetryAfter.Seconds(), 1.0) + }) + } +} diff --git a/notify/victorops/victorops.go b/notify/victorops/victorops.go index 746a10b55d..681ad966a5 100644 --- a/notify/victorops/victorops.go +++ b/notify/victorops/victorops.go @@ -102,11 +102,8 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) } defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } // Create the JSON payload to be sent to the VictorOps API. 
diff --git a/notify/victorops/victorops_test.go b/notify/victorops/victorops_test.go index 7766c6f9cb..ca8462c145 100644 --- a/notify/victorops/victorops_test.go +++ b/notify/victorops/victorops_test.go @@ -97,7 +97,7 @@ func TestVictorOpsRetry(t *testing.T) { ) require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/webex/webex.go b/notify/webex/webex.go index 922ea543d5..539fe2d39b 100644 --- a/notify/webex/webex.go +++ b/notify/webex/webex.go @@ -106,7 +106,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) return true, notify.RedactURL(err) } - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) + shouldRetry, err := n.retrier.Check(resp) if err != nil { return shouldRetry, err } diff --git a/notify/webex/webex_test.go b/notify/webex/webex_test.go index e5b913d876..14c05cb810 100644 --- a/notify/webex/webex_test.go +++ b/notify/webex/webex_test.go @@ -50,7 +50,7 @@ func TestWebexRetry(t *testing.T) { require.NoError(t, err) for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } } diff --git a/notify/webhook/webhook.go b/notify/webhook/webhook.go index 4165e28188..5bfa4798a0 100644 --- a/notify/webhook/webhook.go +++ b/notify/webhook/webhook.go @@ -146,11 +146,8 @@ func (n *Notifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, er } defer notify.Drain(resp) - shouldRetry, err := n.retrier.Check(resp.StatusCode, resp.Body) - if err != nil { - return shouldRetry, 
notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err) - } - return shouldRetry, err + shouldRetry, errWithReason := n.retrier.Check(resp) + return shouldRetry, errWithReason } func (n *Notifier) renderPayload( diff --git a/notify/webhook/webhook_test.go b/notify/webhook/webhook_test.go index dfa73d47c0..e0dd71620c 100644 --- a/notify/webhook/webhook_test.go +++ b/notify/webhook/webhook_test.go @@ -52,7 +52,7 @@ func TestWebhookRetry(t *testing.T) { t.Run("test retry status code", func(t *testing.T) { for statusCode, expected := range test.RetryTests(test.DefaultRetryCodes()) { - actual, _ := notifier.retrier.Check(statusCode, nil) + actual, _ := notifier.retrier.Check(test.HTTPResponseForStatusCode(statusCode)) require.Equal(t, expected, actual, "error on status %d", statusCode) } }) @@ -72,14 +72,13 @@ func TestWebhookRetry(t *testing.T) { exp: fmt.Sprintf(`unexpected status code %d: {"status":"invalid event"}`, http.StatusBadRequest), }, - { - status: http.StatusBadRequest, - - exp: fmt.Sprintf(`unexpected status code %d`, http.StatusBadRequest), - }, } { t.Run("", func(t *testing.T) { - _, err = notifier.retrier.Check(tc.status, tc.body) + resp := &http.Response{ + StatusCode: tc.status, + Body: io.NopCloser(tc.body), + } + _, err = notifier.retrier.Check(resp) require.Equal(t, tc.exp, err.Error()) }) } diff --git a/notify/wechat/wechat.go b/notify/wechat/wechat.go index 35d418931d..d30c2ff47d 100644 --- a/notify/wechat/wechat.go +++ b/notify/wechat/wechat.go @@ -182,6 +182,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) if err != nil { return true, err } + logger.Debug(string(body)) var weResp weChatResponse diff --git a/test/with_api_v2/acceptance/send_test.go b/test/with_api_v2/acceptance/send_test.go index 0bce0e1f96..a787be1141 100644 --- a/test/with_api_v2/acceptance/send_test.go +++ b/test/with_api_v2/acceptance/send_test.go @@ -15,6 +15,7 @@ package test import ( "fmt" + "sync/atomic" 
"testing" "time" @@ -555,10 +556,11 @@ receivers: co := at.Collector("webhook") wh := NewWebhook(t, co) - wh.Func = func(ts float64) bool { - // Make some webhook requests slow enough to hit the webhook - // timeout, but not so slow as to hit the dispatcher timeout. - if ts < 3 { + var attempts atomic.Int32 + wh.Func = func(_ float64) bool { + // Make the first webhook request slow enough to hit the webhook + // timeout. After the timeout, the retry must succeed. + if attempts.Add(1) <= 1 { time.Sleep(time.Second) return true } @@ -569,7 +571,10 @@ receivers: am.Push(At(1), Alert("alertname", "test1")) - co.Want(Between(3, 4), Alert("alertname", "test1").Active(1)) + // Alert is dispatched at t=2 (group_wait=1s). The first attempt times out + // after 500ms. The retry fires after backoff (250ms–750ms), so success + // arrives between t≈2.75 and t≈3.25. + co.Want(Between(2, 4), Alert("alertname", "test1").Active(1)) at.Run()