diff --git a/hivemind_etl/activities.py b/hivemind_etl/activities.py
index 25c231d..9ad8b3f 100644
--- a/hivemind_etl/activities.py
+++ b/hivemind_etl/activities.py
@@ -13,6 +13,9 @@
     transform_mediawiki_data,
     load_mediawiki_data,
 )
+from hivemind_etl.simple_ingestion.pipeline import (
+    process_document,
+)
 
 from temporalio import activity
 
diff --git a/hivemind_etl/simple_ingestion/__init__.py b/hivemind_etl/simple_ingestion/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/hivemind_etl/simple_ingestion/pipeline.py b/hivemind_etl/simple_ingestion/pipeline.py
new file mode 100644
index 0000000..2e763e0
--- /dev/null
+++ b/hivemind_etl/simple_ingestion/pipeline.py
@@ -0,0 +1,92 @@
+from datetime import timedelta
+
+from temporalio import activity, workflow
+from temporalio.common import RetryPolicy
+from temporalio.workflow import execute_activity
+from .schema import IngestionRequest
+
+with workflow.unsafe.imports_passed_through():
+    from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
+    from llama_index.core import Document
+
+
+@workflow.defn
+class IngestionWorkflow:
+    """A Temporal workflow for processing document ingestion requests.
+
+    This workflow orchestrates the document processing activity,
+    including its retry policy and timeout configuration.
+    """
+
+    @workflow.run
+    async def run(self, ingestion_request: IngestionRequest) -> None:
+        """Execute the ingestion workflow.
+
+        Parameters
+        ----------
+        ingestion_request : IngestionRequest
+            The request containing all necessary information for document processing,
+            including community ID, platform ID, text content, and metadata.
+
+        Notes
+        -----
+        The workflow implements a retry policy with the following configuration:
+        - Initial retry interval: 1 second
+        - Maximum retry interval: 1 minute
+        - Maximum retry attempts: 3
+        - Activity timeout: 5 minutes
+        """
+        retry_policy = RetryPolicy(
+            initial_interval=timedelta(seconds=1),
+            maximum_interval=timedelta(minutes=1),
+            maximum_attempts=3,
+        )
+
+        await execute_activity(
+            process_document,
+            ingestion_request,
+            retry_policy=retry_policy,
+            start_to_close_timeout=timedelta(minutes=5),
+        )
+
+
+@activity.defn
+async def process_document(
+    ingestion_request: IngestionRequest,
+) -> None:
+    """Process the document according to the ingestion request specifications.
+
+    Parameters
+    ----------
+    ingestion_request : IngestionRequest
+        The request containing all necessary information for document processing,
+        including community ID, platform ID, text content, and metadata.
+
+    Notes
+    -----
+    Resolves the target collection, builds a llama-index Document from the
+    request, and runs it through the CustomIngestionPipeline.
+    """
+    if ingestion_request.collectionName is None:
+        collection_name = (
+            f"{ingestion_request.communityId}_{ingestion_request.platformId}"
+        )
+    else:
+        collection_name = ingestion_request.collectionName
+
+    # Initialize the ingestion pipeline
+    pipeline = CustomIngestionPipeline(
+        community_id=ingestion_request.communityId,
+        collection_name=collection_name,
+    )
+
+    # Build the document, honoring the request's embed/LLM metadata exclusions
+    document = Document(
+        doc_id=ingestion_request.docId,
+        text=ingestion_request.text,
+        metadata=ingestion_request.metadata,
+        excluded_embed_metadata_keys=ingestion_request.excludedEmbedMetadataKeys,
+        excluded_llm_metadata_keys=ingestion_request.excludedLlmMetadataKeys,
+    )
+
+    pipeline.run_pipeline(docs=[document])
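For context, a minimal sketch of how a client could start IngestionWorkflow is shown below; it is not part of this diff. The server address ("localhost:7233") and task queue name ("hivemind-etl") are assumptions, not values taken from this repository.

import asyncio

from temporalio.client import Client

from hivemind_etl.simple_ingestion.pipeline import IngestionWorkflow
from hivemind_etl.simple_ingestion.schema import IngestionRequest


async def main() -> None:
    # Assumed server address; adjust to your Temporal deployment.
    client = await Client.connect("localhost:7233")
    request = IngestionRequest(
        communityId="community-123",
        platformId="platform-456",
        text="Some document text to embed.",
        metadata={"source": "docs"},
    )
    # The workflow id and task queue below are illustrative assumptions.
    await client.execute_workflow(
        IngestionWorkflow.run,
        request,
        id=f"simple-ingestion-{request.docId}",
        task_queue="hivemind-etl",
    )


if __name__ == "__main__":
    asyncio.run(main())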
+ """ + if ingestion_request.collectionName is None: + collection_name = ( + f"{ingestion_request.communityId}_{ingestion_request.platformId}" + ) + else: + collection_name = ingestion_request.collectionName + + # Initialize the ingestion pipeline + pipeline = CustomIngestionPipeline( + community_id=ingestion_request.communityId, + collection_name=collection_name, + ) + + document = Document( + doc_id=ingestion_request.docId, + text=ingestion_request.text, + metadata=ingestion_request.metadata, + ) + + pipeline.run_pipeline(docs=[document]) diff --git a/hivemind_etl/simple_ingestion/schema.py b/hivemind_etl/simple_ingestion/schema.py new file mode 100644 index 0000000..9a9920e --- /dev/null +++ b/hivemind_etl/simple_ingestion/schema.py @@ -0,0 +1,39 @@ +from pydantic import BaseModel +from uuid import uuid4 + + +class IngestionRequest(BaseModel): + """A model representing an ingestion request for document processing. + + Parameters + ---------- + communityId : str + The unique identifier of the community. + platformId : str + The unique identifier of the platform. + text : str + The text content to be processed. + metadata : dict + Additional metadata associated with the document. + docId : str, optional + Unique identifier for the document. If not provided, a UUID will be generated. + Default is a new UUID. + excludedEmbedMetadataKeys : list[str], optional + List of metadata keys to exclude from embedding process. + Default is an empty list. + excludedLlmMetadataKeys : list[str], optional + List of metadata keys to exclude from LLM processing. + Default is an empty list. + collectionName : str | None, optional + The name of the collection to use for the document. + Default is `None` means it would follow the default pattern of `[communityId]_[platformId]` + """ + + communityId: str + platformId: str + text: str + metadata: dict + docId: str = str(uuid4()) + excludedEmbedMetadataKeys: list[str] = [] + excludedLlmMetadataKeys: list[str] = [] + collectionName: str | None = None diff --git a/hivemind_etl/website/website_etl.py b/hivemind_etl/website/website_etl.py index 385b3c9..2d678bb 100644 --- a/hivemind_etl/website/website_etl.py +++ b/hivemind_etl/website/website_etl.py @@ -19,7 +19,7 @@ def __init__( the community to save its data platform_id : str the platform to save its data - + Note: the collection name would be `community_id_platform_id` """ if not community_id or not isinstance(community_id, str): diff --git a/registry.py b/registry.py index 0856ac2..4e22c70 100644 --- a/registry.py +++ b/registry.py @@ -8,6 +8,7 @@ get_hivemind_mediawiki_platforms, transform_mediawiki_data, load_mediawiki_data, + process_document, ) from hivemind_summarizer.activities import ( fetch_platform_summaries_by_date, @@ -16,18 +17,18 @@ ) from workflows import ( CommunityWebsiteWorkflow, - SayHello, WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, PlatformSummariesWorkflow, + IngestionWorkflow, ) WORKFLOWS = [ CommunityWebsiteWorkflow, - SayHello, WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, PlatformSummariesWorkflow, + IngestionWorkflow, ] ACTIVITIES = [ @@ -43,4 +44,5 @@ fetch_platform_summaries_by_date, fetch_platform_summaries_by_date_range, get_platform_name, + process_document, ] diff --git a/workflows.py b/workflows.py index 8f60276..fa1ab2b 100644 --- a/workflows.py +++ b/workflows.py @@ -10,6 +10,9 @@ from hivemind_etl.mediawiki.workflows import ( MediaWikiETLWorkflow, ) +from hivemind_etl.simple_ingestion.pipeline import ( + IngestionWorkflow, +) from 
diff --git a/hivemind_etl/website/website_etl.py b/hivemind_etl/website/website_etl.py
index 385b3c9..2d678bb 100644
--- a/hivemind_etl/website/website_etl.py
+++ b/hivemind_etl/website/website_etl.py
@@ -19,7 +19,7 @@ def __init__(
             the community to save its data
         platform_id : str
             the platform to save its data
-
+
         Note: the collection name would be `community_id_platform_id`
         """
         if not community_id or not isinstance(community_id, str):
diff --git a/registry.py b/registry.py
index 0856ac2..4e22c70 100644
--- a/registry.py
+++ b/registry.py
@@ -8,6 +8,7 @@
     get_hivemind_mediawiki_platforms,
     transform_mediawiki_data,
     load_mediawiki_data,
+    process_document,
 )
 from hivemind_summarizer.activities import (
     fetch_platform_summaries_by_date,
@@ -16,18 +17,18 @@
 )
 from workflows import (
     CommunityWebsiteWorkflow,
-    SayHello,
     WebsiteIngestionSchedulerWorkflow,
     MediaWikiETLWorkflow,
     PlatformSummariesWorkflow,
+    IngestionWorkflow,
 )
 
 WORKFLOWS = [
     CommunityWebsiteWorkflow,
-    SayHello,
     WebsiteIngestionSchedulerWorkflow,
     MediaWikiETLWorkflow,
     PlatformSummariesWorkflow,
+    IngestionWorkflow,
 ]
 
 ACTIVITIES = [
@@ -43,4 +44,5 @@
     fetch_platform_summaries_by_date,
     fetch_platform_summaries_by_date_range,
     get_platform_name,
+    process_document,
 ]
diff --git a/workflows.py b/workflows.py
index 8f60276..fa1ab2b 100644
--- a/workflows.py
+++ b/workflows.py
@@ -10,6 +10,9 @@
 from hivemind_etl.mediawiki.workflows import (
     MediaWikiETLWorkflow,
 )
+from hivemind_etl.simple_ingestion.pipeline import (
+    IngestionWorkflow,
+)
 from hivemind_summarizer.workflows import PlatformSummariesWorkflow
 from temporalio import workflow
 
@@ -17,15 +20,3 @@
 # Configure logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
-
-
-# For test purposes
-# To be deleted in future
-@workflow.defn
-class SayHello:
-    @workflow.run
-    async def run(self) -> int:
-        logger.info(f"Hello at time {workflow.now()}!")
-        return await workflow.start_activity(
-            say_hello, start_to_close_timeout=timedelta(seconds=5)
-        )
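Finally, a hedged sketch of a worker that would consume the updated registry lists; the Temporal address and task queue name are assumptions, and the repository's actual worker entrypoint may differ.

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from registry import ACTIVITIES, WORKFLOWS


async def main() -> None:
    # Assumed server address and task queue; match these to your deployment.
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="hivemind-etl",
        workflows=WORKFLOWS,
        activities=ACTIVITIES,
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())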