From cb03da6888a0340f29175db3c534fb90a658cf90 Mon Sep 17 00:00:00 2001 From: Martin Baillie Date: Thu, 23 Mar 2017 17:52:05 +1100 Subject: [PATCH 1/5] Add Eureka service discovery implementation This commit adds a service discovery implementation for the Eureka registry, a component in Netflix's OSS suite. Eureka is a popular choice in JVM-based microservice architectures, particularly when used in conjunction with the Spring Cloud ecosystem. This implementation delegates integration to Fargo: the de facto Golang Eureka client. It allows the user to employ a Fargo connection as the foundational configuration item for Registrars and Subscribers. This should offer the user the most control of Fargo within the constraints of the Go-kit service discovery abstractions. --- circle.yml | 1 + docker-compose-integration.yml | 6 ++ sd/eureka/client.go | 83 +++++++++++++++++++ sd/eureka/client_test.go | 94 ++++++++++++++++++++++ sd/eureka/doc.go | 2 + sd/eureka/integration_test.go | 106 +++++++++++++++++++++++++ sd/eureka/registrar.go | 89 +++++++++++++++++++++ sd/eureka/registrar_test.go | 58 ++++++++++++++ sd/eureka/subscriber.go | 99 +++++++++++++++++++++++ sd/eureka/subscriber_test.go | 140 +++++++++++++++++++++++++++++++++ 10 files changed, 678 insertions(+) create mode 100644 sd/eureka/client.go create mode 100644 sd/eureka/client_test.go create mode 100644 sd/eureka/doc.go create mode 100644 sd/eureka/integration_test.go create mode 100644 sd/eureka/registrar.go create mode 100644 sd/eureka/registrar_test.go create mode 100644 sd/eureka/subscriber.go create mode 100644 sd/eureka/subscriber_test.go diff --git a/circle.yml b/circle.yml index 0c35bb29d..a5acd9cb2 100644 --- a/circle.yml +++ b/circle.yml @@ -24,3 +24,4 @@ test: ETCD_ADDR: http://localhost:2379 CONSUL_ADDR: localhost:8500 ZK_ADDR: localhost:2181 + EUREKA_ADDR: http://localhost:8761/eureka diff --git a/docker-compose-integration.yml b/docker-compose-integration.yml index f316a1376..287d97db0 100644 --- a/docker-compose-integration.yml +++ b/docker-compose-integration.yml @@ -14,3 +14,9 @@ services: image: zookeeper ports: - "2181:2181" + eureka: + image: springcloud/eureka + environment: + eureka.server.responseCacheUpdateIntervalMs: 1000 + ports: + - "8761:8761" diff --git a/sd/eureka/client.go b/sd/eureka/client.go new file mode 100644 index 000000000..7e6bbcd0d --- /dev/null +++ b/sd/eureka/client.go @@ -0,0 +1,83 @@ +package eureka + +import ( + stdeureka "github.com/hudl/fargo" + stdeurekalogging "github.com/op/go-logging" +) + +func init() { + // Quieten Fargo's own logging + stdeurekalogging.SetLevel(stdeurekalogging.ERROR, "fargo") +} + +// Client is a wrapper around the Eureka API. +type Client interface { + // Register an instance with Eureka. + Register(i *stdeureka.Instance) error + + // Deregister an instance from Eureka. + Deregister(i *stdeureka.Instance) error + + // Send an instance heartbeat to Eureka. + Heartbeat(i *stdeureka.Instance) error + + // Get all instances for an app in Eureka. + Instances(app string) ([]*stdeureka.Instance, error) + + // Receive scheduled updates about an app's instances in Eureka. + ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate +} + +type client struct { + connection *stdeureka.EurekaConnection +} + +// NewClient returns an implementation of the Client interface, wrapping a +// concrete connection to Eureka using the Fargo library. +// Taking in Fargo's own connection abstraction gives the user maximum +// freedom in regards to how that connection is configured. +func NewClient(ec *stdeureka.EurekaConnection) Client { + return &client{connection: ec} +} + +func (c *client) Register(i *stdeureka.Instance) error { + if c.instanceRegistered(i) { + // Already registered. Send a heartbeat instead. + return c.Heartbeat(i) + } + return c.connection.RegisterInstance(i) +} + +func (c *client) Deregister(i *stdeureka.Instance) error { + return c.connection.DeregisterInstance(i) +} + +func (c *client) Heartbeat(i *stdeureka.Instance) (err error) { + if err = c.connection.HeartBeatInstance(i); err != nil && c.instanceNotFoundErr(err) { + // Instance not registered. Register first before sending heartbeats. + return c.Register(i) + } + return err +} + +func (c *client) Instances(app string) ([]*stdeureka.Instance, error) { + stdApp, err := c.connection.GetApp(app) + if err != nil { + return nil, err + } + return stdApp.Instances, nil +} + +func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate { + return c.connection.ScheduleAppUpdates(app, false, quitc) +} + +func (c *client) instanceRegistered(i *stdeureka.Instance) bool { + _, err := c.connection.GetInstance(i.App, i.Id()) + return err == nil +} + +func (c *client) instanceNotFoundErr(err error) bool { + code, ok := stdeureka.HTTPResponseStatusCode(err) + return ok && code == 404 +} diff --git a/sd/eureka/client_test.go b/sd/eureka/client_test.go new file mode 100644 index 000000000..294bd8b9a --- /dev/null +++ b/sd/eureka/client_test.go @@ -0,0 +1,94 @@ +package eureka + +import ( + "errors" + "reflect" + + "github.com/go-kit/kit/log" + stdeureka "github.com/hudl/fargo" +) + +var ( + errTest = errors.New("kaboom") + loggerTest = log.NewNopLogger() + instanceTest1 = &stdeureka.Instance{ + HostName: "server1.acme.org", + Port: 8080, + App: "go-kit", + IPAddr: "192.168.0.1", + VipAddress: "192.168.0.1", + SecureVipAddress: "192.168.0.1", + HealthCheckUrl: "http://server1.acme.org:8080/healthz", + StatusPageUrl: "http://server1.acme.org:8080/status", + HomePageUrl: "http://server1.acme.org:8080/", + Status: stdeureka.UP, + DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn}, + LeaseInfo: stdeureka.LeaseInfo{RenewalIntervalInSecs: 1}, + } + instanceTest2 = &stdeureka.Instance{ + HostName: "server2.acme.org", + Port: 8080, + App: "go-kit", + IPAddr: "192.168.0.2", + VipAddress: "192.168.0.2", + SecureVipAddress: "192.168.0.2", + HealthCheckUrl: "http://server2.acme.org:8080/healthz", + StatusPageUrl: "http://server2.acme.org:8080/status", + HomePageUrl: "http://server2.acme.org:8080/", + Status: stdeureka.UP, + DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn}, + } + applicationTest = &stdeureka.Application{ + Name: "go-kit", + Instances: []*stdeureka.Instance{instanceTest1, instanceTest2}, + } +) + +type testClient struct { + instances []*stdeureka.Instance + application *stdeureka.Application + errInstances error + errApplication error + errHeartbeat error +} + +func (c *testClient) Register(i *stdeureka.Instance) error { + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + return errors.New("already registered") + } + } + + c.instances = append(c.instances, i) + return nil +} + +func (c *testClient) Deregister(i *stdeureka.Instance) error { + var newInstances []*stdeureka.Instance + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + continue + } + newInstances = append(newInstances, instance) + } + if len(newInstances) == len(c.instances) { + return errors.New("not registered") + } + + c.instances = newInstances + return nil +} + +func (c *testClient) Heartbeat(i *stdeureka.Instance) (err error) { + return c.errHeartbeat +} + +func (c *testClient) Instances(app string) ([]*stdeureka.Instance, error) { + return c.instances, c.errInstances +} + +func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan stdeureka.AppUpdate { + updatec := make(chan stdeureka.AppUpdate, 1) + updatec <- stdeureka.AppUpdate{App: c.application, Err: c.errApplication} + return updatec +} diff --git a/sd/eureka/doc.go b/sd/eureka/doc.go new file mode 100644 index 000000000..d41c352f7 --- /dev/null +++ b/sd/eureka/doc.go @@ -0,0 +1,2 @@ +// Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka +package eureka diff --git a/sd/eureka/integration_test.go b/sd/eureka/integration_test.go new file mode 100644 index 000000000..02b8b4146 --- /dev/null +++ b/sd/eureka/integration_test.go @@ -0,0 +1,106 @@ +// +build integration + +package eureka + +import ( + "io" + "os" + "testing" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + stdeureka "github.com/hudl/fargo" +) + +// Package sd/eureka provides a wrapper around the Netflix Eureka service +// registry by way of the Fargo library. This test assumes the user has an +// instance of Eureka available at the address in the environment variable. +// Example `${EUREKA_ADDR}` format: http://localhost:8761/eureka +// +// NOTE: when starting a Eureka server for integration testing, ensure +// the response cache interval is reduced to one second. This can be +// achieved with the following Java argument: +// `-Deureka.server.responseCacheUpdateIntervalMs=1000` +func TestIntegration(t *testing.T) { + eurekaAddr := os.Getenv("EUREKA_ADDR") + if eurekaAddr == "" { + t.Skip("EUREKA_ADDR is not set") + } + + var client Client + { + var stdConfig stdeureka.Config + stdConfig.Eureka.ServiceUrls = []string{eurekaAddr} + stdConfig.Eureka.PollIntervalSeconds = 1 + + stdConnection := stdeureka.NewConnFromConfig(stdConfig) + client = NewClient(&stdConnection) + } + + logger := log.NewLogfmtLogger(os.Stderr) + logger = log.With(logger, "ts", log.DefaultTimestamp) + + // Register one instance. + registrar1 := NewRegistrar(client, instanceTest1, log.With(logger, "component", "registrar1")) + registrar1.Register() + defer registrar1.Deregister() + + // This should be enough time for the Eureka server response cache to update. + time.Sleep(time.Second) + + // Build a subscriber. + factory := func(instance string) (endpoint.Endpoint, io.Closer, error) { + t.Logf("factory invoked for %q", instance) + return endpoint.Nop, nil, nil + } + s := NewSubscriber( + client, + factory, + log.With(logger, "component", "subscriber"), + instanceTest1.App, + ) + defer s.Stop() + + // We should have one endpoint immediately after subscriber instantiation. + endpoints, err := s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Register a second instance + registrar2 := NewRegistrar(client, instanceTest2, log.With(logger, "component", "registrar2")) + registrar2.Register() + defer registrar2.Deregister() // In case of exceptional circumstances. + + // This should be enough time for a scheduled update assuming Eureka is + // configured with the properties mentioned in the function comments. + time.Sleep(2 * time.Second) + + // Now we should have two endpoints. + endpoints, err = s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 2, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Deregister the second instance. + registrar2.Deregister() + + // Wait for another scheduled update. + time.Sleep(2 * time.Second) + + // And then there was one. + endpoints, err = s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go new file mode 100644 index 000000000..5f6429187 --- /dev/null +++ b/sd/eureka/registrar.go @@ -0,0 +1,89 @@ +package eureka + +import ( + "fmt" + "sync" + "time" + + stdeureka "github.com/hudl/fargo" + + "github.com/go-kit/kit/log" +) + +// Registrar maintains service instance liveness information in Eureka. +type Registrar struct { + client Client + instance *stdeureka.Instance + logger log.Logger + + quitmtx sync.Mutex + quit chan bool +} + +// NewRegistrar returns an Eureka Registrar acting on behalf of the provided +// Fargo instance. +func NewRegistrar(client Client, i *stdeureka.Instance, l log.Logger) *Registrar { + return &Registrar{ + client: client, + instance: i, + logger: log.With(l, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)), + } +} + +// Register implements sd.Registrar interface. +func (r *Registrar) Register() { + if err := r.client.Register(r.instance); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "register") + } + + if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { + // User has opted for heartbeat functionality in Eureka. + go r.loop() + } +} + +// Deregister implements sd.Registrar interface. +func (r *Registrar) Deregister() { + if err := r.client.Deregister(r.instance); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "deregister") + } + + r.quitmtx.Lock() + defer r.quitmtx.Unlock() + if r.quit != nil { + r.quit <- true + } +} + +func (r *Registrar) loop() { + r.quitmtx.Lock() + if r.quit != nil { + defer r.quitmtx.Unlock() + return // Already running. + } + r.quit = make(chan bool) + r.quitmtx.Unlock() + + tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second) + defer tick.Stop() + for { + select { + case <-tick.C: + if err := r.client.Heartbeat(r.instance); err != nil { + r.logger.Log("err", err) + } + case <-r.quit: + r.quitmtx.Lock() + defer r.quitmtx.Unlock() + + close(r.quit) + r.quit = nil + + return + } + } +} diff --git a/sd/eureka/registrar_test.go b/sd/eureka/registrar_test.go new file mode 100644 index 000000000..9ae7f3d6d --- /dev/null +++ b/sd/eureka/registrar_test.go @@ -0,0 +1,58 @@ +package eureka + +import ( + "testing" + "time" + + stdeureka "github.com/hudl/fargo" +) + +func TestRegistrar(t *testing.T) { + client := &testClient{ + instances: []*stdeureka.Instance{}, + errHeartbeat: errTest, + } + + r := NewRegistrar(client, instanceTest1, loggerTest) + if want, have := 0, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Not registered. + r.Deregister() + if want, have := 0, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Register. + r.Register() + if want, have := 1, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Deregister. + r.Deregister() + if want, have := 0, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Already registered. + r.Register() + if want, have := 1, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + r.Register() + if want, have := 1, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Wait for a heartbeat failure. + time.Sleep(time.Second) + if want, have := 1, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + r.Deregister() + if want, have := 0, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/sd/eureka/subscriber.go b/sd/eureka/subscriber.go new file mode 100644 index 000000000..5873eabd3 --- /dev/null +++ b/sd/eureka/subscriber.go @@ -0,0 +1,99 @@ +package eureka + +import ( + "fmt" + + stdeureka "github.com/hudl/fargo" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/cache" +) + +// Subscriber yields endpoints stored in the Eureka registry for the given app. +// Changes in that app are watched and will update the Subscriber endpoints. +type Subscriber struct { + client Client + cache *cache.Cache + logger log.Logger + app string + quitc chan struct{} +} + +var _ sd.Subscriber = &Subscriber{} + +// NewSubscriber returns a Eureka subscriber. It will start watching the given +// app string for changes, and update the endpoints accordingly. +func NewSubscriber(c Client, f sd.Factory, l log.Logger, app string) *Subscriber { + s := &Subscriber{ + client: c, + cache: cache.New(f, l), + app: app, + logger: l, + quitc: make(chan struct{}), + } + + instances, err := s.getInstances() + if err == nil { + s.logger.Log("app", s.app, "instances", len(instances)) + } else { + s.logger.Log("app", s.app, "msg", "failed to retrieve instances", "err", err) + } + + s.cache.Update(instances) + go s.loop() + return s +} + +func (s *Subscriber) getInstances() ([]string, error) { + stdInstances, err := s.client.Instances(s.app) + if err != nil { + return nil, err + } + return convertStdInstances(stdInstances), nil +} + +func (s *Subscriber) loop() { + updatec := s.client.ScheduleUpdates(s.app, s.quitc) + for { + select { + case <-s.quitc: + return + case u := <-updatec: + if u.Err != nil { + s.logger.Log("app", s.app, "msg", "failed to retrieve instances", "err", u.Err) + continue + } + + instances := convertStdApplication(u.App) + s.logger.Log("app", s.app, "instances", len(instances)) + s.cache.Update(instances) + } + } +} + +// Endpoints implements the Subscriber interface. +func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { + return s.cache.Endpoints(), nil +} + +// Stop terminates the Subscriber. +func (s *Subscriber) Stop() { + close(s.quitc) +} + +func convertStdApplication(stdApplication *stdeureka.Application) (instances []string) { + if stdApplication != nil { + instances = convertStdInstances(stdApplication.Instances) + } + return instances +} + +func convertStdInstances(stdInstances []*stdeureka.Instance) []string { + instances := make([]string, len(stdInstances)) + for i, stdInstance := range stdInstances { + instances[i] = fmt.Sprintf("%s:%d", stdInstance.IPAddr, stdInstance.Port) + } + return instances +} diff --git a/sd/eureka/subscriber_test.go b/sd/eureka/subscriber_test.go new file mode 100644 index 000000000..9ef726436 --- /dev/null +++ b/sd/eureka/subscriber_test.go @@ -0,0 +1,140 @@ +package eureka + +import ( + "io" + "testing" + "time" + + "github.com/go-kit/kit/endpoint" + stdeureka "github.com/hudl/fargo" +) + +func TestSubscriber(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + client := &testClient{ + instances: []*stdeureka.Instance{instanceTest1}, + application: applicationTest, + errApplication: nil, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, err := s.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestSubscriberScheduleUpdates(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + client := &testClient{ + instances: []*stdeureka.Instance{instanceTest1}, + application: applicationTest, + errApplication: nil, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, _ := s.Endpoints() + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + time.Sleep(50 * time.Millisecond) + + endpoints, _ = s.Endpoints() + if want, have := 2, len(endpoints); want != have { + t.Errorf("want %v, have %v", want, have) + } +} + +func TestBadFactory(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return nil, nil, errTest + } + + client := &testClient{ + instances: []*stdeureka.Instance{instanceTest1}, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, err := s.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadSubscriberInstances(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + client := &testClient{ + errInstances: errTest, + application: applicationTest, + errApplication: nil, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, err := s.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadSubscriberScheduleUpdates(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + client := &testClient{ + instances: []*stdeureka.Instance{instanceTest1}, + application: applicationTest, + errApplication: errTest, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, err := s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + time.Sleep(50 * time.Millisecond) + + endpoints, err = s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %v, have %v", want, have) + } +} From 72a93ad974c824fb3142b5339da9e4247ad445ce Mon Sep 17 00:00:00 2001 From: Martin Baillie Date: Sun, 2 Apr 2017 11:36:06 +1000 Subject: [PATCH 2/5] Initial PR tidy-up based on feedback --- sd/eureka/client.go | 36 +++++++++++++----------------- sd/eureka/client_test.go | 41 ++++++++++++++++++----------------- sd/eureka/integration_test.go | 13 ++++++----- sd/eureka/registrar.go | 35 +++++++++--------------------- sd/eureka/registrar_test.go | 4 ++-- sd/eureka/subscriber.go | 28 ++++++++++++------------ sd/eureka/subscriber_test.go | 11 +++++----- 7 files changed, 75 insertions(+), 93 deletions(-) diff --git a/sd/eureka/client.go b/sd/eureka/client.go index 7e6bbcd0d..0bc48ecda 100644 --- a/sd/eureka/client.go +++ b/sd/eureka/client.go @@ -1,46 +1,40 @@ package eureka import ( - stdeureka "github.com/hudl/fargo" - stdeurekalogging "github.com/op/go-logging" + fargo "github.com/hudl/fargo" ) -func init() { - // Quieten Fargo's own logging - stdeurekalogging.SetLevel(stdeurekalogging.ERROR, "fargo") -} - // Client is a wrapper around the Eureka API. type Client interface { // Register an instance with Eureka. - Register(i *stdeureka.Instance) error + Register(i *fargo.Instance) error // Deregister an instance from Eureka. - Deregister(i *stdeureka.Instance) error + Deregister(i *fargo.Instance) error // Send an instance heartbeat to Eureka. - Heartbeat(i *stdeureka.Instance) error + Heartbeat(i *fargo.Instance) error // Get all instances for an app in Eureka. - Instances(app string) ([]*stdeureka.Instance, error) + Instances(app string) ([]*fargo.Instance, error) // Receive scheduled updates about an app's instances in Eureka. - ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate + ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate } type client struct { - connection *stdeureka.EurekaConnection + connection *fargo.EurekaConnection } // NewClient returns an implementation of the Client interface, wrapping a // concrete connection to Eureka using the Fargo library. // Taking in Fargo's own connection abstraction gives the user maximum // freedom in regards to how that connection is configured. -func NewClient(ec *stdeureka.EurekaConnection) Client { +func NewClient(ec *fargo.EurekaConnection) Client { return &client{connection: ec} } -func (c *client) Register(i *stdeureka.Instance) error { +func (c *client) Register(i *fargo.Instance) error { if c.instanceRegistered(i) { // Already registered. Send a heartbeat instead. return c.Heartbeat(i) @@ -48,11 +42,11 @@ func (c *client) Register(i *stdeureka.Instance) error { return c.connection.RegisterInstance(i) } -func (c *client) Deregister(i *stdeureka.Instance) error { +func (c *client) Deregister(i *fargo.Instance) error { return c.connection.DeregisterInstance(i) } -func (c *client) Heartbeat(i *stdeureka.Instance) (err error) { +func (c *client) Heartbeat(i *fargo.Instance) (err error) { if err = c.connection.HeartBeatInstance(i); err != nil && c.instanceNotFoundErr(err) { // Instance not registered. Register first before sending heartbeats. return c.Register(i) @@ -60,7 +54,7 @@ func (c *client) Heartbeat(i *stdeureka.Instance) (err error) { return err } -func (c *client) Instances(app string) ([]*stdeureka.Instance, error) { +func (c *client) Instances(app string) ([]*fargo.Instance, error) { stdApp, err := c.connection.GetApp(app) if err != nil { return nil, err @@ -68,16 +62,16 @@ func (c *client) Instances(app string) ([]*stdeureka.Instance, error) { return stdApp.Instances, nil } -func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate { +func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate { return c.connection.ScheduleAppUpdates(app, false, quitc) } -func (c *client) instanceRegistered(i *stdeureka.Instance) bool { +func (c *client) instanceRegistered(i *fargo.Instance) bool { _, err := c.connection.GetInstance(i.App, i.Id()) return err == nil } func (c *client) instanceNotFoundErr(err error) bool { - code, ok := stdeureka.HTTPResponseStatusCode(err) + code, ok := fargo.HTTPResponseStatusCode(err) return ok && code == 404 } diff --git a/sd/eureka/client_test.go b/sd/eureka/client_test.go index 294bd8b9a..bc5c77d9e 100644 --- a/sd/eureka/client_test.go +++ b/sd/eureka/client_test.go @@ -4,14 +4,15 @@ import ( "errors" "reflect" + fargo "github.com/hudl/fargo" + "github.com/go-kit/kit/log" - stdeureka "github.com/hudl/fargo" ) var ( errTest = errors.New("kaboom") loggerTest = log.NewNopLogger() - instanceTest1 = &stdeureka.Instance{ + instanceTest1 = &fargo.Instance{ HostName: "server1.acme.org", Port: 8080, App: "go-kit", @@ -21,11 +22,11 @@ var ( HealthCheckUrl: "http://server1.acme.org:8080/healthz", StatusPageUrl: "http://server1.acme.org:8080/status", HomePageUrl: "http://server1.acme.org:8080/", - Status: stdeureka.UP, - DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn}, - LeaseInfo: stdeureka.LeaseInfo{RenewalIntervalInSecs: 1}, + Status: fargo.UP, + DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, + LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1}, } - instanceTest2 = &stdeureka.Instance{ + instanceTest2 = &fargo.Instance{ HostName: "server2.acme.org", Port: 8080, App: "go-kit", @@ -35,24 +36,24 @@ var ( HealthCheckUrl: "http://server2.acme.org:8080/healthz", StatusPageUrl: "http://server2.acme.org:8080/status", HomePageUrl: "http://server2.acme.org:8080/", - Status: stdeureka.UP, - DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn}, + Status: fargo.UP, + DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, } - applicationTest = &stdeureka.Application{ + applicationTest = &fargo.Application{ Name: "go-kit", - Instances: []*stdeureka.Instance{instanceTest1, instanceTest2}, + Instances: []*fargo.Instance{instanceTest1, instanceTest2}, } ) type testClient struct { - instances []*stdeureka.Instance - application *stdeureka.Application + instances []*fargo.Instance + application *fargo.Application errInstances error errApplication error errHeartbeat error } -func (c *testClient) Register(i *stdeureka.Instance) error { +func (c *testClient) Register(i *fargo.Instance) error { for _, instance := range c.instances { if reflect.DeepEqual(*instance, *i) { return errors.New("already registered") @@ -63,8 +64,8 @@ func (c *testClient) Register(i *stdeureka.Instance) error { return nil } -func (c *testClient) Deregister(i *stdeureka.Instance) error { - var newInstances []*stdeureka.Instance +func (c *testClient) Deregister(i *fargo.Instance) error { + var newInstances []*fargo.Instance for _, instance := range c.instances { if reflect.DeepEqual(*instance, *i) { continue @@ -79,16 +80,16 @@ func (c *testClient) Deregister(i *stdeureka.Instance) error { return nil } -func (c *testClient) Heartbeat(i *stdeureka.Instance) (err error) { +func (c *testClient) Heartbeat(i *fargo.Instance) (err error) { return c.errHeartbeat } -func (c *testClient) Instances(app string) ([]*stdeureka.Instance, error) { +func (c *testClient) Instances(app string) ([]*fargo.Instance, error) { return c.instances, c.errInstances } -func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan stdeureka.AppUpdate { - updatec := make(chan stdeureka.AppUpdate, 1) - updatec <- stdeureka.AppUpdate{App: c.application, Err: c.errApplication} +func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan fargo.AppUpdate { + updatec := make(chan fargo.AppUpdate, 1) + updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} return updatec } diff --git a/sd/eureka/integration_test.go b/sd/eureka/integration_test.go index 02b8b4146..349fc2e20 100644 --- a/sd/eureka/integration_test.go +++ b/sd/eureka/integration_test.go @@ -8,9 +8,10 @@ import ( "testing" "time" + fargo "github.com/hudl/fargo" + "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" - stdeureka "github.com/hudl/fargo" ) // Package sd/eureka provides a wrapper around the Netflix Eureka service @@ -30,12 +31,12 @@ func TestIntegration(t *testing.T) { var client Client { - var stdConfig stdeureka.Config - stdConfig.Eureka.ServiceUrls = []string{eurekaAddr} - stdConfig.Eureka.PollIntervalSeconds = 1 + var fargoConfig fargo.Config + fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr} + fargoConfig.Eureka.PollIntervalSeconds = 1 - stdConnection := stdeureka.NewConnFromConfig(stdConfig) - client = NewClient(&stdConnection) + fargoConnection := fargo.NewConnFromConfig(fargoConfig) + client = NewClient(&fargoConnection) } logger := log.NewLogfmtLogger(os.Stderr) diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go index 5f6429187..f62a48327 100644 --- a/sd/eureka/registrar.go +++ b/sd/eureka/registrar.go @@ -2,10 +2,9 @@ package eureka import ( "fmt" - "sync" "time" - stdeureka "github.com/hudl/fargo" + fargo "github.com/hudl/fargo" "github.com/go-kit/kit/log" ) @@ -13,20 +12,18 @@ import ( // Registrar maintains service instance liveness information in Eureka. type Registrar struct { client Client - instance *stdeureka.Instance + instance *fargo.Instance logger log.Logger - - quitmtx sync.Mutex - quit chan bool + quit chan bool } // NewRegistrar returns an Eureka Registrar acting on behalf of the provided // Fargo instance. -func NewRegistrar(client Client, i *stdeureka.Instance, l log.Logger) *Registrar { +func NewRegistrar(client Client, i *fargo.Instance, logger log.Logger) *Registrar { return &Registrar{ client: client, instance: i, - logger: log.With(l, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)), + logger: log.With(logger, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)), } } @@ -40,7 +37,10 @@ func (r *Registrar) Register() { if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { // User has opted for heartbeat functionality in Eureka. - go r.loop() + if r.quit == nil { + r.quit = make(chan bool) + go r.loop() + } } } @@ -52,22 +52,13 @@ func (r *Registrar) Deregister() { r.logger.Log("action", "deregister") } - r.quitmtx.Lock() - defer r.quitmtx.Unlock() if r.quit != nil { r.quit <- true + r.quit = nil } } func (r *Registrar) loop() { - r.quitmtx.Lock() - if r.quit != nil { - defer r.quitmtx.Unlock() - return // Already running. - } - r.quit = make(chan bool) - r.quitmtx.Unlock() - tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second) defer tick.Stop() for { @@ -77,12 +68,6 @@ func (r *Registrar) loop() { r.logger.Log("err", err) } case <-r.quit: - r.quitmtx.Lock() - defer r.quitmtx.Unlock() - - close(r.quit) - r.quit = nil - return } } diff --git a/sd/eureka/registrar_test.go b/sd/eureka/registrar_test.go index 9ae7f3d6d..4f9e3e019 100644 --- a/sd/eureka/registrar_test.go +++ b/sd/eureka/registrar_test.go @@ -4,12 +4,12 @@ import ( "testing" "time" - stdeureka "github.com/hudl/fargo" + fargo "github.com/hudl/fargo" ) func TestRegistrar(t *testing.T) { client := &testClient{ - instances: []*stdeureka.Instance{}, + instances: []*fargo.Instance{}, errHeartbeat: errTest, } diff --git a/sd/eureka/subscriber.go b/sd/eureka/subscriber.go index 5873eabd3..25f32b43e 100644 --- a/sd/eureka/subscriber.go +++ b/sd/eureka/subscriber.go @@ -3,7 +3,7 @@ package eureka import ( "fmt" - stdeureka "github.com/hudl/fargo" + fargo "github.com/hudl/fargo" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" @@ -25,12 +25,12 @@ var _ sd.Subscriber = &Subscriber{} // NewSubscriber returns a Eureka subscriber. It will start watching the given // app string for changes, and update the endpoints accordingly. -func NewSubscriber(c Client, f sd.Factory, l log.Logger, app string) *Subscriber { +func NewSubscriber(c Client, factory sd.Factory, logger log.Logger, app string) *Subscriber { s := &Subscriber{ client: c, - cache: cache.New(f, l), + cache: cache.New(factory, logger), app: app, - logger: l, + logger: logger, quitc: make(chan struct{}), } @@ -47,11 +47,11 @@ func NewSubscriber(c Client, f sd.Factory, l log.Logger, app string) *Subscriber } func (s *Subscriber) getInstances() ([]string, error) { - stdInstances, err := s.client.Instances(s.app) + fargoInstances, err := s.client.Instances(s.app) if err != nil { return nil, err } - return convertStdInstances(stdInstances), nil + return convertFargoInstances(fargoInstances), nil } func (s *Subscriber) loop() { @@ -66,7 +66,7 @@ func (s *Subscriber) loop() { continue } - instances := convertStdApplication(u.App) + instances := convertFargoApplication(u.App) s.logger.Log("app", s.app, "instances", len(instances)) s.cache.Update(instances) } @@ -83,17 +83,17 @@ func (s *Subscriber) Stop() { close(s.quitc) } -func convertStdApplication(stdApplication *stdeureka.Application) (instances []string) { - if stdApplication != nil { - instances = convertStdInstances(stdApplication.Instances) +func convertFargoApplication(fargoApplication *fargo.Application) (instances []string) { + if fargoApplication != nil { + instances = convertFargoInstances(fargoApplication.Instances) } return instances } -func convertStdInstances(stdInstances []*stdeureka.Instance) []string { - instances := make([]string, len(stdInstances)) - for i, stdInstance := range stdInstances { - instances[i] = fmt.Sprintf("%s:%d", stdInstance.IPAddr, stdInstance.Port) +func convertFargoInstances(fargoInstances []*fargo.Instance) []string { + instances := make([]string, len(fargoInstances)) + for i, fargoInstance := range fargoInstances { + instances[i] = fmt.Sprintf("%s:%d", fargoInstance.IPAddr, fargoInstance.Port) } return instances } diff --git a/sd/eureka/subscriber_test.go b/sd/eureka/subscriber_test.go index 9ef726436..3a0403dfd 100644 --- a/sd/eureka/subscriber_test.go +++ b/sd/eureka/subscriber_test.go @@ -5,8 +5,9 @@ import ( "testing" "time" + fargo "github.com/hudl/fargo" + "github.com/go-kit/kit/endpoint" - stdeureka "github.com/hudl/fargo" ) func TestSubscriber(t *testing.T) { @@ -15,7 +16,7 @@ func TestSubscriber(t *testing.T) { } client := &testClient{ - instances: []*stdeureka.Instance{instanceTest1}, + instances: []*fargo.Instance{instanceTest1}, application: applicationTest, errApplication: nil, } @@ -39,7 +40,7 @@ func TestSubscriberScheduleUpdates(t *testing.T) { } client := &testClient{ - instances: []*stdeureka.Instance{instanceTest1}, + instances: []*fargo.Instance{instanceTest1}, application: applicationTest, errApplication: nil, } @@ -66,7 +67,7 @@ func TestBadFactory(t *testing.T) { } client := &testClient{ - instances: []*stdeureka.Instance{instanceTest1}, + instances: []*fargo.Instance{instanceTest1}, } s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) @@ -112,7 +113,7 @@ func TestBadSubscriberScheduleUpdates(t *testing.T) { } client := &testClient{ - instances: []*stdeureka.Instance{instanceTest1}, + instances: []*fargo.Instance{instanceTest1}, application: applicationTest, errApplication: errTest, } From f5b8fe6f837a8da24d4e318388ae12c6851ed314 Mon Sep 17 00:00:00 2001 From: Martin Baillie Date: Sun, 2 Apr 2017 19:00:22 +1000 Subject: [PATCH 3/5] Removing redundant alias and reintroducing quitmtx --- sd/eureka/client.go | 2 +- sd/eureka/client_test.go | 2 +- sd/eureka/integration_test.go | 2 +- sd/eureka/registrar.go | 8 +++++++- sd/eureka/registrar_test.go | 2 +- sd/eureka/subscriber.go | 2 +- sd/eureka/subscriber_test.go | 2 +- 7 files changed, 13 insertions(+), 7 deletions(-) diff --git a/sd/eureka/client.go b/sd/eureka/client.go index 0bc48ecda..deeb4979e 100644 --- a/sd/eureka/client.go +++ b/sd/eureka/client.go @@ -1,7 +1,7 @@ package eureka import ( - fargo "github.com/hudl/fargo" + "github.com/hudl/fargo" ) // Client is a wrapper around the Eureka API. diff --git a/sd/eureka/client_test.go b/sd/eureka/client_test.go index bc5c77d9e..4df5eb272 100644 --- a/sd/eureka/client_test.go +++ b/sd/eureka/client_test.go @@ -4,7 +4,7 @@ import ( "errors" "reflect" - fargo "github.com/hudl/fargo" + "github.com/hudl/fargo" "github.com/go-kit/kit/log" ) diff --git a/sd/eureka/integration_test.go b/sd/eureka/integration_test.go index 349fc2e20..ff998fa4d 100644 --- a/sd/eureka/integration_test.go +++ b/sd/eureka/integration_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - fargo "github.com/hudl/fargo" + "github.com/hudl/fargo" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go index f62a48327..a69ca7ef8 100644 --- a/sd/eureka/registrar.go +++ b/sd/eureka/registrar.go @@ -2,9 +2,10 @@ package eureka import ( "fmt" + "sync" "time" - fargo "github.com/hudl/fargo" + "github.com/hudl/fargo" "github.com/go-kit/kit/log" ) @@ -15,6 +16,7 @@ type Registrar struct { instance *fargo.Instance logger log.Logger quit chan bool + quitmtx sync.Mutex } // NewRegistrar returns an Eureka Registrar acting on behalf of the provided @@ -37,6 +39,8 @@ func (r *Registrar) Register() { if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { // User has opted for heartbeat functionality in Eureka. + r.quitmtx.Lock() + defer r.quitmtx.Unlock() if r.quit == nil { r.quit = make(chan bool) go r.loop() @@ -52,6 +56,8 @@ func (r *Registrar) Deregister() { r.logger.Log("action", "deregister") } + r.quitmtx.Lock() + defer r.quitmtx.Unlock() if r.quit != nil { r.quit <- true r.quit = nil diff --git a/sd/eureka/registrar_test.go b/sd/eureka/registrar_test.go index 4f9e3e019..eee26f377 100644 --- a/sd/eureka/registrar_test.go +++ b/sd/eureka/registrar_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - fargo "github.com/hudl/fargo" + "github.com/hudl/fargo" ) func TestRegistrar(t *testing.T) { diff --git a/sd/eureka/subscriber.go b/sd/eureka/subscriber.go index 25f32b43e..278835467 100644 --- a/sd/eureka/subscriber.go +++ b/sd/eureka/subscriber.go @@ -3,7 +3,7 @@ package eureka import ( "fmt" - fargo "github.com/hudl/fargo" + "github.com/hudl/fargo" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" diff --git a/sd/eureka/subscriber_test.go b/sd/eureka/subscriber_test.go index 3a0403dfd..9c748430e 100644 --- a/sd/eureka/subscriber_test.go +++ b/sd/eureka/subscriber_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - fargo "github.com/hudl/fargo" + "github.com/hudl/fargo" "github.com/go-kit/kit/endpoint" ) From 2a823da08cc4810caf427bb690ffde782498172a Mon Sep 17 00:00:00 2001 From: Martin Baillie Date: Mon, 3 Apr 2017 21:32:56 +1000 Subject: [PATCH 4/5] Ditching mutexes for a waitgroup --- sd/eureka/registrar.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go index a69ca7ef8..1a14d3d4c 100644 --- a/sd/eureka/registrar.go +++ b/sd/eureka/registrar.go @@ -15,8 +15,8 @@ type Registrar struct { client Client instance *fargo.Instance logger log.Logger - quit chan bool - quitmtx sync.Mutex + quit chan struct{} + wg sync.WaitGroup } // NewRegistrar returns an Eureka Registrar acting on behalf of the provided @@ -39,10 +39,9 @@ func (r *Registrar) Register() { if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { // User has opted for heartbeat functionality in Eureka. - r.quitmtx.Lock() - defer r.quitmtx.Unlock() if r.quit == nil { - r.quit = make(chan bool) + r.quit = make(chan struct{}) + r.wg.Add(1) go r.loop() } } @@ -56,10 +55,9 @@ func (r *Registrar) Deregister() { r.logger.Log("action", "deregister") } - r.quitmtx.Lock() - defer r.quitmtx.Unlock() if r.quit != nil { - r.quit <- true + close(r.quit) + r.wg.Wait() r.quit = nil } } @@ -67,6 +65,8 @@ func (r *Registrar) Deregister() { func (r *Registrar) loop() { tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second) defer tick.Stop() + defer r.wg.Done() + for { select { case <-tick.C: From 443f6ead51a91ddf48136543c1878aedb3efb2fd Mon Sep 17 00:00:00 2001 From: Martin Baillie Date: Mon, 10 Apr 2017 20:29:41 +1000 Subject: [PATCH 5/5] Refactoring without client intermediary --- sd/eureka/client.go | 77 ---------------------- sd/eureka/client_test.go | 95 -------------------------- sd/eureka/integration_test.go | 29 ++++---- sd/eureka/registrar.go | 121 +++++++++++++++++++++++----------- sd/eureka/registrar_test.go | 88 ++++++++++++++++++------- sd/eureka/subscriber.go | 101 +++++++++++++++------------- sd/eureka/subscriber_test.go | 57 ++++++++-------- sd/eureka/util_test.go | 108 ++++++++++++++++++++++++++++++ 8 files changed, 356 insertions(+), 320 deletions(-) delete mode 100644 sd/eureka/client.go delete mode 100644 sd/eureka/client_test.go create mode 100644 sd/eureka/util_test.go diff --git a/sd/eureka/client.go b/sd/eureka/client.go deleted file mode 100644 index deeb4979e..000000000 --- a/sd/eureka/client.go +++ /dev/null @@ -1,77 +0,0 @@ -package eureka - -import ( - "github.com/hudl/fargo" -) - -// Client is a wrapper around the Eureka API. -type Client interface { - // Register an instance with Eureka. - Register(i *fargo.Instance) error - - // Deregister an instance from Eureka. - Deregister(i *fargo.Instance) error - - // Send an instance heartbeat to Eureka. - Heartbeat(i *fargo.Instance) error - - // Get all instances for an app in Eureka. - Instances(app string) ([]*fargo.Instance, error) - - // Receive scheduled updates about an app's instances in Eureka. - ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate -} - -type client struct { - connection *fargo.EurekaConnection -} - -// NewClient returns an implementation of the Client interface, wrapping a -// concrete connection to Eureka using the Fargo library. -// Taking in Fargo's own connection abstraction gives the user maximum -// freedom in regards to how that connection is configured. -func NewClient(ec *fargo.EurekaConnection) Client { - return &client{connection: ec} -} - -func (c *client) Register(i *fargo.Instance) error { - if c.instanceRegistered(i) { - // Already registered. Send a heartbeat instead. - return c.Heartbeat(i) - } - return c.connection.RegisterInstance(i) -} - -func (c *client) Deregister(i *fargo.Instance) error { - return c.connection.DeregisterInstance(i) -} - -func (c *client) Heartbeat(i *fargo.Instance) (err error) { - if err = c.connection.HeartBeatInstance(i); err != nil && c.instanceNotFoundErr(err) { - // Instance not registered. Register first before sending heartbeats. - return c.Register(i) - } - return err -} - -func (c *client) Instances(app string) ([]*fargo.Instance, error) { - stdApp, err := c.connection.GetApp(app) - if err != nil { - return nil, err - } - return stdApp.Instances, nil -} - -func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate { - return c.connection.ScheduleAppUpdates(app, false, quitc) -} - -func (c *client) instanceRegistered(i *fargo.Instance) bool { - _, err := c.connection.GetInstance(i.App, i.Id()) - return err == nil -} - -func (c *client) instanceNotFoundErr(err error) bool { - code, ok := fargo.HTTPResponseStatusCode(err) - return ok && code == 404 -} diff --git a/sd/eureka/client_test.go b/sd/eureka/client_test.go deleted file mode 100644 index 4df5eb272..000000000 --- a/sd/eureka/client_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package eureka - -import ( - "errors" - "reflect" - - "github.com/hudl/fargo" - - "github.com/go-kit/kit/log" -) - -var ( - errTest = errors.New("kaboom") - loggerTest = log.NewNopLogger() - instanceTest1 = &fargo.Instance{ - HostName: "server1.acme.org", - Port: 8080, - App: "go-kit", - IPAddr: "192.168.0.1", - VipAddress: "192.168.0.1", - SecureVipAddress: "192.168.0.1", - HealthCheckUrl: "http://server1.acme.org:8080/healthz", - StatusPageUrl: "http://server1.acme.org:8080/status", - HomePageUrl: "http://server1.acme.org:8080/", - Status: fargo.UP, - DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, - LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1}, - } - instanceTest2 = &fargo.Instance{ - HostName: "server2.acme.org", - Port: 8080, - App: "go-kit", - IPAddr: "192.168.0.2", - VipAddress: "192.168.0.2", - SecureVipAddress: "192.168.0.2", - HealthCheckUrl: "http://server2.acme.org:8080/healthz", - StatusPageUrl: "http://server2.acme.org:8080/status", - HomePageUrl: "http://server2.acme.org:8080/", - Status: fargo.UP, - DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, - } - applicationTest = &fargo.Application{ - Name: "go-kit", - Instances: []*fargo.Instance{instanceTest1, instanceTest2}, - } -) - -type testClient struct { - instances []*fargo.Instance - application *fargo.Application - errInstances error - errApplication error - errHeartbeat error -} - -func (c *testClient) Register(i *fargo.Instance) error { - for _, instance := range c.instances { - if reflect.DeepEqual(*instance, *i) { - return errors.New("already registered") - } - } - - c.instances = append(c.instances, i) - return nil -} - -func (c *testClient) Deregister(i *fargo.Instance) error { - var newInstances []*fargo.Instance - for _, instance := range c.instances { - if reflect.DeepEqual(*instance, *i) { - continue - } - newInstances = append(newInstances, instance) - } - if len(newInstances) == len(c.instances) { - return errors.New("not registered") - } - - c.instances = newInstances - return nil -} - -func (c *testClient) Heartbeat(i *fargo.Instance) (err error) { - return c.errHeartbeat -} - -func (c *testClient) Instances(app string) ([]*fargo.Instance, error) { - return c.instances, c.errInstances -} - -func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan fargo.AppUpdate { - updatec := make(chan fargo.AppUpdate, 1) - updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} - return updatec -} diff --git a/sd/eureka/integration_test.go b/sd/eureka/integration_test.go index ff998fa4d..b519f70bd 100644 --- a/sd/eureka/integration_test.go +++ b/sd/eureka/integration_test.go @@ -29,37 +29,36 @@ func TestIntegration(t *testing.T) { t.Skip("EUREKA_ADDR is not set") } - var client Client - { - var fargoConfig fargo.Config - fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr} - fargoConfig.Eureka.PollIntervalSeconds = 1 - - fargoConnection := fargo.NewConnFromConfig(fargoConfig) - client = NewClient(&fargoConnection) - } - logger := log.NewLogfmtLogger(os.Stderr) logger = log.With(logger, "ts", log.DefaultTimestamp) + var fargoConfig fargo.Config + // Target Eureka server(s). + fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr} + // How often the subscriber should poll for updates. + fargoConfig.Eureka.PollIntervalSeconds = 1 + + // Create a Fargo connection and a Eureka registrar. + fargoConnection := fargo.NewConnFromConfig(fargoConfig) + registrar1 := NewRegistrar(&fargoConnection, instanceTest1, log.With(logger, "component", "registrar1")) + // Register one instance. - registrar1 := NewRegistrar(client, instanceTest1, log.With(logger, "component", "registrar1")) registrar1.Register() defer registrar1.Deregister() // This should be enough time for the Eureka server response cache to update. time.Sleep(time.Second) - // Build a subscriber. + // Build a Eureka subscriber. factory := func(instance string) (endpoint.Endpoint, io.Closer, error) { t.Logf("factory invoked for %q", instance) return endpoint.Nop, nil, nil } s := NewSubscriber( - client, + &fargoConnection, + appNameTest, factory, log.With(logger, "component", "subscriber"), - instanceTest1.App, ) defer s.Stop() @@ -73,7 +72,7 @@ func TestIntegration(t *testing.T) { } // Register a second instance - registrar2 := NewRegistrar(client, instanceTest2, log.With(logger, "component", "registrar2")) + registrar2 := NewRegistrar(&fargoConnection, instanceTest2, log.With(logger, "component", "registrar2")) registrar2.Register() defer registrar2.Deregister() // In case of exceptional circumstances. diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go index 1a14d3d4c..99fef7c5d 100644 --- a/sd/eureka/registrar.go +++ b/sd/eureka/registrar.go @@ -2,79 +2,126 @@ package eureka import ( "fmt" + "net/http" "sync" "time" "github.com/hudl/fargo" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" ) +// Matches official Netflix Java client default. +const defaultRenewalInterval = 30 * time.Second + +// The methods of fargo.Connection used in this package. +type fargoConnection interface { + RegisterInstance(instance *fargo.Instance) error + DeregisterInstance(instance *fargo.Instance) error + ReregisterInstance(instance *fargo.Instance) error + HeartBeatInstance(instance *fargo.Instance) error + ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate + GetApp(name string) (*fargo.Application, error) +} + +type fargoUnsuccessfulHTTPResponse struct { + statusCode int + messagePrefix string +} + // Registrar maintains service instance liveness information in Eureka. type Registrar struct { - client Client + conn fargoConnection instance *fargo.Instance logger log.Logger - quit chan struct{} - wg sync.WaitGroup + quitc chan chan struct{} + sync.Mutex } +var _ sd.Registrar = (*Registrar)(nil) + // NewRegistrar returns an Eureka Registrar acting on behalf of the provided -// Fargo instance. -func NewRegistrar(client Client, i *fargo.Instance, logger log.Logger) *Registrar { +// Fargo connection and instance. See the integration test for usage examples. +func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar { return &Registrar{ - client: client, - instance: i, - logger: log.With(logger, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)), + conn: conn, + instance: instance, + logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)), } } -// Register implements sd.Registrar interface. +// Register implements sd.Registrar. func (r *Registrar) Register() { - if err := r.client.Register(r.instance); err != nil { - r.logger.Log("err", err) - } else { - r.logger.Log("action", "register") + r.Lock() + defer r.Unlock() + + if r.quitc != nil { + return // Already in the registration loop. } - if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { - // User has opted for heartbeat functionality in Eureka. - if r.quit == nil { - r.quit = make(chan struct{}) - r.wg.Add(1) - go r.loop() - } + if err := r.conn.RegisterInstance(r.instance); err != nil { + r.logger.Log("during", "Register", "err", err) } + + r.quitc = make(chan chan struct{}) + go r.loop() } -// Deregister implements sd.Registrar interface. +// Deregister implements sd.Registrar. func (r *Registrar) Deregister() { - if err := r.client.Deregister(r.instance); err != nil { - r.logger.Log("err", err) - } else { - r.logger.Log("action", "deregister") - } + r.Lock() + defer r.Unlock() - if r.quit != nil { - close(r.quit) - r.wg.Wait() - r.quit = nil + if r.quitc == nil { + return // Already deregistered. } + + q := make(chan struct{}) + r.quitc <- q + <-q + r.quitc = nil } func (r *Registrar) loop() { - tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second) - defer tick.Stop() - defer r.wg.Done() + var renewalInterval time.Duration + if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { + renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second + } else { + renewalInterval = defaultRenewalInterval + } + ticker := time.NewTicker(renewalInterval) + defer ticker.Stop() for { select { - case <-tick.C: - if err := r.client.Heartbeat(r.instance); err != nil { - r.logger.Log("err", err) + case <-ticker.C: + if err := r.heartbeat(); err != nil { + r.logger.Log("during", "heartbeat", "err", err) + } + + case q := <-r.quitc: + if err := r.conn.DeregisterInstance(r.instance); err != nil { + r.logger.Log("during", "Deregister", "err", err) } - case <-r.quit: + close(q) return } } } + +func (r *Registrar) heartbeat() error { + err := r.conn.HeartBeatInstance(r.instance) + if err != nil { + if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound { + // Instance expired (e.g. network partition). Re-register. + r.logger.Log("during", "heartbeat", err.Error()) + return r.conn.ReregisterInstance(r.instance) + } + } + return err +} + +func (u *fargoUnsuccessfulHTTPResponse) Error() string { + return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) +} diff --git a/sd/eureka/registrar_test.go b/sd/eureka/registrar_test.go index eee26f377..7974e75b9 100644 --- a/sd/eureka/registrar_test.go +++ b/sd/eureka/registrar_test.go @@ -3,56 +3,100 @@ package eureka import ( "testing" "time" - - "github.com/hudl/fargo" ) func TestRegistrar(t *testing.T) { - client := &testClient{ - instances: []*fargo.Instance{}, + connection := &testConnection{ errHeartbeat: errTest, } - r := NewRegistrar(client, instanceTest1, loggerTest) - if want, have := 0, len(client.instances); want != have { - t.Errorf("want %d, have %d", want, have) - } + registrar1 := NewRegistrar(connection, instanceTest1, loggerTest) + registrar2 := NewRegistrar(connection, instanceTest2, loggerTest) // Not registered. - r.Deregister() - if want, have := 0, len(client.instances); want != have { + registrar1.Deregister() + if want, have := 0, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } // Register. - r.Register() - if want, have := 1, len(client.instances); want != have { + registrar1.Register() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + registrar2.Register() + if want, have := 2, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } // Deregister. - r.Deregister() - if want, have := 0, len(client.instances); want != have { + registrar1.Deregister() + if want, have := 1, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } // Already registered. - r.Register() - if want, have := 1, len(client.instances); want != have { + registrar1.Register() + if want, have := 2, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } - r.Register() - if want, have := 1, len(client.instances); want != have { + registrar1.Register() + if want, have := 2, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } // Wait for a heartbeat failure. - time.Sleep(time.Second) - if want, have := 1, len(client.instances); want != have { + time.Sleep(1010 * time.Millisecond) + if want, have := 2, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + registrar1.Deregister() + if want, have := 1, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } - r.Deregister() - if want, have := 0, len(client.instances); want != have { +} + +func TestBadRegister(t *testing.T) { + connection := &testConnection{ + errRegister: errTest, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + if want, have := 0, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadDeregister(t *testing.T) { + connection := &testConnection{ + errDeregister: errTest, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + registrar.Deregister() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestExpiredInstance(t *testing.T) { + connection := &testConnection{ + errHeartbeat: errNotFound, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + + // Wait for a heartbeat failure. + time.Sleep(1010 * time.Millisecond) + + if want, have := 1, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } } diff --git a/sd/eureka/subscriber.go b/sd/eureka/subscriber.go index 278835467..0300d0d28 100644 --- a/sd/eureka/subscriber.go +++ b/sd/eureka/subscriber.go @@ -14,31 +14,35 @@ import ( // Subscriber yields endpoints stored in the Eureka registry for the given app. // Changes in that app are watched and will update the Subscriber endpoints. type Subscriber struct { - client Client - cache *cache.Cache - logger log.Logger - app string - quitc chan struct{} + conn fargoConnection + app string + factory sd.Factory + logger log.Logger + cache *cache.Cache + quitc chan chan struct{} } -var _ sd.Subscriber = &Subscriber{} +var _ sd.Subscriber = (*Subscriber)(nil) // NewSubscriber returns a Eureka subscriber. It will start watching the given // app string for changes, and update the endpoints accordingly. -func NewSubscriber(c Client, factory sd.Factory, logger log.Logger, app string) *Subscriber { +func NewSubscriber(conn fargoConnection, app string, factory sd.Factory, logger log.Logger) *Subscriber { + logger = log.With(logger, "app", app) + s := &Subscriber{ - client: c, - cache: cache.New(factory, logger), - app: app, - logger: logger, - quitc: make(chan struct{}), + conn: conn, + app: app, + factory: factory, + logger: logger, + cache: cache.New(factory, logger), + quitc: make(chan chan struct{}), } instances, err := s.getInstances() if err == nil { - s.logger.Log("app", s.app, "instances", len(instances)) + s.logger.Log("instances", len(instances)) } else { - s.logger.Log("app", s.app, "msg", "failed to retrieve instances", "err", err) + s.logger.Log("during", "getInstances", "err", err) } s.cache.Update(instances) @@ -46,54 +50,57 @@ func NewSubscriber(c Client, factory sd.Factory, logger log.Logger, app string) return s } -func (s *Subscriber) getInstances() ([]string, error) { - fargoInstances, err := s.client.Instances(s.app) - if err != nil { - return nil, err - } - return convertFargoInstances(fargoInstances), nil +// Endpoints implements the Subscriber interface. +func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { + return s.cache.Endpoints(), nil +} + +// Stop terminates the subscriber. +func (s *Subscriber) Stop() { + q := make(chan struct{}) + s.quitc <- q + <-q + s.quitc = nil } func (s *Subscriber) loop() { - updatec := s.client.ScheduleUpdates(s.app, s.quitc) + var ( + await = false + done = make(chan struct{}) + updatec = s.conn.ScheduleAppUpdates(s.app, await, done) + ) + defer close(done) + for { select { - case <-s.quitc: - return - case u := <-updatec: - if u.Err != nil { - s.logger.Log("app", s.app, "msg", "failed to retrieve instances", "err", u.Err) + case update := <-updatec: + if update.Err != nil { + s.logger.Log("during", "Update", "err", update.Err) continue } - - instances := convertFargoApplication(u.App) - s.logger.Log("app", s.app, "instances", len(instances)) + instances := convertFargoAppToInstances(update.App) + s.logger.Log("instances", len(instances)) s.cache.Update(instances) + + case q := <-s.quitc: + close(q) + return } } } -// Endpoints implements the Subscriber interface. -func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { - return s.cache.Endpoints(), nil -} - -// Stop terminates the Subscriber. -func (s *Subscriber) Stop() { - close(s.quitc) -} - -func convertFargoApplication(fargoApplication *fargo.Application) (instances []string) { - if fargoApplication != nil { - instances = convertFargoInstances(fargoApplication.Instances) +func (s *Subscriber) getInstances() ([]string, error) { + app, err := s.conn.GetApp(s.app) + if err != nil { + return nil, err } - return instances + return convertFargoAppToInstances(app), nil } -func convertFargoInstances(fargoInstances []*fargo.Instance) []string { - instances := make([]string, len(fargoInstances)) - for i, fargoInstance := range fargoInstances { - instances[i] = fmt.Sprintf("%s:%d", fargoInstance.IPAddr, fargoInstance.Port) +func convertFargoAppToInstances(app *fargo.Application) []string { + instances := make([]string, len(app.Instances)) + for i, inst := range app.Instances { + instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port) } return instances } diff --git a/sd/eureka/subscriber_test.go b/sd/eureka/subscriber_test.go index 9c748430e..4d0d9b371 100644 --- a/sd/eureka/subscriber_test.go +++ b/sd/eureka/subscriber_test.go @@ -15,16 +15,16 @@ func TestSubscriber(t *testing.T) { return endpoint.Nop, nil, nil } - client := &testClient{ + connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: applicationTest, + application: appUpdateTest, errApplication: nil, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, err := s.Endpoints() + endpoints, err := subscriber.Endpoints() if err != nil { t.Fatal(err) } @@ -39,23 +39,23 @@ func TestSubscriberScheduleUpdates(t *testing.T) { return endpoint.Nop, nil, nil } - client := &testClient{ + connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: applicationTest, + application: appUpdateTest, errApplication: nil, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, _ := s.Endpoints() + endpoints, _ := subscriber.Endpoints() if want, have := 1, len(endpoints); want != have { t.Errorf("want %d, have %d", want, have) } time.Sleep(50 * time.Millisecond) - endpoints, _ = s.Endpoints() + endpoints, _ = subscriber.Endpoints() if want, have := 2, len(endpoints); want != have { t.Errorf("want %v, have %v", want, have) } @@ -66,14 +66,16 @@ func TestBadFactory(t *testing.T) { return nil, nil, errTest } - client := &testClient{ - instances: []*fargo.Instance{instanceTest1}, + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: nil, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, err := s.Endpoints() + endpoints, err := subscriber.Endpoints() if err != nil { t.Fatal(err) } @@ -88,16 +90,17 @@ func TestBadSubscriberInstances(t *testing.T) { return endpoint.Nop, nil, nil } - client := &testClient{ + connection := &testConnection{ + instances: []*fargo.Instance{}, errInstances: errTest, - application: applicationTest, + application: appUpdateTest, errApplication: nil, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, err := s.Endpoints() + endpoints, err := subscriber.Endpoints() if err != nil { t.Fatal(err) } @@ -112,16 +115,16 @@ func TestBadSubscriberScheduleUpdates(t *testing.T) { return endpoint.Nop, nil, nil } - client := &testClient{ + connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: applicationTest, + application: appUpdateTest, errApplication: errTest, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, err := s.Endpoints() + endpoints, err := subscriber.Endpoints() if err != nil { t.Error(err) } @@ -131,7 +134,7 @@ func TestBadSubscriberScheduleUpdates(t *testing.T) { time.Sleep(50 * time.Millisecond) - endpoints, err = s.Endpoints() + endpoints, err = subscriber.Endpoints() if err != nil { t.Error(err) } diff --git a/sd/eureka/util_test.go b/sd/eureka/util_test.go new file mode 100644 index 000000000..a66e7121a --- /dev/null +++ b/sd/eureka/util_test.go @@ -0,0 +1,108 @@ +package eureka + +import ( + "errors" + "reflect" + + "github.com/go-kit/kit/log" + "github.com/hudl/fargo" +) + +type testConnection struct { + instances []*fargo.Instance + application *fargo.Application + errInstances error + errApplication error + errHeartbeat error + errRegister error + errDeregister error +} + +var ( + errTest = errors.New("kaboom") + errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"} + loggerTest = log.NewNopLogger() + appNameTest = "go-kit" + appUpdateTest = &fargo.Application{ + Name: appNameTest, + Instances: []*fargo.Instance{instanceTest1, instanceTest2}, + } + instanceTest1 = &fargo.Instance{ + HostName: "serveregistrar1.acme.org", + Port: 8080, + App: appNameTest, + IPAddr: "192.168.0.1", + VipAddress: "192.168.0.1", + SecureVipAddress: "192.168.0.1", + HealthCheckUrl: "http://serveregistrar1.acme.org:8080/healthz", + StatusPageUrl: "http://serveregistrar1.acme.org:8080/status", + HomePageUrl: "http://serveregistrar1.acme.org:8080/", + Status: fargo.UP, + DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, + LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1}, + } + instanceTest2 = &fargo.Instance{ + HostName: "serveregistrar2.acme.org", + Port: 8080, + App: appNameTest, + IPAddr: "192.168.0.2", + VipAddress: "192.168.0.2", + SecureVipAddress: "192.168.0.2", + HealthCheckUrl: "http://serveregistrar2.acme.org:8080/healthz", + StatusPageUrl: "http://serveregistrar2.acme.org:8080/status", + HomePageUrl: "http://serveregistrar2.acme.org:8080/", + Status: fargo.UP, + DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, + } +) + +var _ fargoConnection = (*testConnection)(nil) + +func (c *testConnection) RegisterInstance(i *fargo.Instance) error { + if c.errRegister == nil { + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + return errors.New("already registered") + } + } + + c.instances = append(c.instances, i) + } + return c.errRegister +} + +func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error { + return c.errHeartbeat +} + +func (c *testConnection) DeregisterInstance(i *fargo.Instance) error { + if c.errDeregister == nil { + var newInstances []*fargo.Instance + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + continue + } + newInstances = append(newInstances, instance) + } + if len(newInstances) == len(c.instances) { + return errors.New("not registered") + } + + c.instances = newInstances + } + return c.errDeregister +} + +func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error { + return nil +} + +func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate { + updatec := make(chan fargo.AppUpdate, 1) + updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} + return updatec +} + +func (c *testConnection) GetApp(name string) (*fargo.Application, error) { + return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances +}