Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ jobs:
run: ./river validate --database-url $DATABASE_URL
working-directory: ./cmd/river

- name: river bench
run: |
( sleep 10 && killall -SIGTERM river ) &
./river bench --database-url $DATABASE_URL
working-directory: ./cmd/river

# Bench again in fixed number of jobs mode.
- name: river bench
run: |
( sleep 10 && killall -SIGTERM river ) &
./river bench --database-url $DATABASE_URL --num-total-jobs 1_234
working-directory: ./cmd/river

- name: river migrate-down
run: ./river migrate-down --database-url $DATABASE_URL --max-steps 100
working-directory: ./cmd/river
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- The River CLI now supports `river bench` to benchmark River's job throughput against a database. [PR #254](https://github.com/riverqueue/river/pull/254).

### Changed

- Changed default client IDs to be a combination of hostname and the time which the client started. This can still be changed by specifying `Config.ID`. [PR #255](https://github.com/riverqueue/river/pull/255).
Expand Down
4 changes: 3 additions & 1 deletion cmd/river/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go 1.21.4
require (
github.com/jackc/pgx/v5 v5.5.2
github.com/riverqueue/river v0.0.17
github.com/riverqueue/river/riverdriver v0.0.17
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17
github.com/spf13/cobra v1.8.0
)
Expand All @@ -22,9 +23,10 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/riverqueue/river/riverdriver v0.0.17 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this is outdated diff?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, I thought so too, but that's what go mod tidy produces. The ULID dep should be dropped next time we cut a release and can target the River CLI Go submodule against it.

github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
5 changes: 5 additions & 0 deletions cmd/river/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.0.17 h1:7beHZxo1WMzhN48y1Jt7CKkkmsw+TjuLd6qCEaznm7s=
Expand All @@ -26,6 +29,8 @@ github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17 h1:xPmTpQNBicTZ
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17/go.mod h1:zlZKXZ6XHcbwYniSKWX2+GwFlXHTnG9pJtE/BkxK0Xc=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17 h1:iuruCNT7nkC7Z4Qzb79jcvAVniGyK+Kstsy7fKJagUU=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17/go.mod h1:kL59NW3LoPbQxPz9DQoUtDYq3Zkcpdt5CIowgeBZwFw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
Expand Down
86 changes: 81 additions & 5 deletions cmd/river/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/cobra"

"github.com/riverqueue/river/cmd/river/riverbench"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
Expand Down Expand Up @@ -39,14 +41,46 @@ Provides command line facilities for the River job queue.
}
}

mustMarkFlagRequired := func(cmd *cobra.Command, name string) {
mustMarkFlagRequired := func(cmd *cobra.Command, name string) { //nolint:unparam
// We just panic here because this will never happen outside of an error
// in development.
if err := cmd.MarkFlagRequired(name); err != nil {
panic(err)
}
}

// bench
{
var opts benchOpts

cmd := &cobra.Command{
Use: "bench",
Short: "Run River benchmark",
Long: `
Run a River benchmark which inserts and works jobs continually, giving a rough
idea of jobs per second and time to work a single job.

By default, the benchmark will continuously insert and work jobs in perpetuity
until interrupted by SIGINT (Ctrl^C). It can alternatively take a maximum run
duration with --duration, which takes a Go-style duration string like 1m.
Lastly, it can take --num-total-jobs, which inserts the given number of jobs
before starting the client, and works until all jobs are finished.

The database in --database-url will have its jobs table truncated, so make sure
to use a development database only.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() (bool, error) { return bench(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`")
cmd.Flags().DurationVar(&opts.Duration, "duration", 0, "duration after which to stop benchmark, accepting Go-style durations like 1m, 5m30s")
cmd.Flags().IntVarP(&opts.NumTotalJobs, "num-total-jobs", "n", 0, "number of jobs to insert before starting and which are worked down until finish")
cmd.Flags().BoolVarP(&opts.Verbose, "verbose", "v", false, "output additional logging verbosity")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}

// migrate-down
{
var opts migrateDownOpts
Expand All @@ -65,8 +99,8 @@ Defaults to running a single down migration. This behavior can be changed with
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 1, "Maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "Target version to migrate to (final state includes this version, but none after it)")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 1, "maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version, but none after it)")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}
Expand All @@ -89,8 +123,8 @@ restricted with --max-steps or --target-version.
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "Maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "Target version to migrate to (final state includes this version)")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version)")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}
Expand Down Expand Up @@ -151,6 +185,48 @@ func setParamIfUnset(runtimeParams map[string]string, name, val string) {
runtimeParams[name] = val
}

type benchOpts struct {
DatabaseURL string
Duration time.Duration
NumTotalJobs int
Verbose bool
}

func (o *benchOpts) validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
}

return nil
}

func bench(ctx context.Context, opts *benchOpts) (bool, error) {
if err := opts.validate(); err != nil {
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return false, err
}
defer dbPool.Close()

var logger *slog.Logger
if opts.Verbose {
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
} else {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn}))
}

benchmarker := riverbench.NewBenchmarker(riverpgxv5.New(dbPool), logger, opts.Duration, opts.NumTotalJobs)

if err := benchmarker.Run(ctx); err != nil {
return false, err
}

return true, nil
}

type migrateDownOpts struct {
DatabaseURL string
MaxSteps int
Expand Down
Loading