Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 18 additions & 25 deletions sd/eureka/instancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,10 @@ func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instance
quitc: make(chan chan struct{}),
}

instances, err := s.getInstances()
if err == nil {
s.logger.Log("instances", len(instances))
} else {
s.logger.Log("during", "getInstances", "err", err)
}

s.cache.Update(sd.Event{Instances: instances, Err: err})
go s.loop()
done := make(chan struct{})
updates := conn.ScheduleAppUpdates(app, true, done)
s.consume(<-updates)
go s.loop(updates, done)
return s
}

Expand All @@ -53,26 +48,24 @@ func (s *Instancer) Stop() {
s.quitc = nil
}

func (s *Instancer) loop() {
var (
await = false
done = make(chan struct{})
updatec = s.conn.ScheduleAppUpdates(s.app, await, done)
)
func (s *Instancer) consume(update fargo.AppUpdate) {
if update.Err != nil {
s.logger.Log("during", "Update", "err", update.Err)
s.cache.Update(sd.Event{Err: update.Err})
return
}
instances := convertFargoAppToInstances(update.App)
s.logger.Log("instances", len(instances))
s.cache.Update(sd.Event{Instances: instances})
}

func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
defer close(done)

for {
select {
case update := <-updatec:
if update.Err != nil {
s.logger.Log("during", "Update", "err", update.Err)
s.cache.Update(sd.Event{Err: update.Err})
continue
}
instances := convertFargoAppToInstances(update.App)
s.logger.Log("instances", len(instances))
s.cache.Update(sd.Event{Instances: instances})

case update := <-updates:
s.consume(update)
case q := <-s.quitc:
close(q)
return
Expand Down
72 changes: 29 additions & 43 deletions sd/eureka/instancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,94 +13,80 @@ var _ sd.Instancer = &Instancer{} // API check

func TestInstancer(t *testing.T) {
connection := &testConnection{
instances: []*fargo.Instance{instanceTest1},
application: appUpdateTest,
instances: []*fargo.Instance{instanceTest1, instanceTest2},
errApplication: nil,
}

instancer := NewInstancer(connection, appNameTest, loggerTest)
defer instancer.Stop()

state := instancer.cache.State()
state := instancer.state()
if state.Err != nil {
t.Fatal(state.Err)
}

if want, have := 1, len(state.Instances); want != have {
if want, have := 2, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
}

func TestInstancerScheduleUpdates(t *testing.T) {
func TestInstancerReceivesUpdates(t *testing.T) {
connection := &testConnection{
instances: []*fargo.Instance{instanceTest1},
application: appUpdateTest,
errApplication: nil,
}

instancer := NewInstancer(connection, appNameTest, loggerTest)
defer instancer.Stop()

state := instancer.cache.State()
if want, have := 1, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
verifyCount := func(want int) (have int, converged bool) {
const maxPollAttempts = 5
const delayPerAttempt = 200 * time.Millisecond
for i := 1; ; i++ {
state := instancer.state()
if have := len(state.Instances); want == have {
return have, true
} else if i == maxPollAttempts {
return have, false
}
time.Sleep(delayPerAttempt)
}
}

time.Sleep(50 * time.Millisecond)

state = instancer.cache.State()
if want, have := 2, len(state.Instances); want != have {
t.Errorf("want %v, have %v", want, have)
if have, converged := verifyCount(1); !converged {
t.Fatalf("initial: want %d, have %d", 1, have)
}
}

func TestBadInstancerInstances(t *testing.T) {
connection := &testConnection{
instances: []*fargo.Instance{},
errInstances: errTest,
application: appUpdateTest,
errApplication: nil,
if err := connection.RegisterInstance(instanceTest2); err != nil {
t.Fatalf("failed to register an instance: %v", err)
}

instancer := NewInstancer(connection, appNameTest, loggerTest)
defer instancer.Stop()

state := instancer.cache.State()
if state.Err == nil {
t.Fatal("expecting error")
if have, converged := verifyCount(2); !converged {
t.Fatalf("after registration: want %d, have %d", 2, have)
}

if want, have := 0, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
if err := connection.DeregisterInstance(instanceTest1); err != nil {
t.Fatalf("failed to unregister an instance: %v", err)
}
if have, converged := verifyCount(1); !converged {
t.Fatalf("after deregistration: want %d, have %d", 1, have)
}
}

func TestBadInstancerScheduleUpdates(t *testing.T) {
connection := &testConnection{
instances: []*fargo.Instance{instanceTest1},
application: appUpdateTest,
errApplication: errTest,
}

instancer := NewInstancer(connection, appNameTest, loggerTest)
defer instancer.Stop()

state := instancer.cache.State()
if state.Err != nil {
t.Error(state.Err)
}
if want, have := 1, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
}

time.Sleep(50 * time.Millisecond)

state = instancer.cache.State()
state := instancer.state()
if state.Err == nil {
t.Fatal("expecting error")
}

if want, have := 0, len(state.Instances); want != have {
t.Errorf("want %v, have %v", want, have)
t.Errorf("want %d, have %d", want, have)
}
}
36 changes: 26 additions & 10 deletions sd/eureka/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type fargoUnsuccessfulHTTPResponse struct {
messagePrefix string
}

func (u *fargoUnsuccessfulHTTPResponse) Error() string {
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
}

// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
conn fargoConnection
Expand Down Expand Up @@ -110,18 +114,30 @@ func (r *Registrar) loop() {
}
}

func httpResponseStatusCode(err error) (code int, present bool) {
if code, ok := fargo.HTTPResponseStatusCode(err); ok {
return code, true
}
// Allow injection of errors for testing.
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
return u.statusCode, true
}
return 0, false
}

func isNotFound(err error) bool {
code, ok := httpResponseStatusCode(err)
return ok && code == http.StatusNotFound
}

func (r *Registrar) heartbeat() error {
err := r.conn.HeartBeatInstance(r.instance)
if err != nil {
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound {
// Instance expired (e.g. network partition). Re-register.
r.logger.Log("during", "heartbeat", err.Error())
return r.conn.ReregisterInstance(r.instance)
}
if err == nil {
return nil
}
if isNotFound(err) {
// Instance expired (e.g. network partition). Re-register.
return r.conn.ReregisterInstance(r.instance)
}
return err
}

func (u *fargoUnsuccessfulHTTPResponse) Error() string {
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
}
110 changes: 77 additions & 33 deletions sd/eureka/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package eureka

import (
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/hudl/fargo"
)

type testConnection struct {
instances []*fargo.Instance
application *fargo.Application
errInstances error
mu sync.RWMutex
instances []*fargo.Instance

errApplication error
errHeartbeat error
errRegister error
Expand All @@ -23,10 +26,6 @@ var (
errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"}
loggerTest = log.NewNopLogger()
appNameTest = "go-kit"
appUpdateTest = &fargo.Application{
Name: appNameTest,
Instances: []*fargo.Instance{instanceTest1, instanceTest2},
}
instanceTest1 = &fargo.Instance{
HostName: "serveregistrar1.acme.org",
Port: 8080,
Expand Down Expand Up @@ -59,50 +58,95 @@ var (
var _ fargoConnection = (*testConnection)(nil)

func (c *testConnection) RegisterInstance(i *fargo.Instance) error {
if c.errRegister == nil {
for _, instance := range c.instances {
if reflect.DeepEqual(*instance, *i) {
return errors.New("already registered")
}
if c.errRegister != nil {
return c.errRegister
}
c.mu.Lock()
defer c.mu.Unlock()
for _, instance := range c.instances {
if reflect.DeepEqual(*instance, *i) {
return errors.New("already registered")
}

c.instances = append(c.instances, i)
}
return c.errRegister
c.instances = append(c.instances, i)
return nil
}

func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error {
return c.errHeartbeat
}

func (c *testConnection) DeregisterInstance(i *fargo.Instance) error {
if c.errDeregister == nil {
var newInstances []*fargo.Instance
for _, instance := range c.instances {
if reflect.DeepEqual(*instance, *i) {
continue
}
newInstances = append(newInstances, instance)
}
if len(newInstances) == len(c.instances) {
return errors.New("not registered")
if c.errDeregister != nil {
return c.errDeregister
}
c.mu.Lock()
defer c.mu.Unlock()
remaining := make([]*fargo.Instance, 0, len(c.instances))
for _, instance := range c.instances {
if reflect.DeepEqual(*instance, *i) {
continue
}

c.instances = newInstances
remaining = append(remaining, instance)
}
return c.errDeregister
if len(remaining) == len(c.instances) {
return errors.New("not registered")
}
c.instances = remaining
return nil
}

func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error {
return nil
}

func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
updatec := make(chan fargo.AppUpdate, 1)
updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication}
return updatec
func (c *testConnection) instancesForApplication(name string) []*fargo.Instance {
c.mu.RLock()
defer c.mu.RUnlock()
instances := make([]*fargo.Instance, 0, len(c.instances))
for _, i := range c.instances {
if i.App == name {
instances = append(instances, i)
}
}
return instances
}

func (c *testConnection) GetApp(name string) (*fargo.Application, error) {
return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances
if err := c.errApplication; err != nil {
return nil, err
}
instances := c.instancesForApplication(name)
if len(instances) == 0 {
return nil, fmt.Errorf("Application not found for name=%s", name)
}
return &fargo.Application{Name: name, Instances: instances}, nil
}

func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
updatec := make(chan fargo.AppUpdate, 1)
send := func() {
app, err := c.GetApp(name)
select {
case updatec <- fargo.AppUpdate{App: app, Err: err}:
default:
}
}

if await {
send()
}
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
send()
case <-done:
ticker.Stop()
return
}
}
}()
return updatec
}