Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ test:
ETCD_ADDR: http://localhost:2379
CONSUL_ADDR: localhost:8500
ZK_ADDR: localhost:2181
EUREKA_ADDR: http://localhost:8761/eureka
6 changes: 6 additions & 0 deletions docker-compose-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@ services:
image: zookeeper
ports:
- "2181:2181"
eureka:
image: springcloud/eureka
environment:
eureka.server.responseCacheUpdateIntervalMs: 1000
ports:
- "8761:8761"
2 changes: 2 additions & 0 deletions sd/eureka/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka
package eureka
106 changes: 106 additions & 0 deletions sd/eureka/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// +build integration

package eureka

import (
"io"
"os"
"testing"
"time"

"github.com/hudl/fargo"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
)

// Package sd/eureka provides a wrapper around the Netflix Eureka service
// registry by way of the Fargo library. This test assumes the user has an
// instance of Eureka available at the address in the environment variable.
// Example `${EUREKA_ADDR}` format: http://localhost:8761/eureka
//
// NOTE: when starting a Eureka server for integration testing, ensure
// the response cache interval is reduced to one second. This can be
// achieved with the following Java argument:
// `-Deureka.server.responseCacheUpdateIntervalMs=1000`
func TestIntegration(t *testing.T) {
eurekaAddr := os.Getenv("EUREKA_ADDR")
if eurekaAddr == "" {
t.Skip("EUREKA_ADDR is not set")
}

logger := log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestamp)

var fargoConfig fargo.Config
// Target Eureka server(s).
fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr}
// How often the subscriber should poll for updates.
fargoConfig.Eureka.PollIntervalSeconds = 1

// Create a Fargo connection and a Eureka registrar.
fargoConnection := fargo.NewConnFromConfig(fargoConfig)
registrar1 := NewRegistrar(&fargoConnection, instanceTest1, log.With(logger, "component", "registrar1"))

// Register one instance.
registrar1.Register()
defer registrar1.Deregister()

// This should be enough time for the Eureka server response cache to update.
time.Sleep(time.Second)

// Build a Eureka subscriber.
factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
t.Logf("factory invoked for %q", instance)
return endpoint.Nop, nil, nil
}
s := NewSubscriber(
&fargoConnection,
appNameTest,
factory,
log.With(logger, "component", "subscriber"),
)
defer s.Stop()

// We should have one endpoint immediately after subscriber instantiation.
endpoints, err := s.Endpoints()
if err != nil {
t.Error(err)
}
if want, have := 1, len(endpoints); want != have {
t.Errorf("want %d, have %d", want, have)
}

// Register a second instance
registrar2 := NewRegistrar(&fargoConnection, instanceTest2, log.With(logger, "component", "registrar2"))
registrar2.Register()
defer registrar2.Deregister() // In case of exceptional circumstances.

// This should be enough time for a scheduled update assuming Eureka is
// configured with the properties mentioned in the function comments.
time.Sleep(2 * time.Second)

// Now we should have two endpoints.
endpoints, err = s.Endpoints()
if err != nil {
t.Error(err)
}
if want, have := 2, len(endpoints); want != have {
t.Errorf("want %d, have %d", want, have)
}

// Deregister the second instance.
registrar2.Deregister()

// Wait for another scheduled update.
time.Sleep(2 * time.Second)

// And then there was one.
endpoints, err = s.Endpoints()
if err != nil {
t.Error(err)
}
if want, have := 1, len(endpoints); want != have {
t.Errorf("want %d, have %d", want, have)
}
}
127 changes: 127 additions & 0 deletions sd/eureka/registrar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package eureka

import (
"fmt"
"net/http"
"sync"
"time"

"github.com/hudl/fargo"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
)

// Matches official Netflix Java client default.
const defaultRenewalInterval = 30 * time.Second

