Skip to content
This repository was archived by the owner on Sep 12, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@ var redisPool = &redis.Pool{
},
}

type SendEmailJobParameters struct {
Address string
Subject string
CustomerID int
}

// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
// Enqueue a job named "send_email" with the specified parameters.
_, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4})
_, err := enqueuer.Enqueue("send_email", &SendEmailJobParameters{Address: "test@example.com", Subject: "hello world", CustomerID: 4})
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -82,7 +88,6 @@ func main() {

// Add middleware that will be executed for each job
pool.Middleware(LogMiddleware)
pool.Middleware(FindCustomerMiddleware)

// Map the name of jobs to handler functions
pool.Job("send_email", SendEmailHandler)
Expand All @@ -107,25 +112,16 @@ func LogMiddleware(ctx *work.Context, next work.NextMiddlewareFunc) error {
return next()
}

func FindCustomerMiddleware(ctx *work.Context, next work.NextMiddlewareFunc) error {
// If there's a customer_id param, set it in the context for future middleware and handlers to use.
if _, ok := job.Args["customer_id"]; ok {
ctx.Set("customer_id", ctx.job.Args["customer_id"])
}

return next()
}

func SendEmailHandler(ctx *work.Context) error {
// Extract arguments:
addr := ctx.Job.ArgString("address")
subject := ctx.Job.ArgString("subject")
if err := ctx.Job.ArgError(); err != nil {
args := new(SendEmailJobParameters)
err := ctx.Job.UnmarshalPayload(args)
if err != nil {
return err
}

// Go ahead and send the email...
// sendEmailTo(addr, subject)
// sendEmailTo(args.Address, args.Subject)

return nil
}
Expand Down
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type WorkerObservation struct {
JobName string `json:"job_name"`
JobID string `json:"job_id"`
StartedAt int64 `json:"started_at"`
ArgsJSON string `json:"args_json"`
Payload []byte `json:"payload"`
Checkin string `json:"checkin"`
CheckinAt int64 `json:"checkin_at"`
}
Expand Down Expand Up @@ -182,8 +182,8 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) {
ob.JobID = value
} else if key == "started_at" {
ob.StartedAt, err = strconv.ParseInt(value, 10, 64)
} else if key == "args" {
ob.ArgsJSON = value
} else if key == "payload" {
ob.Payload = []byte(value)
} else if key == "checkin" {
ob.Checkin = value
} else if key == "checkin_at" {
Expand Down Expand Up @@ -480,7 +480,7 @@ func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error {
}

if job.Unique {
uniqueKey, err := redisKeyUniqueJob(c.namespace, job.Name, job.Args)
uniqueKey, err := redisKeyUniqueJob(c.namespace, job.Name, job.Payload)
if err != nil {
logError("client.delete_scheduled_job.redis_key_unique_job", err)
return err
Expand Down
32 changes: 20 additions & 12 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ func TestClientWorkerObservations(t *testing.T) {
if ob.JobName == "foo" {
fooCount++
assert.True(t, ob.IsBusy)
assert.Equal(t, `{"a":3,"b":4}`, ob.ArgsJSON)
assert.Equal(t, []byte(`{"a":3,"b":4}`), ob.Payload)
assert.True(t, (nowEpochSeconds()-ob.StartedAt) <= 3)
assert.True(t, ob.JobID != "")
} else if ob.JobName == "wat" {
watCount++
assert.True(t, ob.IsBusy)
assert.Equal(t, `{"a":1,"b":2}`, ob.ArgsJSON)
assert.Equal(t, []byte(`{"a":1,"b":2}`), ob.Payload)
assert.True(t, (nowEpochSeconds()-ob.StartedAt) <= 3)
assert.True(t, ob.JobID != "")
} else {
Expand Down Expand Up @@ -213,8 +213,11 @@ func TestClientScheduledJobs(t *testing.T) {
assert.EqualValues(t, 1425263409, jobs[1].EnqueuedAt)
assert.EqualValues(t, 1425263409, jobs[2].EnqueuedAt)

assert.EqualValues(t, interface{}(1), jobs[0].Args["a"])
assert.EqualValues(t, interface{}(2), jobs[0].Args["b"])
var q Q
err = jobs[0].UnmarshalPayload(&q)
assert.Nil(t, err)
assert.EqualValues(t, interface{}(1), q["a"])
assert.EqualValues(t, interface{}(2), q["b"])

assert.EqualValues(t, 0, jobs[0].Fails)
assert.EqualValues(t, 0, jobs[1].Fails)
Expand Down Expand Up @@ -262,7 +265,9 @@ func TestClientRetryJobs(t *testing.T) {
assert.EqualValues(t, 1425263429, jobs[0].FailedAt)
assert.Equal(t, "wat", jobs[0].Name)
assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt)
assert.EqualValues(t, interface{}(1), jobs[0].Args["a"])
var q Q
assert.Nil(t, jobs[0].UnmarshalPayload(&q))
assert.EqualValues(t, interface{}(1), q["a"])
assert.EqualValues(t, 1, jobs[0].Fails)
assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt)
assert.Equal(t, "ohno", jobs[0].LastErr)
Expand Down Expand Up @@ -302,7 +307,9 @@ func TestClientDeadJobs(t *testing.T) {
assert.EqualValues(t, 1425263429, jobs[0].FailedAt)
assert.Equal(t, "wat", jobs[0].Name)
assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt)
assert.EqualValues(t, interface{}(1), jobs[0].Args["a"])
var q Q
assert.Nil(t, jobs[0].UnmarshalPayload(&q))
assert.EqualValues(t, interface{}(1), q["a"])
assert.EqualValues(t, 1, jobs[0].Fails)
assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt)
assert.Equal(t, "ohno", jobs[0].LastErr)
Expand Down Expand Up @@ -422,7 +429,7 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) {
Name: name,
ID: makeIdentifier(),
EnqueuedAt: encAt,
Args: map[string]interface{}{"a": "wat"},
Payload: []byte(`{"a": "wat"}`),
Fails: 3,
LastErr: "sorry",
FailedAt: failAt,
Expand All @@ -447,9 +454,10 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) {

job1 := getQueuedJob(ns, pool, name)
if assert.NotNil(t, job1) {
var q Q
assert.Nil(t, job1.UnmarshalPayload(&q))
assert.Equal(t, name, job1.Name)
assert.Equal(t, "wat", job1.ArgString("a"))
assert.NoError(t, job1.ArgError())
assert.Equal(t, "wat", q["a"].(string))
}
}

Expand Down Expand Up @@ -553,7 +561,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) {
Name: "wat1",
ID: makeIdentifier(),
EnqueuedAt: 12345,
Args: nil,
Payload: nil,
Fails: 3,
LastErr: "sorry",
FailedAt: 12347,
Expand All @@ -574,7 +582,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) {
Name: "dontexist",
ID: makeIdentifier(),
EnqueuedAt: 12345,
Args: nil,
Payload: nil,
Fails: 3,
LastErr: "sorry",
FailedAt: 12347,
Expand Down Expand Up @@ -687,7 +695,7 @@ func insertDeadJob(ns string, pool *redis.Pool, name string, encAt, failAt int64
Name: name,
ID: makeIdentifier(),
EnqueuedAt: encAt,
Args: nil,
Payload: nil,
Fails: 3,
LastErr: "sorry",
FailedAt: failAt,
Expand Down
8 changes: 4 additions & 4 deletions cmd/workenqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
var redisHostPort = flag.String("redis", ":6379", "redis hostport")
var redisNamespace = flag.String("ns", "work", "redis namespace")
var jobName = flag.String("job", "", "job name")
var jobArgs = flag.String("args", "{}", "job arguments")
var jobPayload = flag.String("payload", "{}", "job payload")

func main() {
flag.Parse()
Expand All @@ -25,15 +25,15 @@ func main() {

pool := newPool(*redisHostPort)

var args map[string]interface{}
err := json.Unmarshal([]byte(*jobArgs), &args)
var payload map[string]interface{}
err := json.Unmarshal([]byte(*jobPayload), &payload)
if err != nil {
fmt.Println("invalid args:", err)
os.Exit(1)
}

en := work.NewEnqueuer(*redisNamespace, pool)
en.Enqueue(*jobName, args)
en.Enqueue(*jobName, payload)
}

func newPool(addr string) *redis.Pool {
Expand Down
47 changes: 35 additions & 12 deletions enqueue.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package work

import (
"github.com/garyburd/redigo/redis"
"sync"
"time"

"github.com/garyburd/redigo/redis"
)

// Enqueuer can enqueue jobs.
Expand Down Expand Up @@ -37,12 +36,18 @@ func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer {

// Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args ar needed.
// Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"})
func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error) {
func (e *Enqueuer) Enqueue(jobName string, payload interface{}) (*Job, error) {
job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
}

if payload != nil {
err := job.SetPayload(payload)
if err != nil {
return nil, err
}
}

rawJSON, err := job.serialize()
Expand All @@ -65,12 +70,18 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e
}

// EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.
func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, payload interface{}) (*ScheduledJob, error) {
job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
}

if payload != nil {
err := job.SetPayload(payload)
if err != nil {
return nil, err
}
}

rawJSON, err := job.serialize()
Expand Down Expand Up @@ -101,8 +112,8 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
// EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUnique returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
func (e *Enqueuer) EnqueueUnique(jobName string, payload interface{}) (*Job, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, payload)
if err != nil {
return nil, err
}
Expand All @@ -111,9 +122,14 @@ func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
}
if payload != nil {
err = job.SetPayload(payload)
if err != nil {
return nil, err
}
}

rawJSON, err := job.serialize()
if err != nil {
Expand All @@ -140,8 +156,8 @@ func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*
}

// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, payload interface{}) (*ScheduledJob, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, payload)
if err != nil {
return nil, err
}
Expand All @@ -150,10 +166,16 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
}

if payload != nil {
err = job.SetPayload(payload)
if err != nil {
return nil, err
}
}

rawJSON, err := job.serialize()
if err != nil {
return nil, err
Expand Down Expand Up @@ -182,6 +204,7 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma
if res == "ok" && err == nil {
return scheduledJob, nil
}

return nil, err
}

Expand Down
Loading