From 4ede06ea7e1625066f929d1e92dfcd755ef9db12 Mon Sep 17 00:00:00 2001 From: "Steven E. Harris" Date: Tue, 6 Jun 2017 15:51:22 -0400 Subject: [PATCH 1/2] Simplify Eureka Instancer's initial cache priming Rather than first fetching a Eureka application's instances synchronously and then scheduling subsequent background updates, instead schedule the updates, wait on the first update to arrive, an then proceed. Revise the tests accordingly to no longer rely on a set of instances known to the (fake) server as distinct from any applications. Since applications are an intensional view of instances, like the real Eureka server we derive the presence of applications from the application names mentioned by instances. That is, without at least one instance declaring that it's a member of a given application, no such application exists. --- sd/eureka/instancer.go | 43 ++++++-------- sd/eureka/instancer_test.go | 72 ++++++++++------------- sd/eureka/util_test.go | 110 +++++++++++++++++++++++++----------- 3 files changed, 124 insertions(+), 101 deletions(-) diff --git a/sd/eureka/instancer.go b/sd/eureka/instancer.go index 69c45569e..d00b98c99 100644 --- a/sd/eureka/instancer.go +++ b/sd/eureka/instancer.go @@ -33,15 +33,10 @@ func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instance quitc: make(chan chan struct{}), } - instances, err := s.getInstances() - if err == nil { - s.logger.Log("instances", len(instances)) - } else { - s.logger.Log("during", "getInstances", "err", err) - } - - s.cache.Update(sd.Event{Instances: instances, Err: err}) - go s.loop() + done := make(chan struct{}) + updates := conn.ScheduleAppUpdates(app, true, done) + s.consume(<-updates) + go s.loop(updates, done) return s } @@ -53,26 +48,24 @@ func (s *Instancer) Stop() { s.quitc = nil } -func (s *Instancer) loop() { - var ( - await = false - done = make(chan struct{}) - updatec = s.conn.ScheduleAppUpdates(s.app, await, done) - ) +func (s *Instancer) consume(update fargo.AppUpdate) { + if update.Err != nil { + s.logger.Log("during", "Update", "err", update.Err) + s.cache.Update(sd.Event{Err: update.Err}) + return + } + instances := convertFargoAppToInstances(update.App) + s.logger.Log("instances", len(instances)) + s.cache.Update(sd.Event{Instances: instances}) +} + +func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) { defer close(done) for { select { - case update := <-updatec: - if update.Err != nil { - s.logger.Log("during", "Update", "err", update.Err) - s.cache.Update(sd.Event{Err: update.Err}) - continue - } - instances := convertFargoAppToInstances(update.App) - s.logger.Log("instances", len(instances)) - s.cache.Update(sd.Event{Instances: instances}) - + case update := <-updates: + s.consume(update) case q := <-s.quitc: close(q) return diff --git a/sd/eureka/instancer_test.go b/sd/eureka/instancer_test.go index 98c83e506..cde4c61ae 100644 --- a/sd/eureka/instancer_test.go +++ b/sd/eureka/instancer_test.go @@ -13,94 +13,80 @@ var _ sd.Instancer = &Instancer{} // API check func TestInstancer(t *testing.T) { connection := &testConnection{ - instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, + instances: []*fargo.Instance{instanceTest1, instanceTest2}, errApplication: nil, } instancer := NewInstancer(connection, appNameTest, loggerTest) defer instancer.Stop() - state := instancer.cache.State() + state := instancer.state() if state.Err != nil { t.Fatal(state.Err) } - if want, have := 1, len(state.Instances); want != have { + if want, have := 2, len(state.Instances); want != have { t.Errorf("want %d, have %d", want, have) } } -func TestInstancerScheduleUpdates(t *testing.T) { +func TestInstancerReceivesUpdates(t *testing.T) { connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, errApplication: nil, } instancer := NewInstancer(connection, appNameTest, loggerTest) defer instancer.Stop() - state := instancer.cache.State() - if want, have := 1, len(state.Instances); want != have { - t.Errorf("want %d, have %d", want, have) + verifyCount := func(want int) (have int, converged bool) { + const maxPollAttempts = 5 + const delayPerAttempt = 200 * time.Millisecond + for i := 1; ; i++ { + state := instancer.state() + if have := len(state.Instances); want == have { + return have, true + } else if i == maxPollAttempts { + return have, false + } + time.Sleep(delayPerAttempt) + } } - time.Sleep(50 * time.Millisecond) - - state = instancer.cache.State() - if want, have := 2, len(state.Instances); want != have { - t.Errorf("want %v, have %v", want, have) + if have, converged := verifyCount(1); !converged { + t.Fatalf("initial: want %d, have %d", 1, have) } -} -func TestBadInstancerInstances(t *testing.T) { - connection := &testConnection{ - instances: []*fargo.Instance{}, - errInstances: errTest, - application: appUpdateTest, - errApplication: nil, + if err := connection.RegisterInstance(instanceTest2); err != nil { + t.Fatalf("failed to register an instance: %v", err) } - - instancer := NewInstancer(connection, appNameTest, loggerTest) - defer instancer.Stop() - - state := instancer.cache.State() - if state.Err == nil { - t.Fatal("expecting error") + if have, converged := verifyCount(2); !converged { + t.Fatalf("after registration: want %d, have %d", 2, have) } - if want, have := 0, len(state.Instances); want != have { - t.Errorf("want %d, have %d", want, have) + if err := connection.DeregisterInstance(instanceTest1); err != nil { + t.Fatalf("failed to unregister an instance: %v", err) + } + if have, converged := verifyCount(1); !converged { + t.Fatalf("after deregistration: want %d, have %d", 1, have) } } func TestBadInstancerScheduleUpdates(t *testing.T) { connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, errApplication: errTest, } instancer := NewInstancer(connection, appNameTest, loggerTest) defer instancer.Stop() - state := instancer.cache.State() - if state.Err != nil { - t.Error(state.Err) - } - if want, have := 1, len(state.Instances); want != have { - t.Errorf("want %d, have %d", want, have) - } - - time.Sleep(50 * time.Millisecond) - - state = instancer.cache.State() + state := instancer.state() if state.Err == nil { t.Fatal("expecting error") } if want, have := 0, len(state.Instances); want != have { - t.Errorf("want %v, have %v", want, have) + t.Errorf("want %d, have %d", want, have) } } diff --git a/sd/eureka/util_test.go b/sd/eureka/util_test.go index a66e7121a..e8a71a448 100644 --- a/sd/eureka/util_test.go +++ b/sd/eureka/util_test.go @@ -2,16 +2,19 @@ package eureka import ( "errors" + "fmt" "reflect" + "sync" + "time" "github.com/go-kit/kit/log" "github.com/hudl/fargo" ) type testConnection struct { - instances []*fargo.Instance - application *fargo.Application - errInstances error + mu sync.RWMutex + instances []*fargo.Instance + errApplication error errHeartbeat error errRegister error @@ -23,10 +26,6 @@ var ( errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"} loggerTest = log.NewNopLogger() appNameTest = "go-kit" - appUpdateTest = &fargo.Application{ - Name: appNameTest, - Instances: []*fargo.Instance{instanceTest1, instanceTest2}, - } instanceTest1 = &fargo.Instance{ HostName: "serveregistrar1.acme.org", Port: 8080, @@ -59,16 +58,18 @@ var ( var _ fargoConnection = (*testConnection)(nil) func (c *testConnection) RegisterInstance(i *fargo.Instance) error { - if c.errRegister == nil { - for _, instance := range c.instances { - if reflect.DeepEqual(*instance, *i) { - return errors.New("already registered") - } + if c.errRegister != nil { + return c.errRegister + } + c.mu.Lock() + defer c.mu.Unlock() + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + return errors.New("already registered") } - - c.instances = append(c.instances, i) } - return c.errRegister + c.instances = append(c.instances, i) + return nil } func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error { @@ -76,33 +77,76 @@ func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error { } func (c *testConnection) DeregisterInstance(i *fargo.Instance) error { - if c.errDeregister == nil { - var newInstances []*fargo.Instance - for _, instance := range c.instances { - if reflect.DeepEqual(*instance, *i) { - continue - } - newInstances = append(newInstances, instance) - } - if len(newInstances) == len(c.instances) { - return errors.New("not registered") + if c.errDeregister != nil { + return c.errDeregister + } + c.mu.Lock() + defer c.mu.Unlock() + remaining := make([]*fargo.Instance, 0, len(c.instances)) + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + continue } - - c.instances = newInstances + remaining = append(remaining, instance) } - return c.errDeregister + if len(remaining) == len(c.instances) { + return errors.New("not registered") + } + c.instances = remaining + return nil } func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error { return nil } -func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate { - updatec := make(chan fargo.AppUpdate, 1) - updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} - return updatec +func (c *testConnection) instancesForApplication(name string) []*fargo.Instance { + c.mu.RLock() + defer c.mu.RUnlock() + instances := make([]*fargo.Instance, 0, len(c.instances)) + for _, i := range c.instances { + if i.App == name { + instances = append(instances, i) + } + } + return instances } func (c *testConnection) GetApp(name string) (*fargo.Application, error) { - return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances + if err := c.errApplication; err != nil { + return nil, err + } + instances := c.instancesForApplication(name) + if len(instances) == 0 { + return nil, fmt.Errorf("Application not found for name=%s", name) + } + return &fargo.Application{Name: name, Instances: instances}, nil +} + +func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate { + updatec := make(chan fargo.AppUpdate, 1) + send := func() { + app, err := c.GetApp(name) + select { + case updatec <- fargo.AppUpdate{App: app, Err: err}: + default: + } + } + + if await { + send() + } + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-ticker.C: + send() + case <-done: + ticker.Stop() + return + } + } + }() + return updatec } From a843a9e75e9316a34e3c4b539d2e8c0cc6f17c7f Mon Sep 17 00:00:00 2001 From: "Steven E. Harris" Date: Tue, 6 Jun 2017 15:36:30 -0400 Subject: [PATCH 2/2] Detect when a fargo error indicates not found The fargo library already provides the HTTPResponseStatusCode function to extract an HTTP status code from a returned error, but here we want to be able to inject similar errors for tests, so look for both a real fargo error indicating that a heartbeat attempt failed to find its target instance and a synthetic error returned by a fake connection during a test. --- sd/eureka/registrar.go | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go index 99fef7c5d..7af7836f9 100644 --- a/sd/eureka/registrar.go +++ b/sd/eureka/registrar.go @@ -30,6 +30,10 @@ type fargoUnsuccessfulHTTPResponse struct { messagePrefix string } +func (u *fargoUnsuccessfulHTTPResponse) Error() string { + return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) +} + // Registrar maintains service instance liveness information in Eureka. type Registrar struct { conn fargoConnection @@ -110,18 +114,30 @@ func (r *Registrar) loop() { } } +func httpResponseStatusCode(err error) (code int, present bool) { + if code, ok := fargo.HTTPResponseStatusCode(err); ok { + return code, true + } + // Allow injection of errors for testing. + if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok { + return u.statusCode, true + } + return 0, false +} + +func isNotFound(err error) bool { + code, ok := httpResponseStatusCode(err) + return ok && code == http.StatusNotFound +} + func (r *Registrar) heartbeat() error { err := r.conn.HeartBeatInstance(r.instance) - if err != nil { - if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound { - // Instance expired (e.g. network partition). Re-register. - r.logger.Log("during", "heartbeat", err.Error()) - return r.conn.ReregisterInstance(r.instance) - } + if err == nil { + return nil + } + if isNotFound(err) { + // Instance expired (e.g. network partition). Re-register. + return r.conn.ReregisterInstance(r.instance) } return err } - -func (u *fargoUnsuccessfulHTTPResponse) Error() string { - return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) -}