From 32af5cf7af861bc1bbf7da2d9732e38dabba14bd Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 27 Apr 2024 20:28:25 -0700 Subject: [PATCH] Implement unique job insertions Implement unique job insertion in a way that's compatible with the main Go library. Unique opts live on insert opts just like in Go, and we copy as much of its API as possible including return result. insert_res = client.insert(args, insert_opts: River::InsertOpts.new( unique_opts: River::UniqueOpts.new( by_args: true, by_period: 15 * 60, by_queue: true, by_state: [River::JOB_STATE_AVAILABLE] ) ) # contains either a newly inserted job, or an existing one if insertion was skipped insert_res.job # true if insertion was skipped insert_res.unique_skipped_as_duplicated --- CHANGELOG.md | 4 + Gemfile.lock | 2 + Steepfile | 2 + docs/README.md | 25 ++- drivers/riverqueue-activerecord/Gemfile.lock | 2 + drivers/riverqueue-activerecord/lib/driver.rb | 21 ++- .../spec/driver_spec.rb | 171 +++++++++++++++++- drivers/riverqueue-sequel/Gemfile.lock | 2 + drivers/riverqueue-sequel/lib/driver.rb | 21 ++- drivers/riverqueue-sequel/spec/driver_spec.rb | 171 +++++++++++++++++- lib/client.rb | 151 ++++++++++++++-- lib/driver.rb | 30 ++- lib/insert_opts.rb | 103 +++++++++-- riverqueue.gemspec | 2 + sig/client.rbs | 14 +- sig/driver.rbs | 24 ++- sig/insert_opts.rbs | 7 +- sig_gem/fnv-hash/fnv.rbs | 21 +++ spec/client_spec.rb | 170 ++++++++++++++++- 19 files changed, 874 insertions(+), 69 deletions(-) create mode 100644 sig_gem/fnv-hash/fnv.rbs diff --git a/CHANGELOG.md b/CHANGELOG.md index e7739db..f95d759 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Implement unique job insertion. [PR #10](https://github.com/riverqueue/riverqueue-ruby/pull/10). + ## [0.2.0] - 2024-04-27 ### Added diff --git a/Gemfile.lock b/Gemfile.lock index c76f9a9..4e71c8e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: . specs: riverqueue (0.2.0) + fnv-hash GEM remote: https://rubygems.org/ @@ -31,6 +32,7 @@ GEM drb (2.2.1) ffi (1.16.3) fileutils (1.7.2) + fnv-hash (0.2.0) i18n (1.14.4) concurrent-ruby (~> 1.0) io-console (0.7.2) diff --git a/Steepfile b/Steepfile index 620dab0..65146c1 100644 --- a/Steepfile +++ b/Steepfile @@ -4,8 +4,10 @@ target :lib do check "lib" library "json" + library "time" signature "sig" + signature "sig_gem" configure_code_diagnostics(D::Ruby.strict) end diff --git a/docs/README.md b/docs/README.md index 328b5e2..33cd7a9 100644 --- a/docs/README.md +++ b/docs/README.md @@ -60,6 +60,27 @@ insert_res = client.insert( ) ``` +## Inserting unique jobs + +[Unique jobs](https://riverqueue.com/docs/unique-jobs) are supported through `InsertOpts#unique_opts`, and can be made unique by args, period, queue, and state. If a job matching unique properties is found on insert, the insert is skipped and the existing job returned. + +```ruby +insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_args: true, + by_period: 15 * 60, + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE] + ) +) + +# contains either a newly inserted job, or an existing one if insertion was skipped +insert_res.job + +# true if insertion was skipped +insert_res.unique_skipped_as_duplicated +``` + ## Inserting jobs in bulk Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency: @@ -75,8 +96,8 @@ Or with `InsertManyParams`, which may include insertion options: ```ruby num_inserted = client.insert_many([ - River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(max_attempts: 5)), - River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: InsertOpts.new(queue: "high_priority")) + River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(max_attempts: 5)), + River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: River::InsertOpts.new(queue: "high_priority")) ]) ``` diff --git a/drivers/riverqueue-activerecord/Gemfile.lock b/drivers/riverqueue-activerecord/Gemfile.lock index a4f51e4..20d4642 100644 --- a/drivers/riverqueue-activerecord/Gemfile.lock +++ b/drivers/riverqueue-activerecord/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: ../.. specs: riverqueue (0.2.0) + fnv-hash PATH remote: . @@ -41,6 +42,7 @@ GEM diff-lcs (1.5.1) docile (1.4.0) drb (2.2.1) + fnv-hash (0.2.0) i18n (1.14.4) concurrent-ruby (~> 1.0) io-console (0.7.2) diff --git a/drivers/riverqueue-activerecord/lib/driver.rb b/drivers/riverqueue-activerecord/lib/driver.rb index 6eacf92..8c329f5 100644 --- a/drivers/riverqueue-activerecord/lib/driver.rb +++ b/drivers/riverqueue-activerecord/lib/driver.rb @@ -31,15 +31,32 @@ def errors = {} end end - def insert(insert_params) + def advisory_lock(key) + ::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})") + end + + def job_get_by_kind_and_unique_properties(get_params) + data_set = RiverJob.where(kind: get_params.kind) + data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at + data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args + data_set = data_set.where(queue: get_params.queue) if get_params.queue + data_set = data_set.where(state: get_params.state) if get_params.state + data_set.take + end + + def job_insert(insert_params) to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) end - def insert_many(insert_params_many) + def job_insert_many(insert_params_many) RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) }) insert_params_many.count end + def transaction(&) + ::ActiveRecord::Base.transaction(requires_new: true, &) + end + private def insert_params_to_hash(insert_params) # the call to `#compact` is important so that we remove nils and table # default values get picked up instead diff --git a/drivers/riverqueue-activerecord/spec/driver_spec.rb b/drivers/riverqueue-activerecord/spec/driver_spec.rb index a501788..95fc03e 100644 --- a/drivers/riverqueue-activerecord/spec/driver_spec.rb +++ b/drivers/riverqueue-activerecord/spec/driver_spec.rb @@ -25,7 +25,155 @@ class SimpleArgsWithInsertOpts < SimpleArgs let!(:driver) { River::Driver::ActiveRecord.new } let(:client) { River::Client.new(driver) } - describe "#insert" do + describe "unique insertion" do + it "inserts a unique job once" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a unique job with an advisory lock prefix" do + client = River::Client.new(driver, advisory_lock_prefix: 123456) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + end + + describe "#advisory_lock" do + it "takes an advisory lock" do + driver.advisory_lock(123) + end + end + + describe "#job_get_by_kind_and_unique_properties" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by kind" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: "does_not_exist" + )) + ).to be_nil + end + + it "gets a job by created at period" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] + )) + ).to be_nil + end + + it "gets a job by encoded args" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump(insert_res.job.args) + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump({"job_num" => 2}) + )) + ).to be_nil + end + + it "gets a job by queue" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: insert_res.job.queue + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: "other_queue" + )) + ).to be_nil + end + + it "gets a job by state" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] + )) + ).to be_nil + end + end + + describe "#job_insert" do it "inserts a job" do insert_res = client.insert(SimpleArgs.new(job_num: 1)) expect(insert_res.job).to have_attributes( @@ -133,7 +281,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#insert_many" do + describe "#job_insert_many" do it "inserts multiple jobs" do num_inserted = client.insert_many([ SimpleArgs.new(job_num: 1), @@ -197,6 +345,25 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end + describe "#transaction" do + it "runs block in a transaction" do + insert_res = nil + + driver.transaction do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to_not be_nil + + raise ActiveRecord::Rollback + end + + # Not present because the job was rolled back. + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to be_nil + end + end + describe "#to_job_row" do it "converts a database record to `River::JobRow`" do now = Time.now.utc diff --git a/drivers/riverqueue-sequel/Gemfile.lock b/drivers/riverqueue-sequel/Gemfile.lock index d4aa8f5..fbb7cf5 100644 --- a/drivers/riverqueue-sequel/Gemfile.lock +++ b/drivers/riverqueue-sequel/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: ../.. specs: riverqueue (0.2.0) + fnv-hash PATH remote: . @@ -17,6 +18,7 @@ GEM bigdecimal (3.1.7) diff-lcs (1.5.1) docile (1.4.0) + fnv-hash (0.2.0) json (2.7.2) language_server-protocol (3.17.0.3) lint_roller (1.1.0) diff --git a/drivers/riverqueue-sequel/lib/driver.rb b/drivers/riverqueue-sequel/lib/driver.rb index 6973f94..5f615d1 100644 --- a/drivers/riverqueue-sequel/lib/driver.rb +++ b/drivers/riverqueue-sequel/lib/driver.rb @@ -21,15 +21,32 @@ def initialize(db) end end - def insert(insert_params) + def advisory_lock(key) + @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first + end + + def job_get_by_kind_and_unique_properties(get_params) + data_set = RiverJob.where(kind: get_params.kind) + data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at + data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args + data_set = data_set.where(queue: get_params.queue) if get_params.queue + data_set = data_set.where(state: get_params.state) if get_params.state + data_set.first + end + + def job_insert(insert_params) to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) end - def insert_many(insert_params_many) + def job_insert_many(insert_params_many) RiverJob.multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) insert_params_many.count end + def transaction(&) + @db.transaction(savepoint: true, &) + end + private def insert_params_to_hash(insert_params) # the call to `#compact` is important so that we remove nils and table # default values get picked up instead diff --git a/drivers/riverqueue-sequel/spec/driver_spec.rb b/drivers/riverqueue-sequel/spec/driver_spec.rb index 44b3b96..ef2abc1 100644 --- a/drivers/riverqueue-sequel/spec/driver_spec.rb +++ b/drivers/riverqueue-sequel/spec/driver_spec.rb @@ -25,7 +25,155 @@ class SimpleArgsWithInsertOpts < SimpleArgs let!(:driver) { River::Driver::Sequel.new(DB) } let(:client) { River::Client.new(driver) } - describe "#insert" do + describe "unique insertion" do + it "inserts a unique job once" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a unique job with an advisory lock prefix" do + client = River::Client.new(driver, advisory_lock_prefix: 123456) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + end + + describe "#advisory_lock" do + it "takes an advisory lock" do + driver.advisory_lock(123) + end + end + + describe "#job_get_by_kind_and_unique_properties" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by kind" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: "does_not_exist" + )) + ).to be_nil + end + + it "gets a job by created at period" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] + )) + ).to be_nil + end + + it "gets a job by encoded args" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump(insert_res.job.args) + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump({"job_num" => 2}) + )) + ).to be_nil + end + + it "gets a job by queue" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: insert_res.job.queue + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: "other_queue" + )) + ).to be_nil + end + + it "gets a job by state" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] + )) + ).to be_nil + end + end + + describe "#job_insert" do it "inserts a job" do insert_res = client.insert(SimpleArgs.new(job_num: 1)) expect(insert_res.job).to have_attributes( @@ -133,7 +281,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#insert_many" do + describe "#job_insert_many" do it "inserts multiple jobs" do num_inserted = client.insert_many([ SimpleArgs.new(job_num: 1), @@ -195,6 +343,25 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end + describe "#transaction" do + it "runs block in a transaction" do + insert_res = nil + + driver.transaction do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to_not be_nil + + raise Sequel::Rollback + end + + # Not present because the job was rolled back. + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to be_nil + end + end + describe "#to_job_row" do it "converts a database record to `River::JobRow`" do now = Time.now.utc diff --git a/lib/client.rb b/lib/client.rb index c4192a3..b6b8218 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -1,3 +1,6 @@ +require "fnv" +require "time" + module River MAX_ATTEMPTS_DEFAULT = 25 PRIORITY_DEFAULT = 1 @@ -16,10 +19,21 @@ module River # River drivers are found in separate gems like `riverqueue-sequel` to help # minimize transient dependencies. class Client - def initialize(driver) + def initialize(driver, advisory_lock_prefix: nil) @driver = driver + @advisory_lock_prefix = advisory_lock_prefix + @time_now_utc = -> { Time.now.utc } # for test time stubbing end + DEFAULT_UNIQUE_STATES = [ + JOB_STATE_AVAILABLE, + JOB_STATE_COMPLETED, + JOB_STATE_RUNNING, + JOB_STATE_RETRYABLE, + JOB_STATE_SCHEDULED + ].freeze + private_constant :DEFAULT_UNIQUE_STATES + EMPTY_INSERT_OPTS = InsertOpts.new.freeze private_constant :EMPTY_INSERT_OPTS @@ -65,8 +79,11 @@ def initialize(driver) # # Returns an instance of InsertResult. def insert(args, insert_opts: EMPTY_INSERT_OPTS) - job = @driver.insert(make_insert_params(args, insert_opts)) - InsertResult.new(job) + insert_params, unique_opts = make_insert_params(args, insert_opts) + check_unique_job(insert_params, unique_opts) do + job = @driver.job_insert(insert_params) + InsertResult.new(job) + end end # Inserts many new jobs as part of a single batch operation for improved @@ -111,20 +128,90 @@ def insert(args, insert_opts: EMPTY_INSERT_OPTS) # # See also JobArgsHash for an easy way to insert a job from a hash. # + # Unique job insertion isn't supported with bulk insertion because it'd run + # the risk of major lock contention. + # # Returns the number of jobs inserted. def insert_many(args) all_params = args.map do |arg| if arg.is_a?(InsertManyParams) - make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS) + make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS, is_insert_many: true).first # unique opts ignored on batch insert else # jobArgs - make_insert_params(arg, EMPTY_INSERT_OPTS) + make_insert_params(arg, EMPTY_INSERT_OPTS, is_insert_many: true).first # unique opts ignored on batch insert end end - @driver.insert_many(all_params) + @driver.job_insert_many(all_params) end - private def make_insert_params(args, insert_opts) + private def check_unique_job(insert_params, unique_opts, &block) + return block.call if unique_opts.nil? + + any_changes = false + get_params = Driver::JobGetByKindAndUniquePropertiesParam.new(kind: insert_params.kind) + + # It's extremely important here that this lock string format and algorithm + # match the one in the main River library _exactly_. Don't change them + # unless they're updated everywhere. + lock_str = "unique_key" + lock_str += "kind=#{insert_params.kind}" + + if unique_opts.by_args + any_changes = true + get_params.encoded_args = insert_params.encoded_args + lock_str += "&args=#{insert_params.encoded_args}" + end + + if unique_opts.by_period + lower_period_bound = truncate_time(@time_now_utc.call, unique_opts.by_period).utc + + any_changes = true + get_params.created_at = [lower_period_bound, lower_period_bound + unique_opts.by_period] + lock_str += "&period=#{lower_period_bound.strftime("%FT%TZ")}" + end + + if unique_opts.by_queue + any_changes = true + get_params.queue = insert_params.queue + lock_str += "&queue=#{insert_params.queue}" + end + + if unique_opts.by_state + any_changes = true + get_params.state = unique_opts.by_state + lock_str += "&state=#{unique_opts.by_state.join(",")}" + else + get_params.state = DEFAULT_UNIQUE_STATES + lock_str += "&state=#{DEFAULT_UNIQUE_STATES.join(",")}" + end + + return block.call unless any_changes + + @driver.transaction do + lock_key = if @advisory_lock_prefix.nil? + Fnv::Hash.fnv_1(lock_str, size: 64) + else + # Steep should be able to tell that this is not nil, but it can't. + prefix = @advisory_lock_prefix #: Integer # rubocop:disable Layout/LeadingCommentSpace + prefix << 32 | Fnv::Hash.fnv_1(lock_str, size: 32) + end + + # Packs a uint64 then unpacks to int64, which we need to do to keep the + # value within the bounds of Postgres' bigint. Overflow is okay because + # we can use the full bigint space (including negative numbers) for the + # advisory lock. + lock_key = uint64_to_int64(lock_key) + + @driver.advisory_lock(lock_key) + + existing_job = @driver.job_get_by_kind_and_unique_properties(get_params) + return InsertResult.new(existing_job, unique_skipped_as_duplicated: true) if existing_job + + block.call + end + end + + private def make_insert_params(args, insert_opts, is_insert_many: false) raise "args should respond to `#kind`" if !args.respond_to?(:kind) # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. @@ -139,17 +226,37 @@ def insert_many(args) end scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at + unique_opts = insert_opts.unique_opts || args_insert_opts.unique_opts + + raise ArgumentError, "unique opts can't be used with `#insert_many`" if is_insert_many && unique_opts + + [ + Driver::JobInsertParams.new( + encoded_args: args_json, + kind: args.kind, + max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, + priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, + queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, + scheduled_at: scheduled_at&.utc, # database defaults to now + state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, + tags: insert_opts.tags || args_insert_opts.tags + ), + unique_opts + ] + end - Driver::JobInsertParams.new( - encoded_args: args_json, - kind: args.kind, - max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, - priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, - queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, - scheduled_at: scheduled_at&.utc, # database defaults to now - state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, - tags: insert_opts.tags || args_insert_opts.tags - ) + # Truncates the given time down to the interval. For example: + # + # Thu Jan 15 21:26:36 UTC 2024 @ 15 minutes -> + # Thu Jan 15 21:15:00 UTC 2024 + private def truncate_time(time, interval_seconds) + Time.at((time.to_f / interval_seconds).floor * interval_seconds) + end + + # Moves an integer that may occupy the entire uint64 space to one that's + # bounded within int64. Allows overflow. + private def uint64_to_int64(int) + [int].pack("Q").unpack1("q") #: Integer # rubocop:disable Layout/LeadingCommentSpace end end @@ -170,11 +277,17 @@ def initialize(args, insert_opts: nil) # Result of a single insertion. class InsertResult - # Inserted job row. + # Inserted job row, or an existing job row if insert was skipped due to a + # previously existing unique job. attr_reader :job - def initialize(job) + # True if for a unique job, the insertion was skipped due to an equivalent + # job matching unique property already being present. + attr_reader :unique_skipped_as_duplicated + + def initialize(job, unique_skipped_as_duplicated: false) @job = job + @unique_skipped_as_duplicated = unique_skipped_as_duplicated end end end diff --git a/lib/driver.rb b/lib/driver.rb index 97045fd..a3c11c6 100644 --- a/lib/driver.rb +++ b/lib/driver.rb @@ -4,6 +4,29 @@ module River # considered to be for internal use only and subject to change. API stability # is not guaranteed. module Driver + # Parameters for looking up a job by kind and unique properties. + class JobGetByKindAndUniquePropertiesParam + attr_accessor :created_at + attr_accessor :encoded_args + attr_accessor :kind + attr_accessor :queue + attr_accessor :state + + def initialize( + kind:, + created_at: nil, + encoded_args: nil, + queue: nil, + state: nil + ) + self.kind = kind + self.created_at = created_at + self.encoded_args = encoded_args + self.queue = queue + self.state = state + end + end + # Insert parameters for a job. This is sent to underlying drivers and is meant # for internal use only. Its interface is subject to change. class JobInsertParams @@ -16,13 +39,6 @@ class JobInsertParams attr_accessor :state attr_accessor :tags - # TODO(brandur): Get these supported. - # attr_accessor :unique - # attr_accessor :unique_by_args - # attr_accessor :unique_by_period - # attr_accessor :unique_by_queue - # attr_accessor :unique_by_state - def initialize( encoded_args:, kind:, diff --git a/lib/insert_opts.rb b/lib/insert_opts.rb index 98035c0..5c07fa8 100644 --- a/lib/insert_opts.rb +++ b/lib/insert_opts.rb @@ -5,42 +5,40 @@ class InsertOpts # discarded. attr_accessor :max_attempts - # Priority is the priority of the job, with 1 being the highest priority and - # 4 being the lowest. When fetching available jobs to work, the highest - # priority jobs will always be fetched before any lower priority jobs are - # fetched. Note that if your workers are swamped with more high-priority jobs - # then they can handle, lower priority jobs may not be fetched. + # The priority of the job, with 1 being the highest priority and 4 being the + # lowest. When fetching available jobs to work, the highest priority jobs + # will always be fetched before any lower priority jobs are fetched. Note + # that if your workers are swamped with more high-priority jobs then they + # can handle, lower priority jobs may not be fetched. # # Defaults to PRIORITY_DEFAULT. attr_accessor :priority - # Queue is the name of the job queue in which to insert the job. + # The name of the job queue in which to insert the job. # # Defaults to QUEUE_DEFAULT. attr_accessor :queue - # ScheduledAt is a time in future at which to schedule the job (i.e. in - # cases where it shouldn't be run immediately). The job is guaranteed not - # to run before this time, but may run slightly after depending on the - # number of other scheduled jobs and how busy the queue is. + # A time in future at which to schedule the job (i.e. in cases where it + # shouldn't be run immediately). The job is guaranteed not to run before + # this time, but may run slightly after depending on the number of other + # scheduled jobs and how busy the queue is. # # Use of this option generally only makes sense when passing options into # Insert rather than when a job args is returning `#insert_opts`, however, # it will work in both cases. attr_accessor :scheduled_at - # Tags are an arbitrary list of keywords to add to the job. They have no - # functional behavior and are meant entirely as a user-specified construct - # to help group and categorize jobs. + # An arbitrary list of keywords to add to the job. They have no functional + # behavior and are meant entirely as a user-specified construct to help + # group and categorize jobs. # # If tags are specified from both a job args override and from options on # Insert, the latter takes precedence. Tags are not merged. attr_accessor :tags - # UniqueOpts returns options relating to job uniqueness. An empty struct - # avoids setting any worker-level unique options. - # - # TODO: Implement. + # Options relating to job uniqueness. No unique options means that the job + # is never treated as unique. attr_accessor :unique_opts def initialize( @@ -60,6 +58,77 @@ def initialize( end end + # Parameters for uniqueness for a job. + # + # If all properties are nil, no uniqueness at is enforced. As each property is + # initialized, it's added as a dimension on the uniqueness matrix, and with + # any property on, the job's kind always counts toward uniqueness. + # + # So for example, if only #by_queue is on, then for the given job kind, only a + # single instance is allowed in any given queue, regardless of other + # properties on the job. If both #by_args and #by_queue are on, then for the + # given job kind, a single instance is allowed for each combination of args + # and queues. If either args or queue is changed on a new job, it's allowed to + # be inserted as a new job. + # + # Uniquenes is checked at insert time by taking a Postgres advisory lock, + # doing a look up for an equivalent row, and inserting only if none was found. + # There's no database-level mechanism that guarantees jobs stay unique, so if + # an equivalent row is inserted out of band (or batch inserted, where a unique + # check doesn't occur), it's conceivable that duplicates could coexist. class UniqueOpts + # Indicates that uniqueness should be enforced for any specific instance of + # encoded args for a job. + # + # Default is false, meaning that as long as any other unique property is + # enabled, uniqueness will be enforced for a kind regardless of input args. + attr_accessor :by_args + + # Defines uniqueness within a given period. On an insert time is rounded + # down to the nearest multiple of the given period, and a job is only + # inserted if there isn't an existing job that will run between then and the + # next multiple of the period. + # + # The period should be specified in seconds. So a job that's unique every 15 + # minute period would have a value of 900. + # + # Default is no unique period, meaning that as long as any other unique + # property is enabled, uniqueness will be enforced across all jobs of the + # kind in the database, regardless of when they were scheduled. + attr_accessor :by_period + + # Indicates that uniqueness should be enforced within each queue. + # + # Default is false, meaning that as long as any other unique property is + # enabled, uniqueness will be enforced for a kind across all queues. + attr_accessor :by_queue + + # Indicates that uniqueness should be enforced across any of the states in + # the given set. For example, if the given states were `(scheduled, + # running)` then a new job could be inserted even if one of the same kind + # was already being worked by the queue (new jobs are inserted as + # `available`). + # + # Unlike other unique options, ByState gets a default when it's not set for + # user convenience. The default is equivalent to: + # + # ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted, rivertype.JobStateRunning, rivertype.JobStateRetryable, rivertype.JobStateScheduled} + # + # With this setting, any jobs of the same kind that have been completed or + # discarded, but not yet cleaned out by the system, won't count towards the + # uniqueness of a new insert. + attr_accessor :by_state + + def initialize( + by_args: nil, + by_period: nil, + by_queue: nil, + by_state: nil + ) + self.by_args = by_args + self.by_period = by_period + self.by_queue = by_queue + self.by_state = by_state + end end end diff --git a/riverqueue.gemspec b/riverqueue.gemspec index 2cd7ed8..08ed85d 100644 --- a/riverqueue.gemspec +++ b/riverqueue.gemspec @@ -8,4 +8,6 @@ Gem::Specification.new do |s| s.files = ["lib/riverqueue.rb"] s.homepage = "https://riverqueue.com" s.license = "LGPL-3.0-or-later" + + s.add_dependency "fnv-hash" end diff --git a/sig/client.rbs b/sig/client.rbs index 76cf8f8..40f55f0 100644 --- a/sig/client.rbs +++ b/sig/client.rbs @@ -4,15 +4,21 @@ module River QUEUE_DEFAULT: String class Client + @advisory_lock_prefix: Integer? @driver: _Driver + @time_now_utc: ^() -> Time + DEFAULT_UNIQUE_STATES: Array[jobStateAll] EMPTY_INSERT_OPTS: InsertOpts - def initialize: (_Driver driver) -> void + def initialize: (_Driver driver, ?advisory_lock_prefix: Integer?) -> void def insert: (jobArgs, ?insert_opts: InsertOpts) -> InsertResult def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer - private def make_insert_params: (jobArgs, InsertOpts) -> Driver::JobInsertParams + private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult + private def uint64_to_int64: (Integer) -> Integer + private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?] + private def truncate_time: (Time, Integer) -> Time end class InsertManyParams @@ -28,9 +34,11 @@ module River class InsertResult @job: JobRow + @unique_skipped_as_duplicated: bool attr_reader job: JobRow + attr_reader unique_skipped_as_duplicated: bool - def initialize: (JobRow job) -> void + def initialize: (JobRow job, ?unique_skipped_as_duplicated: bool) -> void end end diff --git a/sig/driver.rbs b/sig/driver.rbs index 21b5516..ddfac56 100644 --- a/sig/driver.rbs +++ b/sig/driver.rbs @@ -1,10 +1,23 @@ module River interface _Driver - def insert: (Driver::JobInsertParams) -> JobRow - def insert_many: (Array[Driver::JobInsertParams]) -> Integer + def advisory_lock: (Integer) -> void + def job_get_by_kind_and_unique_properties: (Driver::JobGetByKindAndUniquePropertiesParam) -> JobRow? + def job_insert: (Driver::JobInsertParams) -> JobRow + def job_insert_many: (Array[Driver::JobInsertParams]) -> Integer + def transaction: [T] () { () -> T } -> T end module Driver + class JobGetByKindAndUniquePropertiesParam + attr_accessor created_at: [Time, Time]? + attr_accessor encoded_args: String? + attr_accessor kind: String + attr_accessor queue: String? + attr_accessor state: Array[jobStateAll]? + + def initialize: (kind: String, ?created_at: [Time, Time]?, ?encoded_args: String?, ?queue: String?, ?state: Array[jobStateAll]?) -> void + end + class JobInsertParams attr_accessor encoded_args: String attr_accessor kind: String @@ -15,13 +28,6 @@ module River attr_accessor state: jobStateAll attr_accessor tags: Array[String]? - # TODO(brandur): Get these supported. - # attr_accessor :unique - # attr_accessor :unique_by_args - # attr_accessor :unique_by_period - # attr_accessor :unique_by_queue - # attr_accessor :unique_by_state - def initialize: (encoded_args: String, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time?, state: jobStateAll, tags: Array[String]?) -> void end end diff --git a/sig/insert_opts.rbs b/sig/insert_opts.rbs index 698f692..69ad0ca 100644 --- a/sig/insert_opts.rbs +++ b/sig/insert_opts.rbs @@ -11,6 +11,11 @@ module River end class UniqueOpts - # TODO + attr_accessor by_args: bool? + attr_accessor by_period: Integer? + attr_accessor by_queue: bool? + attr_accessor by_state: Array[jobStateAll]? + + def initialize: (?by_args: bool?, ?by_period: Integer?, ?by_queue: bool?, ?by_state: Array[jobStateAll]?) -> void end end diff --git a/sig_gem/fnv-hash/fnv.rbs b/sig_gem/fnv-hash/fnv.rbs new file mode 100644 index 0000000..2963be6 --- /dev/null +++ b/sig_gem/fnv-hash/fnv.rbs @@ -0,0 +1,21 @@ +module Fnv + class Hash + # Calculates the FNV-1 hash for the given + # item value + # + # @param item The item to hash + # @param size [Integer] the size of the resulting hash + # + # @return [Integer] the calculated hash value + def self.fnv_1: (Object, size: Integer) -> Integer + + # Calculates the FNV-1a hash for the given + # item value + # + # @param item The item to hash + # @param size [Integer] the size of the resulting hash + # + # @return [Integer] the calculated hash value + def self.fnv_1a: (Object, size: Integer) -> Integer + end +end \ No newline at end of file diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 67915cb..abba887 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -3,25 +3,44 @@ # We use a mock here, but each driver has a more comprehensive test suite that # performs full integration level tests. class MockDriver + attr_accessor :advisory_lock_calls attr_accessor :inserted_jobs + attr_accessor :job_get_by_kind_and_unique_properties_calls + attr_accessor :job_get_by_kind_and_unique_properties_returns def initialize + @advisory_lock_calls = [] @inserted_jobs = [] + @job_get_by_kind_and_unique_properties_calls = [] + @job_get_by_kind_and_unique_properties_returns = [] @next_id = 0 end - def insert(insert_params) + def advisory_lock(key) + @advisory_lock_calls << key + end + + def job_get_by_kind_and_unique_properties(get_params) + @job_get_by_kind_and_unique_properties_calls << get_params + job_get_by_kind_and_unique_properties_returns.shift + end + + def job_insert(insert_params) insert_params_to_jow_row(insert_params) end - def insert_many(insert_params_many) + def job_insert_many(insert_params_many) insert_params_many.each do |insert_params| insert_params_to_jow_row(insert_params) end insert_params_many.count end - private def insert_params_to_jow_row(insert_params) + def transaction(&) + yield + end + + def insert_params_to_jow_row(insert_params) job = River::JobRow.new( id: (@next_id += 1), args: JSON.parse(insert_params.encoded_args), @@ -174,6 +193,123 @@ def to_json = nil client.insert(args_klass.new) end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`") end + + def check_bigint_bounds(int) + raise "lock key shouldn't be larger than Postgres bigint max (9223372036854775807); was: #{int}" if int > 9223372036854775807 + raise "lock key shouldn't be smaller than Postgres bigint min (-9223372036854775808); was: #{int}" if int < -9223372036854775808 + int + end + + # These unique insertion specs are pretty mock heavy, but each of the + # individual drivers has their own unique insert tests that make sure to do + # a full round trip. + describe "unique opts" do + let(:now) { Time.now.utc } + before { client.instance_variable_set(:@time_now_utc, -> { now }) } + + it "inserts a new unique job with minimal options" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + + lock_str = "unique_keykind=#{args.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" + expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))]) + end + + it "inserts a new unique job with all options" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_args: true, + by_period: 15 * 60, + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE] + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + + lock_str = "unique_keykind=#{args.kind}" \ + "&args=#{JSON.dump({job_num: 1})}" \ + "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" + expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))]) + end + + it "inserts a new unique job with advisory lock prefix" do + client = River::Client.new(mock_driver, advisory_lock_prefix: 123456) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + + lock_str = "unique_keykind=#{args.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" + expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | Fnv::Hash.fnv_1(lock_str, size: 32)))]) + + lock_key = mock_driver.advisory_lock_calls[0] + expect(lock_key >> 32).to eq(123456) + end + + it "gets an existing unique job" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_args: true, + by_period: 15 * 60, + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE] + ) + ) + + job = mock_driver.insert_params_to_jow_row(client.send(:make_insert_params, args, River::InsertOpts.new)[0]) + mock_driver.job_get_by_kind_and_unique_properties_returns << job + + insert_res = client.insert(args) + expect(insert_res).to have_attributes( + job: job, + unique_skipped_as_duplicated: true + ) + + lock_str = "unique_keykind=#{args.kind}" \ + "&args=#{JSON.dump({job_num: 1})}" \ + "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" + expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))]) + end + + it "skips unique check if unique opts empty" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + end + end end describe "#insert_many" do @@ -303,6 +439,34 @@ def to_json = nil tags: ["custom_2"] ) end + + it "raises error with unique opts" do + expect do + client.insert_many([ + River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new( + unique_opts: River::UniqueOpts.new + )) + ]) + end.to raise_error(ArgumentError, "unique opts can't be used with `#insert_many`") + end + end + + describe "#truncate_time" do + it "truncates times to nearest interval" do + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 1 * 60).utc).to eq(Time.parse("Thu Jan 15 21:26:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 5 * 60).utc).to eq(Time.parse("Thu Jan 15 21:25:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 15 * 60).utc).to eq(Time.parse("Thu Jan 15 21:15:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 1 * 60 * 60).utc).to eq(Time.parse("Thu Jan 15 21:00:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 5 * 60 * 60).utc).to eq(Time.parse("Thu Jan 15 17:00:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 24 * 60 * 60).utc).to eq(Time.parse("Thu Jan 15 00:00:00 UTC 2024")) + end + end + + describe "#uint64_to_int64" do + it "converts between integer types" do + expect(client.send(:uint64_to_int64, 123456)).to eq(123456) + expect(client.send(:uint64_to_int64, 13977996710702069744)).to eq(-4468747363007481872) + end end end