diff --git a/sd/internal/instance/cache.go b/sd/internal/instance/cache.go index 27b2122a1..0eac0a430 100644 --- a/sd/internal/instance/cache.go +++ b/sd/internal/instance/cache.go @@ -41,8 +41,10 @@ func (c *Cache) Update(event sd.Event) { // State returns the current state of discovery (instances or error) as sd.Event func (c *Cache) State() sd.Event { c.mtx.RLock() - defer c.mtx.RUnlock() - return c.state + event := c.state + c.mtx.RUnlock() + eventCopy := copyEvent(event) + return eventCopy } // Stop implements Instancer. Since the cache is just a plain-old store of data, @@ -54,8 +56,10 @@ func (c *Cache) Register(ch chan<- sd.Event) { c.mtx.Lock() defer c.mtx.Unlock() c.reg.register(ch) + event := c.state + eventCopy := copyEvent(event) // always push the current state to new channels - ch <- c.state + ch <- eventCopy } // Deregister implements Instancer. @@ -70,7 +74,8 @@ type registry map[chan<- sd.Event]struct{} func (r registry) broadcast(event sd.Event) { for c := range r { - c <- event + eventCopy := copyEvent(event) + c <- eventCopy } } @@ -81,3 +86,17 @@ func (r registry) register(c chan<- sd.Event) { func (r registry) deregister(c chan<- sd.Event) { delete(r, c) } + +// copyEvent does a deep copy on sd.Event +func copyEvent(e sd.Event) sd.Event { + // observers all need their own copy of event + // because they can directly modify event.Instances + // for example, by calling sort.Strings + if e.Instances == nil { + return e + } + instances := make([]string, len(e.Instances)) + copy(instances, e.Instances) + e.Instances = instances + return e +} diff --git a/sd/internal/instance/cache_test.go b/sd/internal/instance/cache_test.go index 05a1cc270..d1135b458 100644 --- a/sd/internal/instance/cache_test.go +++ b/sd/internal/instance/cache_test.go @@ -1,10 +1,15 @@ package instance import ( + "context" + "fmt" + "io" "reflect" "testing" "time" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" ) @@ -75,3 +80,31 @@ 
func TestRegistry(t *testing.T) { // if deregister didn't work, broadcast would panic on closed channels reg.broadcast(sd.Event{Instances: []string{"x", "y"}}) } + +// This test is meant to be run with the race detector enabled: -race. +// It ensures that every registered observer receives a copy +// of sd.Event.Instances because observers can directly modify the field. +// For example, endpointCache calls sort.Strings() on sd.Event.Instances. +func TestDataRace(t *testing.T) { + instances := make([]string, 0) + // the number of iterations here matters because we need sort.Strings to + // perform a Swap in doPivot -> medianOfThree to cause a data race. + for i := 1; i < 1000; i++ { + instances = append(instances, fmt.Sprintf("%v", i)) + } + e1 := sd.Event{Instances: instances} + cache := NewCache() + cache.Update(e1) + nullEndpoint := func(_ context.Context, _ interface{}) (interface{}, error) { + return nil, nil + } + nullFactory := func(instance string) (endpoint.Endpoint, io.Closer, error) { + return nullEndpoint, nil, nil + } + logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error { + return nil + })) + + sd.NewEndpointer(cache, nullFactory, logger) + sd.NewEndpointer(cache, nullFactory, logger) +}