3 changes: 3 additions & 0 deletions hivemind_etl/activities.py
@@ -13,6 +13,9 @@
transform_mediawiki_data,
load_mediawiki_data,
)
from hivemind_etl.simple_ingestion.pipeline import (
process_document,
)

from temporalio import activity

Empty file.
89 changes: 89 additions & 0 deletions hivemind_etl/simple_ingestion/pipeline.py
@@ -0,0 +1,89 @@
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.workflow import execute_activity
from .schema import IngestionRequest

with workflow.unsafe.imports_passed_through():
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
from llama_index.core import Document


@workflow.defn
class IngestionWorkflow:
"""A Temporal workflow for processing document ingestion requests.

This workflow handles the orchestration of document processing activities,
including retry logic and timeout configurations.
"""

@workflow.run
async def run(self, ingestion_request: IngestionRequest) -> None:
"""Execute the ingestion workflow.

Parameters
----------
ingestion_request : IngestionRequest
The request containing all necessary information for document processing,
including community ID, platform ID, text content, and metadata.

Notes
-----
The workflow implements a retry policy with the following configuration:
- Initial retry interval: 1 second
- Maximum retry interval: 1 minute
- Maximum retry attempts: 3
- Activity timeout: 5 minutes
"""
retry_policy = RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(minutes=1),
maximum_attempts=3,
)

await execute_activity(
process_document,
ingestion_request,
retry_policy=retry_policy,
start_to_close_timeout=timedelta(minutes=5),
)


@activity.defn
async def process_document(
ingestion_request: IngestionRequest,
) -> None:
"""Process the document according to the ingestion request specifications.

Parameters
----------
ingestion_request : IngestionRequest
The request containing all necessary information for document processing,
including community ID, platform ID, text content, and metadata.

Notes
-----
    This activity wraps the request content in a llama_index Document and runs
    it through the CustomIngestionPipeline, which performs the embedding and
    stores the result in the target Qdrant collection.
"""
if ingestion_request.collectionName is None:
collection_name = (
f"{ingestion_request.communityId}_{ingestion_request.platformId}"
)
else:
collection_name = ingestion_request.collectionName

# Initialize the ingestion pipeline
pipeline = CustomIngestionPipeline(
community_id=ingestion_request.communityId,
collection_name=collection_name,
)

    document = Document(
        doc_id=ingestion_request.docId,
        text=ingestion_request.text,
        metadata=ingestion_request.metadata,
        excluded_embed_metadata_keys=ingestion_request.excludedEmbedMetadataKeys,
        excluded_llm_metadata_keys=ingestion_request.excludedLlmMetadataKeys,
    )

pipeline.run_pipeline(docs=[document])
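For context, a caller could trigger this workflow roughly as follows. This is a minimal sketch, not part of the PR: the server address, task queue name, workflow id, and identifiers are assumptions, and the task queue must match whatever the worker that registers this workflow actually listens on.

import asyncio

from temporalio.client import Client

from hivemind_etl.simple_ingestion.pipeline import IngestionWorkflow
from hivemind_etl.simple_ingestion.schema import IngestionRequest


async def main() -> None:
    # Assumed local Temporal server; adjust the address for your deployment.
    client = await Client.connect("localhost:7233")

    # Hypothetical identifiers, purely for illustration.
    request = IngestionRequest(
        communityId="community-123",
        platformId="platform-456",
        text="Example document text to ingest.",
        metadata={"source": "example"},
    )

    # Runs IngestionWorkflow, which executes the process_document activity
    # with the retry policy and timeout defined above.
    await client.execute_workflow(
        IngestionWorkflow.run,
        request,
        id=f"simple-ingestion-{request.docId}",
        task_queue="hivemind-etl",  # assumed task queue name
    )


if __name__ == "__main__":
    asyncio.run(main())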
39 changes: 39 additions & 0 deletions hivemind_etl/simple_ingestion/schema.py
@@ -0,0 +1,39 @@
from pydantic import BaseModel, Field
from uuid import uuid4


class IngestionRequest(BaseModel):
"""A model representing an ingestion request for document processing.

Parameters
----------
communityId : str
The unique identifier of the community.
platformId : str
The unique identifier of the platform.
text : str
The text content to be processed.
metadata : dict
Additional metadata associated with the document.
docId : str, optional
Unique identifier for the document. If not provided, a UUID will be generated.
Default is a new UUID.
excludedEmbedMetadataKeys : list[str], optional
List of metadata keys to exclude from embedding process.
Default is an empty list.
excludedLlmMetadataKeys : list[str], optional
List of metadata keys to exclude from LLM processing.
Default is an empty list.
collectionName : str | None, optional
The name of the collection to use for the document.
        Default is `None`, in which case the collection name falls back to the
        `[communityId]_[platformId]` pattern.
"""

communityId: str
platformId: str
text: str
metadata: dict
    docId: str = Field(default_factory=lambda: str(uuid4()))
excludedEmbedMetadataKeys: list[str] = []
excludedLlmMetadataKeys: list[str] = []
collectionName: str | None = None
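To illustrate the defaults described above: a request created without collectionName keeps it as None, and process_document then falls back to the [communityId]_[platformId] collection name. A small sketch with made-up identifiers and collection name:

from hivemind_etl.simple_ingestion.schema import IngestionRequest

request = IngestionRequest(
    communityId="comm1",
    platformId="plat1",
    text="Some text to embed.",
    metadata={"url": "https://example.com"},
)

assert request.collectionName is None  # resolves to "comm1_plat1" downstream

# Passing an explicit collection name overrides that pattern.
custom = IngestionRequest(
    communityId="comm1",
    platformId="plat1",
    text="Some text to embed.",
    metadata={},
    collectionName="comm1_custom_collection",
)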
2 changes: 1 addition & 1 deletion hivemind_etl/website/website_etl.py
@@ -19,7 +19,7 @@ def __init__(
the community to save its data
platform_id : str
the platform to save its data

        Note: the collection name will be `[community_id]_[platform_id]`
"""
if not community_id or not isinstance(community_id, str):
6 changes: 4 additions & 2 deletions registry.py
@@ -8,6 +8,7 @@
get_hivemind_mediawiki_platforms,
transform_mediawiki_data,
load_mediawiki_data,
process_document,
)
from hivemind_summarizer.activities import (
fetch_platform_summaries_by_date,
@@ -16,18 +17,18 @@
)
from workflows import (
CommunityWebsiteWorkflow,
SayHello,
WebsiteIngestionSchedulerWorkflow,
MediaWikiETLWorkflow,
PlatformSummariesWorkflow,
IngestionWorkflow,
)

WORKFLOWS = [
CommunityWebsiteWorkflow,
SayHello,
WebsiteIngestionSchedulerWorkflow,
MediaWikiETLWorkflow,
PlatformSummariesWorkflow,
IngestionWorkflow,
]

ACTIVITIES = [
@@ -43,4 +44,5 @@
fetch_platform_summaries_by_date,
fetch_platform_summaries_by_date_range,
get_platform_name,
process_document,
]
15 changes: 3 additions & 12 deletions workflows.py
@@ -10,22 +10,13 @@
from hivemind_etl.mediawiki.workflows import (
MediaWikiETLWorkflow,
)
from hivemind_etl.simple_ingestion.pipeline import (
IngestionWorkflow,
)
from hivemind_summarizer.workflows import PlatformSummariesWorkflow

from temporalio import workflow

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# For test purposes
# To be deleted in future
@workflow.defn
class SayHello:
@workflow.run
async def run(self) -> int:
logger.info(f"Hello at time {workflow.now()}!")
return await workflow.start_activity(
say_hello, start_to_close_timeout=timedelta(seconds=5)
)