diff --git a/flo_ai/flo_ai/agent/base_agent.py b/flo_ai/flo_ai/agent/base_agent.py
index 5b04015c..6d82914a 100644
--- a/flo_ai/flo_ai/agent/base_agent.py
+++ b/flo_ai/flo_ai/agent/base_agent.py
@@ -1,3 +1,4 @@
+import asyncio
 from typing import Dict, Any, List, Tuple, cast, Optional
 from abc import ABC, abstractmethod
 from enum import Enum
@@ -9,6 +10,7 @@
     FunctionMessage,
 )
 from flo_ai.utils.variable_extractor import resolve_variables
+from flo_ai.utils.profiler import aprofile


 class AgentType(Enum):
@@ -85,16 +87,56 @@ def clear_history(self):
         self.conversation_history = []

     async def _get_message_history(self, variables: Optional[Dict[str, Any]] = None):
+        async with aprofile(f'agent.{self.name}.get_message_history'):
+            return await self._get_message_history_impl(variables)
+
+    async def _get_message_history_impl(
+        self, variables: Optional[Dict[str, Any]] = None
+    ):
+        """Build the message list passed to the LLM from the conversation history.
+
+        Document formatting (the expensive step — PDF rasterization or
+        extraction) is dispatched concurrently via ``asyncio.gather`` and
+        cached on the ``DocumentMessageContent`` instance by the underlying
+        LLM, so the same document is formatted at most once per LLM across
+        all nodes and retries in a workflow.
+        """
         variables = variables if variables is not None else {}
-        message_history = []
-        for input in self.conversation_history:
-            # Handle FunctionMessage (OpenAI function role format)
+
+        # First pass: kick off one formatting coroutine per *unique* document
+        # instance. If the same DocumentMessageContent is referenced at
+        # multiple indices, we share the single in-flight task so we never
+        # rasterize it twice concurrently.
+        doc_tasks_by_id: Dict[int, 'asyncio.Future[Any]'] = {}
+        doc_id_by_idx: Dict[int, int] = {}
+        for idx, input in enumerate(self.conversation_history):
+            if (
+                not isinstance(input, FunctionMessage)
+                and isinstance(input.content, MediaMessageContent)
+                and input.content.type == 'document'
+            ):
+                doc_id = id(input.content)
+                doc_id_by_idx[idx] = doc_id
+                if doc_id not in doc_tasks_by_id:
+                    doc_tasks_by_id[doc_id] = asyncio.ensure_future(
+                        self.llm.format_document_in_message(input.content)  # type: ignore[arg-type]
+                    )
+
+        if doc_tasks_by_id:
+            formatted_docs = await asyncio.gather(*doc_tasks_by_id.values())
+            formatted_by_doc_id: Dict[int, Any] = dict(
+                zip(doc_tasks_by_id.keys(), formatted_docs)
+            )
+        else:
+            formatted_by_doc_id = {}
+
+        # Second pass: assemble the provider-ready message list.
+        message_history: List[Dict[str, Any]] = []
+        for idx, input in enumerate(self.conversation_history):
             if isinstance(input, FunctionMessage):
                 message_history.append(
                     {'role': input.role, 'name': input.name, 'content': input.content}
                 )
-            # CRITICAL: Check content type FIRST, before message type
-            # This ensures TextMessageContent objects are converted to strings
             elif isinstance(input.content, TextMessageContent):
                 resolved_content = resolve_variables(input.content.text, variables)
                 message_history.append(
@@ -102,26 +144,22 @@ async def _get_message_history(self, variables: Optional[Dict[str, Any]] = None)
                 )
             elif isinstance(input.content, MediaMessageContent):
                 if input.content.type == 'image':
-                    # Format image message and add to history
-                    formatted_content = self.llm.format_image_in_message(input.content)  # type: ignore
+                    formatted_content = self.llm.format_image_in_message(input.content)  # type: ignore[arg-type]
                     message_history.append(
                         {'role': input.role, 'content': formatted_content}
                     )
                 elif input.content.type == 'document':
-                    # Format document message and add to history
-                    formatted_content = await self.llm.format_document_in_message(
-                        input.content  # type: ignore
-                    )
                     message_history.append(
-                        {'role': input.role, 'content': formatted_content}
+                        {
+                            'role': input.role,
+                            'content': formatted_by_doc_id[doc_id_by_idx[idx]],
+                        }
                     )
                 else:
                     raise ValueError(
                         f'Invalid media message content type: {input.content.type}'
                     )
             elif isinstance(input.content, str):
-                # Handle other messages with string content (UserMessage, SystemMessage, etc.)
                 resolved_content = resolve_variables(input.content, variables)
                 message_history.append(
                     {'role': input.role, 'content': resolved_content}
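# Illustrative sketch (not part of the patch): the two-pass loop above shares
# one in-flight formatting task per unique document object, so a document that
# appears several times in the history is rasterized once. The same idea in
# isolation, assuming any async `fmt` coroutine function:
import asyncio


async def format_unique_documents(docs, fmt):
    tasks = {}
    for doc in docs:
        # keep only the first task created for each object identity
        if id(doc) not in tasks:
            tasks[id(doc)] = asyncio.ensure_future(fmt(doc))
    results = await asyncio.gather(*tasks.values())
    return dict(zip(tasks.keys(), results))


async def _demo():
    doc = {'pages': 3}  # stands in for a DocumentMessageContent instance

    async def fmt(d):
        await asyncio.sleep(0)  # pretend this is the expensive rasterization
        return ['block']

    formatted = await format_unique_documents([doc, doc, doc], fmt)
    assert len(formatted) == 1  # formatted once, referenced three times


asyncio.run(_demo())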
diff --git a/flo_ai/flo_ai/arium/arium.py b/flo_ai/flo_ai/arium/arium.py
index 715fcece..d7d0fd62 100644
--- a/flo_ai/flo_ai/arium/arium.py
+++ b/flo_ai/flo_ai/arium/arium.py
@@ -16,6 +16,7 @@
 )
 from flo_ai.telemetry.instrumentation import workflow_metrics
 from flo_ai.telemetry import get_tracer
+from flo_ai.utils.profiler import aprofile, record as _profile_record
 from opentelemetry.trace import Status, StatusCode
 import asyncio
 import time
@@ -203,6 +204,20 @@ async def _execute_graph(
         event_callback: Optional[Callable[[AriumEvent], None]] = None,
         events_filter: Optional[List[AriumEventType]] = None,
         variables: Optional[Dict[str, Any]] = None,
+    ):
+        async with aprofile(
+            f'arium.execute_graph[{getattr(self, "name", "unnamed_workflow")}]'
+        ):
+            return await self._execute_graph_impl(
+                inputs, event_callback, events_filter, variables
+            )
+
+    async def _execute_graph_impl(
+        self,
+        inputs: List[BaseMessage],
+        event_callback: Optional[Callable[[AriumEvent], None]] = None,
+        events_filter: Optional[List[AriumEventType]] = None,
+        variables: Optional[Dict[str, Any]] = None,
     ):
         variables = variables if variables is not None else {}
         [
@@ -464,33 +479,9 @@ async def _execute_node(
                 },
             ) as node_span:
                 try:
-                    # Execute the node based on its type
-
-                    if isinstance(node, Agent):
-                        # Variables are already resolved, pass empty dict to avoid re-processing
-                        result = await node.run(inputs, variables={})
-                    elif isinstance(node, FunctionNode):
-                        result = await node.run(inputs, variables=None)
-                    elif isinstance(node, ForEachNode):
-                        foreach_results: List[
-                            MessageMemoryItem | BaseMessage
-                        ] = await node.run(
-                            inputs,
-                            variables=variables,
-                        )
-                        result = self._flatten_results(foreach_results)
-                    elif isinstance(node, AriumNode):
-                        # AriumNode execution
-                        arium_result: List[MessageMemoryItem] = await node.run(
-                            inputs, variables=variables
-                        )
-                        result = self._flatten_results(arium_result)
-                    elif isinstance(node, StartNode):
-                        result = None
-                    elif isinstance(node, EndNode):
-                        result = None
-                    else:
-                        result = None
+                    result = await self._dispatch_node_run(
+                        node, node_type, inputs, variables
+                    )

                     # Calculate execution time
                     execution_time = time.time() - start_time
@@ -503,6 +494,7 @@
                     workflow_metrics.record_node_latency(
                         execution_time_ms, workflow_name, node.name, node_type
                     )
+                    _profile_record(f'node.{node.name}[{node_type}]', execution_time)

                     node_span.set_status(Status(StatusCode.OK))
                     node_span.set_attribute('node.execution_time_ms', execution_time_ms)
@@ -553,33 +545,13 @@
         else:
             # No telemetry or start/end node, execute without tracing
             try:
-                # Execute the node based on its type
-                if isinstance(node, Agent):
-                    result = await node.run(inputs, variables={})
-                elif isinstance(node, FunctionNode):
-                    result = await node.run(inputs, variables=None)
-                elif isinstance(node, ForEachNode):
-                    foreach_results: List[
-                        MessageMemoryItem | BaseMessage
-                    ] = await node.run(
-                        inputs,
-                        variables=variables,
-                    )
-                    result = self._flatten_results(foreach_results)
-                elif isinstance(node, AriumNode):
-                    arium_result: List[MessageMemoryItem] = await node.run(
-                        inputs, variables=variables
-                    )
-                    result = self._flatten_results(arium_result)
-                elif isinstance(node, StartNode):
-                    result = None
-                elif isinstance(node, EndNode):
-                    result = None
-                else:
-                    result = None
+                result = await self._dispatch_node_run(
+                    node, node_type, inputs, variables
+                )

                 # Calculate execution time
                 execution_time = time.time() - start_time
+                _profile_record(f'node.{node.name}[{node_type}]', execution_time)

                 # Emit node completed event
                 self._emit_event(
@@ -611,6 +583,39 @@
                 # Re-raise the exception
                 raise e

+    async def _dispatch_node_run(
+        self,
+        node: AriumNodeType,
+        node_type: str,
+        inputs: List[BaseMessage],
+        variables: Dict[str, Any],
+    ):
+        """Dispatch a node's ``run`` invocation under a profiler scope.
+
+        Keeps the dispatch logic in one place so both the telemetry and
+        non-telemetry branches of ``_execute_node`` get consistent profiling.
+        """
+        if node_type in ('start', 'end'):
+            return None
+
+        async with aprofile(f'node.{node.name}[{node_type}]'):
+            if isinstance(node, Agent):
+                return await node.run(inputs, variables={})
+            if isinstance(node, FunctionNode):
+                return await node.run(inputs, variables=None)
+            if isinstance(node, ForEachNode):
+                foreach_results: List[MessageMemoryItem | BaseMessage] = await node.run(
+                    inputs,
+                    variables=variables,
+                )
+                return self._flatten_results(foreach_results)
+            if isinstance(node, AriumNode):
+                arium_result: List[MessageMemoryItem] = await node.run(
+                    inputs, variables=variables
+                )
+                return self._flatten_results(arium_result)
+            return None
+
     def _flatten_results(
         self, sequence: List[MessageMemoryItem | BaseMessage | str]
     ) -> List[BaseMessage | str]:
diff --git a/flo_ai/flo_ai/formatter/yaml_format_parser.py b/flo_ai/flo_ai/formatter/yaml_format_parser.py
index 90029eb0..b160a76b 100644
--- a/flo_ai/flo_ai/formatter/yaml_format_parser.py
+++ b/flo_ai/flo_ai/formatter/yaml_format_parser.py
@@ -66,6 +66,7 @@ def __get_field_type_annotation(
             'bool': bool,
             'float': float,
             'literal': self.__create_literal_type,
+            'enum': self.__create_enum_type,
             'object': lambda f: self.__create_nested_model(f, model_name),
             'array': lambda f: self.__create_array_type(f, model_name),
         }
@@ -78,7 +79,7 @@

         return (
             type_handler(field)
-            if field_type in ['literal', 'object', 'array']
+            if field_type in ['literal', 'enum', 'object', 'array']
             else type_handler
         )
@@ -89,10 +90,50 @@ def __create_literal_type(self, field: Dict[str, Any]) -> Any:
             raise ValueError(
                 f"Field '{field['name']}' of type 'literal' must specify 'values'."
             )
-        literals = tuple(literal_value['value'] for literal_value in literal_values)
+        literals = tuple(
+            self.__extract_enum_value(v, field['name']) for v in literal_values
+        )
         # Construct Literal type dynamically at runtime
         return Literal.__getitem__(literals)

+    def __create_enum_type(self, field: Dict[str, Any]) -> Any:
+        """Creates a Literal type for an enum field.
+
+        Enum values may be plain primitives (str/int/float) or dicts with a
+        ``value`` key (same shape as literal values). At the JSON-schema level
+        this emits ``{"enum": [...]}`` which every supported LLM backend handles
+        (OpenAI/Azure via response_format + function schema, Gemini via
+        response_schema, Anthropic/Ollama/Bedrock/vLLM via inlined schema in the
+        system prompt).
+        """
+        raw_values = field.get('values', [])
+        if not raw_values:
+            raise ValueError(
+                f"Field '{field['name']}' of type 'enum' must specify 'values'."
+            )
+        literals = tuple(
+            self.__extract_enum_value(v, field['name']) for v in raw_values
+        )
+        return Literal.__getitem__(literals)
+
+    @staticmethod
+    def __extract_enum_value(value: Any, field_name: str) -> Any:
+        """Normalize an enum/literal value entry to its primitive value."""
+        if isinstance(value, dict):
+            if 'value' not in value:
+                raise ValueError(
+                    f"Field '{field_name}' has an object-style value entry "
+                    "missing the required 'value' key."
+                )
+            return value['value']
+        if isinstance(value, (str, int, float, bool)):
+            return value
+        raise ValueError(
+            f"Field '{field_name}' has an unsupported enum value of type "
+            f'{type(value).__name__}. Expected str, int, float or object with '
+            "'value' key."
+        )
+
     def __create_array_type(self, field: Dict[str, Any], model_name: str) -> Any:
         """Creates a List type from field definition"""
         inner_type = self.__get_field_type_annotation(
@@ -121,6 +162,17 @@ def __create_contract_from_json(self) -> BaseModel:
 This should be one of the values in the `value` column in the above csv.
 {default_prompt}
 """
+            elif field['type'] == 'enum':
+                enum_values = [
+                    self.__extract_enum_value(v, field['name'])
+                    for v in field.get('values', [])
+                ]
+                default_prompt = field.get('default_value_prompt', '')
+                field_description = (
+                    f"{field['description']}\n"
+                    f'Must be exactly one of: {enum_values}.'
+                    + (f'\n{default_prompt}' if default_prompt else '')
+                )
             else:
                 field_description = field['description']
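# Illustrative field spec for the new 'enum' type (shape inferred from the
# parser code above; 'sentiment' and its values are made-up). Plain primitives
# and {'value': ...} entries normalize to the same Literal type:
from typing import Literal

field = {
    'name': 'sentiment',
    'type': 'enum',
    'description': 'Overall sentiment of the text',
    'values': ['positive', 'neutral', {'value': 'negative'}],
}

# __create_enum_type reduces the spec above to the runtime equivalent of:
SentimentLiteral = Literal['positive', 'neutral', 'negative']
# which Pydantic then serializes as {"enum": ["positive", "neutral", "negative"]}
# in the generated JSON schema.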
+ """ + cache_key = self.__class__.__name__ + cache = getattr(document, '_formatted_cache', None) + if isinstance(cache, dict) and cache_key in cache: + return cache[cache_key] + + mime = document.mime_type or 'application/pdf' + if document.base64: + b64 = document.base64 + elif document.bytes: + b64 = _base64.b64encode(document.bytes).decode('utf-8') + elif document.url: + block = [ + { + 'type': 'document', + 'source': {'type': 'url', 'url': document.url}, + } + ] + if isinstance(cache, dict): + cache[cache_key] = block + return block + else: + raise ValueError('DocumentMessageContent has no bytes, base64, or url') + + block = [ + { + 'type': 'document', + 'source': { + 'type': 'base64', + 'media_type': mime, + 'data': b64, + }, + } + ] + if isinstance(cache, dict): + cache[cache_key] = block + return block + def get_assistant_message_for_tool_call( self, response: Dict[str, Any] ) -> Optional[Any]: diff --git a/flo_ai/flo_ai/llm/azure_openai_llm.py b/flo_ai/flo_ai/llm/azure_openai_llm.py index e85047a6..cc9fe3c4 100644 --- a/flo_ai/flo_ai/llm/azure_openai_llm.py +++ b/flo_ai/flo_ai/llm/azure_openai_llm.py @@ -37,9 +37,17 @@ def __init__( temperature: Sampling temperature custom_headers: Optional additional headers to send with each request **kwargs: Extra parameters forwarded to the SDK client / calls + + PDFs are sent to the underlying deployment as rasterized page images + via the Chat Completions multimodal format. The deployment must be + vision-capable (gpt-4o, gpt-4.1, gpt-5, etc.); text-only deployments + will error on the first document call. """ super().__init__( - model=model, api_key=api_key, temperature=temperature, **kwargs + model=model, + api_key=api_key, + temperature=temperature, + **kwargs, ) self.client = AsyncAzureOpenAI( api_key=self.api_key, diff --git a/flo_ai/flo_ai/llm/base_llm.py b/flo_ai/flo_ai/llm/base_llm.py index f6a62a68..adf6d554 100644 --- a/flo_ai/flo_ai/llm/base_llm.py +++ b/flo_ai/flo_ai/llm/base_llm.py @@ -1,8 +1,10 @@ +import asyncio +import base64 as _base64 from abc import ABC, abstractmethod from typing import Dict, Any, List, Optional, AsyncIterator from flo_ai.tool.base_tool import Tool -from flo_ai.utils.document_processor import get_default_processor from flo_ai.utils.logger import logger +from flo_ai.utils.profiler import aprofile, profile as _sync_profile from flo_ai.models.chat_message import DocumentMessageContent, ImageMessageContent @@ -121,24 +123,94 @@ def format_image_in_message(self, image: ImageMessageContent) -> Any: """Format a image in the message""" pass - async def format_document_in_message(self, document: DocumentMessageContent) -> str: - """Format a document in the message by extracting text content""" - - try: - # Process document to extract text - result = await get_default_processor().process_document(document) - - # Format the extracted content for the LLM - extracted_text = result.get('extracted_text', '') - doc_type = result.get('document_type', 'unknown') - - logger.info( - f'Successfully formatted {doc_type} document for {self.__class__.__name__} LLM' + async def format_document_in_message(self, document: DocumentMessageContent) -> Any: + """Return provider-native content block(s) for a document. + + Default implementation rasterizes PDF pages to PNG ``image_url`` + blocks in the OpenAI Chat Completions multimodal shape. This assumes + the underlying model is vision-capable (gpt-4o, gpt-4.1, gpt-5, + llava, etc.). 
LLMs that do not support image inputs will error at + request time — intentionally; the library does not ship a + text-extraction fallback because it silently hides capability + mismatches and usually produces worse results than using a vision + model. + + Providers with native PDF support (Anthropic, Gemini, Vertex, the + OpenAI Responses API, etc.) override this to return their native + document block. Results are cached on the DocumentMessageContent per + LLM class so the same document is formatted at most once across all + agent nodes and retries in a workflow. + """ + cache_key = self.__class__.__name__ + cache = getattr(document, '_formatted_cache', None) + if isinstance(cache, dict) and cache_key in cache: + return cache[cache_key] + + async with aprofile(f'llm.{cache_key}.format_document'): + try: + formatted = await asyncio.to_thread( + self._rasterize_pdf_to_images, document + ) + except Exception as e: + logger.error( + f'Error formatting document for {self.__class__.__name__}: {e}' + ) + raise Exception(f'Failed to format document: {str(e)}') + + if isinstance(cache, dict): + cache[cache_key] = formatted + return formatted + + def _rasterize_pdf_to_images( + self, document: DocumentMessageContent + ) -> List[Dict[str, Any]]: + """Rasterize a PDF to a list of OpenAI-style image_url blocks. + + Uses plain PyMuPDF (no pymupdf4llm). Skips text-extraction / table + detection entirely — vision-capable models read the page image + directly. DPI is configurable via the LLM kwargs `pdf_raster_dpi` + (default 150). + """ + import pymupdf + + data = self._document_to_bytes(document) + mime = document.mime_type or 'application/pdf' + if not mime.endswith('pdf'): + raise ValueError( + f'Default document formatter only supports PDFs, got mime={mime}. ' + f'Override format_document_in_message for {self.__class__.__name__}.' ) - return extracted_text - except Exception as e: - logger.error( - f'Error formatting document for {self.__class__.__name__}: {e}' + dpi = int((getattr(self, 'kwargs', None) or {}).get('pdf_raster_dpi', 150)) + doc = pymupdf.open(stream=data, filetype='pdf') + try: + blocks: List[Dict[str, Any]] = [] + for page_idx in range(doc.page_count): + page = doc.load_page(page_idx) + with _sync_profile(f'pdf.rasterize_page[dpi={dpi},page={page_idx}]'): + pix = page.get_pixmap(dpi=dpi) + png_bytes = pix.tobytes('png') + b64 = _base64.b64encode(png_bytes).decode('utf-8') + blocks.append( + { + 'type': 'image_url', + 'image_url': {'url': f'data:image/png;base64,{b64}'}, + } + ) + return blocks + finally: + doc.close() + + @staticmethod + def _document_to_bytes(document: DocumentMessageContent) -> bytes: + """Resolve a DocumentMessageContent to raw bytes.""" + if document.bytes: + return document.bytes + if document.base64: + return _base64.b64decode(document.base64) + if document.url: + raise ValueError( + 'URL-based documents are not supported by the default formatter; ' + 'fetch the document bytes first or override format_document_in_message.' 
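# Illustrative usage (not part of the patch; the OpenAI class name and its
# constructor signature are assumed): _rasterize_pdf_to_images reads
# `pdf_raster_dpi` from the kwargs stored on the LLM, so rasterization quality
# vs. payload size can be tuned per instance:
#
#     llm = OpenAI(model='gpt-4o', api_key='...', pdf_raster_dpi=200)
#
# Each PDF page then becomes one Chat Completions block of the form
#     {'type': 'image_url', 'image_url': {'url': 'data:image/png;base64,...'}}
# and the full list is cached under document._formatted_cache['OpenAI'], so a
# retry or a second agent node reuses the blocks instead of re-rasterizing.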
diff --git a/flo_ai/flo_ai/llm/gemini_llm.py b/flo_ai/flo_ai/llm/gemini_llm.py
index 9f38ffb1..578681e1 100644
--- a/flo_ai/flo_ai/llm/gemini_llm.py
+++ b/flo_ai/flo_ai/llm/gemini_llm.py
@@ -2,7 +2,7 @@
 import asyncio
 from typing import Dict, Any, List, Optional, AsyncIterator
 from .base_llm import BaseLLM
-from flo_ai.models.chat_message import ImageMessageContent
+from flo_ai.models.chat_message import DocumentMessageContent, ImageMessageContent
 from google import genai
 from google.genai import types
 from flo_ai.tool.base_tool import Tool
@@ -205,11 +205,14 @@ def get_next_chunk():
             if hasattr(chunk, 'text') and chunk.text:
                 yield {'content': chunk.text}

-    def get_message_content(self, response: Any) -> Optional[str]:
-        """Extract message content from response"""
-        if isinstance(response, dict):
-            return response.get('content', '')
-        return str(response)
+    def get_message_content(self, response: Dict[str, Any]) -> str:
+        if isinstance(response, str):
+            return response
+        if isinstance(response, dict):
+            return str(response.get('content', ''))
+        if hasattr(response, 'content') and response.content is not None:
+            return str(response.content)
+        return ''

     def format_tool_for_llm(self, tool: 'Tool') -> Dict[str, Any]:
         """Format a single tool for Gemini's function declarations"""
@@ -261,3 +262,32 @@ def format_image_in_message(self, image: ImageMessageContent) -> types.Part:
         raise NotImplementedError(
             f'Image formatting for Gemini LLM requires either url or base64 data. Received: url={image.url}, base64={bool(image.base64)}'
         )
+
+    async def format_document_in_message(
+        self, document: DocumentMessageContent
+    ) -> types.Part:
+        """Return a Gemini native Part for this document.
+
+        Gemini accepts PDFs (and many other document formats) as `inline_data`
+        Parts, so there is no need to extract or rasterize locally.
+        """
+        cache_key = self.__class__.__name__
+        cache = getattr(document, '_formatted_cache', None)
+        if isinstance(cache, dict) and cache_key in cache:
+            return cache[cache_key]
+
+        mime = document.mime_type or 'application/pdf'
+        if document.bytes:
+            part = types.Part.from_bytes(data=document.bytes, mime_type=mime)
+        elif document.base64:
+            part = types.Part.from_bytes(
+                data=base64.b64decode(document.base64), mime_type=mime
+            )
+        elif document.url:
+            part = types.Part.from_uri(file_uri=document.url, mime_type=mime)
+        else:
+            raise ValueError('DocumentMessageContent has no bytes, base64, or url')
+
+        if isinstance(cache, dict):
+            cache[cache_key] = part
+        return part
+ """ + cache_key = self.__class__.__name__ + cache = getattr(document, '_formatted_cache', None) + if isinstance(cache, dict) and cache_key in cache: + return cache[cache_key] + + mime = document.mime_type or 'application/pdf' + if document.bytes: + part = types.Part.from_bytes(data=document.bytes, mime_type=mime) + elif document.base64: + part = types.Part.from_bytes( + data=base64.b64decode(document.base64), mime_type=mime + ) + elif document.url: + part = types.Part.from_uri(file_uri=document.url, mime_type=mime) + else: + raise ValueError('DocumentMessageContent has no bytes, base64, or url') + + if isinstance(cache, dict): + cache[cache_key] = part + return part diff --git a/flo_ai/flo_ai/llm/openai_llm.py b/flo_ai/flo_ai/llm/openai_llm.py index 1e585507..26282701 100644 --- a/flo_ai/flo_ai/llm/openai_llm.py +++ b/flo_ai/flo_ai/llm/openai_llm.py @@ -24,7 +24,10 @@ def __init__( **kwargs, ): super().__init__( - model=model, api_key=api_key, temperature=temperature, **kwargs + model=model, + api_key=api_key, + temperature=temperature, + **kwargs, ) self.client = AsyncOpenAI( diff --git a/flo_ai/flo_ai/llm/rootflo_llm.py b/flo_ai/flo_ai/llm/rootflo_llm.py index 2c969e78..2b40dc0c 100644 --- a/flo_ai/flo_ai/llm/rootflo_llm.py +++ b/flo_ai/flo_ai/llm/rootflo_llm.py @@ -1,7 +1,7 @@ from enum import Enum from typing import AsyncIterator, Dict, Any, List, Optional from datetime import datetime, timedelta -from flo_ai.models.chat_message import ImageMessageContent +from flo_ai.models.chat_message import DocumentMessageContent, ImageMessageContent import jwt import httpx import asyncio @@ -318,3 +318,9 @@ def format_image_in_message(self, image: ImageMessageContent) -> Any: if self._llm is None: raise RuntimeError('LLM initialization failed: _llm is None') return self._llm.format_image_in_message(image) + + async def format_document_in_message(self, document: DocumentMessageContent) -> Any: + """Delegate document formatting to the underlying provider LLM.""" + if self._llm is None: + raise RuntimeError('LLM initialization failed: _llm is None') + return await self._llm.format_document_in_message(document) diff --git a/flo_ai/flo_ai/models/agent.py b/flo_ai/flo_ai/models/agent.py index 17456201..175a7e74 100644 --- a/flo_ai/flo_ai/models/agent.py +++ b/flo_ai/flo_ai/models/agent.py @@ -35,13 +35,18 @@ class ParserFieldModel(BaseModel): """A field definition in a parser configuration.""" name: str = Field(..., description='Field name') - type: Literal['str', 'int', 'bool', 'float', 'literal', 'object', 'array'] = Field( - ..., description='Field type' - ) + type: Literal[ + 'str', 'int', 'bool', 'float', 'literal', 'enum', 'object', 'array' + ] = Field(..., description='Field type') description: str = Field(..., description='Field description') required: Optional[bool] = Field(None, description='Whether field is required') - values: Optional[List[LiteralValueModel]] = Field( - None, description='Values for literal type fields' + values: Optional[List[Union[LiteralValueModel, str, int, float]]] = Field( + None, + description=( + "Allowed values. For 'literal' type, use LiteralValueModel entries " + "(value + description [+ examples]). For 'enum' type, use plain " + 'primitives (str/int/float) or LiteralValueModel entries.' 
diff --git a/flo_ai/flo_ai/models/chat_message.py b/flo_ai/flo_ai/models/chat_message.py
index 27a51f04..5f9baf94 100644
--- a/flo_ai/flo_ai/models/chat_message.py
+++ b/flo_ai/flo_ai/models/chat_message.py
@@ -26,6 +26,7 @@ class ImageMessageContent(MediaMessageContent):

     def __post_init__(self):
         self.type = 'image'
+        self._formatted_cache: Dict[str, Any] = {}


 @dataclass
@@ -37,6 +38,10 @@ class DocumentMessageContent(MediaMessageContent):

     def __post_init__(self):
         self.type = 'document'
+        # Cache of provider-formatted payloads keyed by LLM class name.
+        # Avoids re-running PDF rasterization / text extraction across
+        # multiple agent nodes or retry attempts.
+        self._formatted_cache: Dict[str, Any] = {}


 @dataclass
diff --git a/flo_ai/flo_ai/telemetry/instrumentation.py b/flo_ai/flo_ai/telemetry/instrumentation.py
index 4861af54..61ab4533 100644
--- a/flo_ai/flo_ai/telemetry/instrumentation.py
+++ b/flo_ai/flo_ai/telemetry/instrumentation.py
@@ -6,6 +6,7 @@
 from functools import wraps
 from opentelemetry.trace import Status, StatusCode, Span
 from .telemetry import get_tracer, get_meter
+from flo_ai.utils.profiler import aprofile, record as _profile_record
 import time
 import asyncio
@@ -368,16 +369,25 @@ async def generate(self, messages):
     def decorator(func: Callable):
         @wraps(func)
         async def async_wrapper(*args, **kwargs):
-            tracer = get_tracer()
-            if not tracer:
-                return await func(*args, **kwargs)
-
-            # Extract self to get instance attributes
             self_arg = args[0] if args else None
             actual_model = model or (getattr(self_arg, 'model', '') if self_arg else '')
             actual_provider = provider or (
                 self_arg.__class__.__name__ if self_arg else ''
             )
+            profile_label = f'llm.{actual_provider}.generate[{actual_model}]'
+
+            tracer = get_tracer()
+            if not tracer:
+                async with aprofile(profile_label):
+                    start_time = time.time()
+                    try:
+                        return await func(*args, **kwargs)
+                    finally:
+                        llm_metrics.record_latency(
+                            (time.time() - start_time) * 1000,
+                            actual_model,
+                            actual_provider,
+                        )

             with tracer.start_as_current_span(
                 f'llm.{actual_provider}.generate',
@@ -389,38 +399,45 @@
                     else 0.0,
                 },
             ) as span:
-                start_time = time.time()
-                try:
-                    result = await func(*args, **kwargs)
-
-                    # Record success
-                    duration_ms = (time.time() - start_time) * 1000
-                    llm_metrics.record_request(actual_model, actual_provider, 'success')
-                    llm_metrics.record_latency(
-                        duration_ms, actual_model, actual_provider
-                    )
-
-                    span.set_status(Status(StatusCode.OK))
-                    span.set_attribute('llm.response.received', True)
-
-                    return result
-
-                except Exception as e:
-                    # Record error
-                    duration_ms = (time.time() - start_time) * 1000
-                    error_type = type(e).__name__
-
-                    llm_metrics.record_request(actual_model, actual_provider, 'error')
-                    llm_metrics.record_error(actual_model, actual_provider, error_type)
-                    llm_metrics.record_latency(
-                        duration_ms, actual_model, actual_provider
-                    )
-
-                    span.set_status(Status(StatusCode.ERROR, str(e)))
-                    span.set_attribute('error.type', error_type)
-                    span.set_attribute('error.message', str(e))
-
-                    raise
+                async with aprofile(profile_label):
+                    start_time = time.time()
+                    try:
+                        result = await func(*args, **kwargs)
+
+                        # Record success
+                        duration_ms = (time.time() - start_time) * 1000
+                        llm_metrics.record_request(
+                            actual_model, actual_provider, 'success'
+                        )
+                        llm_metrics.record_latency(
+                            duration_ms, actual_model, actual_provider
+                        )
+
+                        span.set_status(Status(StatusCode.OK))
+                        span.set_attribute('llm.response.received', True)
+
+                        return result
+
+                    except Exception as e:
+                        # Record error
+                        duration_ms = (time.time() - start_time) * 1000
+                        error_type = type(e).__name__
+
+                        llm_metrics.record_request(
+                            actual_model, actual_provider, 'error'
+                        )
+                        llm_metrics.record_error(
+                            actual_model, actual_provider, error_type
+                        )
+                        llm_metrics.record_latency(
+                            duration_ms, actual_model, actual_provider
+                        )
+
+                        span.set_status(Status(StatusCode.ERROR, str(e)))
+                        span.set_attribute('error.type', error_type)
+                        span.set_attribute('error.message', str(e))
+
+                        raise

         @wraps(func)
         def sync_wrapper(*args, **kwargs):
@@ -495,74 +512,84 @@ async def stream(self, messages):
     def decorator(func: Callable):
         @wraps(func)
         async def async_wrapper(*args, **kwargs):
-            tracer = get_tracer()
-            if not tracer:
-                async for chunk in func(*args, **kwargs):
-                    yield chunk
-                return
-
-            # Extract self to get instance attributes
             self_arg = args[0] if args else None
             actual_model = model or (getattr(self_arg, 'model', '') if self_arg else '')
             actual_provider = provider or (
                 self_arg.__class__.__name__ if self_arg else ''
             )
+            stream_label = f'llm.{actual_provider}.stream[{actual_model}]'

-            with tracer.start_as_current_span(
-                f'llm.{actual_provider}.stream',
-                attributes={
-                    'llm.provider': actual_provider,
-                    'llm.model': actual_model,
-                    'llm.temperature': getattr(self_arg, 'temperature', 0.0)
-                    if self_arg
-                    else 0.0,
-                    'llm.operation': 'stream',
-                },
-            ) as span:
-                start_time = time.time()
-                chunk_count = 0
-                try:
-                    # Record stream start
-                    llm_metrics.record_stream(actual_model, actual_provider, 'start')
-
-                    # Track the streaming response
-                    async for chunk in func(*args, **kwargs):
-                        chunk_count += 1
-                        yield chunk
-
-                    # Record success
-                    duration_ms = (time.time() - start_time) * 1000
-                    llm_metrics.record_stream(actual_model, actual_provider, 'success')
-                    llm_metrics.record_stream_chunks(
-                        chunk_count, actual_model, actual_provider
-                    )
-                    llm_metrics.record_stream_latency(
-                        duration_ms, actual_model, actual_provider
-                    )
-
-                    span.set_status(Status(StatusCode.OK))
-                    span.set_attribute('llm.stream.chunks', chunk_count)
-                    span.set_attribute('llm.stream.duration_ms', duration_ms)
-                    span.set_attribute('llm.stream.completed', True)
-
-                except Exception as e:
-                    # Record error
-                    duration_ms = (time.time() - start_time) * 1000
-                    error_type = type(e).__name__
-
-                    llm_metrics.record_stream(actual_model, actual_provider, 'error')
-                    llm_metrics.record_error(actual_model, actual_provider, error_type)
-                    llm_metrics.record_stream_latency(
-                        duration_ms, actual_model, actual_provider
-                    )
-
-                    span.set_status(Status(StatusCode.ERROR, str(e)))
-                    span.set_attribute('error.type', error_type)
-                    span.set_attribute('error.message', str(e))
-                    span.set_attribute('llm.stream.chunks', chunk_count)
-                    span.set_attribute('llm.stream.duration_ms', duration_ms)
-
-                    raise
+            tracer = get_tracer()
+            if not tracer:
+                async with aprofile(stream_label):
+                    async for chunk in func(*args, **kwargs):
+                        yield chunk
+                    return
+
+            async with aprofile(stream_label):
+                with tracer.start_as_current_span(
+                    f'llm.{actual_provider}.stream',
+                    attributes={
+                        'llm.provider': actual_provider,
+                        'llm.model': actual_model,
+                        'llm.temperature': getattr(self_arg, 'temperature', 0.0)
+                        if self_arg
+                        else 0.0,
+                        'llm.operation': 'stream',
+                    },
+                ) as span:
+                    start_time = time.time()
+                    chunk_count = 0
+                    try:
+                        # Record stream start
+                        llm_metrics.record_stream(
+                            actual_model, actual_provider, 'start'
+                        )
+
+                        # Track the streaming response
+                        async for chunk in func(*args, **kwargs):
+                            chunk_count += 1
+                            yield chunk
+
+                        # Record success
+                        duration_ms = (time.time() - start_time) * 1000
+                        llm_metrics.record_stream(
+                            actual_model, actual_provider, 'success'
+                        )
+                        llm_metrics.record_stream_chunks(
+                            chunk_count, actual_model, actual_provider
+                        )
+                        llm_metrics.record_stream_latency(
+                            duration_ms, actual_model, actual_provider
+                        )
+
+                        span.set_status(Status(StatusCode.OK))
+                        span.set_attribute('llm.stream.chunks', chunk_count)
+                        span.set_attribute('llm.stream.duration_ms', duration_ms)
+                        span.set_attribute('llm.stream.completed', True)
+
+                    except Exception as e:
+                        # Record error
+                        duration_ms = (time.time() - start_time) * 1000
+                        error_type = type(e).__name__
+
+                        llm_metrics.record_stream(
+                            actual_model, actual_provider, 'error'
+                        )
+                        llm_metrics.record_error(
+                            actual_model, actual_provider, error_type
+                        )
+                        llm_metrics.record_stream_latency(
+                            duration_ms, actual_model, actual_provider
+                        )
+
+                        span.set_status(Status(StatusCode.ERROR, str(e)))
+                        span.set_attribute('error.type', error_type)
+                        span.set_attribute('error.message', str(e))
+                        span.set_attribute('llm.stream.chunks', chunk_count)
+                        span.set_attribute('llm.stream.duration_ms', duration_ms)
+
+                        raise

         return async_wrapper
@@ -585,58 +612,65 @@ async def run(self, inputs):
     def decorator(func: Callable):
         @wraps(func)
         async def async_wrapper(*args, **kwargs):
-            tracer = get_tracer()
-            if not tracer:
-                return await func(*args, **kwargs)
-
             self_arg = args[0] if args else None
             actual_agent_name = agent_name or (
                 getattr(self_arg, 'name', '') if self_arg else ''
             )
             agent_type = getattr(self_arg, 'agent_type', '') if self_arg else ''
+            profile_label = f'agent.{actual_agent_name}.run'

-            with tracer.start_as_current_span(
-                f'agent.{actual_agent_name}.run',
-                attributes={
-                    'agent.name': actual_agent_name,
-                    'agent.type': str(agent_type),
-                },
-            ) as span:
-                start_time = time.time()
-                try:
-                    result = await func(*args, **kwargs)
-
-                    duration_ms = (time.time() - start_time) * 1000
-                    agent_metrics.record_execution(
-                        actual_agent_name, str(agent_type), 'success'
-                    )
-                    agent_metrics.record_latency(
-                        duration_ms, actual_agent_name, str(agent_type)
-                    )
-
-                    span.set_status(Status(StatusCode.OK))
-                    span.set_attribute(
-                        'agent.result.length', len(str(result)) if result else 0
-                    )
-
-                    return result
-
-                except Exception as e:
-                    duration_ms = (time.time() - start_time) * 1000
-                    error_type = type(e).__name__
-
-                    agent_metrics.record_execution(
-                        actual_agent_name, str(agent_type), 'error'
-                    )
-                    agent_metrics.record_error(actual_agent_name, error_type)
-                    agent_metrics.record_latency(
-                        duration_ms, actual_agent_name, str(agent_type)
-                    )
-
-                    span.set_status(Status(StatusCode.ERROR, str(e)))
-                    span.set_attribute('error.type', error_type)
-
-                    raise
+            tracer = get_tracer()
+            if not tracer:
+                async with aprofile(profile_label):
+                    start_time = time.time()
+                    try:
+                        return await func(*args, **kwargs)
+                    finally:
+                        _profile_record(profile_label, time.time() - start_time)
+
+            async with aprofile(profile_label):
+                with tracer.start_as_current_span(
+                    f'agent.{actual_agent_name}.run',
+                    attributes={
+                        'agent.name': actual_agent_name,
+                        'agent.type': str(agent_type),
+                    },
+                ) as span:
+                    start_time = time.time()
+                    try:
+                        result = await func(*args, **kwargs)
+
+                        duration_ms = (time.time() - start_time) * 1000
+                        agent_metrics.record_execution(
+                            actual_agent_name, str(agent_type), 'success'
+                        )
+                        agent_metrics.record_latency(
+                            duration_ms, actual_agent_name, str(agent_type)
+                        )
+
+                        span.set_status(Status(StatusCode.OK))
+                        span.set_attribute(
+                            'agent.result.length', len(str(result)) if result else 0
+                        )
+
+                        return result
+
+                    except Exception as e:
+                        duration_ms = (time.time() - start_time) * 1000
+                        error_type = type(e).__name__
+
+                        agent_metrics.record_execution(
+                            actual_agent_name, str(agent_type), 'error'
+                        )
+                        agent_metrics.record_error(actual_agent_name, error_type)
+                        agent_metrics.record_latency(
+                            duration_ms, actual_agent_name, str(agent_type)
+                        )
+
+                        span.set_status(Status(StatusCode.ERROR, str(e)))
+                        span.set_attribute('error.type', error_type)
+
+                        raise

         return async_wrapper
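# Behavior sketch (the decorator name below is assumed; the hunks above only
# show the factory bodies): with these changes a decorated coroutine such as
#
#     @track_llm_metrics()
#     async def generate(self, messages): ...
#
# records llm_metrics latency and a profiler sample labelled
# llm.<Provider>.generate[<model>] even when no OpenTelemetry tracer is
# configured; previously the no-tracer branch returned without recording
# anything.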
diff --git a/flo_ai/flo_ai/utils/document_processor.py b/flo_ai/flo_ai/utils/document_processor.py
index 3b6a358d..86d30d2d 100644
--- a/flo_ai/flo_ai/utils/document_processor.py
+++ b/flo_ai/flo_ai/utils/document_processor.py
@@ -1,17 +1,18 @@
 """
 Document processing utilities for Flo AI framework.

-This module provides extensible document processing capabilities for PDF and TXT files,
-with a factory pattern design for easy addition of new document types.
+This module exposes lightweight helpers used by LLM adapters that need to
+produce a text representation of a document (e.g. the plain Ollama adapter)
+or an image rasterization of a PDF (e.g. vision chat models that do not
+accept PDFs natively).
 """

 import base64
 import time
 from abc import ABC, abstractmethod
-from typing import Dict, Any, Union
+from typing import Any, Dict, Union

 import pymupdf
-import pymupdf4llm
 import chardet

 from flo_ai.models.document import DocumentType
@@ -30,37 +31,29 @@ class BaseDocumentProcessor(ABC):

     @abstractmethod
     async def process(self, document: DocumentMessageContent) -> Dict[str, Any]:
-        """
-        Process a document and return extracted content and metadata.
-
-        Args:
-            document: DocumentMessageContent containing document data
-
-        Returns:
-            Dict containing extracted text, metadata, and processing info
-        """
+        """Process a document and return extracted content and metadata."""
         pass


 class PDFProcessor(BaseDocumentProcessor):
-    """Processor for PDF documents."""
+    """Processor for PDF documents using PyMuPDF's text layer.
+
+    Intentionally avoids pymupdf4llm / markdown conversion / table detection:
+    modern LLMs perform that structuring far better than a local heuristic,
+    and the raw text layer is 10-50x cheaper to produce.
+    """

     async def process(self, document: DocumentMessageContent) -> Dict[str, Any]:
-        """Extract text and metadata from PDF document."""
         try:
             pdf_content = await self._get_pdf_content(document)
-
-            # Process with pymupdf4llm (LLM-optimized)
-            text_data = await self._process_with_pymupdf4llm(pdf_content)
-
+            text_data = self._extract_with_pymupdf(pdf_content)
             return {
                 'extracted_text': text_data['text'],
                 'page_count': text_data.get('page_count', 0),
-                'processing_method': text_data.get('method', 'unknown'),
+                'processing_method': text_data.get('method', 'pymupdf'),
                 'metadata': text_data.get('metadata', {}),
                 'document_type': DocumentType.PDF.value,
             }
-
         except Exception as e:
             logger.error(f'Error processing PDF: {str(e)}')
             raise DocumentProcessingError(f'Failed to process PDF: {str(e)}')
@@ -68,38 +61,31 @@ async def process(self, document: DocumentMessageContent) -> Dict[str, Any]:
     async def _get_pdf_content(
         self, document: DocumentMessageContent
     ) -> Union[str, bytes]:
-        """Get PDF content from various sources."""
         if document.bytes:
             return document.bytes
-        elif document.base64:
+        if document.base64:
             return base64.b64decode(document.base64)
-        elif document.url:
+        if document.url:
             return document.url
-        else:
-            raise DocumentProcessingError('No PDF content provided')
+        raise DocumentProcessingError('No PDF content provided')

-    async def _process_with_pymupdf4llm(
-        self, pdf_content: Union[str, bytes]
-    ) -> Dict[str, Any]:
-        """Process PDF using pymupdf4llm (LLM-optimized)."""
+    @staticmethod
+    def _extract_with_pymupdf(pdf_content: Union[str, bytes]) -> Dict[str, Any]:
+        """Extract plain text using PyMuPDF. No markdown, no table OCR."""
         if isinstance(pdf_content, str):
-            # File path - pass directly to pymupdf4llm
-            text_data = pymupdf4llm.to_markdown(pdf_content)
-            metadata = {}
+            doc = pymupdf.open(pdf_content)
         else:
-            # Bytes - create PyMuPDF Document from memory
-            doc = pymupdf.open(stream=pdf_content)
-            try:
-                text_data = pymupdf4llm.to_markdown(doc)
-                metadata = {}
-            finally:
-                doc.close()  # Clean up document object
+            doc = pymupdf.open(stream=pdf_content, filetype='pdf')
+        try:
+            pages = [page.get_text() for page in doc]
+        finally:
+            doc.close()

         return {
-            'text': text_data,
-            'method': 'pymupdf4llm',
-            'metadata': metadata,
-            'page_count': len(text_data.split('\n---\n')) if '---' in text_data else 1,
+            'text': '\n\n---\n\n'.join(pages),
+            'method': 'pymupdf',
+            'metadata': {},
+            'page_count': len(pages),
         }


@@ -107,10 +93,8 @@ class TXTProcessor(BaseDocumentProcessor):
     """Processor for text documents."""

     async def process(self, document: DocumentMessageContent) -> Dict[str, Any]:
-        """Extract text from TXT document."""
         try:
             text_content = await self._get_text_content(document)
-
             return {
                 'extracted_text': text_content,
                 'page_count': 1,
@@ -122,37 +106,18 @@
                 },
                 'document_type': DocumentType.TXT.value,
             }
-
         except Exception as e:
             logger.error(f'Error processing TXT: {str(e)}')
             raise DocumentProcessingError(f'Failed to process TXT: {str(e)}')

     async def _get_text_content(self, document: DocumentMessageContent) -> str:
-        """Get text content from various sources."""
         if document.bytes:
             return await self._decode_bytes(document.bytes)
-        elif document.base64:
-            decoded_bytes = base64.b64decode(document.base64)
-            return await self._decode_bytes(decoded_bytes)
-        else:
-            raise DocumentProcessingError('No TXT content provided')
-
-    async def _read_text_file(self, file_path: str) -> str:
-        """Read text file with encoding detection."""
-        try:
-            # Try UTF-8 first
-            with open(file_path, 'r', encoding='utf-8') as f:
-                return f.read()
-        except UnicodeDecodeError:
-            # Try encoding detection with chardet
-            with open(file_path, 'rb') as f:
-                raw_data = f.read()
-            detected = chardet.detect(raw_data)
-            encoding = detected.get('encoding', 'utf-8')
-            return raw_data.decode(encoding, errors='replace')
+        if document.base64:
+            return await self._decode_bytes(base64.b64decode(document.base64))
+        raise DocumentProcessingError('No TXT content provided')

     async def _decode_bytes(self, content_bytes: bytes) -> str:
-        """Decode bytes with encoding detection."""
         try:
             return content_bytes.decode('utf-8')
         except UnicodeDecodeError:
@@ -162,44 +127,26 @@ async def _decode_bytes(self, content_bytes: bytes) -> str:


 class DocumentProcessor:
-    """
-    Main document processor with factory pattern for extensibility.
+    """Factory / dispatcher for :class:`BaseDocumentProcessor` implementations."""

-    Supports PDF and TXT documents with easy extension for new types.
-    """
-
-    def __init__(self):
-        self._processors = {
+    def __init__(self) -> None:
+        self._processors: Dict[DocumentType, BaseDocumentProcessor] = {
             DocumentType.PDF: PDFProcessor(),
             DocumentType.TXT: TXTProcessor(),
         }

     def register_processor(
         self, document_type: DocumentType, processor: BaseDocumentProcessor
-    ):
-        """Register a new document processor for a specific type."""
+    ) -> None:
+        """Register a processor for an additional document type."""
         self._processors[document_type] = processor

     async def process_document(
         self, document: DocumentMessageContent
     ) -> Dict[str, Any]:
-        """
-        Process a document using the appropriate processor.
-
-        Args:
-            document: DocumentMessageContent containing document data
-
-        Returns:
-            Dict containing extracted content and metadata
-
-        Raises:
-            DocumentProcessingError: If processing fails or document type unsupported
-        """
-        # Convert mime_type string to DocumentType enum
         if not document.mime_type:
             raise DocumentProcessingError('Document mime_type is required')

-        # Map mime_type string to DocumentType enum
         document_type = None
         for doc_type in DocumentType:
             if doc_type.value == document.mime_type:
@@ -212,28 +159,21 @@
                 f'Supported types: {[dt.value for dt in self._processors.keys()]}'
             )

-        processor: BaseDocumentProcessor = self._processors[document_type]
-
+        processor = self._processors[document_type]
         try:
             result = await processor.process(document)
-
-            # Add common metadata
             result['processing_timestamp'] = time.time()
-
             logger.info(
                 f"Successfully processed {document_type.value} document "
                 f"using {result.get('processing_method', 'unknown')} method"
             )
-
             return result
-
         except Exception as e:
             logger.error(f'Document processing failed: {str(e)}')
             raise


-# Lazy singleton for default processor
-_default_processor = None
+_default_processor: 'DocumentProcessor | None' = None


 def get_default_processor() -> DocumentProcessor:
+""" + +from __future__ import annotations + +import atexit +import contextvars +import functools +import os +import threading +import time +from collections import defaultdict +from contextlib import asynccontextmanager, contextmanager +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional + +__all__ = [ + 'enable_profiling', + 'disable_profiling', + 'is_enabled', + 'profile_async', + 'profile_sync', + 'aprofile', + 'profile', + 'record', + 'write_summary', +] + + +_enabled: bool = False +_log_path: Optional[Path] = None +_file_handle: Optional[Any] = None +_lock = threading.Lock() +_mirror_console: bool = False + +_depth: contextvars.ContextVar[int] = contextvars.ContextVar( + 'flo_ai_profile_depth', default=0 +) + +_totals: Dict[str, List[float]] = defaultdict(list) +_process_start: float = time.perf_counter() + + +def _truthy(val: Optional[str]) -> bool: + if not val: + return False + return val.strip().lower() in {'1', 'true', 'yes', 'on'} + + +def enable_profiling( + log_file: str | os.PathLike[str] = 'flo_ai_profile.log', + mirror_console: bool = False, +) -> Path: + """Enable the profiler and direct output to ``log_file``. + + Safe to call multiple times; subsequent calls reopen the file. + """ + global _enabled, _log_path, _file_handle, _mirror_console, _process_start + + disable_profiling() + + path = Path(log_file).resolve() + path.parent.mkdir(parents=True, exist_ok=True) + _file_handle = open(path, 'w', buffering=1, encoding='utf-8') + _log_path = path + _mirror_console = mirror_console + _enabled = True + _process_start = time.perf_counter() + + header = ( + f"=== flo_ai profiler started at {time.strftime('%Y-%m-%d %H:%M:%S')} ===\n" + f"Output: {path}\n" + ) + _file_handle.write(header) + + atexit.register(write_summary) + return path + + +def disable_profiling() -> None: + global _enabled, _file_handle, _log_path + _enabled = False + if _file_handle is not None: + try: + _file_handle.flush() + _file_handle.close() + except Exception: + pass + _file_handle = None + _log_path = None + _totals.clear() + + +def is_enabled() -> bool: + return _enabled + + +def _write_line(line: str) -> None: + if _file_handle is None: + return + with _lock: + _file_handle.write(line + '\n') + if _mirror_console: + try: + from flo_ai.utils.logger import logger # late import to avoid cycles + + logger.info('profile | %s', line) + except Exception: + pass + + +def record(name: str, duration_sec: float) -> None: + """Record a duration against ``name`` without emitting flow lines. + + Useful when the surrounding code already computed an elapsed time (for + example from an OpenTelemetry span) and you only want it counted in the + summary. 
+ """ + if not _enabled: + return + _totals[name].append(duration_sec) + + +def _emit_enter(name: str, depth: int) -> None: + indent = ' ' * depth + ts = time.perf_counter() - _process_start + _write_line(f'{ts:>10.3f}s {indent}-> {name}') + + +def _emit_exit(name: str, depth: int, elapsed: float, error: Optional[str]) -> None: + indent = ' ' * depth + ts = time.perf_counter() - _process_start + status = f' [ERROR: {error}]' if error else '' + _write_line(f'{ts:>10.3f}s {indent}<- {name} ({elapsed * 1000:.2f} ms){status}') + + +@asynccontextmanager +async def aprofile(name: str): + """Async context manager that records a profiled section.""" + if not _enabled: + yield + return + + depth = _depth.get() + _emit_enter(name, depth) + token = _depth.set(depth + 1) + start = time.perf_counter() + err: Optional[str] = None + try: + yield + except BaseException as e: # noqa: BLE001 - we re-raise + err = type(e).__name__ + raise + finally: + elapsed = time.perf_counter() - start + _depth.reset(token) + _totals[name].append(elapsed) + _emit_exit(name, depth, elapsed, err) + + +@contextmanager +def profile(name: str): + """Sync context manager that records a profiled section.""" + if not _enabled: + yield + return + + depth = _depth.get() + _emit_enter(name, depth) + token = _depth.set(depth + 1) + start = time.perf_counter() + err: Optional[str] = None + try: + yield + except BaseException as e: # noqa: BLE001 - we re-raise + err = type(e).__name__ + raise + finally: + elapsed = time.perf_counter() - start + _depth.reset(token) + _totals[name].append(elapsed) + _emit_exit(name, depth, elapsed, err) + + +def profile_async(name: Optional[str] = None) -> Callable: + """Decorator for async functions.""" + + def decorator(func: Callable) -> Callable: + label = ( + name + or f'{getattr(func, "__module__", "")}.{getattr(func, "__qualname__", getattr(func, "__name__", ""))}' + ) + + @functools.wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + async with aprofile(label): + return await func(*args, **kwargs) + + return wrapper + + return decorator + + +def profile_sync(name: Optional[str] = None) -> Callable: + """Decorator for synchronous functions.""" + + def decorator(func: Callable) -> Callable: + label = ( + name + or f'{getattr(func, "__module__", "")}.{getattr(func, "__qualname__", getattr(func, "__name__", ""))}' + ) + + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + with profile(label): + return func(*args, **kwargs) + + return wrapper + + return decorator + + +def write_summary() -> None: + """Write a summary table of total time per section to the log file.""" + if not _enabled or _file_handle is None: + return + + rows = [] + for name, durations in _totals.items(): + total = sum(durations) + rows.append((total, len(durations), name)) + rows.sort(reverse=True) + + with _lock: + _file_handle.write('\n=== SUMMARY (sections sorted by total wall time) ===\n') + _file_handle.write( + f"{'total_ms':>12} {'count':>6} {'avg_ms':>10} {'max_ms':>10} name\n" + ) + for total, count, name in rows: + durations = _totals[name] + avg = total / count if count else 0.0 + mx = max(durations) if durations else 0.0 + _file_handle.write( + f'{total * 1000:>12.2f} {count:>6} {avg * 1000:>10.2f} ' + f'{mx * 1000:>10.2f} {name}\n' + ) + total_wall = time.perf_counter() - _process_start + _file_handle.write( + f'\nTotal wall time since profiler start: {total_wall * 1000:.2f} ms\n' + ) + try: + _file_handle.flush() + except Exception: + pass + + +_env_path = 
diff --git a/flo_ai/pyproject.toml b/flo_ai/pyproject.toml
index b8c7f3e1..39cdba24 100644
--- a/flo_ai/pyproject.toml
+++ b/flo_ai/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "flo_ai"
-version = "1.1.3"
+version = "1.1.4"
 description = "A easy way to create structured AI agents"
 authors = [{ name = "rootflo", email = "engineering.tools@rootflo.ai" }]
 requires-python = ">=3.10,<4.0"
@@ -23,8 +23,7 @@ dependencies = [
     "opentelemetry-instrumentation>=0.54b0",
     "pydantic>=2.9.2,<3",
     "pyjwt>=2.10.1",
-    "pypdf>=4.2.0,<5",
-    "pymupdf4llm>=0.0.17,<0.0.18",
+    "pymupdf>=1.24.0,<2",
    "pyyaml>=6.0.3,<7",
 ]

diff --git a/flo_ai/uv.lock b/flo_ai/uv.lock
index 5188c2c9..8e90697f 100644
--- a/flo_ai/uv.lock
+++ b/flo_ai/uv.lock
@@ -903,8 +903,7 @@ dependencies = [
     { name = "opentelemetry-sdk" },
     { name = "pydantic" },
     { name = "pyjwt" },
-    { name = "pymupdf4llm" },
-    { name = "pypdf" },
+    { name = "pymupdf" },
     { name = "pyyaml" },
 ]

@@ -949,8 +948,7 @@ requires-dist = [
     { name = "opentelemetry-sdk", specifier = ">=1.28.2,<2" },
     { name = "pydantic", specifier = ">=2.9.2,<3" },
     { name = "pyjwt", specifier = ">=2.10.1" },
-    { name = "pymupdf4llm", specifier = ">=0.0.17,<0.0.18" },
-    { name = "pypdf", specifier = ">=4.2.0,<5" },
+    { name = "pymupdf", specifier = ">=1.24.0,<2" },
     { name = "pyyaml", specifier = ">=6.0.3,<7" },
 ]
 provides-extras = ["vizualize"]
@@ -3278,18 +3276,6 @@
     { url = "https://files.pythonhosted.org/packages/c6/96/fd59c1532891762ea4815e73956c532053d5e26d56969e1e5d1e4ca4b207/pymupdf-1.26.5-cp39-abi3-win_amd64.whl", hash = "sha256:39a6fb58182b27b51ea8150a0cd2e4ee7e0cf71e9d6723978f28699b42ee61ae", size = 18747258, upload-time = "2025-10-10T14:01:37.346Z" },
 ]

-[[package]]
-name = "pymupdf4llm"
-version = "0.0.17"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "pymupdf" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/2e/3c/1a530a410bdf76d83289bf30b3b86236d338b3f5f21842790c2cf7e9c1f6/pymupdf4llm-0.0.17.tar.gz", hash = "sha256:27287ef9fe0217cf37841a3ef2bcf70da2553c43d95ea39b664a6de6485678c3", size = 25180, upload-time = "2024-09-21T18:40:01.033Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/ae/af/1576ecfc8a62d31c0c8b34b856e52f6b05f1d76546dbac0e1d037f044a9e/pymupdf4llm-0.0.17-py3-none-any.whl", hash = "sha256:26de9996945f15e3ca507908f80dc18a959f5b5214bb2e302c7f7034089665a0", size = 26190, upload-time = "2024-09-21T18:40:03.097Z" },
-]
-
 [[package]]
 name = "pyparsing"
 version = "3.2.5"
@@ -3299,18 +3285,6 @@
 wheels = [
     { url = "https://files.pythonhosted.org/packages/10/5e/1aa9a93198c6b64513c9d7752de7422c06402de6600a8767da1524f9570b/pyparsing-3.2.5-py3-none-any.whl", hash = "sha256:e38a4f02064cf41fe6593d328d0512495ad1f3d8a91c4f73fc401b3079a59a5e", size = 113890, upload-time = "2025-09-21T04:11:04.117Z" },
 ]
"https://files.pythonhosted.org/packages/3c/60/eccdd92dd4af3e4bea6d6a342f7588c618a15b9bec4b968af581e498bcc4/pypdf-4.3.1-py3-none-any.whl", hash = "sha256:64b31da97eda0771ef22edb1bfecd5deee4b72c3d1736b7df2689805076d6418", size = 295825, upload-time = "2024-07-21T19:35:18.126Z" }, -] - [[package]] name = "pytest" version = "8.4.2" diff --git a/wavefront/server/modules/tools_module/tools_module/utils/message_processor_fn.py b/wavefront/server/modules/tools_module/tools_module/utils/message_processor_fn.py index ce327eb3..42f9ca74 100644 --- a/wavefront/server/modules/tools_module/tools_module/utils/message_processor_fn.py +++ b/wavefront/server/modules/tools_module/tools_module/utils/message_processor_fn.py @@ -18,7 +18,7 @@ async def execute_message_processor_fn(message_processor_id: str, **kwargs) -> s # Remove message_processor_id from kwargs (it's not part of input_data) input_data = {k: v for k, v in kwargs.items() if k != 'message_processor_id'} - payload = ExecuteMessageProcessorPayload(input_data=input_data) + payload = ExecuteMessageProcessorPayload(input_data=input_data['kwargs']) response = await execute_message_processor(message_processor_id, payload) response_body_bytes = response.body