From b791eabc3c5291de16123a47de830c34f1a4b299 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Wed, 7 May 2025 12:15:04 +0330 Subject: [PATCH 1/4] feat: add simple ingestion workflow and document processing activities - Introduced IngestionWorkflow for orchestrating document ingestion requests. - Added process_document activity to handle document processing logic. - Created schema for IngestionRequest to define the structure of ingestion requests. - Updated registry and workflows to include new ingestion components. --- hivemind_etl/activities.py | 3 + hivemind_etl/simple_ingestion/__init__.py | 0 hivemind_etl/simple_ingestion/pipeline.py | 87 +++++++++++++++++++++++ hivemind_etl/simple_ingestion/schema.py | 39 ++++++++++ hivemind_etl/website/website_etl.py | 2 +- registry.py | 4 ++ workflows.py | 3 + 7 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 hivemind_etl/simple_ingestion/__init__.py create mode 100644 hivemind_etl/simple_ingestion/pipeline.py create mode 100644 hivemind_etl/simple_ingestion/schema.py diff --git a/hivemind_etl/activities.py b/hivemind_etl/activities.py index 25c231d..9ad8b3f 100644 --- a/hivemind_etl/activities.py +++ b/hivemind_etl/activities.py @@ -13,6 +13,9 @@ transform_mediawiki_data, load_mediawiki_data, ) +from hivemind_etl.simple_ingestion.pipeline import ( + process_document, +) from temporalio import activity diff --git a/hivemind_etl/simple_ingestion/__init__.py b/hivemind_etl/simple_ingestion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hivemind_etl/simple_ingestion/pipeline.py b/hivemind_etl/simple_ingestion/pipeline.py new file mode 100644 index 0000000..8d0a769 --- /dev/null +++ b/hivemind_etl/simple_ingestion/pipeline.py @@ -0,0 +1,87 @@ +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.workflow import execute_activity +from .schema import IngestionRequest +from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline +from llama_index.core import Document + + +@workflow.defn +class IngestionWorkflow: + """A Temporal workflow for processing document ingestion requests. + + This workflow handles the orchestration of document processing activities, + including retry logic and timeout configurations. + """ + + @workflow.run + async def run(self, ingestion_request: IngestionRequest) -> None: + """Execute the ingestion workflow. + + Parameters + ---------- + ingestion_request : IngestionRequest + The request containing all necessary information for document processing, + including community ID, platform ID, text content, and metadata. + + Notes + ----- + The workflow implements a retry policy with the following configuration: + - Initial retry interval: 1 second + - Maximum retry interval: 1 minute + - Maximum retry attempts: 3 + - Activity timeout: 5 minutes + """ + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + maximum_interval=timedelta(minutes=1), + maximum_attempts=3, + ) + + await execute_activity( + process_document, + ingestion_request, + retry_policy=retry_policy, + start_to_close_timeout=timedelta(minutes=5), + ) + + +@workflow.activity +async def process_document( + ingestion_request: IngestionRequest, +) -> None: + """Process the document according to the ingestion request specifications. + + Parameters + ---------- + ingestion_request : IngestionRequest + The request containing all necessary information for document processing, + including community ID, platform ID, text content, and metadata. + + Notes + ----- + This activity will be implemented by the user to handle the actual document + processing logic, including any necessary embedding or LLM operations. + """ + if ingestion_request.collectionName is None: + collection_name = ( + f"{ingestion_request.communityId}_{ingestion_request.platformId}" + ) + else: + collection_name = ingestion_request.collectionName + + # Initialize the ingestion pipeline + pipeline = CustomIngestionPipeline( + community_id=ingestion_request.communityId, + collectionName=collection_name, + ) + + document = Document( + doc_id=ingestion_request.docId, + text=ingestion_request.text, + metadata=ingestion_request.metadata, + ) + + pipeline.run_pipeline(docs=[document]) diff --git a/hivemind_etl/simple_ingestion/schema.py b/hivemind_etl/simple_ingestion/schema.py new file mode 100644 index 0000000..9a9920e --- /dev/null +++ b/hivemind_etl/simple_ingestion/schema.py @@ -0,0 +1,39 @@ +from pydantic import BaseModel +from uuid import uuid4 + + +class IngestionRequest(BaseModel): + """A model representing an ingestion request for document processing. + + Parameters + ---------- + communityId : str + The unique identifier of the community. + platformId : str + The unique identifier of the platform. + text : str + The text content to be processed. + metadata : dict + Additional metadata associated with the document. + docId : str, optional + Unique identifier for the document. If not provided, a UUID will be generated. + Default is a new UUID. + excludedEmbedMetadataKeys : list[str], optional + List of metadata keys to exclude from embedding process. + Default is an empty list. + excludedLlmMetadataKeys : list[str], optional + List of metadata keys to exclude from LLM processing. + Default is an empty list. + collectionName : str | None, optional + The name of the collection to use for the document. + Default is `None` means it would follow the default pattern of `[communityId]_[platformId]` + """ + + communityId: str + platformId: str + text: str + metadata: dict + docId: str = str(uuid4()) + excludedEmbedMetadataKeys: list[str] = [] + excludedLlmMetadataKeys: list[str] = [] + collectionName: str | None = None diff --git a/hivemind_etl/website/website_etl.py b/hivemind_etl/website/website_etl.py index 385b3c9..2d678bb 100644 --- a/hivemind_etl/website/website_etl.py +++ b/hivemind_etl/website/website_etl.py @@ -19,7 +19,7 @@ def __init__( the community to save its data platform_id : str the platform to save its data - + Note: the collection name would be `community_id_platform_id` """ if not community_id or not isinstance(community_id, str): diff --git a/registry.py b/registry.py index 0856ac2..5e57eba 100644 --- a/registry.py +++ b/registry.py @@ -8,6 +8,7 @@ get_hivemind_mediawiki_platforms, transform_mediawiki_data, load_mediawiki_data, + process_document, ) from hivemind_summarizer.activities import ( fetch_platform_summaries_by_date, @@ -20,6 +21,7 @@ WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, PlatformSummariesWorkflow, + IngestionWorkflow, ) WORKFLOWS = [ @@ -28,6 +30,7 @@ WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, PlatformSummariesWorkflow, + IngestionWorkflow, ] ACTIVITIES = [ @@ -43,4 +46,5 @@ fetch_platform_summaries_by_date, fetch_platform_summaries_by_date_range, get_platform_name, + process_document, ] diff --git a/workflows.py b/workflows.py index 8f60276..6afb12b 100644 --- a/workflows.py +++ b/workflows.py @@ -10,6 +10,9 @@ from hivemind_etl.mediawiki.workflows import ( MediaWikiETLWorkflow, ) +from hivemind_etl.simple_ingestion.pipeline import ( + IngestionWorkflow, +) from hivemind_summarizer.workflows import PlatformSummariesWorkflow from temporalio import workflow From 9a5c7b0498e642955b6687842f64d0f0caaa1e41 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Thu, 8 May 2025 10:00:05 +0330 Subject: [PATCH 2/4] fix: IngestionWorkflow pipeline! --- hivemind_etl/simple_ingestion/pipeline.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/hivemind_etl/simple_ingestion/pipeline.py b/hivemind_etl/simple_ingestion/pipeline.py index 8d0a769..1008cee 100644 --- a/hivemind_etl/simple_ingestion/pipeline.py +++ b/hivemind_etl/simple_ingestion/pipeline.py @@ -1,11 +1,13 @@ from datetime import timedelta -from temporalio import workflow +from temporalio import activity,workflow from temporalio.common import RetryPolicy from temporalio.workflow import execute_activity from .schema import IngestionRequest -from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline -from llama_index.core import Document + +with workflow.unsafe.imports_passed_through(): + from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline + from llama_index.core import Document @workflow.defn @@ -48,7 +50,7 @@ async def run(self, ingestion_request: IngestionRequest) -> None: ) -@workflow.activity +@activity.defn async def process_document( ingestion_request: IngestionRequest, ) -> None: @@ -75,7 +77,7 @@ async def process_document( # Initialize the ingestion pipeline pipeline = CustomIngestionPipeline( community_id=ingestion_request.communityId, - collectionName=collection_name, + collection_name=collection_name, ) document = Document( From c47035303912461621b512d67399ec0613eed34d Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Thu, 8 May 2025 10:00:29 +0330 Subject: [PATCH 3/4] feat: removed test workflow! --- registry.py | 2 -- workflows.py | 12 ------------ 2 files changed, 14 deletions(-) diff --git a/registry.py b/registry.py index 5e57eba..4e22c70 100644 --- a/registry.py +++ b/registry.py @@ -17,7 +17,6 @@ ) from workflows import ( CommunityWebsiteWorkflow, - SayHello, WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, PlatformSummariesWorkflow, @@ -26,7 +25,6 @@ WORKFLOWS = [ CommunityWebsiteWorkflow, - SayHello, WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, PlatformSummariesWorkflow, diff --git a/workflows.py b/workflows.py index 6afb12b..fa1ab2b 100644 --- a/workflows.py +++ b/workflows.py @@ -20,15 +20,3 @@ # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) - - -# For test purposes -# To be deleted in future -@workflow.defn -class SayHello: - @workflow.run - async def run(self) -> int: - logger.info(f"Hello at time {workflow.now()}!") - return await workflow.start_activity( - say_hello, start_to_close_timeout=timedelta(seconds=5) - ) From a6463ffffc0bce0a85b0c0a619b1de7c4155b5f3 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Thu, 8 May 2025 10:00:59 +0330 Subject: [PATCH 4/4] fix: black linter issue! --- hivemind_etl/simple_ingestion/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hivemind_etl/simple_ingestion/pipeline.py b/hivemind_etl/simple_ingestion/pipeline.py index 1008cee..2e763e0 100644 --- a/hivemind_etl/simple_ingestion/pipeline.py +++ b/hivemind_etl/simple_ingestion/pipeline.py @@ -1,6 +1,6 @@ from datetime import timedelta -from temporalio import activity,workflow +from temporalio import activity, workflow from temporalio.common import RetryPolicy from temporalio.workflow import execute_activity from .schema import IngestionRequest