From 4eba8854e4b652dda3694f2f483080456d5b53b3 Mon Sep 17 00:00:00 2001 From: Brandur Date: Thu, 25 Apr 2024 19:52:48 -0700 Subject: [PATCH] First release of River Ruby bindings A first push of Ruby bindings for River, providing insert-only client to work jobs that are implemented in Go. Includes two initial drivers in the `drivers/` directory, one for ActiveRecord and one for Sequel, which should cover the vast majority of Ruby applications making use of Postgres. The drivers are kept in the main gem's GitHub repository for convenience, but ship as separate gems so that programs including them can minimize their dependencies. Overall, I'm happy at how close I was able to keep the API to the Go version. A lot of syntax in Go just isn't needed due to the more dynamic and implicit nature of Ruby, but the parts that came through are quite close. e.g. We have a job args concept, along with `InsertOpts` that can be added to both jobs and at insert time, just like Go. Purposely not implemented on this first push (I'll follow up with these later on): * Unique jobs. * Batch insert. Try to maintain high Ruby quality standards with: * Full spec suite that requires 100.0% branch coverage. * Use standardrb for lint. * Include RBS files with type and run Steep against the project to verify correctness. --- .github/workflows/ci.yml | 158 ++++++++++++++ .gitignore | 1 + CHANGELOG.md | 12 ++ Gemfile | 15 ++ Gemfile.lock | 147 +++++++++++++ Makefile | 24 +++ Steepfile | 11 + docs/README.md | 104 +++++++++ docs/development.md | 58 +++++ drivers/riverqueue-activerecord/Gemfile | 15 ++ drivers/riverqueue-activerecord/Gemfile.lock | 132 ++++++++++++ .../riverqueue-activerecord/README.md | 6 +- .../riverqueue-activerecord/docs/README.md | 23 ++ .../docs/development.md | 48 +++++ drivers/riverqueue-activerecord/lib/driver.rb | 88 ++++++++ .../lib/riverqueue-activerecord.rb | 6 + .../riverqueue-activerecord.gemspec | 16 ++ .../spec/driver_spec.rb | 202 ++++++++++++++++++ .../spec/spec_helper.rb | 20 ++ drivers/riverqueue-sequel/Gemfile | 14 ++ drivers/riverqueue-sequel/Gemfile.lock | 90 ++++++++ drivers/riverqueue-sequel/docs/README.md | 23 ++ drivers/riverqueue-sequel/docs/development.md | 48 +++++ drivers/riverqueue-sequel/lib/driver.rb | 75 +++++++ .../lib/riverqueue-sequel.rb | 6 + .../riverqueue-sequel.gemspec | 15 ++ drivers/riverqueue-sequel/spec/driver_spec.rb | 202 ++++++++++++++++++ drivers/riverqueue-sequel/spec/spec_helper.rb | 19 ++ lib/client.rb | 110 ++++++++++ lib/driver.rb | 47 ++++ lib/insert_opts.rb | 65 ++++++ lib/job.rb | 169 +++++++++++++++ lib/riverqueue.rb | 9 + riverqueue.gemspec | 18 +- sig/client.rbs | 31 +++ sig/driver.rbs | 27 +++ sig/insert_opts.rbs | 21 ++ sig/job.rbs | 64 ++++++ sig/riverqueue.rbs | 2 + spec/client_spec.rb | 184 ++++++++++++++++ spec/job_spec.rb | 40 ++++ spec/spec_helper.rb | 12 ++ 42 files changed, 2365 insertions(+), 12 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 CHANGELOG.md create mode 100644 Gemfile create mode 100644 Gemfile.lock create mode 100644 Makefile create mode 100644 Steepfile create mode 100644 docs/README.md create mode 100644 docs/development.md create mode 100644 drivers/riverqueue-activerecord/Gemfile create mode 100644 drivers/riverqueue-activerecord/Gemfile.lock rename README.md => drivers/riverqueue-activerecord/README.md (53%) create mode 100644 drivers/riverqueue-activerecord/docs/README.md create mode 100644 drivers/riverqueue-activerecord/docs/development.md create mode 100644 drivers/riverqueue-activerecord/lib/driver.rb create mode 100644 drivers/riverqueue-activerecord/lib/riverqueue-activerecord.rb create mode 100644 drivers/riverqueue-activerecord/riverqueue-activerecord.gemspec create mode 100644 drivers/riverqueue-activerecord/spec/driver_spec.rb create mode 100644 drivers/riverqueue-activerecord/spec/spec_helper.rb create mode 100644 drivers/riverqueue-sequel/Gemfile create mode 100644 drivers/riverqueue-sequel/Gemfile.lock create mode 100644 drivers/riverqueue-sequel/docs/README.md create mode 100644 drivers/riverqueue-sequel/docs/development.md create mode 100644 drivers/riverqueue-sequel/lib/driver.rb create mode 100644 drivers/riverqueue-sequel/lib/riverqueue-sequel.rb create mode 100644 drivers/riverqueue-sequel/riverqueue-sequel.gemspec create mode 100644 drivers/riverqueue-sequel/spec/driver_spec.rb create mode 100644 drivers/riverqueue-sequel/spec/spec_helper.rb create mode 100644 lib/client.rb create mode 100644 lib/driver.rb create mode 100644 lib/insert_opts.rb create mode 100644 lib/job.rb create mode 100644 sig/client.rbs create mode 100644 sig/driver.rbs create mode 100644 sig/insert_opts.rbs create mode 100644 sig/job.rbs create mode 100644 sig/riverqueue.rbs create mode 100644 spec/client_spec.rb create mode 100644 spec/job_spec.rb create mode 100644 spec/spec_helper.rb diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..a7717ef --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,158 @@ +name: CI + +env: + # Database to connect to that can create other databases with `CREATE DATABASE`. + ADMIN_DATABASE_URL: postgres://postgres:postgres@localhost:5432 + + # Just a common place for steps to put binaries they need and which is added + # to GITHUB_PATH/PATH. + BIN_PATH: /home/runner/bin + + # A suitable URL for a test database. + TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/riverqueue_ruby_test?sslmode=disable + +on: + - push + +jobs: + gem_build: + runs-on: ubuntu-latest + timeout-minutes: 3 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + - name: Build gem (riverqueue-ruby) + run: gem build riverqueue.gemspec + working-directory: . + + - name: Build gem (riverqueue-activerecord) + run: gem build riverqueue-activerecord.gemspec + working-directory: ./drivers/riverqueue-activerecord + + - name: Build gem (riverqueue-sequel) + run: gem build riverqueue-sequel.gemspec + working-directory: ./drivers/riverqueue-sequel + + lint: + runs-on: ubuntu-latest + timeout-minutes: 3 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + - name: Standard Ruby (riverqueue-ruby) + run: bundle exec standardrb + working-directory: . + + - name: bundle install (riverqueue-activerecord) + run: bundle install + working-directory: ./drivers/riverqueue-activerecord + + - name: Standard Ruby (riverqueue-activerecord) + run: bundle exec standardrb + working-directory: ./drivers/riverqueue-activerecord + + - name: bundle install (riverqueue-sequel) + run: bundle install + working-directory: ./drivers/riverqueue-sequel + + - name: Standard Ruby (riverqueue-sequel) + run: bundle exec standardrb + working-directory: ./drivers/riverqueue-sequel + + type_check: + runs-on: ubuntu-latest + timeout-minutes: 3 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + - name: Steep (riverqueue-ruby) + run: bundle exec steep check + working-directory: . + + spec: + runs-on: ubuntu-latest + timeout-minutes: 3 + + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 2s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruby + `bundle install` + uses: ruby/setup-ruby@v1 + with: + ruby-version: "head" + bundler-cache: true # runs 'bundle install' and caches installed gems automatically + + # There is a version of Go on Actions' base image, but it's old and can't + # read modern `go.mod` annotations correctly. + - name: Install Go + uses: actions/setup-go@v4 + with: + go-version: "stable" + check-latest: true + + - name: Create database + run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE riverqueue_ruby_test;" ${ADMIN_DATABASE_URL} + + - name: Install River CLI + run: go install github.com/riverqueue/river/cmd/river@latest + + - name: river migrate-up + run: river migrate-up --database-url "$TEST_DATABASE_URL" + + - name: Rspec (riverqueue-ruby) + run: bundle exec rspec + working-directory: . + + - name: bundle install (riverqueue-activerecord) + run: bundle install + working-directory: ./drivers/riverqueue-activerecord + + - name: Rspec (riverqueue-activerecord) + run: bundle exec rspec + working-directory: ./drivers/riverqueue-activerecord + + - name: bundle install (riverqueue-sequel) + run: bundle install + working-directory: ./drivers/riverqueue-sequel + + - name: Rspec (riverqueue-sequel) + run: bundle exec rspec + working-directory: ./drivers/riverqueue-sequel diff --git a/.gitignore b/.gitignore index c111b33..e8fe1c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.gem +coverage/ diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..8a24dbf --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,12 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +### Added + +- Initial implementation that supports inserting jobs using either ActiveRecord or Sequel. [PR #1](https://github.com/riverqueue/riverqueue-ruby/pull/1). \ No newline at end of file diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..213a8d2 --- /dev/null +++ b/Gemfile @@ -0,0 +1,15 @@ +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "standard" + gem "steep" +end + +group :test do + gem "debug" + gem "rspec-core" + gem "rspec-expectations" + gem "simplecov", require: false +end diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..0c40957 --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,147 @@ +PATH + remote: . + specs: + riverqueue (0.0.1) + +GEM + remote: https://rubygems.org/ + specs: + abbrev (0.1.2) + activesupport (7.1.3.2) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.0.2) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + minitest (>= 5.1) + mutex_m + tzinfo (~> 2.0) + ast (2.4.2) + base64 (0.2.0) + bigdecimal (3.1.7) + concurrent-ruby (1.2.3) + connection_pool (2.4.1) + csv (3.3.0) + debug (1.9.1) + irb (~> 1.10) + reline (>= 0.3.8) + diff-lcs (1.5.0) + docile (1.4.0) + drb (2.2.1) + ffi (1.16.3) + fileutils (1.7.2) + i18n (1.14.4) + concurrent-ruby (~> 1.0) + io-console (0.7.2) + irb (1.11.2) + rdoc + reline (>= 0.4.2) + json (2.7.1) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + listen (3.9.0) + rb-fsevent (~> 0.10, >= 0.10.3) + rb-inotify (~> 0.9, >= 0.9.10) + logger (1.6.0) + minitest (5.22.3) + mutex_m (0.2.0) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + psych (5.1.2) + stringio + racc (1.7.3) + rainbow (3.1.1) + rb-fsevent (0.11.2) + rb-inotify (0.10.1) + ffi (~> 1.0) + rbs (3.4.4) + abbrev + rdoc (6.6.2) + psych (>= 4.0.0) + regexp_parser (2.9.0) + reline (0.4.3) + io-console (~> 0.5) + rexml (3.2.6) + rspec-core (3.12.2) + rspec-support (~> 3.12.0) + rspec-expectations (3.12.3) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.12.0) + rspec-support (3.12.1) + rubocop (1.61.0) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.1) + parser (>= 3.3.0.4) + rubocop-performance (1.20.2) + rubocop (>= 1.48.1, < 2.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (1.13.0) + securerandom (0.3.1) + simplecov (0.22.0) + docile (~> 1.1) + simplecov-html (~> 0.11) + simplecov_json_formatter (~> 0.1) + simplecov-html (0.12.3) + simplecov_json_formatter (0.1.4) + standard (1.34.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.60) + standard-custom (~> 1.0.0) + standard-performance (~> 1.3) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.3.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.20.2) + steep (1.6.0) + activesupport (>= 5.1) + concurrent-ruby (>= 1.1.10) + csv (>= 3.0.9) + fileutils (>= 1.1.0) + json (>= 2.1.0) + language_server-protocol (>= 3.15, < 4.0) + listen (~> 3.0) + logger (>= 1.3.0) + parser (>= 3.1) + rainbow (>= 2.2.2, < 4.0) + rbs (>= 3.1.0) + securerandom (>= 0.1) + strscan (>= 1.0.0) + terminal-table (>= 2, < 4) + stringio (3.1.0) + strscan (3.1.0) + terminal-table (3.0.2) + unicode-display_width (>= 1.1.1, < 3) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-22 + x86_64-linux + +DEPENDENCIES + debug + riverqueue! + rspec-core + rspec-expectations + simplecov + standard + steep + +BUNDLED WITH + 2.4.20 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b6ff2cc --- /dev/null +++ b/Makefile @@ -0,0 +1,24 @@ +.PHONY: lint +lint: standardrb + +.PHONY: rspec +rspec: spec + +.PHONY: spec +spec: + bundle exec rspec + cd drivers/riverqueue-activerecord && bundle exec rspec + cd drivers/riverqueue-sequel && bundle exec rspec + +.PHONY: standardrb +standardrb: + bundle exec standardrb --fix + cd drivers/riverqueue-activerecord && bundle exec standardrb --fix + cd drivers/riverqueue-sequel && bundle exec standardrb --fix + +.PHONY: steep +steep: + bundle exec steep check + +.PHONY: type-check +type-check: steep diff --git a/Steepfile b/Steepfile new file mode 100644 index 0000000..620dab0 --- /dev/null +++ b/Steepfile @@ -0,0 +1,11 @@ +D = Steep::Diagnostic + +target :lib do + check "lib" + + library "json" + + signature "sig" + + configure_code_diagnostics(D::Ruby.strict) +end diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..eb564c6 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,104 @@ +# River client for Ruby [![Build Status](https://github.com/riverqueue/riverqueue-ruby/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby/actions) + +An insert-only Ruby client for [River](https://github.com/riverqueue/river) packaged in the [`riverqueue` gem](https://rubygems.org/gems/riverqueue). Allows jobs to be inserted in Ruby and run by a Go worker, but doesn't support working jobs in Ruby. + +## Basic usage + +`Gemfile` should contain the core gem and a driver like [`rubyqueue-sequel`](https://github.com/riverqueue/riverqueue-ruby/drivers/riverqueue-sequel) (see [drivers](#drivers)): + +``` ruby +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +Define a job and insert it: + +```ruby +class SortArgs + attr_accessor :strings + + def initialize(strings:) + self.strings = strings + end + + def kind = "sort" + + def to_json = JSON.dump({strings: strings}) +end + +insert_res = client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"])) +``` + +Job args should: + +* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize. +* Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go. + +They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind. + +### Insertion options + +Inserts take an `insert_opts` parameter to customize features of the inserted job: + +```ruby +insert_res = client.insert( + SimpleArgs.new(strings: ["whale", "tiger", "bear"]), + insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) +) +``` + +### Inserting with a Ruby hash + +`JobArgsHash` can be used to insert with a kind and JSON hash so that it's not necessary to define a class: + +```ruby +insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 +})) +``` + +## Drivers + +### ActiveRecord + +``` ruby +gem "riverqueue" +gem "riverqueue-activerecord" +``` + +Initialize driver and client: + +```ruby +ActiveRecord::Base.establish_connection("postgres://...") +client = River::Client.new(River::Driver::ActiveRecord.new) +``` + +### Sequel + +``` ruby +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize driver and client: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +## Development + +See [development](./development.md). diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..d816d34 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,58 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ bundle exec standardrb --fix +``` + +## Run type check (Steep) + +```shell +$ bundle exec steep check +``` + +## Code coverage + +Running the entire test suite will produce a coverage report, and will fail if line and branch coverage is below 100%. Run the suite and open `coverage/index.html` to find lines or branches that weren't covered: + +```shell +$ bundle exec rspec spec +$ open coverage/index.html +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue.gemspec +gem push riverqueue-$VERSION.gem +pushd drivers/riverqueue-activerecord && gem build riverqueue-activerecord.gemspec && popd +pushd drivers/riverqueue-activerecord && gem push riverqueue-activerecord-$VERSION.gem && popd +pushd drivers/riverqueue-sequel && gem build riverqueue-sequel.gemspec && popd +pushd drivers/riverqueue-sequel && gem push riverqueue-sequel-$VERSION.gem && popd +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/drivers/riverqueue-activerecord/Gemfile b/drivers/riverqueue-activerecord/Gemfile new file mode 100644 index 0000000..3fd0fb5 --- /dev/null +++ b/drivers/riverqueue-activerecord/Gemfile @@ -0,0 +1,15 @@ +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "riverqueue", path: "../.." + gem "standard" +end + +group :test do + gem "debug" + gem "rspec-core" + gem "rspec-expectations" + gem "simplecov", require: false +end diff --git a/drivers/riverqueue-activerecord/Gemfile.lock b/drivers/riverqueue-activerecord/Gemfile.lock new file mode 100644 index 0000000..0b3719c --- /dev/null +++ b/drivers/riverqueue-activerecord/Gemfile.lock @@ -0,0 +1,132 @@ +PATH + remote: ../.. + specs: + riverqueue (0.0.1) + +PATH + remote: . + specs: + riverqueue-activerecord (0.0.1) + activerecord (> 0, < 1000) + activesupport + pg + +GEM + remote: https://rubygems.org/ + specs: + activemodel (7.1.3.2) + activesupport (= 7.1.3.2) + activerecord (7.1.3.2) + activemodel (= 7.1.3.2) + activesupport (= 7.1.3.2) + timeout (>= 0.4.0) + activesupport (7.1.3.2) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.0.2) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + minitest (>= 5.1) + mutex_m + tzinfo (~> 2.0) + ast (2.4.2) + base64 (0.2.0) + bigdecimal (3.1.6) + concurrent-ruby (1.2.3) + connection_pool (2.4.1) + debug (1.9.1) + irb (~> 1.10) + reline (>= 0.3.8) + diff-lcs (1.5.1) + docile (1.4.0) + drb (2.2.1) + i18n (1.14.3) + concurrent-ruby (~> 1.0) + racc (~> 1.7) + io-console (0.7.2) + irb (1.11.2) + rdoc + reline (>= 0.4.2) + json (2.7.1) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + minitest (5.22.2) + mutex_m (0.2.0) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + pg (1.5.4) + psych (5.1.2) + stringio + racc (1.7.3) + rainbow (3.1.1) + rdoc (6.6.2) + psych (>= 4.0.0) + regexp_parser (2.9.0) + reline (0.4.3) + io-console (~> 0.5) + rexml (3.2.6) + rspec-core (3.13.0) + rspec-support (~> 3.13.0) + rspec-expectations (3.13.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-support (3.13.1) + rubocop (1.61.0) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.1) + parser (>= 3.3.0.4) + rubocop-performance (1.20.2) + rubocop (>= 1.48.1, < 2.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (1.13.0) + simplecov (0.22.0) + docile (~> 1.1) + simplecov-html (~> 0.11) + simplecov_json_formatter (~> 0.1) + simplecov-html (0.12.3) + simplecov_json_formatter (0.1.4) + standard (1.34.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.60) + standard-custom (~> 1.0.0) + standard-performance (~> 1.3) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.3.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.20.2) + stringio (3.1.0) + timeout (0.4.1) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-22 + x86_64-linux + +DEPENDENCIES + debug + riverqueue! + riverqueue-activerecord! + rspec-core + rspec-expectations + simplecov + standard + +BUNDLED WITH + 2.4.20 diff --git a/README.md b/drivers/riverqueue-activerecord/README.md similarity index 53% rename from README.md rename to drivers/riverqueue-activerecord/README.md index afa2498..265dd01 100644 --- a/README.md +++ b/drivers/riverqueue-activerecord/README.md @@ -1,8 +1,8 @@ -# River Ruby bindings +# River Ruby bindings ActiveRecord driver A future home for River's Ruby bindings. For now, the [Gem is registered](https://rubygems.org/gems/riverqueue), but nothing else is done. ``` sh -$ gem build riverqueue.gemspec -$ gem push riverqueue-0.0.1.gem +$ gem build riverqueue-activerecord.gemspec +$ gem push riverqueue-activerecord-0.0.1.gem ``` diff --git a/drivers/riverqueue-activerecord/docs/README.md b/drivers/riverqueue-activerecord/docs/README.md new file mode 100644 index 0000000..7d01cf8 --- /dev/null +++ b/drivers/riverqueue-activerecord/docs/README.md @@ -0,0 +1,23 @@ +# riverqueue-sequel [![Build Status](https://github.com/riverqueue/riverqueue-ruby-sequel/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby-sequel/actions) + +[ActiveRecord](https://github.com/jeremyevans/sequel) driver for [River](https://github.com/riverqueue/river)'s [`riverqueue` gem for Ruby](https://rubygems.org/gems/riverqueue). + +`Gemfile` should contain the core gem and a driver like this one: + +``` yaml +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = ActiveRecord.connect("postgres://...") +client = River::Client.new(River::Driver::ActiveRecord.new(DB)) +``` + +See also [`rubyqueue`](https://github.com/riverqueue/riverqueue-ruby). + +## Development + +See [development](./development.md). diff --git a/drivers/riverqueue-activerecord/docs/development.md b/drivers/riverqueue-activerecord/docs/development.md new file mode 100644 index 0000000..2f85549 --- /dev/null +++ b/drivers/riverqueue-activerecord/docs/development.md @@ -0,0 +1,48 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ standardrb --fix +``` + +## Code coverage + +Running the entire test suite will produce a coverage report, and will fail if line and branch coverage is below 100%. Run the suite and open `coverage/index.html` to find lines or branches that weren't covered: + +```shell +$ bundle exec rspec spec +$ open coverage/index.html +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue-sequel.gemspec +gem push riverqueue-sequel-$VERSION.gem +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/drivers/riverqueue-activerecord/lib/driver.rb b/drivers/riverqueue-activerecord/lib/driver.rb new file mode 100644 index 0000000..4ad7317 --- /dev/null +++ b/drivers/riverqueue-activerecord/lib/driver.rb @@ -0,0 +1,88 @@ +module River::Driver + # Provides a ActiveRecord driver for River. + # + # Used in conjunction with a River client like: + # + # DB = ActiveRecord.connect("postgres://...") + # client = River::Client.new(River::Driver::ActiveRecord.new(DB)) + # + class ActiveRecord + def initialize + # It's Ruby, so we can only define a model after ActiveRecord's established a + # connection because it's all dynamic. + if !River::Driver::ActiveRecord.const_defined?(:RiverJob) + River::Driver::ActiveRecord.const_set(:RiverJob, Class.new(::ActiveRecord::Base) do + self.table_name = "river_job" + + # Unfortunately, Rails errors if you have a column called `errors` and + # provides no way to remap names (beyond ignoring a column, which we + # really don't want). This patch is in place so we can hydrate this + # model at all without ActiveRecord self-immolating. + def self.dangerous_attribute_method?(method_name) + return false if method_name == "errors" + super + end + end) + end + end + + def insert(insert_params) + # the call to `#compact` is important so that we remove nils and table + # default values get picked up instead + to_job_row( + RiverJob.insert( + { + args: insert_params.encoded_args, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + state: insert_params.state, + scheduled_at: insert_params.scheduled_at, + tags: insert_params.tags + }.compact, + returning: Arel.sql("*") + ).first + ) + end + + # Type type injected to this method is not a `RiverJob`, but rather a raw + # hash with stringified keys because we're inserting with the Arel framework + # directly rather than generating a record from a model. + private def to_job_row(raw_job) + deserialize = ->(field) do + RiverJob._default_attributes[field].type.deserialize(raw_job[field]) + end + + # Errors is `jsonb[]` so the subtype here will decode `jsonb`. + errors_subtype = RiverJob._default_attributes["errors"].type.subtype + + River::JobRow.new( + id: deserialize.call("id"), + args: deserialize.call("args").yield_self { |a| a ? JSON.parse(a) : nil }, + attempt: deserialize.call("attempt"), + attempted_at: deserialize.call("attempted_at"), + attempted_by: deserialize.call("attempted_by"), + created_at: deserialize.call("created_at"), + errors: deserialize.call("errors")&.map do |e| + deserialized_error = errors_subtype.deserialize(e) + + River::AttemptError.new( + at: Time.parse(deserialized_error["at"]), + attempt: deserialized_error["attempt"], + error: deserialized_error["error"], + trace: deserialized_error["trace"] + ) + end, + finalized_at: deserialize.call("finalized_at"), + kind: deserialize.call("kind"), + max_attempts: deserialize.call("max_attempts"), + priority: deserialize.call("priority"), + queue: deserialize.call("queue"), + scheduled_at: deserialize.call("scheduled_at"), + state: deserialize.call("state"), + tags: deserialize.call("tags") + ) + end + end +end diff --git a/drivers/riverqueue-activerecord/lib/riverqueue-activerecord.rb b/drivers/riverqueue-activerecord/lib/riverqueue-activerecord.rb new file mode 100644 index 0000000..82bf467 --- /dev/null +++ b/drivers/riverqueue-activerecord/lib/riverqueue-activerecord.rb @@ -0,0 +1,6 @@ +require "active_record" + +require_relative "driver" + +module River +end diff --git a/drivers/riverqueue-activerecord/riverqueue-activerecord.gemspec b/drivers/riverqueue-activerecord/riverqueue-activerecord.gemspec new file mode 100644 index 0000000..99c6b27 --- /dev/null +++ b/drivers/riverqueue-activerecord/riverqueue-activerecord.gemspec @@ -0,0 +1,16 @@ +Gem::Specification.new do |s| + s.name = "riverqueue-activerecord" + s.version = "0.0.1" + s.summary = "ActiveRecord driver for the River Ruby gem." + s.description = "ActiveRecord driver for the River Ruby gem. Use in conjunction with the riverqueue gem to insert jobs that are worked in Go." + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue-activerecord.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" + + # The stupid version bounds are used to silence Ruby's extremely obnoxious warnings. + s.add_dependency "activerecord", "> 0", "< 1000" + s.add_dependency "activesupport", "> 0", "< 1000" # required for ActiveRecord to load properly + s.add_dependency "pg", "> 0", "< 1000" +end diff --git a/drivers/riverqueue-activerecord/spec/driver_spec.rb b/drivers/riverqueue-activerecord/spec/driver_spec.rb new file mode 100644 index 0000000..6b7b745 --- /dev/null +++ b/drivers/riverqueue-activerecord/spec/driver_spec.rb @@ -0,0 +1,202 @@ +require "spec_helper" + +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +RSpec.describe River::Driver::ActiveRecord do + around(:each) { |ex| test_transaction(&ex) } + + let!(:driver) { River::Driver::ActiveRecord.new } + let(:client) { River::Client.new(driver) } + + describe "#insert" do + it "inserts a job" do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + attempt: 0, + created_at: be_within(2).of(Time.now.utc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.utc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + + # Make sure it made it to the database. Assert only minimally since we're + # certain it's the same as what we checked above. + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to have_attributes( + kind: "simple" + ) + end + + it "schedules a job" do + target_time = Time.now.utc + 1 * 3600 + + insert_res = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(insert_res.job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args) + expect(insert_res.job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(insert_res.job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + kind: "hash_kind" + ) + end + + it "inserts in a transaction" do + insert_res = nil + + ActiveRecord::Base.transaction(requires_new: true) do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to_not be_nil + + raise ActiveRecord::Rollback + end + + # Not visible because the job was rolled back. + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to be_nil + end + end + + describe "#to_job_row" do + it "converts a database record to `River::JobRow`" do + now = Time.now.utc + river_job = { + id: 1, + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + args: JSON.generate(%({"job_num":1})), # encoded twice, like how ActiveRecord returns it + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + }.transform_keys { |k| k.to_s } + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + ) + end + + it "with errors" do + now = Time.now.utc + river_job = { + errors: [JSON.dump( + { + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + } + )] + }.transform_keys { |k| k.to_s } + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row.errors.count).to be(1) + expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) + expect(job_row.errors[0]).to have_attributes( + at: now.floor(0), + attempt: 1, + error: "job failure", + trace: "error trace" + ) + end + end +end diff --git a/drivers/riverqueue-activerecord/spec/spec_helper.rb b/drivers/riverqueue-activerecord/spec/spec_helper.rb new file mode 100644 index 0000000..a81f76f --- /dev/null +++ b/drivers/riverqueue-activerecord/spec/spec_helper.rb @@ -0,0 +1,20 @@ +require "active_record" +require "debug" + +ActiveRecord::Base.establish_connection(ENV["TEST_DATABASE_URL"] || "postgres://localhost/riverqueue_ruby_test") + +def test_transaction + ActiveRecord::Base.transaction do + yield + raise ActiveRecord::Rollback + end +end + +require "simplecov" +SimpleCov.start do + enable_coverage :branch + minimum_coverage line: 100, branch: 100 +end + +require "riverqueue" +require "riverqueue-activerecord" diff --git a/drivers/riverqueue-sequel/Gemfile b/drivers/riverqueue-sequel/Gemfile new file mode 100644 index 0000000..be9359d --- /dev/null +++ b/drivers/riverqueue-sequel/Gemfile @@ -0,0 +1,14 @@ +source "https://rubygems.org" + +gemspec + +group :development, :test do + gem "riverqueue", path: "../.." + gem "standard" +end + +group :test do + gem "rspec-core" + gem "rspec-expectations" + gem "simplecov", require: false +end diff --git a/drivers/riverqueue-sequel/Gemfile.lock b/drivers/riverqueue-sequel/Gemfile.lock new file mode 100644 index 0000000..6c7d55f --- /dev/null +++ b/drivers/riverqueue-sequel/Gemfile.lock @@ -0,0 +1,90 @@ +PATH + remote: ../.. + specs: + riverqueue (0.0.1) + +PATH + remote: . + specs: + riverqueue-sequel (0.0.1) + pg + sequel + +GEM + remote: https://rubygems.org/ + specs: + ast (2.4.2) + bigdecimal (3.1.4) + diff-lcs (1.5.0) + docile (1.4.0) + json (2.7.1) + language_server-protocol (3.17.0.3) + lint_roller (1.1.0) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + pg (1.5.4) + racc (1.7.3) + rainbow (3.1.1) + regexp_parser (2.9.0) + rexml (3.2.6) + rspec-core (3.12.2) + rspec-support (~> 3.12.0) + rspec-expectations (3.12.3) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.12.0) + rspec-support (3.12.1) + rubocop (1.61.0) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.1) + parser (>= 3.3.0.4) + rubocop-performance (1.20.2) + rubocop (>= 1.48.1, < 2.0) + rubocop-ast (>= 1.30.0, < 2.0) + ruby-progressbar (1.13.0) + sequel (5.74.0) + bigdecimal + simplecov (0.22.0) + docile (~> 1.1) + simplecov-html (~> 0.11) + simplecov_json_formatter (~> 0.1) + simplecov-html (0.12.3) + simplecov_json_formatter (0.1.4) + standard (1.34.0) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.60) + standard-custom (~> 1.0.0) + standard-performance (~> 1.3) + standard-custom (1.0.2) + lint_roller (~> 1.0) + rubocop (~> 1.50) + standard-performance (1.3.1) + lint_roller (~> 1.1) + rubocop-performance (~> 1.20.2) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-22 + x86_64-linux + +DEPENDENCIES + riverqueue! + riverqueue-sequel! + rspec-core + rspec-expectations + simplecov + standard + +BUNDLED WITH + 2.4.20 diff --git a/drivers/riverqueue-sequel/docs/README.md b/drivers/riverqueue-sequel/docs/README.md new file mode 100644 index 0000000..e3f6601 --- /dev/null +++ b/drivers/riverqueue-sequel/docs/README.md @@ -0,0 +1,23 @@ +# riverqueue-sequel [![Build Status](https://github.com/riverqueue/riverqueue-ruby-sequel/workflows/CI/badge.svg)](https://github.com/riverqueue/riverqueue-ruby-sequel/actions) + +[Sequel](https://github.com/jeremyevans/sequel) driver for [River](https://github.com/riverqueue/river)'s [`riverqueue` gem for Ruby](https://rubygems.org/gems/riverqueue). + +`Gemfile` should contain the core gem and a driver like this one: + +``` yaml +gem "riverqueue" +gem "riverqueue-sequel" +``` + +Initialize a client with: + +```ruby +DB = Sequel.connect("postgres://...") +client = River::Client.new(River::Driver::Sequel.new(DB)) +``` + +See also [`rubyqueue`](https://github.com/riverqueue/riverqueue-ruby). + +## Development + +See [development](./development.md). diff --git a/drivers/riverqueue-sequel/docs/development.md b/drivers/riverqueue-sequel/docs/development.md new file mode 100644 index 0000000..2f85549 --- /dev/null +++ b/drivers/riverqueue-sequel/docs/development.md @@ -0,0 +1,48 @@ +# riverqueue-ruby development + +## Install dependencies + +```shell +$ bundle install +``` +## Run tests + +Create a test database and migrate with River's CLI: + +```shell +$ go install github.com/riverqueue/river/cmd/river +$ createdb riverqueue_ruby_test +$ river migrate-up --database-url "postgres://localhost/riverqueue_ruby_test" +``` + +Run all specs: + +```shell +$ bundle exec rspec spec +``` + +## Run lint + +```shell +$ standardrb --fix +``` + +## Code coverage + +Running the entire test suite will produce a coverage report, and will fail if line and branch coverage is below 100%. Run the suite and open `coverage/index.html` to find lines or branches that weren't covered: + +```shell +$ bundle exec rspec spec +$ open coverage/index.html +``` + +## Publish a new gem + +```shell +git checkout master && git pull --rebase +VERSION=v0.0.x +gem build riverqueue-sequel.gemspec +gem push riverqueue-sequel-$VERSION.gem +git tag $VERSION +git push --tags +``` \ No newline at end of file diff --git a/drivers/riverqueue-sequel/lib/driver.rb b/drivers/riverqueue-sequel/lib/driver.rb new file mode 100644 index 0000000..0626499 --- /dev/null +++ b/drivers/riverqueue-sequel/lib/driver.rb @@ -0,0 +1,75 @@ +module River::Driver + # Provides a Sequel driver for River. + # + # Used in conjunction with a River client like: + # + # DB = Sequel.connect("postgres://...") + # client = River::Client.new(River::Driver::Sequel.new(DB)) + # + class Sequel + def initialize(db) + @db = db + + # It's Ruby, so we can only define a model after Sequel's established a + # connection because it's all dynamic. + if !River::Driver::Sequel.const_defined?(:RiverJob) + River::Driver::Sequel.const_set(:RiverJob, Class.new(::Sequel::Model(:river_job))) + + # Since we only define our model once, take advantage of knowing this is + # our first initialization to add required extensions. + db.extension(:pg_array) + end + end + + def insert(insert_params) + # the call to `#compact` is important so that we remove nils and table + # default values get picked up instead + to_job_row( + RiverJob.create( + { + args: insert_params.encoded_args, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + state: insert_params.state, + scheduled_at: insert_params.scheduled_at, + tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil + }.compact + ) + ) + end + + private def to_job_row(river_job) + # needs to be accessed through values because Sequel shadows `errors` + errors = river_job.values[:errors] + + River::JobRow.new( + id: river_job.id, + args: river_job.args ? JSON.parse(river_job.args) : nil, + attempt: river_job.attempt, + attempted_at: river_job.attempted_at, + attempted_by: river_job.attempted_by, + created_at: river_job.created_at, + errors: errors&.map { |e| + deserialized_error = JSON.parse(e, symbolize_names: true) + + River::AttemptError.new( + at: Time.parse(deserialized_error[:at]), + attempt: deserialized_error[:attempt], + error: deserialized_error[:error], + trace: deserialized_error[:trace] + ) + }, + finalized_at: river_job.finalized_at, + kind: river_job.kind, + max_attempts: river_job.max_attempts, + priority: river_job.priority, + queue: river_job.queue, + scheduled_at: river_job.scheduled_at, + state: river_job.state, + tags: river_job.tags + ) + end + end +end diff --git a/drivers/riverqueue-sequel/lib/riverqueue-sequel.rb b/drivers/riverqueue-sequel/lib/riverqueue-sequel.rb new file mode 100644 index 0000000..9c7ba09 --- /dev/null +++ b/drivers/riverqueue-sequel/lib/riverqueue-sequel.rb @@ -0,0 +1,6 @@ +require "sequel" + +require_relative "driver" + +module River +end diff --git a/drivers/riverqueue-sequel/riverqueue-sequel.gemspec b/drivers/riverqueue-sequel/riverqueue-sequel.gemspec new file mode 100644 index 0000000..2f72686 --- /dev/null +++ b/drivers/riverqueue-sequel/riverqueue-sequel.gemspec @@ -0,0 +1,15 @@ +Gem::Specification.new do |s| + s.name = "riverqueue-sequel" + s.version = "0.0.1" + s.summary = "Sequel driver for the River Ruby gem." + s.description = "Sequel driver for the River Ruby gem. Use in conjunction with the riverqueue gem to insert jobs that are worked in Go." + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue-sequel.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" + + # The stupid version bounds are used to silence Ruby's extremely obnoxious warnings. + s.add_dependency "pg", "> 0", "< 1000" + s.add_dependency "sequel", "> 0", "< 1000" +end diff --git a/drivers/riverqueue-sequel/spec/driver_spec.rb b/drivers/riverqueue-sequel/spec/driver_spec.rb new file mode 100644 index 0000000..9c2fde7 --- /dev/null +++ b/drivers/riverqueue-sequel/spec/driver_spec.rb @@ -0,0 +1,202 @@ +require "spec_helper" + +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +RSpec.describe River::Driver::Sequel do + around(:each) { |ex| test_transaction(&ex) } + + let!(:driver) { River::Driver::Sequel.new(DB) } + let(:client) { River::Client.new(driver) } + + describe "#insert" do + it "inserts a job" do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + expect(insert_res.job).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.utc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.utc), + state: River::JOB_STATE_AVAILABLE, + tags: ::Sequel.pg_array([]) + ) + + # Make sure it made it to the database. Assert only minimally since we're + # certain it's the same as what we checked above. + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to have_attributes( + kind: "simple" + ) + end + + it "schedules a job" do + target_time = Time.now.utc + 1 * 3600 + + insert_res = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(insert_res.job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args) + expect(insert_res.job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(insert_res.job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + kind: "hash_kind" + ) + end + + it "inserts in a transaction" do + insert_res = nil + + DB.transaction(savepoint: true) do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to_not be_nil + + raise Sequel::Rollback + end + + # Not visible because the job was rolled back. + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to be_nil + end + end + + describe "#to_job_row" do + it "converts a database record to `River::JobRow`" do + now = Time.now.utc + river_job = River::Driver::Sequel::RiverJob.new( + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + args: %({"job_num":1}), + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + ) + river_job.id = 1 + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"] + ) + end + + it "with errors" do + now = Time.now.utc + river_job = River::Driver::Sequel::RiverJob.new( + errors: [JSON.dump( + { + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + } + )] + ) + + job_row = driver.send(:to_job_row, river_job) + + expect(job_row.errors.count).to be(1) + expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) + expect(job_row.errors[0]).to have_attributes( + at: now.floor(0), + attempt: 1, + error: "job failure", + trace: "error trace" + ) + end + end +end diff --git a/drivers/riverqueue-sequel/spec/spec_helper.rb b/drivers/riverqueue-sequel/spec/spec_helper.rb new file mode 100644 index 0000000..4a1d589 --- /dev/null +++ b/drivers/riverqueue-sequel/spec/spec_helper.rb @@ -0,0 +1,19 @@ +require "sequel" + +DB = Sequel.connect(ENV["TEST_DATABASE_URL"] || "postgres://localhost/riverqueue_ruby_test") + +def test_transaction + DB.transaction do + yield + raise Sequel::Rollback + end +end + +require "simplecov" +SimpleCov.start do + enable_coverage :branch + minimum_coverage line: 100, branch: 100 +end + +require "riverqueue" +require "riverqueue-sequel" diff --git a/lib/client.rb b/lib/client.rb new file mode 100644 index 0000000..58d5bbc --- /dev/null +++ b/lib/client.rb @@ -0,0 +1,110 @@ +module River + MAX_ATTEMPTS_DEFAULT = 25 + PRIORITY_DEFAULT = 1 + QUEUE_DEFAULT = "default" + + # Provides a client for River that inserts jobs. Unlike the Go version of the + # River client, this one can insert jobs only. Jobs can only be worked from Go + # code, so job arg kinds and JSON encoding details must be shared between Ruby + # and Go code. + # + # Used in conjunction with a River driver like: + # + # DB = Sequel.connect(...) + # client = River::Client.new(River::Driver::Sequel.new(DB)) + # + # River drivers are found in separate gems like `riverqueue-sequel` to help + # minimize transient dependencies. + class Client + def initialize(driver) + @driver = driver + end + + # Inserts a new job for work given a job args implementation and insertion + # options (which may be omitted). + # + # Job arg implementations are expected to respond to: + # + # * `#kind`: A string that uniquely identifies the job in the database. + # * `#to_json`: Encodes the args to JSON for persistence in the database. + # Must match encoding an args struct on the Go side to be workable. + # + # They may also respond to `#insert_opts` which is expected to return an + # `InsertOpts` that contains options that will apply to all jobs of this + # kind. Insertion options provided as an argument to `#insert` override + # those returned by job args. + # + # Returns an instance of InsertResult. + def insert(args, insert_opts: InsertOpts.new) + raise "args should respond to `#kind`" if !args.respond_to?(:kind) + + # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. + args_json = args.to_json + raise "args should return non-nil from `#to_json`" if !args_json + + args_insert_opts = if args.respond_to?(:insert_opts) + args_with_insert_opts = args #: _JobArgsWithInsertOpts # rubocop:disable Layout/LeadingCommentSpace + args_with_insert_opts.insert_opts || InsertOpts.new + else + InsertOpts.new + end + + scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at + + job = @driver.insert(Driver::JobInsertParams.new( + encoded_args: args_json, + kind: args.kind, + max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, + priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, + queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, + scheduled_at: scheduled_at&.utc, # database defaults to now + state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, + tags: insert_opts.tags || args_insert_opts.tags + )) + + InsertResult.new(job) + end + + # Inserts many new jobs as part of a single batch operation for improved + # efficiency. + # + # Takes an array of job args or InsertManyParams which encapsulate job args + # and a paired InsertOpts. + # + # Job arg implementations are expected to respond to: + # + # * `#kind`: A string that uniquely identifies the job in the database. + # * `#to_json`: Encodes the args to JSON for persistence in the database. + # Must match encoding an args struct on the Go side to be workable. + # + # Returns the number of jobs inserted. + def insert_many(args) + raise "sorry, not implemented yet" + end + end + + # A single job to insert that's part of an #insert_many batch insert. Unlike + # sending raw job args, supports an InsertOpts to pair with the job. + class InsertManyParams + # Job args to insert. + attr_reader :args + + # Insertion options to use with the insert. + attr_reader :insert_opts + + def initialize(args, insert_opts: nil) + @args = args + @insert_opts = insert_opts + end + end + + # Result of a single insertion. + class InsertResult + # Inserted job row. + attr_reader :job + + def initialize(job) + @job = job + end + end +end diff --git a/lib/driver.rb b/lib/driver.rb new file mode 100644 index 0000000..97045fd --- /dev/null +++ b/lib/driver.rb @@ -0,0 +1,47 @@ +module River + # Contains an interface used by the top-level River module to interface with + # its driver implementations. All types and methods in this module should be + # considered to be for internal use only and subject to change. API stability + # is not guaranteed. + module Driver + # Insert parameters for a job. This is sent to underlying drivers and is meant + # for internal use only. Its interface is subject to change. + class JobInsertParams + attr_accessor :encoded_args + attr_accessor :kind + attr_accessor :max_attempts + attr_accessor :priority + attr_accessor :queue + attr_accessor :scheduled_at + attr_accessor :state + attr_accessor :tags + + # TODO(brandur): Get these supported. + # attr_accessor :unique + # attr_accessor :unique_by_args + # attr_accessor :unique_by_period + # attr_accessor :unique_by_queue + # attr_accessor :unique_by_state + + def initialize( + encoded_args:, + kind:, + max_attempts:, + priority:, + queue:, + scheduled_at:, + state:, + tags: + ) + self.encoded_args = encoded_args + self.kind = kind + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.state = state + self.tags = tags + end + end + end +end diff --git a/lib/insert_opts.rb b/lib/insert_opts.rb new file mode 100644 index 0000000..98035c0 --- /dev/null +++ b/lib/insert_opts.rb @@ -0,0 +1,65 @@ +module River + class InsertOpts + # MaxAttempts is the maximum number of total attempts (including both the + # original run and all retries) before a job is abandoned and set as + # discarded. + attr_accessor :max_attempts + + # Priority is the priority of the job, with 1 being the highest priority and + # 4 being the lowest. When fetching available jobs to work, the highest + # priority jobs will always be fetched before any lower priority jobs are + # fetched. Note that if your workers are swamped with more high-priority jobs + # then they can handle, lower priority jobs may not be fetched. + # + # Defaults to PRIORITY_DEFAULT. + attr_accessor :priority + + # Queue is the name of the job queue in which to insert the job. + # + # Defaults to QUEUE_DEFAULT. + attr_accessor :queue + + # ScheduledAt is a time in future at which to schedule the job (i.e. in + # cases where it shouldn't be run immediately). The job is guaranteed not + # to run before this time, but may run slightly after depending on the + # number of other scheduled jobs and how busy the queue is. + # + # Use of this option generally only makes sense when passing options into + # Insert rather than when a job args is returning `#insert_opts`, however, + # it will work in both cases. + attr_accessor :scheduled_at + + # Tags are an arbitrary list of keywords to add to the job. They have no + # functional behavior and are meant entirely as a user-specified construct + # to help group and categorize jobs. + # + # If tags are specified from both a job args override and from options on + # Insert, the latter takes precedence. Tags are not merged. + attr_accessor :tags + + # UniqueOpts returns options relating to job uniqueness. An empty struct + # avoids setting any worker-level unique options. + # + # TODO: Implement. + attr_accessor :unique_opts + + def initialize( + max_attempts: nil, + priority: nil, + queue: nil, + scheduled_at: nil, + tags: nil, + unique_opts: nil + ) + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.tags = tags + self.unique_opts = unique_opts + end + end + + class UniqueOpts + end +end diff --git a/lib/job.rb b/lib/job.rb new file mode 100644 index 0000000..58e32c9 --- /dev/null +++ b/lib/job.rb @@ -0,0 +1,169 @@ +module River + JOB_STATE_AVAILABLE = "available" + JOB_STATE_CANCELLED = "cancelled" + JOB_STATE_COMPLETED = "completed" + JOB_STATE_DISCARDED = "discarded" + JOB_STATE_RETRYABLE = "retryable" + JOB_STATE_RUNNING = "running" + JOB_STATE_SCHEDULED = "scheduled" + + # Provides a way of creating a job args from a simple Ruby hash for a quick + # way to insert a job without having to define a class. The first argument is + # a "kind" string for identifying the job in the database and the second is a + # hash that will be encoded to JSON. + class JobArgsHash + def initialize(kind, hash) + raise "kind should be non-nil" if !kind + raise "hash should be non-nil" if !hash + + @kind = kind + @hash = hash + end + + attr_reader :kind + + def to_json + JSON.dump(@hash) + end + end + + # JobRow contains the properties of a job that are persisted to the database. + class JobRow + # ID of the job. Generated as part of a Postgres sequence and generally + # ascending in nature, but there may be gaps in it as transactions roll + # back. + attr_accessor :id + + # The job's args as a hash decoded from JSON. + attr_accessor :args + + # The attempt number of the job. Jobs are inserted at 0, the number is + # incremented to 1 the first time work its worked, and may increment further + # if it's either snoozed or errors. + attr_accessor :attempt + + # The time that the job was last worked. Starts out as `nil` on a new + # insert. + attr_accessor :attempted_at + + # The set of worker IDs that have worked this job. A worker ID differs + # between different programs, but is shared by all executors within any + # given one. (i.e. Different Go processes have different IDs, but IDs are + # shared within any given process.) A process generates a new ULID (an + # ordered UUID) worker ID when it starts up. + attr_accessor :attempted_by + + # When the job record was created. + attr_accessor :created_at + + # A set of errors that occurred when the job was worked, one for each + # attempt. Ordered from earliest error to the latest error. + attr_accessor :errors + + # The time at which the job was "finalized", meaning it was either completed + # successfully or errored for the last time such that it'll no longer be + # retried. + attr_accessor :finalized_at + + # Kind uniquely identifies the type of job and instructs which worker + # should work it. It is set at insertion time via `#kind` on job args. + attr_accessor :kind + + # The maximum number of attempts that the job will be tried before it errors + # for the last time and will no longer be worked. + attr_accessor :max_attempts + + # The priority of the job, with 1 being the highest priority and 4 being the + # lowest. When fetching available jobs to work, the highest priority jobs + # will always be fetched before any lower priority jobs are fetched. Note + # that if your workers are swamped with more high-priority jobs then they + # can handle, lower priority jobs may not be fetched. + attr_accessor :priority + + # The name of the queue where the job will be worked. Queues can be + # configured independently and be used to isolate jobs. + attr_accessor :queue + + # When the job is scheduled to become available to be worked. Jobs default + # to running immediately, but may be scheduled for the future when they're + # inserted. They may also be scheduled for later because they were snoozed + # or because they errored and have additional retry attempts remaining. + attr_accessor :scheduled_at + + # The state of job like `available` or `completed`. Jobs are `available` + # when they're first inserted. + attr_accessor :state + + # Tags are an arbitrary list of keywords to add to the job. They have no + # functional behavior and are meant entirely as a user-specified construct + # to help group and categorize jobs. + attr_accessor :tags + + def initialize( + id:, + args:, + attempt:, + created_at:, + kind:, + max_attempts:, + priority:, + queue:, + scheduled_at:, + state:, + + # nullable/optional + attempted_at: nil, + attempted_by: nil, + errors: nil, + finalized_at: nil, + tags: nil + ) + self.id = id + self.args = args + self.attempt = attempt + self.attempted_at = attempted_at + self.attempted_by = attempted_by + self.created_at = created_at + self.errors = errors + self.finalized_at = finalized_at + self.kind = kind + self.max_attempts = max_attempts + self.priority = priority + self.queue = queue + self.scheduled_at = scheduled_at + self.state = state + self.tags = tags + end + end + + # A failed job work attempt containing information about the error or panic + # that occurred. + class AttemptError + # The time at which the error occurred. + attr_accessor :at + + # The attempt number on which the error occurred (maps to #attempt on a job + # row). + attr_accessor :attempt + + # Contains the stringified error of an error returned from a job or a panic + # value in case of a panic. + attr_accessor :error + + # Contains a stack trace from a job that panicked. The trace is produced by + # invoking `debug.Trace()`. + attr_accessor :trace + + def initialize( + at:, + attempt:, + error:, + trace: + ) + self.at = at + self.attempt = attempt + self.error = error + self.trace = trace + end + end +end diff --git a/lib/riverqueue.rb b/lib/riverqueue.rb index e69de29..e77b938 100644 --- a/lib/riverqueue.rb +++ b/lib/riverqueue.rb @@ -0,0 +1,9 @@ +require "json" + +require_relative "client" +require_relative "driver" +require_relative "insert_opts" +require_relative "job" + +module River +end diff --git a/riverqueue.gemspec b/riverqueue.gemspec index 1da7766..a56d897 100644 --- a/riverqueue.gemspec +++ b/riverqueue.gemspec @@ -1,11 +1,11 @@ Gem::Specification.new do |s| - s.name = "riverqueue" - s.version = "0.0.1" - s.summary = "A fast job queue for Go" - s.description = "A fast job queue for Go." - s.authors = ["Blake Gentry", "Brandur Leach"] - s.email = "brandur@brandur.org" - s.files = ["lib/riverqueue.rb"] - s.homepage = "https://riverqueue.com" - s.license = "LGPL-3.0-or-later" + s.name = "riverqueue" + s.version = "0.0.1" + s.summary = "River is a fast job queue for Go." + s.description = "River is a fast job queue for Go. Use this gem in conjunction with gems riverqueue-activerecord or riverqueue-sequel to insert jobs in Ruby which will be worked from Go." + s.authors = ["Blake Gentry", "Brandur Leach"] + s.email = "brandur@brandur.org" + s.files = ["lib/riverqueue.rb"] + s.homepage = "https://riverqueue.com" + s.license = "LGPL-3.0-or-later" end diff --git a/sig/client.rbs b/sig/client.rbs new file mode 100644 index 0000000..b45a73c --- /dev/null +++ b/sig/client.rbs @@ -0,0 +1,31 @@ +module River + MAX_ATTEMPTS_DEFAULT: Integer + PRIORITY_DEFAULT: Integer + QUEUE_DEFAULT: String + + class Client + @driver: _Driver + + def initialize: (_Driver driver) -> void + def insert: (jobArgs, ?insert_opts: InsertOpts) -> InsertResult + def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer + end + + class InsertManyParams + @args: jobArgs + @insert_opts: InsertOpts? + + attr_reader args: jobArgs + attr_reader insert_opts: InsertOpts? + + def initialize: (jobArgs job, ?insert_opts: InsertOpts?) -> void + end + + class InsertResult + @job: JobRow + + attr_reader job: JobRow + + def initialize: (JobRow job) -> void + end +end diff --git a/sig/driver.rbs b/sig/driver.rbs new file mode 100644 index 0000000..2d4c9b8 --- /dev/null +++ b/sig/driver.rbs @@ -0,0 +1,27 @@ +module River + interface _Driver + def insert: (Driver::JobInsertParams) -> JobRow + end + + module Driver + class JobInsertParams + attr_accessor encoded_args: String + attr_accessor kind: String + attr_accessor max_attempts: Integer + attr_accessor priority: Integer + attr_accessor queue: String + attr_accessor scheduled_at: Time? + attr_accessor state: jobStateAll + attr_accessor tags: Array[String]? + + # TODO(brandur): Get these supported. + # attr_accessor :unique + # attr_accessor :unique_by_args + # attr_accessor :unique_by_period + # attr_accessor :unique_by_queue + # attr_accessor :unique_by_state + + def initialize: (encoded_args: String, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time?, state: jobStateAll, tags: Array[String]?) -> void + end + end +end diff --git a/sig/insert_opts.rbs b/sig/insert_opts.rbs new file mode 100644 index 0000000..387905a --- /dev/null +++ b/sig/insert_opts.rbs @@ -0,0 +1,21 @@ +module River + class InsertOpts + attr_accessor max_attempts: Integer? + + attr_accessor priority: Integer? + + attr_accessor queue: String? + + attr_accessor scheduled_at: Time? + + attr_accessor tags: Array[String]? + + attr_accessor unique_opts: UniqueOpts? + + def initialize: (?max_attempts: Integer?, ?priority: Integer?, ?queue: String?, ?scheduled_at: Time?, ?tags: Array[String]?, ?unique_opts: UniqueOpts?) -> void + end + + class UniqueOpts + # TODO + end +end diff --git a/sig/job.rbs b/sig/job.rbs new file mode 100644 index 0000000..78bda83 --- /dev/null +++ b/sig/job.rbs @@ -0,0 +1,64 @@ +module River + JOB_STATE_AVAILABLE: "available" + JOB_STATE_CANCELLED: "cancelled" + JOB_STATE_COMPLETED: "completed" + JOB_STATE_DISCARDED: "discarded" + JOB_STATE_RETRYABLE: "retryable" + JOB_STATE_RUNNING: "running" + JOB_STATE_SCHEDULED: "scheduled" + + type jobStateAll = "available" | "cancelled" | "completed" | "discarded" | "retryable" | "running" | "scheduled" + + interface _JobArgs + def kind: () -> String + def respond_to?: (Symbol) -> bool + def to_json: () -> String + end + + interface _JobArgsWithInsertOpts + include _JobArgs + + def insert_opts: () -> InsertOpts? + end + + type jobArgs = _JobArgs | _JobArgsWithInsertOpts + + class JobArgsHash + @kind: String + @hash: Hash[String | Symbol, untyped] + + attr_reader kind: String + + def initialize: (String kind, Hash[String | Symbol, untyped] hash) -> void + def to_json: () -> String + end + + class JobRow + attr_accessor id: Integer + attr_accessor args: Hash[String, untyped] + attr_accessor attempt: Integer + attr_accessor attempted_at: Time? + attr_accessor attempted_by: String? + attr_accessor created_at: Time + attr_accessor errors: Array[AttemptError]? + attr_accessor finalized_at: Time? + attr_accessor kind: String + attr_accessor max_attempts: Integer + attr_accessor priority: Integer + attr_accessor queue: String + attr_accessor scheduled_at: Time + attr_accessor state: jobStateAll + attr_accessor tags: Array[String]? + + def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void + end + + class AttemptError + attr_accessor at: Time + attr_accessor attempt: Integer + attr_accessor error: String + attr_accessor trace: String + + def initialize: (at: Time, attempt: Integer, error: String, trace: String) -> void + end +end diff --git a/sig/riverqueue.rbs b/sig/riverqueue.rbs new file mode 100644 index 0000000..6ad50d4 --- /dev/null +++ b/sig/riverqueue.rbs @@ -0,0 +1,2 @@ +module River +end diff --git a/spec/client_spec.rb b/spec/client_spec.rb new file mode 100644 index 0000000..9def9cf --- /dev/null +++ b/spec/client_spec.rb @@ -0,0 +1,184 @@ +require "spec_helper" + +# We use a mock here, but each driver has a more comprehensive test suite that +# performs full integration level tests. +class MockDriver + def initialize + @insert_params = [] + @next_id = 0 + end + + def insert(insert_params) + @insert_params << insert_params + + River::JobRow.new( + id: (@next_id += 1), + args: JSON.parse(insert_params.encoded_args), + attempt: 0, + attempted_by: nil, + created_at: Time.now, + errors: nil, + finalized_at: nil, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB + state: insert_params.state, + tags: insert_params.tags + ) + end +end + +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +RSpec.describe River::Client do + let(:client) { River::Client.new(mock_driver) } + let(:mock_driver) { MockDriver.new } + + describe "#insert" do + it "inserts a job with defaults" do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + expect(insert_res.job).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 0, + created_at: be_within(2).of(Time.now), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now), + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + end + + it "schedules a job" do + target_time = Time.now + 1 * 3600 + + insert_res = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(insert_res.job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + + # Expect all inserted timestamps to go to UTC. + expect(insert_res.job.scheduled_at.utc?).to be true + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args) + expect(insert_res.job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(insert_res.job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + kind: "hash_kind" + ) + end + + it "errors if args don't respond to #kind" do + args_klass = Class.new do + def to_json = {} + end + + expect do + client.insert(args_klass.new) + end.to raise_error(RuntimeError, "args should respond to `#kind`") + end + + it "errors if args return nil from #to_json" do + args_klass = Class.new do + def kind = "args_kind" + + def to_json = nil + end + + expect do + client.insert(args_klass.new) + end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`") + end + end + + describe "#insert_many" do + it "inserts many jobs" do + expect do + client.insert_many([]) + end.to raise_error(RuntimeError, "sorry, not implemented yet") + end + end +end + +RSpec.describe River::InsertManyParams do + it "initializes" do + args = SimpleArgs.new(job_num: 1) + insert_opts = River::InsertOpts.new(queue: "other") + + params = River::InsertManyParams.new(args, insert_opts: insert_opts) + expect(params.args).to eq(args) + expect(params.insert_opts).to eq(insert_opts) + end +end diff --git a/spec/job_spec.rb b/spec/job_spec.rb new file mode 100644 index 0000000..d34b40a --- /dev/null +++ b/spec/job_spec.rb @@ -0,0 +1,40 @@ +require "spec_helper" + +describe River::JobArgsHash do + it "generates a job args based on a hash" do + args = River::JobArgsHash.new("my_hash_kind", {job_num: 123}) + expect(args.kind).to eq("my_hash_kind") + expect(args.to_json).to eq(JSON.dump({job_num: 123})) + end + + it "errors on a nil kind" do + expect do + River::JobArgsHash.new(nil, {job_num: 123}) + end.to raise_error(RuntimeError, "kind should be non-nil") + end + + it "errors on a nil hash" do + expect do + River::JobArgsHash.new("my_hash_kind", nil) + end.to raise_error(RuntimeError, "hash should be non-nil") + end +end + +describe River::AttemptError do + it "initializes with parameters" do + now = Time.now + + attempt_error = River::AttemptError.new( + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + ) + expect(attempt_error).to have_attributes( + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + ) + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..a906f38 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,12 @@ +require "debug" + +# Only show coverage information if running the entire suite. +if RSpec.configuration.files_to_run.length > 1 + require "simplecov" + SimpleCov.start do + enable_coverage :branch + minimum_coverage line: 100, branch: 100 + end +end + +require "riverqueue"