Skip to content

dobredodo/goqueue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 

Repository files navigation

goqueue

A small, context-aware job queue with a worker pool for Go.

  • Bounded, buffered job channel
  • Worker pool with graceful drain (Close) or hard cancel (Stop)
  • Jobs receive a context.Context so long-running work can be cancelled
  • Safe to call Close and Stop multiple times
  • Pluggable logger
  • Zero dependencies

Install

go get github.com/dobredodo/goqueue

Requires Go 1.22+.

Usage

package main

import (
	"context"
	"fmt"

	"github.com/dobredodo/goqueue"
)

func main() {
	q := goqueue.New("emails", 16)
	q.Start(4)

	for i := 0; i < 10; i++ {
		i := i
		_ = q.Add(goqueue.Job{
			Name: fmt.Sprintf("send-%d", i),
			Action: func(ctx context.Context) error {
				return nil
			},
		})
	}

	q.Close()
	q.Wait()
}

API

Method Description
New(name, buffer) Create a queue with a buffered job channel.
Start(n) Launch n worker goroutines.
Add(job) Enqueue a job. Returns ErrQueueStopped if the queue was stopped.
AddAll(jobs) Enqueue a slice, stopping at the first error.
Close() No more jobs will be added. Workers drain remaining jobs then exit.
Stop() Cancel the queue context. Workers exit ASAP; pending jobs are dropped.
Wait() Block until all workers have exited.

Close vs Stop

  • Close: graceful. Finish queued work, then exit.
  • Stop: abortive. Cancel the shared context.Context passed to each job; workers exit as soon as their current job returns.

Use Close for normal shutdown. Use Stop for shutdown on error, timeout, or signal.

Lifecycle

              ┌─────┐
              │ New │
              └──┬──┘
                 │ Start(n)
                 ▼
         ┌───────────────┐   Add(job)
         │    Running    │◄───────────┐
         └───┬────────┬──┘            │
    Close() │        │ Stop()         │
   (drain)  │        │ (cancel ctx)   │
            ▼        ▼                │
      ┌──────────┐  ┌──────────┐      │
      │ Draining │  │ Stopping │      │
      └────┬─────┘  └────┬─────┘      │
           └──────┬──────┘            │
                  ▼                   │
               ┌──────┐  Add → ErrQueueStopped
               │ Done │─────────────────┘
               └──────┘  Wait() returns
Guarantee Covered by
All queued jobs run before Wait returns after Close TestQueueRunsAllJobs
Stop cancels the context.Context passed to in-flight jobs TestStopCancelsJobContext
Add after Stop or Close returns ErrQueueStopped TestAddAfterStopReturnsError
Close is idempotent; double-close does not panic TestDoubleCloseSafe
Job returning an error does not kill its worker TestJobErrorDoesNotStopWorker

Logging

Replace the package-level Log fields to integrate your own logger:

goqueue.Log.Info = func(msg string, args ...any) { slog.Info(fmt.Sprintf(msg, args...)) }
goqueue.Log.Error = func(msg string, args ...any) { slog.Error(fmt.Sprintf(msg, args...)) }

About

Tiny Go job queue: buffered channel, worker pool, context cancellation

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages