Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package events

import (
"container/list"
"context"
"sync"

"github.com/sirupsen/logrus"
)

// Queue accepts all messages into a queue for asynchronous consumption
Expand All @@ -15,19 +14,46 @@ type Queue struct {
events *list.List
cond *sync.Cond
mu sync.Mutex
errs chan error
closed bool
}

// NewQueue returns a queue to the provided Sink dst.
//
// Deprecated: use [events.NewAsyncQueue] instead.
func NewQueue(dst Sink) *Queue {
eq := Queue{
eq, _ := NewAsyncQueue(context.TODO(), dst)
return eq
}

type QueueOpt func(*Queue)

func NewAsyncQueue(ctx context.Context, dst Sink, opts ...QueueOpt) (*Queue, <-chan error) {
eq := &Queue{
dst: dst,
events: list.New(),
}

eq.cond = sync.NewCond(&eq.mu)
go eq.run()
return &eq

for _, opt := range opts {
opt(eq)
}

go eq.run(ctx)
return eq, eq.errs
}

func WithBufferedChannel(capacity int) QueueOpt {
return func(q *Queue) {
q.errs = make(chan error, capacity)
}
}

func WithBlockingChannel() QueueOpt {
return func(q *Queue) {
q.errs = make(chan error)
}
}

// Write accepts the events into the queue, only failing if the queue has
Expand Down Expand Up @@ -63,26 +89,21 @@ func (eq *Queue) Close() error {
}

// run is the main goroutine to flush events to the target sink.
func (eq *Queue) run() {
func (eq *Queue) run(ctx context.Context) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to hook in context into the goroutine's lifecycle here. Will require a major refactor of this implementation though.

for {
event := eq.next()

if event == nil {
if eq.errs != nil {
close(eq.errs)
}
return // nil block means event queue is closed.
}

if err := eq.dst.Write(event); err != nil {
// TODO(aaronl): Dropping events could be bad depending
// on the application. We should have a way of
// communicating this condition. However, logging
// at a log level above debug may not be appropriate.
// Eventually, go-events should not use logrus at all,
// and should bubble up conditions like this through
// error values.
logrus.WithFields(logrus.Fields{
"event": event,
"sink": eq.dst,
}).WithError(err).Debug("eventqueue: dropped event")
if eq.errs != nil {
eq.errs <- err
}
}
}
}
Expand Down
62 changes: 62 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package events

import (
"context"
"errors"
"fmt"
"sync"
"testing"
Expand Down Expand Up @@ -44,3 +46,63 @@ func TestQueue(t *testing.T) {
t.Fatalf("sink should have been closed")
}
}

type errorOnWrite struct {
numWriteErrs int
wait *sync.WaitGroup
}

func (eow *errorOnWrite) Write(event Event) error {
eow.numWriteErrs++
eow.wait.Done()
return errors.New("write failed")
}

func (eow *errorOnWrite) Close() error {
return nil
}

func TestBufferedChanneledQueue(t *testing.T) {
const nevents = 100

var waitWrite sync.WaitGroup
waitWrite.Add(nevents)
eow := &errorOnWrite{wait: &waitWrite}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eq, errCh := NewAsyncQueue(ctx, eow, WithBufferedChannel(nevents))

var (
waitQueue sync.WaitGroup
asyncErr error
once sync.Once
)
for i := 1; i <= nevents; i++ {
waitQueue.Add(1)
go func(event Event) {
defer waitQueue.Done()

if err := eq.Write(event); err != nil {
once.Do(func() {
asyncErr = fmt.Errorf("error writing event(%v): %v", event, err)
})
}
}(fmt.Sprintf("event-%d", i))
}

// Wait for all writes to be queued.
waitQueue.Wait()

// Verify there was no error queuing up events.
if asyncErr != nil {
t.Fatalf("expected nil error, got %v", asyncErr)
}

waitWrite.Wait()

if len(errCh) != nevents {
t.Fatalf("expected %d errors, got %d", nevents, len(errCh))
}
}