Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions sd/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ import (
"github.com/go-kit/kit/sd"
)

// TODO move this to endpoint/cache

// Cache collects the most recent set of endpoints from a service discovery
// system via a subscriber, and makes them available to consumers. Cache is
// meant to be embedded inside of a concrete subscriber, and can serve Service
// invocations directly.
type Cache struct {
mtx sync.RWMutex
factory sd.Factory
cache map[string]endpointCloser
slice atomic.Value // []endpoint.Endpoint
logger log.Logger
mtx sync.RWMutex
factory sd.Factory
cache map[string]endpointCloser
instances atomic.Value // []string
endpoints atomic.Value // []endpoint.Endpoint
logger log.Logger
}

type endpointCloser struct {
Expand Down Expand Up @@ -75,22 +78,29 @@ func (c *Cache) Update(instances []string) {
}

// Populate the slice of endpoints.
slice := make([]endpoint.Endpoint, 0, len(cache))
endpoints := make([]endpoint.Endpoint, 0, len(cache))
for _, instance := range instances {
// A bad factory may mean an instance is not present.
if _, ok := cache[instance]; !ok {
continue
}
slice = append(slice, cache[instance].Endpoint)
endpoints = append(endpoints, cache[instance].Endpoint)
}

// Swap and trigger GC for old copies.
c.slice.Store(slice)
c.instances.Store(instances)
c.endpoints.Store(endpoints)
c.cache = cache
}

// Endpoints yields the current set of (presumably identical) endpoints, ordered
// lexicographically by the corresponding instance string.
func (c *Cache) Endpoints() []endpoint.Endpoint {
return c.slice.Load().([]endpoint.Endpoint)
return c.endpoints.Load().([]endpoint.Endpoint)
}

// Instances yields the current set of (presumably identical) instance locations,
// ordered lexicographically.
func (c *Cache) Instances() []string {
return c.instances.Load().([]string)
}
5 changes: 5 additions & 0 deletions sd/consul/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service
return s
}

// Instances implements the Discoverer interface.
func (s *Subscriber) Instances() ([]string, error) {
return s.cache.Instances(), nil
}

// Endpoints implements the Subscriber interface.
func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
return s.cache.Endpoints(), nil
Expand Down
9 changes: 9 additions & 0 deletions sd/discoverer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package sd

// Discoverer listens to a service discovery system and yields a set of
// identical instance locations. An error indicates a problem with connectivity
// to the service discovery system, or within the system itself; a subscriber
// may yield no endpoints without error.
type Discoverer interface {
Instances() ([]string, error)
}
5 changes: 5 additions & 0 deletions sd/dnssrv/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (p *Subscriber) loop(t *time.Ticker, lookup Lookup) {
}
}

// Instances implements the Discoverer interface.
func (p *Subscriber) Instances() ([]string, error) {
return p.cache.Instances(), nil
}

// Endpoints implements the Subscriber interface.
func (p *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
return p.cache.Endpoints(), nil
Expand Down
5 changes: 5 additions & 0 deletions sd/etcd/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (s *Subscriber) loop() {
}
}

// Instances implements the Discoverer interface.
func (s *Subscriber) Instances() ([]string, error) {
return s.cache.Instances(), nil
}

// Endpoints implements the Subscriber interface.
func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
return s.cache.Endpoints(), nil
Expand Down
2 changes: 2 additions & 0 deletions sd/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/go-kit/kit/endpoint"
)

// TODO move this to endpoint/

// Factory is a function that converts an instance string (e.g. host:port) to a
// specific endpoint. Instances that provide multiple endpoints require multiple
// factories. A factory also returns an io.Closer that's invoked when the
Expand Down
7 changes: 7 additions & 0 deletions sd/fixed_discoverer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package sd

// FixedDiscoverer yields a fixed set of instances.
type FixedDiscoverer []string

// Instances implements Discoverer.
func (d FixedDiscoverer) Instances() ([]string, error) { return d, nil }