From 4b481291dae9345d135b7315b2c17a29b048665f Mon Sep 17 00:00:00 2001 From: miao <362622365@qq.com> Date: Wed, 14 Dec 2016 17:38:31 +0800 Subject: [PATCH 1/2] Add TTL for etcd sd --- sd/etcd/client.go | 4 ++++ sd/etcd/registrar.go | 43 +++++++++++++++++++++++++++++++++++++++ sd/etcd/registrar_test.go | 2 +- 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/sd/etcd/client.go b/sd/etcd/client.go index 0508723b5..865ca3f61 100644 --- a/sd/etcd/client.go +++ b/sd/etcd/client.go @@ -156,6 +156,10 @@ func (c *client) Register(s Service) error { if s.Value == "" { return ErrNoValue } + if s.TTL != nil { + _, err := c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevIgnore, TTL: s.TTL.TTL}) + return err + } _, err := c.keysAPI.Create(c.ctx, s.Key, s.Value) return err } diff --git a/sd/etcd/registrar.go b/sd/etcd/registrar.go index 52b632a66..7a14e6724 100644 --- a/sd/etcd/registrar.go +++ b/sd/etcd/registrar.go @@ -4,6 +4,11 @@ import ( etcd "github.com/coreos/etcd/client" "github.com/go-kit/kit/log" + "time" +) + +const ( + MinHeartBeatTime = time.Millisecond * 500 ) // Registrar registers service instance liveness information to etcd. @@ -11,6 +16,7 @@ type Registrar struct { client Client service Service logger log.Logger + quit chan struct{} } // Service holds the instance identifying data you want to publish to etcd. Key @@ -19,9 +25,27 @@ type Registrar struct { type Service struct { Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080" Value string // returned to subscribers, e.g. "http://1.2.3.4:8080" + TTL *TTLOption DeleteOptions *etcd.DeleteOptions } +// TTLOption allow setting a key with a TTL, and regularly refreshes the lease with a goroutine +type TTLOption struct { + Heartbeat time.Duration + TTL time.Duration +} + +// NewTTLOption returns a TTLOption +func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption { + if heartbeat <= MinHeartBeatTime { + heartbeat = MinHeartBeatTime + } + if ttl <= heartbeat { + ttl = heartbeat * 3 + } + return &TTLOption{heartbeat, ttl} +} + // NewRegistrar returns a etcd Registrar acting on the provided catalog // registration (service). func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar { @@ -43,6 +67,22 @@ func (r *Registrar) Register() { } else { r.logger.Log("action", "register") } + if r.service.TTL == nil { + return + } + r.quit = make(chan struct{}) + go func() { + for { + select { + case <-r.quit: + return + case <-time.After(r.service.TTL.Heartbeat): + if err := r.client.Register(r.service); err != nil { + r.logger.Log("err", err) + } + } + } + }() } // Deregister implements the sd.Registrar interface. Call it when you want your @@ -53,4 +93,7 @@ func (r *Registrar) Deregister() { } else { r.logger.Log("action", "deregister") } + if r.quit != nil { + close(r.quit) + } } diff --git a/sd/etcd/registrar_test.go b/sd/etcd/registrar_test.go index bea6f914c..9426f0b53 100644 --- a/sd/etcd/registrar_test.go +++ b/sd/etcd/registrar_test.go @@ -30,7 +30,7 @@ func (tc *testClient) Deregister(s Service) error { } // default service used to build registrar in our tests -var testService = Service{"testKey", "testValue", nil} +var testService = Service{"testKey", "testValue", nil, nil} // NewRegistar should return a registar with a logger using the service key and value func TestNewRegistar(t *testing.T) { From 899c5f2633e30b5a413a2a4746e7e0569e142b74 Mon Sep 17 00:00:00 2001 From: miao <362622365@qq.com> Date: Sun, 8 Jan 2017 00:22:41 +0800 Subject: [PATCH 2/2] Fix ttl for etcd sd --- sd/etcd/client.go | 7 ++--- sd/etcd/registrar.go | 63 +++++++++++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/sd/etcd/client.go b/sd/etcd/client.go index 865ca3f61..eb604d66b 100644 --- a/sd/etcd/client.go +++ b/sd/etcd/client.go @@ -156,11 +156,12 @@ func (c *client) Register(s Service) error { if s.Value == "" { return ErrNoValue } + var err error if s.TTL != nil { - _, err := c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevIgnore, TTL: s.TTL.TTL}) - return err + _, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevIgnore, TTL: s.TTL.ttl}) + } else { + _, err = c.keysAPI.Create(c.ctx, s.Key, s.Value) } - _, err := c.keysAPI.Create(c.ctx, s.Key, s.Value) return err } diff --git a/sd/etcd/registrar.go b/sd/etcd/registrar.go index 7a14e6724..e11c6474e 100644 --- a/sd/etcd/registrar.go +++ b/sd/etcd/registrar.go @@ -1,14 +1,16 @@ package etcd import ( + "sync" + "time" + etcd "github.com/coreos/etcd/client" "github.com/go-kit/kit/log" - "time" ) const ( - MinHeartBeatTime = time.Millisecond * 500 + minHeartBeatTime = time.Millisecond * 500 ) // Registrar registers service instance liveness information to etcd. @@ -17,6 +19,7 @@ type Registrar struct { service Service logger log.Logger quit chan struct{} + sync.Mutex } // Service holds the instance identifying data you want to publish to etcd. Key @@ -29,21 +32,29 @@ type Service struct { DeleteOptions *etcd.DeleteOptions } -// TTLOption allow setting a key with a TTL, and regularly refreshes the lease with a goroutine +// TTLOption allow setting a key with a TTL. This option will be used by a loop +// goroutine which regularly refreshes the lease of the key. type TTLOption struct { - Heartbeat time.Duration - TTL time.Duration + heartbeat time.Duration // e.g. time.Second * 3 + ttl time.Duration // e.g. time.Second * 10 } -// NewTTLOption returns a TTLOption +// NewTTLOption returns a TTLOption that contains proper ttl settings. param +// heartbeat is used to refresh lease of the key periodically by a loop goroutine, +// its value should be at least 500ms. param ttl definite the lease of the key, +// its value should be greater than heartbeat's. +// e.g. heartbeat: time.Second * 3, ttl: time.Second * 10. func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption { - if heartbeat <= MinHeartBeatTime { - heartbeat = MinHeartBeatTime + if heartbeat <= minHeartBeatTime { + heartbeat = minHeartBeatTime } if ttl <= heartbeat { ttl = heartbeat * 3 } - return &TTLOption{heartbeat, ttl} + return &TTLOption{ + heartbeat: heartbeat, + ttl: ttl, + } } // NewRegistrar returns a etcd Registrar acting on the provided catalog @@ -67,22 +78,29 @@ func (r *Registrar) Register() { } else { r.logger.Log("action", "register") } - if r.service.TTL == nil { - return + if r.service.TTL != nil { + go r.loop() } +} + +func (r *Registrar) loop() { + r.Lock() r.quit = make(chan struct{}) - go func() { - for { - select { - case <-r.quit: - return - case <-time.After(r.service.TTL.Heartbeat): - if err := r.client.Register(r.service); err != nil { - r.logger.Log("err", err) - } + r.Unlock() + + tick := time.NewTicker(r.service.TTL.heartbeat) + defer tick.Stop() + + for { + select { + case <-r.quit: + return + case <-tick.C: + if err := r.client.Register(r.service); err != nil { + r.logger.Log("err", err) } } - }() + } } // Deregister implements the sd.Registrar interface. Call it when you want your @@ -93,7 +111,10 @@ func (r *Registrar) Deregister() { } else { r.logger.Log("action", "deregister") } + r.Lock() + defer r.Unlock() if r.quit != nil { close(r.quit) + r.quit = nil } }