diff --git a/sd/etcd/client.go b/sd/etcd/client.go index 0508723b5..9688cef53 100644 --- a/sd/etcd/client.go +++ b/sd/etcd/client.go @@ -40,6 +40,9 @@ type Client interface { // Register a service with etcd. Register(s Service) error + // Register a service with etcd with a time to leave + RegisterTTL(s Service, TTL time.Duration) error + // Deregister a service with etcd. Deregister(s Service) error } @@ -160,6 +163,17 @@ func (c *client) Register(s Service) error { return err } +func (c *client) RegisterTTL(s Service, TTL time.Duration) error { + if s.Key == "" { + return ErrNoKey + } + if s.Value == "" { + return ErrNoValue + } + _, err := c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevNoExist, TTL: TTL}) + return err +} + func (c *client) Deregister(s Service) error { if s.Key == "" { return ErrNoKey diff --git a/sd/etcd/periodic_registrar.go b/sd/etcd/periodic_registrar.go new file mode 100644 index 000000000..e7af2eff5 --- /dev/null +++ b/sd/etcd/periodic_registrar.go @@ -0,0 +1,91 @@ +package etcd + +import ( + "time" + + "sync" + + "github.com/go-kit/kit/log" +) + +// PeriodicRegistrar periodically registers service instance liveness information to etcd. +type PeriodicRegistrar struct { + client Client + service Service + logger log.Logger + expiration time.Duration + frequency time.Duration + stopC chan chan bool + mu *sync.Mutex // mutex for registered + registered bool +} + +// NewPeriodicRegistrar returns a etcd Registrar with recurring registeation acting on the provided catalog +// registration (service). +func NewPeriodicRegistrar(client Client, service Service, logger log.Logger, regFrequency, regExpiration time.Duration) *PeriodicRegistrar { + return &PeriodicRegistrar{ + client: client, + service: service, + logger: log.NewContext(logger).With( + "key", service.Key, + "value", service.Value, + ), + expiration: regExpiration, + frequency: regFrequency, + stopC: make(chan chan bool), + mu: &sync.Mutex{}, + } +} + +// Register implements the sd.Registrar interface. Call it when you want your +// service to be registered in etcd, typically at startup. +func (r *PeriodicRegistrar) Register() { + r.mu.Lock() + defer r.mu.Unlock() + if r.registered { + return + } + r.registered = true + r.logger.Log("action", "register") + r.register() + go func() { + tick := time.Tick(r.frequency) + for { + select { + case <-tick: + r.register() + case doneC := <-r.stopC: + r.deregister() + doneC <- true + } + } + }() +} + +func (r *PeriodicRegistrar) register() { + if err := r.client.RegisterTTL(r.service, r.expiration); err != nil { + r.logger.Log("err", err) + } +} + +// Deregister implements the sd.Registrar interface. Call it when you want your +// service to be deregistered from etcd, typically just prior to shutdown. +func (r *PeriodicRegistrar) Deregister() { + r.mu.Lock() + defer r.mu.Unlock() + if !r.registered { + return + } + done := make(chan bool) + r.stopC <- done + <-done + r.registered = false +} + +func (r *PeriodicRegistrar) deregister() { + if err := r.client.Deregister(r.service); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "deregister") + } +}