Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sd/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,12 @@ func (c *client) Register(s Service) error {
if s.Value == "" {
return ErrNoValue
}
_, err := c.keysAPI.Create(c.ctx, s.Key, s.Value)
var err error
if s.TTL != nil {
_, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevIgnore, TTL: s.TTL.ttl})
} else {
_, err = c.keysAPI.Create(c.ctx, s.Key, s.Value)
}
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. What do you think about

var err error
if s.TTL != nil {
    _, err = c.keysAPI.Set(...)
} else {
    _, err = c.keysAPI.Create(...)
}
return err

}

Expand Down
64 changes: 64 additions & 0 deletions sd/etcd/registrar.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package etcd

import (
"sync"
"time"

etcd "github.com/coreos/etcd/client"

"github.com/go-kit/kit/log"
)

const (
minHeartBeatTime = time.Millisecond * 500
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constant should either be made private (I think that's preferable) or given a doc comment.


// Registrar registers service instance liveness information to etcd.
type Registrar struct {
client Client
service Service
logger log.Logger
quit chan struct{}
sync.Mutex
}

// Service holds the instance identifying data you want to publish to etcd. Key
Expand All @@ -19,9 +28,35 @@ type Registrar struct {
type Service struct {
Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080"
Value string // returned to subscribers, e.g. "http://1.2.3.4:8080"
TTL *TTLOption
DeleteOptions *etcd.DeleteOptions
}

// TTLOption allow setting a key with a TTL. This option will be used by a loop
// goroutine which regularly refreshes the lease of the key.
type TTLOption struct {
heartbeat time.Duration // e.g. time.Second * 3
ttl time.Duration // e.g. time.Second * 10
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that a TTLOption is constructed with a constructor, I don't think either of these fields needs to be exported. Is that correct?


// NewTTLOption returns a TTLOption that contains proper ttl settings. param
// heartbeat is used to refresh lease of the key periodically by a loop goroutine,
// its value should be at least 500ms. param ttl definite the lease of the key,
// its value should be greater than heartbeat's.
// e.g. heartbeat: time.Second * 3, ttl: time.Second * 10.
func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc comment here is a little sparse. Could it please describe what each of the parameters are, and some good default values? It would also be good to describe the bounds-checking logic implemented in the function body.

if heartbeat <= minHeartBeatTime {
heartbeat = minHeartBeatTime
}
if ttl <= heartbeat {
ttl = heartbeat * 3
}
return &TTLOption{
heartbeat: heartbeat,
ttl: ttl,
}
}

// NewRegistrar returns a etcd Registrar acting on the provided catalog
// registration (service).
func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar {
Expand All @@ -43,6 +78,29 @@ func (r *Registrar) Register() {
} else {
r.logger.Log("action", "register")
}
if r.service.TTL != nil {
go r.loop()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This says "bail early if there's no configured TTL" — but I think it's much more clear if the logic is inverted, i.e. "if there is a configured TTL, create the quit chan and launch the maintenance loop".

}

func (r *Registrar) loop() {
r.Lock()
r.quit = make(chan struct{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mutates registrar state, which makes it unsafe for concurrent access from multiple goroutines. I think the registrar needs its internal state protected with a mutex, which should be taken with every public method.

r.Unlock()

tick := time.NewTicker(r.service.TTL.heartbeat)
defer tick.Stop()

for {
select {
case <-r.quit:
return
case <-tick.C:
if err := r.client.Register(r.service); err != nil {
r.logger.Log("err", err)
}
}
}
}

// Deregister implements the sd.Registrar interface. Call it when you want your
Expand All @@ -53,4 +111,10 @@ func (r *Registrar) Deregister() {
} else {
r.logger.Log("action", "deregister")
}
r.Lock()
defer r.Unlock()
if r.quit != nil {
close(r.quit)
r.quit = nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like r.quit should also be reset to nil.

}
2 changes: 1 addition & 1 deletion sd/etcd/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (tc *testClient) Deregister(s Service) error {
}

// default service used to build registrar in our tests
var testService = Service{"testKey", "testValue", nil}
var testService = Service{"testKey", "testValue", nil, nil}

// NewRegistar should return a registar with a logger using the service key and value
func TestNewRegistar(t *testing.T) {
Expand Down