feat: add s3 file storage implementation#10526
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThis pull request introduces comprehensive changes to database transaction management, storage abstraction, and session lifecycle handling. It replaces explicit commit patterns with flush-based operations and centralizes session management through Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant API
participant SessionScope as session_scope()
participant DBService
participant DB as Database
Client->>API: Request (write operation)
API->>SessionScope: Enter context
SessionScope->>DBService: _with_session()
DBService->>DB: Begin transaction
DB-->>SessionScope: AsyncSession
SessionScope-->>API: Yield session
API->>API: Perform operation
API->>SessionScope: flush() instead of commit()
SessionScope->>DB: Send pending changes (no commit)
DB-->>SessionScope: Changes staged
API->>API: Additional operations in same transaction
alt Success
API->>SessionScope: Exit context normally
SessionScope->>DB: COMMIT
DB-->>SessionScope: Transaction committed
else Exception
API->>SessionScope: Exit context (exception)
SessionScope->>DB: ROLLBACK
DB-->>SessionScope: Transaction rolled back
end
SessionScope-->>Client: Response
sequenceDiagram
participant Component as File Component
participant Settings as Settings Service
participant Storage as Storage Service
participant LocalFS as Local Filesystem
participant S3 as S3 Storage
Component->>Settings: get_settings_service()
Settings-->>Component: storage_type (s3 or local)
alt storage_type == "s3"
Component->>Storage: resolve_component_path(path)
Storage->>S3: Parse S3 key
S3-->>Storage: flow_id/file_name
Storage-->>Component: S3 key
Component->>Storage: read_file_bytes(s3_path)
Storage->>S3: GetObject request
S3-->>Storage: File bytes
Storage-->>Component: File bytes
else storage_type == "local"
Component->>Storage: resolve_component_path(path)
Storage->>LocalFS: Resolve path
LocalFS-->>Storage: Local path
Storage-->>Component: Local path
Component->>Storage: read_file_bytes(local_path)
Storage->>LocalFS: Read file
LocalFS-->>Storage: File bytes
Storage-->>Component: File bytes
end
Component->>Component: Process file content
sequenceDiagram
participant Migration as Alembic Migration
participant Env as env.py
participant Settings as LANGFLOW Settings
participant Lock as PostgreSQL Advisory Lock
participant DB as PostgreSQL DB
Migration->>Env: run_migrations(PostgreSQL)
Env->>Settings: Check LANGFLOW_MIGRATION_LOCK_NAMESPACE
alt LANGFLOW_MIGRATION_LOCK_NAMESPACE set
Settings-->>Env: namespace value
Env->>Env: Compute lock_key = hash(namespace)
else LANGFLOW_MIGRATION_LOCK_NAMESPACE not set
Settings-->>Env: None
Env->>Env: Use default lock_key
end
Env->>DB: SET lock_timeout to 180s
DB-->>Env: Configured
Env->>Lock: SELECT pg_advisory_xact_lock(lock_key)
Lock->>DB: Acquire lock
DB-->>Lock: Lock acquired
Lock-->>Env: Lock held
Env->>Migration: Proceed with migration
Migration->>DB: Run SQL changes
DB-->>Migration: Changes applied
Migration-->>Env: Complete
Env->>Lock: Release lock (transaction end)
Lock->>DB: Lock released
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Areas requiring extra attention during review:
Possibly related PRs
Suggested labels
Suggested reviewers
Pre-merge checks and finishing touchesImportant Pre-merge checks failedPlease resolve all errors before merging. Addressing warnings is optional. ❌ Failed checks (1 error, 4 warnings)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
This comment has been minimized.
This comment has been minimized.
Codecov Report❌ Patch coverage is ❌ Your project status has failed because the head coverage (40.17%) is below the target coverage (60.00%). You can increase the head coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #10526 +/- ##
==========================================
+ Coverage 32.10% 32.38% +0.28%
==========================================
Files 1364 1366 +2
Lines 62528 62943 +415
Branches 9266 9304 +38
==========================================
+ Hits 20077 20387 +310
- Misses 41437 41531 +94
- Partials 1014 1025 +11
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Actionable comments posted: 15
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (10)
src/backend/base/langflow/api/v1/files.py (1)
128-151: Critical: Path traversal vulnerability allows unauthorized file access.The function constructs file paths by directly concatenating user-provided
folder_nameandfile_namewithout validation (lines 138-139). An attacker can use path traversal sequences like../to access files outside the intendedprofile_picturesdirectory.Apply this diff to validate that the resolved path stays within the intended directory:
try: # Profile pictures are in the package installation directory package_dir = Path(__file__).parent.parent.parent / "initial_setup" / "profile_pictures" file_path = package_dir / folder_name / file_name + + # Prevent path traversal by ensuring resolved path is within package_dir + if not file_path.resolve().is_relative_to(package_dir.resolve()): + raise HTTPException(status_code=400, detail="Invalid file path") if not file_path.exists(): raise HTTPException(status_code=404, detail="Profile picture not found")Additional issue: Redundant exception handling.
The HTTPException raised at line 142 is caught and re-wrapped at lines 149-150. Consider letting HTTPExceptions propagate naturally.
Apply this diff to fix the redundant error handling:
- except Exception as e: + except HTTPException: + raise + except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from esrc/backend/base/langflow/services/database/models/folder/utils.py (1)
26-34: Fix folder reassignment filter
Flow.folder_id is Noneis evaluated immediately by Python, returningFalse, so the UPDATE never matches any rows. As a result, flows with a nullfolder_idare never migrated into the default folder, defeating the purpose of this helper. Switch to SQLAlchemy's.is_(None)(or equivalent) so the predicate is rendered in SQL. Suggested fix:- await session.exec( - update(Flow) - .where( - and_( - Flow.folder_id is None, - Flow.user_id == user_id, - ) - ) - .values(folder_id=folder.id) - ) + await session.exec( + update(Flow) + .where( + and_( + Flow.folder_id.is_(None), + Flow.user_id == user_id, + ) + ) + .values(folder_id=folder.id) + )src/backend/tests/unit/api/v1/test_files.py (1)
51-72: Add flush before refresh to ensure object persistence.At line 63,
session.refresh(user)is called immediately aftersession.add(user)without an intervening flush. Therefresh()operation requires the object to be persistent in the database, but without a flush, the INSERT may not have been sent yet.Apply this diff:
else: session.add(user) + await session.flush() await session.refresh(user)The same issue exists in the
files_flowfixture at lines 84-85. Apply a similar fix:async with session_scope() as session: session.add(flow) + await session.flush() await session.refresh(flow)src/backend/base/langflow/services/deps.py (1)
152-170: Missing import for asynccontextmanager decorator.Line 152 uses the
@asynccontextmanagerdecorator, butasynccontextmanageris not imported. This will cause aNameErrorat runtime.Add the missing import at the top of the file:
from __future__ import annotations +from contextlib import asynccontextmanager from typing import TYPE_CHECKINGsrc/backend/tests/conftest.py (1)
515-535: Give the superuser fixture its own username.Both
active_userandactive_super_usernow create the username"activeuser". If a test requests both fixtures,active_super_userwill pick up the user created byactive_user, leavingis_superuser=Falseand breaking the test. Please use a distinct username (or otherwise ensureis_superuseris flipped before yielding) so the superuser fixture always returns a real superuser.src/backend/base/langflow/services/auth/utils.py (2)
145-150: Fix dependency injection to returnAsyncSession.
Depends(session_scope)is pulling inlfx.services.deps.session_scope, which is an@asynccontextmanager. FastAPI will inject the context manager object itself, not anAsyncSession, so every call toget_current_userwill pass a_AsyncGeneratorContextManagerinto downstream CRUD helpers and crash at runtime. Import the backend wrapper that yields the session (e.g.,from langflow.services.deps import session_scope) or expose a generator-style function here.-from lfx.services.deps import session_scope +from langflow.services.deps import session_scope
586-591: Same dependency bug for MCP path.The MCP handler still injects the async context manager instead of an
AsyncSession, so MCP auth will fail the moment it hits the database. Align this import with the backend wrapper (see comment above) so the dependency actually yields a session.src/backend/base/langflow/api/v2/files.py (1)
467-501: Fix download streaming flow and HTTP error mapping.Calling
await storage_service.get_file(...)before the streaming branch means we always pull the entire payload into memory—even when the backend supports chunked streaming—so large S3 downloads still read the whole file twice. A missing file now bubbles up as an unhandledFileNotFoundError, which our outerexcept Exceptionconverts into a 500 instead of the expected 404. The fallback path also doesawait byte_stream_generator(...); that function is an async generator, so theawaitraisesTypeErrorwhenever we hit a storage backend without true streaming support.Please restructure this block so we only invoke
get_filewhen it’s actually needed (content return or non-streaming fallback), convertFileNotFoundError/PermissionErrorinto 404/403 immediately, and drop the extraawaitonbyte_stream_generator. For example:- # Get file stream - file_stream = await storage_service.get_file(flow_id=str(current_user.id), file_name=file_name) - - if file_stream is None: - raise HTTPException(status_code=404, detail="File stream not available") - - # If return_content is True, read the file content and return it - if return_content: - # For content return, get the full file - file_content = await storage_service.get_file(flow_id=str(current_user.id), file_name=file_name) - if file_content is None: - raise HTTPException(status_code=404, detail="File not found") - return await read_file_content(file_content, decode=True) - - # For streaming, use the appropriate method based on storage type - if hasattr(storage_service, "get_file_stream"): - # S3 storage - use streaming method - file_stream = storage_service.get_file_stream(flow_id=str(current_user.id), file_name=file_name) - byte_stream = file_stream - else: - # Local storage - get file and convert to stream - file_content = await storage_service.get_file(flow_id=str(current_user.id), file_name=file_name) - if file_content is None: - raise HTTPException(status_code=404, detail="File not found") - byte_stream = await byte_stream_generator(file_content) + try: + if return_content: + file_content = await storage_service.get_file(flow_id=str(current_user.id), file_name=file_name) + return await read_file_content(file_content, decode=True) + + if callable(getattr(storage_service, "get_file_stream", None)): + byte_stream = storage_service.get_file_stream(flow_id=str(current_user.id), file_name=file_name) + else: + file_content = await storage_service.get_file(flow_id=str(current_user.id), file_name=file_name) + byte_stream = byte_stream_generator(file_content) + except FileNotFoundError as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except PermissionError as exc: + raise HTTPException(status_code=403, detail=str(exc)) from excThis keeps streaming efficient, preserves memory, and returns the correct status codes for missing or forbidden files. After this change, the tests asserting a 404 can pass without flakiness.
src/backend/tests/unit/api/v2/test_files.py (1)
1-10: Import the modules you use.
json.dumpsanduuid.uuid4are referenced later in this file, butjsonanduuidare never imported. As soon as the S3 fixtures run, pytest will raiseNameError. Please add the missing imports near the top:-import asyncio -import os +import asyncio +import json +import os import tempfile from contextlib import suppress from pathlib import Path +import uuidsrc/backend/base/langflow/api/v1/flows.py (1)
270-275: Paginated branch still returns ORM modelsWhen
get_allisFalsewe still return the rawPageofFlowORM instances. That bypasses the newFlowRead.model_validate(..., from_attributes=True)conversion, so the paginated response reintroduces the same detached-instance/serialization problems and no longer matches the declaredPage[FlowRead]response model. Please convert the paginated items toFlowReadbefore returning.- return await apaginate(session, stmt, params=params) + page = await apaginate(session, stmt, params=params) + flow_reads = [FlowRead.model_validate(flow, from_attributes=True) for flow in page.items] + page_dict = page.model_dump() + page_dict["items"] = flow_reads + return Page(**page_dict)
🧹 Nitpick comments (6)
src/backend/base/langflow/api/v1/files.py (1)
153-174: Consider defensive path validation.While the folder names are currently hardcoded ("People", "Space"), applying the same path validation pattern as recommended for
download_profile_picturewould provide defense-in-depth against future modifications or directory structure issues.Additionally, the same redundant exception handling issue exists here. Consider letting HTTPExceptions propagate:
try: # Profile pictures are in the package installation directory package_dir = Path(__file__).parent.parent.parent / "initial_setup" / "profile_pictures" + + # Validate package_dir exists within expected bounds + if not package_dir.exists(): + raise HTTPException(status_code=500, detail="Profile pictures directory not found") people_path = package_dir / "People" space_path = package_dir / "Space" # List files from package directory - these are bundled with the container people = [f.name for f in people_path.iterdir() if f.is_file()] if people_path.exists() else [] space = [f.name for f in space_path.iterdir() if f.is_file()] if space_path.exists() else [] - except Exception as e: + except HTTPException: + raise + except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from esrc/frontend/src/hooks/files/use-upload-file.ts (1)
60-65: Preserve the original error for better debugging.The error message normalization pattern is good and provides a consistent user-facing message. However, re-throwing a new Error discards the original stack trace and error properties, which can complicate debugging.
Apply this diff to preserve the original error context:
- } catch (e: any) { + } catch (e: unknown) { const errorMessage = - e?.response?.data?.detail || - e?.message || + (e as any)?.response?.data?.detail || + (e as Error)?.message || "An error occurred while uploading the file"; - throw new Error(errorMessage); + throw new Error(errorMessage, { cause: e }); }This change:
- Uses
unknownfor better type safety (explicit casting required)- Preserves the original error as
cause, maintaining stack traces and debugging context- Keeps the normalized message for user-facing error handling
src/frontend/src/controllers/API/queries/file-management/use-post-upload-file.ts (1)
45-53: Make type annotation consistent.Line 45 uses
anyfor theoldparameter, while lines 33 and 60 useFileType[]. For consistency and better type safety, consider usingFileType[]here as well, or useunknownif you need to handle potentially non-array values before the guard clause.Apply this diff to make the type annotation consistent:
- queryClient.setQueryData(["useGetFilesV2"], (old: any) => { + queryClient.setQueryData(["useGetFilesV2"], (old: FileType[]) => {src/backend/base/langflow/services/database/models/user/crud.py (1)
30-32: Consider removing commented-out code.The commented-out username uniqueness check appears to be dead code. If the validation is no longer needed (perhaps enforced at the database level or elsewhere), consider removing these lines to improve code clarity.
src/lfx/src/lfx/components/twelvelabs/video_file.py (1)
146-149: Consider extracting the duplicated error message.The same error message appears in both
process_files(line 103) andload_files(line 148). Consider defining it as a class constant to ensure consistency and easier maintenance.+ S3_NOT_SUPPORTED_MSG = "Video processing is not supported in S3 mode. Use local storage mode to enable this component." + def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]: """Process video files.""" settings = get_settings_service().settings if settings.storage_type == "s3": - msg = "Video processing is not supported in S3 mode. Use local storage mode to enable this component." - raise ValueError(msg) + raise ValueError(self.S3_NOT_SUPPORTED_MSG)src/backend/base/langflow/api/utils/core.py (1)
13-13: Well-designed session dependency separation.The introduction of
DbSessionReadOnlyalongsideDbSessionis a good architectural improvement. The read-only variant avoids unnecessary locking and commit overhead for read operations.Consider using
DbSessionReadOnlyin read-heavy endpoints (like GET operations) to improve performance and reduce database contention.Also applies to: 38-41
This comment has been minimized.
This comment has been minimized.
|
Build successful! ✅ |
|
Build successful! ✅ |
|
Build successful! ✅ |
|
Build successful! ✅ |
1 similar comment
|
Build successful! ✅ |
|
Build successful! ✅ |
|
Build successful! ✅ |
|
Build successful! ✅ |
|
Build successful! ✅ |
|
@CodeRabbit review PR for issues |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 12504 🏁 Script executed: Length of output: 4067 🏁 Script executed: Length of output: 4315 🏁 Script executed: Length of output: 3004 🏁 Script executed: Length of output: 119 🏁 Script executed: Length of output: 7368 🏁 Script executed: Length of output: 1729 🏁 Script executed: Length of output: 23996 @jordanrfrazier I've completed a comprehensive review of the PR. Here are my findings: 🔴 CRITICAL BUG: S3 Streaming Connection Lifecycle IssueLocation: Problem: The async generator has a fundamental context manager lifecycle bug. When the generator yields its first chunk, control returns to the caller, but the async def get_file_stream(self, flow_id: str, file_name: str, chunk_size: int = 8192) -> AsyncIterator[bytes]:
key = self.build_full_path(flow_id, file_name)
try:
async with self._get_client() as s3_client: # ❌ Context exits after first yield!
response = await s3_client.get_object(Bucket=self.bucket_name, Key=key)
body = response["Body"]
try:
async for chunk in body.iter_chunks(chunk_size):
yield chunk # Function returns here, context manager exits
finally:
if hasattr(body, "close"):
with contextlib.suppress(Exception):
await body.close()When FastAPI's Fix: Restructure to keep the aioboto3 session alive during the entire generator lifecycle: @asynccontextmanager
async def _get_streaming_client(self):
"""Get S3 client that stays alive for streaming operations."""
async with self.session.client("s3") as client:
yield client
async def get_file_stream(self, flow_id: str, file_name: str, chunk_size: int = 8192) -> AsyncIterator[bytes]:
key = self.build_full_path(flow_id, file_name)
try:
# Keep client alive for entire generator lifecycle
async with self._get_streaming_client() as s3_client:
response = await s3_client.get_object(Bucket=self.bucket_name, Key=key)
body = response["Body"]
try:
# Now the context stays open during iteration
async for chunk in body.iter_chunks(chunk_size):
yield chunk
finally:
if hasattr(body, "close"):
with contextlib.suppress(Exception):
await body.close()
logger.debug(f"File {file_name} streamed successfully from S3: s3://{self.bucket_name}/{key}")
except Exception as e:
if hasattr(e, "response") and e.response.get("Error", {}).get("Code") == "NoSuchKey":
await logger.awarning(f"File {file_name} not found in S3 flow {flow_id}")
msg = f"File not found: {file_name}"
raise FileNotFoundError(msg) from e
logger.exception(f"Error streaming file {file_name} from S3 in flow {flow_id}")
raiseThis ensures the aioboto3 session and S3 client remain active throughout the entire streaming operation. 🟡 Medium Priority Issues1. Migration Lock Collision RiskLocation: Using only 16 hex characters from SHA-256 increases collision probability: lock_key = int(hashlib.sha256(namespace.encode()).hexdigest()[:16], 16) % (2**63 - 1)Recommendation: Use full 8 bytes for better distribution: hash_bytes = hashlib.sha256(namespace.encode()).digest()[:8]
lock_key = int.from_bytes(hash_bytes, 'big') % (2**63 - 1)2. Missing Exception Handling in Read-Only SessionsLocation: No rollback on exceptions, which could leave locks held: Recommendation: @asynccontextmanager
async def session_scope_readonly() -> AsyncGenerator[AsyncSession, None]:
db_service = get_db_service()
async with db_service._with_session() as session:
try:
yield session
except Exception:
# Rollback even for read-only to release locks
if session.is_active:
try:
await session.rollback()
except InvalidRequestError:
pass
raise🟢 Low Priority ObservationsTemporary File Cleanup in ComponentsLocation: Components create temp files for S3 with try/finally cleanup, but process crashes will orphan files. Consider using ✅ Verified Working Correctly
SummaryThe S3 streaming bug is critical and will cause production issues with file downloads from S3. This must be fixed before merging. The session management refactoring looks solid - the commit → flush pattern correctly relies on The medium-priority issues are improvements but not blockers. Overall, the PR implements a significant architectural change well, aside from the streaming lifecycle bug. 🧠 Learnings used |
|
Build successful! ✅ |
|
Build successful! ✅ |
|
Build successful! ✅ |
1 similar comment
|
Build successful! ✅ |
Adds s3 as a possible backing file storage service. Includes fixes to usage of database session scope.
Summary by CodeRabbit
Release Notes
New Features
Improvements
Documentation