Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions sd/internal/instance/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func (c *Cache) Update(event sd.Event) {
// State returns the current state of discovery (instances or error) as sd.Event
func (c *Cache) State() sd.Event {
c.mtx.RLock()
defer c.mtx.RUnlock()
return c.state
event := c.state
c.mtx.RUnlock()
eventCopy := copyEvent(event)
return eventCopy
}

// Stop implements Instancer. Since the cache is just a plain-old store of data,
Expand All @@ -54,8 +56,10 @@ func (c *Cache) Register(ch chan<- sd.Event) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.reg.register(ch)
event := c.state
eventCopy := copyEvent(event)
// always push the current state to new channels
ch <- c.state
ch <- eventCopy
}

// Deregister implements Instancer.
Expand All @@ -70,7 +74,8 @@ type registry map[chan<- sd.Event]struct{}

func (r registry) broadcast(event sd.Event) {
for c := range r {
c <- event
eventCopy := copyEvent(event)
c <- eventCopy
}
}

Expand All @@ -81,3 +86,17 @@ func (r registry) register(c chan<- sd.Event) {
func (r registry) deregister(c chan<- sd.Event) {
delete(r, c)
}

// copyEvent does a deep copy on sd.Event
func copyEvent(e sd.Event) sd.Event {
// observers all need their own copy of event
// because they can directly modify event.Instances
// for example, by calling sort.Strings
if e.Instances == nil {
return e
}
instances := make([]string, len(e.Instances))
copy(instances, e.Instances)
e.Instances = instances
return e
}
33 changes: 33 additions & 0 deletions sd/internal/instance/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package instance

import (
"context"
"fmt"
"io"
"reflect"
"testing"
"time"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
)

Expand Down Expand Up @@ -75,3 +80,31 @@ func TestRegistry(t *testing.T) {
// if deregister didn't work, broadcast would panic on closed channels
reg.broadcast(sd.Event{Instances: []string{"x", "y"}})
}

// This test is meant to be run with the race detector enabled: -race.
// It ensures that every registered observer receives a copy
// of sd.Event.Instances because observers can directly modify the field.
// For example, endpointCache calls sort.Strings() on sd.Event.Instances.
func TestDataRace(t *testing.T) {
instances := make([]string, 0)
// the number of iterations here maters because we need sort.Strings to
// perform a Swap in doPivot -> medianOfThree to cause a data race.
for i := 1; i < 1000; i++ {
instances = append(instances, fmt.Sprintf("%v", i))
}
e1 := sd.Event{Instances: instances}
cache := NewCache()
cache.Update(e1)
nullEndpoint := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, nil
}
nullFactory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
return nullEndpoint, nil, nil
}
logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error {
return nil
}))

sd.NewEndpointer(cache, nullFactory, logger)
sd.NewEndpointer(cache, nullFactory, logger)
}