feat(job_queue): cross-worker cancel + polling watchdog for multi-worker Redis-backed builds #13084
ogabrielluiz wants to merge 35 commits into
Conversation
- Refactored the job queue service to support Redis-backed management for cross-worker scaling.
- Added environment variables for configuration:
- `LANGFLOW_JOB_QUEUE_TYPE=redis`
- `LANGFLOW_REDIS_QUEUE_DB=1`
- Updated job ownership methods to be asynchronous for improved concurrency handling.
- Enhanced Redis cache service with namespacing via key prefixes.
- Introduced `fakeredis` for in-memory Redis simulation in testing.
- Added comprehensive unit tests for Redis job queue components.
- Introduced a mechanism to emit a one-time warning for the RedisCache experimental feature during server runtime.
  - The warning is logged only if no other worker has already emitted it, ensuring clarity for users regarding the experimental status of RedisCache.
  - The implementation includes a temporary file check to prevent multiple warnings across different processes.
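The "temporary file check" above can be sketched with an atomic create: `os.open` with `O_CREAT | O_EXCL` succeeds in exactly one process, so exactly one worker logs the warning. This is a minimal sketch, assuming a marker file in the temp directory; the marker filename and function name are illustrative, not the PR's actual identifiers.

```python
import logging
import os
import tempfile

logger = logging.getLogger(__name__)

# Hypothetical marker path; the real implementation may use a different name.
_MARKER = os.path.join(tempfile.gettempdir(), "langflow_rediscache_warned")

def warn_rediscache_experimental_once() -> bool:
    """Emit the experimental-feature warning at most once across all workers.

    Returns True if this call actually logged the warning.
    """
    try:
        # O_CREAT | O_EXCL is atomic: exactly one process can create the file.
        fd = os.open(_MARKER, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # another worker already emitted the warning
    os.close(fd)
    logger.warning("RedisCache is experimental; behavior may change.")
    return True
```

The atexit cleanup mentioned in a later commit would remove `_MARKER` on shutdown so restarts warn again.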
- Added documentation for LANGFLOW_GUNICORN_PRELOAD to explain preloading for better performance.
  - Detailed the use of LANGFLOW_JOB_QUEUE_TYPE for specifying backends (e.g., Redis).
  - Included LANGFLOW_REDIS_QUEUE_DB to define the database index for job queues.
  - Updated the "High-Load Environments" guide with these optimal configurations.
- Introduced a caching mechanism for Redis stream consumers to optimize job data retrieval.
  - Added methods to manage consumer wrappers, ensuring they are reused across sequential polls.
  - Implemented cleanup logic to cancel and clear consumer wrappers during job cleanup and service stop.
  - Expanded unit tests to verify consumer wrapper reuse and cleanup behavior.
- …ation
  - Updated the cleanup_job method in RedisJobQueueService to guarantee Redis keys are removed even if the job cleanup is interrupted by a CancelledError.
  - Added a new unit test to verify that Redis keys are deleted correctly when cleanup is called during task cancellation.
- Added handling for the connection check task in the stop method to ensure it is properly cancelled and awaited if still running.
  - This change improves resource management and prevents potential issues during service shutdown.
- …ueueService
  - Updated the job processing logic to ensure that if a job is cancelled during the xadd operation, the unpublished sentinel is requeued instead of being dropped.
  - Introduced a new unit test to verify this behavior, ensuring robustness in job handling during cancellations.
- …cleanup
  - Added atexit cleanup to remove stale temporary files for RedisCache.
  - Refactored Redis job queue service to use shared constants for stream prefixes, improving maintainability.
  - Updated type hints for better clarity and consistency in RedisQueueWrapper and RedisJobQueueService.
  - Enhanced error handling with configurable backoff for transient read failures.
- Updated the xadd method in RedisJobQueueService to include maxlen and approximate parameters, improving stream management and preventing excessive memory usage.
- Updated the get_job_owner method to refresh the Redis key TTL on successful lookups, ensuring long-running jobs maintain their ownership anchor.
  - Improved code clarity by extracting the owner key into a variable and adding detailed docstring explanations for better understanding of the TTL management.
- Enhanced the cleanup_job method in RedisJobQueueService to accurately capture job ownership before deleting Redis keys, preventing potential data corruption in multi-worker scenarios.
  - Added comments for clarity on ownership logic and its implications during job cleanup.
- Introduced periodic TTL refresh logic in the _bridge_to_redis method to enhance Redis stream management, reducing round-trips and improving throughput.
  - Added constants for TTL refresh events and seconds to maintain clarity and configurability.
  - Updated event handling to ensure TTL is refreshed appropriately based on event count and time elapsed.
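The count-or-elapsed refresh decision above can be sketched as a small policy object. This is a sketch only: the constant names mirror the commit message, but their values and the class shape are assumptions.

```python
import time

# Assumed values; the PR's real constants may differ.
TTL_REFRESH_EVENTS = 100     # refresh at least every N bridged events...
TTL_REFRESH_SECONDS = 30.0   # ...or every N seconds, whichever comes first

class TtlRefreshPolicy:
    """Decide when the bridge should re-EXPIRE its Redis stream keys."""

    def __init__(self) -> None:
        self._events_since_refresh = 0
        self._last_refresh = time.monotonic()

    def should_refresh(self) -> bool:
        """Call once per bridged event; True means issue the EXPIRE now."""
        self._events_since_refresh += 1
        now = time.monotonic()
        if (self._events_since_refresh >= TTL_REFRESH_EVENTS
                or now - self._last_refresh >= TTL_REFRESH_SECONDS):
            self._events_since_refresh = 0
            self._last_refresh = now
            return True
        return False
```

Batching the EXPIRE this way keeps the per-event hot path to a single XADD round-trip.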
- Removed unnecessary error handling for missing event tasks in get_flow_events_response, allowing for smoother operation when no task exists.
  - Updated create_flow_response to handle optional event_task parameter, ensuring proper cleanup during disconnections.
  - Added unit tests to verify behavior when event tasks are missing, enhancing robustness in streaming scenarios.
- …bservation
  - Added a startup grace period to prevent premature end-of-stream signals when the producer has not yet issued its first XADD.
  - Introduced a flag to track whether the stream has been observed, improving the handling of early polling scenarios.
  - Updated logic to ensure proper handling of stream existence checks and logging for better debugging during job processing.
- …ueService
  - Added a new method to clean up done cross-worker consumer wrappers that are not owned by the current worker, ensuring proper resource management.
  - Enhanced the existing cleanup logic to prevent memory leaks by explicitly pruning stale entries from the consumer wrappers dictionary.
  - Improved logging to provide better visibility into the cleanup process for cross-worker jobs.
- Updated the start_job method to accept a Coroutine type for task_coro, ensuring type safety.
  - Introduced a new _guarded_task method to wrap job coroutines, guaranteeing that unhandled exceptions emit an error event and write a sentinel to the Redis Stream, improving reliability in job processing.
  - Enhanced documentation to clarify the behavior of the new task handling mechanism and its implications for cross-worker consumers.
- Added logging for scenarios where a client disconnects without an associated event_task, clarifying that the producer will continue running until the build completes.
  - Documented the limitation regarding cross-worker passive disconnects and the need for a Redis side-channel for proper cancellation, enhancing observability in the logs.
- …r behavior
  - Introduced a flag to track the completion of the first XREAD call, ensuring proper buffer management during the initial read phase.
  - Updated the empty method to reflect the state of the buffer accurately, preventing premature exits from the drain loop until the first read is complete.
  - Improved documentation to clarify the behavior of the new flag and its impact on job processing.
- … no-op operations
  - Updated test documentation to explain the behavior of the empty() method in relation to the first XREAD completion, ensuring accurate understanding of buffer state during job processing.
  - Adjusted assertions in tests to reflect the intended behavior of the RedisQueueWrapper, specifically regarding the no-op nature of put_nowait and its impact on the internal buffer.
- Added fakeredis dependency with a minimum version of 2.0.0 to both pyproject.toml and uv.lock files.
  - Ensured proper formatting and comments in pyproject.toml for clarity on onnxruntime version constraints.
… startup grace
- Add LANGFLOW_REDIS_QUEUE_STARTUP_GRACE_S (default 30.0) so deployments
with slow producer-worker cold-start can bump the wrapper's early-poll
grace period without editing code.
- Add LANGFLOW_REDIS_QUEUE_CANCEL_CHANNEL_ENABLED (default True) wiring a
per-job Redis pub/sub channel (langflow:cancel:<job_id>) so
POST /build/{job_id}/cancel works cross-worker. The producer subscribes
when the job starts; any worker can publish via
RedisJobQueueService.signal_cancel and the subscriber cancels the local
build task on receipt.
- cancel_flow_build now falls back to signal_cancel when event_task is
None on this worker, replacing the previous no-op for cross-worker
cancellation.
- Subscriber tasks are tracked in _cancel_subscribers and torn down in
cleanup_job/stop alongside the bridge tasks.
- Tests: 3 new unit tests covering cross-worker signal propagation, the
disabled-flag no-op, and the wrapper's per-instance startup_grace_s
override.
Discovered by a deeper local stress pass over the prototype:
1. **Signal-before-subscribe race**: a publish from worker A would be lost if worker B's subscriber had not finished SUBSCRIBE yet. signal_cancel now also sets a short-lived langflow:cancel-marker:<job_id> key, and the subscriber checks it immediately after subscribing — so a cancel that raced the SUBSCRIBE still fires.
2. **Restart subscriber leak**: start_job called twice for the same job_id silently overwrote the dict entry and orphaned the previous subscriber (and its Redis pubsub connection). start_job now cancels the previous subscriber before storing the new one.
3. **Slow end-of-stream after cross-worker cancel**: cancelling the local task did not unblock cross-worker consumers — the bridge sat waiting on local_queue.get() until periodic cleanup ran ~5 min later. The subscriber now puts a sentinel on the local queue after task.cancel() so the bridge flushes a clean end-of-stream marker to Redis. Measured 8 ms from signal_cancel to consumer end-of-stream against real Redis.

Tests: 3 new unit tests covering each corner case; a 9-scenario local stress harness against real Redis confirms all green.
Addresses the eight points from the PR review of the previous commits:
1. Configurable marker TTL — new LANGFLOW_REDIS_QUEUE_CANCEL_MARKER_TTL
setting (default 60s) replaces the hardcoded constant.
2. Connection-pool footprint is O(1) in active jobs — replaced the per-job
pub/sub subscriber with a single PSUBSCRIBE dispatcher per worker
listening on langflow:cancel:*.
3. Prompt stream cleanup after cancel — the dispatcher now waits for the
bridge to drain the sentinel before triggering cleanup_job, so the
Redis stream + owner keys are deleted within milliseconds instead of
waiting for the 5-minute periodic cleanup grace.
4. signal_cancel raises on Redis failure — publish errors propagate to the
caller instead of silently returning 0. The cancel HTTP endpoint
catches and returns False so the client can retry.
5. Auth note in signal_cancel docstring — explicit note that callers must
verify authorization; the HTTP cancel endpoint already does via
_verify_job_ownership before calling through.
6. Structured cancel logging — INFO-level logs on publish, marker hit,
owned dispatch, foreign dispatch. _cancel_stats counters expose
{published, marker_hit, dispatched_owned, dispatched_foreign,
publish_errors} for ops/metrics.
7. redis-py version compatibility — _close_pubsub falls back from
aclose() to close(), handles both sync and async return values.
8. Fire-and-forget tasks hold strong references — _background_tasks set
keeps marker checks and post-cancel cleanups from being GC'd before
they run; each task self-removes via add_done_callback. stop() drains
the set on shutdown.
Tests: 24 passing (1 skipped due to timing); deep stress harness verified
6 scenarios against real Redis.
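The redis-py compatibility shim from point 7 can be sketched as follows — resolve `aclose()` when present, fall back to `close()`, and await the result only if it is awaitable. The function name is an assumption.

```python
import inspect

async def close_pubsub(pubsub) -> None:
    """Close a pubsub object across redis-py versions.

    Newer redis-py exposes aclose(); older versions only close(), and either
    may be synchronous or return an awaitable.
    """
    closer = getattr(pubsub, "aclose", None) or getattr(pubsub, "close", None)
    if closer is None:
        return
    result = closer()
    if inspect.isawaitable(result):
        await result
```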
…oss-worker builds

Closes the remaining cross-worker passive-disconnect gap. Previously, when a client closed its streaming connection on a non-owner worker, the producer worker kept emitting events until the build completed naturally. Now the disconnect handler in create_flow_response publishes a cross-worker cancel via signal_cancel so the owning worker stops promptly.
- create_flow_response accepts optional queue_service + job_id kwargs and uses them in on_disconnect for the cross-worker case (event_task is None). Both are keyword-only and default to None, preserving the single-worker contract.
- get_flow_events_response wires both through.
- New unit test test_cross_worker_disconnect_publishes_signal_cancel covers the full pubsub propagation path with two services sharing a fake Redis.
- Tested end-to-end against real Redis with a two-worker harness: 10/10 scenarios pass, including the new disconnect propagation case.
…er setups
Five complementary improvements that make the Redis-backed cancel/lifecycle
path safe to leave running unattended under real ops pressure.
A. Dispatcher reconnect loop with exponential backoff. _run_cancel_dispatcher
used to exit silently on any Redis-side error (broker restart, network
blip, listen() hiccup), leaving the worker permanently blind to
cross-worker cancels until process restart. The loop now reconnects with
capped backoff (max 30s), and a dispatcher_reconnects counter exposes the
event for monitoring.
B. Outer timeout on _post_cancel_cleanup. The background cleanup task could
hang indefinitely if cleanup_job stalled (Redis pathology during DELETE);
wrap in asyncio.wait_for and let periodic cleanup retry instead.
C. Public metrics_snapshot() on JobQueueService (memory + Redis backends)
plus GET /monitor/job_queue endpoint behind get_current_active_user.
Surfaces backend, active_jobs, bridge_count, consumer_wrapper_count,
background_task_count, cancel_dispatcher_running, and the full
cancel_stats counter set.
D. Deterministic rewrite of test_redis_service_signal_cancel_flushes_sentinel_to_consumer.
The previous version was skipped roughly half the time due to a fakeredis
timing race; the new version no-ops cleanup_job for the duration of the
assertion so the bridge XADD ordering can be observed reliably. No more
skipped tests in the suite (33 pass, 0 skip).
E. Polling-mode stale-client watchdog. Polling has no persistent connection
for on_disconnect, so abandoned polling builds previously ran to natural
completion even after the client gave up. New flow:
* touch_activity(job_id) writes langflow:activity:<job_id> on every poll
and on streaming-response open; consume_and_yield refreshes it every
10s while the connection is held.
* _run_polling_watchdog scans owned jobs every
LANGFLOW_REDIS_QUEUE_POLLING_WATCHDOG_INTERVAL_S (default 15s) and
publishes signal_cancel for jobs whose activity is older than
LANGFLOW_REDIS_QUEUE_POLLING_STALE_THRESHOLD_S (default 90s).
* Streaming clients are protected by the heartbeat refresh; threshold <=
0 disables the watchdog entirely.
* cleanup_job folds the activity key into the existing DEL round-trip
so successful builds clean up after themselves.
End-to-end coverage: 33 unit tests pass (up from 25, with the previous skip
now passing deterministically); a two-worker real-Redis harness exercises 12
scenarios including dispatcher reconnect, post-cancel timeout, watchdog
reclamation, and metrics_snapshot schema completeness.
Acts on every must-fix finding from a multi-agent review pass (code-reviewer,
pr-test-analyzer, silent-failure-hunter, comment-analyzer) plus the Codex
adversarial review. No behavior is left to chance under realistic operational
conditions.
Critical fixes:
- Streaming heartbeat now runs as an INDEPENDENT task for the lifetime of the
response, not coupled to event yield cadence. A quiet build (long graph
step, slow LLM, no tokens for a while) previously had no heartbeat path, so
the polling watchdog could reclaim a live streaming client after the
threshold. The new heartbeat fires every streaming_activity_refresh_s
regardless of queue activity, and is cancelled cleanly on both
on_disconnect AND natural stream end.
- Watchdog start-grace window. A new in-memory _job_start_times[job_id]
timestamp is set in start_job() BEFORE super().start_job() so a job can
never appear in self._queues without a corresponding start time. When the
watchdog scans and the Redis activity key is missing, it skips the kill
until (now - start_ts) >= threshold. Without this, a slow background
touch_activity (event-loop pressure, Redis blip) could nuke a brand-new
build on the very first watchdog tick — the prior code's comment promised
the guard but didn't implement it.
- touch_activity errors are now observable. Replaces a blanket
contextlib.suppress(Exception) with an explicit try/except that bumps
activity_touch_errors and emits a debug log. CancelledError is no longer
swallowed. Operators get a signal when the heartbeat is silently failing.
- Watchdog UnboundLocalError closed. Initializes last = 0.0 before the
raw-is-None branch so a malformed activity value (parse failure) can't
crash the entire watchdog task. Adds activity_parse_errors and
activity_get_errors counters.
- Dispatcher except is split: ConnectionError/TimeoutError/OSError stay at
awarning (transient Redis), every other Exception goes to aerror with
exc_info=True AND increments dispatcher_internal_errors. Code bugs in
_handle_cancel no longer look identical to Redis disconnects.
- _post_cancel_cleanup narrows the bridge-wait suppress to TimeoutError
only, with a dedicated awarning naming the consequence ("cross-worker
consumers may see late end-of-stream"). Real bridge failures bubble up
via a separate Exception arm with awarning instead of being silent.
- Polling watchdog uses local _handle_cancel for owned jobs instead of
round-tripping through pubsub. Faster (no Redis RTT), dispatcher-
independent (works during a reconnect backoff), and keeps the
cancel_stats["published"] counter honest as a count of *external* cancels.
- /monitor/job_queue gated to get_current_active_superuser instead of
get_current_active_user — the snapshot exposes process-wide tenant
workload + cancel activity, which matters in multi-tenant deployments.
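The independent streaming heartbeat from the first critical fix can be sketched as a standalone task that refreshes activity on a fixed cadence, regardless of event yield cadence. The 10-second default mirrors the PR text; the function shape and callback parameter are assumptions.

```python
import asyncio

async def run_heartbeat(touch_activity, job_id: str,
                        interval_s: float = 10.0) -> None:
    """Refresh the activity key every interval_s for the life of the stream.

    touch_activity is any async callable taking the job_id. The caller
    cancels this task on both disconnect and natural stream end.
    """
    while True:
        await touch_activity(job_id)
        await asyncio.sleep(interval_s)
```

Because the loop never waits on the event queue, a quiet build (a long graph step with no tokens) still keeps its client-alive signal.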
Docstring corrections:
- Removed the "cross-worker cancel is a known limitation" remnant from
RedisJobQueueService.get_queue_data — the limitation was closed in the
prior commit, and the new docstring tells callers exactly how cross-worker
cancel travels (signal_cancel → dispatcher).
- touch_activity docstring rewritten. The previous text described a
"TTL-based reasoning" fallback that didn't exist; the new text correctly
explains the 4x-threshold TTL and how it interacts with the watchdog's
start-time grace window.
New tests (5 added, total 38 pass):
- test_polling_watchdog_grants_start_grace_window: brand-new job with
deleted activity key survives until the threshold passes, then dies.
- test_polling_watchdog_skips_malformed_activity_value: malformed value
bumps activity_parse_errors and does NOT kill the job.
- test_streaming_heartbeat_runs_independent_of_event_yield: quiet streaming
build is not reclaimed; heartbeat task is cancelled on disconnect.
- test_concurrent_cancels_from_multiple_workers_are_idempotent: two workers
publish cancel simultaneously, no crashes, single observable cancel.
- test_dispatcher_internal_error_logged_at_error_level: bug in
_handle_cancel bumps dispatcher_internal_errors and dispatcher_reconnects;
the dispatcher task is still running afterwards.
Plus: test_metrics_snapshot_exposes_cancel_stats_and_counters now pins the
full cancel_stats key set (using set equality), so adding an increment
without registering the key — or vice versa — fails the test instead of
producing a silent KeyError in production.
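The set-equality pin described above can be sketched as follows: comparing the full key set (rather than a subset check) fails for both an unregistered counter and a dead registered key. The key list is taken from this PR description's metrics section; the helper name is an assumption.

```python
# Full counter set from the metrics section of this PR.
EXPECTED_CANCEL_STATS_KEYS = {
    "published", "marker_hit", "dispatched_owned", "dispatched_foreign",
    "publish_errors", "dispatcher_reconnects", "dispatcher_internal_errors",
    "polling_watchdog_kills",
    "activity_touch_errors", "activity_get_errors", "activity_parse_errors",
}

def check_cancel_stats_schema(cancel_stats: dict) -> None:
    """Fail if the counter dict drifts from the registered key set."""
    # Symmetric difference in the message pinpoints the offending keys.
    assert set(cancel_stats) == EXPECTED_CANCEL_STATS_KEYS, (
        set(cancel_stats) ^ EXPECTED_CANCEL_STATS_KEYS
    )
```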
End-to-end coverage: 38 unit tests pass; the two-worker real-Redis harness
passes all 12 scenarios on commit HEAD.
Conflicts resolved by taking the PR (HEAD) side for the production-hardened job_queue service, factory, settings, build/monitor APIs, and tests. The PR's RedisJobQueueService is a deliberate superset of the version that landed via #12588: it adds cross-worker cancel via PSUBSCRIBE, cancel-marker fallback, dispatcher auto-reconnect, polling watchdog, ops metrics endpoint, and client-disconnect propagation via signal_cancel.
Other resolutions:
- pyproject.toml: keep the Python 3.14 onnxruntime split from release-1.10.0
- docs env-variables.mdx: drop duplicate 'High-load and multi-worker' heading
- uv.lock: regenerated against the merged pyproject.toml
…rted delivery
- RedisQueueWrapper: restore the _BUFFER_MAXSIZE bounded buffer and the _on_fill_done done-callback safety net that release-1.10.0 added in #12588, so a slow consumer cannot grow the buffer without bound and a crashing or cancelled _fill_task cannot leave consumers stuck on await get().
- build.get_flow_events_response: explicit exhaustiveness guard. Unknown EventDeliveryType values now return HTTP 400 with the supported set and a remediation hint instead of silently falling through to the polling path.
- lfx settings.set_event_delivery: when workers > 1 without a redis queue, upgrade the warning to name the requested mode, the forced fallback, and the LANGFLOW_JOB_QUEUE_TYPE env var that would preserve the original mode.
- Tests: port the three RedisQueueWrapper safety tests from release-1.10.0 and add coverage for the new event_delivery guard.
…owner

Surfaced by locust load testing on a Redis-queue, 2-worker setup: 2016 requests / 2016 polling_watchdog_kills (a 1:1 ratio).

TaskService.launch_task uses JobQueueService.start_job for server-internal tasks (telemetry, background work) without calling register_job_owner. These tasks never refresh the activity heartbeat because no polling client is involved, so the watchdog's start-time fallback was killing every single one once it crossed polling_stale_threshold_s. Under fast-completing flows the user-visible result was unaffected (the build finished before the watchdog struck), but a long-running internal task (slow LLM call, retrieval, etc.) would be cancelled mid-flight even though no one was waiting on it.

Fix: scope the watchdog to jobs in self._job_owners: user-facing flow builds register an owner; TaskService tasks don't. After this change the same locust run reports polling_watchdog_kills=0 and dispatched_owned=0. Existing watchdog tests register an owner explicitly to mirror the real flow path. Adds a regression test (TaskService-style start_job with no owner must not be reclaimed).
✅ Migration Validation Passed All migrations follow the Expand-Contract pattern correctly. |
Codecov Report

@@ Coverage Diff @@
## release-1.10.0 #13084 +/- ##
==================================================
+ Coverage 54.72% 54.80% +0.08%
==================================================
Files 2145 2145
Lines 200205 200523 +318
Branches 28489 30167 +1678
==================================================
+ Hits 109562 109897 +335
+ Misses 89461 89445 -16
+ Partials 1182 1181 -1
Three regression tests for the recent fixes:
- TaskService integration: launch a task through TaskService.fire_and_forget_task and assert the polling watchdog leaves it alone (no owner registered, no kill). Catches a future regression where someone adds register_job_owner inside TaskService.
- Settings multi-worker warning: verify Settings.set_event_delivery warns with the requested mode AND names LANGFLOW_JOB_QUEUE_TYPE in the fallback log when workers > 1 and the job queue is in-memory. An operator seeing missing events needs that env var name to fix it.
- Bounded buffer backpressure: floods the fill task past the maxsize ceiling and verifies qsize() never exceeds _BUFFER_MAXSIZE. The previous test only asserted the constant, not the behavior.

Skipped two suggested tests:
- TestClient version of the unknown-event_delivery guard: FastAPI's enum validation returns 422 before our handler runs, so HTTP cannot reach the defensive 400. The existing unit test is the right test.
- Exact watchdog kill-count assertion (== 1): the watchdog can tick again before cleanup_job removes the job from _queues, making any exact-count check flaky on slow CI. The existing >= 1 is correct.
background_tasks.add_task() is silently dropped after FastAPI drains the POST /build response queue: by the time any cancel fires (local, cross-worker pub/sub, or polling watchdog), the task list from the original HTTP request is already consumed.

Replace the background_tasks call in _run_vertex_build's CancelledError handler with asyncio.create_task(graph.end_all_traces_in_context()()) so the trace cleanup runs as an independent task regardless of the background_tasks lifecycle.

Adds a regression test that exercises generate_flow_events end-to-end: a blocking vertex is cancelled mid-flight and the test asserts that end_all_traces is eventually called via the spawned task.
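The shape of that fix can be sketched as follows — spawn the cleanup as a loop-level task inside the CancelledError handler, then re-raise. The handler name and callback parameter are assumptions; the PR's actual call is `graph.end_all_traces_in_context()()`.

```python
import asyncio

async def run_vertex_build(build_coro, end_all_traces) -> None:
    """Run a build; on cancellation, fire trace cleanup as an independent task.

    end_all_traces is any zero-arg async callable. The spawned task outlives
    the (already-consumed) request-scoped BackgroundTasks list.
    """
    try:
        await build_coro
    except asyncio.CancelledError:
        # Independent of the request lifecycle; real code should also keep a
        # strong reference (e.g. a _background_tasks set) until it completes.
        asyncio.create_task(end_all_traces())
        raise
```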
Build successful! ✅
Summary
Builds on top of #12588 to make the Redis-backed job queue production-ready for multi-worker deployments. Closes the cross-worker cancel gap end-to-end: pubsub side-channel with marker fallback for races, dispatcher auto-reconnect with backoff, polling watchdog for abandoned clients, and ops metrics behind a superuser-gated endpoint.
Stacks on #12588 — until that merges, this PR's diff will show those commits too. The 6 commits unique to this branch are:
What's new
Cross-worker cancel via Redis pub/sub
- A single PSUBSCRIBE dispatcher per worker listens on langflow:cancel:* — O(1) connection-pool usage in active jobs (replaces the per-job subscriber).
- signal_cancel(job_id) sets langflow:cancel-marker:<job_id> (configurable TTL) BEFORE publishing, so a cancel signal that races the dispatcher's PSUBSCRIBE still fires when the target worker checks the marker on start_job.
- The POST /build/{job_id}/cancel endpoint and the streaming response's on_disconnect handler both call through to signal_cancel, so closing the browser tab on any worker stops the producer worker promptly instead of running to natural completion.
- Dispatcher-internal errors are logged with exc_info and bump a separate dispatcher_internal_errors counter — Redis blips and code bugs are no longer indistinguishable in logs.

Polling-mode stale-client watchdog

- A langflow:activity:<job_id> heartbeat is written by every polling response and by an independent streaming heartbeat task that runs for the lifetime of the streaming connection (not coupled to event yield cadence — quiet builds keep their client-alive signal).
- The watchdog scans every LANGFLOW_REDIS_QUEUE_POLLING_WATCHDOG_INTERVAL_S (default 15s). Jobs with a heartbeat older than LANGFLOW_REDIS_QUEUE_POLLING_STALE_THRESHOLD_S (default 90s) get a local cancel.
- A start-grace window backed by in-memory _job_start_times[job_id] ensures brand-new jobs aren't reclaimed on the first tick if touch_activity is briefly delayed.

Observability

- GET /monitor/job_queue (superuser-only) returns a metrics snapshot: backend, active jobs, bridge / consumer-wrapper / background-task counts, dispatcher liveness, and a full cancel_stats counter set:
  - published / marker_hit / dispatched_owned / dispatched_foreign
  - publish_errors / dispatcher_reconnects / dispatcher_internal_errors
  - polling_watchdog_kills
  - activity_touch_errors / activity_get_errors / activity_parse_errors

New settings (env-var configurable)

- LANGFLOW_REDIS_QUEUE_STARTUP_GRACE_S (default 30.0)
- LANGFLOW_REDIS_QUEUE_CANCEL_CHANNEL_ENABLED (default True)
- LANGFLOW_REDIS_QUEUE_CANCEL_MARKER_TTL (default 60)
- LANGFLOW_REDIS_QUEUE_POLLING_STALE_THRESHOLD_S (default 90.0; <= 0 disables)
- LANGFLOW_REDIS_QUEUE_POLLING_WATCHDOG_INTERVAL_S (default 15.0)

Reliability hardening

- _post_cancel_cleanup bounded by a _POST_CANCEL_CLEANUP_TIMEOUT_S = 10s outer timeout (periodic cleanup retries any leftovers).
- Narrowed error handling in _post_cancel_cleanup and touch_activity — CancelledError no longer swallowed, transient Redis errors counted and logged at debug, real bugs surface.
- Fire-and-forget tasks held in a _background_tasks set with self-cleanup callbacks (no GC-while-pending hazard).
- The streaming heartbeat task is created in the create_flow_response closure and cancelled in both on_disconnect and the finally of consume_and_yield.

Testing

- 38 unit tests pass; the two-worker real-Redis harness passes all 12 scenarios.
- Reviewed by pr-review-toolkit (code-reviewer + pr-test-analyzer + silent-failure-hunter + comment-analyzer) and by Codex adversarial review. Every must-fix finding addressed.

Backwards compatibility

- Default in-memory (asyncio) queue behavior is unchanged.
- The memory backend's JobQueueService exposes metrics_snapshot() returning only backend and active_jobs — Redis-only metrics are absent for the memory backend.
- signal_cancel returns 0 (no-op) when cancel_channel_enabled=False or the backend is in-memory.