Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http/httputil"
"net/url"
"strings"
"time"

"github.com/segmentio/encoding/json"
)
Expand Down Expand Up @@ -95,25 +96,58 @@ func (c *client[R, T]) makeRequest(ctx context.Context, resourceName string, met
fmt.Printf("REQUEST:\n%s\n", string(b))
}

resp, err := c.httpDoer.Do(req)
if err != nil {
return nil, err
}

if c.options.Debug {
b, err := httputil.DumpResponse(resp, true)
doFunc := func(c *client[R, T], req *http.Request) (*http.Response, error) {
resp, err := c.httpDoer.Do(req)
if err != nil {
return nil, err
}
fmt.Printf("RESPONSE:\n%s\n", string(b))

if c.options.Debug {
b, err := httputil.DumpResponse(resp, true)
if err != nil {
return nil, err
}
fmt.Printf("RESPONSE:\n%s\n", string(b))
}

if resp.StatusCode >= 400 {
err = decodeError(resp)
return nil, err
}
return resp, nil
}

if resp.StatusCode > 399 {
err = decodeError(resp)
return nil, err
// Try to use backoff retry mechanism
if c.retry == nil {
// Do single request without using backoff retry mechanism
return doFunc(c, req)
}

return resp, nil
for {
resp, err := doFunc(c, req)

var isMatchedCond bool
for _, cond := range c.options.Retry.Conditions {
if ok := cond(resp, err); ok {
isMatchedCond = true
break
}
}
if isMatchedCond {
// Get next duration internval, sleep and make another request
// till nextDuration != stopBackoff
nextDuration := c.retry.next()
if nextDuration == stopBackoff {
c.retry.reset()
return resp, err
}
time.Sleep(nextDuration)
continue
}

// Break retries mechanism if conditions weren't matched
return resp, err
}
}

func (c *client[R, T]) buildRequestURL(resourceName string) (*url.URL, error) {
Expand Down
26 changes: 26 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package duffel

import (
"context"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"gopkg.in/h2non/gock.v1"
Expand Down Expand Up @@ -59,3 +61,27 @@ func TestClientErrorBadGateway(t *testing.T) {
a.Nil(data)
a.Equal("duffel: An internal server error occurred. Please try again later.", err.Error())
}

func TestClientRetry(t *testing.T) {
ctx := context.TODO()
a := assert.New(t)
gock.New("https://api.duffel.com/air/offer_requests").
Persist().
Reply(502).
AddHeader("Content-Type", "text/html").
File("fixtures/502-bad-gateway.html")
defer gock.Off()

client := New("duffel_test_123",
WithRetry(3, time.Second, time.Second*5, ExponentalBackoff),
WithRetryCondition(func(resp *http.Response, err error) bool {
return err != nil
}),
)
data, err := client.CreateOfferRequest(ctx, OfferRequestInput{
ReturnOffers: true,
})
a.Error(err)
a.Nil(data)
a.Equal("duffel: An internal server error occurred. Please try again later.", err.Error())
}
12 changes: 11 additions & 1 deletion duffel.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,16 @@ type (
Host string
UserAgent string
HttpDoer *http.Client
Debug bool
Retry struct {
MaxAttempts int
MinWaitTime time.Duration
MaxWaitTime time.Duration
// Conditions that will be applied on retry mechanism.
Conditions []RetryCond
// Retry function which describes backoff algorithm.
Fn RetryFunc
}
Debug bool
}

client[Req any, Resp any] struct {
Expand All @@ -256,6 +265,7 @@ type (
options *Options
limiter *rate.Limiter
rateLimit *RateLimit
retry *backoff
afterResponse []func(resp *http.Response)
}

Expand Down
27 changes: 26 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package duffel

import "net/http"
import (
"net/http"
"time"
)

// WithAPIToken sets the API host to the default Duffel production host.
func WithDefaultAPI() Option {
Expand Down Expand Up @@ -49,3 +52,25 @@ func WithDebug() Option {
c.Debug = true
}
}

// WithRetry enables backoff retrying mechanism. If f retry function isn't provided
// ExponentalBackoff algorithm will be used. You should always use it in bound with WithRetryConditions options.
func WithRetry(maxAttempts int, minWaitTime, maxWaitTime time.Duration, f RetryFunc) Option {
return func(c *Options) {
c.Retry.MaxAttempts = maxAttempts
c.Retry.MinWaitTime = minWaitTime
c.Retry.MaxWaitTime = maxWaitTime
if f == nil {
f = ExponentalBackoff // used as default
}
c.Retry.Fn = f
}
}

// WithRetryConditions appends retry condition. Retry functionality won't work
// without at least 1 retry condition.
func WithRetryCondition(condition RetryCond) Option {
return func(c *Options) {
c.Retry.Conditions = append(c.Retry.Conditions, condition)
}
}
18 changes: 12 additions & 6 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"fmt"
"net/http"
"sync"
"time"

"golang.org/x/time/rate"
Expand All @@ -22,13 +21,20 @@ func newInternalClient[Req any, Resp any](a *API) *client[Req, Resp] {
limiter: rate.NewLimiter(rate.Every(1*time.Second), 5),
afterResponse: []func(resp *http.Response){
func(resp *http.Response) {
mu := new(sync.Mutex)
mu.Lock()
a.lastRequestID, a.lastResponse = resp.Header.Get(RequestIDHeader), resp
mu.Unlock()
if resp != nil {
a.lastRequestID, a.lastResponse = resp.Header.Get(RequestIDHeader), resp
}
},
},
}
if a.options.Retry.MaxAttempts != 0 {
client.retry = &backoff{
minWaitTime: a.options.Retry.MinWaitTime,
maxWaitTime: a.options.Retry.MaxWaitTime,
maxAttempts: int32(a.options.Retry.MaxAttempts),
f: a.options.Retry.Fn,
}
}

return client
}
Expand Down Expand Up @@ -61,9 +67,9 @@ func (c *client[Req, Resp]) Do(ctx context.Context, resourceName string, method
c.rateLimit = rateLimit
c.limiter.SetBurst(rateLimit.Limit)
c.limiter.SetLimit(rate.Every(rateLimit.Period))

if rateLimit.Remaining == 0 || resp.StatusCode == http.StatusTooManyRequests {
return nil, fmt.Errorf("rate limit exceeded, reset in: %s, current limit: %d", rateLimit.Period.String(), rateLimit.Limit)
}

return resp, nil
}
54 changes: 54 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package duffel

import (
"math"
"math/rand"
"net/http"
"sync/atomic"
"time"
)

// RetryCond is a condition that applies only to retry backoff mechanism.
type RetryCond func(resp *http.Response, err error) bool

// RetryFunc takes attemps number, minimal and maximal wait time for backoff.
// Returns duration that mechanism have to wait before making a request.
type RetryFunc func(n int, min, max time.Duration) time.Duration

// backoff is a thread-safe retry backoff mechanism.
// Currently supported only ExponentalBackoff retry algorithm.
type backoff struct {
minWaitTime time.Duration
maxWaitTime time.Duration
maxAttempts int32
attempts int32
f RetryFunc
}

const stopBackoff time.Duration = -1

func (b *backoff) next() time.Duration {
if atomic.LoadInt32(&b.attempts) >= b.maxAttempts {
return stopBackoff
}
atomic.AddInt32(&b.attempts, 1)
return b.f(int(atomic.LoadInt32(&b.attempts)), b.minWaitTime, b.maxWaitTime)
}

func (b *backoff) reset() {
atomic.SwapInt32(&b.attempts, 0)
}

func ExponentalBackoff(attemptNum int, min, max time.Duration) time.Duration {
const factor = 2.0
rand.Seed(time.Now().UnixNano())
delay := time.Duration(math.Pow(factor, float64(attemptNum)) * float64(min))
jitter := time.Duration(rand.Float64() * float64(min) * float64(attemptNum))

delay = delay + jitter
if delay > max {
delay = max
}

return delay
}