diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py index 80d098e..6e0c2e2 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py @@ -18,6 +18,13 @@ """ +JOB_GET_BY_ID = """-- name: job_get_by_id \\:one +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM river_job +WHERE id = :p1 +""" + + JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES = """-- name: job_get_by_kind_and_unique_properties \\:one SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags FROM river_job @@ -153,6 +160,29 @@ def job_get_all(self) -> Iterator[models.RiverJob]: tags=row[15], ) + def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: + row = self._conn.execute(sqlalchemy.text(JOB_GET_BY_ID), {"p1": id}).first() + if row is None: + return None + return models.RiverJob( + id=row[0], + args=row[1], + attempt=row[2], + attempted_at=row[3], + attempted_by=row[4], + created_at=row[5], + errors=row[6], + finalized_at=row[7], + kind=row[8], + max_attempts=row[9], + metadata=row[10], + priority=row[11], + queue=row[12], + state=row[13], + scheduled_at=row[14], + tags=row[15], + ) + def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]: row = self._conn.execute(sqlalchemy.text(JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES), { "p1": arg.kind, @@ -263,6 +293,29 @@ async def job_get_all(self) -> AsyncIterator[models.RiverJob]: tags=row[15], ) + async def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: + row = (await self._conn.execute(sqlalchemy.text(JOB_GET_BY_ID), {"p1": id})).first() + if row is None: + return None + return models.RiverJob( + id=row[0], + args=row[1], + attempt=row[2], + attempted_at=row[3], + attempted_by=row[4], + created_at=row[5], + errors=row[6], + finalized_at=row[7], + kind=row[8], + max_attempts=row[9], + metadata=row[10], + priority=row[11], + queue=row[12], + state=row[13], + scheduled_at=row[14], + tags=row[15], + ) + async def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]: row = (await self._conn.execute(sqlalchemy.text(JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES), { "p1": arg.kind, diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql index dd0402d..26b0d05 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql @@ -39,6 +39,11 @@ CREATE TABLE river_job( SELECT * FROM river_job; +-- name: JobGetByID :one +SELECT * +FROM river_job +WHERE id = @id; + -- name: JobGetByKindAndUniqueProperties :one SELECT * FROM river_job diff --git a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py index 15c5717..d8a7ebe 100644 --- a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py +++ b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py @@ -18,7 +18,7 @@ UniqueOpts, ) from riverqueue.driver import riversqlalchemy -from riverqueue.driver.driver_protocol import JobGetByKindAndUniquePropertiesParam +from riverqueue.driver.riversqlalchemy import dbsqlc class TestAsyncClient: @@ -40,19 +40,12 @@ async def test_tx( yield conn_tx await conn_tx.rollback() - @pytest.fixture - @staticmethod - def driver( - test_tx: sqlalchemy.ext.asyncio.AsyncConnection, - ) -> riversqlalchemy.AsyncDriver: - return riversqlalchemy.AsyncDriver(test_tx) - @pytest_asyncio.fixture @staticmethod async def client( - driver: riversqlalchemy.AsyncDriver, + test_tx: sqlalchemy.ext.asyncio.AsyncConnection, ) -> AsyncClient: - return AsyncClient(driver) + return AsyncClient(riversqlalchemy.AsyncDriver(test_tx)) # # tests @@ -87,26 +80,22 @@ async def test_insert_with_only_args(self, client, simple_args): assert insert_res.job @pytest.mark.asyncio - async def test_insert_tx(self, client, driver, engine_async, simple_args, test_tx): + async def test_insert_tx(self, client, engine_async, simple_args, test_tx): insert_res = await client.insert_tx(test_tx, simple_args) assert insert_res.job - job = await driver.unwrap_executor( - test_tx - ).job_get_by_kind_and_unique_properties( - JobGetByKindAndUniquePropertiesParam(kind=simple_args.kind) + job = await dbsqlc.river_job.AsyncQuerier(test_tx).job_get_by_id( + id=insert_res.job.id ) - assert job == insert_res.job + assert job - async with engine_async.begin() as conn_tx2: - job = await driver.unwrap_executor( - conn_tx2 - ).job_get_by_kind_and_unique_properties( - JobGetByKindAndUniquePropertiesParam(kind=simple_args.kind) + async with engine_async.begin() as test_tx2: + job = await dbsqlc.river_job.AsyncQuerier(test_tx2).job_get_by_id( + id=insert_res.job.id ) assert job is None - await conn_tx2.rollback() + await test_tx2.rollback() @pytest.mark.asyncio async def test_insert_with_opts(self, client, simple_args): @@ -206,13 +195,8 @@ def test_tx(engine: sqlalchemy.Engine) -> Iterator[sqlalchemy.Connection]: @pytest.fixture @staticmethod - def driver(test_tx: sqlalchemy.Connection) -> riversqlalchemy.Driver: - return riversqlalchemy.Driver(test_tx) - - @pytest.fixture - @staticmethod - def client(driver: riversqlalchemy.Driver) -> Client: - return Client(driver) + def client(test_tx: sqlalchemy.Connection) -> Client: + return Client(riversqlalchemy.Driver(test_tx)) # # tests; should match with tests for the async client above @@ -222,24 +206,18 @@ def test_insert_with_only_args(self, client, simple_args): insert_res = client.insert(simple_args) assert insert_res.job - def test_insert_tx(self, client, driver, engine, simple_args, test_tx): + def test_insert_tx(self, client, engine, simple_args, test_tx): insert_res = client.insert_tx(test_tx, simple_args) assert insert_res.job - job = driver.unwrap_executor(test_tx).job_get_by_kind_and_unique_properties( - JobGetByKindAndUniquePropertiesParam(kind=simple_args.kind) - ) - assert job == insert_res.job + job = dbsqlc.river_job.Querier(test_tx).job_get_by_id(id=insert_res.job.id) + assert job - with engine.begin() as conn_tx2: - job = driver.unwrap_executor( - conn_tx2 - ).job_get_by_kind_and_unique_properties( - JobGetByKindAndUniquePropertiesParam(kind=simple_args.kind) - ) + with engine.begin() as test_tx2: + job = dbsqlc.river_job.Querier(test_tx2).job_get_by_id(id=insert_res.job.id) assert job is None - conn_tx2.rollback() + test_tx2.rollback() def test_insert_with_opts(self, client, simple_args): insert_opts = InsertOpts(queue="high_priority", unique_opts=None)