Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
"""


JOB_GET_BY_ID = """-- name: job_get_by_id \\:one
SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
FROM river_job
WHERE id = :p1
"""


JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES = """-- name: job_get_by_kind_and_unique_properties \\:one
SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
FROM river_job
Expand Down Expand Up @@ -153,6 +160,29 @@ def job_get_all(self) -> Iterator[models.RiverJob]:
tags=row[15],
)

def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]:
row = self._conn.execute(sqlalchemy.text(JOB_GET_BY_ID), {"p1": id}).first()
if row is None:
return None
return models.RiverJob(
id=row[0],
args=row[1],
attempt=row[2],
attempted_at=row[3],
attempted_by=row[4],
created_at=row[5],
errors=row[6],
finalized_at=row[7],
kind=row[8],
max_attempts=row[9],
metadata=row[10],
priority=row[11],
queue=row[12],
state=row[13],
scheduled_at=row[14],
tags=row[15],
)

def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]:
row = self._conn.execute(sqlalchemy.text(JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES), {
"p1": arg.kind,
Expand Down Expand Up @@ -263,6 +293,29 @@ async def job_get_all(self) -> AsyncIterator[models.RiverJob]:
tags=row[15],
)

async def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]:
row = (await self._conn.execute(sqlalchemy.text(JOB_GET_BY_ID), {"p1": id})).first()
if row is None:
return None
return models.RiverJob(
id=row[0],
args=row[1],
attempt=row[2],
attempted_at=row[3],
attempted_by=row[4],
created_at=row[5],
errors=row[6],
finalized_at=row[7],
kind=row[8],
max_attempts=row[9],
metadata=row[10],
priority=row[11],
queue=row[12],
state=row[13],
scheduled_at=row[14],
tags=row[15],
)

async def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]:
row = (await self._conn.execute(sqlalchemy.text(JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES), {
"p1": arg.kind,
Expand Down
5 changes: 5 additions & 0 deletions src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ CREATE TABLE river_job(
SELECT *
FROM river_job;

-- name: JobGetByID :one
SELECT *
FROM river_job
WHERE id = @id;

-- name: JobGetByKindAndUniqueProperties :one
SELECT *
FROM river_job
Expand Down
60 changes: 19 additions & 41 deletions tests/driver/riversqlalchemy/sqlalchemy_driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
UniqueOpts,
)
from riverqueue.driver import riversqlalchemy
from riverqueue.driver.driver_protocol import JobGetByKindAndUniquePropertiesParam
from riverqueue.driver.riversqlalchemy import dbsqlc


class TestAsyncClient:
Expand All @@ -40,19 +40,12 @@ async def test_tx(
yield conn_tx
await conn_tx.rollback()

@pytest.fixture
@staticmethod
def driver(
test_tx: sqlalchemy.ext.asyncio.AsyncConnection,
) -> riversqlalchemy.AsyncDriver:
return riversqlalchemy.AsyncDriver(test_tx)

@pytest_asyncio.fixture
@staticmethod
async def client(
driver: riversqlalchemy.AsyncDriver,
test_tx: sqlalchemy.ext.asyncio.AsyncConnection,
) -> AsyncClient:
return AsyncClient(driver)
return AsyncClient(riversqlalchemy.AsyncDriver(test_tx))

#
# tests
Expand Down Expand Up @@ -87,26 +80,22 @@ async def test_insert_with_only_args(self, client, simple_args):
assert insert_res.job

@pytest.mark.asyncio
async def test_insert_tx(self, client, driver, engine_async, simple_args, test_tx):
async def test_insert_tx(self, client, engine_async, simple_args, test_tx):
insert_res = await client.insert_tx(test_tx, simple_args)
assert insert_res.job

job = await driver.unwrap_executor(
test_tx
).job_get_by_kind_and_unique_properties(
JobGetByKindAndUniquePropertiesParam(kind=simple_args.kind)
job = await dbsqlc.river_job.AsyncQuerier(test_tx).job_get_by_id(
id=insert_res.job.id
)
assert job == insert_res.job
assert job

async with engine_async.begin() as conn_tx2:
job = await driver.unwrap_executor(
conn_tx2
).job_get_by_kind_and_unique_properties(
JobGetByKindAndUniquePropertiesParam(kind=simple_args.kind)
async with engine_async.begin() as test_tx2:
job = await dbsqlc.river_job.AsyncQuerier(test_tx2).job_get_by_id(
id=insert_res.job.id
)
assert job is None

await conn_tx2.rollback()
await test_tx2.rollback()

@pytest.mark.asyncio
async def test_insert_with_opts(self, client, simple_args):
Expand Down Expand Up @@ -206,13 +195,8 @@ def test_tx(engine: sqlalchemy.Engine) -> Iterator[sqlalchemy.Connection]:

@pytest.fixture
@staticmethod
def driver(test_tx: sqlalchemy.Connection) -> riversqlalchemy.Driver:
return riversqlalchemy.Driver(test_tx)

@pytest.fixture
@staticmethod
def client(driver: riversqlalchemy.Driver) -> Client:
return Client(driver)
def client(test_tx: sqlalchemy.Connection) -> Client:
return Client(riversqlalchemy.Driver(test_tx))

#
# tests; should match with tests for the async client above
Expand All @@ -222,24 +206,18 @@ def test_insert_with_only_args(self, client, simple_args):
insert_res = client.insert(simple_args)
assert insert_res.job

def test_insert_tx(self, client, driver, engine, simple_args, test_tx):
def test_insert_tx(self, client, engine, simple_args, test_tx):
insert_res = client.insert_tx(test_tx, simple_args)
assert insert_res.job

job = driver.unwrap_executor(test_tx).job_get_by_kind_and_unique_properties(
JobGetByKindAndUniquePropertiesParam(kind=simple_args.kind)
)
assert job == insert_res.job
job = dbsqlc.river_job.Querier(test_tx).job_get_by_id(id=insert_res.job.id)
assert job

with engine.begin() as conn_tx2:
job = driver.unwrap_executor(
conn_tx2
).job_get_by_kind_and_unique_properties(
JobGetByKindAndUniquePropertiesParam(kind=simple_args.kind)
)
with engine.begin() as test_tx2:
job = dbsqlc.river_job.Querier(test_tx2).job_get_by_id(id=insert_res.job.id)
assert job is None

conn_tx2.rollback()
test_tx2.rollback()

def test_insert_with_opts(self, client, simple_args):
insert_opts = InsertOpts(queue="high_priority", unique_opts=None)
Expand Down