158 changes: 158 additions & 0 deletions watch/queue/queue.go
@@ -0,0 +1,158 @@
package queue

import (
"container/list"
"fmt"
"sync"

"github.com/Sirupsen/logrus"
"github.com/docker/go-events"
)

// ErrQueueFull is returned by a Write operation when the queue has already
// reached its size limit and cannot accept the event.
var ErrQueueFull = fmt.Errorf("queue closed due to size limit")

// LimitQueue accepts all messages into a queue for asynchronous consumption by
// a sink until an upper limit of messages is reached. Once that limit is
// reached, writes fail with ErrQueueFull and the Full channel is closed, but
// the sink keeps draining events already in the queue. It is thread safe, but
// the sink must be reliable or events will be dropped.
// If a size of 0 is provided, the LimitQueue is considered limitless.
type LimitQueue struct {
dst events.Sink
events *list.List

Contributor:

I'm not sure how much time you have to make this work, but if you can instrument the entrance and exit of a regular queue, you can avoid having to replicate all of this logic.
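
As a rough sketch of that suggestion (not code from this PR — `countingSink` and `limitedWrite` are invented names), the stock `events.Queue` could be instrumented by counting events on the way in and letting a wrapper sink count them back out on the way out:

```go
package queue

import (
	"sync/atomic"

	"github.com/docker/go-events"
)

// countingSink decrements a shared pending counter as events leave the
// wrapped queue and reach the real sink, so pending = written - delivered.
type countingSink struct {
	dst     events.Sink
	pending int64
}

func (s *countingSink) Write(event events.Event) error {
	defer atomic.AddInt64(&s.pending, -1)
	return s.dst.Write(event)
}

func (s *countingSink) Close() error { return s.dst.Close() }

// limitedWrite instruments the entrance: it rejects the write when more than
// limit events are still pending between the queue and the sink.
func limitedWrite(q *events.Queue, cs *countingSink, limit int64, event events.Event) error {
	if limit > 0 && atomic.AddInt64(&cs.pending, 1) > limit {
		atomic.AddInt64(&cs.pending, -1) // event was not enqueued, undo the count
		return ErrQueueFull
	}
	return q.Write(event)
}
```

The queue would then be built as `q := events.NewQueue(cs)` and producers would call `limitedWrite` instead of `q.Write`.
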

limit uint64
cond *sync.Cond
mu sync.Mutex
closed bool
full chan struct{}
fullClosed bool
}

// NewLimitQueue returns a queue to the provided Sink dst.
func NewLimitQueue(dst events.Sink, limit uint64) *LimitQueue {
eq := LimitQueue{
dst: dst,
events: list.New(),
limit: limit,
full: make(chan struct{}),
}

eq.cond = sync.NewCond(&eq.mu)
go eq.run()
return &eq
}

// Write accepts the events into the queue, only failing if the queue has
// been closed or has reached its size limit.
func (eq *LimitQueue) Write(event events.Event) error {
eq.mu.Lock()
defer eq.mu.Unlock()

if eq.closed {
return events.ErrSinkClosed
}

if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
// If the limit has been reached, don't write the event to the queue,
// and close the Full channel. This notifies listeners that the queue
// is now full, but the sink is still permitted to consume events. It's
// the responsibility of the listener to decide whether they want to
// live with dropped events or whether they want to Close() the
// LimitQueue
if !eq.fullClosed {
eq.fullClosed = true
close(eq.full)
}

Contributor:

I think we can do a callback here. That would avoid spilling the internal channel manipulation outside of the internals.

Make sure to release the locks, then return ErrQueueFull. That will allow writer and out of band notification. It also doesn't poison the queue.

Contributor Author:

I agree that a callback would be a better pattern here. Let's do it this way for the go-events followup
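
For reference, a callback-based variant of this Write path might look roughly like the sketch below (`writeWithCallback` and the `onFull` parameter are hypothetical names reusing the LimitQueue fields defined above; this is not part of the PR or of go-events):

```go
// Hypothetical sketch: notify via a caller-supplied callback instead of
// closing an exported channel. The lock is released before the callback runs
// and before returning ErrQueueFull, so out-of-band notification cannot
// deadlock against the queue and the queue itself is not poisoned.
func (eq *LimitQueue) writeWithCallback(event events.Event, onFull func()) error {
	eq.mu.Lock()
	if eq.closed {
		eq.mu.Unlock()
		return events.ErrSinkClosed
	}

	if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
		first := !eq.fullClosed // reuse the flag so the callback fires only once
		eq.fullClosed = true
		eq.mu.Unlock()
		if first && onFull != nil {
			onFull()
		}
		return ErrQueueFull
	}

	eq.events.PushBack(event)
	eq.cond.Signal() // signal waiters
	eq.mu.Unlock()
	return nil
}
```
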

return ErrQueueFull
}

eq.events.PushBack(event)
eq.cond.Signal() // signal waiters

return nil
}

// Full returns a channel that is closed when the queue becomes full for the

Contributor:

What happens when it is no longer full?

Typically, "full" and "empty" act pretty racy for concurrent queues. When would this be used?

If we want to act on full, might want a clamping function.

Contributor Author:

The channel is closed when a Write causes the queue to reach its limit. The queue can stop being full afterwards, and this channel is not meant as a mechanism for viewing the current full-ness state of the queue.

The main use case for this is to notify that at least one Event has been dropped, and then it's up to the listener to determine if any action should be taken.

In the case of Docker events, all API server implementations can receive a /events?since parameter to backfill past events, and the event stream is expected to be reliable in some versions of the CLI (1.13 to 17.03). Therefore, when a slow listener fills up its queue, it's preferred to close its event stream entirely and have it re-establish the stream with an appropriate since parameter, rather than silently dropping events.
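
Roughly how a listener might act on that notification (an illustrative sketch only — `serveEvents` and its parameters are assumptions, not code from this PR):

```go
// Illustrative consumer: forward events to a client until the queue reports
// that it has overflowed at least once, then fail the stream so the client
// can re-establish it with an appropriate ?since value.
func serveEvents(q *LimitQueue, stream <-chan events.Event, done <-chan struct{}) error {
	for {
		select {
		case ev := <-stream:
			_ = ev // write the event to the client connection here
		case <-q.Full():
			// At least one event has been dropped; close the stream rather
			// than silently delivering a gap.
			return ErrQueueFull
		case <-done:
			return nil
		}
	}
}
```
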

Contributor:

Couldn't this be handled with a callback on each dropped message? This seems very fragile.

// first time.
func (eq *LimitQueue) Full() chan struct{} {
return eq.full
}

// Close shuts down the event queue, flushing all events
func (eq *LimitQueue) Close() error {
eq.mu.Lock()
defer eq.mu.Unlock()

if eq.closed {
return nil
}

// set the closed flag
eq.closed = true
eq.cond.Signal() // signal flushes queue
eq.cond.Wait() // wait for signal from last flush
return eq.dst.Close()
}

// run is the main goroutine to flush events to the target sink.
func (eq *LimitQueue) run() {
for {
event := eq.next()

if event == nil {
return // nil block means event queue is closed.
}

if err := eq.dst.Write(event); err != nil {
// TODO(aaronl): Dropping events could be bad depending

Contributor:

@aaronlehmann Did we not remove this error message?

Collaborator:

It's suppressed by a wrapper sink in swarmkit. Discussion here: docker/go-events#11
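
For context, such a wrapper sink can be quite small; a minimal sketch (illustrative only — see docker/go-events#11 and the swarmkit sources for the actual approach) might look like:

```go
// dropErrClosed is an illustrative wrapper sink that swallows ErrSinkClosed
// from the underlying sink, so the queue's run loop does not log a dropped
// event for a sink that was closed deliberately. (The name and exact behavior
// here are a sketch, not necessarily the swarmkit implementation.)
type dropErrClosed struct {
	sink events.Sink
}

func (d dropErrClosed) Write(event events.Event) error {
	err := d.sink.Write(event)
	if err == events.ErrSinkClosed {
		return nil
	}
	return err
}

func (d dropErrClosed) Close() error {
	return d.sink.Close()
}
```
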

// on the application. We should have a way of
// communicating this condition. However, logging
// at a log level above debug may not be appropriate.
// Eventually, go-events should not use logrus at all,
// and should bubble up conditions like this through
// error values.
logrus.WithFields(logrus.Fields{
"event": event,
"sink": eq.dst,
}).WithError(err).Debug("eventqueue: dropped event")
}
}
}

// Len returns the number of items that are currently stored in the queue and
// not consumed by its sink.
func (eq *LimitQueue) Len() int {
eq.mu.Lock()
defer eq.mu.Unlock()
return eq.events.Len()
}

func (eq *LimitQueue) String() string {
eq.mu.Lock()
defer eq.mu.Unlock()
return fmt.Sprintf("%v", eq.events)
}

// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return an event. When the queue is closed, nil will be returned.
func (eq *LimitQueue) next() events.Event {
eq.mu.Lock()
defer eq.mu.Unlock()

for eq.events.Len() < 1 {
if eq.closed {
eq.cond.Broadcast()
return nil
}

eq.cond.Wait()
}

front := eq.events.Front()
block := front.Value.(events.Event)
eq.events.Remove(front)

return block
}
176 changes: 176 additions & 0 deletions watch/queue/queue_test.go
@@ -0,0 +1,176 @@
package queue

import (
"fmt"
"sync"
"testing"
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/go-events"
"github.com/stretchr/testify/require"
)

type mockSink struct {
closed bool
holdChan chan struct{}
data []events.Event
mutex sync.Mutex
once sync.Once
}

func (s *mockSink) Write(event events.Event) error {
<-s.holdChan

s.mutex.Lock()
defer s.mutex.Unlock()
if s.closed {
return events.ErrSinkClosed
}
s.data = append(s.data, event)
return nil
}

func (s *mockSink) Close() error {
s.mutex.Lock()
defer s.mutex.Unlock()

s.once.Do(func() {
s.closed = true
close(s.holdChan)
})
return nil
}

func (s *mockSink) Len() int {
s.mutex.Lock()
defer s.mutex.Unlock()
return len(s.data)
}

func (s *mockSink) String() string {
s.mutex.Lock()
defer s.mutex.Unlock()
return fmt.Sprintf("%v", s.data)
}

func TestLimitQueueNoLimit(t *testing.T) {
require := require.New(t)
ch := make(chan struct{})
ms := &mockSink{
holdChan: ch,
}

// Create a limit queue with no limit and store 10k events. The events
// should be held in the queue until we unblock the sink.
q := NewLimitQueue(ms, 0)
defer q.Close()
defer ms.Close()

// Writing one event to the queue should block during the sink write phase
require.NoError(q.Write("test event"))

// Make sure the consumer goroutine receives the event
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) && q.Len() != 0 {
time.Sleep(20 * time.Millisecond)
}
require.Equal(0, q.Len())
require.Equal(0, ms.Len())

for i := 0; i < 9999; i++ {
require.NoError(q.Write("test event"))
}
require.Equal(9999, q.Len()) // 1 event blocked in the sink, 9999 waiting in the queue
require.Equal(0, ms.Len())

// Unblock the sink and expect all the events to have been flushed out of
// the queue.
for i := 0; i < 10000; i++ {
ch <- struct{}{}
}
deadline = time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) && ms.Len() != 10000 {
time.Sleep(20 * time.Millisecond)
}

require.Equal(0, q.Len())
require.Equal(10000, ms.Len())
}

// TestLimitQueueWithLimit ensures that the limit queue works with a limit.
func TestLimitQueueWithLimit(t *testing.T) {
require := require.New(t)
ch := make(chan struct{})
ms := &mockSink{
holdChan: ch,
}

// Create a limit queue with a limit of 10 events. The events should be held
// in the queue until we unblock the sink.
q := NewLimitQueue(ms, 10)
defer q.Close()
defer ms.Close()

// Write the first event and wait for it to block on the writer
require.NoError(q.Write("test event"))
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) && q.Len() != 0 {
time.Sleep(20 * time.Millisecond)
}
require.Equal(0, ms.Len())
require.Equal(0, q.Len())

// Fill up the queue
for i := 0; i < 10; i++ {
require.NoError(q.Write("test event"))
}
require.Equal(0, ms.Len())
require.Equal(10, q.Len())

// Letting the sink consume one event should allow us to write one more
// without hitting the limit.
ch <- struct{}{}
deadline = time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) && q.Len() != 9 {
time.Sleep(20 * time.Millisecond)
}
require.Equal(9, q.Len())
require.Equal(1, ms.Len())
require.NoError(q.Write("test event"))
require.Equal(10, q.Len())
require.Equal(1, ms.Len())

// Trying to write another event while the queue is full should fail with ErrQueueFull
logrus.Debugf("Closing queue")
err := q.Write("test event")
require.Error(err)
require.Equal(ErrQueueFull, err)
require.Equal(10, q.Len())
require.Equal(1, ms.Len())

// Further writes should return the same error
err = q.Write("test event")
require.Error(err)
require.Equal(ErrQueueFull, err)
require.Equal(10, q.Len())
require.Equal(1, ms.Len())

// Reading one event from the sink will allow one more write to go through again
ch <- struct{}{}
deadline = time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) && q.Len() != 9 {
time.Sleep(20 * time.Millisecond)
}
require.Equal(9, q.Len())
require.Equal(2, ms.Len())
require.NoError(q.Write("test event"))
require.Equal(10, q.Len())
require.Equal(2, ms.Len())

err = q.Write("test event")
require.Error(err)
require.Equal(ErrQueueFull, err)
require.Equal(10, q.Len())
require.Equal(2, ms.Len())
}