From 2d31d73b4d38bbadcb44367d7fb68e9ea8b188f5 Mon Sep 17 00:00:00 2001
From: Alex Mavrogiannis
Date: Wed, 21 Jun 2017 17:08:10 -0700
Subject: [PATCH] Extend watch queue with timeout and size limit

Signed-off-by: Alex Mavrogiannis
---
 watch/queue/queue.go      | 158 ++++++++++++++++++++++++++++++++++
 watch/queue/queue_test.go | 176 ++++++++++++++++++++++++++++++++++++++
 watch/sinks.go            |  95 ++++++++++++++++++++
 watch/sinks_test.go       |  49 +++++++++++
 watch/watch.go            | 160 ++++++++++++++++++++++++++--------
 watch/watch_test.go       |  71 +++++++++++++++
 6 files changed, 674 insertions(+), 35 deletions(-)
 create mode 100644 watch/queue/queue.go
 create mode 100644 watch/queue/queue_test.go
 create mode 100644 watch/sinks.go
 create mode 100644 watch/sinks_test.go

diff --git a/watch/queue/queue.go b/watch/queue/queue.go
new file mode 100644
index 0000000000..10bdb92080
--- /dev/null
+++ b/watch/queue/queue.go
@@ -0,0 +1,158 @@
+package queue
+
+import (
+    "container/list"
+    "fmt"
+    "sync"
+
+    "github.com/Sirupsen/logrus"
+    "github.com/docker/go-events"
+)
+
+// ErrQueueFull is returned by a Write operation when that Write causes the
+// queue to reach its size limit.
+var ErrQueueFull = fmt.Errorf("queue closed due to size limit")
+
+// LimitQueue accepts all messages into a queue for asynchronous consumption
+// by a sink until an upper limit of messages is reached. When that limit is
+// reached, the Full channel is closed and subsequent Writes fail with
+// ErrQueueFull, while the sink keeps draining the events already queued. It
+// is thread safe, but the sink must be reliable or events will be dropped.
+// If a size of 0 is provided, the LimitQueue is considered limitless.
+type LimitQueue struct {
+    dst        events.Sink
+    events     *list.List
+    limit      uint64
+    cond       *sync.Cond
+    mu         sync.Mutex
+    closed     bool
+    full       chan struct{}
+    fullClosed bool
+}
+
+// NewLimitQueue returns a LimitQueue that forwards events to the provided
+// sink dst.
+func NewLimitQueue(dst events.Sink, limit uint64) *LimitQueue {
+    eq := LimitQueue{
+        dst:    dst,
+        events: list.New(),
+        limit:  limit,
+        full:   make(chan struct{}),
+    }
+
+    eq.cond = sync.NewCond(&eq.mu)
+    go eq.run()
+    return &eq
+}
+
+// Write accepts an event into the queue, only failing if the queue has been
+// closed or has reached its size limit.
+func (eq *LimitQueue) Write(event events.Event) error {
+    eq.mu.Lock()
+    defer eq.mu.Unlock()
+
+    if eq.closed {
+        return events.ErrSinkClosed
+    }
+
+    if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
+        // If the limit has been reached, don't write the event to the queue,
+        // and close the full channel. This notifies listeners that the queue
+        // is now full, but the sink is still permitted to consume events.
+        // It's the responsibility of the listener to decide whether they
+        // want to live with dropped events or whether they want to Close()
+        // the LimitQueue.
+        if !eq.fullClosed {
+            eq.fullClosed = true
+            close(eq.full)
+        }
+        return ErrQueueFull
+    }
+
+    eq.events.PushBack(event)
+    eq.cond.Signal() // signal waiters
+
+    return nil
+}
+
+// Full returns a channel that is closed when the queue becomes full for the
+// first time.
+func (eq *LimitQueue) Full() chan struct{} {
+    return eq.full
+}
+
+// Close shuts down the event queue, flushing all events.
+func (eq *LimitQueue) Close() error {
+    eq.mu.Lock()
+    defer eq.mu.Unlock()
+
+    if eq.closed {
+        return nil
+    }
+
+    // set the closed flag
+    eq.closed = true
+    eq.cond.Signal() // signal flushes queue
+    eq.cond.Wait()   // wait for signal from last flush
+    return eq.dst.Close()
+}
+
+// run is the main goroutine to flush events to the target sink.
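+// It exits when next returns nil, which only happens after Close has been
+// called and all buffered events have been handed to the sink.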
+func (eq *LimitQueue) run() {
+    for {
+        event := eq.next()
+
+        if event == nil {
+            return // a nil event means the queue is closed
+        }
+
+        if err := eq.dst.Write(event); err != nil {
+            // TODO(aaronl): Dropping events could be bad depending
+            // on the application. We should have a way of
+            // communicating this condition. However, logging
+            // at a log level above debug may not be appropriate.
+            // Eventually, go-events should not use logrus at all,
+            // and should bubble up conditions like this through
+            // error values.
+            logrus.WithFields(logrus.Fields{
+                "event": event,
+                "sink":  eq.dst,
+            }).WithError(err).Debug("eventqueue: dropped event")
+        }
+    }
+}
+
+// Len returns the number of items that are currently stored in the queue and
+// not consumed by its sink.
+func (eq *LimitQueue) Len() int {
+    eq.mu.Lock()
+    defer eq.mu.Unlock()
+    return eq.events.Len()
+}
+
+func (eq *LimitQueue) String() string {
+    eq.mu.Lock()
+    defer eq.mu.Unlock()
+    return fmt.Sprintf("%v", eq.events)
+}
+
+// next encompasses the critical section of the run loop. When the queue is
+// empty, it will block on the condition. If new data arrives, it will wake
+// and return an event. When the queue is closed, nil is returned.
+func (eq *LimitQueue) next() events.Event {
+    eq.mu.Lock()
+    defer eq.mu.Unlock()
+
+    for eq.events.Len() < 1 {
+        if eq.closed {
+            eq.cond.Broadcast()
+            return nil
+        }
+
+        eq.cond.Wait()
+    }
+
+    front := eq.events.Front()
+    event := front.Value.(events.Event)
+    eq.events.Remove(front)
+
+    return event
+}
diff --git a/watch/queue/queue_test.go b/watch/queue/queue_test.go
new file mode 100644
index 0000000000..1b47211843
--- /dev/null
+++ b/watch/queue/queue_test.go
@@ -0,0 +1,176 @@
+package queue
+
+import (
+    "fmt"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/Sirupsen/logrus"
+    "github.com/docker/go-events"
+    "github.com/stretchr/testify/require"
+)
+
+type mockSink struct {
+    closed   bool
+    holdChan chan struct{}
+    data     []events.Event
+    mutex    sync.Mutex
+    once     sync.Once
+}
+
+func (s *mockSink) Write(event events.Event) error {
+    <-s.holdChan
+
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
+    if s.closed {
+        return events.ErrSinkClosed
+    }
+    s.data = append(s.data, event)
+    return nil
+}
+
+func (s *mockSink) Close() error {
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
+
+    s.once.Do(func() {
+        s.closed = true
+        close(s.holdChan)
+    })
+    return nil
+}
+
+func (s *mockSink) Len() int {
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
+    return len(s.data)
+}
+
+func (s *mockSink) String() string {
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
+    return fmt.Sprintf("%v", s.data)
+}
+
+func TestLimitQueueNoLimit(t *testing.T) {
+    require := require.New(t)
+    ch := make(chan struct{})
+    ms := &mockSink{
+        holdChan: ch,
+    }
+
+    // Create a limit queue with no limit and store 10k events. The events
+    // should be held in the queue until we unblock the sink.
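+    // (A limit of 0 disables the size check, so Write never returns
+    // ErrQueueFull on this path.)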
+    q := NewLimitQueue(ms, 0)
+    defer q.Close()
+    defer ms.Close()
+
+    // Write one event; it should be consumed from the queue and then block
+    // in the sink's Write.
+    require.NoError(q.Write("test event"))
+
+    // Make sure the consumer goroutine receives the event
+    deadline := time.Now().Add(5 * time.Second)
+    for time.Now().Before(deadline) && q.Len() != 0 {
+        time.Sleep(20 * time.Millisecond)
+    }
+    require.Equal(0, q.Len())
+    require.Equal(0, ms.Len())
+
+    for i := 0; i < 9999; i++ {
+        require.NoError(q.Write("test event"))
+    }
+    require.Equal(9999, q.Len()) // 1 event blocked in the sink, 9999 waiting in the queue
+    require.Equal(0, ms.Len())
+
+    // Unblock the sink and expect all the events to have been flushed out of
+    // the queue.
+    for i := 0; i < 10000; i++ {
+        ch <- struct{}{}
+    }
+    deadline = time.Now().Add(5 * time.Second)
+    for time.Now().Before(deadline) && ms.Len() != 10000 {
+        time.Sleep(20 * time.Millisecond)
+    }
+
+    require.Equal(0, q.Len())
+    require.Equal(10000, ms.Len())
+}
+
+// TestLimitQueueWithLimit ensures that the limit queue works with a limit.
+func TestLimitQueueWithLimit(t *testing.T) {
+    require := require.New(t)
+    ch := make(chan struct{})
+    ms := &mockSink{
+        holdChan: ch,
+    }
+
+    // Create a limit queue with a limit of 10 events. The events should be
+    // held in the queue until we unblock the sink.
+    q := NewLimitQueue(ms, 10)
+    defer q.Close()
+    defer ms.Close()
+
+    // Write the first event and wait for it to be picked up and block in the
+    // sink's Write.
+    require.NoError(q.Write("test event"))
+    deadline := time.Now().Add(5 * time.Second)
+    for time.Now().Before(deadline) && q.Len() != 0 {
+        time.Sleep(20 * time.Millisecond)
+    }
+    require.Equal(0, ms.Len())
+    require.Equal(0, q.Len())
+
+    // Fill up the queue
+    for i := 0; i < 10; i++ {
+        require.NoError(q.Write("test event"))
+    }
+    require.Equal(0, ms.Len())
+    require.Equal(10, q.Len())
+
+    // Letting the sink read one event should allow us to write one more
+    // without filling the queue.
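+    // (Each send on ch below unblocks exactly one pending mockSink.Write.)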
+    ch <- struct{}{}
+    deadline = time.Now().Add(5 * time.Second)
+    for time.Now().Before(deadline) && q.Len() != 9 {
+        time.Sleep(20 * time.Millisecond)
+    }
+    require.Equal(9, q.Len())
+    require.Equal(1, ms.Len())
+    require.NoError(q.Write("test event"))
+    require.Equal(10, q.Len())
+    require.Equal(1, ms.Len())
+
+    // Writing to the full queue should fail with ErrQueueFull
+    logrus.Debugf("Writing to a full queue")
+    err := q.Write("test event")
+    require.Error(err)
+    require.Equal(ErrQueueFull, err)
+    require.Equal(10, q.Len())
+    require.Equal(1, ms.Len())
+
+    // Further writes should return the same error
+    err = q.Write("test event")
+    require.Error(err)
+    require.Equal(ErrQueueFull, err)
+    require.Equal(10, q.Len())
+    require.Equal(1, ms.Len())
+
+    // Reading one event from the sink will allow one more write to go through again
+    ch <- struct{}{}
+    deadline = time.Now().Add(5 * time.Second)
+    for time.Now().Before(deadline) && q.Len() != 9 {
+        time.Sleep(20 * time.Millisecond)
+    }
+    require.Equal(9, q.Len())
+    require.Equal(2, ms.Len())
+    require.NoError(q.Write("test event"))
+    require.Equal(10, q.Len())
+    require.Equal(2, ms.Len())
+
+    err = q.Write("test event")
+    require.Error(err)
+    require.Equal(ErrQueueFull, err)
+    require.Equal(10, q.Len())
+    require.Equal(2, ms.Len())
+}
diff --git a/watch/sinks.go b/watch/sinks.go
new file mode 100644
index 0000000000..b22b4842c3
--- /dev/null
+++ b/watch/sinks.go
@@ -0,0 +1,95 @@
+package watch
+
+import (
+    "fmt"
+    "time"
+
+    events "github.com/docker/go-events"
+)
+
+// ErrSinkTimeout is returned from the Write method when a sink times out.
+var ErrSinkTimeout = fmt.Errorf("timeout exceeded, tearing down sink")
+
+// timeoutSink is a sink that wraps another sink with a timeout. If the
+// wrapped sink fails to complete a Write operation within the specified
+// timeout, the Write operation of the timeoutSink fails.
+type timeoutSink struct {
+    timeout time.Duration
+    sink    events.Sink
+}
+
+func (s timeoutSink) Write(event events.Event) error {
+    // Buffered so that a Write completing after the timeout does not leak
+    // the goroutine below.
+    errChan := make(chan error, 1)
+    go func(c chan<- error) {
+        c <- s.sink.Write(event)
+    }(errChan)
+
+    timer := time.NewTimer(s.timeout)
+    select {
+    case err := <-errChan:
+        timer.Stop()
+        return err
+    case <-timer.C:
+        s.sink.Close()
+        return ErrSinkTimeout
+    }
+}
+
+func (s timeoutSink) Close() error {
+    return s.sink.Close()
+}
+
+// dropErrClosed is a sink that suppresses ErrSinkClosed from Write, to avoid
+// debug log messages that may be confusing. It is possible that the queue
+// will try to write an event to its destination channel while the queue is
+// being removed from the broadcaster. Since the channel is closed before the
+// queue, there is a narrow window when this is possible. In some event-based
+// systems, dropping events when a sink is removed from a broadcaster is a
+// problem, but for the usage in this watch package that's the expected
+// behavior.
+type dropErrClosed struct {
+    sink events.Sink
+}
+
+func (s dropErrClosed) Write(event events.Event) error {
+    err := s.sink.Write(event)
+    if err == events.ErrSinkClosed {
+        return nil
+    }
+    return err
+}
+
+func (s dropErrClosed) Close() error {
+    return s.sink.Close()
+}
+
+// dropErrClosedChanGen is a ChannelSinkGenerator for dropErrClosed sinks
+// wrapping unbuffered channels.
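+// It is the default generator installed by NewQueue; the WithTimeout option
+// swaps it for a TimeoutDropErrChanGen.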
+type dropErrClosedChanGen struct{}
+
+func (s *dropErrClosedChanGen) NewChannelSink() (events.Sink, *events.Channel) {
+    ch := events.NewChannel(0)
+    return dropErrClosed{sink: ch}, ch
+}
+
+// TimeoutDropErrChanGen is a ChannelSinkGenerator that creates a channel,
+// wrapped by the dropErrClosed sink and a timeout.
+type TimeoutDropErrChanGen struct {
+    timeout time.Duration
+}
+
+// NewChannelSink creates a new sink chain of timeoutSink->dropErrClosed->Channel
+func (s *TimeoutDropErrChanGen) NewChannelSink() (events.Sink, *events.Channel) {
+    ch := events.NewChannel(0)
+    return timeoutSink{
+        timeout: s.timeout,
+        sink: dropErrClosed{
+            sink: ch,
+        },
+    }, ch
+}
+
+// NewTimeoutDropErrSinkGen returns a generator of timeoutSinks wrapping
+// dropErrClosed sinks, wrapping unbuffered channel sinks.
+func NewTimeoutDropErrSinkGen(timeout time.Duration) ChannelSinkGenerator {
+    return &TimeoutDropErrChanGen{timeout: timeout}
+}
diff --git a/watch/sinks_test.go b/watch/sinks_test.go
new file mode 100644
index 0000000000..867b2f7c42
--- /dev/null
+++ b/watch/sinks_test.go
@@ -0,0 +1,49 @@
+package watch
+
+import (
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/require"
+)
+
+// TestTimeoutDropErrSinkGen tests the full chain of sinks
+func TestTimeoutDropErrSinkGen(t *testing.T) {
+    require := require.New(t)
+    doneChan := make(chan struct{})
+
+    sinkGen := NewTimeoutDropErrSinkGen(time.Second)
+
+    // Generate two channel sinks to exercise the following test cases
+    sink, ch := sinkGen.NewChannelSink()
+    sink2, ch2 := sinkGen.NewChannelSink()
+
+    go func() {
+        for {
+            select {
+            case <-ch.C:
+            case <-doneChan:
+                return
+            }
+        }
+    }()
+    require.NoError(sink.Write("some event"))
+
+    // Make sure the sink times out on the write operation if the channel is
+    // not read from.
+    err := sink2.Write("some event")
+    require.Error(err)
+    require.Equal(ErrSinkTimeout, err)
+
+    // Ensure that hitting a timeout causes the sink to close
+    <-ch2.Done()
+
+    // Make sure that closing a sink closes the channel
+    errClose := sink.Close()
+    <-ch.Done()
+    require.NoError(errClose)
+
+    // Stop the drain goroutine so it doesn't leak
+    close(doneChan)
+}
diff --git a/watch/watch.go b/watch/watch.go
index a468237015..ed5b834452 100644
--- a/watch/watch.go
+++ b/watch/watch.go
@@ -1,48 +1,81 @@
 package watch
 
 import (
+    "context"
+    "fmt"
     "sync"
+    "time"
 
     "github.com/docker/go-events"
+    "github.com/docker/swarmkit/watch/queue"
 )
 
-// dropErrClosed is a sink that suppresses ErrSinkClosed from Write, to avoid
-// debug log messages that may be confusing. It is possible that the queue
-// will try to write an event to its destination channel while the queue is
-// being removed from the broadcaster. Since the channel is closed before the
-// queue, there is a narrow window when this is possible. In some event-based
-// dropping events when a sink is removed from a broadcaster is a problem, but
-// for the usage in this watch package that's the expected behavior.
-type dropErrClosed struct {
-    sink events.Sink
-}
-
-func (s dropErrClosed) Write(event events.Event) error {
-    err := s.sink.Write(event)
-    if err == events.ErrSinkClosed {
-        return nil
-    }
-    return err
-}
-
-func (s dropErrClosed) Close() error {
-    return s.sink.Close()
+// ChannelSinkGenerator is a constructor of sinks that eventually lead to a
+// channel.
+type ChannelSinkGenerator interface {
+    NewChannelSink() (events.Sink, *events.Channel)
 }
 
 // Queue is the structure used to publish events and watch for them.
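+//
+// A minimal usage sketch (the sizes and durations shown are illustrative
+// only):
+//
+//    q := NewQueue(WithTimeout(time.Second), WithLimit(100), WithCloseOutChan())
+//    eventq, cancel := q.Watch()
+//    defer cancel()
+//    for event := range eventq {
+//        // process event; the channel is closed if this watcher is torn
+//        // down or falls too far behind
+//    }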
 type Queue struct {
+    sinkGen ChannelSinkGenerator
+    // limit is the max number of items to be held in memory for a watcher
+    limit uint64
     mu        sync.Mutex
     broadcast *events.Broadcaster
-    cancelFuncs map[*events.Channel]func()
+    cancelFuncs map[events.Sink]func()
+
+    // closeOutChan indicates whether the watchers' channels should be closed
+    // when a watcher queue reaches its limit or when the Close method of the
+    // sink is called.
+    closeOutChan bool
 }
 
 // NewQueue creates a new publish/subscribe queue which supports watchers.
-// The channels that it will create for subscriptions will have the buffer
-// size specified by buffer.
-func NewQueue() *Queue {
-    return &Queue{
-        broadcast:   events.NewBroadcaster(),
-        cancelFuncs: make(map[*events.Channel]func()),
+// It accepts functional options such as WithLimit, WithTimeout, and
+// WithCloseOutChan that configure the watchers' queues and channels.
+func NewQueue(options ...func(*Queue) error) *Queue {
+    // Create a queue with the default values
+    q := &Queue{
+        sinkGen:      &dropErrClosedChanGen{},
+        broadcast:    events.NewBroadcaster(),
+        cancelFuncs:  make(map[events.Sink]func()),
+        limit:        0,
+        closeOutChan: false,
+    }
+
+    for _, option := range options {
+        err := option(q)
+        if err != nil {
+            panic(fmt.Sprintf("Failed to apply options to queue: %s", err))
+        }
+    }
+
+    return q
+}
+
+// WithTimeout returns a functional option for a queue that sets a write
+// timeout.
+func WithTimeout(timeout time.Duration) func(*Queue) error {
+    return func(q *Queue) error {
+        q.sinkGen = NewTimeoutDropErrSinkGen(timeout)
+        return nil
+    }
+}
+
+// WithCloseOutChan returns a functional option for a queue whose watcher
+// channel is closed when no more events are expected to be sent to the
+// watcher.
+func WithCloseOutChan() func(*Queue) error {
+    return func(q *Queue) error {
+        q.closeOutChan = true
+        return nil
+    }
+}
+
+// WithLimit returns a functional option for a queue with a max size limit.
+func WithLimit(limit uint64) func(*Queue) error {
+    return func(q *Queue) error {
+        q.limit = limit
+        return nil
     }
 }
 
@@ -52,13 +85,21 @@ func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
     return q.CallbackWatch(nil)
 }
 
+// WatchContext returns a channel where all items published to the queue will
+// be received. The channel will be closed when the provided context is
+// cancelled; note that this requires the queue to have been created with the
+// WithCloseOutChan option.
+func (q *Queue) WatchContext(ctx context.Context) (eventq chan events.Event) {
+    return q.CallbackWatchContext(ctx, nil)
+}
+
 // CallbackWatch returns a channel which will receive all events published to
 // the queue from this point that pass the check in the provided callback
 // function. The returned cancel function will stop the flow of events and
 // close the channel.
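+//
+// A minimal sketch, using the MatcherFunc adapter from go-events (the
+// concrete event type myEvent is hypothetical):
+//
+//    eventq, cancel := q.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
+//        _, ok := event.(myEvent)
+//        return ok
+//    }))
+//    defer cancel()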
 func (q *Queue) CallbackWatch(matcher events.Matcher) (eventq chan events.Event, cancel func()) {
-    ch := events.NewChannel(0)
-    sink := events.Sink(events.NewQueue(dropErrClosed{sink: ch}))
+    chanSink, ch := q.sinkGen.NewChannelSink()
+    lq := queue.NewLimitQueue(chanSink, q.limit)
+    sink := events.Sink(lq)
 
     if matcher != nil {
         sink = events.NewFilter(sink, matcher)
@@ -72,19 +113,68 @@ func (q *Queue) CallbackWatch(matcher events.Matcher) (eventq chan events.Event,
         sink.Close()
     }
 
-    q.mu.Lock()
-    q.cancelFuncs[ch] = cancelFunc
-    q.mu.Unlock()
-    return ch.C, func() {
+    externalCancelFunc := func() {
         q.mu.Lock()
-        cancelFunc := q.cancelFuncs[ch]
-        delete(q.cancelFuncs, ch)
+        cancelFunc := q.cancelFuncs[sink]
+        delete(q.cancelFuncs, sink)
         q.mu.Unlock()
 
         if cancelFunc != nil {
             cancelFunc()
         }
     }
+
+    q.mu.Lock()
+    q.cancelFuncs[sink] = cancelFunc
+    q.mu.Unlock()
+
+    // If the output channel shouldn't be closed and the queue is limitless,
+    // there's no need for an additional goroutine.
+    if !q.closeOutChan && q.limit == 0 {
+        return ch.C, externalCancelFunc
+    }
+
+    outChan := make(chan events.Event)
+    go func() {
+        for {
+            select {
+            case <-ch.Done():
+                // Close the output channel if the ChannelSink is Done for any
+                // reason. This can happen if the cancelFunc is called
+                // externally or if it has been closed by a wrapper sink, such
+                // as the timeoutSink.
+                if q.closeOutChan {
+                    close(outChan)
+                }
+                externalCancelFunc()
+                return
+            case <-lq.Full():
+                // Close the output channel and tear down the Queue if the
+                // LimitQueue becomes full.
+                if q.closeOutChan {
+                    close(outChan)
+                }
+                externalCancelFunc()
+                return
+            case event := <-ch.C:
+                outChan <- event
+            }
+        }
+    }()
+
+    return outChan, externalCancelFunc
+}
+
+// CallbackWatchContext returns a channel where all items published to the
+// queue that pass the check in the provided callback function will be
+// received. The channel will be closed when the provided context is
+// cancelled; this requires the queue to have been created with the
+// WithCloseOutChan option.
+func (q *Queue) CallbackWatchContext(ctx context.Context, matcher events.Matcher) (eventq chan events.Event) {
+    c, cancel := q.CallbackWatch(matcher)
+    go func() {
+        <-ctx.Done()
+        cancel()
+    }()
+    return c
 }
 
 // Publish adds an item to the queue.
@@ -100,7 +190,7 @@ func (q *Queue) Close() error {
     for _, cancelFunc := range q.cancelFuncs {
         cancelFunc()
     }
-    q.cancelFuncs = make(map[*events.Channel]func())
+    q.cancelFuncs = make(map[events.Sink]func())
     q.mu.Unlock()
 
     return q.broadcast.Close()
diff --git a/watch/watch_test.go b/watch/watch_test.go
index ebe7828136..28845a9416 100644
--- a/watch/watch_test.go
+++ b/watch/watch_test.go
@@ -1,15 +1,81 @@
 package watch
 
 import (
+    "context"
     "sync"
     "testing"
+    "time"
 
     "github.com/docker/go-events"
+    "github.com/stretchr/testify/require"
 )
 
+func TestTimeoutLimitWatch(t *testing.T) {
+    require := require.New(t)
+    q := NewQueue(WithTimeout(time.Second), WithLimit(5), WithCloseOutChan())
+    defer q.Close()
+    ctx, cancel := context.WithCancel(context.Background())
+
+    // Cancelling a watcher's context should remove the watcher from the
+    // queue and close its channel.
+    doneChan := make(chan struct{})
+    go func() {
+        events := q.WatchContext(ctx)
+        for range events {
+        }
+        close(doneChan)
+    }()
+    cancel()
+    <-doneChan
+
+    // Test a scenario with a faster write rate than read rate. The queue
+    // should eventually fill up and the channel will be closed.
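+    // (The writer publishes roughly every 10ms while the reader drains one
+    // event per 100ms, so the 5-slot queue fills long before the one-minute
+    // guard timer below fires.)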
+    readerSleepDuration := 100 * time.Millisecond
+    writerSleepDuration := 10 * time.Millisecond
+
+    events, cancel := q.Watch()
+    defer cancel()
+
+    receivedChan := make(chan struct{})
+    eventsClosed := make(chan struct{})
+
+    go func() {
+        closed := false
+        for range events {
+            if !closed {
+                close(receivedChan)
+                closed = true
+            }
+            time.Sleep(readerSleepDuration)
+        }
+        close(eventsClosed)
+    }()
+
+    // Publish one event and wait for the watcher to receive it
+    q.Publish("new event")
+    <-receivedChan
+
+    timeoutTimer := time.NewTimer(time.Minute)
+selectLoop:
+    for {
+        select {
+        case <-timeoutTimer.C:
+            require.Fail("Timeout exceeded")
+        case <-time.After(writerSleepDuration):
+            q.Publish("new event")
+        case <-eventsClosed:
+            break selectLoop
+        }
+    }
+
+    _, ok := <-events
+    require.False(ok)
+}
+
 func TestWatch(t *testing.T) {
     // Create a queue
     q := NewQueue()
+    defer q.Close()
 
     type testEvent struct {
         tags []string
@@ -150,6 +216,11 @@ func BenchmarkWatch1000Listeners64Publishers(b *testing.B) {
 
 func benchmarkWatch(b *testing.B, nlisteners, npublishers int, waitForWatchers bool) {
     q := NewQueue()
+    defer q.Close()
+    benchmarkWatchForQueue(q, b, nlisteners, npublishers, waitForWatchers)
+}
+
+func benchmarkWatchForQueue(q *Queue, b *testing.B, nlisteners, npublishers int, waitForWatchers bool) {
     var (
         watchersAttached sync.WaitGroup
         watchersRunning  sync.WaitGroup