// The methods of fargo.Connection used in this package.
type fargoConnection interface {
RegisterInstance(instance *fargo.Instance) error
DeregisterInstance(instance *fargo.Instance) error
ReregisterInstance(instance *fargo.Instance) error
HeartBeatInstance(instance *fargo.Instance) error
ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
GetApp(name string) (*fargo.Application, error)
}

type fargoUnsuccessfulHTTPResponse struct {
statusCode int
messagePrefix string
}

// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
conn fargoConnection
instance *fargo.Instance
logger log.Logger
quitc chan chan struct{}
sync.Mutex
}

var _ sd.Registrar = (*Registrar)(nil)

// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
// Fargo connection and instance. See the integration test for usage examples.
func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
return &Registrar{
conn: conn,
instance: instance,
logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
}
}

// Register implements sd.Registrar.
func (r *Registrar) Register() {
r.Lock()
defer r.Unlock()

if r.quitc != nil {
return // Already in the registration loop.
}

if err := r.conn.RegisterInstance(r.instance); err != nil {
r.logger.Log("during", "Register", "err", err)
}

r.quitc = make(chan chan struct{})
go r.loop()
}

// Deregister implements sd.Registrar.
func (r *Registrar) Deregister() {
r.Lock()
defer r.Unlock()

if r.quitc == nil {
return // Already deregistered.
}

q := make(chan struct{})
r.quitc <- q
<-q
r.quitc = nil
}

func (r *Registrar) loop() {
var renewalInterval time.Duration
if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
} else {
renewalInterval = defaultRenewalInterval
}
ticker := time.NewTicker(renewalInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := r.heartbeat(); err != nil {
r.logger.Log("during", "heartbeat", "err", err)
}

case q := <-r.quitc:
if err := r.conn.DeregisterInstance(r.instance); err != nil {
r.logger.Log("during", "Deregister", "err", err)
}
close(q)
return
}
}
}

func (r *Registrar) heartbeat() error {
err := r.conn.HeartBeatInstance(r.instance)
if err != nil {
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound {
// Instance expired (e.g. network partition). Re-register.
r.logger.Log("during", "heartbeat", err.Error())
return r.conn.ReregisterInstance(r.instance)
}
}
return err
}

func (u *fargoUnsuccessfulHTTPResponse) Error() string {
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
}
102 changes: 102 additions & 0 deletions sd/eureka/registrar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package eureka

import (
"testing"
"time"
)

func TestRegistrar(t *testing.T) {
connection := &testConnection{
errHeartbeat: errTest,
}

registrar1 := NewRegistrar(connection, instanceTest1, loggerTest)
registrar2 := NewRegistrar(connection, instanceTest2, loggerTest)

// Not registered.
registrar1.Deregister()
if want, have := 0, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}

// Register.
registrar1.Register()
if want, have := 1, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}

registrar2.Register()
if want, have := 2, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}

// Deregister.
registrar1.Deregister()
if want, have := 1, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}

// Already registered.
registrar1.Register()
if want, have := 2, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
registrar1.Register()
if want, have := 2, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}

// Wait for a heartbeat failure.
time.Sleep(1010 * time.Millisecond)
if want, have := 2, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
registrar1.Deregister()
if want, have := 1, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
}

func TestBadRegister(t *testing.T) {
connection := &testConnection{
errRegister: errTest,
}

registrar := NewRegistrar(connection, instanceTest1, loggerTest)
registrar.Register()
if want, have := 0, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
}

func TestBadDeregister(t *testing.T) {
connection := &testConnection{
errDeregister: errTest,
}

registrar := NewRegistrar(connection, instanceTest1, loggerTest)
registrar.Register()
if want, have := 1, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
registrar.Deregister()
if want, have := 1, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
}

func TestExpiredInstance(t *testing.T) {
connection := &testConnection{
errHeartbeat: errNotFound,
}

registrar := NewRegistrar(connection, instanceTest1, loggerTest)
registrar.Register()

// Wait for a heartbeat failure.
time.Sleep(1010 * time.Millisecond)

if want, have := 1, len(connection.instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
}
Loading