diff --git a/flo_ai/examples/document_processing_example.py b/flo_ai/examples/document_processing_example.py
new file mode 100644
index 00000000..f70648d2
--- /dev/null
+++ b/flo_ai/examples/document_processing_example.py
@@ -0,0 +1,581 @@
+#!/usr/bin/env python3
+"""
+Document Processing Examples for Flo AI Framework
+
+This example demonstrates the new document processing capabilities including:
+- PDF and TXT document support
+- Document analysis with agents
+- Document processing tools
+- YAML-based document workflows
+"""
+
+import asyncio
+import os
+import base64
+from pathlib import Path
+from reportlab.lib.pagesizes import letter
+from reportlab.lib.styles import getSampleStyleSheet
+from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
+
+from flo_ai.builder.agent_builder import AgentBuilder
+from flo_ai.arium import AriumBuilder
+from flo_ai.llm import OpenAI, Gemini
+from flo_ai.models.document import DocumentMessage, DocumentType
+
+openai_api_key = os.getenv('OPENAI_API_KEY')
+google_api_key = os.getenv('GOOGLE_API_KEY')
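+
+# NOTE: reportlab is used here only to generate throwaway sample files; it is
+# declared as a dev dependency of this repo, so install it first if you run
+# this file standalone.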
+
+
+def create_sample_txt_file():
+    """Create a sample TXT file for testing."""
+    sample_text = """
+# Sample Document for Testing
+
+This is a sample text document created for testing the Flo AI document processing capabilities.
+
+## Introduction
+
+The Flo AI framework now supports processing of PDF and TXT documents with an extensible
+architecture that can be easily expanded to support additional document formats in the future.
+
+## Key Features
+
+1. Document Processing: Extract text from PDF and TXT files
+2. Agent Integration: Agents can now accept documents as inputs
+3. Tool Support: Ready-to-use tools for document analysis
+4. LLM Integration: Works with OpenAI, Gemini, and other supported LLMs
+
+## Technical Implementation
+
+The document processing system uses:
+- DocumentMessage class for structured document inputs
+- DocumentProcessor utility for extensible document handling
+- LLM-specific formatting for optimal processing
+
+## Conclusion
+
+This document processing capability makes Flo AI more versatile for real-world applications
+that need to analyze and process various document formats.
+"""
+
+    file_path = Path('sample_document.txt')
+    with open(file_path, 'w', encoding='utf-8') as f:
+        f.write(sample_text)
+
+    return str(file_path)
+
+
+def create_sample_pdf_file():
+    """Create a sample PDF file for testing."""
+    file_path = Path('sample_document.pdf')
+
+    # Create PDF with reportlab
+    doc = SimpleDocTemplate(str(file_path), pagesize=letter)
+    styles = getSampleStyleSheet()
+
+    # Build the document content
+    story = []
+
+    # Title
+    title = Paragraph('Sample PDF Document for Flo AI Testing', styles['Title'])
+    story.append(title)
+    story.append(Spacer(1, 12))
+
+    # Introduction
+    intro_text = """
+    This is a sample PDF document created for testing the Flo AI document processing capabilities.
+    The PDF format testing demonstrates the framework's ability to handle structured documents
+    with formatting, multiple pages, and various content types.
+    """
+    intro = Paragraph(intro_text, styles['Normal'])
+    story.append(intro)
+    story.append(Spacer(1, 12))
+
+    # Section 1
+    section1_title = Paragraph('Document Processing Features', styles['Heading2'])
+    story.append(section1_title)
+
+    features_text = """
+    1. PDF Text Extraction: Advanced parsing of PDF content including formatted text
+    2. Bytes Processing: Handle PDF documents from memory without file system access
+    3. Base64 Encoding: Process documents encoded in base64 format for API transmission
+    4. Multi-format Support: Seamless handling of both PDF and TXT documents
+    5. Agent Integration: Direct document input to AI agents for analysis
+    """
+    features = Paragraph(features_text, styles['Normal'])
+    story.append(features)
+    story.append(Spacer(1, 12))
+
+    # Section 2
+    section2_title = Paragraph('Technical Architecture', styles['Heading2'])
+    story.append(section2_title)
+
+    technical_text = """
+    The Flo AI document processing system leverages:
+    • DocumentMessage class for unified document representation
+    • DocumentProcessor with extensible format handlers
+    • LLM-specific document formatting for optimal AI processing
+    • Async processing pipeline for efficient document handling
+    """
+    technical = Paragraph(technical_text, styles['Normal'])
+    story.append(technical)
+    story.append(Spacer(1, 12))
+
+    # Conclusion
+    conclusion_title = Paragraph('Use Cases', styles['Heading2'])
+    story.append(conclusion_title)
+
+    conclusion_text = """
+    This comprehensive document processing capability enables Flo AI to handle real-world
+    applications including:
+
+    • Document analysis and summarization
+    • Content extraction and search
+    • Multi-document comparison and insights
+    • Workflow automation with document inputs
+    • API-based document processing services
+    """
+    conclusion = Paragraph(conclusion_text, styles['Normal'])
+    story.append(conclusion)
+
+    # Build the PDF
+    doc.build(story)
+
+    return str(file_path)
+
+
+async def example_1_basic_document_agent():
+    """Example 1: Basic document processing with an agent"""
+    print('=' * 60)
+    print('EXAMPLE 1: Basic Document Processing with Agent')
+    print('=' * 60)
+
+    # Create sample file
+    txt_file = create_sample_txt_file()
+
+    try:
+        # Create document message
+        document = DocumentMessage(
+            document_type=DocumentType.TXT, document_file_path=txt_file
+        )
+
+        # Create document analysis agent
+        llm = OpenAI(
+            model='gpt-4o-mini', api_key=openai_api_key
+        )  # Use OpenAI for this example
+
+        agent = (
+            AgentBuilder()
+            .with_name('Document Analyzer')
+            .with_prompt(
+                'You are a document analysis expert. Analyze the provided document and provide insights about its content, structure, and key points.'
+            )
+            .with_llm(llm)
+            .build()
+        )
+
+        # Process document with agent
+        result = await agent.run([document])
+
+        print('📄 Document Analysis Result:')
+        print(result)
+
+    except Exception as e:
+        print(f'❌ Error in example 1: {e}')
+        if 'Not implemented document processing' in str(e):
+            print(
+                "💡 Note: This LLM doesn't support document processing yet. Try with Gemini LLM."
+            )
+
+    finally:
+        # Clean up
+        if os.path.exists(txt_file):
+            os.remove(txt_file)
+
+
+async def example_2_document_workflow():
+    """Example 2: Document processing workflow with YAML"""
+    print('\n' + '=' * 60)
+    print('EXAMPLE 2: Document Processing Workflow')
+    print('=' * 60)
+
+    # Create sample file
+    txt_file = create_sample_txt_file()
+
+    try:
+        # Create document analysis workflow with proper router configuration
+        workflow_yaml = """
+        metadata:
+          name: document-analysis-workflow
+          version: 2.0.0
+          description: "Document analysis workflow with intelligent routing"
+
+        arium:
+          agents:
+            - name: intake_agent
+              role: "Document Intake"
+              job: "Initial document processing and content assessment."
+              model:
+                provider: openai
+                name: gpt-4o-mini
+              settings:
+                temperature: 0.1
+
+            - name: content_analyzer
+              role: "Document Content Analyst"
+              job: "Analyze document content for themes, insights, structure, and key information."
+              model:
+                provider: openai
+                name: gpt-4o-mini
+              settings:
+                temperature: 0.3
+
+            - name: summary_generator
+              role: "Summary Writer"
+              job: "Create comprehensive and informative summaries of analyzed content."
+              model:
+                provider: openai
+                name: gpt-4o-mini
+              settings:
+                temperature: 0.2
+
+          routers:
+            - name: analysis_router
+              type: smart
+              routing_options:
+                content_analyzer: "Route here for detailed content analysis of the document"
+                summary_generator: "Route here if the content is already analyzed and ready for summarization"
+              model:
+                provider: openai
+                name: gpt-4o-mini
+              settings:
+                temperature: 0.1
+              context_description: "a document processing workflow that analyzes then summarizes content"
+
+          workflow:
+            start: intake_agent
+            edges:
+              - from: intake_agent
+                to: [content_analyzer, summary_generator]
+                router: analysis_router
+              - from: content_analyzer
+                to: [summary_generator]
+            end: [summary_generator]
+        """
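+
+        # The smart router above chooses the next node (content_analyzer or
+        # summary_generator) at runtime, guided by the routing_options
+        # descriptions in the YAML.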
+
+        # Create document message for workflow processing
+        document = DocumentMessage(
+            document_type=DocumentType.TXT, document_file_path=txt_file
+        )
+
+        # Build and run workflow with DocumentMessage
+        print('🔄 Running document analysis workflow...')
+
+        workflow = AriumBuilder().from_yaml(yaml_str=workflow_yaml).build()
+
+        result = await workflow.run([document, 'process this document'])
+
+        print('📊 Workflow Analysis Results:')
+        for i, message in enumerate(result, 1):
+            # Convert message to string if it's a DocumentMessage or other object
+            message_str = str(message)
+            print(
+                f'{i}. {message_str[:200]}...'
+                if len(message_str) > 200
+                else f'{i}. {message_str}'
+            )
+            print()
+
+    except Exception as e:
+        print(f'❌ Error in example 2: {e}')
+        if 'OPENAI_API_KEY' in str(e):
+            print('💡 Make sure to set your OPENAI_API_KEY environment variable')
+
+    finally:
+        # Clean up
+        if os.path.exists(txt_file):
+            os.remove(txt_file)
+
+
+async def example_6_gemini_document_processing():
+    """Example 6: Document processing with Gemini LLM (if available)"""
+    print('\n' + '=' * 60)
+    print('EXAMPLE 6: Document Processing with Gemini LLM')
+    print('=' * 60)
+
+    # Create sample file
+    txt_file = create_sample_txt_file()
+
+    try:
+        # Create document message
+        document = DocumentMessage(
+            document_type=DocumentType.TXT, document_file_path=txt_file
+        )
+
+        # Create document analysis agent with Gemini
+        llm = Gemini(model='gemini-2.5-flash', api_key=google_api_key)
+
+        agent = (
+            AgentBuilder()
+            .with_name('Gemini Document Processor')
+            .with_prompt(
+                'You are an advanced document analysis AI. Provide detailed insights about the document structure, content themes, and actionable recommendations.'
+            )
+            .with_llm(llm)
+            .build()
+        )
+
+        # Process document with Gemini
+        result = await agent.run([document])
+
+        print('🤖 Gemini Document Analysis:')
+        print(result)
+
+    except Exception as e:
+        print(f'❌ Error in example 6: {e}')
+        if 'api_key' in str(e).lower():
+            print('💡 Make sure to set your Google API key for Gemini')
+
+    finally:
+        # Clean up
+        if os.path.exists(txt_file):
+            os.remove(txt_file)
+
+
+async def example_3_pdf_document_processing():
+    """Example 3: PDF document processing with agents"""
+    print('\n' + '=' * 60)
+    print('EXAMPLE 3: PDF Document Processing')
+    print('=' * 60)
+
+    # Create sample PDF file
+    pdf_file = create_sample_pdf_file()
+
+    try:
+        # Create document message for PDF
+        document = DocumentMessage(
+            document_type=DocumentType.PDF, document_file_path=pdf_file
+        )
+
+        # Create PDF analysis agent
+        llm = OpenAI(model='gpt-4o-mini', api_key=openai_api_key)
+
+        agent = (
+            AgentBuilder()
+            .with_name('PDF Document Analyzer')
+            .with_prompt(
+                'You are a PDF document analysis expert. Analyze the provided PDF document and provide insights about its content, structure, formatting, and key information.'
+            )
+            .with_llm(llm)
+            .build()
+        )
+
+        # Process PDF document with agent
+        result = await agent.run([document])
+
+        print('📄 PDF Document Analysis Result:')
+        print(result)
+
+    except Exception as e:
+        print(f'❌ Error in example 3: {e}')
+        if 'Not implemented document processing' in str(e):
+            print("💡 Note: This LLM doesn't support PDF document processing yet.")
+
+    finally:
+        # Clean up
+        if os.path.exists(pdf_file):
+            os.remove(pdf_file)
+
+
+async def example_4_document_bytes_processing():
+    """Example 4: Processing documents from bytes data"""
+    print('\n' + '=' * 60)
+    print('EXAMPLE 4: Document Processing from Bytes')
+    print('=' * 60)
+
+    # Create sample files and read as bytes
+    txt_file = create_sample_txt_file()
+    pdf_file = create_sample_pdf_file()
+
+    try:
+        print('🔧 Testing Document Processing from Bytes Data\n')
+
+        # Process TXT from bytes
+        print('1. Processing TXT document from bytes:')
+        with open(txt_file, 'rb') as f:
+            txt_bytes = f.read()
+
+        txt_document = DocumentMessage(
+            document_type=DocumentType.TXT,
+            document_bytes=txt_bytes,
+            mime_type='text/plain',
+        )
+
+        # Create agent for bytes processing
+        llm = OpenAI(model='gpt-4o-mini', api_key=openai_api_key)
+        agent = (
+            AgentBuilder()
+            .with_name('Bytes Document Processor')
+            .with_prompt(
+                'Analyze this document that was provided as bytes data. Focus on the content and structure.'
+            )
+            .with_llm(llm)
+            .build()
+        )
+
+        result = await agent.run([txt_document])
+        print(f'   TXT from bytes analysis: {result[:100]}...\n')
+
+        # Process PDF from bytes
+        print('2. Processing PDF document from bytes:')
+        with open(pdf_file, 'rb') as f:
+            pdf_bytes = f.read()
+
+        pdf_document = DocumentMessage(
+            document_type=DocumentType.PDF,
+            document_bytes=pdf_bytes,
+            mime_type='application/pdf',
+        )
+
+        result = await agent.run([pdf_document])
+        print(f'   PDF from bytes analysis: {result[:100]}...\n')
+
+        # Test tools with bytes (using extract tools)
+        print('3. Using document tools with bytes data:')
+        # Note: Tools expect file paths, so we'll show the concept
+        print(f'   TXT bytes size: {len(txt_bytes)} bytes')
+        print(f'   PDF bytes size: {len(pdf_bytes)} bytes')
+        print('   ✅ Documents successfully processed from bytes data')
+
+    except Exception as e:
+        print(f'❌ Error in example 4: {e}')
+
+    finally:
+        # Clean up
+        if os.path.exists(txt_file):
+            os.remove(txt_file)
+        if os.path.exists(pdf_file):
+            os.remove(pdf_file)
+
+
+async def example_5_document_base64_processing():
+    """Example 5: Processing documents from base64 encoded data"""
+    print('\n' + '=' * 60)
+    print('EXAMPLE 5: Document Processing from Base64')
+    print('=' * 60)
+
+    # Create sample files and encode as base64
+    txt_file = create_sample_txt_file()
+    pdf_file = create_sample_pdf_file()
+
+    try:
+        print('🔧 Testing Document Processing from Base64 Data\n')
+
+        # Process TXT from base64
+        print('1. Processing TXT document from base64:')
+        with open(txt_file, 'rb') as f:
+            txt_bytes = f.read()
+            txt_base64 = base64.b64encode(txt_bytes).decode('utf-8')
+
+        txt_document = DocumentMessage(
+            document_type=DocumentType.TXT,
+            document_base64=txt_base64,
+            mime_type='text/plain',
+        )
+
+        # Create agent for base64 processing
+        llm = OpenAI(model='gpt-4o-mini', api_key=openai_api_key)
+        agent = (
+            AgentBuilder()
+            .with_name('Base64 Document Processor')
+            .with_prompt(
+                'Analyze this document that was provided as base64 encoded data. Provide insights about the content.'
+            )
+            .with_llm(llm)
+            .build()
+        )
+
+        result = await agent.run([txt_document])
+        print(f'   TXT from base64 analysis: {result[:100]}...\n')
+
+        # Process PDF from base64
+        print('2. Processing PDF document from base64:')
+        with open(pdf_file, 'rb') as f:
+            pdf_bytes = f.read()
+            pdf_base64 = base64.b64encode(pdf_bytes).decode('utf-8')
+
+        pdf_document = DocumentMessage(
+            document_type=DocumentType.PDF,
+            document_base64=pdf_base64,
+            mime_type='application/pdf',
+        )
+
+        result = await agent.run([pdf_document])
+        print(f'   PDF from base64 analysis: {result[:100]}...\n')
+
+        # Show base64 data info
+        print('3. Base64 encoding information:')
+        print(f'   TXT base64 length: {len(txt_base64)} characters')
+        print(f'   PDF base64 length: {len(pdf_base64)} characters')
+        print(f'   TXT base64 sample: {txt_base64[:50]}...')
+        print('   ✅ Documents successfully processed from base64 data')
+
+    except Exception as e:
+        print(f'❌ Error in example 5: {e}')
+
+    finally:
+        # Clean up
+        if os.path.exists(txt_file):
+            os.remove(txt_file)
+        if os.path.exists(pdf_file):
+            os.remove(pdf_file)
+
+
+async def main():
+    """Run all document processing examples"""
+    print('🚀 Flo AI Document Processing Examples')
+    print('Demonstrating PDF and TXT document processing capabilities\n')
+
+    try:
+        # Run examples
+        await example_1_basic_document_agent()
+        # await example_2_document_workflow()
+        await example_3_pdf_document_processing()
+        await example_4_document_bytes_processing()
+        await example_5_document_base64_processing()
+        await example_6_gemini_document_processing()
+
+        print('\n' + '=' * 60)
+        print('🎉 ALL DOCUMENT PROCESSING EXAMPLES COMPLETED!')
+        print('=' * 60)
+        print('\n📋 Examples demonstrated:')
+        print('   • Basic document processing with agents')
+        print('   • YAML-based document workflows')
+        print('   • PDF document processing and analysis')
+        print('   • Document processing from bytes data')
+        print('   • Document processing from base64 encoded data')
+        print('   • Multi-LLM document support (OpenAI, Gemini)')
+        print('\n💡 Key features showcased:')
+        print(
+            '   • DocumentMessage for structured document inputs (file_path, bytes, base64)'
+        )
+        print('   • Extensible document processor architecture')
+        print('   • PDF and TXT document format support')
+        print('   • LLM-agnostic document formatting')
+        print('   • Integration with existing Arium workflows')
+
+    except Exception as e:
+        print(f'❌ Error running examples: {e}')
+        import traceback
+
+        traceback.print_exc()
+
+
+if __name__ == '__main__':
+    # Check API keys
+    if not os.getenv('OPENAI_API_KEY'):
+        print('⚠️ Warning: OPENAI_API_KEY environment variable not set!')
+        print('   Set it with: export OPENAI_API_KEY=your_api_key_here')
+        print('   Some examples may fail without API keys.\n')
+
+    asyncio.run(main())
diff --git a/flo_ai/flo_ai/__init__.py b/flo_ai/flo_ai/__init__.py
index 3965e83f..671e8abe 100644
--- a/flo_ai/flo_ai/__init__.py
+++ b/flo_ai/flo_ai/__init__.py
@@ -3,7 +3,15 @@
 """
 
 # Models package - Agent framework components
-from .models import Agent, AgentError, BaseAgent, AgentType, ReasoningPattern
+from .models import (
+    Agent,
+    AgentError,
+    BaseAgent,
+    AgentType,
+    ReasoningPattern,
+    DocumentType,
+    DocumentMessage,
+)
 
 from .builder.agent_builder import AgentBuilder
 
@@ -50,6 +58,8 @@
     'OpenAIVLLM',
     # LLM DataClass
     'ImageMessage',
+    'DocumentType',
+    'DocumentMessage',
     # Tools
     'Tool',
     'ToolExecutionError',
diff --git a/flo_ai/flo_ai/arium/arium.py b/flo_ai/flo_ai/arium/arium.py
index ecde45fa..a86243a6 100644
--- a/flo_ai/flo_ai/arium/arium.py
+++ b/flo_ai/flo_ai/arium/arium.py
@@ -1,6 +1,7 @@
 from flo_ai.arium.base import BaseArium
 from flo_ai.arium.memory import MessageMemory, BaseMemory
 from flo_ai.llm.base_llm import ImageMessage
+from flo_ai.models.document import DocumentMessage
 from typing import List, Dict, Any, Optional, Callable
 from flo_ai.models.agent import Agent
 from flo_ai.tool.base_tool import Tool
@@ -29,7 +30,7 @@ def compile(self):
 
     async def run(
         self,
-        inputs: List[str | ImageMessage],
+        inputs: List[str | ImageMessage | DocumentMessage],
         variables: Optional[Dict[str, Any]] = None,
         event_callback: Optional[Callable[[AriumEvent], None]] = None,
         events_filter: Optional[List[AriumEventType]] = None,
@@ -114,7 +115,7 @@ def _emit_event(
 
     async def _execute_graph(
         self,
-        inputs: List[str | ImageMessage],
+        inputs: List[str | ImageMessage | DocumentMessage],
         event_callback: Optional[Callable[[AriumEvent], None]] = None,
         events_filter: Optional[List[AriumEventType]] = None,
     ):
@@ -221,7 +222,9 @@ async def _execute_graph(
 
         return self.memory.get()
 
     def _extract_and_validate_variables(
-        self, inputs: List[str | ImageMessage], variables: Dict[str, Any]
+        self,
+        inputs: List[str | ImageMessage | DocumentMessage],
+        variables: Dict[str, Any],
     ) -> None:
         """Extract variables from inputs and agents, then validate them.
@@ -258,8 +261,10 @@ def _extract_and_validate_variables(
 
         validate_multi_agent_variables(agents_variables, variables)
 
     def _resolve_inputs(
-        self, inputs: List[str | ImageMessage], variables: Dict[str, Any]
-    ) -> List[str | ImageMessage]:
+        self,
+        inputs: List[str | ImageMessage | DocumentMessage],
+        variables: Dict[str, Any],
+    ) -> List[str | ImageMessage | DocumentMessage]:
         """Resolve variables in input messages.
 
         Args:
@@ -276,7 +281,7 @@ def _resolve_inputs(
                 resolved_input = resolve_variables(input_item, variables)
                 resolved_inputs.append(resolved_input)
             else:
-                # ImageMessage objects don't need variable resolution
+                # ImageMessage and DocumentMessage objects don't need variable resolution
                 resolved_inputs.append(input_item)
 
         return resolved_inputs
diff --git a/flo_ai/flo_ai/arium/builder.py b/flo_ai/flo_ai/arium/builder.py
index 5f0c3b60..03ca6c80 100644
--- a/flo_ai/flo_ai/arium/builder.py
+++ b/flo_ai/flo_ai/arium/builder.py
@@ -4,6 +4,7 @@
 from flo_ai.models.agent import Agent
 from flo_ai.tool.base_tool import Tool
 from flo_ai.llm.base_llm import ImageMessage
+from flo_ai.models.document import DocumentMessage
 import yaml
 from flo_ai.builder.agent_builder import AgentBuilder
 from flo_ai.llm import BaseLLM
@@ -134,7 +135,7 @@ def build(self) -> Arium:
 
     async def build_and_run(
         self,
-        inputs: List[Union[str, ImageMessage]],
+        inputs: List[Union[str, ImageMessage, DocumentMessage]],
         variables: Optional[Dict[str, Any]] = None,
     ) -> List[dict]:
         """Build the Arium and run it with the given inputs and optional runtime variables."""
diff --git a/flo_ai/flo_ai/llm/base_llm.py b/flo_ai/flo_ai/llm/base_llm.py
index 8cf435f5..d7910e54 100644
--- a/flo_ai/flo_ai/llm/base_llm.py
+++ b/flo_ai/flo_ai/llm/base_llm.py
@@ -1,7 +1,10 @@
 from abc import ABC, abstractmethod
 from typing import Dict, Any, List, Optional
 from flo_ai.tool.base_tool import Tool
+from flo_ai.utils.document_processor import get_default_processor
+from flo_ai.utils.logger import logger
 from dataclasses import dataclass
+from flo_ai.models.document import DocumentMessage
 
 
 @dataclass
@@ -65,3 +68,25 @@ def format_tools_for_llm(self, tools: List['Tool']) -> List[Dict[str, Any]]:
     def format_image_in_message(self, image: ImageMessage) -> str:
         """Format a image in the message"""
         pass
+
+    async def format_document_in_message(self, document: 'DocumentMessage') -> str:
+        """Format a document in the message by extracting text content"""
+
+        try:
+            # Process document to extract text
+            result = await get_default_processor().process_document(document)
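+            # NOTE: 'result' is the dict returned by the matching document
+            # processor (see DocumentProcessor.process_document below); its
+            # keys include extracted_text, page_count, processing_method,
+            # metadata, and document_type.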
+
+            # Format the extracted content for the LLM
+            extracted_text = result.get('extracted_text', '')
+            doc_type = result.get('document_type', 'unknown')
+
+            logger.info(
+                f'Successfully formatted {doc_type} document for {self.__class__.__name__} LLM'
+            )
+            return extracted_text
+
+        except Exception as e:
+            logger.error(
+                f'Error formatting document for {self.__class__.__name__}: {e}'
+            )
+            raise Exception(f'Failed to format document: {str(e)}')
diff --git a/flo_ai/flo_ai/models/__init__.py b/flo_ai/flo_ai/models/__init__.py
index 52f0ceb6..2f7c5d80 100644
--- a/flo_ai/flo_ai/models/__init__.py
+++ b/flo_ai/flo_ai/models/__init__.py
@@ -5,5 +5,14 @@
 from .agent import Agent
 from .agent_error import AgentError
 from .base_agent import BaseAgent, AgentType, ReasoningPattern
+from .document import DocumentMessage, DocumentType
 
-__all__ = ['Agent', 'AgentError', 'BaseAgent', 'AgentType', 'ReasoningPattern']
+__all__ = [
+    'Agent',
+    'AgentError',
+    'BaseAgent',
+    'AgentType',
+    'ReasoningPattern',
+    'DocumentMessage',
+    'DocumentType',
+]
diff --git a/flo_ai/flo_ai/models/agent.py b/flo_ai/flo_ai/models/agent.py
index a3b78b26..0cd9243f 100644
--- a/flo_ai/flo_ai/models/agent.py
+++ b/flo_ai/flo_ai/models/agent.py
@@ -2,6 +2,7 @@
 from typing import Dict, Any, List, Optional
 from flo_ai.models.base_agent import BaseAgent, AgentType, ReasoningPattern
 from flo_ai.llm.base_llm import BaseLLM, ImageMessage
+from flo_ai.models.document import DocumentMessage
 from flo_ai.tool.base_tool import Tool, ToolExecutionError
 from flo_ai.models.agent_error import AgentError
 from flo_ai.utils.logger import logger
@@ -48,7 +49,7 @@ def __init__(
 
     async def run(
         self,
-        inputs: List[str | ImageMessage] | str,
+        inputs: List[str | ImageMessage | DocumentMessage] | str,
         variables: Optional[Dict[str, Any]] = None,
     ) -> str:
         variables = variables or {}
@@ -75,6 +76,9 @@
             for input in inputs:
                 if isinstance(input, ImageMessage):
                     self.add_to_history('user', self.llm.format_image_in_message(input))
+                elif isinstance(input, DocumentMessage):
+                    formatted_doc = await self.llm.format_document_in_message(input)
+                    self.add_to_history('user', formatted_doc)
                 else:
                     # Resolve variables in text input
                     resolved_input = resolve_variables(input, variables)
@@ -88,6 +92,9 @@
             for input in inputs:
                 if isinstance(input, ImageMessage):
                     self.add_to_history('user', self.llm.format_image_in_message(input))
+                elif isinstance(input, DocumentMessage):
+                    formatted_doc = await self.llm.format_document_in_message(input)
+                    self.add_to_history('user', formatted_doc)
                 else:
                     self.add_to_history('user', input)
diff --git a/flo_ai/flo_ai/models/document.py b/flo_ai/flo_ai/models/document.py
new file mode 100644
index 00000000..5f82d5a5
--- /dev/null
+++ b/flo_ai/flo_ai/models/document.py
@@ -0,0 +1,33 @@
+"""
+Document-related data models for Flo AI framework.
+
+This module contains document types and message classes to avoid circular imports.
+"""
+
+from dataclasses import dataclass
+from enum import Enum
+from typing import Dict, Any, Optional
+
+
+class DocumentType(Enum):
+    """Enumeration of supported document types."""
+
+    PDF = 'pdf'
+    TXT = 'txt'
+
+
+@dataclass
+class DocumentMessage:
+    """
+    Data structure for document inputs to LLMs.
+
+    Supports various document formats with extensible design for future types.
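+
+    Minimal, illustrative construction (the file path is a placeholder; set
+    one of the content fields — file path, bytes, or base64):
+
+        DocumentMessage(
+            document_type=DocumentType.PDF,
+            document_file_path='report.pdf',
+        )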
+ """ + + document_type: DocumentType + document_url: Optional[str] = None + document_bytes: Optional[bytes] = None + document_file_path: Optional[str] = None + document_base64: Optional[str] = None + mime_type: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None diff --git a/flo_ai/flo_ai/utils/document_processor.py b/flo_ai/flo_ai/utils/document_processor.py new file mode 100644 index 00000000..288f677f --- /dev/null +++ b/flo_ai/flo_ai/utils/document_processor.py @@ -0,0 +1,239 @@ +""" +Document processing utilities for Flo AI framework. + +This module provides extensible document processing capabilities for PDF and TXT files, +with a factory pattern design for easy addition of new document types. +""" + +import os +import base64 +import time +from abc import ABC, abstractmethod +from typing import Dict, Any, Union + +import pymupdf +import pymupdf4llm +import chardet + +from flo_ai.models.document import DocumentMessage, DocumentType +from flo_ai.utils.logger import logger + + +class DocumentProcessingError(Exception): + """Exception raised when document processing fails.""" + + pass + + +class BaseDocumentProcessor(ABC): + """Abstract base class for document processors.""" + + @abstractmethod + async def process(self, document: DocumentMessage) -> Dict[str, Any]: + """ + Process a document and return extracted content and metadata. + + Args: + document: DocumentMessage containing document data + + Returns: + Dict containing extracted text, metadata, and processing info + """ + pass + + +class PDFProcessor(BaseDocumentProcessor): + """Processor for PDF documents.""" + + async def process(self, document: DocumentMessage) -> Dict[str, Any]: + """Extract text and metadata from PDF document.""" + try: + pdf_content = await self._get_pdf_content(document) + + # Process with pymupdf4llm (LLM-optimized) + text_data = await self._process_with_pymupdf4llm(pdf_content) + + return { + 'extracted_text': text_data['text'], + 'page_count': text_data.get('page_count', 0), + 'processing_method': text_data.get('method', 'unknown'), + 'metadata': text_data.get('metadata', {}), + 'document_type': DocumentType.PDF.value, + } + + except Exception as e: + logger.error(f'Error processing PDF: {str(e)}') + raise DocumentProcessingError(f'Failed to process PDF: {str(e)}') + + async def _get_pdf_content(self, document: DocumentMessage) -> Union[str, bytes]: + """Get PDF content from various sources.""" + if document.document_file_path: + if not os.path.exists(document.document_file_path): + raise DocumentProcessingError( + f'PDF file not found: {document.document_file_path}' + ) + return document.document_file_path + elif document.document_bytes: + return document.document_bytes + elif document.document_base64: + return base64.b64decode(document.document_base64) + else: + raise DocumentProcessingError('No PDF content provided') + + async def _process_with_pymupdf4llm( + self, pdf_content: Union[str, bytes] + ) -> Dict[str, Any]: + """Process PDF using pymupdf4llm (LLM-optimized).""" + if isinstance(pdf_content, str): + # File path - pass directly to pymupdf4llm + text_data = pymupdf4llm.to_markdown(pdf_content) + metadata = {} + else: + # Bytes - create PyMuPDF Document from memory + doc = pymupdf.open(stream=pdf_content) + try: + text_data = pymupdf4llm.to_markdown(doc) + metadata = {} + finally: + doc.close() # Clean up document object + + return { + 'text': text_data, + 'method': 'pymupdf4llm', + 'metadata': metadata, + 'page_count': len(text_data.split('\n---\n')) if '---' in text_data else 1, 
+            'page_count': len(text_data.split('\n---\n')) if '---' in text_data else 1,
+        }
+
+
+class TXTProcessor(BaseDocumentProcessor):
+    """Processor for text documents."""
+
+    async def process(self, document: DocumentMessage) -> Dict[str, Any]:
+        """Extract text from TXT document."""
+        try:
+            text_content = await self._get_text_content(document)
+
+            return {
+                'extracted_text': text_content,
+                'page_count': 1,
+                'processing_method': 'text_reader',
+                'metadata': {
+                    'character_count': len(text_content),
+                    'line_count': len(text_content.splitlines()),
+                    'encoding': 'utf-8',
+                },
+                'document_type': DocumentType.TXT.value,
+            }
+
+        except Exception as e:
+            logger.error(f'Error processing TXT: {str(e)}')
+            raise DocumentProcessingError(f'Failed to process TXT: {str(e)}')
+
+    async def _get_text_content(self, document: DocumentMessage) -> str:
+        """Get text content from various sources."""
+        if document.document_file_path:
+            if not os.path.exists(document.document_file_path):
+                raise DocumentProcessingError(
+                    f'TXT file not found: {document.document_file_path}'
+                )
+            return await self._read_text_file(document.document_file_path)
+        elif document.document_bytes:
+            return await self._decode_bytes(document.document_bytes)
+        elif document.document_base64:
+            decoded_bytes = base64.b64decode(document.document_base64)
+            return await self._decode_bytes(decoded_bytes)
+        else:
+            raise DocumentProcessingError('No TXT content provided')
+
+    async def _read_text_file(self, file_path: str) -> str:
+        """Read text file with encoding detection."""
+        try:
+            # Try UTF-8 first
+            with open(file_path, 'r', encoding='utf-8') as f:
+                return f.read()
+        except UnicodeDecodeError:
+            # Try encoding detection with chardet
+            with open(file_path, 'rb') as f:
+                raw_data = f.read()
+            detected = chardet.detect(raw_data)
+            encoding = detected.get('encoding', 'utf-8')
+            return raw_data.decode(encoding, errors='replace')
+
+    async def _decode_bytes(self, content_bytes: bytes) -> str:
+        """Decode bytes with encoding detection."""
+        try:
+            return content_bytes.decode('utf-8')
+        except UnicodeDecodeError:
+            detected = chardet.detect(content_bytes)
+            encoding = detected.get('encoding', 'utf-8')
+            return content_bytes.decode(encoding, errors='replace')
+
+
+class DocumentProcessor:
+    """
+    Main document processor with factory pattern for extensibility.
+
+    Supports PDF and TXT documents with easy extension for new types.
+    """
+
+    def __init__(self):
+        self._processors = {
+            DocumentType.PDF: PDFProcessor(),
+            DocumentType.TXT: TXTProcessor(),
+        }
+
+    def register_processor(
+        self, document_type: DocumentType, processor: BaseDocumentProcessor
+    ):
+        """Register a new document processor for a specific type."""
+        self._processors[document_type] = processor
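+
+    # Extension sketch (hypothetical: DocumentType.DOCX and DocxProcessor do
+    # not exist yet; a real DocxProcessor would subclass BaseDocumentProcessor):
+    #
+    #     processor = DocumentProcessor()
+    #     processor.register_processor(DocumentType.DOCX, DocxProcessor())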
+
+    async def process_document(self, document: DocumentMessage) -> Dict[str, Any]:
+        """
+        Process a document using the appropriate processor.
+
+        Args:
+            document: DocumentMessage containing document data
+
+        Returns:
+            Dict containing extracted content and metadata
+
+        Raises:
+            DocumentProcessingError: If processing fails or document type unsupported
+        """
+        if document.document_type not in self._processors:
+            raise DocumentProcessingError(
+                f'Unsupported document type: {document.document_type}. '
+                f'Supported types: {list(self._processors.keys())}'
+            )
+
+        processor: BaseDocumentProcessor = self._processors[document.document_type]
+
+        try:
+            result = await processor.process(document)
+
+            # Add common metadata
+            result['processing_timestamp'] = time.time()
+
+            logger.info(
+                f"Successfully processed {document.document_type.value} document "
+                f"using {result.get('processing_method', 'unknown')} method"
+            )
+
+            return result
+
+        except Exception as e:
+            logger.error(f'Document processing failed: {str(e)}')
+            raise
+
+
+# Lazy singleton for default processor
+_default_processor = None
+
+
+def get_default_processor() -> DocumentProcessor:
+    """Get the default DocumentProcessor instance (lazy singleton)."""
+    global _default_processor
+    if _default_processor is None:
+        _default_processor = DocumentProcessor()
+    return _default_processor
diff --git a/flo_ai/poetry.lock b/flo_ai/poetry.lock
index 5daba1c5..434f281e 100644
--- a/flo_ai/poetry.lock
+++ b/flo_ai/poetry.lock
@@ -488,6 +488,18 @@ files = [
     {file = "cfgv-3.4.0.tar.gz", hash = "sha256:e52591d4c5f5dead8e0f673fb16db7949d2cfb3f7da4582893288f0ded8fe560"},
 ]
 
+[[package]]
+name = "chardet"
+version = "5.2.0"
+description = "Universal encoding detector for Python 3"
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+files = [
+    {file = "chardet-5.2.0-py3-none-any.whl", hash = "sha256:e1cf59446890a00105fe7b7912492ea04b6e6f06d4b742b2c788469e34c82970"},
+    {file = "chardet-5.2.0.tar.gz", hash = "sha256:1b3b6ff479a8c414bc3fa2c0852995695c4a026dcd6d0633b2dd092ca39c1cf7"},
+]
+
 [[package]]
 name = "charset-normalizer"
 version = "3.4.2"
@@ -1130,16 +1142,16 @@ files = [
 google-auth = ">=2.14.1,<3.0.0"
 googleapis-common-protos = ">=1.56.2,<2.0.0"
 grpcio = [
-    {version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
     {version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
+    {version = ">=1.33.2,<2.0.0", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""},
 ]
 grpcio-status = [
-    {version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
     {version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
+    {version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
 ]
 proto-plus = [
-    {version = ">=1.22.3,<2.0.0"},
     {version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
+    {version = ">=1.22.3,<2.0.0"},
 ]
 protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0"
 requests = ">=2.18.0,<3.0.0"
@@ -1302,8 +1314,8 @@ google-api-core = {version = ">=1.34.1,<2.0.dev0 || >=2.11.dev0,<3.0.0", extras
 google-auth = ">=2.14.1,<2.24.0 || >2.24.0,<2.25.0 || >2.25.0,<3.0.0"
 grpc-google-iam-v1 = ">=0.14.0,<1.0.0"
 proto-plus = [
-    {version = ">=1.22.3,<2.0.0"},
     {version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
+    {version = ">=1.22.3,<2.0.0"},
 ]
 protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0"
 
@@ -2573,9 +2585,9 @@ files = [
 
 [package.dependencies]
 numpy = [
-    {version = ">=1.22.4", markers = "python_version < \"3.11\""},
-    {version = ">=1.23.2", markers = "python_version == \"3.11\""},
     {version = ">=1.26.0", markers = "python_version >= \"3.12\""},
+    {version = ">=1.23.2", markers = "python_version == \"3.11\""},
">=1.22.4", markers = "python_version < \"3.11\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -3290,6 +3302,39 @@ files = [ [package.extras] windows-terminal = ["colorama (>=0.4.6)"] +[[package]] +name = "pymupdf" +version = "1.26.4" +description = "A high performance Python library for data extraction, analysis, conversion & manipulation of PDF (and other) documents." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "pymupdf-1.26.4-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:cb95562a0a63ce906fd788bdad5239063b63068cf4a991684f43acb09052cb99"}, + {file = "pymupdf-1.26.4-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:67e9e6b45832c33726651c2a031e9a20108fd9e759140b9e843f934de813a7ff"}, + {file = "pymupdf-1.26.4-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:2604f687dd02b6a1b98c81bd8becfc0024899a2d2085adfe3f9e91607721fd22"}, + {file = "pymupdf-1.26.4-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:973a6dda61ebd34040e4df3753bf004b669017663fbbfdaa294d44eceba98de0"}, + {file = "pymupdf-1.26.4-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:299a49797df5b558e695647fa791329ba3911cbbb31ed65f24a6266c118ef1a7"}, + {file = "pymupdf-1.26.4-cp39-abi3-win32.whl", hash = "sha256:51b38379aad8c71bd7a8dd24d93fbe7580c2a5d9d7e1f9cd29ebbba315aa1bd1"}, + {file = "pymupdf-1.26.4-cp39-abi3-win_amd64.whl", hash = "sha256:0b6345a93a9afd28de2567e433055e873205c52e6b920b129ca50e836a3aeec6"}, + {file = "pymupdf-1.26.4.tar.gz", hash = "sha256:be13a066d42bfaed343a488168656637c4d9843ddc63b768dc827c9dfc6b9989"}, +] + +[[package]] +name = "pymupdf4llm" +version = "0.0.17" +description = "PyMuPDF Utilities for LLM/RAG" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "pymupdf4llm-0.0.17-py3-none-any.whl", hash = "sha256:26de9996945f15e3ca507908f80dc18a959f5b5214bb2e302c7f7034089665a0"}, + {file = "pymupdf4llm-0.0.17.tar.gz", hash = "sha256:27287ef9fe0217cf37841a3ef2bcf70da2553c43d95ea39b664a6de6485678c3"}, +] + +[package.dependencies] +pymupdf = ">=1.24.10" + [[package]] name = "pyparsing" version = "3.2.3" @@ -3312,7 +3357,7 @@ version = "4.3.1" description = "A pure-python PDF library capable of splitting, merging, cropping, and transforming PDF files" optional = false python-versions = ">=3.6" -groups = ["dev"] +groups = ["main"] files = [ {file = "pypdf-4.3.1-py3-none-any.whl", hash = "sha256:64b31da97eda0771ef22edb1bfecd5deee4b72c3d1736b7df2689805076d6418"}, {file = "pypdf-4.3.1.tar.gz", hash = "sha256:b2f37fe9a3030aa97ca86067a56ba3f9d3565f9a791b305c7355d8392c30d91b"}, @@ -3612,6 +3657,29 @@ attrs = ">=22.2.0" rpds-py = ">=0.7.0" typing-extensions = {version = ">=4.4.0", markers = "python_version < \"3.13\""} +[[package]] +name = "reportlab" +version = "4.4.3" +description = "The Reportlab Toolkit" +optional = false +python-versions = "<4,>=3.7" +groups = ["dev"] +files = [ + {file = "reportlab-4.4.3-py3-none-any.whl", hash = "sha256:df905dc5ec5ddaae91fc9cb3371af863311271d555236410954961c5ee6ee1b5"}, + {file = "reportlab-4.4.3.tar.gz", hash = "sha256:073b0975dab69536acd3251858e6b0524ed3e087e71f1d0d1895acb50acf9c7b"}, +] + +[package.dependencies] +charset-normalizer = "*" +pillow = ">=9.0.0" + +[package.extras] +accel = ["rl_accel (>=0.9.0,<1.1)"] +bidi = ["rlbidi"] +pycairo = ["freetype-py (>=2.3.0,<2.4)", "rlPyCairo (>=0.2.0,<1)"] +renderpm = ["rl_renderPM (>=4.0.3,<4.1)"] +shaping = ["uharfbuzz"] + [[package]] name = "requests" version = "2.32.4" @@ -4466,4 +4534,4 @@ vizualize = ["matplotlib", "networkx"] [metadata] 
lock-version = "2.1" python-versions = ">=3.10,<4.0" -content-hash = "dcf3d63c0346f6c835e36784bd6d76b6bb810d36056fd9d8a4a816a3b6814283" +content-hash = "9fb94715a0d2fd21db13ad36d2a498d5da444bef6292dcb289d0b6dd7537b82f" diff --git a/flo_ai/pyproject.toml b/flo_ai/pyproject.toml index 8b527246..7d643513 100644 --- a/flo_ai/pyproject.toml +++ b/flo_ai/pyproject.toml @@ -21,6 +21,9 @@ networkx = { version = "^3.0", optional = true } anthropic = "^0.57.1" aiohttp = "^3.12.14" google-cloud-aiplatform = "^1.109.0" +pypdf = "^4.2.0" +pymupdf4llm = "^0.0.17" +chardet = "^5.2.0" [tool.poetry.extras] vizualize = ["matplotlib", "networkx"] @@ -28,7 +31,6 @@ vizualize = ["matplotlib", "networkx"] [tool.poetry.group.dev.dependencies] boto3 = "^1.36.1" botocore = "^1.36.1" -pypdf = "^4.2.0" ipykernel = "^6.29.5" db-sqlite3 = "^0.0.1" peewee = "^3.17.6" @@ -37,6 +39,7 @@ wikipedia = "^1.4.0" pytest = "^8.3.3" pytest-asyncio = "^0.24.0" pre-commit = "^4.0.1" +reportlab = "^4.4.3" streamlit = "^1.42.2" [build-system]