From 4cea0775c353ecc21061340ce99b4ece1df575c6 Mon Sep 17 00:00:00 2001 From: robin Date: Wed, 7 Feb 2018 16:35:41 -0600 Subject: [PATCH 1/2] Service Discovery support for etcd v3 This adds a go-kit/kit/sd implementation for etcd/clientv3. All tests except the client_test were ported over. I have removed the client tests because they are more problematic with regards to etcd v3. The etcd maintainers switched to protobufs in v3 and placed those generated messages under internal/, which would cause issues trying to reference it (see https://golang.org/s/go14internal). This makes stubbing clientv3.KV more difficult without burying the implementation behind interfaces to remove the direct dependence on those internal packages. Features in use with v3: - Support for etcd/clientv3.Watcher - Support for etcd/clientv3.Lease (TTL) - Username/password support --- sd/etcdv3/client.go | 234 ++++++++++++++++++++++++++++++++++ sd/etcdv3/doc.go | 1 + sd/etcdv3/example_test.go | 92 +++++++++++++ sd/etcdv3/instancer.go | 76 +++++++++++ sd/etcdv3/instancer_test.go | 86 +++++++++++++ sd/etcdv3/integration_test.go | 176 +++++++++++++++++++++++++ sd/etcdv3/registrar.go | 119 +++++++++++++++++ sd/etcdv3/registrar_test.go | 120 +++++++++++++++++ 8 files changed, 904 insertions(+) create mode 100644 sd/etcdv3/client.go create mode 100644 sd/etcdv3/doc.go create mode 100644 sd/etcdv3/example_test.go create mode 100644 sd/etcdv3/instancer.go create mode 100644 sd/etcdv3/instancer_test.go create mode 100644 sd/etcdv3/integration_test.go create mode 100644 sd/etcdv3/registrar.go create mode 100644 sd/etcdv3/registrar_test.go diff --git a/sd/etcdv3/client.go b/sd/etcdv3/client.go new file mode 100644 index 000000000..2eeb41089 --- /dev/null +++ b/sd/etcdv3/client.go @@ -0,0 +1,234 @@ +package etcdv3 + +import ( + "context" + "crypto/tls" + "errors" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/transport" +) + +var ( + // ErrNoKey indicates a client method needs a key but receives none. + ErrNoKey = errors.New("no key provided") + + // ErrNoValue indicates a client method needs a value but receives none. + ErrNoValue = errors.New("no value provided") + + ErrKeyNotFound = errors.New("requested key not found") +) + +// Client is a wrapper around the etcd client. +type Client interface { + // GetEntries queries the given prefix in etcd and returns a slice + // containing the values of all keys found, recursively, underneath that + // prefix. + GetEntries(prefix string) ([]string, error) + + // WatchPrefix watches the given prefix in etcd for changes. When a change + // is detected, it will signal on the passed channel. Clients are expected + // to call GetEntries to update themselves with the latest set of complete + // values. WatchPrefix will always send an initial sentinel value on the + // channel after establishing the watch, to ensure that clients always + // receive the latest set of values. WatchPrefix will block until the + // context passed to the NewClient constructor is terminated. + WatchPrefix(prefix string, ch chan struct{}) + + // Register a service with etcd. + Register(s Service) error + + // Deregister a service with etcd. + Deregister(s Service) error + + // LeaseID returns the lease id created for this service instance + LeaseID() int64 +} + +type client struct { + cli *clientv3.Client + ctx context.Context + + kv clientv3.KV + + // Watcher interface instance, used to leverage Watcher.Close() + watcher clientv3.Watcher + // watcher context + wctx context.Context + // watcher cancel func + wcf context.CancelFunc + + // leaseID will be 0 (clientv3.NoLease) if a lease was not created + leaseID clientv3.LeaseID + + hbch <-chan *clientv3.LeaseKeepAliveResponse + // Lease interface instance, used to leverage Lease.Close() + leaser clientv3.Lease +} + +// ClientOptions defines options for the etcd client. All values are optional. +// If any duration is not specified, a default of 3 seconds will be used. +type ClientOptions struct { + Cert string + Key string + CACert string + DialTimeout time.Duration + DialKeepAlive time.Duration + Username string + Password string +} + +// NewClient returns Client with a connection to the named machines. It will +// return an error if a connection to the cluster cannot be made. +func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) { + if options.DialTimeout == 0 { + options.DialTimeout = 3 * time.Second + } + if options.DialKeepAlive == 0 { + options.DialKeepAlive = 3 * time.Second + } + + var err error + var tlscfg *tls.Config + + if options.Cert != "" && options.Key != "" { + tlsInfo := transport.TLSInfo{ + CertFile: options.Cert, + KeyFile: options.Key, + TrustedCAFile: options.CACert, + } + tlscfg, err = tlsInfo.ClientConfig() + if err != nil { + return nil, err + } + } + + cli, err := clientv3.New(clientv3.Config{ + Context: ctx, + Endpoints: machines, + DialTimeout: options.DialTimeout, + DialKeepAliveTime: options.DialKeepAlive, + TLS: tlscfg, + Username: options.Username, + Password: options.Password, + }) + if err != nil { + return nil, err + } + + return &client{ + cli: cli, + ctx: ctx, + kv: clientv3.NewKV(cli), + }, nil +} + +func (c *client) LeaseID() int64 { return int64(c.leaseID) } + +// GetEntries implements the etcd Client interface. +func (c *client) GetEntries(key string) ([]string, error) { + resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix()) + if err != nil { + return nil, err + } + + entries := make([]string, len(resp.Kvs)) + for i, kv := range resp.Kvs { + entries[i] = string(kv.Value) + } + + return entries, nil +} + +// WatchPrefix implements the etcd Client interface. +func (c *client) WatchPrefix(prefix string, ch chan struct{}) { + c.wctx, c.wcf = context.WithCancel(c.ctx) + c.watcher = clientv3.NewWatcher(c.cli) + + wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0)) + ch <- struct{}{} + for wr := range wch { + if wr.Canceled { + return + } + ch <- struct{}{} + } +} + +func (c *client) Register(s Service) error { + var err error + + if s.Key == "" { + return ErrNoKey + } + if s.Value == "" { + return ErrNoValue + } + + if c.leaser != nil { + c.leaser.Close() + } + c.leaser = clientv3.NewLease(c.cli) + + if c.watcher != nil { + c.watcher.Close() + } + c.watcher = clientv3.NewWatcher(c.cli) + if c.kv == nil { + c.kv = clientv3.NewKV(c.cli) + } + + if s.TTL == nil { + s.TTL = NewTTLOption(time.Second*3, time.Second*10) + } + + grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds())) + if err != nil { + return err + } + c.leaseID = grantResp.ID + + _, err = c.kv.Put( + c.ctx, + s.Key, + s.Value, + clientv3.WithLease(c.leaseID), + ) + if err != nil { + return err + } + + // this will keep the key alive 'forever' or until we revoke it or + // the context is canceled + c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID) + if err != nil { + return err + } + + return nil +} + +func (c *client) Deregister(s Service) error { + defer c.close() + + if s.Key == "" { + return ErrNoKey + } + if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil { + return err + } + + return nil +} + +func (c *client) close() error { + if c.leaser != nil { + c.leaser.Close() + } + if c.watcher != nil { + c.watcher.Close() + c.wcf() + } + return nil +} diff --git a/sd/etcdv3/doc.go b/sd/etcdv3/doc.go new file mode 100644 index 000000000..dde1a00c5 --- /dev/null +++ b/sd/etcdv3/doc.go @@ -0,0 +1 @@ +package etcdv3 diff --git a/sd/etcdv3/example_test.go b/sd/etcdv3/example_test.go new file mode 100644 index 000000000..1422e31b7 --- /dev/null +++ b/sd/etcdv3/example_test.go @@ -0,0 +1,92 @@ +package etcdv3 + +import ( + "context" + "io" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/lb" +) + +func Example() { + // Let's say this is a service that means to register itself. + // First, we will set up some context. + var ( + etcdServer = "10.0.0.1:2379" // in the change from v2 to v3, the schema is no longer necessary if connecting directly to an etcd v3 instance + prefix = "/services/foosvc/" // known at compile time + instance = "1.2.3.4:8080" // taken from runtime or platform, somehow + key = prefix + instance // should be globally unique + value = "http://" + instance // based on our transport + ctx = context.Background() + ) + + options := ClientOptions{ + // Path to trusted ca file + CACert: "", + + // Path to certificate + Cert: "", + + // Path to private key + Key: "", + + // Username if required + Username: "", + + // Password if required + Password: "", + + // If DialTimeout is 0, it defaults to 3s + DialTimeout: time.Second * 3, + + // If DialKeepAlive is 0, it defaults to 3s + DialKeepAlive: time.Second * 3, + } + + // Build the client. + client, err := NewClient(ctx, []string{etcdServer}, options) + if err != nil { + panic(err) + } + + // Build the registrar. + registrar := NewRegistrar(client, Service{ + Key: key, + Value: value, + }, log.NewNopLogger()) + + // Register our instance. + registrar.Register() + + // At the end of our service lifecycle, for example at the end of func main, + // we should make sure to deregister ourselves. This is important! Don't + // accidentally skip this step by invoking a log.Fatal or os.Exit in the + // interim, which bypasses the defer stack. + defer registrar.Deregister() + + // It's likely that we'll also want to connect to other services and call + // their methods. We can build an Instancer to listen for changes from etcd, + // create Endpointer, wrap it with a load-balancer to pick a single + // endpoint, and finally wrap it with a retry strategy to get something that + // can be used as an endpoint directly. + barPrefix := "/services/barsvc" + logger := log.NewNopLogger() + instancer, err := NewInstancer(client, barPrefix, logger) + if err != nil { + panic(err) + } + endpointer := sd.NewEndpointer(instancer, barFactory, logger) + balancer := lb.NewRoundRobin(endpointer) + retry := lb.Retry(3, 3*time.Second, balancer) + + // And now retry can be used like any other endpoint. + req := struct{}{} + if _, err = retry(ctx, req); err != nil { + panic(err) + } +} + +func barFactory(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil } diff --git a/sd/etcdv3/instancer.go b/sd/etcdv3/instancer.go new file mode 100644 index 000000000..17442b1a9 --- /dev/null +++ b/sd/etcdv3/instancer.go @@ -0,0 +1,76 @@ +package etcdv3 + +import ( + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/internal/instance" +) + +// Instancer yields instances stored in a certain etcd keyspace. Any kind of +// change in that keyspace is watched and will update the Instancer's Instancers. +type Instancer struct { + cache *instance.Cache + client Client + prefix string + logger log.Logger + quitc chan struct{} +} + +// NewInstancer returns an etcd instancer. It will start watching the given +// prefix for changes, and update the subscribers. +func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error) { + s := &Instancer{ + client: c, + prefix: prefix, + cache: instance.NewCache(), + logger: logger, + quitc: make(chan struct{}), + } + + instances, err := s.client.GetEntries(s.prefix) + if err == nil { + logger.Log("prefix", s.prefix, "instances", len(instances)) + } else { + logger.Log("prefix", s.prefix, "err", err) + } + s.cache.Update(sd.Event{Instances: instances, Err: err}) + + go s.loop() + return s, nil +} + +func (s *Instancer) loop() { + ch := make(chan struct{}) + go s.client.WatchPrefix(s.prefix, ch) + + for { + select { + case <-ch: + instances, err := s.client.GetEntries(s.prefix) + if err != nil { + s.logger.Log("msg", "failed to retrieve entries", "err", err) + s.cache.Update(sd.Event{Err: err}) + continue + } + s.cache.Update(sd.Event{Instances: instances}) + + case <-s.quitc: + return + } + } +} + +// Stop terminates the Instancer. +func (s *Instancer) Stop() { + close(s.quitc) +} + +// Register implements Instancer. +func (s *Instancer) Register(ch chan<- sd.Event) { + s.cache.Register(ch) +} + +// Deregister implements Instancer. +func (s *Instancer) Deregister(ch chan<- sd.Event) { + s.cache.Deregister(ch) +} diff --git a/sd/etcdv3/instancer_test.go b/sd/etcdv3/instancer_test.go new file mode 100644 index 000000000..79ae7029a --- /dev/null +++ b/sd/etcdv3/instancer_test.go @@ -0,0 +1,86 @@ +package etcdv3 + +import ( + "errors" + "testing" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" +) + +var _ sd.Instancer = (*Instancer)(nil) // API check + +type testKV struct { + Key []byte + Value []byte +} + +type testResponse struct { + Kvs []testKV +} + +var ( + fakeResponse = testResponse{ + Kvs: []testKV{ + { + Key: []byte("/foo/1"), + Value: []byte("1:1"), + }, + { + Key: []byte("/foo/2"), + Value: []byte("2:2"), + }, + }, + } +) + +var _ sd.Instancer = &Instancer{} // API check + +func TestInstancer(t *testing.T) { + client := &fakeClient{ + responses: map[string]testResponse{"/foo": fakeResponse}, + } + + s, err := NewInstancer(client, "/foo", log.NewNopLogger()) + if err != nil { + t.Fatal(err) + } + defer s.Stop() + + if state := s.cache.State(); state.Err != nil { + t.Fatal(state.Err) + } +} + +type fakeClient struct { + responses map[string]testResponse +} + +func (c *fakeClient) GetEntries(prefix string) ([]string, error) { + response, ok := c.responses[prefix] + if !ok { + return nil, errors.New("key not exist") + } + + entries := make([]string, len(response.Kvs)) + for i, node := range response.Kvs { + entries[i] = string(node.Value) + } + return entries, nil +} + +func (c *fakeClient) CancelWatch() {} + +func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) { +} + +func (c *fakeClient) LeaseID() int64 { + return 0 +} + +func (c *fakeClient) Register(Service) error { + return nil +} +func (c *fakeClient) Deregister(Service) error { + return nil +} diff --git a/sd/etcdv3/integration_test.go b/sd/etcdv3/integration_test.go new file mode 100644 index 000000000..c1d6dbf25 --- /dev/null +++ b/sd/etcdv3/integration_test.go @@ -0,0 +1,176 @@ +// +build integration + +package etcdv3 + +import ( + "context" + "io" + "os" + "testing" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" +) + +func runIntegration(settings integrationSettings, client Client, service Service, t *testing.T) { + // Verify test data is initially empty. + entries, err := client.GetEntries(settings.key) + if err != nil { + t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err) + } + if len(entries) > 0 { + t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries)) + } + t.Logf("GetEntries(%q): %v (OK)", settings.key, entries) + + // Instantiate a new Registrar, passing in test data. + registrar := NewRegistrar( + client, + service, + log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"), + ) + + // Register our instance. + registrar.Register() + t.Logf("Registered") + + // Retrieve entries from etcd manually. + entries, err = client.GetEntries(settings.key) + if err != nil { + t.Fatalf("client.GetEntries(%q): %v", settings.key, err) + } + if want, have := 1, len(entries); want != have { + t.Fatalf("client.GetEntries(%q): want %d, have %d", settings.key, want, have) + } + if want, have := settings.value, entries[0]; want != have { + t.Fatalf("want %q, have %q", want, have) + } + + instancer, err := NewInstancer( + client, + settings.prefix, + log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"), + ) + if err != nil { + t.Fatalf("NewInstancer: %v", err) + } + t.Logf("Constructed Instancer OK") + defer instancer.Stop() + + endpointer := sd.NewEndpointer( + instancer, + func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil }, + log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"), + ) + t.Logf("Constructed Endpointer OK") + defer endpointer.Close() + + if !within(time.Second, func() bool { + endpoints, err := endpointer.Endpoints() + return err == nil && len(endpoints) == 1 + }) { + t.Fatalf("Endpointer didn't see Register in time") + } + t.Logf("Endpointer saw Register OK") + + // Deregister first instance of test data. + registrar.Deregister() + t.Logf("Deregistered") + + // Check it was deregistered. + if !within(time.Second, func() bool { + endpoints, err := endpointer.Endpoints() + t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err) + return err == nil && len(endpoints) == 0 + }) { + t.Fatalf("Endpointer didn't see Deregister in time") + } + + // Verify test data no longer exists in etcd. + entries, err = client.GetEntries(settings.key) + if err != nil { + t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err) + } + if len(entries) > 0 { + t.Fatalf("GetEntries(%q): expected no entries, got %v", settings.key, entries) + } + t.Logf("GetEntries(%q): %v (OK)", settings.key, entries) +} + +type integrationSettings struct { + addr string + prefix string + instance string + key string + value string +} + +func testIntegrationSettings(t *testing.T) integrationSettings { + var settings integrationSettings + + settings.addr = os.Getenv("ETCD_ADDR") + if settings.addr == "" { + t.Skip("ETCD_ADDR not set; skipping integration test") + } + + settings.prefix = "/services/foosvc/" // known at compile time + settings.instance = "1.2.3.4:8080" // taken from runtime or platform, somehow + settings.key = settings.prefix + settings.instance + settings.value = "http://" + settings.instance // based on our transport + + return settings +} + +// Package sd/etcd provides a wrapper around the etcd key/value store. This +// example assumes the user has an instance of etcd installed and running +// locally on port 2379. +func TestIntegration(t *testing.T) { + settings := testIntegrationSettings(t) + client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{ + DialTimeout: 2 * time.Second, + DialKeepAlive: 2 * time.Second, + }) + if err != nil { + t.Fatalf("NewClient(%q): %v", settings.addr, err) + } + + service := Service{ + Key: settings.key, + Value: settings.value, + } + + runIntegration(settings, client, service, t) +} + +func TestIntegrationTTL(t *testing.T) { + settings := testIntegrationSettings(t) + client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{ + DialTimeout: 2 * time.Second, + DialKeepAlive: 2 * time.Second, + }) + if err != nil { + t.Fatalf("NewClient(%q): %v", settings.addr, err) + } + + service := Service{ + Key: settings.key, + Value: settings.value, + TTL: NewTTLOption(time.Second*3, time.Second*10), + } + defer client.Deregister(service) + + runIntegration(settings, client, service, t) +} + +func within(d time.Duration, f func() bool) bool { + deadline := time.Now().Add(d) + for time.Now().Before(deadline) { + if f() { + return true + } + time.Sleep(d / 10) + } + return false +} diff --git a/sd/etcdv3/registrar.go b/sd/etcdv3/registrar.go new file mode 100644 index 000000000..7b8a99b83 --- /dev/null +++ b/sd/etcdv3/registrar.go @@ -0,0 +1,119 @@ +package etcdv3 + +import ( + "sync" + "time" + + "github.com/go-kit/kit/log" +) + +const minHeartBeatTime = 500 * time.Millisecond + +// Registrar registers service instance liveness information to etcd. +type Registrar struct { + client Client + service Service + logger log.Logger + + quitmtx sync.Mutex + quit chan struct{} +} + +// Service holds the instance identifying data you want to publish to etcd. Key +// must be unique, and value is the string returned to subscribers, typically +// called the "instance" string in other parts of package sd. +type Service struct { + Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080" + Value string // returned to subscribers, e.g. "http://1.2.3.4:8080" + TTL *TTLOption +} + +// TTLOption allow setting a key with a TTL. This option will be used by a loop +// goroutine which regularly refreshes the lease of the key. +type TTLOption struct { + heartbeat time.Duration // e.g. time.Second * 3 + ttl time.Duration // e.g. time.Second * 10 +} + +// NewTTLOption returns a TTLOption that contains proper TTL settings. Heartbeat +// is used to refresh the lease of the key periodically; its value should be at +// least 500ms. TTL defines the lease of the key; its value should be +// significantly greater than heartbeat. +// +// Good default values might be 3s heartbeat, 10s TTL. +func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption { + if heartbeat <= minHeartBeatTime { + heartbeat = minHeartBeatTime + } + if ttl <= heartbeat { + ttl = 3 * heartbeat + } + return &TTLOption{ + heartbeat: heartbeat, + ttl: ttl, + } +} + +// NewRegistrar returns a etcd Registrar acting on the provided catalog +// registration (service). +func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar { + return &Registrar{ + client: client, + service: service, + logger: log.With(logger, "key", service.Key, "value", service.Value), + } +} + +// Register implements the sd.Registrar interface. Call it when you want your +// service to be registered in etcd, typically at startup. +func (r *Registrar) Register() { + if err := r.client.Register(r.service); err != nil { + r.logger.Log("err", err) + return + } + if r.service.TTL != nil { + r.logger.Log("action", "register", "lease", r.client.LeaseID()) + } else { + r.logger.Log("action", "register") + } +} + +// func (r *Registrar) loop() { +// r.quitmtx.Lock() +// if r.quit != nil { +// return // already running +// } +// r.quit = make(chan struct{}) +// r.quitmtx.Unlock() + +// tick := time.NewTicker(r.service.TTL.heartbeat) +// defer tick.Stop() + +// for { +// select { +// case <-tick.C: +// if err := r.client.Register(r.service); err != nil { +// r.logger.Log("err", err) +// } +// case <-r.quit: +// return +// } +// } +// } + +// Deregister implements the sd.Registrar interface. Call it when you want your +// service to be deregistered from etcd, typically just prior to shutdown. +func (r *Registrar) Deregister() { + if err := r.client.Deregister(r.service); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "deregister") + } + + r.quitmtx.Lock() + defer r.quitmtx.Unlock() + if r.quit != nil { + close(r.quit) + r.quit = nil + } +} diff --git a/sd/etcdv3/registrar_test.go b/sd/etcdv3/registrar_test.go new file mode 100644 index 000000000..afd01b704 --- /dev/null +++ b/sd/etcdv3/registrar_test.go @@ -0,0 +1,120 @@ +package etcdv3 + +import ( + "bytes" + "errors" + "testing" + + "github.com/go-kit/kit/log" +) + +// testClient is a basic implementation of Client +type testClient struct { + registerRes error // value returned when Register or Deregister is called +} + +func (tc *testClient) GetEntries(prefix string) ([]string, error) { + return nil, nil +} + +func (tc *testClient) CancelWatch() {} + +func (tc *testClient) WatchPrefix(prefix string, ch chan struct{}) { +} + +func (tc *testClient) Register(s Service) error { + return tc.registerRes +} + +func (tc *testClient) Deregister(s Service) error { + return tc.registerRes +} + +func (tc *testClient) LeaseID() int64 { + return 0 +} + +// default service used to build registrar in our tests +var testService = Service{ + Key: "testKey", + Value: "testValue", + TTL: nil, +} + +// NewRegistar should return a registar with a logger using the service key and value +func TestNewRegistar(t *testing.T) { + c := Client(&testClient{nil}) + buf := &bytes.Buffer{} + logger := log.NewLogfmtLogger(buf) + r := NewRegistrar( + c, + testService, + logger, + ) + + if err := r.logger.Log("msg", "message"); err != nil { + t.Fatal(err) + } + if want, have := "key=testKey value=testValue msg=message\n", buf.String(); want != have { + t.Errorf("\nwant: %shave: %s", want, have) + } +} + +// Register log the error returned by the client or log the successful registration action +// table of test cases for method Register +var registerTestTable = []struct { + registerRes error // value returned by the client on calls to Register + log string // expected log by the registrar + +}{ + // test case: an error is returned by the client + {errors.New("regError"), "key=testKey value=testValue err=regError\n"}, + // test case: registration successful + {nil, "key=testKey value=testValue action=register\n"}, +} + +func TestRegister(t *testing.T) { + for _, tc := range registerTestTable { + c := Client(&testClient{tc.registerRes}) + buf := &bytes.Buffer{} + logger := log.NewLogfmtLogger(buf) + r := NewRegistrar( + c, + testService, + logger, + ) + r.Register() + if want, have := tc.log, buf.String(); want != have { + t.Fatalf("want %v, have %v", want, have) + } + } +} + +// Deregister log the error returned by the client or log the successful deregistration action +// table of test cases for method Deregister +var deregisterTestTable = []struct { + deregisterRes error // value returned by the client on calls to Deregister + log string // expected log by the registrar +}{ + // test case: an error is returned by the client + {errors.New("deregError"), "key=testKey value=testValue err=deregError\n"}, + // test case: deregistration successful + {nil, "key=testKey value=testValue action=deregister\n"}, +} + +func TestDeregister(t *testing.T) { + for _, tc := range deregisterTestTable { + c := Client(&testClient{tc.deregisterRes}) + buf := &bytes.Buffer{} + logger := log.NewLogfmtLogger(buf) + r := NewRegistrar( + c, + testService, + logger, + ) + r.Deregister() + if want, have := tc.log, buf.String(); want != have { + t.Fatalf("want %v, have %v", want, have) + } + } +} From acac733a3f6fab305bbece6cc38b07dc34acae47 Mon Sep 17 00:00:00 2001 From: robin Date: Tue, 13 Feb 2018 09:45:01 -0600 Subject: [PATCH 2/2] - dead code cleanup - moved global structs inside of test functions since they are not used anywhere else - moved unused Err variable - removed error return type from close() - added doc comment --- sd/etcdv3/client.go | 7 +++-- sd/etcdv3/doc.go | 3 +++ sd/etcdv3/instancer_test.go | 2 -- sd/etcdv3/registrar.go | 23 ---------------- sd/etcdv3/registrar_test.go | 52 ++++++++++++++++++------------------- 5 files changed, 31 insertions(+), 56 deletions(-) diff --git a/sd/etcdv3/client.go b/sd/etcdv3/client.go index 2eeb41089..57d16387d 100644 --- a/sd/etcdv3/client.go +++ b/sd/etcdv3/client.go @@ -16,8 +16,6 @@ var ( // ErrNoValue indicates a client method needs a value but receives none. ErrNoValue = errors.New("no value provided") - - ErrKeyNotFound = errors.New("requested key not found") ) // Client is a wrapper around the etcd client. @@ -222,7 +220,9 @@ func (c *client) Deregister(s Service) error { return nil } -func (c *client) close() error { +// close will close any open clients and call +// the watcher cancel func +func (c *client) close() { if c.leaser != nil { c.leaser.Close() } @@ -230,5 +230,4 @@ func (c *client) close() error { c.watcher.Close() c.wcf() } - return nil } diff --git a/sd/etcdv3/doc.go b/sd/etcdv3/doc.go index dde1a00c5..e2aa0dde8 100644 --- a/sd/etcdv3/doc.go +++ b/sd/etcdv3/doc.go @@ -1 +1,4 @@ +// Package etcdv3 provides an Instancer and Registrar implementation for etcd v3. If +// you use etcd v3 as your service discovery system, this package will help you +// implement the registration and client-side load balancing patterns. package etcdv3 diff --git a/sd/etcdv3/instancer_test.go b/sd/etcdv3/instancer_test.go index 79ae7029a..ecedf8c8e 100644 --- a/sd/etcdv3/instancer_test.go +++ b/sd/etcdv3/instancer_test.go @@ -69,8 +69,6 @@ func (c *fakeClient) GetEntries(prefix string) ([]string, error) { return entries, nil } -func (c *fakeClient) CancelWatch() {} - func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) { } diff --git a/sd/etcdv3/registrar.go b/sd/etcdv3/registrar.go index 7b8a99b83..27f7a6b6b 100644 --- a/sd/etcdv3/registrar.go +++ b/sd/etcdv3/registrar.go @@ -78,29 +78,6 @@ func (r *Registrar) Register() { } } -// func (r *Registrar) loop() { -// r.quitmtx.Lock() -// if r.quit != nil { -// return // already running -// } -// r.quit = make(chan struct{}) -// r.quitmtx.Unlock() - -// tick := time.NewTicker(r.service.TTL.heartbeat) -// defer tick.Stop() - -// for { -// select { -// case <-tick.C: -// if err := r.client.Register(r.service); err != nil { -// r.logger.Log("err", err) -// } -// case <-r.quit: -// return -// } -// } -// } - // Deregister implements the sd.Registrar interface. Call it when you want your // service to be deregistered from etcd, typically just prior to shutdown. func (r *Registrar) Deregister() { diff --git a/sd/etcdv3/registrar_test.go b/sd/etcdv3/registrar_test.go index afd01b704..052506fc8 100644 --- a/sd/etcdv3/registrar_test.go +++ b/sd/etcdv3/registrar_test.go @@ -17,8 +17,6 @@ func (tc *testClient) GetEntries(prefix string) ([]string, error) { return nil, nil } -func (tc *testClient) CancelWatch() {} - func (tc *testClient) WatchPrefix(prefix string, ch chan struct{}) { } @@ -60,20 +58,20 @@ func TestNewRegistar(t *testing.T) { } } -// Register log the error returned by the client or log the successful registration action -// table of test cases for method Register -var registerTestTable = []struct { - registerRes error // value returned by the client on calls to Register - log string // expected log by the registrar - -}{ - // test case: an error is returned by the client - {errors.New("regError"), "key=testKey value=testValue err=regError\n"}, - // test case: registration successful - {nil, "key=testKey value=testValue action=register\n"}, -} - func TestRegister(t *testing.T) { + // Register log the error returned by the client or log the successful registration action + // table of test cases for method Register + var registerTestTable = []struct { + registerRes error // value returned by the client on calls to Register + log string // expected log by the registrar + + }{ + // test case: an error is returned by the client + {errors.New("regError"), "key=testKey value=testValue err=regError\n"}, + // test case: registration successful + {nil, "key=testKey value=testValue action=register\n"}, + } + for _, tc := range registerTestTable { c := Client(&testClient{tc.registerRes}) buf := &bytes.Buffer{} @@ -90,19 +88,19 @@ func TestRegister(t *testing.T) { } } -// Deregister log the error returned by the client or log the successful deregistration action -// table of test cases for method Deregister -var deregisterTestTable = []struct { - deregisterRes error // value returned by the client on calls to Deregister - log string // expected log by the registrar -}{ - // test case: an error is returned by the client - {errors.New("deregError"), "key=testKey value=testValue err=deregError\n"}, - // test case: deregistration successful - {nil, "key=testKey value=testValue action=deregister\n"}, -} - func TestDeregister(t *testing.T) { + // Deregister log the error returned by the client or log the successful deregistration action + // table of test cases for method Deregister + var deregisterTestTable = []struct { + deregisterRes error // value returned by the client on calls to Deregister + log string // expected log by the registrar + }{ + // test case: an error is returned by the client + {errors.New("deregError"), "key=testKey value=testValue err=deregError\n"}, + // test case: deregistration successful + {nil, "key=testKey value=testValue action=deregister\n"}, + } + for _, tc := range deregisterTestTable { c := Client(&testClient{tc.deregisterRes}) buf := &bytes.Buffer{}