fix(database): fixes pool related checks, duplicated function calls and session management#9983
fix(database): fixes pool related checks, duplicated function calls and session management#9983ogabrielluiz wants to merge 15 commits into
Conversation
Updated the poolclass validation in DatabaseService to check for subclasses of sa.pool.Pool instead of instances. Additionally, removed invalid poolclass from kwargs to ensure the default pool class is used when an invalid value is specified. This change enhances the robustness of the database connection handling.
…gine Refactored the poolclass selection in the SQLDatabaseComponent to dynamically choose between NullPool for PostgreSQL and StaticPool for other databases. This change improves connection management by ensuring that connections are closed after use for PostgreSQL, enhancing resource efficiency and robustness in database interactions.
Removed the session parameter from `flow_as_tool_websocket_no_session` and `flow_tts_websocket_no_session` functions. Updated the WebSocket handling to utilize a short-lived session context for user authentication, enhancing code clarity and maintaining robust session management. This change improves the overall structure and readability of the WebSocket endpoints.
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughRefactors WebSocket voice endpoints to use ephemeral, context-managed DB sessions (removing Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Client
participant E as WS Endpoint
participant SS as session_scope()
participant A as Auth / Key lookup
participant F as Flow/TTS Handler
participant DB as Database
C->>E: Open WebSocket (flow_id)
activate E
E->>SS: async with session_scope() (open ephemeral session)
activate SS
SS->>A: get_current_user_for_websocket(session)
A-->>SS: user / error
alt auth ok
SS-->>E: proceed
E->>F: handle messages/events
loop per message
F->>SS: save message (uses new ephemeral session via session_scope for each save)
SS->>DB: INSERT/UPDATE
DB-->>SS: ack
SS-->>F: saved
end
F-->>C: stream audio/text
else auth fail
SS-->>E: send error & close
end
note over E,SS: On close/cleanup: reopen session_scope() to fetch auth/info for cleanup
deactivate SS
deactivate E
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Pre-merge checks and finishing touches❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is ❌ Your project status has failed because the head coverage (45.78%) is below the target coverage (55.00%). You can increase the head coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #9983 +/- ##
==========================================
- Coverage 23.68% 23.54% -0.15%
==========================================
Files 1090 1090
Lines 39768 39784 +16
Branches 5542 5542
==========================================
- Hits 9419 9366 -53
- Misses 30178 30247 +69
Partials 171 171
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 4
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
src/backend/base/langflow/api/v1/voice_mode.py(2 hunks)src/backend/base/langflow/services/database/service.py(1 hunks)src/lfx/src/lfx/components/langchain_utilities/sql_database.py(2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
{src/backend/**/*.py,tests/**/*.py,Makefile}
📄 CodeRabbit inference engine (.cursor/rules/backend_development.mdc)
{src/backend/**/*.py,tests/**/*.py,Makefile}: Run make format_backend to format Python code before linting or committing changes
Run make lint to perform linting checks on backend Python code
Files:
src/backend/base/langflow/services/database/service.pysrc/backend/base/langflow/api/v1/voice_mode.py
🧬 Code graph analysis (1)
src/backend/base/langflow/api/v1/voice_mode.py (2)
src/backend/base/langflow/services/deps.py (1)
session_scope(151-173)src/backend/base/langflow/services/auth/utils.py (1)
get_current_user_for_websocket(222-245)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Lint Backend / Run Mypy (3.11)
- GitHub Check: Lint Backend / Run Mypy (3.12)
- GitHub Check: Lint Backend / Run Mypy (3.13)
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 1
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 4
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 3
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 2
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 5
- GitHub Check: Run Backend Tests / Integration Tests - Python 3.10
- GitHub Check: Test Starter Templates
- GitHub Check: Optimize new Python code in this PR
- GitHub Check: Update Starter Projects
🔇 Additional comments (1)
src/lfx/src/lfx/components/langchain_utilities/sql_database.py (1)
3-3: LGTM: Import addedImporting NullPool alongside StaticPool is appropriate for dynamic pool selection.
…getter with session_scope Updated the database utility functions to utilize session_scope for session management, enhancing code clarity and consistency across the codebase. This change improves the robustness of database interactions by standardizing session handling in both the database initialization and test files.
- Remove duplicate transaction management in session_scope() * Delegate to DatabaseService.with_session() which already handles commits/rollbacks * Prevents double commit/rollback that could cause connection issues - Fix transaction CRUD to not manage its own transactions * Remove manual commit/rollback in log_transaction() * Let caller handle transaction lifecycle for better control - Clean up transaction cleanup utility * Remove manual commit in clean_transactions() * Improve error handling and let caller manage transaction These changes improve database connection pool efficiency by ensuring consistent session lifecycle management and preventing connection leaks from improper transaction handling. Addresses connection pool exhaustion issues in high-scale deployments.
- Improved the session_scope context manager to include automatic commit on successful completion and rollback on exceptions, ensuring robust transaction management. - Updated documentation to reflect the new transaction handling behavior, enhancing clarity for future developers. - Removed redundant comments in the log_transaction function to streamline the code. These changes contribute to better error handling and resource management in database interactions.
…tions - Updated `flow_as_tool_websocket` and `flow_tts_websocket` to utilize a consistent short-lived session context for user authentication and database interactions. - Replaced direct session parameter usage with `session_scope` in multiple database calls, enhancing code clarity and maintaining robust session management. - Improved error handling by ensuring all database interactions are wrapped in session contexts, reducing the risk of connection leaks. These changes contribute to better resource management and maintainability in the WebSocket handling logic.
…mponent - Updated the poolclass selection logic to utilize `make_url` for better URI parsing, ensuring accurate identification of PostgreSQL databases. - This change enhances connection management by correctly applying `NullPool` for PostgreSQL and `StaticPool` for other database types, improving resource efficiency and robustness in database interactions.
- Improved the poolclass selection logic to accept both class objects and string names/paths, allowing for more flexible configuration. - Added error handling for dynamic imports of pool classes, ensuring robustness in cases where the specified class is not found. - Updated logging to provide clearer feedback on the selected pool class, enhancing debugging capabilities. These changes contribute to more versatile and reliable database connection management.
|
@coderabbitai could you review the new changes, please? |
🧩 Analysis chainI'll review the new changes for you. Let me examine the modified files to provide comprehensive feedback. @ogabrielluiz I'll review the changes in this PR. The improvements focus on database pool handling and WebSocket session management. Let me conduct a comprehensive review of all the changes. 🏁 Script executed: Length of output: 5789 🏁 Script executed: Length of output: 6216 🏁 Script executed: Length of output: 1846 🏁 Script executed: Length of output: 1170 🏁 Script executed: Length of output: 1871 🏁 Script executed: Length of output: 97 Based on my analysis of the code changes, here's my comprehensive review of PR #9983: ✅ Positive Changes1. Improved Database Pool ValidationThe change from
2. Smart Pool Selection for Different DatabasesThe dynamic pool selection in
3. WebSocket Session Management ImprovementMoving from injected
|
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/backend/base/langflow/services/utils.py (1)
283-292: Don’t reuse a session aftersession_scopecloses
session_scope()closes (and commits/rolls back) theAsyncSessionas soon as theasync withblock finishes. The subsequent calls toclean_transactions/clean_vertex_buildstherefore run against a closed session and will raise on first DB access. Please keep those cleanup calls inside the scope or open a new scope around them. For example:- async with session_scope() as session: - settings_service = get_service(ServiceType.SETTINGS_SERVICE) - await setup_superuser(settings_service, session) + async with session_scope() as session: + settings_service = get_service(ServiceType.SETTINGS_SERVICE) + await setup_superuser(settings_service, session) + await clean_transactions(settings_service, session) + await clean_vertex_builds(settings_service, session) @@ - await clean_transactions(settings_service, session) - await clean_vertex_builds(settings_service, session)
🧹 Nitpick comments (3)
src/backend/tests/unit/test_user.py (1)
8-8: Session scope migration looks good; consider removing redundant commitsAdopting session_scope improves consistency. If session_scope (or DatabaseService.with_session) auto‑commits on context exit, explicit await session.commit() calls here are redundant and can be removed to reduce noise. If you adopt the proposed change to session_scope to purely delegate, keep explicit commits in tests as-is.
Also applies to: 18-27, 55-66, 74-81, 83-92, 99-106, 153-162
src/backend/tests/conftest.py (1)
29-29: Standardizing on session_scope: verify commit responsibility and trim redundancyGood move to centralize on session_scope. Depending on final session_scope semantics:
- If it auto‑commits/rollbacks via with_session, explicit commits here can be dropped.
- If it delegates without auto‑commit, leave explicit commits in place.
Also ensure long‑lived fixtures don’t hold sessions longer than needed (they don’t here).
Also applies to: 552-560, 660-672, 690-728
src/backend/base/langflow/services/database/models/transactions/crud.py (1)
66-72: Ensure PK availability and guard against invalid offset; add flush and validate max_entriesTwo improvements:
- After db.add(table), flush to guarantee PK is available to callers before commit.
- Ensure max_entries >= 1 to avoid offset(-1) when max_entries is 0 or misconfigured.
Apply this diff within this block:
# Add new entry and execute delete db.add(table) + # Ensure INSERT is flushed so PKs are populated for immediate use + await db.flush() await db.exec(delete_older) except Exception: logger.exception("Error logging transaction") raiseAnd update max_entries handling (outside this hunk) near where it’s read:
# after reading settings max_entries = max(1, max_entries)
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
src/backend/base/langflow/api/v1/voice_mode.py(7 hunks)src/backend/base/langflow/services/database/models/transactions/crud.py(1 hunks)src/backend/base/langflow/services/database/service.py(1 hunks)src/backend/base/langflow/services/database/utils.py(1 hunks)src/backend/base/langflow/services/deps.py(1 hunks)src/backend/base/langflow/services/utils.py(1 hunks)src/backend/tests/conftest.py(2 hunks)src/backend/tests/unit/test_database.py(2 hunks)src/backend/tests/unit/test_user.py(7 hunks)src/lfx/src/lfx/components/langchain_utilities/sql_database.py(2 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-07-21T14:16:14.125Z
Learnt from: CR
PR: langflow-ai/langflow#0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-07-21T14:16:14.125Z
Learning: Applies to src/backend/tests/**/*.py : Test Langflow's REST API endpoints in backend Python tests using the async client fixture and asserting response codes and payloads.
Applied to files:
src/backend/tests/conftest.py
📚 Learning: 2025-07-21T14:16:14.125Z
Learnt from: CR
PR: langflow-ai/langflow#0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-07-21T14:16:14.125Z
Learning: Applies to src/backend/tests/**/*.py : Be aware of ContextVar propagation in async tests; test both direct event loop execution and 'asyncio.to_thread' scenarios to ensure proper context isolation.
Applied to files:
src/backend/tests/conftest.py
📚 Learning: 2025-07-21T14:16:14.125Z
Learnt from: CR
PR: langflow-ai/langflow#0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-07-21T14:16:14.125Z
Learning: Applies to src/backend/tests/**/*.py : Use the 'client' fixture (an async httpx.AsyncClient) for API tests in backend Python tests, as defined in 'src/backend/tests/conftest.py'.
Applied to files:
src/backend/tests/conftest.py
🔇 Additional comments (11)
src/backend/base/langflow/services/deps.py (2)
137-146: LGTM: get_session wrapperYielding a session via session_scope keeps dependency usage clean.
152-156: Do not remove manual commit in session_scope
DatabaseService.with_session only handles rollbacks on SQLAlchemy errors and does not auto-commit; session_scope must retain its explicitawait session.commit()/rollback()to ensure transactions complete correctly.Likely an incorrect or invalid review comment.
src/backend/tests/unit/test_user.py (2)
18-27: LGTM: super_user fixture uses session_scopeNo functional change beyond session acquisition; pattern is sound.
55-66: Duplicate of previous session_scope noteSame considerations on commit semantics and potential cleanup of explicit commits.
Also applies to: 74-81, 83-92, 99-106, 153-162
src/backend/tests/conftest.py (1)
552-560: LGTM: flow fixture creation via session_scopeShort‑lived session with explicit commit/refresh is correct. Cleanup is handled.
src/lfx/src/lfx/components/langchain_utilities/sql_database.py (2)
35-36: Simplify Postgres detection to backend check onlyurl.get_backend_name() already returns "postgresql" for all postgresql+* drivers; the drivername check is redundant.
- is_postgresql = url.get_backend_name() == "postgresql" or url.drivername.startswith("postgresql") + is_postgresql = url.get_backend_name() == "postgresql"
41-44: LGTM: Conditional pool selection is appropriateNullPool for Postgres and StaticPool otherwise matches the project’s pooling strategy goals.
src/backend/base/langflow/api/v1/voice_mode.py (1)
1211-1215: Fix authentication call in TTS websocketHere we pass
client_send(a function) toauthenticate_and_get_openai_key, which expects aWebSocketand calls.send_json()on it—this crashes withAttributeError. We also continue even if authentication fails, meaning we could hit OpenAI with aNonebearer token. Please passclient_websocketand bail out when auth fails. For example:- async with session_scope() as auth_session: - current_user: User = await get_current_user_for_websocket(client_websocket, auth_session) - current_user, openai_key = await authenticate_and_get_openai_key(auth_session, current_user, client_send) - current_user_id = current_user.id if current_user else None + async with session_scope() as auth_session: + current_user: User = await get_current_user_for_websocket(client_websocket, auth_session) + current_user, openai_key = await authenticate_and_get_openai_key( + auth_session, current_user, client_websocket + ) + if current_user is None or openai_key is None: + return + current_user_id = current_user.idsrc/backend/base/langflow/services/database/utils.py (1)
50-53: Nice reuse of the shared session scopeReplacing the bespoke context manager with
session_scope()keeps this recovery path aligned with the rest of the codebase and hands transaction handling back toDatabaseService.with_session().src/backend/tests/unit/test_database.py (1)
541-557: Test fixture now mirrors production session handlingSwapping in
session_scope()here ensures the test exercises the same session lifecycle we rely on in production, which should make future regressions easier to catch.src/backend/base/langflow/services/database/service.py (1)
135-146: Honor pool classes defined insqlalchemy.poolRight now a setting such as
{"poolclass": "StaticPool"}is still rejected because you only probesqlalchemy.StaticPool. These pool implementations actually live undersqlalchemy.pool, so we never resolve the class and silently fall back to the default pool, defeating the configuration and reopening the connection-pressure issues this PR is trying to close. Please checksa.poolbefore giving up on the lookup.- pool_class = getattr(sa, poolclass_key, None) + pool_class = getattr(sa, poolclass_key, None) or getattr(sa.pool, poolclass_key, None)
…tions - Removed the session parameter from `add_message_to_db` and `process_message_queue` functions, simplifying the function signatures and enhancing code clarity. - Updated the message processing logic to handle timeouts gracefully, ensuring that the queue processor exits cleanly when no messages are available. - Improved error handling in the message queue processing to better manage exceptions and logging. These changes contribute to a more robust and maintainable message handling system within the voice mode functionality.
…eService - Introduced a new method `_resolve_poolclass` to centralize the logic for resolving pool classes from string names, class objects, or module paths. - Simplified the `_create_engine` method by delegating poolclass resolution to the new method, enhancing code clarity and maintainability. - Improved error handling for dynamic imports and validation of pool classes, ensuring robustness in database connection management. These changes contribute to a more organized and flexible approach to configuring database connection pools.
…rvice - Introduced comprehensive unit tests for the `_resolve_poolclass` method in `DatabaseService`, covering various scenarios including string names, class objects, full module paths, and error handling for invalid inputs. - Added integration tests to ensure correct interaction with `create_async_engine`, verifying that valid and invalid poolclass values are handled appropriately. - These tests enhance the robustness and reliability of the poolclass resolution logic, contributing to better maintainability and confidence in database connection management.
- Updated the session_scope context manager to log detailed error messages when exceptions occur during session handling. This improvement aids in debugging by providing clearer insights into session-related issues. - The change ensures that any errors encountered during the session lifecycle are captured and logged, contributing to more robust error management in database interactions.
|
|
@ogabrielluiz When will this pr be merged? I often encounter a 'database is locked' exception when using Langflow. Will merging this completely fix the problem? I look forward to your reply. |
|
Closing - these changes have already been addressed in other PRs |




This pull request focuses on improving database session management and usage patterns across the codebase. The main changes involve removing the legacy
session_gettercontext manager and standardizing on the newsession_scopecontext manager, which delegates session lifecycle management toDatabaseService.with_session(). Additionally, there are improvements to connection pooling logic and transaction handling.Database session management and context usage:
Removed the obsolete
session_gettercontext manager fromsrc/backend/base/langflow/services/database/utils.pyand updated all references to usesession_scopeinstead, ensuring consistent session handling throughout the backend and tests. [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13]Updated the implementation and documentation of
session_scopeinsrc/backend/base/langflow/services/deps.pyto clarify its delegation toDatabaseService.with_session(), which now centrally manages commits and rollbacks. [1] [2]Transaction and error handling improvements:
Connection pooling logic:
DatabaseService._create_engineto useissubclassinstead of instantiating pool classes, and to remove invalid pool class specifications, ensuring robust engine creation.src/lfx/src/lfx/components/langchain_utilities/sql_database.pyto useNullPoolfor PostgreSQL connections andStaticPoolfor others, preventing unnecessary persistent connections. [1] [2]Websocket authentication session management:
src/backend/base/langflow/api/v1/voice_mode.pyto use a short-lived session for authentication, removing the need to pass a session object through the call stack. [1] [2] [3] [4] [5] [6]These changes collectively modernize and standardize database session handling, reduce boilerplate, and improve reliability and maintainability of database interactions.
Summary by CodeRabbit