From 445c6c70c2e07518b22fff968c863711d2e96e35 Mon Sep 17 00:00:00 2001 From: pdaures Date: Sun, 4 Sep 2016 22:49:11 +0200 Subject: [PATCH 1/2] sd/etcd:started to work on a periodical registrar --- sd/etcd/client.go | 14 +++++++ sd/etcd/periodic_registrar.go | 73 +++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 sd/etcd/periodic_registrar.go diff --git a/sd/etcd/client.go b/sd/etcd/client.go index 0508723b5..9688cef53 100644 --- a/sd/etcd/client.go +++ b/sd/etcd/client.go @@ -40,6 +40,9 @@ type Client interface { // Register a service with etcd. Register(s Service) error + // Register a service with etcd with a time to leave + RegisterTTL(s Service, TTL time.Duration) error + // Deregister a service with etcd. Deregister(s Service) error } @@ -160,6 +163,17 @@ func (c *client) Register(s Service) error { return err } +func (c *client) RegisterTTL(s Service, TTL time.Duration) error { + if s.Key == "" { + return ErrNoKey + } + if s.Value == "" { + return ErrNoValue + } + _, err := c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevNoExist, TTL: TTL}) + return err +} + func (c *client) Deregister(s Service) error { if s.Key == "" { return ErrNoKey diff --git a/sd/etcd/periodic_registrar.go b/sd/etcd/periodic_registrar.go new file mode 100644 index 000000000..8e70a2958 --- /dev/null +++ b/sd/etcd/periodic_registrar.go @@ -0,0 +1,73 @@ +package etcd + +import ( + "time" + + "github.com/go-kit/kit/log" +) + +// PeriodicRegistrar periodically registers service instance liveness information to etcd. +type PeriodicRegistrar struct { + client Client + service Service + logger log.Logger + expiration time.Duration + frequency time.Duration + stopC chan bool +} + +// NewPeriodicRegistrar returns a etcd Registrar with recurring registeation acting on the provided catalog +// registration (service). +func NewPeriodicRegistrar(client Client, service Service, logger log.Logger, frequencySec, expirationSec int) *PeriodicRegistrar { + return &PeriodicRegistrar{ + client: client, + service: service, + logger: log.NewContext(logger).With( + "key", service.Key, + "value", service.Value, + ), + expiration: time.Duration(expirationSec) * time.Second, + frequency: time.Duration(frequencySec) * time.Second, + stopC: make(chan bool), + } +} + +// Register implements the sd.Registrar interface. Call it when you want your +// service to be registered in etcd, typically at startup. +func (r *PeriodicRegistrar) Register() { + r.register() + go func() { + tick := time.Tick(r.frequency) + for { + select { + case <-tick: + r.register() + case <-r.stopC: + r.deregister() + } + } + }() +} + +func (r *PeriodicRegistrar) register() { + r.stopC <- true + if err := r.client.RegisterTTL(r.service, r.expiration); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "register") + } +} + +// Deregister implements the sd.Registrar interface. Call it when you want your +// service to be deregistered from etcd, typically just prior to shutdown. +func (r *PeriodicRegistrar) Deregister() { + r.stopC <- true +} + +func (r *PeriodicRegistrar) deregister() { + if err := r.client.Deregister(r.service); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "deregister") + } +} From 85f121c5c595089fba2e42789377084d741881fa Mon Sep 17 00:00:00 2001 From: pdaures Date: Fri, 9 Sep 2016 21:07:05 +0200 Subject: [PATCH 2/2] sd/etcd:periodic_registrar - use time.Duration for setup and prevent deadlocks --- sd/etcd/periodic_registrar.go | 38 ++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/sd/etcd/periodic_registrar.go b/sd/etcd/periodic_registrar.go index 8e70a2958..e7af2eff5 100644 --- a/sd/etcd/periodic_registrar.go +++ b/sd/etcd/periodic_registrar.go @@ -3,6 +3,8 @@ package etcd import ( "time" + "sync" + "github.com/go-kit/kit/log" ) @@ -13,12 +15,14 @@ type PeriodicRegistrar struct { logger log.Logger expiration time.Duration frequency time.Duration - stopC chan bool + stopC chan chan bool + mu *sync.Mutex // mutex for registered + registered bool } // NewPeriodicRegistrar returns a etcd Registrar with recurring registeation acting on the provided catalog // registration (service). -func NewPeriodicRegistrar(client Client, service Service, logger log.Logger, frequencySec, expirationSec int) *PeriodicRegistrar { +func NewPeriodicRegistrar(client Client, service Service, logger log.Logger, regFrequency, regExpiration time.Duration) *PeriodicRegistrar { return &PeriodicRegistrar{ client: client, service: service, @@ -26,15 +30,23 @@ func NewPeriodicRegistrar(client Client, service Service, logger log.Logger, fre "key", service.Key, "value", service.Value, ), - expiration: time.Duration(expirationSec) * time.Second, - frequency: time.Duration(frequencySec) * time.Second, - stopC: make(chan bool), + expiration: regExpiration, + frequency: regFrequency, + stopC: make(chan chan bool), + mu: &sync.Mutex{}, } } // Register implements the sd.Registrar interface. Call it when you want your // service to be registered in etcd, typically at startup. func (r *PeriodicRegistrar) Register() { + r.mu.Lock() + defer r.mu.Unlock() + if r.registered { + return + } + r.registered = true + r.logger.Log("action", "register") r.register() go func() { tick := time.Tick(r.frequency) @@ -42,26 +54,32 @@ func (r *PeriodicRegistrar) Register() { select { case <-tick: r.register() - case <-r.stopC: + case doneC := <-r.stopC: r.deregister() + doneC <- true } } }() } func (r *PeriodicRegistrar) register() { - r.stopC <- true if err := r.client.RegisterTTL(r.service, r.expiration); err != nil { r.logger.Log("err", err) - } else { - r.logger.Log("action", "register") } } // Deregister implements the sd.Registrar interface. Call it when you want your // service to be deregistered from etcd, typically just prior to shutdown. func (r *PeriodicRegistrar) Deregister() { - r.stopC <- true + r.mu.Lock() + defer r.mu.Unlock() + if !r.registered { + return + } + done := make(chan bool) + r.stopC <- done + <-done + r.registered = false } func (r *PeriodicRegistrar) deregister() {