Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
matrix:
ruby_version: [2.5.x, 2.6.x, 2.7.x]
gemfile: ["4.2", "5.2", "6.0"]
postgres_version: [9, 10, 11]
postgres_version: [9, 10, 11, 12]
exclude:
- { gemfile: "4.2", ruby_version: "2.7.x" }
services:
Expand Down
4 changes: 4 additions & 0 deletions lib/que/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def drain_notifications
loop { break if next_notification.nil? }
end

def server_version
wrapped_connection.server_version
end

def in_transaction?
wrapped_connection.transaction_status != ::PG::PQTRANS_IDLE
end
Expand Down
8 changes: 8 additions & 0 deletions lib/que/connection.spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@
end
end

describe "server_version" do
it "returns the PostgreSQL server version" do
assert_instance_of Integer, connection.server_version
assert_operator connection.server_version, :>=, 9_06_00
assert_operator connection.server_version, :<, 99_00_00
end
end

describe "in_transaction?" do
it "should know when it is in a transaction" do
refute connection.in_transaction?
Expand Down
4 changes: 3 additions & 1 deletion lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,12 @@ def lock_jobs(metajobs)
}
end

materalize_cte = connection.server_version >= 12_00_00

jobs =
connection.execute \
<<-SQL
WITH jobs AS (SELECT * FROM que_jobs WHERE id IN (#{ids.join(', ')}))
WITH jobs AS #{materalize_cte ? 'MATERIALIZED' : ''} (SELECT * FROM que_jobs WHERE id IN (#{ids.join(', ')}))
SELECT * FROM jobs WHERE pg_try_advisory_lock(id)
SQL

Expand Down
17 changes: 17 additions & 0 deletions lib/que/locker.spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,23 @@ def assert_que_locker_insertion(
locker.stop!
end

it "locks only accepted jobs in a listen batch" do
locker_settings[:poll] = false
locker
sleep_until_equal(1) { DB[:que_lockers].count }

Que.execute <<~SQL
INSERT INTO que_jobs (job_class, priority)
SELECT 'Que::Job', 1
FROM generate_series(1, 10) AS i;
SQL

sleep_until_equal(2) { active_jobs_dataset.count }
sleep_until_equal(0) { locked_ids.size }

locker.stop!
end

it "should repeat batch polls until there are no more available jobs" do
Que.execute <<-SQL
INSERT INTO que_jobs (job_class, priority)
Expand Down
2 changes: 1 addition & 1 deletion lib/que/migrations.spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

default = proc do
result = Que.execute <<-SQL
select adsrc::integer
select pg_get_expr(adbin, adrelid)::integer AS adsrc
from pg_attribute a
join pg_class c on c.oid = a.attrelid
join pg_attrdef on adrelid = attrelid AND adnum = attnum
Expand Down