refactor(docling): extract processing logic to separate worker process #9393
- Move Docling processing to dedicated worker function
- Preserve all original pipeline configuration logic
- Maintain support for standard and VLM pipelines
- Keep complete OCR engine configuration
- Add proper error handling for multiprocessing context
Walkthrough
Introduces a separate multiprocessing worker (docling_worker) for Docling conversions. The inline component now spawns a process with a queue, delegates conversion to the worker, receives results or errors via IPC, and maps outputs to the existing Data structures. The worker handles lazy imports, pipeline selection (standard/VLM), OCR options, conversion, and error normalization.
Sequence Diagram(s)
sequenceDiagram
participant Caller as DoclingInlineComponent
participant MP as multiprocessing (spawn)
participant P as Worker Process
participant Q as Queue
participant W as docling_worker
participant D as Docling Converter
Caller->>MP: create Queue, spawn Process(target=docling_worker, args)
MP->>P: start
P->>W: run(file_paths, queue, pipeline, ocr_engine)
W->>D: lazy import + configure pipeline (standard/VLM, OCR)
W->>D: convert_all(file_paths)
D-->>W: results
W->>Q: put(processed_data or {"error": msg})
Caller->>Q: get()
Caller->>MP: join process
alt error
Q-->>Caller: {"error": msg}
Caller->>Caller: raise ImportError(msg)
else success
Q-->>Caller: [per-file dicts/None]
Caller->>Caller: map to Data objects
end
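The flow in the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `worker`, `run_in_worker`, and `unwrap` are hypothetical names, and the stand-in worker fabricates a string instead of calling Docling. The `{"error": msg}` payload and the `ImportError` raised by the caller follow the diagram.

```python
import multiprocessing as mp
import queue as queue_mod


def worker(file_paths, result_queue):
    """Stand-in for docling_worker: put per-file dicts, or {"error": msg}."""
    try:
        # A real worker would lazily import Docling and run the converter here.
        results = [{"file_path": p, "document": f"converted:{p}"} for p in file_paths]
        result_queue.put(results)
    except Exception as exc:
        # Normalize any failure into a small, picklable payload.
        result_queue.put({"error": str(exc)})


def unwrap(payload):
    """Caller side: raise on an error payload, otherwise pass the list through."""
    if isinstance(payload, dict) and "error" in payload:
        raise ImportError(payload["error"])
    return payload


def run_in_worker(file_paths, timeout=60.0):
    """Spawn the worker, read its single result, then join the process."""
    ctx = mp.get_context("spawn")
    result_queue = ctx.Queue()
    proc = ctx.Process(target=worker, args=(file_paths, result_queue))
    proc.start()
    try:
        # Read before joining: a child blocked on a full pipe cannot exit.
        payload = result_queue.get(timeout=timeout)
    except queue_mod.Empty:
        proc.terminate()
        raise TimeoutError(f"no result within {timeout} seconds") from None
    finally:
        proc.join()
    return unwrap(payload)
```

With the spawn start method, `run_in_worker` should be called from code guarded by `if __name__ == "__main__":` (or from an imported module), because the child process re-imports the main module.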
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 4
🧹 Nitpick comments (3)
src/backend/base/langflow/components/docling/__init__.py (1)
106-114: Avoid passing heavy/unpicklable objects through multiprocessing.Queue
res.document may be large and/or not picklable, risking timeouts or BrokenPipeError. Consider serializing to a compact form (e.g., JSON) or writing to a temp file and returning a reference, then reconstructing in the parent process. Also, status is not used by the caller; dropping it reduces payload size.
If you stick with pickling the document, please verify stability with realistic inputs. If you prefer, I can draft a serialization-based approach.
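One way to picture the temp-file variant of that suggestion is sketched below. It assumes the document has already been exported to a JSON-serializable dict; how Docling produces that dict is not shown in this review, and `dump_document` and `load_document` are hypothetical helper names.

```python
import json
import os
import tempfile


def dump_document(doc_dict, directory=None):
    """Worker side: write a JSON-serializable dict to a temp file, return its path."""
    fd, path = tempfile.mkstemp(suffix=".json", dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump(doc_dict, fh)
    return path


def load_document(path, cleanup=True):
    """Parent side: read the JSON back and optionally delete the temp file."""
    with open(path, encoding="utf-8") as fh:
        doc_dict = json.load(fh)
    if cleanup:
        os.remove(path)
    return doc_dict
```

Only the short path string then crosses the queue, so the IPC payload stays small regardless of document size. The trade-off is that the parent becomes responsible for cleaning up the file.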
Minimal payload tweak (status removal) if you choose to keep pickling:
- processed_data = [
-     {"document": res.document, "file_path": str(res.input.file), "status": res.status.name}
+ processed_data = [
+     {"document": res.document, "file_path": str(res.input.file)}
      if res.status == ConversionStatus.SUCCESS
      else None
      for res in results
  ]
src/backend/base/langflow/components/docling/docling_inline.py (2)
82-88: Good call on using spawn context; consider naming and reuse
Using get_context("spawn") improves cross-platform stability. Optionally, consider naming the process (name="docling-worker") for easier debugging or reusing a long-lived process if throughput becomes a concern.
If desired, I can sketch a simple single-worker lifecycle manager to amortize spawn costs.
97-97: Leverage worker-provided status (or drop it at the source)
You’re discarding the status information returned from the worker. Either log/report failed conversions here for observability, or remove status from the worker payload to reduce IPC payload and serialization overhead.
I can wire basic logging that reports successes/failures per file before rollup_data if you want that visibility.
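A minimal sketch of such logging, assuming the worker payload is a list in which each entry is either a dict with "file_path" (and optionally "status") or None for a failed conversion, as in the diff above. `summarize_results` is a hypothetical helper, not part of the PR.

```python
import logging

logger = logging.getLogger(__name__)


def summarize_results(processed_data):
    """Log one line per input and return (successful_items, failure_count)."""
    successes = []
    failed = 0
    for index, item in enumerate(processed_data):
        if item is None:
            # Failed entries carry no file path, so report the position instead.
            failed += 1
            logger.warning("Docling conversion failed for input #%d", index)
        else:
            logger.info(
                "Docling conversion succeeded for %s (status=%s)",
                item["file_path"],
                item.get("status", "unknown"),
            )
            successes.append(item)
    return successes, failed
```

Because failed entries are None, the log can only identify them by position; including the file path in failure entries would need a change on the worker side.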
📒 Files selected for processing (2)
src/backend/base/langflow/components/docling/__init__.py (1 hunks)
src/backend/base/langflow/components/docling/docling_inline.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
src/backend/base/langflow/components/**/*.py
📄 CodeRabbit Inference Engine (.cursor/rules/backend_development.mdc)
src/backend/base/langflow/components/**/*.py: Add new backend components to the appropriate subdirectory under src/backend/base/langflow/components/
Implement async component methods using async def and await for asynchronous operations
Use asyncio.create_task for background work in async components and ensure proper cleanup on cancellation
Use asyncio.Queue for non-blocking queue operations in async components and handle timeouts appropriately
Files:
src/backend/base/langflow/components/docling/__init__.py
src/backend/base/langflow/components/docling/docling_inline.py
src/backend/base/langflow/components/**/__init__.py
📄 CodeRabbit Inference Engine (.cursor/rules/backend_development.mdc)
Update __init__.py with alphabetical imports when adding new components
Files:
src/backend/base/langflow/components/docling/__init__.py
{src/backend/**/*.py,tests/**/*.py,Makefile}
📄 CodeRabbit Inference Engine (.cursor/rules/backend_development.mdc)
{src/backend/**/*.py,tests/**/*.py,Makefile}: Run make format_backend to format Python code before linting or committing changes
Run make lint to perform linting checks on backend Python code
Files:
src/backend/base/langflow/components/docling/__init__.py
src/backend/base/langflow/components/docling/docling_inline.py
src/backend/**/components/**/*.py
📄 CodeRabbit Inference Engine (.cursor/rules/icons.mdc)
In your Python component class, set the icon attribute to a string matching the frontend icon mapping exactly (case-sensitive).
Files:
src/backend/base/langflow/components/docling/__init__.py
src/backend/base/langflow/components/docling/docling_inline.py
🧬 Code Graph Analysis (1)
src/backend/base/langflow/components/docling/__init__.py (2)
src/backend/base/langflow/services/task/backends/anyio.py (1)
status (24-27)
src/backend/base/langflow/base/astra_assistants/util.py (1)
name (147-151)
🔇 Additional comments (3)
src/backend/base/langflow/components/docling/__init__.py (1)
95-101: Confirm handling of non-PDF inputs (DOCX, PPTX, etc.)
Only InputFormat.PDF and InputFormat.IMAGE are provided with explicit options. Ensure other formats you advertise (e.g., docx, pptx) are correctly handled by DocumentConverter defaults or add explicit options if required.
Would you like me to scan the codebase/usages to confirm non-PDF formats are covered by defaults, or add explicit FormatOptions for common formats?
src/backend/base/langflow/components/docling/docling_inline.py (2)
1-4: LGTM: Worker import and multiprocessing primitives
Importing docling_worker from the package and using multiprocessing primitives at module scope is appropriate given the spawn-start method usage below.
75-99: Async guideline check: verify whether a synchronous component is acceptable here
Per repository guidelines, async methods are encouraged for components. process_files is synchronous and blocks the caller while waiting on the worker. If the surrounding pipeline is async, consider providing an async variant with asyncio.to_thread or an event-loop-friendly approach. If the component framework expects sync, ignore this.
Would you like me to propose an async process_files implementation using asyncio + run_in_executor, while preserving the spawn-based worker behavior?
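As a rough illustration of the asyncio.to_thread option, the sketch below wraps a blocking call so the event loop stays free while it runs. `process_files_blocking` is a hypothetical stand-in for the component's synchronous process_files; the real method's signature is not shown in this review. asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import time


def process_files_blocking(file_paths):
    """Hypothetical stand-in for the synchronous, worker-backed process_files."""
    time.sleep(0.01)  # simulates waiting on the worker process
    return [f"processed:{p}" for p in file_paths]


async def process_files_async(file_paths):
    """Run the blocking call in a thread so other coroutines keep running."""
    return await asyncio.to_thread(process_files_blocking, file_paths)
```

The thread only waits on the worker process, so this adds little overhead; it does not make the conversion itself any faster.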
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Codecov Report
✅ All modified and coverable lines are covered by tests.
❌ Your project status has failed because the head coverage (2.67%) is below the target coverage (10.00%). You can increase the head coverage or adjust the target coverage.
Additional details and impacted files
@@ Coverage Diff @@
## main #9393 +/- ##
==========================================
- Coverage 33.27% 33.25% -0.02%
==========================================
Files 1209 1209
Lines 57545 57545
Branches 5363 5363
==========================================
- Hits 19146 19137 -9
- Misses 38339 38348 +9
Partials 60 60
#9393)
* refactor(docling): extract processing logic to separate worker process
  - Move Docling processing to dedicated worker function
  - Preserve all original pipeline configuration logic
  - Maintain support for standard and VLM pipelines
  - Keep complete OCR engine configuration
  - Add proper error handling for multiprocessing context
* Update src/backend/base/langflow/components/docling/__init__.py
  Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
* Update src/backend/base/langflow/components/docling/__init__.py
  Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
* [autofix.ci] apply automated fixes
* Update src/backend/base/langflow/components/docling/__init__.py
  Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
* Update src/backend/base/langflow/components/docling/docling_inline.py
  Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
* [autofix.ci] apply automated fixes
* feat: add process monitoring and timeout handling
* fix: ruff check
* feat: add graceful signal handling to docling worker
* friendlier error message
* Swallow stack trace on interrupt
* [autofix.ci] apply automated fixes
* fix: ruff error
* fix: mypy error
---------
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Jordan Frazier <jordan.frazier@datastax.com>
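The commit list mentions process monitoring and timeout handling without showing the code. A minimal sketch of what such a loop might look like is below; `wait_for_result` and its parameters are hypothetical, and the PR's actual implementation may differ. It polls the queue in short intervals, checks that the worker is still alive, and enforces an overall deadline.

```python
import queue as queue_mod
import time


def wait_for_result(proc, result_queue, timeout=300.0, poll_interval=0.5):
    """Poll for the worker's result while watching liveness and a deadline."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return result_queue.get(timeout=poll_interval)
        except queue_mod.Empty:
            pass
        if not proc.is_alive():
            # The worker may have queued its result just before exiting.
            try:
                return result_queue.get(timeout=poll_interval)
            except queue_mod.Empty:
                raise RuntimeError(
                    f"worker exited with code {proc.exitcode} without a result"
                ) from None
        if time.monotonic() >= deadline:
            proc.terminate()
            raise TimeoutError(f"worker did not finish within {timeout} seconds")
```

Here `proc` is expected to behave like a multiprocessing.Process (is_alive, exitcode, terminate), and `result_queue` like a multiprocessing queue, whose get raises queue.Empty on timeout.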



Extracts Docling processing to a separate worker process while maintaining feature parity with the original implementation.
Key changes:
Summary by CodeRabbit