diff --git a/circle.yml b/circle.yml index 0c35bb29d..a5acd9cb2 100644 --- a/circle.yml +++ b/circle.yml @@ -24,3 +24,4 @@ test: ETCD_ADDR: http://localhost:2379 CONSUL_ADDR: localhost:8500 ZK_ADDR: localhost:2181 + EUREKA_ADDR: http://localhost:8761/eureka diff --git a/docker-compose-integration.yml b/docker-compose-integration.yml index f316a1376..287d97db0 100644 --- a/docker-compose-integration.yml +++ b/docker-compose-integration.yml @@ -14,3 +14,9 @@ services: image: zookeeper ports: - "2181:2181" + eureka: + image: springcloud/eureka + environment: + eureka.server.responseCacheUpdateIntervalMs: 1000 + ports: + - "8761:8761" diff --git a/sd/eureka/doc.go b/sd/eureka/doc.go new file mode 100644 index 000000000..d41c352f7 --- /dev/null +++ b/sd/eureka/doc.go @@ -0,0 +1,2 @@ +// Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka +package eureka diff --git a/sd/eureka/integration_test.go b/sd/eureka/integration_test.go new file mode 100644 index 000000000..b519f70bd --- /dev/null +++ b/sd/eureka/integration_test.go @@ -0,0 +1,106 @@ +// +build integration + +package eureka + +import ( + "io" + "os" + "testing" + "time" + + "github.com/hudl/fargo" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" +) + +// Package sd/eureka provides a wrapper around the Netflix Eureka service +// registry by way of the Fargo library. This test assumes the user has an +// instance of Eureka available at the address in the environment variable. +// Example `${EUREKA_ADDR}` format: http://localhost:8761/eureka +// +// NOTE: when starting a Eureka server for integration testing, ensure +// the response cache interval is reduced to one second. This can be +// achieved with the following Java argument: +// `-Deureka.server.responseCacheUpdateIntervalMs=1000` +func TestIntegration(t *testing.T) { + eurekaAddr := os.Getenv("EUREKA_ADDR") + if eurekaAddr == "" { + t.Skip("EUREKA_ADDR is not set") + } + + logger := log.NewLogfmtLogger(os.Stderr) + logger = log.With(logger, "ts", log.DefaultTimestamp) + + var fargoConfig fargo.Config + // Target Eureka server(s). + fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr} + // How often the subscriber should poll for updates. + fargoConfig.Eureka.PollIntervalSeconds = 1 + + // Create a Fargo connection and a Eureka registrar. + fargoConnection := fargo.NewConnFromConfig(fargoConfig) + registrar1 := NewRegistrar(&fargoConnection, instanceTest1, log.With(logger, "component", "registrar1")) + + // Register one instance. + registrar1.Register() + defer registrar1.Deregister() + + // This should be enough time for the Eureka server response cache to update. + time.Sleep(time.Second) + + // Build a Eureka subscriber. + factory := func(instance string) (endpoint.Endpoint, io.Closer, error) { + t.Logf("factory invoked for %q", instance) + return endpoint.Nop, nil, nil + } + s := NewSubscriber( + &fargoConnection, + appNameTest, + factory, + log.With(logger, "component", "subscriber"), + ) + defer s.Stop() + + // We should have one endpoint immediately after subscriber instantiation. + endpoints, err := s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Register a second instance + registrar2 := NewRegistrar(&fargoConnection, instanceTest2, log.With(logger, "component", "registrar2")) + registrar2.Register() + defer registrar2.Deregister() // In case of exceptional circumstances. + + // This should be enough time for a scheduled update assuming Eureka is + // configured with the properties mentioned in the function comments. + time.Sleep(2 * time.Second) + + // Now we should have two endpoints. + endpoints, err = s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 2, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Deregister the second instance. + registrar2.Deregister() + + // Wait for another scheduled update. + time.Sleep(2 * time.Second) + + // And then there was one. + endpoints, err = s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go new file mode 100644 index 000000000..99fef7c5d --- /dev/null +++ b/sd/eureka/registrar.go @@ -0,0 +1,127 @@ +package eureka + +import ( + "fmt" + "net/http" + "sync" + "time" + + "github.com/hudl/fargo" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" +) + +// Matches official Netflix Java client default. +const defaultRenewalInterval = 30 * time.Second + +// The methods of fargo.Connection used in this package. +type fargoConnection interface { + RegisterInstance(instance *fargo.Instance) error + DeregisterInstance(instance *fargo.Instance) error + ReregisterInstance(instance *fargo.Instance) error + HeartBeatInstance(instance *fargo.Instance) error + ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate + GetApp(name string) (*fargo.Application, error) +} + +type fargoUnsuccessfulHTTPResponse struct { + statusCode int + messagePrefix string +} + +// Registrar maintains service instance liveness information in Eureka. +type Registrar struct { + conn fargoConnection + instance *fargo.Instance + logger log.Logger + quitc chan chan struct{} + sync.Mutex +} + +var _ sd.Registrar = (*Registrar)(nil) + +// NewRegistrar returns an Eureka Registrar acting on behalf of the provided +// Fargo connection and instance. See the integration test for usage examples. +func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar { + return &Registrar{ + conn: conn, + instance: instance, + logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)), + } +} + +// Register implements sd.Registrar. +func (r *Registrar) Register() { + r.Lock() + defer r.Unlock() + + if r.quitc != nil { + return // Already in the registration loop. + } + + if err := r.conn.RegisterInstance(r.instance); err != nil { + r.logger.Log("during", "Register", "err", err) + } + + r.quitc = make(chan chan struct{}) + go r.loop() +} + +// Deregister implements sd.Registrar. +func (r *Registrar) Deregister() { + r.Lock() + defer r.Unlock() + + if r.quitc == nil { + return // Already deregistered. + } + + q := make(chan struct{}) + r.quitc <- q + <-q + r.quitc = nil +} + +func (r *Registrar) loop() { + var renewalInterval time.Duration + if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { + renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second + } else { + renewalInterval = defaultRenewalInterval + } + ticker := time.NewTicker(renewalInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := r.heartbeat(); err != nil { + r.logger.Log("during", "heartbeat", "err", err) + } + + case q := <-r.quitc: + if err := r.conn.DeregisterInstance(r.instance); err != nil { + r.logger.Log("during", "Deregister", "err", err) + } + close(q) + return + } + } +} + +func (r *Registrar) heartbeat() error { + err := r.conn.HeartBeatInstance(r.instance) + if err != nil { + if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound { + // Instance expired (e.g. network partition). Re-register. + r.logger.Log("during", "heartbeat", err.Error()) + return r.conn.ReregisterInstance(r.instance) + } + } + return err +} + +func (u *fargoUnsuccessfulHTTPResponse) Error() string { + return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) +} diff --git a/sd/eureka/registrar_test.go b/sd/eureka/registrar_test.go new file mode 100644 index 000000000..7974e75b9 --- /dev/null +++ b/sd/eureka/registrar_test.go @@ -0,0 +1,102 @@ +package eureka + +import ( + "testing" + "time" +) + +func TestRegistrar(t *testing.T) { + connection := &testConnection{ + errHeartbeat: errTest, + } + + registrar1 := NewRegistrar(connection, instanceTest1, loggerTest) + registrar2 := NewRegistrar(connection, instanceTest2, loggerTest) + + // Not registered. + registrar1.Deregister() + if want, have := 0, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Register. + registrar1.Register() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + registrar2.Register() + if want, have := 2, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Deregister. + registrar1.Deregister() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Already registered. + registrar1.Register() + if want, have := 2, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + registrar1.Register() + if want, have := 2, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Wait for a heartbeat failure. + time.Sleep(1010 * time.Millisecond) + if want, have := 2, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + registrar1.Deregister() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadRegister(t *testing.T) { + connection := &testConnection{ + errRegister: errTest, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + if want, have := 0, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadDeregister(t *testing.T) { + connection := &testConnection{ + errDeregister: errTest, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + registrar.Deregister() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestExpiredInstance(t *testing.T) { + connection := &testConnection{ + errHeartbeat: errNotFound, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + + // Wait for a heartbeat failure. + time.Sleep(1010 * time.Millisecond) + + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/sd/eureka/subscriber.go b/sd/eureka/subscriber.go new file mode 100644 index 000000000..0300d0d28 --- /dev/null +++ b/sd/eureka/subscriber.go @@ -0,0 +1,106 @@ +package eureka + +import ( + "fmt" + + "github.com/hudl/fargo" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/cache" +) + +// Subscriber yields endpoints stored in the Eureka registry for the given app. +// Changes in that app are watched and will update the Subscriber endpoints. +type Subscriber struct { + conn fargoConnection + app string + factory sd.Factory + logger log.Logger + cache *cache.Cache + quitc chan chan struct{} +} + +var _ sd.Subscriber = (*Subscriber)(nil) + +// NewSubscriber returns a Eureka subscriber. It will start watching the given +// app string for changes, and update the endpoints accordingly. +func NewSubscriber(conn fargoConnection, app string, factory sd.Factory, logger log.Logger) *Subscriber { + logger = log.With(logger, "app", app) + + s := &Subscriber{ + conn: conn, + app: app, + factory: factory, + logger: logger, + cache: cache.New(factory, logger), + quitc: make(chan chan struct{}), + } + + instances, err := s.getInstances() + if err == nil { + s.logger.Log("instances", len(instances)) + } else { + s.logger.Log("during", "getInstances", "err", err) + } + + s.cache.Update(instances) + go s.loop() + return s +} + +// Endpoints implements the Subscriber interface. +func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { + return s.cache.Endpoints(), nil +} + +// Stop terminates the subscriber. +func (s *Subscriber) Stop() { + q := make(chan struct{}) + s.quitc <- q + <-q + s.quitc = nil +} + +func (s *Subscriber) loop() { + var ( + await = false + done = make(chan struct{}) + updatec = s.conn.ScheduleAppUpdates(s.app, await, done) + ) + defer close(done) + + for { + select { + case update := <-updatec: + if update.Err != nil { + s.logger.Log("during", "Update", "err", update.Err) + continue + } + instances := convertFargoAppToInstances(update.App) + s.logger.Log("instances", len(instances)) + s.cache.Update(instances) + + case q := <-s.quitc: + close(q) + return + } + } +} + +func (s *Subscriber) getInstances() ([]string, error) { + app, err := s.conn.GetApp(s.app) + if err != nil { + return nil, err + } + return convertFargoAppToInstances(app), nil +} + +func convertFargoAppToInstances(app *fargo.Application) []string { + instances := make([]string, len(app.Instances)) + for i, inst := range app.Instances { + instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port) + } + return instances +} diff --git a/sd/eureka/subscriber_test.go b/sd/eureka/subscriber_test.go new file mode 100644 index 000000000..4d0d9b371 --- /dev/null +++ b/sd/eureka/subscriber_test.go @@ -0,0 +1,144 @@ +package eureka + +import ( + "io" + "testing" + "time" + + "github.com/hudl/fargo" + + "github.com/go-kit/kit/endpoint" +) + +func TestSubscriber(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: nil, + } + + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() + + endpoints, err := subscriber.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestSubscriberScheduleUpdates(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: nil, + } + + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() + + endpoints, _ := subscriber.Endpoints() + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + time.Sleep(50 * time.Millisecond) + + endpoints, _ = subscriber.Endpoints() + if want, have := 2, len(endpoints); want != have { + t.Errorf("want %v, have %v", want, have) + } +} + +func TestBadFactory(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return nil, nil, errTest + } + + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: nil, + } + + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() + + endpoints, err := subscriber.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadSubscriberInstances(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + connection := &testConnection{ + instances: []*fargo.Instance{}, + errInstances: errTest, + application: appUpdateTest, + errApplication: nil, + } + + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() + + endpoints, err := subscriber.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadSubscriberScheduleUpdates(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: errTest, + } + + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() + + endpoints, err := subscriber.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + time.Sleep(50 * time.Millisecond) + + endpoints, err = subscriber.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %v, have %v", want, have) + } +} diff --git a/sd/eureka/util_test.go b/sd/eureka/util_test.go new file mode 100644 index 000000000..a66e7121a --- /dev/null +++ b/sd/eureka/util_test.go @@ -0,0 +1,108 @@ +package eureka + +import ( + "errors" + "reflect" + + "github.com/go-kit/kit/log" + "github.com/hudl/fargo" +) + +type testConnection struct { + instances []*fargo.Instance + application *fargo.Application + errInstances error + errApplication error + errHeartbeat error + errRegister error + errDeregister error +} + +var ( + errTest = errors.New("kaboom") + errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"} + loggerTest = log.NewNopLogger() + appNameTest = "go-kit" + appUpdateTest = &fargo.Application{ + Name: appNameTest, + Instances: []*fargo.Instance{instanceTest1, instanceTest2}, + } + instanceTest1 = &fargo.Instance{ + HostName: "serveregistrar1.acme.org", + Port: 8080, + App: appNameTest, + IPAddr: "192.168.0.1", + VipAddress: "192.168.0.1", + SecureVipAddress: "192.168.0.1", + HealthCheckUrl: "http://serveregistrar1.acme.org:8080/healthz", + StatusPageUrl: "http://serveregistrar1.acme.org:8080/status", + HomePageUrl: "http://serveregistrar1.acme.org:8080/", + Status: fargo.UP, + DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, + LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1}, + } + instanceTest2 = &fargo.Instance{ + HostName: "serveregistrar2.acme.org", + Port: 8080, + App: appNameTest, + IPAddr: "192.168.0.2", + VipAddress: "192.168.0.2", + SecureVipAddress: "192.168.0.2", + HealthCheckUrl: "http://serveregistrar2.acme.org:8080/healthz", + StatusPageUrl: "http://serveregistrar2.acme.org:8080/status", + HomePageUrl: "http://serveregistrar2.acme.org:8080/", + Status: fargo.UP, + DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, + } +) + +var _ fargoConnection = (*testConnection)(nil) + +func (c *testConnection) RegisterInstance(i *fargo.Instance) error { + if c.errRegister == nil { + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + return errors.New("already registered") + } + } + + c.instances = append(c.instances, i) + } + return c.errRegister +} + +func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error { + return c.errHeartbeat +} + +func (c *testConnection) DeregisterInstance(i *fargo.Instance) error { + if c.errDeregister == nil { + var newInstances []*fargo.Instance + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + continue + } + newInstances = append(newInstances, instance) + } + if len(newInstances) == len(c.instances) { + return errors.New("not registered") + } + + c.instances = newInstances + } + return c.errDeregister +} + +func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error { + return nil +} + +func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate { + updatec := make(chan fargo.AppUpdate, 1) + updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} + return updatec +} + +func (c *testConnection) GetApp(name string) (*fargo.Application, error) { + return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances +}