From 91212b9877a0a615b78f41789c8086392a6dca4f Mon Sep 17 00:00:00 2001 From: Jason Raede Date: Tue, 15 Nov 2016 22:54:50 -0500 Subject: [PATCH 1/4] Replace args with payload --- client.go | 8 +- client_test.go | 32 +++-- cmd/workenqueue/main.go | 8 +- enqueue.go | 47 +++++-- enqueue_test.go | 53 ++++---- job.go | 151 ++++------------------ job_test.go | 259 +++----------------------------------- observer.go | 20 +-- observer_test.go | 18 ++- periodic_enqueuer.go | 2 +- periodic_enqueuer_test.go | 2 +- redis.go | 20 ++- run_test.go | 11 +- worker.go | 4 +- worker_test.go | 24 ++-- 15 files changed, 191 insertions(+), 468 deletions(-) diff --git a/client.go b/client.go index c7a2e7e9..cae61afe 100644 --- a/client.go +++ b/client.go @@ -125,7 +125,7 @@ type WorkerObservation struct { JobName string `json:"job_name"` JobID string `json:"job_id"` StartedAt int64 `json:"started_at"` - ArgsJSON string `json:"args_json"` + Payload []byte `json:"payload"` Checkin string `json:"checkin"` CheckinAt int64 `json:"checkin_at"` } @@ -182,8 +182,8 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) { ob.JobID = value } else if key == "started_at" { ob.StartedAt, err = strconv.ParseInt(value, 10, 64) - } else if key == "args" { - ob.ArgsJSON = value + } else if key == "payload" { + ob.Payload = []byte(value) } else if key == "checkin" { ob.Checkin = value } else if key == "checkin_at" { @@ -480,7 +480,7 @@ func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error { } if job.Unique { - uniqueKey, err := redisKeyUniqueJob(c.namespace, job.Name, job.Args) + uniqueKey, err := redisKeyUniqueJob(c.namespace, job.Name, job.Payload) if err != nil { logError("client.delete_scheduled_job.redis_key_unique_job", err) return err diff --git a/client_test.go b/client_test.go index f4fba19b..9b05b887 100644 --- a/client_test.go +++ b/client_test.go @@ -97,13 +97,13 @@ func TestClientWorkerObservations(t *testing.T) { if ob.JobName == "foo" { fooCount++ assert.True(t, ob.IsBusy) - assert.Equal(t, `{"a":3,"b":4}`, ob.ArgsJSON) + assert.Equal(t, []byte(`{"a":3,"b":4}`), ob.Payload) assert.True(t, (nowEpochSeconds()-ob.StartedAt) <= 3) assert.True(t, ob.JobID != "") } else if ob.JobName == "wat" { watCount++ assert.True(t, ob.IsBusy) - assert.Equal(t, `{"a":1,"b":2}`, ob.ArgsJSON) + assert.Equal(t, []byte(`{"a":1,"b":2}`), ob.Payload) assert.True(t, (nowEpochSeconds()-ob.StartedAt) <= 3) assert.True(t, ob.JobID != "") } else { @@ -213,8 +213,11 @@ func TestClientScheduledJobs(t *testing.T) { assert.EqualValues(t, 1425263409, jobs[1].EnqueuedAt) assert.EqualValues(t, 1425263409, jobs[2].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) - assert.EqualValues(t, interface{}(2), jobs[0].Args["b"]) + var q Q + err = jobs[0].GetPayload(&q) + assert.Nil(t, err) + assert.EqualValues(t, interface{}(1), q["a"]) + assert.EqualValues(t, interface{}(2), q["b"]) assert.EqualValues(t, 0, jobs[0].Fails) assert.EqualValues(t, 0, jobs[1].Fails) @@ -262,7 +265,9 @@ func TestClientRetryJobs(t *testing.T) { assert.EqualValues(t, 1425263429, jobs[0].FailedAt) assert.Equal(t, "wat", jobs[0].Name) assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) + var q Q + assert.Nil(t, jobs[0].GetPayload(&q)) + assert.EqualValues(t, interface{}(1), q["a"]) assert.EqualValues(t, 1, jobs[0].Fails) assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt) assert.Equal(t, "ohno", jobs[0].LastErr) @@ -302,7 +307,9 @@ func TestClientDeadJobs(t *testing.T) { assert.EqualValues(t, 1425263429, jobs[0].FailedAt) assert.Equal(t, "wat", jobs[0].Name) assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) + var q Q + assert.Nil(t, jobs[0].GetPayload(&q)) + assert.EqualValues(t, interface{}(1), q["a"]) assert.EqualValues(t, 1, jobs[0].Fails) assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt) assert.Equal(t, "ohno", jobs[0].LastErr) @@ -422,7 +429,7 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) { Name: name, ID: makeIdentifier(), EnqueuedAt: encAt, - Args: map[string]interface{}{"a": "wat"}, + Payload: []byte(`{"a": "wat"}`), Fails: 3, LastErr: "sorry", FailedAt: failAt, @@ -447,9 +454,10 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) { job1 := getQueuedJob(ns, pool, name) if assert.NotNil(t, job1) { + var q Q + assert.Nil(t, job1.GetPayload(&q)) assert.Equal(t, name, job1.Name) - assert.Equal(t, "wat", job1.ArgString("a")) - assert.NoError(t, job1.ArgError()) + assert.Equal(t, "wat", q["a"].(string)) } } @@ -553,7 +561,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) { Name: "wat1", ID: makeIdentifier(), EnqueuedAt: 12345, - Args: nil, + Payload: nil, Fails: 3, LastErr: "sorry", FailedAt: 12347, @@ -574,7 +582,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) { Name: "dontexist", ID: makeIdentifier(), EnqueuedAt: 12345, - Args: nil, + Payload: nil, Fails: 3, LastErr: "sorry", FailedAt: 12347, @@ -687,7 +695,7 @@ func insertDeadJob(ns string, pool *redis.Pool, name string, encAt, failAt int64 Name: name, ID: makeIdentifier(), EnqueuedAt: encAt, - Args: nil, + Payload: nil, Fails: 3, LastErr: "sorry", FailedAt: failAt, diff --git a/cmd/workenqueue/main.go b/cmd/workenqueue/main.go index fb08c3c3..e281196c 100644 --- a/cmd/workenqueue/main.go +++ b/cmd/workenqueue/main.go @@ -13,7 +13,7 @@ import ( var redisHostPort = flag.String("redis", ":6379", "redis hostport") var redisNamespace = flag.String("ns", "work", "redis namespace") var jobName = flag.String("job", "", "job name") -var jobArgs = flag.String("args", "{}", "job arguments") +var jobPayload = flag.String("payload", "{}", "job payload") func main() { flag.Parse() @@ -25,15 +25,15 @@ func main() { pool := newPool(*redisHostPort) - var args map[string]interface{} - err := json.Unmarshal([]byte(*jobArgs), &args) + var payload map[string]interface{} + err := json.Unmarshal([]byte(*jobPayload), &payload) if err != nil { fmt.Println("invalid args:", err) os.Exit(1) } en := work.NewEnqueuer(*redisNamespace, pool) - en.Enqueue(*jobName, args) + en.Enqueue(*jobName, payload) } func newPool(addr string) *redis.Pool { diff --git a/enqueue.go b/enqueue.go index c1f4ed6c..2b1d98a0 100644 --- a/enqueue.go +++ b/enqueue.go @@ -1,10 +1,9 @@ package work import ( + "github.com/garyburd/redigo/redis" "sync" "time" - - "github.com/garyburd/redigo/redis" ) // Enqueuer can enqueue jobs. @@ -37,12 +36,18 @@ func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer { // Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args ar needed. // Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"}) -func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error) { +func (e *Enqueuer) Enqueue(jobName string, payload interface{}) (*Job, error) { job := &Job{ Name: jobName, ID: makeIdentifier(), EnqueuedAt: nowEpochSeconds(), - Args: args, + } + + if payload != nil { + err := job.SetPayload(payload) + if err != nil { + return nil, err + } } rawJSON, err := job.serialize() @@ -65,12 +70,18 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e } // EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds. -func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { +func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, payload interface{}) (*ScheduledJob, error) { job := &Job{ Name: jobName, ID: makeIdentifier(), EnqueuedAt: nowEpochSeconds(), - Args: args, + } + + if payload != nil { + err := job.SetPayload(payload) + if err != nil { + return nil, err + } } rawJSON, err := job.serialize() @@ -101,8 +112,8 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri // EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. // In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. // EnqueueUnique returns the job if it was enqueued and nil if it wasn't -func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) { - uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args) +func (e *Enqueuer) EnqueueUnique(jobName string, payload interface{}) (*Job, error) { + uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, payload) if err != nil { return nil, err } @@ -111,9 +122,14 @@ func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (* Name: jobName, ID: makeIdentifier(), EnqueuedAt: nowEpochSeconds(), - Args: args, Unique: true, } + if payload != nil { + err = job.SetPayload(payload) + if err != nil { + return nil, err + } + } rawJSON, err := job.serialize() if err != nil { @@ -140,8 +156,8 @@ func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (* } // EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. -func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { - uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args) +func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, payload interface{}) (*ScheduledJob, error) { + uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, payload) if err != nil { return nil, err } @@ -150,10 +166,16 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma Name: jobName, ID: makeIdentifier(), EnqueuedAt: nowEpochSeconds(), - Args: args, Unique: true, } + if payload != nil { + err = job.SetPayload(payload) + if err != nil { + return nil, err + } + } + rawJSON, err := job.serialize() if err != nil { return nil, err @@ -182,6 +204,7 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma if res == "ok" && err == nil { return scheduledJob, nil } + return nil, err } diff --git a/enqueue_test.go b/enqueue_test.go index 8ed480c7..2d52d06b 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -18,9 +18,11 @@ func TestEnqueue(t *testing.T) { assert.True(t, len(job.ID) > 10) // Something is in it assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) + var args Q + assert.Nil(t, job.GetPayload(&args)) + + assert.Equal(t, "cool", args["b"].(string)) + assert.EqualValues(t, 1, args["a"].(float64)) // Make sure "wat" is in the known jobs assert.EqualValues(t, []string{"wat"}, knownJobs(pool, redisKeyKnownJobs(ns))) @@ -38,9 +40,10 @@ func TestEnqueue(t *testing.T) { assert.True(t, len(j.ID) > 10) // Something is in it assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", j.ArgString("b")) - assert.EqualValues(t, 1, j.ArgInt64("a")) - assert.NoError(t, j.ArgError()) + var q Q + assert.Nil(t, j.GetPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) // Now enqueue another job, make sure that we can enqueue multiple _, err = enqueuer.Enqueue("wat", Q{"a": 1, "b": "cool"}) @@ -65,9 +68,10 @@ func TestEnqueueIn(t *testing.T) { assert.True(t, len(job.ID) > 10) // Something is in it assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) + var q Q + assert.Nil(t, job.GetPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) } @@ -91,9 +95,10 @@ func TestEnqueueIn(t *testing.T) { assert.True(t, len(j.ID) > 10) // Something is in it assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", j.ArgString("b")) - assert.EqualValues(t, 1, j.ArgInt64("a")) - assert.NoError(t, j.ArgError()) + var q Q + assert.Nil(t, j.GetPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) } func TestEnqueueUnique(t *testing.T) { @@ -109,9 +114,10 @@ func TestEnqueueUnique(t *testing.T) { assert.True(t, len(job.ID) > 10) // Something is in it assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) + var q Q + assert.Nil(t, job.GetPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) } job, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"}) @@ -134,7 +140,7 @@ func TestEnqueueUnique(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, job) - // Process the queues. Ensure the right numbero of jobs was processed + // Process the queues. Ensure the right number of jobs was processed var wats, taws int64 wp := NewWorkerPool(3, ns, pool) wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(ctx *Context) error { @@ -182,9 +188,11 @@ func TestEnqueueUniqueIn(t *testing.T) { assert.True(t, len(job.ID) > 10) // Something is in it assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) + + var q Q + assert.Nil(t, job.GetPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) } @@ -202,9 +210,10 @@ func TestEnqueueUniqueIn(t *testing.T) { assert.True(t, len(j.ID) > 10) // Something is in it assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", j.ArgString("b")) - assert.EqualValues(t, 1, j.ArgInt64("a")) - assert.NoError(t, j.ArgError()) + var q Q + assert.Nil(t, j.GetPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) assert.True(t, j.Unique) // Now try to enqueue more stuff and ensure it diff --git a/job.go b/job.go index 7b3a7076..e59c80fb 100644 --- a/job.go +++ b/job.go @@ -2,19 +2,21 @@ package work import ( "encoding/json" - "fmt" - "math" - "reflect" ) // Job represents a job. type Job struct { // Inputs when making a new job - Name string `json:"name,omitempty"` - ID string `json:"id"` - EnqueuedAt int64 `json:"t"` - Args map[string]interface{} `json:"args"` - Unique bool `json:"unique,omitempty"` + Name string `json:"name,omitempty"` + ID string `json:"id"` + EnqueuedAt int64 `json:"t"` + + // Payload can be any json-marshallable object. This leads to a + // double json-marshal which could be a bit better optimized, + // but makes it easier to have custom job definitions. + Payload []byte `json:"payload"` + + Unique bool `json:"unique,omitempty"` // Inputs when retrying Fails int64 `json:"fails,omitempty"` // number of times this job has failed @@ -28,6 +30,19 @@ type Job struct { observer *observer } +func (j *Job) SetPayload(payload interface{}) error { + marshaled, err := json.Marshal(payload) + if err != nil { + return err + } + j.Payload = marshaled + return nil +} + +func (j *Job) GetPayload(dest interface{}) error { + return json.Unmarshal(j.Payload, dest) +} + // Q is a shortcut to easily specify arguments for jobs when enqueueing them. // Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com", "track": true}) type Q map[string]interface{} @@ -48,14 +63,6 @@ func (j *Job) serialize() ([]byte, error) { return json.Marshal(j) } -// setArg sets a single named argument on the job. -func (j *Job) setArg(key string, val interface{}) { - if j.Args == nil { - j.Args = make(map[string]interface{}) - } - j.Args[key] = val -} - func (j *Job) failed(err error) { j.Fails++ j.LastErr = err.Error() @@ -68,115 +75,3 @@ func (j *Job) Checkin(msg string) { j.observer.observeCheckin(j.Name, j.ID, msg) } } - -// ArgString returns j.Args[key] typed to a string. If the key is missing or of the wrong type, it sets an argument error -// on the job. This function is meant to be used in the body of a job handling function while extracting arguments, -// followed by a single call to j.ArgError(). -func (j *Job) ArgString(key string) string { - v, ok := j.Args[key] - if ok { - typedV, ok := v.(string) - if ok { - return typedV - } - j.argError = typecastError("string", key, v) - } else { - j.argError = missingKeyError("string", key) - } - return "" -} - -// ArgInt64 returns j.Args[key] typed to an int64. If the key is missing or of the wrong type, it sets an argument error -// on the job. This function is meant to be used in the body of a job handling function while extracting arguments, -// followed by a single call to j.ArgError(). -func (j *Job) ArgInt64(key string) int64 { - v, ok := j.Args[key] - if ok { - rVal := reflect.ValueOf(v) - if isIntKind(rVal) { - return rVal.Int() - } else if isUintKind(rVal) { - vUint := rVal.Uint() - if vUint <= math.MaxInt64 { - return int64(vUint) - } - } else if isFloatKind(rVal) { - vFloat64 := rVal.Float() - vInt64 := int64(vFloat64) - if vFloat64 == math.Trunc(vFloat64) && vInt64 <= 9007199254740892 && vInt64 >= -9007199254740892 { - return vInt64 - } - } - j.argError = typecastError("int64", key, v) - } else { - j.argError = missingKeyError("int64", key) - } - return 0 -} - -// ArgFloat64 returns j.Args[key] typed to a float64. If the key is missing or of the wrong type, it sets an argument error -// on the job. This function is meant to be used in the body of a job handling function while extracting arguments, -// followed by a single call to j.ArgError(). -func (j *Job) ArgFloat64(key string) float64 { - v, ok := j.Args[key] - if ok { - rVal := reflect.ValueOf(v) - if isIntKind(rVal) { - return float64(rVal.Int()) - } else if isUintKind(rVal) { - return float64(rVal.Uint()) - } else if isFloatKind(rVal) { - return rVal.Float() - } - j.argError = typecastError("float64", key, v) - } else { - j.argError = missingKeyError("float64", key) - } - return 0.0 -} - -// ArgBool returns j.Args[key] typed to a bool. If the key is missing or of the wrong type, it sets an argument error -// on the job. This function is meant to be used in the body of a job handling function while extracting arguments, -// followed by a single call to j.ArgError(). -func (j *Job) ArgBool(key string) bool { - v, ok := j.Args[key] - if ok { - typedV, ok := v.(bool) - if ok { - return typedV - } - j.argError = typecastError("bool", key, v) - } else { - j.argError = missingKeyError("bool", key) - } - return false -} - -// ArgError returns the last error generated when extracting typed params. Returns nil if extracting the args went fine. -func (j *Job) ArgError() error { - return j.argError -} - -func isIntKind(v reflect.Value) bool { - k := v.Kind() - return k == reflect.Int || k == reflect.Int8 || k == reflect.Int16 || k == reflect.Int32 || k == reflect.Int64 -} - -func isUintKind(v reflect.Value) bool { - k := v.Kind() - return k == reflect.Uint || k == reflect.Uint8 || k == reflect.Uint16 || k == reflect.Uint32 || k == reflect.Uint64 -} - -func isFloatKind(v reflect.Value) bool { - k := v.Kind() - return k == reflect.Float32 || k == reflect.Float64 -} - -func missingKeyError(jsonType, key string) error { - return fmt.Errorf("looking for a %s in job.Arg[%s] but key wasn't found", jsonType, key) -} - -func typecastError(jsonType, key string, v interface{}) error { - actualType := reflect.TypeOf(v) - return fmt.Errorf("looking for a %s in job.Arg[%s] but value wasn't right type: %v(%v)", jsonType, key, actualType, v) -} diff --git a/job_test.go b/job_test.go index 97e2b99e..abe81573 100644 --- a/job_test.go +++ b/job_test.go @@ -2,254 +2,23 @@ package work import ( "github.com/stretchr/testify/assert" - "math" "testing" ) -func TestJobArgumentExtraction(t *testing.T) { - j := Job{} - j.setArg("str1", "bar") - - j.setArg("int1", int64(77)) - j.setArg("int2", 77) - j.setArg("int3", uint64(77)) - j.setArg("int4", float64(77.0)) - - j.setArg("bool1", true) - - j.setArg("float1", 3.14) - - // - // Success cases: - // - vString := j.ArgString("str1") - assert.Equal(t, vString, "bar") - assert.NoError(t, j.ArgError()) - - vInt64 := j.ArgInt64("int1") - assert.EqualValues(t, vInt64, 77) - assert.NoError(t, j.ArgError()) - - vInt64 = j.ArgInt64("int2") - assert.EqualValues(t, vInt64, 77) - assert.NoError(t, j.ArgError()) - - vInt64 = j.ArgInt64("int3") - assert.EqualValues(t, vInt64, 77) - assert.NoError(t, j.ArgError()) - - vInt64 = j.ArgInt64("int4") - assert.EqualValues(t, vInt64, 77) - assert.NoError(t, j.ArgError()) - - vBool := j.ArgBool("bool1") - assert.Equal(t, vBool, true) - assert.NoError(t, j.ArgError()) - - vFloat := j.ArgFloat64("float1") - assert.Equal(t, vFloat, 3.14) - assert.NoError(t, j.ArgError()) - - // Missing key results in error: - vString = j.ArgString("str_missing") - assert.Equal(t, vString, "") - assert.Error(t, j.ArgError()) - j.argError = nil - assert.NoError(t, j.ArgError()) - - vInt64 = j.ArgInt64("int_missing") - assert.EqualValues(t, vInt64, 0) - assert.Error(t, j.ArgError()) - j.argError = nil - assert.NoError(t, j.ArgError()) - - vBool = j.ArgBool("bool_missing") - assert.Equal(t, vBool, false) - assert.Error(t, j.ArgError()) - j.argError = nil - assert.NoError(t, j.ArgError()) - - vFloat = j.ArgFloat64("float_missing") - assert.Equal(t, vFloat, 0.0) - assert.Error(t, j.ArgError()) - j.argError = nil - assert.NoError(t, j.ArgError()) - - // Missing string; Make sure we don't reset it with successes after - vString = j.ArgString("str_missing") - assert.Equal(t, vString, "") - assert.Error(t, j.ArgError()) - _ = j.ArgString("str1") - _ = j.ArgInt64("int1") - _ = j.ArgBool("bool1") - _ = j.ArgFloat64("float1") - assert.Error(t, j.ArgError()) +type FakeJobPayload struct { + Str1 string `json:"str1"` + Int1 int `json:"int1"` } -func TestJobArgumentExtractionBadString(t *testing.T) { - var testCases = []struct { - key string - val interface{} - good bool - }{ - {"a", 1, false}, - {"b", false, false}, - {"c", "yay", true}, - } - - j := Job{} - - for _, tc := range testCases { - j.setArg(tc.key, tc.val) - } - - for _, tc := range testCases { - r := j.ArgString(tc.key) - err := j.ArgError() - if tc.good { - if err != nil { - t.Errorf("Failed test case: %v; err = %v\n", tc, err) - } - if r != tc.val.(string) { - t.Errorf("Failed test case: %v; r = %v\n", tc, r) - } - } else { - if err == nil { - t.Errorf("Failed test case: %v; but err was nil\n", tc) - } - if r != "" { - t.Errorf("Failed test case: %v; but r was %v\n", tc, r) - } - } - j.argError = nil - } -} - -func TestJobArgumentExtractionBadBool(t *testing.T) { - var testCases = []struct { - key string - val interface{} - good bool - }{ - {"a", 1, false}, - {"b", "boo", false}, - {"c", true, true}, - {"d", false, true}, - } - - j := Job{} - - for _, tc := range testCases { - j.setArg(tc.key, tc.val) - } - - for _, tc := range testCases { - r := j.ArgBool(tc.key) - err := j.ArgError() - if tc.good { - if err != nil { - t.Errorf("Failed test case: %v; err = %v\n", tc, err) - } - if r != tc.val.(bool) { - t.Errorf("Failed test case: %v; r = %v\n", tc, r) - } - } else { - if err == nil { - t.Errorf("Failed test case: %v; but err was nil\n", tc) - } - if r != false { - t.Errorf("Failed test case: %v; but r was %v\n", tc, r) - } - } - j.argError = nil - } -} - -func TestJobArgumentExtractionBadInt(t *testing.T) { - var testCases = []struct { - key string - val interface{} - good bool - }{ - {"a", "boo", false}, - {"b", true, false}, - {"c", 1.1, false}, - {"d", 19007199254740892.0, false}, - {"e", -19007199254740892.0, false}, - {"f", uint64(math.MaxInt64) + 1, false}, - - {"z", 0, true}, - {"y", 9007199254740892, true}, - {"x", 9007199254740892.0, true}, - {"w", 573839921, true}, - {"v", -573839921, true}, - {"u", uint64(math.MaxInt64), true}, - } - - j := Job{} - - for _, tc := range testCases { - j.setArg(tc.key, tc.val) - } - - for _, tc := range testCases { - r := j.ArgInt64(tc.key) - err := j.ArgError() - if tc.good { - if err != nil { - t.Errorf("Failed test case: %v; err = %v\n", tc, err) - } - } else { - if err == nil { - t.Errorf("Failed test case: %v; but err was nil\n", tc) - } - if r != 0 { - t.Errorf("Failed test case: %v; but r was %v\n", tc, r) - } - } - j.argError = nil - } -} - -func TestJobArgumentExtractionBadFloat(t *testing.T) { - var testCases = []struct { - key string - val interface{} - good bool - }{ - {"a", "boo", false}, - {"b", true, false}, - - {"z", 0, true}, - {"y", 9007199254740892, true}, - {"x", 9007199254740892.0, true}, - {"w", 573839921, true}, - {"v", -573839921, true}, - {"u", math.MaxFloat64, true}, - {"t", math.SmallestNonzeroFloat64, true}, - } - - j := Job{} - - for _, tc := range testCases { - j.setArg(tc.key, tc.val) - } - - for _, tc := range testCases { - r := j.ArgFloat64(tc.key) - err := j.ArgError() - if tc.good { - if err != nil { - t.Errorf("Failed test case: %v; err = %v\n", tc, err) - } - } else { - if err == nil { - t.Errorf("Failed test case: %v; but err was nil\n", tc) - } - if r != 0 { - t.Errorf("Failed test case: %v; but r was %v\n", tc, r) - } - } - j.argError = nil - } +func TestJobPayload(t *testing.T) { + j := new(Job) + assert.Nil(t, j.SetPayload(&FakeJobPayload{ + Str1: "foo", + Int1: 2, + })) + + payload := new(FakeJobPayload) + assert.Nil(t, j.GetPayload(payload)) + assert.Equal(t, "foo", payload.Str1) + assert.Equal(t, 2, payload.Int1) } diff --git a/observer.go b/observer.go index d92c74e5..e4f8d750 100644 --- a/observer.go +++ b/observer.go @@ -1,7 +1,6 @@ package work import ( - "encoding/json" "fmt" "github.com/garyburd/redigo/redis" "time" @@ -49,7 +48,7 @@ type observation struct { // These need to be set when starting a job startedAt int64 - arguments map[string]interface{} + payload []byte // If we're done w/ the job, err will indicate the success/failure of it err error // nil: success. not nil: the error we got when running the job @@ -90,13 +89,13 @@ func (o *observer) drain() { <-o.doneDrainingChan } -func (o *observer) observeStarted(jobName, jobID string, arguments map[string]interface{}) { +func (o *observer) observeStarted(jobName, jobID string, payload []byte) { o.observationsChan <- &observation{ kind: observationKindStarted, jobName: jobName, jobID: jobID, startedAt: nowEpochSeconds(), - arguments: arguments, + payload: payload, } } @@ -200,24 +199,13 @@ func (o *observer) writeStatus(obv *observation) error { // checkin -> obv.checkin // checkin_at -> obv.checkinAt - var argsJSON []byte - if len(obv.arguments) == 0 { - argsJSON = []byte("") - } else { - var err error - argsJSON, err = json.Marshal(obv.arguments) - if err != nil { - return err - } - } - args := make([]interface{}, 0, 13) args = append(args, key, "job_name", obv.jobName, "job_id", obv.jobID, "started_at", obv.startedAt, - "args", argsJSON, + "payload", obv.payload, ) if (obv.checkin != "") && (obv.checkinAt > 0) { diff --git a/observer_test.go b/observer_test.go index 086d83c7..75be7894 100644 --- a/observer_test.go +++ b/observer_test.go @@ -1,6 +1,7 @@ package work import ( + "encoding/json" "fmt" "github.com/garyburd/redigo/redis" "github.com/stretchr/testify/assert" @@ -8,6 +9,11 @@ import ( // "time" ) +func parsePayload(payload Q) []byte { + data, _ := json.Marshal(payload) + return data +} + func TestObserverStarted(t *testing.T) { pool := newTestPool(":6379") ns := "work" @@ -18,7 +24,7 @@ func TestObserverStarted(t *testing.T) { observer := newObserver(ns, pool, "abcd") observer.start() - observer.observeStarted("foo", "bar", Q{"a": 1, "b": "wat"}) + observer.observeStarted("foo", "bar", parsePayload(Q{"a": 1, "b": "wat"})) //observer.observeDone("foo", "bar", nil) observer.drain() observer.stop() @@ -27,7 +33,7 @@ func TestObserverStarted(t *testing.T) { assert.Equal(t, "foo", h["job_name"]) assert.Equal(t, "bar", h["job_id"]) assert.Equal(t, fmt.Sprint(tMock), h["started_at"]) - assert.Equal(t, `{"a":1,"b":"wat"}`, h["args"]) + assert.Equal(t, `{"a":1,"b":"wat"}`, h["payload"]) } func TestObserverStartedDone(t *testing.T) { @@ -40,7 +46,7 @@ func TestObserverStartedDone(t *testing.T) { observer := newObserver(ns, pool, "abcd") observer.start() - observer.observeStarted("foo", "bar", Q{"a": 1, "b": "wat"}) + observer.observeStarted("foo", "bar", parsePayload(Q{"a": 1, "b": "wat"})) observer.observeDone("foo", "bar", nil) observer.drain() observer.stop() @@ -59,7 +65,7 @@ func TestObserverCheckin(t *testing.T) { tMock := int64(1425263401) setNowEpochSecondsMock(tMock) defer resetNowEpochSecondsMock() - observer.observeStarted("foo", "bar", Q{"a": 1, "b": "wat"}) + observer.observeStarted("foo", "bar", parsePayload(Q{"a": 1, "b": "wat"})) tMockCheckin := int64(1425263402) setNowEpochSecondsMock(tMockCheckin) @@ -71,7 +77,7 @@ func TestObserverCheckin(t *testing.T) { assert.Equal(t, "foo", h["job_name"]) assert.Equal(t, "bar", h["job_id"]) assert.Equal(t, fmt.Sprint(tMock), h["started_at"]) - assert.Equal(t, `{"a":1,"b":"wat"}`, h["args"]) + assert.Equal(t, `{"a":1,"b":"wat"}`, h["payload"]) assert.Equal(t, "doin it", h["checkin"]) assert.Equal(t, fmt.Sprint(tMockCheckin), h["checkin_at"]) } @@ -86,7 +92,7 @@ func TestObserverCheckinFromJob(t *testing.T) { tMock := int64(1425263401) setNowEpochSecondsMock(tMock) defer resetNowEpochSecondsMock() - observer.observeStarted("foo", "barbar", Q{"a": 1, "b": "wat"}) + observer.observeStarted("foo", "barbar", parsePayload(Q{"a": 1, "b": "wat"})) tMockCheckin := int64(1425263402) setNowEpochSecondsMock(tMockCheckin) diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index 91085f70..3f2d64f0 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -101,7 +101,7 @@ func (pe *periodicEnqueuer) enqueue() error { // This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history. EnqueuedAt: epoch, - Args: nil, + Payload: nil, } rawJSON, err := job.serialize() diff --git a/periodic_enqueuer_test.go b/periodic_enqueuer_test.go index 68d52b2f..c3e2f2d8 100644 --- a/periodic_enqueuer_test.go +++ b/periodic_enqueuer_test.go @@ -59,7 +59,7 @@ func TestPeriodicEnqueuer(t *testing.T) { for i, e := range expected { assert.EqualValues(t, scheduledJobs[i].RunAt, scheduledJobs[i].EnqueuedAt) - assert.Nil(t, scheduledJobs[i].Args) + assert.Nil(t, scheduledJobs[i].Payload) assert.Equal(t, e.name, scheduledJobs[i].Name) assert.Equal(t, e.id, scheduledJobs[i].ID) diff --git a/redis.go b/redis.go index c3f8c45b..f0a06c95 100644 --- a/redis.go +++ b/redis.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "strings" ) func redisNamespacePrefix(namespace string) string { @@ -56,7 +57,7 @@ func redisKeyHeartbeat(namespace, workerPoolID string) string { return redisNamespacePrefix(namespace) + "worker_pools:" + workerPoolID } -func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) { +func redisKeyUniqueJob(namespace, jobName string, payload interface{}) (string, error) { var buf bytes.Buffer buf.WriteString(redisNamespacePrefix(namespace)) @@ -64,14 +65,25 @@ func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) ( buf.WriteString(jobName) buf.WriteRune(':') - if args != nil { - err := json.NewEncoder(&buf).Encode(args) + if payload != nil { + var err error + // If it's already a byte array, use that. Otherwise encode it + if byteArr, ok := payload.([]byte); ok { + _, err = buf.Write(byteArr) + } else { + + err = json.NewEncoder(&buf).Encode(payload) + } + if err != nil { return "", err } } - return buf.String(), nil + // JSON encode adds a new line at the end so will not be the same as + // the decoded byte array. Need to strip it. + // https://github.com/golang/go/issues/7767 + return strings.TrimSuffix(buf.String(), "\n"), nil } func redisKeyLastPeriodicEnqueue(namespace string) string { diff --git a/run_test.go b/run_test.go index 1b60fedf..4180ae65 100644 --- a/run_test.go +++ b/run_test.go @@ -18,12 +18,14 @@ func appendToContext(ctx *Context, val interface{}) { func TestRunBasicMiddleware(t *testing.T) { mw1 := func(ctx *Context, next NextMiddlewareFunc) error { - ctx.Job.setArg("mw1", "mw1") + ctx.Job.SetPayload(Q{"mw1": "mw1", "a": "foo"}) return next() } mw2 := func(ctx *Context, next NextMiddlewareFunc) error { - appendToContext(ctx, ctx.Job.Args["mw1"]) + var q Q + assert.NoError(t, ctx.Job.GetPayload(&q)) + appendToContext(ctx, q["mw1"]) appendToContext(ctx, "mw2") return next() } @@ -34,8 +36,10 @@ func TestRunBasicMiddleware(t *testing.T) { } h1 := func(ctx *Context) error { + var q Q + assert.NoError(t, ctx.Job.GetPayload(&q)) appendToContext(ctx, "h1") - appendToContext(ctx, ctx.Job.Args["a"]) + appendToContext(ctx, q["a"]) return nil } @@ -48,7 +52,6 @@ func TestRunBasicMiddleware(t *testing.T) { job := &Job{ Name: "foo", - Args: map[string]interface{}{"a": "foo"}, } ctx := NewContext(job) diff --git a/worker.go b/worker.go index fc7c9df1..839b63fd 100644 --- a/worker.go +++ b/worker.go @@ -180,7 +180,7 @@ func (w *worker) processJob(job *Job) { } if jt, ok := w.jobTypes[job.Name]; ok { ctx := NewContext(job) - w.observeStarted(job.Name, job.ID, job.Args) + w.observeStarted(job.Name, job.ID, job.Payload) job.observer = w.observer // for Checkin runErr := runJob(ctx, w.middleware, jt) w.observeDone(job.Name, job.ID, runErr) @@ -200,7 +200,7 @@ func (w *worker) processJob(job *Job) { } func (w *worker) deleteUniqueJob(job *Job) { - uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Args) + uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Payload) if err != nil { logError("worker.delete_unique_job.key", err) } diff --git a/worker_test.go b/worker_test.go index 9303a641..3a55074a 100644 --- a/worker_test.go +++ b/worker_test.go @@ -10,6 +10,10 @@ import ( "github.com/stretchr/testify/assert" ) +type WorkerTestJobArgs struct { + A float64 +} + func TestWorkerBasics(t *testing.T) { pool := newTestPool(":6379") ns := "work" @@ -28,7 +32,9 @@ func TestWorkerBasics(t *testing.T) { Name: job1, JobOptions: JobOptions{Priority: 1}, Handler: func(ctx *Context) error { - arg1 = ctx.Job.Args["a"].(float64) + args := new(WorkerTestJobArgs) + ctx.Job.GetPayload(args) + arg1 = args.A return nil }, } @@ -36,7 +42,9 @@ func TestWorkerBasics(t *testing.T) { Name: job2, JobOptions: JobOptions{Priority: 1}, Handler: func(ctx *Context) error { - arg2 = ctx.Job.Args["a"].(float64) + args := new(WorkerTestJobArgs) + ctx.Job.GetPayload(args) + arg2 = args.A return nil }, } @@ -44,17 +52,19 @@ func TestWorkerBasics(t *testing.T) { Name: job3, JobOptions: JobOptions{Priority: 1}, Handler: func(ctx *Context) error { - arg3 = ctx.Job.Args["a"].(float64) + args := new(WorkerTestJobArgs) + ctx.Job.GetPayload(args) + arg3 = args.A return nil }, } enqueuer := NewEnqueuer(ns, pool) - _, err := enqueuer.Enqueue(job1, Q{"a": 1}) + _, err := enqueuer.Enqueue(job1, &WorkerTestJobArgs{1}) assert.Nil(t, err) - _, err = enqueuer.Enqueue(job2, Q{"a": 2}) + _, err = enqueuer.Enqueue(job2, &WorkerTestJobArgs{2}) assert.Nil(t, err) - _, err = enqueuer.Enqueue(job3, Q{"a": 3}) + _, err = enqueuer.Enqueue(job3, &WorkerTestJobArgs{3}) assert.Nil(t, err) w := newWorker(ns, "1", pool, nil, jobTypes) @@ -118,7 +128,7 @@ func TestWorkerInProgress(t *testing.T) { w.observer.drain() h := readHash(pool, redisKeyWorkerObservation(ns, w.workerID)) assert.Equal(t, job1, h["job_name"]) - assert.Equal(t, `{"a":1}`, h["args"]) + assert.Equal(t, `{"a":1}`, h["payload"]) // NOTE: we could check for job_id and started_at, but it's a PITA and it's tested in observer_test. w.drain() From 53379284af26fe7a1cfa5acc6f86c5858e18f9db Mon Sep 17 00:00:00 2001 From: Jason Raede Date: Fri, 18 Nov 2016 13:59:17 -0500 Subject: [PATCH 2/4] Change args to JSON-marshallable payload --- README.md | 28 +++++++++---------- client_test.go | 8 +++--- enqueue_test.go | 14 +++++----- job.go | 2 +- job_test.go | 2 +- run_test.go | 4 +-- webui/internal/assets/src/DeadJobs.js | 2 +- webui/internal/assets/src/DeadJobs.test.js | 6 ++-- webui/internal/assets/src/RetryJobs.js | 2 +- webui/internal/assets/src/RetryJobs.test.js | 6 ++-- webui/internal/assets/src/ScheduledJobs.js | 2 +- .../internal/assets/src/ScheduledJobs.test.js | 6 ++-- webui/webui.go | 4 +-- worker_test.go | 6 ++-- 14 files changed, 45 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index cbb7baf2..40c2496b 100644 --- a/README.md +++ b/README.md @@ -35,12 +35,18 @@ var redisPool = &redis.Pool{ }, } +type SendEmailJobParameters struct { + Address string + Subject string + CustomerID int +} + // Make an enqueuer with a particular namespace var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool) func main() { // Enqueue a job named "send_email" with the specified parameters. - _, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4}) + _, err := enqueuer.Enqueue("send_email", &SendEmailJobParameters{Address: "test@example.com", Subject: "hello world", CustomerID: 4}) if err != nil { log.Fatal(err) } @@ -73,6 +79,8 @@ var redisPool = &redis.Pool{ }, } +type SendEmail + func main() { // Make a new pool. Arguments: // 10 is the max concurrency @@ -82,7 +90,6 @@ func main() { // Add middleware that will be executed for each job pool.Middleware(LogMiddleware) - pool.Middleware(FindCustomerMiddleware) // Map the name of jobs to handler functions pool.Job("send_email", SendEmailHandler) @@ -107,25 +114,16 @@ func LogMiddleware(ctx *work.Context, next work.NextMiddlewareFunc) error { return next() } -func FindCustomerMiddleware(ctx *work.Context, next work.NextMiddlewareFunc) error { - // If there's a customer_id param, set it in the context for future middleware and handlers to use. - if _, ok := job.Args["customer_id"]; ok { - ctx.Set("customer_id", ctx.job.Args["customer_id"]) - } - - return next() -} - func SendEmailHandler(ctx *work.Context) error { // Extract arguments: - addr := ctx.Job.ArgString("address") - subject := ctx.Job.ArgString("subject") - if err := ctx.Job.ArgError(); err != nil { + args := new(SendEmailJobParameters) + err := ctx.Job.UnmarshalPayload(args) + if err != nil { return err } // Go ahead and send the email... - // sendEmailTo(addr, subject) + // sendEmailTo(args.Address, args.Subject) return nil } diff --git a/client_test.go b/client_test.go index 9b05b887..88956e69 100644 --- a/client_test.go +++ b/client_test.go @@ -214,7 +214,7 @@ func TestClientScheduledJobs(t *testing.T) { assert.EqualValues(t, 1425263409, jobs[2].EnqueuedAt) var q Q - err = jobs[0].GetPayload(&q) + err = jobs[0].UnmarshalPayload(&q) assert.Nil(t, err) assert.EqualValues(t, interface{}(1), q["a"]) assert.EqualValues(t, interface{}(2), q["b"]) @@ -266,7 +266,7 @@ func TestClientRetryJobs(t *testing.T) { assert.Equal(t, "wat", jobs[0].Name) assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt) var q Q - assert.Nil(t, jobs[0].GetPayload(&q)) + assert.Nil(t, jobs[0].UnmarshalPayload(&q)) assert.EqualValues(t, interface{}(1), q["a"]) assert.EqualValues(t, 1, jobs[0].Fails) assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt) @@ -308,7 +308,7 @@ func TestClientDeadJobs(t *testing.T) { assert.Equal(t, "wat", jobs[0].Name) assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt) var q Q - assert.Nil(t, jobs[0].GetPayload(&q)) + assert.Nil(t, jobs[0].UnmarshalPayload(&q)) assert.EqualValues(t, interface{}(1), q["a"]) assert.EqualValues(t, 1, jobs[0].Fails) assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt) @@ -455,7 +455,7 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) { job1 := getQueuedJob(ns, pool, name) if assert.NotNil(t, job1) { var q Q - assert.Nil(t, job1.GetPayload(&q)) + assert.Nil(t, job1.UnmarshalPayload(&q)) assert.Equal(t, name, job1.Name) assert.Equal(t, "wat", q["a"].(string)) } diff --git a/enqueue_test.go b/enqueue_test.go index 2d52d06b..216ef238 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -19,7 +19,7 @@ func TestEnqueue(t *testing.T) { assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds var args Q - assert.Nil(t, job.GetPayload(&args)) + assert.Nil(t, job.UnmarshalPayload(&args)) assert.Equal(t, "cool", args["b"].(string)) assert.EqualValues(t, 1, args["a"].(float64)) @@ -41,7 +41,7 @@ func TestEnqueue(t *testing.T) { assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds var q Q - assert.Nil(t, j.GetPayload(&q)) + assert.Nil(t, j.UnmarshalPayload(&q)) assert.Equal(t, "cool", q["b"].(string)) assert.EqualValues(t, 1, q["a"].(float64)) @@ -69,7 +69,7 @@ func TestEnqueueIn(t *testing.T) { assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds var q Q - assert.Nil(t, job.GetPayload(&q)) + assert.Nil(t, job.UnmarshalPayload(&q)) assert.Equal(t, "cool", q["b"].(string)) assert.EqualValues(t, 1, q["a"].(float64)) assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) @@ -96,7 +96,7 @@ func TestEnqueueIn(t *testing.T) { assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds var q Q - assert.Nil(t, j.GetPayload(&q)) + assert.Nil(t, j.UnmarshalPayload(&q)) assert.Equal(t, "cool", q["b"].(string)) assert.EqualValues(t, 1, q["a"].(float64)) } @@ -115,7 +115,7 @@ func TestEnqueueUnique(t *testing.T) { assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds var q Q - assert.Nil(t, job.GetPayload(&q)) + assert.Nil(t, job.UnmarshalPayload(&q)) assert.Equal(t, "cool", q["b"].(string)) assert.EqualValues(t, 1, q["a"].(float64)) } @@ -190,7 +190,7 @@ func TestEnqueueUniqueIn(t *testing.T) { assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds var q Q - assert.Nil(t, job.GetPayload(&q)) + assert.Nil(t, job.UnmarshalPayload(&q)) assert.Equal(t, "cool", q["b"].(string)) assert.EqualValues(t, 1, q["a"].(float64)) assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) @@ -211,7 +211,7 @@ func TestEnqueueUniqueIn(t *testing.T) { assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds var q Q - assert.Nil(t, j.GetPayload(&q)) + assert.Nil(t, j.UnmarshalPayload(&q)) assert.Equal(t, "cool", q["b"].(string)) assert.EqualValues(t, 1, q["a"].(float64)) assert.True(t, j.Unique) diff --git a/job.go b/job.go index e59c80fb..8ebc5878 100644 --- a/job.go +++ b/job.go @@ -39,7 +39,7 @@ func (j *Job) SetPayload(payload interface{}) error { return nil } -func (j *Job) GetPayload(dest interface{}) error { +func (j *Job) UnmarshalPayload(dest interface{}) error { return json.Unmarshal(j.Payload, dest) } diff --git a/job_test.go b/job_test.go index abe81573..578f9001 100644 --- a/job_test.go +++ b/job_test.go @@ -18,7 +18,7 @@ func TestJobPayload(t *testing.T) { })) payload := new(FakeJobPayload) - assert.Nil(t, j.GetPayload(payload)) + assert.Nil(t, j.UnmarshalPayload(payload)) assert.Equal(t, "foo", payload.Str1) assert.Equal(t, 2, payload.Int1) } diff --git a/run_test.go b/run_test.go index 4180ae65..4a7a696f 100644 --- a/run_test.go +++ b/run_test.go @@ -24,7 +24,7 @@ func TestRunBasicMiddleware(t *testing.T) { mw2 := func(ctx *Context, next NextMiddlewareFunc) error { var q Q - assert.NoError(t, ctx.Job.GetPayload(&q)) + assert.NoError(t, ctx.Job.UnmarshalPayload(&q)) appendToContext(ctx, q["mw1"]) appendToContext(ctx, "mw2") return next() @@ -37,7 +37,7 @@ func TestRunBasicMiddleware(t *testing.T) { h1 := func(ctx *Context) error { var q Q - assert.NoError(t, ctx.Job.GetPayload(&q)) + assert.NoError(t, ctx.Job.UnmarshalPayload(&q)) appendToContext(ctx, "h1") appendToContext(ctx, q["a"]) return nil diff --git a/webui/internal/assets/src/DeadJobs.js b/webui/internal/assets/src/DeadJobs.js index ea044acf..cbf846ba 100644 --- a/webui/internal/assets/src/DeadJobs.js +++ b/webui/internal/assets/src/DeadJobs.js @@ -143,7 +143,7 @@ export default class DeadJobs extends React.Component { this.check(job)}/> {job.name} - {JSON.stringify(job.args)} + {job.payload} {job.err} diff --git a/webui/internal/assets/src/DeadJobs.test.js b/webui/internal/assets/src/DeadJobs.test.js index c5aa1a3c..531f1689 100644 --- a/webui/internal/assets/src/DeadJobs.test.js +++ b/webui/internal/assets/src/DeadJobs.test.js @@ -16,8 +16,8 @@ describe('DeadJobs', () => { deadJobs.setState({ count: 2, jobs: [ - {id: 1, name: 'test', args: {}, t: 1467760821, err: 'err1'}, - {id: 2, name: 'test2', args: {}, t: 1467760822, err: 'err2'} + {id: 1, name: 'test', payload: {}, t: 1467760821, err: 'err1'}, + {id: 2, name: 'test2', payload: {}, t: 1467760822, err: 'err2'} ] }); @@ -86,7 +86,7 @@ describe('DeadJobs', () => { job.push({ id: i, name: 'test', - args: {}, + payload: '{}', t: 1467760821, err: 'err', }); diff --git a/webui/internal/assets/src/RetryJobs.js b/webui/internal/assets/src/RetryJobs.js index c274b6e7..541a752f 100644 --- a/webui/internal/assets/src/RetryJobs.js +++ b/webui/internal/assets/src/RetryJobs.js @@ -59,7 +59,7 @@ export default class RetryJobs extends React.Component { return ( {job.name} - {JSON.stringify(job.args)} + {job.payload} {job.err} diff --git a/webui/internal/assets/src/RetryJobs.test.js b/webui/internal/assets/src/RetryJobs.test.js index 1ad7f680..e304e4f1 100644 --- a/webui/internal/assets/src/RetryJobs.test.js +++ b/webui/internal/assets/src/RetryJobs.test.js @@ -15,8 +15,8 @@ describe('RetryJobs', () => { retryJobs.setState({ count: 2, jobs: [ - {id: 1, name: 'test', args: {}, t: 1467760821, err: 'err1'}, - {id: 2, name: 'test2', args: {}, t: 1467760822, err: 'err2'} + {id: 1, name: 'test', payload: {}, t: 1467760821, err: 'err1'}, + {id: 2, name: 'test2', payload: {}, t: 1467760822, err: 'err2'} ] }); @@ -34,7 +34,7 @@ describe('RetryJobs', () => { job.push({ id: i, name: 'test', - args: {}, + payload: '{}', t: 1467760821, err: 'err', }); diff --git a/webui/internal/assets/src/ScheduledJobs.js b/webui/internal/assets/src/ScheduledJobs.js index 2a576605..c8201dff 100644 --- a/webui/internal/assets/src/ScheduledJobs.js +++ b/webui/internal/assets/src/ScheduledJobs.js @@ -58,7 +58,7 @@ export default class ScheduledJobs extends React.Component { return ( {job.name} - {JSON.stringify(job.args)} + {job.payload} ); diff --git a/webui/internal/assets/src/ScheduledJobs.test.js b/webui/internal/assets/src/ScheduledJobs.test.js index 0eb8805e..49fde430 100644 --- a/webui/internal/assets/src/ScheduledJobs.test.js +++ b/webui/internal/assets/src/ScheduledJobs.test.js @@ -15,8 +15,8 @@ describe('ScheduledJobs', () => { scheduledJobs.setState({ count: 2, jobs: [ - {id: 1, name: 'test', args: {}, t: 1467760821, err: 'err1'}, - {id: 2, name: 'test2', args: {}, t: 1467760822, err: 'err2'} + {id: 1, name: 'test', payload: {}, t: 1467760821, err: 'err1'}, + {id: 2, name: 'test2', payload: {}, t: 1467760822, err: 'err2'} ] }); @@ -34,7 +34,7 @@ describe('ScheduledJobs', () => { job.push({ id: i, name: 'test', - args: {}, + payload: '{}', t: 1467760821, err: 'err', }); diff --git a/webui/webui.go b/webui/webui.go index 428a761e..51ceb48e 100644 --- a/webui/webui.go +++ b/webui/webui.go @@ -7,11 +7,11 @@ import ( "strconv" "sync" + "github.com/DispatchMe/go-work" + "github.com/DispatchMe/go-work/webui/internal/assets" "github.com/braintree/manners" "github.com/garyburd/redigo/redis" "github.com/gocraft/web" - "github.com/gocraft/work" - "github.com/gocraft/work/webui/internal/assets" ) // Server implements an HTTP server which exposes a JSON API to view and manage gocraft/work items. diff --git a/worker_test.go b/worker_test.go index 3a55074a..42a00e27 100644 --- a/worker_test.go +++ b/worker_test.go @@ -33,7 +33,7 @@ func TestWorkerBasics(t *testing.T) { JobOptions: JobOptions{Priority: 1}, Handler: func(ctx *Context) error { args := new(WorkerTestJobArgs) - ctx.Job.GetPayload(args) + ctx.Job.UnmarshalPayload(args) arg1 = args.A return nil }, @@ -43,7 +43,7 @@ func TestWorkerBasics(t *testing.T) { JobOptions: JobOptions{Priority: 1}, Handler: func(ctx *Context) error { args := new(WorkerTestJobArgs) - ctx.Job.GetPayload(args) + ctx.Job.UnmarshalPayload(args) arg2 = args.A return nil }, @@ -53,7 +53,7 @@ func TestWorkerBasics(t *testing.T) { JobOptions: JobOptions{Priority: 1}, Handler: func(ctx *Context) error { args := new(WorkerTestJobArgs) - ctx.Job.GetPayload(args) + ctx.Job.UnmarshalPayload(args) arg3 = args.A return nil }, From 090ad6a78954611912987215ee610ffe0c555b10 Mon Sep 17 00:00:00 2001 From: Jason Raede Date: Fri, 18 Nov 2016 14:01:04 -0500 Subject: [PATCH 3/4] Fix README --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 40c2496b..abd6aa16 100644 --- a/README.md +++ b/README.md @@ -79,8 +79,6 @@ var redisPool = &redis.Pool{ }, } -type SendEmail - func main() { // Make a new pool. Arguments: // 10 is the max concurrency From a25975ecfb67a596a22a502006f7feec7feaec7a Mon Sep 17 00:00:00 2001 From: Jason Raede Date: Fri, 18 Nov 2016 14:06:15 -0500 Subject: [PATCH 4/4] Fix web UI --- webui/internal/assets/src/Processes.js | 2 +- webui/internal/assets/src/Processes.test.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/webui/internal/assets/src/Processes.js b/webui/internal/assets/src/Processes.js index d45a6f08..8052b78e 100644 --- a/webui/internal/assets/src/Processes.js +++ b/webui/internal/assets/src/Processes.js @@ -26,7 +26,7 @@ class BusyWorkers extends React.Component { return ( {worker.job_name} - {worker.args_json} + {worker.payload} {worker.checkin} diff --git a/webui/internal/assets/src/Processes.test.js b/webui/internal/assets/src/Processes.test.js index 5fb3892a..24991210 100644 --- a/webui/internal/assets/src/Processes.test.js +++ b/webui/internal/assets/src/Processes.test.js @@ -21,7 +21,7 @@ describe('Processes', () => { started_at: 1467753603, checkin_at: 1467753603, checkin: '123', - args_json: '{}' + payload: '{}' } ], workerPool: [ @@ -44,7 +44,7 @@ describe('Processes', () => { expect(processes.state.workerPool.length).toEqual(1); expect(processes.workerCount).toEqual(3); - const expectedBusyWorker = [ { args_json: '{}', checkin: '123', checkin_at: 1467753603, job_name: 'job1', started_at: 1467753603, worker_id: '2' } ]; + const expectedBusyWorker = [ { payload: '{}', checkin: '123', checkin_at: 1467753603, job_name: 'job1', started_at: 1467753603, worker_id: '2' } ]; let output = r.getRenderOutput(); let busyWorkers = findAllByTag(output, 'BusyWorkers');