diff --git a/client.go b/client.go index 41957d4..d212ec8 100644 --- a/client.go +++ b/client.go @@ -14,6 +14,7 @@ import ( "net/http/httputil" "net/url" "strings" + "time" "github.com/segmentio/encoding/json" ) @@ -95,25 +96,58 @@ func (c *client[R, T]) makeRequest(ctx context.Context, resourceName string, met fmt.Printf("REQUEST:\n%s\n", string(b)) } - resp, err := c.httpDoer.Do(req) - if err != nil { - return nil, err - } - - if c.options.Debug { - b, err := httputil.DumpResponse(resp, true) + doFunc := func(c *client[R, T], req *http.Request) (*http.Response, error) { + resp, err := c.httpDoer.Do(req) if err != nil { return nil, err } - fmt.Printf("RESPONSE:\n%s\n", string(b)) + + if c.options.Debug { + b, err := httputil.DumpResponse(resp, true) + if err != nil { + return nil, err + } + fmt.Printf("RESPONSE:\n%s\n", string(b)) + } + + if resp.StatusCode >= 400 { + err = decodeError(resp) + return nil, err + } + return resp, nil } - if resp.StatusCode > 399 { - err = decodeError(resp) - return nil, err + // Try to use backoff retry mechanism + if c.retry == nil { + // Do single request without using backoff retry mechanism + return doFunc(c, req) } - return resp, nil + for { + resp, err := doFunc(c, req) + + var isMatchedCond bool + for _, cond := range c.options.Retry.Conditions { + if ok := cond(resp, err); ok { + isMatchedCond = true + break + } + } + if isMatchedCond { + // Get next duration internval, sleep and make another request + // till nextDuration != stopBackoff + nextDuration := c.retry.next() + if nextDuration == stopBackoff { + c.retry.reset() + return resp, err + } + time.Sleep(nextDuration) + continue + } + + // Break retries mechanism if conditions weren't matched + return resp, err + } } func (c *client[R, T]) buildRequestURL(resourceName string) (*url.URL, error) { diff --git a/client_test.go b/client_test.go index 319d8ce..e2b7ac8 100644 --- a/client_test.go +++ b/client_test.go @@ -6,7 +6,9 @@ package duffel import ( "context" + "net/http" "testing" + "time" "github.com/stretchr/testify/assert" "gopkg.in/h2non/gock.v1" @@ -59,3 +61,27 @@ func TestClientErrorBadGateway(t *testing.T) { a.Nil(data) a.Equal("duffel: An internal server error occurred. Please try again later.", err.Error()) } + +func TestClientRetry(t *testing.T) { + ctx := context.TODO() + a := assert.New(t) + gock.New("https://api.duffel.com/air/offer_requests"). + Persist(). + Reply(502). + AddHeader("Content-Type", "text/html"). + File("fixtures/502-bad-gateway.html") + defer gock.Off() + + client := New("duffel_test_123", + WithRetry(3, time.Second, time.Second*5, ExponentalBackoff), + WithRetryCondition(func(resp *http.Response, err error) bool { + return err != nil + }), + ) + data, err := client.CreateOfferRequest(ctx, OfferRequestInput{ + ReturnOffers: true, + }) + a.Error(err) + a.Nil(data) + a.Equal("duffel: An internal server error occurred. Please try again later.", err.Error()) +} diff --git a/duffel.go b/duffel.go index 3becb29..63c928a 100644 --- a/duffel.go +++ b/duffel.go @@ -247,7 +247,16 @@ type ( Host string UserAgent string HttpDoer *http.Client - Debug bool + Retry struct { + MaxAttempts int + MinWaitTime time.Duration + MaxWaitTime time.Duration + // Conditions that will be applied on retry mechanism. + Conditions []RetryCond + // Retry function which describes backoff algorithm. + Fn RetryFunc + } + Debug bool } client[Req any, Resp any] struct { @@ -256,6 +265,7 @@ type ( options *Options limiter *rate.Limiter rateLimit *RateLimit + retry *backoff afterResponse []func(resp *http.Response) } diff --git a/options.go b/options.go index 2f0774b..4783d09 100644 --- a/options.go +++ b/options.go @@ -4,7 +4,10 @@ package duffel -import "net/http" +import ( + "net/http" + "time" +) // WithAPIToken sets the API host to the default Duffel production host. func WithDefaultAPI() Option { @@ -49,3 +52,25 @@ func WithDebug() Option { c.Debug = true } } + +// WithRetry enables backoff retrying mechanism. If f retry function isn't provided +// ExponentalBackoff algorithm will be used. You should always use it in bound with WithRetryConditions options. +func WithRetry(maxAttempts int, minWaitTime, maxWaitTime time.Duration, f RetryFunc) Option { + return func(c *Options) { + c.Retry.MaxAttempts = maxAttempts + c.Retry.MinWaitTime = minWaitTime + c.Retry.MaxWaitTime = maxWaitTime + if f == nil { + f = ExponentalBackoff // used as default + } + c.Retry.Fn = f + } +} + +// WithRetryConditions appends retry condition. Retry functionality won't work +// without at least 1 retry condition. +func WithRetryCondition(condition RetryCond) Option { + return func(c *Options) { + c.Retry.Conditions = append(c.Retry.Conditions, condition) + } +} diff --git a/request.go b/request.go index 454d223..14323ae 100644 --- a/request.go +++ b/request.go @@ -8,7 +8,6 @@ import ( "context" "fmt" "net/http" - "sync" "time" "golang.org/x/time/rate" @@ -22,13 +21,20 @@ func newInternalClient[Req any, Resp any](a *API) *client[Req, Resp] { limiter: rate.NewLimiter(rate.Every(1*time.Second), 5), afterResponse: []func(resp *http.Response){ func(resp *http.Response) { - mu := new(sync.Mutex) - mu.Lock() - a.lastRequestID, a.lastResponse = resp.Header.Get(RequestIDHeader), resp - mu.Unlock() + if resp != nil { + a.lastRequestID, a.lastResponse = resp.Header.Get(RequestIDHeader), resp + } }, }, } + if a.options.Retry.MaxAttempts != 0 { + client.retry = &backoff{ + minWaitTime: a.options.Retry.MinWaitTime, + maxWaitTime: a.options.Retry.MaxWaitTime, + maxAttempts: int32(a.options.Retry.MaxAttempts), + f: a.options.Retry.Fn, + } + } return client } @@ -61,9 +67,9 @@ func (c *client[Req, Resp]) Do(ctx context.Context, resourceName string, method c.rateLimit = rateLimit c.limiter.SetBurst(rateLimit.Limit) c.limiter.SetLimit(rate.Every(rateLimit.Period)) - if rateLimit.Remaining == 0 || resp.StatusCode == http.StatusTooManyRequests { return nil, fmt.Errorf("rate limit exceeded, reset in: %s, current limit: %d", rateLimit.Period.String(), rateLimit.Limit) } + return resp, nil } diff --git a/retry.go b/retry.go new file mode 100644 index 0000000..8e0f6e5 --- /dev/null +++ b/retry.go @@ -0,0 +1,54 @@ +package duffel + +import ( + "math" + "math/rand" + "net/http" + "sync/atomic" + "time" +) + +// RetryCond is a condition that applies only to retry backoff mechanism. +type RetryCond func(resp *http.Response, err error) bool + +// RetryFunc takes attemps number, minimal and maximal wait time for backoff. +// Returns duration that mechanism have to wait before making a request. +type RetryFunc func(n int, min, max time.Duration) time.Duration + +// backoff is a thread-safe retry backoff mechanism. +// Currently supported only ExponentalBackoff retry algorithm. +type backoff struct { + minWaitTime time.Duration + maxWaitTime time.Duration + maxAttempts int32 + attempts int32 + f RetryFunc +} + +const stopBackoff time.Duration = -1 + +func (b *backoff) next() time.Duration { + if atomic.LoadInt32(&b.attempts) >= b.maxAttempts { + return stopBackoff + } + atomic.AddInt32(&b.attempts, 1) + return b.f(int(atomic.LoadInt32(&b.attempts)), b.minWaitTime, b.maxWaitTime) +} + +func (b *backoff) reset() { + atomic.SwapInt32(&b.attempts, 0) +} + +func ExponentalBackoff(attemptNum int, min, max time.Duration) time.Duration { + const factor = 2.0 + rand.Seed(time.Now().UnixNano()) + delay := time.Duration(math.Pow(factor, float64(attemptNum)) * float64(min)) + jitter := time.Duration(rand.Float64() * float64(min) * float64(attemptNum)) + + delay = delay + jitter + if delay > max { + delay = max + } + + return delay +}