diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 75f6ded8..bcd854dd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,7 +9,7 @@ jobs: matrix: ruby_version: [2.5.x, 2.6.x, 2.7.x] gemfile: ["4.2", "5.2", "6.0"] - postgres_version: [9, 10, 11] + postgres_version: [9, 10, 11, 12] exclude: - { gemfile: "4.2", ruby_version: "2.7.x" } services: diff --git a/lib/que/connection.rb b/lib/que/connection.rb index 30adf777..820562da 100644 --- a/lib/que/connection.rb +++ b/lib/que/connection.rb @@ -119,6 +119,10 @@ def drain_notifications loop { break if next_notification.nil? } end + def server_version + wrapped_connection.server_version + end + def in_transaction? wrapped_connection.transaction_status != ::PG::PQTRANS_IDLE end diff --git a/lib/que/connection.spec.rb b/lib/que/connection.spec.rb index 74636d62..0379cf01 100644 --- a/lib/que/connection.spec.rb +++ b/lib/que/connection.spec.rb @@ -45,6 +45,14 @@ end end + describe "server_version" do + it "returns the PostgreSQL server version" do + assert_instance_of Integer, connection.server_version + assert_operator connection.server_version, :>=, 9_06_00 + assert_operator connection.server_version, :<, 99_00_00 + end + end + describe "in_transaction?" do it "should know when it is in a transaction" do refute connection.in_transaction? diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 202ae899..651e6ec5 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -393,10 +393,12 @@ def lock_jobs(metajobs) } end + materalize_cte = connection.server_version >= 12_00_00 + jobs = connection.execute \ <<-SQL - WITH jobs AS (SELECT * FROM que_jobs WHERE id IN (#{ids.join(', ')})) + WITH jobs AS #{materalize_cte ? 'MATERIALIZED' : ''} (SELECT * FROM que_jobs WHERE id IN (#{ids.join(', ')})) SELECT * FROM jobs WHERE pg_try_advisory_lock(id) SQL diff --git a/lib/que/locker.spec.rb b/lib/que/locker.spec.rb index 91be86cd..50172f4c 100644 --- a/lib/que/locker.spec.rb +++ b/lib/que/locker.spec.rb @@ -239,6 +239,23 @@ def assert_que_locker_insertion( locker.stop! end + it "locks only accepted jobs in a listen batch" do + locker_settings[:poll] = false + locker + sleep_until_equal(1) { DB[:que_lockers].count } + + Que.execute <<~SQL + INSERT INTO que_jobs (job_class, priority) + SELECT 'Que::Job', 1 + FROM generate_series(1, 10) AS i; + SQL + + sleep_until_equal(2) { active_jobs_dataset.count } + sleep_until_equal(0) { locked_ids.size } + + locker.stop! + end + it "should repeat batch polls until there are no more available jobs" do Que.execute <<-SQL INSERT INTO que_jobs (job_class, priority) diff --git a/lib/que/migrations.spec.rb b/lib/que/migrations.spec.rb index 83120098..15eeb026 100644 --- a/lib/que/migrations.spec.rb +++ b/lib/que/migrations.spec.rb @@ -9,7 +9,7 @@ default = proc do result = Que.execute <<-SQL - select adsrc::integer + select pg_get_expr(adbin, adrelid)::integer AS adsrc from pg_attribute a join pg_class c on c.oid = a.attrelid join pg_attrdef on adrelid = attrelid AND adnum = attnum