Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sd/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Client interface {
// Register a service with etcd.
Register(s Service) error

// Register a service with etcd with a time to leave
RegisterTTL(s Service, TTL time.Duration) error

// Deregister a service with etcd.
Deregister(s Service) error
}
Expand Down Expand Up @@ -160,6 +163,17 @@ func (c *client) Register(s Service) error {
return err
}

func (c *client) RegisterTTL(s Service, TTL time.Duration) error {
if s.Key == "" {
return ErrNoKey
}
if s.Value == "" {
return ErrNoValue
}
_, err := c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevNoExist, TTL: TTL})
return err
}

func (c *client) Deregister(s Service) error {
if s.Key == "" {
return ErrNoKey
Expand Down
91 changes: 91 additions & 0 deletions sd/etcd/periodic_registrar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package etcd

import (
"time"

"sync"

"github.com/go-kit/kit/log"
)

// PeriodicRegistrar periodically registers service instance liveness information to etcd.
type PeriodicRegistrar struct {
client Client
service Service
logger log.Logger
expiration time.Duration
frequency time.Duration
stopC chan chan bool
mu *sync.Mutex // mutex for registered
registered bool
}

// NewPeriodicRegistrar returns a etcd Registrar with recurring registeation acting on the provided catalog
// registration (service).
func NewPeriodicRegistrar(client Client, service Service, logger log.Logger, regFrequency, regExpiration time.Duration) *PeriodicRegistrar {
return &PeriodicRegistrar{
client: client,
service: service,
logger: log.NewContext(logger).With(
"key", service.Key,
"value", service.Value,
),
expiration: regExpiration,
frequency: regFrequency,
stopC: make(chan chan bool),
mu: &sync.Mutex{},
}
}

// Register implements the sd.Registrar interface. Call it when you want your
// service to be registered in etcd, typically at startup.
func (r *PeriodicRegistrar) Register() {
r.mu.Lock()
defer r.mu.Unlock()
if r.registered {
return
}
r.registered = true
r.logger.Log("action", "register")
r.register()
go func() {
tick := time.Tick(r.frequency)
for {
select {
case <-tick:
r.register()
case doneC := <-r.stopC:
r.deregister()
doneC <- true
}
}
}()
}

func (r *PeriodicRegistrar) register() {
if err := r.client.RegisterTTL(r.service, r.expiration); err != nil {
r.logger.Log("err", err)
}
}

// Deregister implements the sd.Registrar interface. Call it when you want your
// service to be deregistered from etcd, typically just prior to shutdown.
func (r *PeriodicRegistrar) Deregister() {
r.mu.Lock()
defer r.mu.Unlock()
if !r.registered {
return
}
done := make(chan bool)
r.stopC <- done
<-done
r.registered = false
}

func (r *PeriodicRegistrar) deregister() {
if err := r.client.Deregister(r.service); err != nil {
r.logger.Log("err", err)
} else {
r.logger.Log("action", "deregister")
}
}