Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CORS_ORIGINS=http://localhost:5173,https://example.com
DATABASE_URL=postgres://postgres:postgres@localhost:5432/river-development
DATABASE_URL=postgres://postgres:postgres@localhost:5432/river_dev
OTEL_ENABLED=false
PORT=8080
9 changes: 6 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ name: CI

env:
# A suitable URL for the test database.
DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/riverui_dev?sslmode=disable
DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_dev?sslmode=disable

# Test database.
TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_test?sslmode=disable

on:
push:
Expand Down Expand Up @@ -58,14 +61,14 @@ jobs:
run: go install github.com/riverqueue/river/cmd/river@latest

- name: Create test DB
run: createdb riverui_dev
run: createdb river_test
env:
PGHOST: 127.0.0.1
PGUSER: postgres
PGPASSWORD: postgres

- name: Migrate test DB
run: river migrate-up --database-url "$DATABASE_URL"
run: river migrate-up --database-url "$TEST_DATABASE_URL"

# ensure that there is a file in `ui/dist` to prevent a lint error about
# it during CI when there is nothing there:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.env
.env.*
!.env.example
.tool-versions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no asdf for you? :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, actually I do use asdf! I figured I'd keep this out of the repo so not everyone gets affixed to one particular Node version though — I always find it kind of annoying when I go to a repo and am forced to install a new version because there's a .tool-versions in there. Does that make sense in this instance? The project should work on a variety of Node versions shouldn't it?

/riverui
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ linters-settings:
- id
- j
- mu
- r # common for http.Request
- rw # common for http.ResponseWriter
- sb # common convention for string builder
- t
- tt # common convention for table tests
- tx
- w # common for http.ResponseWriter
- wg
148 changes: 85 additions & 63 deletions api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,63 +12,78 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/rivertype"
"github.com/riverqueue/riverui/internal/apiendpoint"
"github.com/riverqueue/riverui/internal/apierror"
"github.com/riverqueue/riverui/internal/db"
"github.com/riverqueue/riverui/internal/util/dbutil"
)

type jobCancelRequest struct {
JobIDStrings []string `json:"ids"`
}

