diff --git a/queue.go b/queue.go index 4bb770a..5e97539 100644 --- a/queue.go +++ b/queue.go @@ -2,9 +2,8 @@ package events import ( "container/list" + "context" "sync" - - "github.com/sirupsen/logrus" ) // Queue accepts all messages into a queue for asynchronous consumption @@ -15,19 +14,46 @@ type Queue struct { events *list.List cond *sync.Cond mu sync.Mutex + errs chan error closed bool } // NewQueue returns a queue to the provided Sink dst. +// +// Deprecated: use [events.NewAsyncQueue] instead. func NewQueue(dst Sink) *Queue { - eq := Queue{ + eq, _ := NewAsyncQueue(context.TODO(), dst) + return eq +} + +type QueueOpt func(*Queue) + +func NewAsyncQueue(ctx context.Context, dst Sink, opts ...QueueOpt) (*Queue, <-chan error) { + eq := &Queue{ dst: dst, events: list.New(), } eq.cond = sync.NewCond(&eq.mu) - go eq.run() - return &eq + + for _, opt := range opts { + opt(eq) + } + + go eq.run(ctx) + return eq, eq.errs +} + +func WithBufferedChannel(capacity int) QueueOpt { + return func(q *Queue) { + q.errs = make(chan error, capacity) + } +} + +func WithBlockingChannel() QueueOpt { + return func(q *Queue) { + q.errs = make(chan error) + } } // Write accepts the events into the queue, only failing if the queue has @@ -63,26 +89,21 @@ func (eq *Queue) Close() error { } // run is the main goroutine to flush events to the target sink. -func (eq *Queue) run() { +func (eq *Queue) run(ctx context.Context) { for { event := eq.next() if event == nil { + if eq.errs != nil { + close(eq.errs) + } return // nil block means event queue is closed. } if err := eq.dst.Write(event); err != nil { - // TODO(aaronl): Dropping events could be bad depending - // on the application. We should have a way of - // communicating this condition. However, logging - // at a log level above debug may not be appropriate. - // Eventually, go-events should not use logrus at all, - // and should bubble up conditions like this through - // error values. - logrus.WithFields(logrus.Fields{ - "event": event, - "sink": eq.dst, - }).WithError(err).Debug("eventqueue: dropped event") + if eq.errs != nil { + eq.errs <- err + } } } } diff --git a/queue_test.go b/queue_test.go index 7bfe0e3..68950f4 100644 --- a/queue_test.go +++ b/queue_test.go @@ -1,6 +1,8 @@ package events import ( + "context" + "errors" "fmt" "sync" "testing" @@ -44,3 +46,63 @@ func TestQueue(t *testing.T) { t.Fatalf("sink should have been closed") } } + +type errorOnWrite struct { + numWriteErrs int + wait *sync.WaitGroup +} + +func (eow *errorOnWrite) Write(event Event) error { + eow.numWriteErrs++ + eow.wait.Done() + return errors.New("write failed") +} + +func (eow *errorOnWrite) Close() error { + return nil +} + +func TestBufferedChanneledQueue(t *testing.T) { + const nevents = 100 + + var waitWrite sync.WaitGroup + waitWrite.Add(nevents) + eow := &errorOnWrite{wait: &waitWrite} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + eq, errCh := NewAsyncQueue(ctx, eow, WithBufferedChannel(nevents)) + + var ( + waitQueue sync.WaitGroup + asyncErr error + once sync.Once + ) + for i := 1; i <= nevents; i++ { + waitQueue.Add(1) + go func(event Event) { + defer waitQueue.Done() + + if err := eq.Write(event); err != nil { + once.Do(func() { + asyncErr = fmt.Errorf("error writing event(%v): %v", event, err) + }) + } + }(fmt.Sprintf("event-%d", i)) + } + + // Wait for all writes to be queued. + waitQueue.Wait() + + // Verify there was no error queuing up events. + if asyncErr != nil { + t.Fatalf("expected nil error, got %v", asyncErr) + } + + waitWrite.Wait() + + if len(errCh) != nevents { + t.Fatalf("expected %d errors, got %d", nevents, len(errCh)) + } +}