Apalis RouteQueue with PostgreSQL; tests, docs, and BDD#335
Conversation
|
Note Reviews pausedUse the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdd an Apalis‑backed PostgreSQL RouteQueue adapter and provider abstraction; introduce apalis/sqlx dependencies and update a git dependency; extract stub adapter to its own module; add in‑memory and failing test providers; add unit and BDD tests using embedded Postgres; update architecture and roadmap docs. Request‑path dispatch remains unimplemented. Changes
Sequence Diagram(s)sequenceDiagram
participant Caller as Caller
participant RouteQueueAdapter as GenericApalisRouteQueue
participant Provider as ApalisPostgresProvider / QueueProvider
participant ApalisStorage as PostgresStorage
participant Postgres as PostgreSQL
Caller->>RouteQueueAdapter: enqueue(plan)
RouteQueueAdapter->>RouteQueueAdapter: serde_json::to_value(plan)
alt serialization OK
RouteQueueAdapter->>Provider: push_job(json_payload)
Provider->>ApalisStorage: push(payload)
ApalisStorage->>Postgres: INSERT INTO apalis.jobs (payload,...)
Postgres-->>ApalisStorage: OK / Error
ApalisStorage-->>Provider: Ok / Err
Provider-->>RouteQueueAdapter: Ok / JobDispatchError::Unavailable
RouteQueueAdapter-->>Caller: Ok / JobDispatchError::Unavailable
else serialization fails
RouteQueueAdapter-->>Caller: JobDispatchError::Rejected (with serializer error)
end
Poem
🚥 Pre-merge checks | ✅ 6 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (6 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
Reviewer's GuideIntroduces a production-ready Apalis-backed RouteQueue adapter using PostgreSQL storage behind a QueueProvider abstraction, retains a stub implementation, adds unit and BDD tests with an embedded Postgres harness, updates architecture docs, and wires in Apalis/sqlx dependencies. Sequence diagram for enqueue via GenericApalisRouteQueue with ApalisPostgresProvidersequenceDiagram
actor DomainService
participant RouteQueueImpl as GenericApalisRouteQueue_P_Q
participant Provider as ApalisPostgresProvider
participant Storage as PostgresStorage_serde_json_Value
participant Postgres as PostgreSQL
DomainService->>RouteQueueImpl: enqueue(&plan)
activate RouteQueueImpl
RouteQueueImpl->>RouteQueueImpl: serde_json::to_vec(plan)
alt serialization_success
RouteQueueImpl->>Provider: push_job(payload)
activate Provider
Provider->>Provider: serde_json::from_slice(payload)
Provider->>Storage: push(job)
activate Storage
Storage->>Postgres: INSERT job
Postgres-->>Storage: ok
Storage-->>Provider: Ok(())
deactivate Storage
Provider-->>RouteQueueImpl: Ok(())
deactivate Provider
RouteQueueImpl-->>DomainService: Ok(())
else serialization_failure
RouteQueueImpl-->>DomainService: Err(JobDispatchError::Rejected)
end
deactivate RouteQueueImpl
Class diagram for Apalis-backed RouteQueue adapter and stubclassDiagram
direction LR
class RouteQueue {
<<trait>>
+enqueue(plan : Plan) Result~(), JobDispatchError~
<<associated type>> Plan
}
class JobDispatchError {
<<enum>>
Unavailable
Rejected
}
class QueueProvider {
<<trait>>
+push_job(payload : Vec~u8~) Result~(), JobDispatchError~
}
class GenericApalisRouteQueue {
<<struct>>
-provider : Q
-_plan : PhantomData~fn() -> P~
+new(provider : Q) GenericApalisRouteQueue~P, Q~
+enqueue(plan : &P) Result~(), JobDispatchError~
}
class ApalisRouteQueue {
<<type alias>>
GenericApalisRouteQueue~P, ApalisPostgresProvider~
}
class ApalisPostgresProvider {
<<struct>>
-storage : PostgresStorage~serde_json::Value~
+new(pool : PgPool) Result~ApalisPostgresProvider, JobDispatchError~
+push_job(payload : Vec~u8~) Result~(), JobDispatchError~
}
class StubRouteQueue {
<<struct>>
-_marker : PhantomData~P~
+new() StubRouteQueue~P~
+enqueue(plan : &P) Result~(), JobDispatchError~
}
class FakeQueueProvider {
<<struct>>
-jobs : Vec~Vec~u8~~
+new() FakeQueueProvider
+push_job(payload : Vec~u8~) Result~(), JobDispatchError~
+pushed_jobs() Vec~Vec~u8~~
}
class FailingQueueProvider {
<<struct>>
-message : String
+new(message : String) FailingQueueProvider
+push_job(payload : Vec~u8~) Result~(), JobDispatchError~
}
class PgPool {
<<struct>>
}
class PostgresStorage {
<<struct>>
+setup(pool : &PgPool) Result~(), Error~
+new(pool : &PgPool) PostgresStorage~T~
+push(job : T) Result~(), Error~
}
RouteQueue <|.. GenericApalisRouteQueue : implements
RouteQueue <|.. StubRouteQueue : implements
QueueProvider <|.. ApalisPostgresProvider : implements
QueueProvider <|.. FakeQueueProvider : implements
QueueProvider <|.. FailingQueueProvider : implements
GenericApalisRouteQueue o--> QueueProvider : uses
GenericApalisRouteQueue --> JobDispatchError : error_mapping
ApalisPostgresProvider --> PgPool : constructed_with
ApalisPostgresProvider o--> PostgresStorage : wraps
ApalisPostgresProvider --> JobDispatchError : error_mapping
StubRouteQueue --> JobDispatchError : returns
FakeQueueProvider --> JobDispatchError : returns
FailingQueueProvider --> JobDispatchError : returns
JobDispatchError ..> RouteQueue : used_by
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. backend/tests/route_queue_apalis_bdd.rs Comment on file fn the_plan_is_persisted_in_the_queue_storage(world: &SharedContext) {
with_context_async(
world,
|ctx| {
let pool = ctx.pool.clone().expect("pool should be available");
let plan = ctx.enqueued_plans.last().expect("at least one plan");
(pool, plan.clone())
},
|(pool, plan)| async move {
let count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM apalis.jobs WHERE job->>'name' = $1")
.bind(&plan.name)
.fetch_one(&pool)
.await
.expect("query job count");
count
},
|_ctx, count| {
assert!(
count >= 1,
"expected at least one job in storage, found {count}"
);
},
);
}❌ New issue: Code Duplication |
This comment was marked as resolved.
This comment was marked as resolved.
cd95224 to
3605a4c
Compare
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. Ensure that this is validated against the current version of the codegraph. If further refinement to address this finding would be deleterious, please supply a clear explanatory one to two paragraph markdown message I can paste into the CodeScene web ui's diagnostic suppression function so this diagnostic can be silenced. Code Duplicationbackend/tests/route_queue_apalis_bdd.rs: What lead to degradation?The module contains 2 functions with similar structure: the_plan_is_persisted_in_the_queue_storage,two_independent_jobs_exist_in_storage Why does this problem occur?Duplicated code often leads to code that's harder to change since the same logical change has to be done in multiple functions. More duplication gives lower code health. How to fix it?A certain degree of duplicated code might be acceptable. The problems start when it is the same behavior that is duplicated across the functions in the module, ie. a violation of the Don't Repeat Yourself (DRY) principle. DRY violations lead to code that is changed together in predictable patterns, which is both expensive and risky. DRY violations can be identified using CodeScene's X-Ray analysis to detect clusters of change coupled functions with high code similarity. Read More |
|
Caution Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted. Error details |
This comment was marked as resolved.
This comment was marked as resolved.
|
@coderabbitai Please suggest a fix for the following test failures and provide an AI coding agent prompt for the fix: |
This comment was marked as resolved.
This comment was marked as resolved.
89a5113 to
cd58818
Compare
|
@coderabbitai Please suggest a fix for the following test failure and provide an AI coding agent prompt for the fix: |
This comment was marked as resolved.
This comment was marked as resolved.
…helpers Simplified locking mechanism in FakeQueueProvider by removing explicit unwrap_or_else panic and using unwrap directly, leading to cleaner and more concise code. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…' for clarity Refactored the fixture name from 'queue_world' to 'world' in route_queue_apalis_bdd.rs to improve test code readability and consistency in naming conventions. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…ontext Change all step definition functions in route_queue_apalis_bdd.rs to accept an Option<SharedContext> instead of SharedContext. Add early return if None, tightening safety and preventing panics when context is missing during BDD tests. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Modify SQL to convert 'job' from bytea to JSONB before accessing 'name' field, fixing query error in Apalis jobs count helper function. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Changed ApalisPostgresProvider to parse the job payload as JSON before pushing it to the PostgresStorage. This prevents invalid payloads from being enqueued and provides clearer error handling when the payload is not valid JSON. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Introduce a new `QueueProvider` trait accepting `serde_json::Value` payloads to unify job serialization. Implement `GenericApalisRouteQueue` with PostgreSQL storage via `apalis-postgres`. Migrate serialization from bytes vector to JSON values for improved decoupling. Add new integration tests and improve error handling for queue unavailability. Update documentation to reflect PostgreSQL as job queue backend using Apalis, describing architecture, monitoring, and migration rationale. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…g jobs Avoid parsing payload into serde_json::Value before pushing; push raw payload instead. Simplify FakeQueueProvider code by replacing expect calls with unwrap for mutex locking. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…rror Add test case for GenericApalisRouteQueue to ensure that enqueue returns a rejected error when serialization of the job plan fails. This verifies proper error handling and that no jobs are pushed to the queue provider on serialization failure. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…ing and document usage - changed FakeQueueProvider::pushed_jobs to return Result instead of panicking on mutex poison - updated push_job to propagate mutex lock errors as JobDispatchError::Unavailable - added example code to QueueProvider::push_job trait method doc - updated tests to properly handle Result from pushed_jobs - improved error message assertions for serialization failure Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
- Removed custom FailingProvider and replaced with real ApalisPostgresProvider using an invalid database connection URL to simulate failure. - Adapted async setup code to handle lazy connection pool and provider error. - Improved error handling and test robustness for database unavailability scenarios. - Renamed test structs and improved error unwrapping for serialization failure tests. - Removed unnecessary imports and test helper annotations for clarity. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Add Debug and Clone derives to the FailingSerializePlan struct used in serialization failing tests. This change improves test diagnostics and usability by enabling debug printing and cloning of the test plan. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…usage Enhanced explanations clarify why the `setup()` method is called on `PostgresStorage<(), (), ()>` despite type parameters used elsewhere, detailing Apalis library conventions and safety rationale. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Add a check for queue availability before attempting to enqueue a test plan in the BDD test step. If the queue is unavailable (e.g., provider construction failed), skip enqueueing since the error is already recorded. This prevents redundant attempts and clarifies test behavior when queue setup fails. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
… and comments Corrected the spelling of 'initialised' to 'initialized' in the BDD feature file and corresponding Rust test step definitions and comments for consistency. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
- Remove unused import handle_cluster_setup_failure for clarity - Replace cluster setup failure handler with panic to fail tests explicitly - Replace conditional skip in tests with assert to enforce fixture setup success Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
- Make QueueProvider trait crate-internal (pub(crate)) and remove public re-export from mod.rs to align documentation with visibility - Update stub warning message from "not implemented" to "stub adapter selected; job discarded" for clarity - Update FailingQueueProvider documentation to clarify it only produces JobDispatchError::Unavailable (not rejection errors) - Fix spelling: change "initialised" to "initialized" in BDD test expect message for consistency with American English - Update architecture docs to use future tense for Apalis worker configuration (queues, handlers, retries, dead-letter, timeouts) since these features are planned but not yet implemented - Update execplan documentation to reflect current queue contract: replace DeserializeOwned/Vec<u8> references with serde_json::Value payload flow and Serialize-only bounds Note: Kept manual Default implementation for StubRouteQueue as deriving Default would add unwanted P: Default bound. The inline comment suggesting derivation was incorrect. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
- Update example code in QueueProvider trait to use 'rust,ignore' for maintainers. - Refactor Apalis RouteQueue documentation for clarity and accuracy. - Edit wildside-backend-architecture.md to better explain worker mode initialization, job queue config, retries, and timeouts with Apalis. - Add dependency on apalis-core 1.0.0-rc.7 in execplan toml. These changes improve documentation clarity around the Apalis-based backend queue integration and usage in roadmap 5.2.1. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…QL job queue adapter and roadmap - Describe new ApalisPostgresProvider, ApalisRouteQueue, and GenericApalisRouteQueue alongside StubRouteQueue in queue adapter docs. - Revise backend architecture details to clarify planned Apalis PostgreSQL-backed job queue, replacing Redis queue mentions. - Update design decisions to reflect introduction of PostgreSQL-backed RouteQueue adapter with Apalis. - Clarify worker mode bootstrap and scaling strategy is planned but not yet implemented. - Refine explanations on job queue usage, observability, and deployment scaling with Apalis and PostgreSQL. These documentation changes align with roadmap item 5.2.1 and clarify the current state and future plans for job queueing in the backend. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Keep the invalid-database BDD scenario focused on RouteQueue::enqueue instead of recording failure during the Given step. Expose the queue test helpers to integration tests through the existing feature, then fall back to a failing queue provider when Apalis provider setup cannot complete on a lazy broken pool. This keeps the queue present, leaves empty until the When step, and ensures the unavailable error is observed at enqueue time.
Wrap the PostgreSQL queue-capabilities sentence in so the paragraph stays within the repository's 80-column Markdown limit without changing its wording or context.
Annotate the FakeQueueProvider implementation of QueueProvider with async_trait so the generated push_job signature matches the trait declaration. Without the attribute, the test helper trips E0195 during lint and test builds because the async trait transformation is only applied on the trait side.
Add a queue adapter testing section to the canonical developers guide. The new section documents the Apalis PostgreSQL RouteQueue adapter, its unit and BDD test infrastructure, setup_apalis_storage, build requirements, and adapter boundaries while leaving the existing Redis guidance unchanged.
Rebasing onto origin/main exposed duplicate ingest-osm binary entries in backend/Cargo.toml. Restore the openapi-dump and er-snapshots targets from main while keeping the queue adapter dependency changes from this branch. This returns the manifest to a valid state and keeps the rebased branch buildable.
d528cc0 to
3527543
Compare
Remove extra blank lines in the queue adapter testing section of the developers guide so markdownlint no longer reports MD012 violations.
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 4280
No further work is required for either check. |
Summary
Changes
Rationale
Plan of work (highlights)
Validation and acceptance
implements RouteQueue<Plan = P> using Apalis PostgreSQL storage, serializing plans to JSON payloads and persisting them via Apalis storage, with proper error mapping (Unavailable for storage/connectivity; Rejected for serialization failures).
Risks
Interfaces and dependencies
Notes for reviewers
Approval
📎 Task: https://www.devboxer.com/task/fce762d0-cfb1-4ddc-8ebe-0e0c73f5e683