type apiHandler struct {
// A bundle of common utilities needed for many API endpoints.
type apiBundle struct {
client *river.Client[pgx.Tx]
dbPool *pgxpool.Pool
dbPool DBTXWithBegin
logger *slog.Logger
queries *db.Queries
}

func (a *apiHandler) JobCancel(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second)
defer cancel()
// SetBundle sets all values to the same as the given bundle.
func (a *apiBundle) SetBundle(bundle *apiBundle) {
*a = *bundle
}

var payload jobCancelRequest
if err := json.NewDecoder(req.Body).Decode(&payload); err != nil {
a.logger.ErrorContext(ctx, "error decoding request", slog.String("error", err.Error()))
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
jobIDs, err := stringIDsToInt64s(payload.JobIDStrings)
if err != nil {
a.logger.ErrorContext(ctx, "error decoding job IDs", slog.String("error", err.Error()))
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
// withSetBundle is an interface that's automatically implemented by types that
// embed apiBundle. It lets places like tests generically set bundle values on
// any general endpoint type.
type withSetBundle interface {
// SetBundle sets all values to the same as the given bundle.
SetBundle(bundle *apiBundle)
}

type jobCancelEndpoint struct {
apiBundle
apiendpoint.Endpoint[jobCancelRequest, jobCancelResponse]
}

func (*jobCancelEndpoint) Meta() *apiendpoint.EndpointMeta {
return &apiendpoint.EndpointMeta{
Pattern: "POST /api/jobs/cancel",
StatusCode: http.StatusOK,
}
}

type jobCancelRequest struct {
JobIDs []int64String `json:"ids"`
}

updatedJobs := make(map[int64]*rivertype.JobRow)
type jobCancelResponse struct {
Status string `json:"status"`
}

if err := pgx.BeginFunc(ctx, a.dbPool, func(tx pgx.Tx) error {
for _, jobID := range jobIDs {
func (a *jobCancelEndpoint) Execute(ctx context.Context, req *jobCancelRequest) (*jobCancelResponse, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So each endpoint gets an Execute function like this one, with the main benefits compared to the alternative being:

  • Error handling gets much easier; the handler can just return an error or API error as the second return value and not having to worry about writing the error back or calling return.
  • Makes general testing much easier because most tests can just send a strongly typed request struct and receive a typed response.
  • Moves marshaling/unmarshaling and other common API endpoint features down a layer so the handler code can be made much more succinct.

return dbutil.WithTxV(ctx, a.dbPool, func(ctx context.Context, tx pgx.Tx) (*jobCancelResponse, error) {
updatedJobs := make(map[int64]*rivertype.JobRow)
for _, jobID := range req.JobIDs {
jobID := int64(jobID)
job, err := a.client.JobCancelTx(ctx, tx, jobID)
if err != nil {
if errors.Is(err, river.ErrNotFound) {
fmt.Printf("job %d not found\n", jobID)
return nil, apierror.NewNotFoundJob(jobID)
}
return err
return nil, fmt.Errorf("error canceling jobs: %w", err)
}
updatedJobs[jobID] = job
}
return nil
}); err != nil {
a.logger.ErrorContext(ctx, "error cancelling jobs", slog.String("error", err.Error()))
http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

// TODO: return jobs in response, use in frontend instead of invalidating
a.writeResponse(ctx, rw, []byte("{\"status\": \"ok\"}"))
// TODO: return jobs in response, use in frontend instead of invalidating
return &jobCancelResponse{Status: "ok"}, nil
})
}

type apiHandler struct {
apiBundle
}

func (a *apiHandler) writeResponse(ctx context.Context, rw http.ResponseWriter, data []byte) {
Expand Down Expand Up @@ -175,38 +190,45 @@ func (a *apiHandler) JobRetry(rw http.ResponseWriter, req *http.Request) {
a.writeResponse(ctx, rw, []byte("{\"status\": \"ok\"}"))
}

func (a *apiHandler) JobGet(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second)
defer cancel()
type jobGetEndpoint struct {
apiBundle
apiendpoint.Endpoint[jobGetRequest, RiverJob]
}

idString := req.PathValue("id")
if idString == "" {
http.Error(rw, "missing job id", http.StatusBadRequest)
return
func (*jobGetEndpoint) Meta() *apiendpoint.EndpointMeta {
return &apiendpoint.EndpointMeta{
Pattern: "GET /api/jobs/{job_id}",
StatusCode: http.StatusOK,
}
}

type jobGetRequest struct {
JobID int64 `json:"-"` // from ExtractRaw
}

func (req *jobGetRequest) ExtractRaw(r *http.Request) error {
idString := r.PathValue("job_id")

jobID, err := strconv.ParseInt(idString, 10, 64)
if err != nil {
http.Error(rw, fmt.Sprintf("invalid job id: %s", err), http.StatusBadRequest)
return
return apierror.NewBadRequest("Couldn't convert job ID to int64: %s.", err)
}
req.JobID = jobID

job, err := a.client.JobGet(ctx, jobID)
if errors.Is(err, river.ErrNotFound) {
http.Error(rw, "{\"error\": {\"msg\": \"job not found\"}}", http.StatusNotFound)
return
}
if err != nil {
a.logger.ErrorContext(ctx, "error getting job", slog.String("error", err.Error()))
http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
return nil
}

if err := json.NewEncoder(rw).Encode(riverJobToSerializableJob(*job)); err != nil {
a.logger.ErrorContext(ctx, "error encoding job", slog.String("error", err.Error()))
http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
func (a *jobGetEndpoint) Execute(ctx context.Context, req *jobGetRequest) (*RiverJob, error) {
return dbutil.WithTxV(ctx, a.dbPool, func(ctx context.Context, tx pgx.Tx) (*RiverJob, error) {
job, err := a.client.JobGetTx(ctx, tx, req.JobID)
if err != nil {
if errors.Is(err, river.ErrNotFound) {
return nil, apierror.NewNotFoundJob(req.JobID)
}
return nil, fmt.Errorf("error getting job: %w", err)
}
return riverJobToSerializableJob(job), nil
})
}

func (a *apiHandler) JobList(rw http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -544,7 +566,7 @@ func internalJobsToSerializableJobs(internal []db.RiverJob) []RiverJob {
return jobs
}

func riverJobToSerializableJob(riverJob rivertype.JobRow) RiverJob {
func riverJobToSerializableJob(riverJob *rivertype.JobRow) *RiverJob {
attemptedBy := riverJob.AttemptedBy
if attemptedBy == nil {
attemptedBy = []string{}
Expand All @@ -554,7 +576,7 @@ func riverJobToSerializableJob(riverJob rivertype.JobRow) RiverJob {
errs = []rivertype.AttemptError{}
}

return RiverJob{
return &RiverJob{
ID: riverJob.ID,
Args: riverJob.EncodedArgs,
Attempt: riverJob.Attempt,
Expand All @@ -574,10 +596,10 @@ func riverJobToSerializableJob(riverJob rivertype.JobRow) RiverJob {
}
}

func riverJobsToSerializableJobs(result *river.JobListResult) []RiverJob {
jobs := make([]RiverJob, len(result.Jobs))
func riverJobsToSerializableJobs(result *river.JobListResult) []*RiverJob {
jobs := make([]*RiverJob, len(result.Jobs))
for i, internalJob := range result.Jobs {
jobs[i] = riverJobToSerializableJob(*internalJob)
jobs[i] = riverJobToSerializableJob(internalJob)
}
return jobs
}
Expand Down
112 changes: 112 additions & 0 deletions api_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package riverui

import (
"context"
"testing"

"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"

"github.com/riverqueue/river"
"github.com/riverqueue/river/rivertype"
"github.com/riverqueue/riverui/internal/apierror"
"github.com/riverqueue/riverui/internal/db"
"github.com/riverqueue/riverui/internal/riverinternaltest"
)

type setupEndpointTestBundle struct {
client *river.Client[pgx.Tx]
tx pgx.Tx
}

func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T) (*TEndpoint, *setupEndpointTestBundle) {
t.Helper()

var (
endpoint TEndpoint
logger = riverinternaltest.Logger(t)
client = insertOnlyClient(t, logger)
tx = riverinternaltest.TestTx(ctx, t)
)

if withSetBundle, ok := any(&endpoint).(withSetBundle); ok {
withSetBundle.SetBundle(&apiBundle{
client: client,
dbPool: tx,
logger: logger,
queries: db.New(tx),
})
}

return &endpoint, &setupEndpointTestBundle{
client: client,
tx: tx,
}
}

func TestAPIHandlerJobCancel(t *testing.T) {
t.Parallel()

ctx := context.Background()

t.Run("Success", func(t *testing.T) {
t.Parallel()

endpoint, bundle := setupEndpoint[jobCancelEndpoint](ctx, t)

insertRes1, err := bundle.client.InsertTx(ctx, bundle.tx, &noOpArgs{}, nil)
require.NoError(t, err)

insertRes2, err := bundle.client.InsertTx(ctx, bundle.tx, &noOpArgs{}, nil)
require.NoError(t, err)

resp, err := endpoint.Execute(ctx, &jobCancelRequest{JobIDs: []int64String{int64String(insertRes1.Job.ID), int64String(insertRes2.Job.ID)}})
require.NoError(t, err)
require.Equal(t, &jobCancelResponse{Status: "ok"}, resp)
Copy link
Collaborator Author

@brandur brandur Jun 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two classes of new API tests in this PR. This is the first that zeroes in on the endpoint and just tests that, with the benefits being:

  • Strongly typed request/resp so each test doesn't have marshal and unmarshal JSON everywhere, which adds a ton of noise.
  • Similarly, much faster because we get to skip the entire HTTP layer in most cases.
  • In case of an error, it's much clearer to interpret because you get a real error instead of 500 response.

Most test cases including edges like not founds will be written in this style because it's more convenient and lighter weight.


updatedJob1, err := bundle.client.JobGetTx(ctx, bundle.tx, insertRes1.Job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateCancelled, updatedJob1.State)

updatedJob2, err := bundle.client.JobGetTx(ctx, bundle.tx, insertRes2.Job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateCancelled, updatedJob2.State)
})

t.Run("NotFound", func(t *testing.T) {
t.Parallel()

endpoint, _ := setupEndpoint[jobCancelEndpoint](ctx, t)

_, err := endpoint.Execute(ctx, &jobCancelRequest{JobIDs: []int64String{123}})
requireAPIError(t, apierror.NewNotFoundJob(123), err)
})
}

func TestAPIHandlerJobGet(t *testing.T) {
t.Parallel()

ctx := context.Background()

t.Run("Success", func(t *testing.T) {
t.Parallel()

endpoint, bundle := setupEndpoint[jobGetEndpoint](ctx, t)

insertRes, err := bundle.client.InsertTx(ctx, bundle.tx, &noOpArgs{}, nil)
require.NoError(t, err)

resp, err := endpoint.Execute(ctx, &jobGetRequest{JobID: insertRes.Job.ID})
require.NoError(t, err)
require.Equal(t, insertRes.Job.ID, resp.ID)
})

t.Run("NotFound", func(t *testing.T) {
t.Parallel()

endpoint, _ := setupEndpoint[jobGetEndpoint](ctx, t)

_, err := endpoint.Execute(ctx, &jobGetRequest{JobID: 123})
requireAPIError(t, apierror.NewNotFoundJob(123), err)
})
}
Loading