From d1507c6ba234756d6878f4b4ccf522d26f2cd799 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 3 Mar 2017 13:52:32 -0500 Subject: [PATCH 1/3] Support discovery of instance locations, not just Endpoints --- sd/cache/cache.go | 28 +++++++++++++++++++--------- sd/consul/subscriber.go | 5 +++++ sd/discoverer.go | 9 +++++++++ sd/dnssrv/subscriber.go | 5 +++++ sd/etcd/subscriber.go | 5 +++++ sd/factory.go | 2 ++ sd/fixed_discoverer.go | 7 +++++++ 7 files changed, 52 insertions(+), 9 deletions(-) create mode 100644 sd/discoverer.go create mode 100644 sd/fixed_discoverer.go diff --git a/sd/cache/cache.go b/sd/cache/cache.go index 82af86b51..185330418 100644 --- a/sd/cache/cache.go +++ b/sd/cache/cache.go @@ -11,16 +11,19 @@ import ( "github.com/go-kit/kit/sd" ) +// TODO move this to endpoint/cache + // Cache collects the most recent set of endpoints from a service discovery // system via a subscriber, and makes them available to consumers. Cache is // meant to be embedded inside of a concrete subscriber, and can serve Service // invocations directly. type Cache struct { - mtx sync.RWMutex - factory sd.Factory - cache map[string]endpointCloser - slice atomic.Value // []endpoint.Endpoint - logger log.Logger + mtx sync.RWMutex + factory sd.Factory + cache map[string]endpointCloser + instances atomic.Value // []string + endpoints atomic.Value // []endpoint.Endpoint + logger log.Logger } type endpointCloser struct { @@ -75,22 +78,29 @@ func (c *Cache) Update(instances []string) { } // Populate the slice of endpoints. - slice := make([]endpoint.Endpoint, 0, len(cache)) + endpoints := make([]endpoint.Endpoint, 0, len(cache)) for _, instance := range instances { // A bad factory may mean an instance is not present. if _, ok := cache[instance]; !ok { continue } - slice = append(slice, cache[instance].Endpoint) + endpoints = append(endpoints, cache[instance].Endpoint) } // Swap and trigger GC for old copies. - c.slice.Store(slice) + c.instances.Store(instances) + c.endpoints.Store(endpoints) c.cache = cache } // Endpoints yields the current set of (presumably identical) endpoints, ordered // lexicographically by the corresponding instance string. func (c *Cache) Endpoints() []endpoint.Endpoint { - return c.slice.Load().([]endpoint.Endpoint) + return c.endpoints.Load().([]endpoint.Endpoint) +} + +// Instances yields the current set of (presumably identical) instance locations, +// ordered lexicographically. +func (c *Cache) Instances() []string { + return c.instances.Load().([]string) } diff --git a/sd/consul/subscriber.go b/sd/consul/subscriber.go index ee3ae34bb..883ec0baf 100644 --- a/sd/consul/subscriber.go +++ b/sd/consul/subscriber.go @@ -55,6 +55,11 @@ func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service return s } +// Instances implements the Discoverer interface. +func (s *Subscriber) Instances() ([]endpoint.Endpoint, error) { + return s.cache.Instances(), nil +} + // Endpoints implements the Subscriber interface. func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { return s.cache.Endpoints(), nil diff --git a/sd/discoverer.go b/sd/discoverer.go new file mode 100644 index 000000000..913093a4f --- /dev/null +++ b/sd/discoverer.go @@ -0,0 +1,9 @@ +package sd + +// Discoverer listens to a service discovery system and yields a set of +// identical instance locations. An error indicates a problem with connectivity +// to the service discovery system, or within the system itself; a subscriber +// may yield no endpoints without error. +type Discoverer interface { + Instances() ([]string, error) +} diff --git a/sd/dnssrv/subscriber.go b/sd/dnssrv/subscriber.go index 422fdaa76..45d66fcb6 100644 --- a/sd/dnssrv/subscriber.go +++ b/sd/dnssrv/subscriber.go @@ -82,6 +82,11 @@ func (p *Subscriber) loop(t *time.Ticker, lookup Lookup) { } } +// Instances implements the Discoverer interface. +func (p *Subscriber) Instances() ([]endpoint.Endpoint, error) { + return p.cache.Instances(), nil +} + // Endpoints implements the Subscriber interface. func (p *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { return p.cache.Endpoints(), nil diff --git a/sd/etcd/subscriber.go b/sd/etcd/subscriber.go index 1b9187283..b0a460340 100644 --- a/sd/etcd/subscriber.go +++ b/sd/etcd/subscriber.go @@ -61,6 +61,11 @@ func (s *Subscriber) loop() { } } +// Instances implements the Discoverer interface. +func (s *Subscriber) Instances() ([]endpoint.Endpoint, error) { + return s.cache.Instances(), nil +} + // Endpoints implements the Subscriber interface. func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { return s.cache.Endpoints(), nil diff --git a/sd/factory.go b/sd/factory.go index af99817b4..385be8d84 100644 --- a/sd/factory.go +++ b/sd/factory.go @@ -6,6 +6,8 @@ import ( "github.com/go-kit/kit/endpoint" ) +// TODO move this to endpoint/ + // Factory is a function that converts an instance string (e.g. host:port) to a // specific endpoint. Instances that provide multiple endpoints require multiple // factories. A factory also returns an io.Closer that's invoked when the diff --git a/sd/fixed_discoverer.go b/sd/fixed_discoverer.go new file mode 100644 index 000000000..e2d0e46a4 --- /dev/null +++ b/sd/fixed_discoverer.go @@ -0,0 +1,7 @@ +package sd + +// FixedDiscoverer yields a fixed set of instances. +type FixedDiscoverer []string + +// Instances implements Discoverer. +func (d FixedDiscoverer) Instances() ([]string, error) { return d, nil } From 7e9d374e351ed65409c606ab7faaf27a06e7762a Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 3 Mar 2017 13:56:36 -0500 Subject: [PATCH 2/3] strings --- sd/consul/subscriber.go | 2 +- sd/dnssrv/subscriber.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sd/consul/subscriber.go b/sd/consul/subscriber.go index 883ec0baf..9029d9d55 100644 --- a/sd/consul/subscriber.go +++ b/sd/consul/subscriber.go @@ -56,7 +56,7 @@ func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service } // Instances implements the Discoverer interface. -func (s *Subscriber) Instances() ([]endpoint.Endpoint, error) { +func (s *Subscriber) Instances() ([]string, error) { return s.cache.Instances(), nil } diff --git a/sd/dnssrv/subscriber.go b/sd/dnssrv/subscriber.go index 45d66fcb6..cdedf13a5 100644 --- a/sd/dnssrv/subscriber.go +++ b/sd/dnssrv/subscriber.go @@ -83,7 +83,7 @@ func (p *Subscriber) loop(t *time.Ticker, lookup Lookup) { } // Instances implements the Discoverer interface. -func (p *Subscriber) Instances() ([]endpoint.Endpoint, error) { +func (p *Subscriber) Instances() ([]string, error) { return p.cache.Instances(), nil } From d97d6078866ebc8072bdceb09dd3e9b166a1b32d Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 3 Mar 2017 13:57:51 -0500 Subject: [PATCH 3/3] Missed one --- sd/etcd/subscriber.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sd/etcd/subscriber.go b/sd/etcd/subscriber.go index b0a460340..6b9a50797 100644 --- a/sd/etcd/subscriber.go +++ b/sd/etcd/subscriber.go @@ -62,7 +62,7 @@ func (s *Subscriber) loop() { } // Instances implements the Discoverer interface. -func (s *Subscriber) Instances() ([]endpoint.Endpoint, error) { +func (s *Subscriber) Instances() ([]string, error) { return s.cache.Instances(), nil }