feat: add simple ingestion workflow and document processing activities #36
- Introduced `IngestionWorkflow` for orchestrating document ingestion requests.
- Added `process_document` activity to handle document processing logic.
- Created schema for `IngestionRequest` to define the structure of ingestion requests.
- Updated registry and workflows to include new ingestion components.
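As a rough illustration of the pieces above, an ingestion request might carry a payload like the following. The field names are taken from the `IngestionRequest` schema discussed later in this review; the concrete values are hypothetical:

```python
# Hypothetical example payload for the IngestionRequest schema.
# Field names follow the schema discussed in this review; values are made up.
example_request = {
    "communityId": "community-123",  # hypothetical community identifier
    "platformId": "platform-abc",    # hypothetical platform identifier
    "collectionName": None,          # None -> falls back to "[communityId]_[platformId]"
    "text": "Example document body to be embedded.",
    "metadata": {"source": "docs", "author": "alice"},
}

print(sorted(example_request))
# → ['collectionName', 'communityId', 'metadata', 'platformId', 'text']
```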
Walkthrough

A new Temporal-based ingestion workflow is introduced, including the workflow, activity, and schema components described below.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Temporal
    participant IngestionWorkflow
    participant process_document
    participant CustomIngestionPipeline
    Client->>Temporal: Start IngestionWorkflow.run(ingestion_request)
    Temporal->>IngestionWorkflow: Invoke run(ingestion_request)
    IngestionWorkflow->>process_document: Call with ingestion_request (with retry policy)
    process_document->>CustomIngestionPipeline: Initialize with communityId, collectionName
    process_document->>CustomIngestionPipeline: Ingest Document (docId, text, metadata, exclusions)
    CustomIngestionPipeline-->>process_document: Return
    process_document-->>IngestionWorkflow: Return
    IngestionWorkflow-->>Temporal: Workflow complete
    Temporal-->>Client: Completion notification
```
Actionable comments posted: 0
🧹 Nitpick comments (5)
hivemind_etl/simple_ingestion/schema.py (2)
27-29: Improve docstring clarity

The docstring for `collectionName` could be clearer by rephrasing the default value explanation.

```diff
- Default is `None` means it would follow the default pattern of `[communityId]_[platformId]`
+ Default is `None`. When None, the collection name will follow the pattern `[communityId]_[platformId]`
```
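The fallback behavior that docstring describes can be sketched as a small standalone function (`resolve_collection_name` is a hypothetical helper, not part of the PR):

```python
from typing import Optional


def resolve_collection_name(
    community_id: str, platform_id: str, collection_name: Optional[str] = None
) -> str:
    """Return the explicit collection name, or the [communityId]_[platformId] default."""
    if collection_name is None:
        return f"{community_id}_{platform_id}"
    return collection_name


print(resolve_collection_name("comm1", "discord"))            # → comm1_discord
print(resolve_collection_name("comm1", "discord", "custom"))  # → custom
```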
36-36: Suggestion for better UUID generation

Consider using a more explicit approach for UUID generation that preserves the UUID type information until string conversion is necessary.

```diff
- docId: str = str(uuid4())
+ docId: str = None
+
+ def __init__(self, **data):
+     super().__init__(**data)
+     if self.docId is None:
+         self.docId = str(uuid4())
```

This approach would allow passing an existing ID when needed while still generating a UUID when none is provided, and it preserves type information until the last moment.
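A related pitfall worth noting: a class-level default like `str(uuid4())` is evaluated once at class-definition time, so every instance relying on the default would share the same ID. A per-instance factory avoids this; here is a minimal sketch using a plain dataclass as a stand-in for the Pydantic model:

```python
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class WithFactory:
    # default_factory runs once per instance, so each object gets a fresh UUID
    docId: str = field(default_factory=lambda: str(uuid4()))


a, b = WithFactory(), WithFactory()
print(a.docId != b.docId)  # → True (distinct IDs per instance)
print(WithFactory(docId="fixed").docId)  # → fixed (explicit IDs still allowed)
```

Pydantic offers the same idea via `Field(default_factory=...)`, which would be a lighter fix than overriding `__init__`.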
hivemind_etl/simple_ingestion/pipeline.py (3)
37-48: Consider adding error handling and result feedback.The workflow execution is well-configured with appropriate retry policies and timeouts. However, there's no error handling or status feedback mechanism to indicate success or failure.
Consider enhancing the workflow to return a status object:
```diff
 @workflow.run
-async def run(self, ingestion_request: IngestionRequest) -> None:
+async def run(self, ingestion_request: IngestionRequest) -> dict:
     # ... existing code ...
+    try:
         await execute_activity(
             process_document,
             ingestion_request,
             retry_policy=retry_policy,
             start_to_close_timeout=timedelta(minutes=5),
         )
+        return {"status": "success", "request_id": ingestion_request.docId}
+    except Exception as e:
+        workflow.logger.error(f"Ingestion failed: {str(e)}")
+        return {"status": "failed", "request_id": ingestion_request.docId, "error": str(e)}
```
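The status-object idea can be illustrated independently of Temporal; a minimal sketch in plain Python, with a hypothetical `run_once` helper standing in for the workflow runtime:

```python
def run_once(process, request_id: str, payload) -> dict:
    """Run one processing step and fold the outcome into a status dict."""
    try:
        process(payload)
        return {"status": "success", "request_id": request_id}
    except Exception as e:
        return {"status": "failed", "request_id": request_id, "error": str(e)}


print(run_once(lambda p: None, "doc-1", {}))
# → {'status': 'success', 'request_id': 'doc-1'}
print(run_once(lambda p: 1 / 0, "doc-2", {}))
# → {'status': 'failed', 'request_id': 'doc-2', 'error': 'division by zero'}
```

One design caveat: swallowing the exception and returning a "failed" dict means Temporal's retry policy never sees a failure, so in a real workflow you may prefer to log and re-raise instead.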
51-67: Update activity docstring to reflect actual implementation.

The docstring mentions "This activity will be implemented by the user" but the implementation is already provided in this file.

```diff
 @workflow.activity
 async def process_document(
     ingestion_request: IngestionRequest,
 ) -> None:
     """Process the document according to the ingestion request specifications.

     Parameters
     ----------
     ingestion_request : IngestionRequest
         The request containing all necessary information for document processing,
         including community ID, platform ID, text content, and metadata.

     Notes
     -----
-    This activity will be implemented by the user to handle the actual document
-    processing logic, including any necessary embedding or LLM operations.
+    This activity handles document processing by initializing a CustomIngestionPipeline
+    with the appropriate collection name and running it on the document created from
+    the ingestion request data.
     """
```
1-88: Consider adding activity logging for observability.

The implementation would benefit from adding logging statements to track the progress and completion of activities for better observability in a production environment.

```diff
+import logging
+
 from datetime import timedelta
 from temporalio import workflow
 from temporalio.common import RetryPolicy
 from temporalio.workflow import execute_activity
 from .schema import IngestionRequest
 from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
 from llama_index.core import Document

+logger = logging.getLogger(__name__)
+
 @workflow.defn
 class IngestionWorkflow:
     # ...existing code...

 @workflow.activity
 async def process_document(
     ingestion_request: IngestionRequest,
 ) -> None:
     # ...existing docstring...
+    logger.info(f"Starting document processing for doc_id: {ingestion_request.docId}")
     if ingestion_request.collectionName is None:
         collection_name = (
             f"{ingestion_request.communityId}_{ingestion_request.platformId}"
         )
+        logger.debug(f"Using default collection name: {collection_name}")
     else:
         collection_name = ingestion_request.collectionName
+        logger.debug(f"Using provided collection name: {collection_name}")

     # Initialize the ingestion pipeline
+    logger.debug(f"Initializing pipeline for community: {ingestion_request.communityId}")
     pipeline = CustomIngestionPipeline(
         community_id=ingestion_request.communityId,
         collectionName=collection_name,
     )

     document = Document(
         doc_id=ingestion_request.docId,
         text=ingestion_request.text,
         metadata=ingestion_request.metadata,
     )
+    logger.debug(f"Created document with ID: {document.doc_id}")

     try:
         pipeline.run_pipeline(docs=[document])
+        logger.info(f"Successfully processed document: {ingestion_request.docId}")
     except Exception as e:
+        logger.error(f"Failed to process document {ingestion_request.docId}: {str(e)}")
+        raise
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- hivemind_etl/activities.py (1 hunks)
- hivemind_etl/simple_ingestion/pipeline.py (1 hunks)
- hivemind_etl/simple_ingestion/schema.py (1 hunks)
- hivemind_etl/website/website_etl.py (1 hunks)
- registry.py (4 hunks)
- workflows.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
hivemind_etl/activities.py (1)
- hivemind_etl/simple_ingestion/pipeline.py (1)
  - process_document (52-87)

workflows.py (1)
- hivemind_etl/simple_ingestion/pipeline.py (1)
  - IngestionWorkflow (12-48)

registry.py (1)
- hivemind_etl/simple_ingestion/pipeline.py (2)
  - process_document (52-87)
  - IngestionWorkflow (12-48)

hivemind_etl/simple_ingestion/pipeline.py (2)
- hivemind_etl/simple_ingestion/schema.py (1)
  - IngestionRequest (5-39)
- workflows.py (1)
  - run (30-34)
🪛 Ruff (0.8.2)
hivemind_etl/activities.py
17-17: hivemind_etl.simple_ingestion.pipeline.process_document imported but unused
Remove unused import: hivemind_etl.simple_ingestion.pipeline.process_document
(F401)
workflows.py
14-14: hivemind_etl.simple_ingestion.pipeline.IngestionWorkflow imported but unused
Remove unused import: hivemind_etl.simple_ingestion.pipeline.IngestionWorkflow
(F401)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: ci / lint / Lint
- GitHub Check: ci / test / Test
🔇 Additional comments (13)
hivemind_etl/website/website_etl.py (1)
22-22: Clean code improvement!

Removal of unnecessary trailing whitespace in the docstring improves code cleanliness.
hivemind_etl/activities.py (1)
16-18: ✅ Verification successful
Import added for new ingestion capabilities
The import of `process_document` from the new `simple_ingestion.pipeline` module is necessary for the activity to be available for registration in the registry, even though it appears unused in this file directly.
🏁 Script executed:
```shell
#!/bin/bash
# Check where the imported process_document is used
rg -A 2 "process_document" --type py
```

Length of output: 739
🏁 Script executed:
```shell
#!/bin/bash
# Show import and registry list in registry.py to confirm usage of process_document
echo "== registry.py imports and registry definitions =="
sed -n '1,50p' registry.py
echo "..."
sed -n '50,100p' registry.py

# Show decorators or registration logic in pipeline.py around process_document
echo "== pipeline.py around process_document definition =="
sed -n '1,200p' hivemind_etl/simple_ingestion/pipeline.py | sed -n '1,100p'
```

Length of output: 4380
Import confirmed for activity registration

The `process_document` function is defined in `hivemind_etl/simple_ingestion/pipeline.py` and must be re-exported via `hivemind_etl/activities.py` so that `registry.py` can import and include it in its `ACTIVITIES` list. The current import is correct and no changes are needed.

🧰 Tools
🪛 Ruff (0.8.2)
17-17: `hivemind_etl.simple_ingestion.pipeline.process_document` imported but unused

Remove unused import: `hivemind_etl.simple_ingestion.pipeline.process_document`

(F401)
workflows.py (1)
13-15: ✅ Verification successful
Import added for new workflow registration
The import of `IngestionWorkflow` is necessary for the workflow to be available for registration in the registry system, even though it appears unused in this file directly.
🏁 Script executed:
```shell
#!/bin/bash
# Check where IngestionWorkflow is used
rg -A 2 "IngestionWorkflow" --type py
```

Length of output: 525
🏁 Script executed:
```shell
#!/bin/bash
# Display the contents of workflows.py around the imports and workflow list
sed -n '1,200p' workflows.py
```

Length of output: 912
🏁 Script executed:
```shell
#!/bin/bash
# Display the first 200 lines of registry.py to inspect workflow registration
sed -n '1,200p' registry.py
```

Length of output: 1237
✅ Confirmed: `IngestionWorkflow` import is required for registry registration

The added import in `workflows.py` ensures that `registry.py` can successfully import and include `IngestionWorkflow` in the `WORKFLOWS` list. No further changes are needed.

- workflows.py: lines 13–15 – import of `IngestionWorkflow`
- registry.py: imports and lists `IngestionWorkflow` in `WORKFLOWS`

🧰 Tools
🪛 Ruff (0.8.2)
14-14: `hivemind_etl.simple_ingestion.pipeline.IngestionWorkflow` imported but unused

Remove unused import: `hivemind_etl.simple_ingestion.pipeline.IngestionWorkflow`

(F401)
registry.py (4)
11-11: New activity imported correctly

The `process_document` activity is properly imported from activities, allowing it to be registered in the system.

24-24: New workflow imported correctly

The `IngestionWorkflow` is correctly imported from workflows, allowing it to be registered in the system.

33-33: Workflow registration complete

The `IngestionWorkflow` is properly added to the `WORKFLOWS` list, making it available to the Temporal orchestration framework.

49-49: Activity registration complete

The `process_document` activity is properly added to the `ACTIVITIES` list, making it available to the Temporal orchestration framework.

hivemind_etl/simple_ingestion/schema.py (1)

1-40: Well-structured data model for ingestion requests

The `IngestionRequest` Pydantic model is well-defined with comprehensive documentation and appropriate default values.

hivemind_etl/simple_ingestion/pipeline.py (5)
1-9: Import organization looks good.

The imports are well-organized, with standard library imports first, followed by third-party imports and local imports. Clear separation between import groups enhances readability.

11-18: Well-structured workflow definition with clear documentation.

The workflow definition follows Temporal best practices with appropriate decorators and documentation. The class docstring clearly explains the purpose of the workflow.

19-36: Comprehensive documentation for the run method.

The docstring provides detailed information about parameters and implementation notes, which is excellent for maintainability and developer onboarding.

68-74: Collection name determination logic is correct.

The code properly handles both cases where a collection name is explicitly provided or needs to be constructed from community and platform IDs.
75-88: ❓ Verification inconclusive
Add error handling and utilize all ingestion request properties.
The activity doesn't handle potential exceptions from the pipeline and doesn't utilize the excluded metadata keys from the ingestion request.
```diff
 # Initialize the ingestion pipeline
 pipeline = CustomIngestionPipeline(
     community_id=ingestion_request.communityId,
     collectionName=collection_name,
+    excluded_embed_metadata_keys=ingestion_request.excludedEmbedMetadataKeys,
+    excluded_llm_metadata_keys=ingestion_request.excludedLlmMetadataKeys,
 )

 document = Document(
     doc_id=ingestion_request.docId,
     text=ingestion_request.text,
     metadata=ingestion_request.metadata,
 )

+try:
     pipeline.run_pipeline(docs=[document])
+    return {"status": "success", "doc_id": ingestion_request.docId}
+except Exception as e:
+    import logging
+    logging.error(f"Document processing failed: {str(e)}")
+    raise  # Re-raise to trigger Temporal's retry mechanism
```

Please verify that the `CustomIngestionPipeline` class accepts `excluded_embed_metadata_keys` and `excluded_llm_metadata_keys` parameters:
🏁 Script executed:
```shell
#!/bin/bash
# Check if CustomIngestionPipeline accepts excluded metadata keys parameters
rg -A 10 "class CustomIngestionPipeline" tc_hivemind_backend/
```

Length of output: 124
I couldn't find `CustomIngestionPipeline` under the path you tried. Let's search the entire repository for its definition:
🏁 Script executed:
```shell
#!/bin/bash
# Locate CustomIngestionPipeline class definition and signature
rg -n "class CustomIngestionPipeline" -C 5 .
```

Length of output: 46
Let's locate any `pipeline.py` files and inspect their class definitions and constructors:
🏁 Script executed:
```shell
#!/bin/bash
# Find pipeline.py files
echo "pipeline.py files:"
fd '^pipeline\.py$'

# For each pipeline.py, list class definitions and __init__ signatures
for f in $(fd '^pipeline\.py$'); do
  echo "---- $f ----"
  rg -n "^class " -C 3 "$f"
  rg -n "def __init__" -C 3 "$f"
done
```

Length of output: 628
Let's search the entire repo for `CustomIngestionPipeline` definitions and usages:
🏁 Script executed:
```shell
#!/bin/bash
# Look for class definition
rg -n "class CustomIngestionPipeline" -C 5 .

# Look for any usage or import sites to infer its constructor signature
rg -n "CustomIngestionPipeline" -C 5 .
```

Length of output: 8199
Verify CustomIngestionPipeline constructor parameters
I wasn't able to find the `CustomIngestionPipeline` class definition in this repo; it's imported from `tc_hivemind_backend.ingest_qdrant`, which appears to be an external module. Please confirm that its `__init__` signature supports the parameters:

- `excluded_embed_metadata_keys`
- `excluded_llm_metadata_keys`
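Whatever the external constructor turns out to accept, the intended effect of these exclusion keys can be sketched in plain Python (`visible_metadata` is a hypothetical helper; in llama-index the analogous knobs live on the `Document` as `excluded_embed_metadata_keys` and `excluded_llm_metadata_keys`):

```python
from typing import Iterable


def visible_metadata(metadata: dict, excluded_keys: Iterable[str]) -> dict:
    """Return the metadata a consumer (embedder or LLM) is allowed to see."""
    excluded = set(excluded_keys)
    return {k: v for k, v in metadata.items() if k not in excluded}


meta = {"source": "docs", "author": "alice", "internal_id": "x1"}
print(visible_metadata(meta, ["internal_id"]))
# → {'source': 'docs', 'author': 'alice'}
```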
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- hivemind_etl/simple_ingestion/pipeline.py (1 hunks)
- registry.py (3 hunks)
- workflows.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- registry.py
- hivemind_etl/simple_ingestion/pipeline.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
workflows.py (1)
- hivemind_etl/simple_ingestion/pipeline.py (1)
  - IngestionWorkflow (14-50)
🪛 Ruff (0.8.2)
workflows.py
14-14: hivemind_etl.simple_ingestion.pipeline.IngestionWorkflow imported but unused
Remove unused import: hivemind_etl.simple_ingestion.pipeline.IngestionWorkflow
(F401)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: ci / test / Test
Summary by CodeRabbit
New Features
Chores