diff --git a/sd/cache/cache.go b/sd/cache/cache.go index 82af86b51..185330418 100644 --- a/sd/cache/cache.go +++ b/sd/cache/cache.go @@ -11,16 +11,19 @@ import ( "github.com/go-kit/kit/sd" ) +// TODO move this to endpoint/cache + // Cache collects the most recent set of endpoints from a service discovery // system via a subscriber, and makes them available to consumers. Cache is // meant to be embedded inside of a concrete subscriber, and can serve Service // invocations directly. type Cache struct { - mtx sync.RWMutex - factory sd.Factory - cache map[string]endpointCloser - slice atomic.Value // []endpoint.Endpoint - logger log.Logger + mtx sync.RWMutex + factory sd.Factory + cache map[string]endpointCloser + instances atomic.Value // []string + endpoints atomic.Value // []endpoint.Endpoint + logger log.Logger } type endpointCloser struct { @@ -75,22 +78,29 @@ func (c *Cache) Update(instances []string) { } // Populate the slice of endpoints. - slice := make([]endpoint.Endpoint, 0, len(cache)) + endpoints := make([]endpoint.Endpoint, 0, len(cache)) for _, instance := range instances { // A bad factory may mean an instance is not present. if _, ok := cache[instance]; !ok { continue } - slice = append(slice, cache[instance].Endpoint) + endpoints = append(endpoints, cache[instance].Endpoint) } // Swap and trigger GC for old copies. - c.slice.Store(slice) + c.instances.Store(instances) + c.endpoints.Store(endpoints) c.cache = cache } // Endpoints yields the current set of (presumably identical) endpoints, ordered // lexicographically by the corresponding instance string. func (c *Cache) Endpoints() []endpoint.Endpoint { - return c.slice.Load().([]endpoint.Endpoint) + return c.endpoints.Load().([]endpoint.Endpoint) +} + +// Instances yields the current set of (presumably identical) instance locations, +// ordered lexicographically. +func (c *Cache) Instances() []string { + return c.instances.Load().([]string) } diff --git a/sd/consul/subscriber.go b/sd/consul/subscriber.go index ee3ae34bb..9029d9d55 100644 --- a/sd/consul/subscriber.go +++ b/sd/consul/subscriber.go @@ -55,6 +55,11 @@ func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service return s } +// Instances implements the Discoverer interface. +func (s *Subscriber) Instances() ([]string, error) { + return s.cache.Instances(), nil +} + // Endpoints implements the Subscriber interface. func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { return s.cache.Endpoints(), nil diff --git a/sd/discoverer.go b/sd/discoverer.go new file mode 100644 index 000000000..913093a4f --- /dev/null +++ b/sd/discoverer.go @@ -0,0 +1,9 @@ +package sd + +// Discoverer listens to a service discovery system and yields a set of +// identical instance locations. An error indicates a problem with connectivity +// to the service discovery system, or within the system itself; a subscriber +// may yield no endpoints without error. +type Discoverer interface { + Instances() ([]string, error) +} diff --git a/sd/dnssrv/subscriber.go b/sd/dnssrv/subscriber.go index 422fdaa76..cdedf13a5 100644 --- a/sd/dnssrv/subscriber.go +++ b/sd/dnssrv/subscriber.go @@ -82,6 +82,11 @@ func (p *Subscriber) loop(t *time.Ticker, lookup Lookup) { } } +// Instances implements the Discoverer interface. +func (p *Subscriber) Instances() ([]string, error) { + return p.cache.Instances(), nil +} + // Endpoints implements the Subscriber interface. func (p *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { return p.cache.Endpoints(), nil diff --git a/sd/etcd/subscriber.go b/sd/etcd/subscriber.go index 1b9187283..6b9a50797 100644 --- a/sd/etcd/subscriber.go +++ b/sd/etcd/subscriber.go @@ -61,6 +61,11 @@ func (s *Subscriber) loop() { } } +// Instances implements the Discoverer interface. +func (s *Subscriber) Instances() ([]string, error) { + return s.cache.Instances(), nil +} + // Endpoints implements the Subscriber interface. func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { return s.cache.Endpoints(), nil diff --git a/sd/factory.go b/sd/factory.go index af99817b4..385be8d84 100644 --- a/sd/factory.go +++ b/sd/factory.go @@ -6,6 +6,8 @@ import ( "github.com/go-kit/kit/endpoint" ) +// TODO move this to endpoint/ + // Factory is a function that converts an instance string (e.g. host:port) to a // specific endpoint. Instances that provide multiple endpoints require multiple // factories. A factory also returns an io.Closer that's invoked when the diff --git a/sd/fixed_discoverer.go b/sd/fixed_discoverer.go new file mode 100644 index 000000000..e2d0e46a4 --- /dev/null +++ b/sd/fixed_discoverer.go @@ -0,0 +1,7 @@ +package sd + +// FixedDiscoverer yields a fixed set of instances. +type FixedDiscoverer []string + +// Instances implements Discoverer. +func (d FixedDiscoverer) Instances() ([]string, error) { return d, nil }