Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion events/loop/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Interface[T any] interface {
}

type loop[T any] struct {
name string
queue chan T
handler Handler[T]

Expand All @@ -38,8 +39,26 @@ type loop[T any] struct {
lock sync.RWMutex
}

func New[T any](h Handler[T], size uint64) Interface[T] {
type options struct {
name string
}

type Option func(*options)

func WithName(name string) Option {
return func(o *options) {
o.name = name
}
}

func New[T any](h Handler[T], size uint64, setters ...Option) Interface[T] {
var opts options
for _, setter := range setters {
setter(&opts)
}

return &loop[T]{
name: opts.name,
queue: make(chan T, size),
handler: h,
closeCh: make(chan struct{}),
Expand Down Expand Up @@ -76,6 +95,11 @@ func (l *loop[T]) Enqueue(req T) {
select {
case l.queue <- req:
case <-l.closeCh:
// ?? point of adding the name is to quicky surface which loop is blocked
// default:
// fmt.Printf("loop '%s' queue is full (size: %d), blocking\n",
// l.name, len(l.queue))
// l.queue <- req
}
}

Expand Down