From 924501ae1fcfadaa27593e9c019283412c513928 Mon Sep 17 00:00:00 2001 From: Nico Tonozzi Date: Sun, 3 Dec 2017 13:38:11 -0700 Subject: [PATCH 1/7] Add backoff package Justification for jitter and growth factor: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/. Add backoff to the Consul instancer loop. Fixes https://github.com/go-kit/kit/issues/627. --- backoff/backoff.go | 70 +++++++++++++++++++++++++++++++++++++++++ backoff/backoff_test.go | 34 ++++++++++++++++++++ sd/consul/instancer.go | 4 +++ util/conn/manager.go | 18 +++-------- 4 files changed, 113 insertions(+), 13 deletions(-) create mode 100644 backoff/backoff.go create mode 100644 backoff/backoff_test.go diff --git a/backoff/backoff.go b/backoff/backoff.go new file mode 100644 index 000000000..2566943be --- /dev/null +++ b/backoff/backoff.go @@ -0,0 +1,70 @@ +package backoff + +import ( + "math/rand" + "sync/atomic" + "time" +) + +const ( + DefaultInterval = time.Second + DefaultMaxInterval = time.Minute +) + +// ExponentialBackoff provides jittered exponential durations for the purpose of +// avoiding flodding a service with requests. +type ExponentialBackoff struct { + Interval time.Duration + Max time.Duration + + currentInterval atomic.Value + cancel <-chan struct{} +} + +// New creates a new ExpontentialBackoff instance with the default values, and +// an optional cancel channel. +func New(cancel <-chan struct{}) *ExponentialBackoff { + backoff := ExponentialBackoff{ + Interval: DefaultInterval, + Max: DefaultMaxInterval, + cancel: cancel, + } + backoff.Reset() + return &backoff +} + +// Reset should be called after a request succeeds. +func (b *ExponentialBackoff) Reset() { + b.currentInterval.Store(b.Interval) +} + +// Wait increases the backoff and blocks until the duration is over or the +// cancel channel is filled. 
+func (b *ExponentialBackoff) Wait() { + d := b.NextBackoff() + select { + case <-time.After(d): + case <-b.cancel: + } +} + +// NextBackoff updates the time interval and returns the updated value. +func (b *ExponentialBackoff) NextBackoff() time.Duration { + d := b.next() + if d > b.Max { + d = b.Max + } + + b.currentInterval.Store(d) + return d +} + +// next provides the exponential jittered backoff value. See +// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ +// for rationale. +func (b *ExponentialBackoff) next() time.Duration { + current := b.currentInterval.Load().(time.Duration) + d := float64(current * 2) + jitter := rand.Float64() + 0.5 + return time.Duration(d * jitter) +} diff --git a/backoff/backoff_test.go b/backoff/backoff_test.go new file mode 100644 index 000000000..bac69b96b --- /dev/null +++ b/backoff/backoff_test.go @@ -0,0 +1,34 @@ +package backoff + +import ( + "testing" + "time" +) + +func TestNext(t *testing.T) { + b := ExponentialBackoff{} + b.currentInterval.Store(time.Duration(12)) + + next := b.next() + + if next < 12 || next > 36 { + t.Errorf("Expected next to be between 12 and 36, got %d", 12) + } +} + +func TestNextBackoffMax(t *testing.T) { + max := time.Duration(13) + b := ExponentialBackoff{ + Max: max, + } + b.currentInterval.Store(time.Duration(14)) + next := b.NextBackoff() + if next != max { + t.Errorf("Expected next to be max, %d, but got %d", max, next) + } + + current := b.currentInterval.Load().(time.Duration) + if current != max { + t.Errorf("Expected currentInterval to be max, %d, but got %d", max, current) + } +} diff --git a/sd/consul/instancer.go b/sd/consul/instancer.go index 38b18f0d3..ad608e459 100644 --- a/sd/consul/instancer.go +++ b/sd/consul/instancer.go @@ -6,6 +6,7 @@ import ( consul "github.com/hashicorp/consul/api" + "github.com/go-kit/kit/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/internal/instance" @@ -59,6 +60,7 @@ func (s 
*Instancer) loop(lastIndex uint64) { var ( instances []string err error + backoff = backoff.New(s.quitc) ) for { instances, lastIndex, err = s.getInstances(lastIndex, s.quitc) @@ -68,8 +70,10 @@ func (s *Instancer) loop(lastIndex uint64) { case err != nil: s.logger.Log("err", err) s.cache.Update(sd.Event{Err: err}) + backoff.Wait() default: s.cache.Update(sd.Event{Instances: instances}) + backoff.Reset() } } } diff --git a/util/conn/manager.go b/util/conn/manager.go index 0b7db6281..b627fb757 100644 --- a/util/conn/manager.go +++ b/util/conn/manager.go @@ -5,6 +5,7 @@ import ( "net" "time" + "github.com/go-kit/kit/backoff" "github.com/go-kit/kit/log" ) @@ -86,7 +87,7 @@ func (m *Manager) loop() { conn = dial(m.dialer, m.network, m.address, m.logger) // may block slightly connc = make(chan net.Conn, 1) reconnectc <-chan time.Time // initially nil - backoff = time.Second + backoff = backoff.New(nil) ) // If the initial dial fails, we need to trigger a reconnect via the loop @@ -103,12 +104,11 @@ func (m *Manager) loop() { case conn = <-connc: if conn == nil { // didn't work - backoff = exponential(backoff) // wait longer - reconnectc = m.after(backoff) // try again + reconnectc = m.after(backoff.NextBackoff()) // try again } else { // worked! - backoff = time.Second // reset wait time - reconnectc = nil // no retry necessary + backoff.Reset() // reset wait time + reconnectc = nil // no retry necessary } case m.takec <- conn: @@ -132,14 +132,6 @@ func dial(d Dialer, network, address string, logger log.Logger) net.Conn { return conn } -func exponential(d time.Duration) time.Duration { - d *= 2 - if d > time.Minute { - d = time.Minute - } - return d -} - // ErrConnectionUnavailable is returned by the Manager's Write method when the // manager cannot yield a good connection. 
var ErrConnectionUnavailable = errors.New("connection unavailable") From 0ad65865bc1cd1c090f659b1e56eed758dc3729e Mon Sep 17 00:00:00 2001 From: Nico Tonozzi Date: Sun, 18 Mar 2018 21:39:56 -0600 Subject: [PATCH 2/7] Revert "Add backoff package" This reverts commit 924501ae1fcfadaa27593e9c019283412c513928. --- backoff/backoff.go | 70 ----------------------------------------- backoff/backoff_test.go | 34 -------------------- sd/consul/instancer.go | 4 --- util/conn/manager.go | 18 ++++++++--- 4 files changed, 13 insertions(+), 113 deletions(-) delete mode 100644 backoff/backoff.go delete mode 100644 backoff/backoff_test.go diff --git a/backoff/backoff.go b/backoff/backoff.go deleted file mode 100644 index 2566943be..000000000 --- a/backoff/backoff.go +++ /dev/null @@ -1,70 +0,0 @@ -package backoff - -import ( - "math/rand" - "sync/atomic" - "time" -) - -const ( - DefaultInterval = time.Second - DefaultMaxInterval = time.Minute -) - -// ExponentialBackoff provides jittered exponential durations for the purpose of -// avoiding flodding a service with requests. -type ExponentialBackoff struct { - Interval time.Duration - Max time.Duration - - currentInterval atomic.Value - cancel <-chan struct{} -} - -// New creates a new ExpontentialBackoff instance with the default values, and -// an optional cancel channel. -func New(cancel <-chan struct{}) *ExponentialBackoff { - backoff := ExponentialBackoff{ - Interval: DefaultInterval, - Max: DefaultMaxInterval, - cancel: cancel, - } - backoff.Reset() - return &backoff -} - -// Reset should be called after a request succeeds. -func (b *ExponentialBackoff) Reset() { - b.currentInterval.Store(b.Interval) -} - -// Wait increases the backoff and blocks until the duration is over or the -// cancel channel is filled. -func (b *ExponentialBackoff) Wait() { - d := b.NextBackoff() - select { - case <-time.After(d): - case <-b.cancel: - } -} - -// NextBackoff updates the time interval and returns the updated value. 
-func (b *ExponentialBackoff) NextBackoff() time.Duration { - d := b.next() - if d > b.Max { - d = b.Max - } - - b.currentInterval.Store(d) - return d -} - -// next provides the exponential jittered backoff value. See -// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ -// for rationale. -func (b *ExponentialBackoff) next() time.Duration { - current := b.currentInterval.Load().(time.Duration) - d := float64(current * 2) - jitter := rand.Float64() + 0.5 - return time.Duration(d * jitter) -} diff --git a/backoff/backoff_test.go b/backoff/backoff_test.go deleted file mode 100644 index bac69b96b..000000000 --- a/backoff/backoff_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package backoff - -import ( - "testing" - "time" -) - -func TestNext(t *testing.T) { - b := ExponentialBackoff{} - b.currentInterval.Store(time.Duration(12)) - - next := b.next() - - if next < 12 || next > 36 { - t.Errorf("Expected next to be between 12 and 36, got %d", 12) - } -} - -func TestNextBackoffMax(t *testing.T) { - max := time.Duration(13) - b := ExponentialBackoff{ - Max: max, - } - b.currentInterval.Store(time.Duration(14)) - next := b.NextBackoff() - if next != max { - t.Errorf("Expected next to be max, %d, but got %d", max, next) - } - - current := b.currentInterval.Load().(time.Duration) - if current != max { - t.Errorf("Expected currentInterval to be max, %d, but got %d", max, current) - } -} diff --git a/sd/consul/instancer.go b/sd/consul/instancer.go index ad608e459..38b18f0d3 100644 --- a/sd/consul/instancer.go +++ b/sd/consul/instancer.go @@ -6,7 +6,6 @@ import ( consul "github.com/hashicorp/consul/api" - "github.com/go-kit/kit/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/internal/instance" @@ -60,7 +59,6 @@ func (s *Instancer) loop(lastIndex uint64) { var ( instances []string err error - backoff = backoff.New(s.quitc) ) for { instances, lastIndex, err = s.getInstances(lastIndex, s.quitc) @@ -70,10 +68,8 @@ func (s 
*Instancer) loop(lastIndex uint64) { case err != nil: s.logger.Log("err", err) s.cache.Update(sd.Event{Err: err}) - backoff.Wait() default: s.cache.Update(sd.Event{Instances: instances}) - backoff.Reset() } } } diff --git a/util/conn/manager.go b/util/conn/manager.go index b627fb757..0b7db6281 100644 --- a/util/conn/manager.go +++ b/util/conn/manager.go @@ -5,7 +5,6 @@ import ( "net" "time" - "github.com/go-kit/kit/backoff" "github.com/go-kit/kit/log" ) @@ -87,7 +86,7 @@ func (m *Manager) loop() { conn = dial(m.dialer, m.network, m.address, m.logger) // may block slightly connc = make(chan net.Conn, 1) reconnectc <-chan time.Time // initially nil - backoff = backoff.New(nil) + backoff = time.Second ) // If the initial dial fails, we need to trigger a reconnect via the loop @@ -104,11 +103,12 @@ func (m *Manager) loop() { case conn = <-connc: if conn == nil { // didn't work - reconnectc = m.after(backoff.NextBackoff()) // try again + backoff = exponential(backoff) // wait longer + reconnectc = m.after(backoff) // try again } else { // worked! - backoff.Reset() // reset wait time - reconnectc = nil // no retry necessary + backoff = time.Second // reset wait time + reconnectc = nil // no retry necessary } case m.takec <- conn: @@ -132,6 +132,14 @@ func dial(d Dialer, network, address string, logger log.Logger) net.Conn { return conn } +func exponential(d time.Duration) time.Duration { + d *= 2 + if d > time.Minute { + d = time.Minute + } + return d +} + // ErrConnectionUnavailable is returned by the Manager's Write method when the // manager cannot yield a good connection. 
var ErrConnectionUnavailable = errors.New("connection unavailable") From 99319fb08a17f970a371678eaaa2dbdc5eb5e9ec Mon Sep 17 00:00:00 2001 From: Nico Tonozzi Date: Sun, 18 Mar 2018 21:51:37 -0600 Subject: [PATCH 3/7] Get rid of external package and update exponential --- util/conn/manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/util/conn/manager.go b/util/conn/manager.go index 0b7db6281..0cdcfa48c 100644 --- a/util/conn/manager.go +++ b/util/conn/manager.go @@ -2,6 +2,7 @@ package conn import ( "errors" + "math/rand" "net" "time" @@ -134,10 +135,13 @@ func dial(d Dialer, network, address string, logger log.Logger) net.Conn { func exponential(d time.Duration) time.Duration { d *= 2 + jitter := rand.Float64() + 0.5 + d = time.Duration(int64(float64(d.Nanoseconds()) * jitter)) if d > time.Minute { d = time.Minute } return d + } // ErrConnectionUnavailable is returned by the Manager's Write method when the From 8c60620d88882dc17e1f19a7abcff446c7f30973 Mon Sep 17 00:00:00 2001 From: Nico Tonozzi Date: Sun, 18 Mar 2018 22:09:48 -0600 Subject: [PATCH 4/7] Add instancer backoff --- sd/consul/instancer.go | 5 +++++ util/conn/manager.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sd/consul/instancer.go b/sd/consul/instancer.go index 38b18f0d3..12c1a26b3 100644 --- a/sd/consul/instancer.go +++ b/sd/consul/instancer.go @@ -3,12 +3,14 @@ package consul import ( "fmt" "io" + "time" consul "github.com/hashicorp/consul/api" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/internal/instance" + "github.com/go-kit/kit/util/conn" ) const defaultIndex = 0 @@ -59,6 +61,7 @@ func (s *Instancer) loop(lastIndex uint64) { var ( instances []string err error + d time.Duration = time.Millisecond * 10 ) for { instances, lastIndex, err = s.getInstances(lastIndex, s.quitc) @@ -70,6 +73,8 @@ func (s *Instancer) loop(lastIndex uint64) { s.cache.Update(sd.Event{Err: err}) default: s.cache.Update(sd.Event{Instances: instances}) 
+ time.Sleep(d) + d = conn.Exponential(d) } } } diff --git a/util/conn/manager.go b/util/conn/manager.go index 0cdcfa48c..f100015f3 100644 --- a/util/conn/manager.go +++ b/util/conn/manager.go @@ -133,7 +133,7 @@ func dial(d Dialer, network, address string, logger log.Logger) net.Conn { return conn } -func exponential(d time.Duration) time.Duration { +func Exponential(d time.Duration) time.Duration { d *= 2 jitter := rand.Float64() + 0.5 d = time.Duration(int64(float64(d.Nanoseconds()) * jitter)) From 10620f32cdcb200c6b89b03a9778bc27249d7040 Mon Sep 17 00:00:00 2001 From: Nico Tonozzi Date: Sun, 18 Mar 2018 22:51:25 -0600 Subject: [PATCH 5/7] Fix old exponential name --- util/conn/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/conn/manager.go b/util/conn/manager.go index f100015f3..f52e4d175 100644 --- a/util/conn/manager.go +++ b/util/conn/manager.go @@ -104,7 +104,7 @@ func (m *Manager) loop() { case conn = <-connc: if conn == nil { // didn't work - backoff = exponential(backoff) // wait longer + backoff = Exponential(backoff) // wait longer reconnectc = m.after(backoff) // try again } else { // worked! From 9e8371f7d901e290e8459b9d5e8a0fcf98d786d6 Mon Sep 17 00:00:00 2001 From: Nico Tonozzi Date: Sat, 31 Mar 2018 17:55:47 -0600 Subject: [PATCH 6/7] Add doc comment --- util/conn/manager.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/util/conn/manager.go b/util/conn/manager.go index f52e4d175..725cbbc7a 100644 --- a/util/conn/manager.go +++ b/util/conn/manager.go @@ -133,6 +133,9 @@ func dial(d Dialer, network, address string, logger log.Logger) net.Conn { return conn } +// Exponential takes a duration and returns another one that is twice as long, +/- 50%. It is +// used to provide backoff for operations that may fail and should avoid thundering herds. 
+// See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ for rationale. The result is capped at one minute. func Exponential(d time.Duration) time.Duration { d *= 2 jitter := rand.Float64() + 0.5 From dc88158e5249fde11bafee7086987e03114b6ce4 Mon Sep 17 00:00:00 2001 From: Nico Tonozzi Date: Sun, 1 Apr 2018 11:20:29 -0600 Subject: [PATCH 7/7] Fixup & respond to review --- sd/consul/instancer.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sd/consul/instancer.go b/sd/consul/instancer.go index 12c1a26b3..6eb87a450 100644 --- a/sd/consul/instancer.go +++ b/sd/consul/instancer.go @@ -61,7 +61,7 @@ func (s *Instancer) loop(lastIndex uint64) { var ( instances []string err error - d time.Duration = time.Millisecond * 10 + d time.Duration = 10 * time.Millisecond ) for { instances, lastIndex, err = s.getInstances(lastIndex, s.quitc) @@ -70,11 +70,12 @@ func (s *Instancer) loop(lastIndex uint64) { return // stopped via quitc case err != nil: s.logger.Log("err", err) + time.Sleep(d) + d = conn.Exponential(d) s.cache.Update(sd.Event{Err: err}) default: s.cache.Update(sd.Event{Instances: instances}) - time.Sleep(d) - d = conn.Exponential(d) + d = 10 * time.Millisecond } } }