feat: Redis-backed job queue for multi-worker deployments #12588
@ogabrielluiz Hi, what do you think about it? |
981fe5f to dc26d19
|
@erichare let me know if there is anything I can help here with :-) |
|
@ogabrielluiz would this feature be useful? |
- Refactored the job queue service to support Redis-backed management for cross-worker scaling.
- Added environment variables for configuration:
- `LANGFLOW_JOB_QUEUE_TYPE=redis`
- `LANGFLOW_REDIS_QUEUE_DB=1`
- Updated job ownership methods to be asynchronous for improved concurrency handling.
- Enhanced Redis cache service with namespacing via key prefixes.
- Introduced `fakeredis` for in-memory Redis simulation in testing.
- Added comprehensive unit tests for Redis job queue components.
- Introduced a mechanism to emit a one-time warning for the RedisCache experimental feature during server runtime.
- The warning is logged only if no other worker has already emitted it, ensuring clarity for users regarding the experimental status of RedisCache.
- The implementation includes a temporary file check to prevent multiple warnings across different processes.
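A minimal sketch of how a temporary-file check can limit a warning to one worker, assuming an atomic exclusive file create is used as the cross-process marker. The function name, marker name, and directory argument are hypothetical and not taken from the PR.

```python
import logging
import os
import tempfile
from typing import Optional

logger = logging.getLogger(__name__)


def warn_once_across_workers(
    marker_name: str, message: str, marker_dir: Optional[str] = None
) -> bool:
    """Log `message` only in the process that creates the marker file first."""
    path = os.path.join(marker_dir or tempfile.gettempdir(), marker_name)
    try:
        # O_CREAT | O_EXCL fails if the file already exists, so exactly one
        # process wins the race to create it.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # another worker already emitted the warning
    os.close(fd)
    logger.warning(message)
    return True
```

One caveat of this approach: a marker left over from a previous run suppresses the warning until it is removed, so a real implementation would likely scope the marker name to the server run.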
- Added documentation for LANGFLOW_GUNICORN_PRELOAD to explain preloading for better performance.
- Detailed the use of LANGFLOW_JOB_QUEUE_TYPE for specifying backends (e.g., Redis).
- Included LANGFLOW_REDIS_QUEUE_DB to define the database index for job queues.
- Updated the "High-Load Environments" guide with these optimal configurations.
…ncellation and buffer management
- Introduced a structural protocol `_CancellableQueue` to ensure queues can handle cancellation properly during client disconnects.
- Updated `RedisQueueWrapper` to implement this protocol, allowing for graceful cancellation of background tasks.
- Added a maximum size limit to the internal buffer to prevent unbounded memory usage and ensure backpressure on slow consumers.
- Implemented a done callback to handle unexpected fill task crashes, ensuring consumers are not left hanging indefinitely.
- Enhanced unit tests to verify compliance with the new protocol and the behavior of the buffer under various conditions.
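The three mechanisms in this commit (a structural protocol, a bounded buffer, and a done callback) can be sketched with plain `asyncio`. This is an illustration under assumptions, not the PR's code: `CancellableQueue`, `BufferedQueue`, the `None` sentinel, and the `_done` flag are hypothetical names, and an async generator stands in for the Redis reader.

```python
import asyncio
from typing import Any, AsyncIterator, Protocol, runtime_checkable

SENTINEL = None  # hypothetical end-of-stream marker


@runtime_checkable
class CancellableQueue(Protocol):
    """Structural protocol: anything with get() and cancel() qualifies."""

    async def get(self) -> Any: ...
    def cancel(self) -> None: ...


class BufferedQueue:
    """Hypothetical wrapper whose buffer is filled by a background task."""

    def __init__(self, source: AsyncIterator[Any], maxsize: int = 2) -> None:
        self._buffer: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._done = False
        self._fill_task = asyncio.create_task(self._fill(source))
        self._fill_task.add_done_callback(self._on_fill_done)

    async def _fill(self, source: AsyncIterator[Any]) -> None:
        async for item in source:
            # put() waits while the buffer is full: backpressure on the producer.
            await self._buffer.put(item)
        await self._buffer.put(SENTINEL)

    def _on_fill_done(self, task: asyncio.Task) -> None:
        # Runs on normal exit, crash, and cancellation; only act on the last two.
        if task.cancelled() or task.exception() is not None:
            self._done = True
            try:
                self._buffer.put_nowait(SENTINEL)
            except asyncio.QueueFull:
                pass  # get() falls back to the _done flag once drained

    async def get(self) -> Any:
        if self._done and self._buffer.empty():
            return SENTINEL
        return await self._buffer.get()

    def cancel(self) -> None:
        self._fill_task.cancel()
```

A consumer that loops on `get()` until it sees the sentinel terminates in all three cases: normal completion, a crashed fill task, or an explicit `cancel()`.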
|
@ogabrielluiz did some enhancements. |
ogabrielluiz
left a comment
LGTM — all 10 review threads addressed with dedicated fixes, verified end-to-end against a two-worker harness on real Redis. 23/23 unit tests + 7/7 cross-worker scenarios pass on fbd774a.
The two blockers from the last pass are closed:
- streaming/direct cross-worker no longer 404s (9cf34bf)
- early-poll race resolved with the 30s `_STARTUP_GRACE_S` + `_observed_stream` flag (88b60cf)
The remaining cross-worker passive-disconnect case (worker B sees client leave but worker A keeps producing) is now an explicit, logged limitation rather than a silent no-op — happy to ship that as a follow-up PR with a Redis pubsub side-channel.
Nice work on the iteration!
|
@ogabrielluiz thanks! I merged conflicts :-) ready to go. Thanks! |
244f455 to 86722e2
…l response
- Deliver end-of-stream sentinel on fill-task cancellation (_on_fill_done now handles both cancelled and exception paths so consumers are never left hanging)
- Add _error_start time-bound to xread and exists() error loops: after _STARTUP_GRACE_S seconds of continuous Redis errors the sentinel is delivered instead of retrying forever
- Advance _last_id cursor only after buffer.put() succeeds so cancellation mid-put does not silently skip that message in the Redis cursor
- Return False from cancel_flow_build when event_task is None (cross-worker path) so the HTTP response correctly reports success=False instead of false success
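Two of the fixes in this commit, the time-bounded error loop and the cursor that advances only after a successful put, can be sketched as follows. This is a simplified model under assumptions: `read_after` is a hypothetical stand-in for the `XREAD` call, `ConnectionError` stands in for Redis errors, and a `None` payload is assumed to mark end of stream.

```python
import asyncio
import time
from typing import Awaitable, Callable, List, Optional, Tuple

SENTINEL = None  # hypothetical end-of-stream marker
Entry = Tuple[str, Optional[str]]


async def fill(
    read_after: Callable[[str], Awaitable[List[Entry]]],
    buffer: asyncio.Queue,
    grace_s: float = 30.0,
    retry_delay: float = 0.01,
) -> None:
    """Copy entries into `buffer`, giving up after `grace_s` of continuous errors."""
    last_id = "0"
    error_start: Optional[float] = None
    while True:
        try:
            entries = await read_after(last_id)
        except ConnectionError:
            now = time.monotonic()
            if error_start is None:
                error_start = now  # first error of this streak
            if now - error_start >= grace_s:
                await buffer.put(SENTINEL)  # stop retrying, release the consumer
                return
            await asyncio.sleep(retry_delay)
            continue
        error_start = None  # a successful read ends the error streak
        if not entries:
            await asyncio.sleep(retry_delay)  # a blocking XREAD would wait here
        for entry_id, payload in entries:
            await buffer.put(payload)  # may be cancelled while waiting for space
            last_id = entry_id  # advance the cursor only after the put succeeded
            if payload is SENTINEL:
                return
```

Because `last_id` is updated after `buffer.put()` returns, a cancellation that interrupts the put leaves the cursor pointing before that entry, so a later read starts from it again instead of skipping it.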
jordanrfrazier
left a comment
@severfire Awesome work again. Skimmed over and briefly tested manually, all seems good to me. Going to get this in our QA's field now.
I also added one commit -- it had some fixes that made sense in the explanation (in commit message). Please take a look and feel free to revert and update as you see fit.
Did you by chance test whether tracing continues to work as expected?
|
@jordanrfrazier Hmm... To tell you the truth, I tested manually chat and ran automated tests that are in Langflow. Also some things might have been fixed in #13084 - I wonder if fixes should be done there or here. Did you see any issues with tracing? |
I didn't test myself, it's just the first thing that I thought of that may not have been accounted for in a more distributed world. I'll make sure we get some manual tests done on this. |
|
@jordanrfrazier thank you very much! I will check it as well tonight. Btw - I am starting work on OAuth Accounts manager for Langflow :-) It would be awesome to connect to Google Cloud and others using OAuth |
|
@jordanrfrazier - Tracing works on my machine. Tested with 3 chats running at once :-) From what I see, as you queued to merge, you tested it as well :-) glad it worked! |
ea29bcc
|
oh, I think my update went 3 minutes too late! :-D Pull request merged. LOL. |
Conflicts resolved by taking the PR (HEAD) side for the production-hardened job_queue service, factory, settings, build/monitor APIs, and tests. The PR's RedisJobQueueService is a deliberate superset of the version that landed via #12588: it adds cross-worker cancel via PSUBSCRIBE, cancel-marker fallback, dispatcher auto-reconnect, polling watchdog, ops metrics endpoint, and client-disconnect propagation via signal_cancel.
Other resolutions:
- pyproject.toml: keep the Python 3.14 onnxruntime split from release-1.10.0
- docs env-variables.mdx: drop duplicate 'High-load and multi-worker' heading
- uv.lock: regenerated against the merged pyproject.toml
…rted delivery
- RedisQueueWrapper: restore _BUFFER_MAXSIZE bounded buffer and the _on_fill_done done-callback safety net that release-1.10.0 added in #12588, so a slow consumer cannot grow the buffer without bound and a crashing or cancelled _fill_task cannot leave consumers stuck on await get().
- build.get_flow_events_response: explicit exhaustiveness guard. Unknown EventDeliveryType values now return HTTP 400 with the supported set and a remediation hint instead of silently falling through to the polling path.
- lfx settings.set_event_delivery: when workers > 1 without a redis queue, upgrade the warning to name the requested mode, the forced fallback, and the LANGFLOW_JOB_QUEUE_TYPE env var that would preserve the original mode.
- Tests: port the three RedisQueueWrapper safety tests from release-1.10.0 and add coverage for the new event_delivery guard.
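A rough sketch of the kind of guard the second bullet describes, kept framework-free so it runs on its own. The enum members, the `resolve_delivery` helper, and the wording of the hint are assumptions for illustration; the real handler returns an HTTP response rather than a tuple.

```python
from enum import Enum
from typing import Dict, Tuple


class EventDeliveryType(str, Enum):
    # Member names assumed from the modes mentioned in this thread.
    STREAMING = "streaming"
    DIRECT = "direct"
    POLLING = "polling"


def resolve_delivery(raw: str) -> Tuple[int, Dict[str, object]]:
    """Return (status, body); unknown modes are rejected instead of polled."""
    try:
        mode = EventDeliveryType(raw)
    except ValueError:
        supported = sorted(m.value for m in EventDeliveryType)
        return 400, {
            "detail": f"Unsupported event_delivery {raw!r}",
            "supported": supported,
            "hint": "Use one of the supported values.",
        }
    return 200, {"mode": mode.value}
```

Rejecting the unknown value up front means a typo in the request surfaces as a client error with the valid options listed, rather than as unexpected polling behavior.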

Summary
Adds an optional Redis-backed job queue so flow build events work when multiple Gunicorn/Uvicorn workers handle the start request and later poll/stream requests on different processes. Keeps the default in-memory `asyncio` queue unchanged.

Problem
With `workers > 1`, the in-memory `JobQueueService` is per-process, so a job started on one worker can hit `JobQueueNotFoundError` (or lose events) when the client talks to another worker. This change routes queue traffic through Redis Streams so any worker can consume the same job stream.

What changed
- `RedisJobQueueService` and `RedisQueueWrapper`: producer path bridges the existing `asyncio.Queue` + `EventManager` to Redis Streams (`XADD`); consumer path reads via `XREAD` into a local buffer so `build.py` keeps an `asyncio.Queue`-like API.
- `JobQueueServiceFactory`: selects implementation from settings: `job_queue_type == "redis"` → Redis, otherwise the existing in-memory service.
- `lfx` settings: `job_queue_type` (`asyncio` | `redis`), `redis_queue_*` (host/port/db/url/ttl) with defaults aligned to separate queue DB from cache.
- `event_delivery` validation: multi-worker no longer forces `direct` delivery when `job_queue_type=redis`, since cross-worker queue state is shared.
- `register_job_owner` / `get_job_owner` can use Redis for cross-worker auth checks.
- `fakeredis` (async) unit tests for the Redis queue wrapper and service behavior without a real Redis.
- `LANGFLOW_JOB_QUEUE_TYPE`, `LANGFLOW_REDIS_QUEUE_DB`, `LANGFLOW_GUNICORN_PRELOAD`, and high-load guidance updated (current + versioned docs).

Configuration (operators)
- `LANGFLOW_JOB_QUEUE_TYPE=redis`
- `LANGFLOW_REDIS_QUEUE_URL` or host/port
- `LANGFLOW_REDIS_QUEUE_DB`
- `LANGFLOW_REDIS_QUEUE_TTL`

Known limitations (documented in code)

How to test

related: #12364 #12587
@jordanrfrazier
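To illustrate the factory selection described in the PR summary, here is a minimal sketch with stub classes. Everything except the setting name `job_queue_type`, its two values, and the environment variable names is hypothetical, including the default queue DB of `1`, which is assumed from the example value shown earlier in the thread.

```python
from dataclasses import dataclass
from typing import Mapping, Union


@dataclass
class QueueSettings:
    """Hypothetical subset of the queue-related settings."""

    job_queue_type: str = "asyncio"  # default: in-memory queue
    redis_queue_db: int = 1  # assumed default, kept separate from the cache DB

    @classmethod
    def from_env(cls, env: Mapping[str, str]) -> "QueueSettings":
        return cls(
            job_queue_type=env.get("LANGFLOW_JOB_QUEUE_TYPE", "asyncio"),
            redis_queue_db=int(env.get("LANGFLOW_REDIS_QUEUE_DB", "1")),
        )


class InMemoryQueueStub:
    """Stand-in for the existing per-process service."""


@dataclass
class RedisQueueStub:
    """Stand-in for the Redis-backed service."""

    db: int


def create_job_queue_service(
    settings: QueueSettings,
) -> Union[InMemoryQueueStub, RedisQueueStub]:
    # Only the exact value "redis" switches backends; anything else
    # keeps the in-memory behavior.
    if settings.job_queue_type == "redis":
        return RedisQueueStub(db=settings.redis_queue_db)
    return InMemoryQueueStub()
```

For example, `create_job_queue_service(QueueSettings.from_env(os.environ))` would pick the backend from the operator's environment, with unset variables falling back to the in-memory queue.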