From 82d2445945d30222116a7b5ff93a1f2620d4aa03 Mon Sep 17 00:00:00 2001 From: thomastomy5 <69713148+thomastomy5@users.noreply.github.com> Date: Tue, 10 Dec 2024 11:20:19 +0530 Subject: [PATCH 1/8] added tool logging --- flo_ai/callbacks/tool_logger.py | 194 +++++++++++++++++++++++++++++++ flo_ai/core.py | 3 +- flo_ai/example/tool_log_ex.py | 80 +++++++++++++ flo_ai/state/flo_session.py | 4 + flo_ai/storage/data_collector.py | 29 +++++ 5 files changed, 309 insertions(+), 1 deletion(-) create mode 100644 flo_ai/callbacks/tool_logger.py create mode 100644 flo_ai/example/tool_log_ex.py create mode 100644 flo_ai/storage/data_collector.py diff --git a/flo_ai/callbacks/tool_logger.py b/flo_ai/callbacks/tool_logger.py new file mode 100644 index 00000000..8e54ada6 --- /dev/null +++ b/flo_ai/callbacks/tool_logger.py @@ -0,0 +1,194 @@ +import json +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional, List +from pathlib import Path +from datetime import datetime +from uuid import UUID +from langchain_core.callbacks import BaseCallbackHandler +from langchain.schema.agent import AgentAction, AgentFinish +from langchain.schema import HumanMessage, AIMessage, BaseMessage +from langchain_core.prompts.chat import ChatPromptValue +from flo_ai.storage.data_collector import DataCollector + +class EnhancedJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, (HumanMessage, AIMessage, BaseMessage)): + return { + "type": obj.__class__.__name__, + "content": obj.content, + "additional_kwargs": obj.additional_kwargs + } + elif isinstance(obj, AgentAction): + return { + "type": "AgentAction", + "tool": obj.tool, + "tool_input": obj.tool_input, + "log": obj.log + } + elif isinstance(obj, AgentFinish): + return { + "type": "AgentFinish", + "return_values": obj.return_values, + "log": obj.log + } + elif isinstance(obj, ChatPromptValue): + return { + "type": "ChatPromptValue", + "messages": [self.default(msg) for msg in obj.messages] + } + elif isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, UUID): + return str(obj) + elif hasattr(obj, 'to_dict'): + return obj.to_dict() + return super().default(obj) + + +class ToolCallLogger(BaseCallbackHandler): + def __init__(self, data_collector: DataCollector): + self.data_collector = data_collector + self.runs = {} + self.encoder = EnhancedJSONEncoder() + + def _encode_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]: + return json.loads(self.encoder.encode(entry)) + + def _store_entry(self, entry: Dict[str, Any]) -> None: + encoded_entry = self._encode_entry(entry) + self.data_collector.store_entry(encoded_entry) + + def on_chain_start( + self, + serialized: Dict[str, Any], + inputs: Dict[str, Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + chain_name = serialized.get("name", "unnamed_chain") if serialized else "unnamed_chain" + self.runs[str(run_id)] = { + "type": "chain", + "start_time": datetime.utcnow(), + "inputs": inputs, + "name": chain_name, + "parent_run_id": str(parent_run_id) if parent_run_id else None + } + + def on_chain_end( + self, + outputs: Dict[str, Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + if str(run_id) in self.runs: + run_info = self.runs[str(run_id)] + run_info["end_time"] = datetime.utcnow() + run_info["outputs"] = outputs + run_info["status"] = "completed" + self._store_entry(run_info) + del self.runs[str(run_id)] + + def on_chain_error( + self, + error: Exception, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + if str(run_id) in self.runs: + run_info = self.runs[str(run_id)] + run_info["end_time"] = datetime.utcnow() + run_info["error"] = str(error) + run_info["status"] = "error" + self._store_entry(run_info) + del self.runs[str(run_id)] + + def on_tool_start( + self, + serialized: Dict[str, Any], + input_str: str, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + self.runs[str(run_id)] = { + "type": "tool", + "start_time": datetime.utcnow(), + "tool_name": serialized.get("name", "unnamed_tool"), + "input": input_str, + "parent_run_id": str(parent_run_id) if parent_run_id else None + } + + def on_tool_end( + self, + output: str, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + if str(run_id) in self.runs: + run_info = self.runs[str(run_id)] + run_info["end_time"] = datetime.utcnow() + run_info["output"] = output + run_info["status"] = "completed" + self._store_entry(run_info) + del self.runs[str(run_id)] + + def on_tool_error( + self, + error: Exception, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + if str(run_id) in self.runs: + run_info = self.runs[str(run_id)] + run_info["end_time"] = datetime.utcnow() + run_info["error"] = str(error) + run_info["status"] = "error" + self._store_entry(run_info) + del self.runs[str(run_id)] + + def on_agent_action( + self, + action: AgentAction, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + agent_info = { + "type": "agent_action", + "start_time": datetime.utcnow(), + "tool": action.tool, + "tool_input": action.tool_input, + "log": action.log, + "parent_run_id": str(parent_run_id) if parent_run_id else None + } + self.runs[str(run_id)] = agent_info + self._store_entry(agent_info) + + def on_agent_finish( + self, + finish: AgentFinish, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + log_entry = { + "type": "agent_finish", + "time": datetime.utcnow(), + "output": finish.return_values, + "log": finish.log, + "parent_run_id": str(parent_run_id) if parent_run_id else None + } + self._store_entry(log_entry) \ No newline at end of file diff --git a/flo_ai/core.py b/flo_ai/core.py index 79b8f9bb..ff304100 100644 --- a/flo_ai/core.py +++ b/flo_ai/core.py @@ -40,7 +40,8 @@ def async_stream(self, query, config=None) -> Iterator[Union[dict[str, Any], Any return self.runnable.astream(query, config) def invoke(self, query, config=None) -> Iterator[Union[dict[str, Any], Any]]: - config = {'callbacks': [self.session.langchain_logger]} + config = config or {} + config['callbacks'] = config.get('callbacks', []) + [self.session.langchain_logger]+ self.session.callbacks self.validate_invoke(self.session) get_logger().info(f"Invoking query: '{query}'", self.session) return self.runnable.invoke(query, config) diff --git a/flo_ai/example/tool_log_ex.py b/flo_ai/example/tool_log_ex.py new file mode 100644 index 00000000..092435e3 --- /dev/null +++ b/flo_ai/example/tool_log_ex.py @@ -0,0 +1,80 @@ +from flo_ai.callbacks.tool_logger import ToolCallLogger +from flo_ai.storage.data_collector import JSONLFileCollector +from langchain_openai import AzureChatOpenAI +import os +from dotenv import load_dotenv +from flo_ai import Flo +from flo_ai import FloSession +from typing import List +from flo_ai.tools import flotool + +load_dotenv() + +llm = AzureChatOpenAI( + temperature=0, + deployment_name="gpt-4", + model_name="gpt-4", + azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), + azure_deployment=os.getenv("AZURE_DEPLOYMENT_NAME"), + api_key=os.getenv("AZURE_OPENAI_API_KEY"), + api_version="2024-08-01-preview", +) + +session = FloSession( + llm, + log_level='ERROR', +) + +@flotool(name='AdditionTool', description='Tool to add numbers') +def addition_tool(numbers: List[int]) -> str: + result = sum(numbers) + return f'The sum is {result}' + +@flotool( + name='MultiplicationTool', + description='Tool to multiply numbers to get product of numbers', +) +def mul_tool(numbers: List[int]) -> str: + result = 1 + for num in numbers: + result *= num + return f"The product is {result}" + +session.register_tool(name='Adder', tool=addition_tool).register_tool( + name='Multiplier', tool=mul_tool +) + +simple_calculator_agent = """ +apiVersion: flo/alpha-v1 +kind: FloAgent +name: calculating-assistant +agent: + name: SummationHelper + kind: agentic + job: > + You are a calculation assistant that MUST ONLY use the provided tools for calculations. + You MUST ONLY return the exact outputs from the tools without modification. + You MUST NOT perform any calculations yourself. + If you need both sum and product, you MUST use both tools and combine their exact outputs. + tools: + - name: Adder + - name: Multiplier +""" + + # You MUST format your response as: "Tool results: [exact tool outputs]" + + +file_collector = JSONLFileCollector("./flo_ai/example/my_llm_logs.jsonl") +local_tracker = ToolCallLogger(file_collector) + + +session.register_callback(local_tracker) + +flo = Flo.build(session, simple_calculator_agent, log_level='ERROR') + + +result = flo.invoke( + "find the sum of first three numbers and last three numbers and multilply the result. Numbers are 1, 3, 4, 2, 0, 1", + ) + + diff --git a/flo_ai/state/flo_session.py b/flo_ai/state/flo_session.py index 82457fb0..96c23de1 100644 --- a/flo_ai/state/flo_session.py +++ b/flo_ai/state/flo_session.py @@ -87,6 +87,10 @@ def register_callback( filter(lambda x: isinstance(x, FloToolCallback), self.callbacks) ) self.langchain_logger = FloLangchainLogger(self.session_id, tool_callbacks) + + if self.llm is not None: + self.llm = self.llm.bind(callbacks=[callback]) + print(f"selfcallback: {self.callbacks}") return self def append(self, node: str) -> int: diff --git a/flo_ai/storage/data_collector.py b/flo_ai/storage/data_collector.py new file mode 100644 index 00000000..e077c0fa --- /dev/null +++ b/flo_ai/storage/data_collector.py @@ -0,0 +1,29 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict +from pathlib import Path +import json + +class DataCollector(ABC): + @abstractmethod + def store_entry(self, entry: Dict[str, Any]) -> None: + pass + + @abstractmethod + def close(self) -> None: + pass + +class JSONLFileCollector(DataCollector): + def __init__(self, file_path: str): + self.file_path = Path(file_path) + self.file_path.parent.mkdir(parents=True, exist_ok=True) + + def store_entry(self, entry: Dict[str, Any]) -> None: + try: + with open(self.file_path, 'a') as f: + json.dump(entry, f) + f.write('\n') + except Exception as e: + print(f"Error storing entry to JSONL: {e}") + + def close(self) -> None: + pass \ No newline at end of file From 5e9f00e12702f5eea9dd346cad2f69d04e7e61e2 Mon Sep 17 00:00:00 2001 From: thomastomy5 <69713148+thomastomy5@users.noreply.github.com> Date: Tue, 10 Dec 2024 16:06:46 +0530 Subject: [PATCH 2/8] updated readme --- README.md | 38 ++++++++++ examples/python/tool_data_logging_example.py | 79 ++++++++++++++++++++ flo_ai/callbacks/tool_logger.py | 8 +- flo_ai/core.py | 3 +- flo_ai/storage/data_collector.py | 14 ++-- 5 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 examples/python/tool_data_logging_example.py diff --git a/README.md b/README.md index 94f74a21..549b85b1 100644 --- a/README.md +++ b/README.md @@ -233,6 +233,44 @@ session.register_tool(name='Adder', tool=addition_tool) **Note:** `@flotool` comes with inherent error handling capabilities to retry if an exception is thrown. Use `unsafe=True` to disable error handling +## 📊 Tool Logging and Data Collection + +FloAI provides built-in capabilities for logging tool calls and collecting data through the `ToolCallLogger` and `DataCollector` classes. +You can customize `DataCollector` implementation according to your database. A sample implementation where logs are stored locally as JSON files is implemented in `JSONLFileCollector`. + +### Quick Setup + +```python +from flo_ai.callbacks.tool_logger import ToolCallLogger +from flo_ai.storage.data_collector import JSONLFileCollector + +# Initialize the file collector with a path for the JSONL log file +file_collector = JSONLFileCollector("./path/to/my_llm_logs.jsonl") + +# Create a tool logger with the collector +local_tracker = ToolCallLogger(file_collector) + +# Register the logger with your session +session.register_callback(local_tracker) +``` + +### Features + +- 📝 Logs all tool calls, chain executions, and agent actions +- 🕒 Includes timestamps for start and end of operations +- 🔍 Tracks inputs, outputs, and errors +- 💾 Stores data in JSONL format for easy analysis + +### Log Data Structure + +The logger captures detailed information including: +- Tool name and inputs +- Execution timestamps +- Operation status (completed/error) +- Chain and agent activities +- Parent-child relationship between operations + + ## 📖 Documentation Visit our [comprehensive documentation](https://flo-ai.rootflo.ai) for: diff --git a/examples/python/tool_data_logging_example.py b/examples/python/tool_data_logging_example.py new file mode 100644 index 00000000..d78d6827 --- /dev/null +++ b/examples/python/tool_data_logging_example.py @@ -0,0 +1,79 @@ +from flo_ai.callbacks.tool_logger import ToolCallLogger +from flo_ai.storage.data_collector import JSONLFileCollector +from langchain_openai import AzureChatOpenAI +import os +from dotenv import load_dotenv +from flo_ai import Flo +from flo_ai import FloSession +from typing import List +from flo_ai.tools import flotool + +load_dotenv() + +llm = AzureChatOpenAI( + temperature=0, + deployment_name="gpt-4", + model_name="gpt-4", + azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), + api_key=os.getenv("AZURE_OPENAI_API_KEY"), + api_version="2024-08-01-preview", +) + +session = FloSession( + llm, + log_level='ERROR', +) + +@flotool(name='AdditionTool', description='Tool to add numbers') +def addition_tool(numbers: List[int]) -> str: + result = sum(numbers) + return f'The sum is {result}' + +@flotool( + name='MultiplicationTool', + description='Tool to multiply numbers to get product of numbers', +) +def mul_tool(numbers: List[int]) -> str: + result = 1 + for num in numbers: + result *= num + return f"The product is {result}" + +session.register_tool(name='Adder', tool=addition_tool).register_tool( + name='Multiplier', tool=mul_tool +) + +simple_calculator_agent = """ +apiVersion: flo/alpha-v1 +kind: FloAgent +name: calculating-assistant +agent: + name: SummationHelper + kind: agentic + job: > + You are a calculation assistant that MUST ONLY use the provided tools for calculations. + You MUST ONLY return the exact outputs from the tools without modification. + You MUST NOT perform any calculations yourself. + If you need both sum and product, you MUST use both tools and combine their exact outputs. + tools: + - name: Adder + - name: Multiplier +""" + + + +current_dir = os.path.dirname(os.path.abspath(__file__)) +log_file_path = os.path.join(current_dir, "my_llm_logs.jsonl") + +file_collector = JSONLFileCollector(log_file_path) +local_tracker = ToolCallLogger(file_collector) + +session.register_callback(local_tracker) + +flo = Flo.build(session, simple_calculator_agent, log_level='ERROR') + +result = flo.invoke( + "find the sum of first three numbers and last three numbers and multilply the result. Numbers are 1, 3, 4, 2, 0, 1", + ) + + diff --git a/flo_ai/callbacks/tool_logger.py b/flo_ai/callbacks/tool_logger.py index 8e54ada6..371b5016 100644 --- a/flo_ai/callbacks/tool_logger.py +++ b/flo_ai/callbacks/tool_logger.py @@ -9,6 +9,7 @@ from langchain.schema import HumanMessage, AIMessage, BaseMessage from langchain_core.prompts.chat import ChatPromptValue from flo_ai.storage.data_collector import DataCollector +from flo_ai.common.flo_logger import get_logger class EnhancedJSONEncoder(json.JSONEncoder): def default(self, obj): @@ -55,8 +56,11 @@ def _encode_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]: return json.loads(self.encoder.encode(entry)) def _store_entry(self, entry: Dict[str, Any]) -> None: - encoded_entry = self._encode_entry(entry) - self.data_collector.store_entry(encoded_entry) + try: + encoded_entry = self._encode_entry(entry) + self.data_collector.store_entry(encoded_entry) + except Exception as e: + get_logger().error(f"Error storing entry in ToolCallLogger: {e}") def on_chain_start( self, diff --git a/flo_ai/core.py b/flo_ai/core.py index ff304100..246a9f95 100644 --- a/flo_ai/core.py +++ b/flo_ai/core.py @@ -47,7 +47,8 @@ def invoke(self, query, config=None) -> Iterator[Union[dict[str, Any], Any]]: return self.runnable.invoke(query, config) def async_invoke(self, query, config=None) -> Iterator[Union[dict[str, Any], Any]]: - config = {'callbacks': [self.session.langchain_logger]} + config = config or {} + config['callbacks'] = config.get('callbacks', []) + [self.session.langchain_logger]+ self.session.callbacks get_logger().info(f"Invoking async query: '{query}'", self.session) return self.runnable.ainvoke(query, config) diff --git a/flo_ai/storage/data_collector.py b/flo_ai/storage/data_collector.py index e077c0fa..e7459e29 100644 --- a/flo_ai/storage/data_collector.py +++ b/flo_ai/storage/data_collector.py @@ -2,6 +2,7 @@ from typing import Any, Dict from pathlib import Path import json +from flo_ai.common.flo_logger import get_logger class DataCollector(ABC): @abstractmethod @@ -16,14 +17,11 @@ class JSONLFileCollector(DataCollector): def __init__(self, file_path: str): self.file_path = Path(file_path) self.file_path.parent.mkdir(parents=True, exist_ok=True) - + def store_entry(self, entry: Dict[str, Any]) -> None: - try: - with open(self.file_path, 'a') as f: - json.dump(entry, f) - f.write('\n') - except Exception as e: - print(f"Error storing entry to JSONL: {e}") - + with open(self.file_path, 'a') as f: + json.dump(entry, f) + f.write('\n') + def close(self) -> None: pass \ No newline at end of file From 57165fa4c01c2ed6103866238bdaecabca73a277 Mon Sep 17 00:00:00 2001 From: thomastomy5 <69713148+thomastomy5@users.noreply.github.com> Date: Tue, 10 Dec 2024 16:11:12 +0530 Subject: [PATCH 3/8] Remove examples folder --- flo_ai/example/tool_log_ex.py | 80 ----------------------------------- 1 file changed, 80 deletions(-) delete mode 100644 flo_ai/example/tool_log_ex.py diff --git a/flo_ai/example/tool_log_ex.py b/flo_ai/example/tool_log_ex.py deleted file mode 100644 index 092435e3..00000000 --- a/flo_ai/example/tool_log_ex.py +++ /dev/null @@ -1,80 +0,0 @@ -from flo_ai.callbacks.tool_logger import ToolCallLogger -from flo_ai.storage.data_collector import JSONLFileCollector -from langchain_openai import AzureChatOpenAI -import os -from dotenv import load_dotenv -from flo_ai import Flo -from flo_ai import FloSession -from typing import List -from flo_ai.tools import flotool - -load_dotenv() - -llm = AzureChatOpenAI( - temperature=0, - deployment_name="gpt-4", - model_name="gpt-4", - azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), - azure_deployment=os.getenv("AZURE_DEPLOYMENT_NAME"), - api_key=os.getenv("AZURE_OPENAI_API_KEY"), - api_version="2024-08-01-preview", -) - -session = FloSession( - llm, - log_level='ERROR', -) - -@flotool(name='AdditionTool', description='Tool to add numbers') -def addition_tool(numbers: List[int]) -> str: - result = sum(numbers) - return f'The sum is {result}' - -@flotool( - name='MultiplicationTool', - description='Tool to multiply numbers to get product of numbers', -) -def mul_tool(numbers: List[int]) -> str: - result = 1 - for num in numbers: - result *= num - return f"The product is {result}" - -session.register_tool(name='Adder', tool=addition_tool).register_tool( - name='Multiplier', tool=mul_tool -) - -simple_calculator_agent = """ -apiVersion: flo/alpha-v1 -kind: FloAgent -name: calculating-assistant -agent: - name: SummationHelper - kind: agentic - job: > - You are a calculation assistant that MUST ONLY use the provided tools for calculations. - You MUST ONLY return the exact outputs from the tools without modification. - You MUST NOT perform any calculations yourself. - If you need both sum and product, you MUST use both tools and combine their exact outputs. - tools: - - name: Adder - - name: Multiplier -""" - - # You MUST format your response as: "Tool results: [exact tool outputs]" - - -file_collector = JSONLFileCollector("./flo_ai/example/my_llm_logs.jsonl") -local_tracker = ToolCallLogger(file_collector) - - -session.register_callback(local_tracker) - -flo = Flo.build(session, simple_calculator_agent, log_level='ERROR') - - -result = flo.invoke( - "find the sum of first three numbers and last three numbers and multilply the result. Numbers are 1, 3, 4, 2, 0, 1", - ) - - From f9a5848061d24b0cfba2c99e400aba29dccd8ce4 Mon Sep 17 00:00:00 2001 From: thomastomy5 <69713148+thomastomy5@users.noreply.github.com> Date: Tue, 10 Dec 2024 16:37:11 +0530 Subject: [PATCH 4/8] updated ToolCallLogger to FloChainExecutionLogger --- README.md | 6 +++--- examples/python/tool_data_logging_example.py | 4 ++-- .../callbacks/{tool_logger.py => flo_execution_logger.py} | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) rename flo_ai/callbacks/{tool_logger.py => flo_execution_logger.py} (97%) diff --git a/README.md b/README.md index 549b85b1..227cc20b 100644 --- a/README.md +++ b/README.md @@ -235,20 +235,20 @@ session.register_tool(name='Adder', tool=addition_tool) ## 📊 Tool Logging and Data Collection -FloAI provides built-in capabilities for logging tool calls and collecting data through the `ToolCallLogger` and `DataCollector` classes. +FloAI provides built-in capabilities for logging tool calls and collecting data through the `FloChainExecutionLogger` and `DataCollector` classes. You can customize `DataCollector` implementation according to your database. A sample implementation where logs are stored locally as JSON files is implemented in `JSONLFileCollector`. ### Quick Setup ```python -from flo_ai.callbacks.tool_logger import ToolCallLogger +from flo_ai.callbacks.tool_logger import FloChainExecutionLogger from flo_ai.storage.data_collector import JSONLFileCollector # Initialize the file collector with a path for the JSONL log file file_collector = JSONLFileCollector("./path/to/my_llm_logs.jsonl") # Create a tool logger with the collector -local_tracker = ToolCallLogger(file_collector) +local_tracker = FloChainExecutionLogger(file_collector) # Register the logger with your session session.register_callback(local_tracker) diff --git a/examples/python/tool_data_logging_example.py b/examples/python/tool_data_logging_example.py index d78d6827..2b28b623 100644 --- a/examples/python/tool_data_logging_example.py +++ b/examples/python/tool_data_logging_example.py @@ -1,4 +1,4 @@ -from flo_ai.callbacks.tool_logger import ToolCallLogger +from flo_ai.callbacks.flo_execution_logger import FloChainExecutionLogger from flo_ai.storage.data_collector import JSONLFileCollector from langchain_openai import AzureChatOpenAI import os @@ -66,7 +66,7 @@ def mul_tool(numbers: List[int]) -> str: log_file_path = os.path.join(current_dir, "my_llm_logs.jsonl") file_collector = JSONLFileCollector(log_file_path) -local_tracker = ToolCallLogger(file_collector) +local_tracker = FloChainExecutionLogger(file_collector) session.register_callback(local_tracker) diff --git a/flo_ai/callbacks/tool_logger.py b/flo_ai/callbacks/flo_execution_logger.py similarity index 97% rename from flo_ai/callbacks/tool_logger.py rename to flo_ai/callbacks/flo_execution_logger.py index 371b5016..e98d2e9e 100644 --- a/flo_ai/callbacks/tool_logger.py +++ b/flo_ai/callbacks/flo_execution_logger.py @@ -46,7 +46,7 @@ def default(self, obj): return super().default(obj) -class ToolCallLogger(BaseCallbackHandler): +class FloChainExecutionLogger(BaseCallbackHandler): def __init__(self, data_collector: DataCollector): self.data_collector = data_collector self.runs = {} @@ -60,7 +60,7 @@ def _store_entry(self, entry: Dict[str, Any]) -> None: encoded_entry = self._encode_entry(entry) self.data_collector.store_entry(encoded_entry) except Exception as e: - get_logger().error(f"Error storing entry in ToolCallLogger: {e}") + get_logger().error(f"Error storing entry in FloChainExecutionLogger: {e}") def on_chain_start( self, @@ -195,4 +195,4 @@ def on_agent_finish( "log": finish.log, "parent_run_id": str(parent_run_id) if parent_run_id else None } - self._store_entry(log_entry) \ No newline at end of file + self._store_entry(log_entry) From 7d84639f9efbf1a6c1206a66e4be0bbb9c9ebea2 Mon Sep 17 00:00:00 2001 From: thomastomy5 <69713148+thomastomy5@users.noreply.github.com> Date: Tue, 10 Dec 2024 19:01:42 +0530 Subject: [PATCH 5/8] code cleanup --- flo_ai/callbacks/flo_execution_logger.py | 101 ++++++++++++----------- flo_ai/storage/data_collector.py | 7 +- 2 files changed, 55 insertions(+), 53 deletions(-) diff --git a/flo_ai/callbacks/flo_execution_logger.py b/flo_ai/callbacks/flo_execution_logger.py index e98d2e9e..b675a746 100644 --- a/flo_ai/callbacks/flo_execution_logger.py +++ b/flo_ai/callbacks/flo_execution_logger.py @@ -1,7 +1,5 @@ import json -from abc import ABC, abstractmethod -from typing import Any, Dict, Optional, List -from pathlib import Path +from typing import Any, Dict, Optional from datetime import datetime from uuid import UUID from langchain_core.callbacks import BaseCallbackHandler @@ -11,31 +9,32 @@ from flo_ai.storage.data_collector import DataCollector from flo_ai.common.flo_logger import get_logger + class EnhancedJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, (HumanMessage, AIMessage, BaseMessage)): return { - "type": obj.__class__.__name__, - "content": obj.content, - "additional_kwargs": obj.additional_kwargs + 'type': obj.__class__.__name__, + 'content': obj.content, + 'additional_kwargs': obj.additional_kwargs, } elif isinstance(obj, AgentAction): return { - "type": "AgentAction", - "tool": obj.tool, - "tool_input": obj.tool_input, - "log": obj.log + 'type': 'AgentAction', + 'tool': obj.tool, + 'tool_input': obj.tool_input, + 'log': obj.log, } elif isinstance(obj, AgentFinish): return { - "type": "AgentFinish", - "return_values": obj.return_values, - "log": obj.log + 'type': 'AgentFinish', + 'return_values': obj.return_values, + 'log': obj.log, } elif isinstance(obj, ChatPromptValue): return { - "type": "ChatPromptValue", - "messages": [self.default(msg) for msg in obj.messages] + 'type': 'ChatPromptValue', + 'messages': [self.default(msg) for msg in obj.messages], } elif isinstance(obj, datetime): return obj.isoformat() @@ -60,7 +59,7 @@ def _store_entry(self, entry: Dict[str, Any]) -> None: encoded_entry = self._encode_entry(entry) self.data_collector.store_entry(encoded_entry) except Exception as e: - get_logger().error(f"Error storing entry in FloChainExecutionLogger: {e}") + get_logger().error(f'Error storing entry in FloChainExecutionLogger: {e}') def on_chain_start( self, @@ -71,13 +70,15 @@ def on_chain_start( parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> None: - chain_name = serialized.get("name", "unnamed_chain") if serialized else "unnamed_chain" + chain_name = ( + serialized.get('name', 'unnamed_chain') if serialized else 'unnamed_chain' + ) self.runs[str(run_id)] = { - "type": "chain", - "start_time": datetime.utcnow(), - "inputs": inputs, - "name": chain_name, - "parent_run_id": str(parent_run_id) if parent_run_id else None + 'type': 'chain', + 'start_time': datetime.utcnow(), + 'inputs': inputs, + 'name': chain_name, + 'parent_run_id': str(parent_run_id) if parent_run_id else None, } def on_chain_end( @@ -90,9 +91,9 @@ def on_chain_end( ) -> None: if str(run_id) in self.runs: run_info = self.runs[str(run_id)] - run_info["end_time"] = datetime.utcnow() - run_info["outputs"] = outputs - run_info["status"] = "completed" + run_info['end_time'] = datetime.utcnow() + run_info['outputs'] = outputs + run_info['status'] = 'completed' self._store_entry(run_info) del self.runs[str(run_id)] @@ -106,9 +107,9 @@ def on_chain_error( ) -> None: if str(run_id) in self.runs: run_info = self.runs[str(run_id)] - run_info["end_time"] = datetime.utcnow() - run_info["error"] = str(error) - run_info["status"] = "error" + run_info['end_time'] = datetime.utcnow() + run_info['error'] = str(error) + run_info['status'] = 'error' self._store_entry(run_info) del self.runs[str(run_id)] @@ -122,11 +123,11 @@ def on_tool_start( **kwargs: Any, ) -> None: self.runs[str(run_id)] = { - "type": "tool", - "start_time": datetime.utcnow(), - "tool_name": serialized.get("name", "unnamed_tool"), - "input": input_str, - "parent_run_id": str(parent_run_id) if parent_run_id else None + 'type': 'tool', + 'start_time': datetime.utcnow(), + 'tool_name': serialized.get('name', 'unnamed_tool'), + 'input': input_str, + 'parent_run_id': str(parent_run_id) if parent_run_id else None, } def on_tool_end( @@ -139,9 +140,9 @@ def on_tool_end( ) -> None: if str(run_id) in self.runs: run_info = self.runs[str(run_id)] - run_info["end_time"] = datetime.utcnow() - run_info["output"] = output - run_info["status"] = "completed" + run_info['end_time'] = datetime.utcnow() + run_info['output'] = output + run_info['status'] = 'completed' self._store_entry(run_info) del self.runs[str(run_id)] @@ -155,9 +156,9 @@ def on_tool_error( ) -> None: if str(run_id) in self.runs: run_info = self.runs[str(run_id)] - run_info["end_time"] = datetime.utcnow() - run_info["error"] = str(error) - run_info["status"] = "error" + run_info['end_time'] = datetime.utcnow() + run_info['error'] = str(error) + run_info['status'] = 'error' self._store_entry(run_info) del self.runs[str(run_id)] @@ -170,12 +171,12 @@ def on_agent_action( **kwargs: Any, ) -> None: agent_info = { - "type": "agent_action", - "start_time": datetime.utcnow(), - "tool": action.tool, - "tool_input": action.tool_input, - "log": action.log, - "parent_run_id": str(parent_run_id) if parent_run_id else None + 'type': 'agent_action', + 'start_time': datetime.utcnow(), + 'tool': action.tool, + 'tool_input': action.tool_input, + 'log': action.log, + 'parent_run_id': str(parent_run_id) if parent_run_id else None, } self.runs[str(run_id)] = agent_info self._store_entry(agent_info) @@ -189,10 +190,10 @@ def on_agent_finish( **kwargs: Any, ) -> None: log_entry = { - "type": "agent_finish", - "time": datetime.utcnow(), - "output": finish.return_values, - "log": finish.log, - "parent_run_id": str(parent_run_id) if parent_run_id else None + 'type': 'agent_finish', + 'time': datetime.utcnow(), + 'output': finish.return_values, + 'log': finish.log, + 'parent_run_id': str(parent_run_id) if parent_run_id else None, } self._store_entry(log_entry) diff --git a/flo_ai/storage/data_collector.py b/flo_ai/storage/data_collector.py index e7459e29..79a6f053 100644 --- a/flo_ai/storage/data_collector.py +++ b/flo_ai/storage/data_collector.py @@ -2,17 +2,18 @@ from typing import Any, Dict from pathlib import Path import json -from flo_ai.common.flo_logger import get_logger + class DataCollector(ABC): @abstractmethod def store_entry(self, entry: Dict[str, Any]) -> None: pass - + @abstractmethod def close(self) -> None: pass + class JSONLFileCollector(DataCollector): def __init__(self, file_path: str): self.file_path = Path(file_path) @@ -24,4 +25,4 @@ def store_entry(self, entry: Dict[str, Any]) -> None: f.write('\n') def close(self) -> None: - pass \ No newline at end of file + pass From 3c9fa12c06090ac138b137eecf40c8108ab76dfd Mon Sep 17 00:00:00 2001 From: thomastomy5 <69713148+thomastomy5@users.noreply.github.com> Date: Tue, 10 Dec 2024 22:27:38 +0530 Subject: [PATCH 6/8] refactoring --- examples/python/tool_data_logging_example.py | 26 ++++++++++---------- flo_ai/core.py | 12 +++++++-- flo_ai/state/flo_session.py | 1 - 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/examples/python/tool_data_logging_example.py b/examples/python/tool_data_logging_example.py index 2b28b623..9d9e790a 100644 --- a/examples/python/tool_data_logging_example.py +++ b/examples/python/tool_data_logging_example.py @@ -12,23 +12,25 @@ llm = AzureChatOpenAI( temperature=0, - deployment_name="gpt-4", - model_name="gpt-4", - azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), - api_key=os.getenv("AZURE_OPENAI_API_KEY"), - api_version="2024-08-01-preview", + deployment_name='gpt-4', + model_name='gpt-4', + azure_endpoint=os.getenv('AZURE_OPENAI_ENDPOINT'), + api_key=os.getenv('AZURE_OPENAI_API_KEY'), + api_version='2024-08-01-preview', ) session = FloSession( - llm, + llm, log_level='ERROR', ) + @flotool(name='AdditionTool', description='Tool to add numbers') def addition_tool(numbers: List[int]) -> str: result = sum(numbers) return f'The sum is {result}' + @flotool( name='MultiplicationTool', description='Tool to multiply numbers to get product of numbers', @@ -37,7 +39,8 @@ def mul_tool(numbers: List[int]) -> str: result = 1 for num in numbers: result *= num - return f"The product is {result}" + return f'The product is {result}' + session.register_tool(name='Adder', tool=addition_tool).register_tool( name='Multiplier', tool=mul_tool @@ -61,9 +64,8 @@ def mul_tool(numbers: List[int]) -> str: """ - current_dir = os.path.dirname(os.path.abspath(__file__)) -log_file_path = os.path.join(current_dir, "my_llm_logs.jsonl") +log_file_path = os.path.join(current_dir, 'my_llm_logs.jsonl') file_collector = JSONLFileCollector(log_file_path) local_tracker = FloChainExecutionLogger(file_collector) @@ -73,7 +75,5 @@ def mul_tool(numbers: List[int]) -> str: flo = Flo.build(session, simple_calculator_agent, log_level='ERROR') result = flo.invoke( - "find the sum of first three numbers and last three numbers and multilply the result. Numbers are 1, 3, 4, 2, 0, 1", - ) - - + 'find the sum of first three numbers and last three numbers and multilply the result. Numbers are 1, 3, 4, 2, 0, 1', +) diff --git a/flo_ai/core.py b/flo_ai/core.py index 246a9f95..0a76d452 100644 --- a/flo_ai/core.py +++ b/flo_ai/core.py @@ -41,14 +41,22 @@ def async_stream(self, query, config=None) -> Iterator[Union[dict[str, Any], Any def invoke(self, query, config=None) -> Iterator[Union[dict[str, Any], Any]]: config = config or {} - config['callbacks'] = config.get('callbacks', []) + [self.session.langchain_logger]+ self.session.callbacks + config['callbacks'] = ( + config.get('callbacks', []) + + [self.session.langchain_logger] + + self.session.callbacks + ) self.validate_invoke(self.session) get_logger().info(f"Invoking query: '{query}'", self.session) return self.runnable.invoke(query, config) def async_invoke(self, query, config=None) -> Iterator[Union[dict[str, Any], Any]]: config = config or {} - config['callbacks'] = config.get('callbacks', []) + [self.session.langchain_logger]+ self.session.callbacks + config['callbacks'] = ( + config.get('callbacks', []) + + [self.session.langchain_logger] + + self.session.callbacks + ) get_logger().info(f"Invoking async query: '{query}'", self.session) return self.runnable.ainvoke(query, config) diff --git a/flo_ai/state/flo_session.py b/flo_ai/state/flo_session.py index 96c23de1..a4a9f9a8 100644 --- a/flo_ai/state/flo_session.py +++ b/flo_ai/state/flo_session.py @@ -90,7 +90,6 @@ def register_callback( if self.llm is not None: self.llm = self.llm.bind(callbacks=[callback]) - print(f"selfcallback: {self.callbacks}") return self def append(self, node: str) -> int: From ac207c8dc9c9fd6ffa10b3aa21bff94394cdb829 Mon Sep 17 00:00:00 2001 From: thomastomy5 <69713148+thomastomy5@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:48:49 +0530 Subject: [PATCH 7/8] renamed FloChainExecutionLogger to FloExecutionLogger --- README.md | 14 +++++++++++--- examples/python/tool_data_logging_example.py | 4 ++-- flo_ai/callbacks/__init__.py | 2 ++ flo_ai/callbacks/flo_execution_logger.py | 4 ++-- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 227cc20b..c5b926ff 100644 --- a/README.md +++ b/README.md @@ -235,20 +235,20 @@ session.register_tool(name='Adder', tool=addition_tool) ## 📊 Tool Logging and Data Collection -FloAI provides built-in capabilities for logging tool calls and collecting data through the `FloChainExecutionLogger` and `DataCollector` classes. +FloAI provides built-in capabilities for logging tool calls and collecting data through the `FloExecutionLogger` and `DataCollector` classes. You can customize `DataCollector` implementation according to your database. A sample implementation where logs are stored locally as JSON files is implemented in `JSONLFileCollector`. ### Quick Setup ```python -from flo_ai.callbacks.tool_logger import FloChainExecutionLogger +from flo_ai.callbacks import FloExecutionLogger from flo_ai.storage.data_collector import JSONLFileCollector # Initialize the file collector with a path for the JSONL log file file_collector = JSONLFileCollector("./path/to/my_llm_logs.jsonl") # Create a tool logger with the collector -local_tracker = FloChainExecutionLogger(file_collector) +local_tracker = FloExecutionLogger(file_collector) # Register the logger with your session session.register_callback(local_tracker) @@ -260,6 +260,7 @@ session.register_callback(local_tracker) - 🕒 Includes timestamps for start and end of operations - 🔍 Tracks inputs, outputs, and errors - 💾 Stores data in JSONL format for easy analysis +- 📚 Facilitates the creation of training data from logged interactions ### Log Data Structure @@ -270,6 +271,13 @@ The logger captures detailed information including: - Chain and agent activities - Parent-child relationship between operations +### Training Data Generation + +The structured logs provide valuable training data that can be used to: +- **Fine-tune LLMs** on your specific use cases +- **Train new models** to replicate successful tool usage patterns +- **Create supervised datasets** for tool selection and chain optimization + ## 📖 Documentation diff --git a/examples/python/tool_data_logging_example.py b/examples/python/tool_data_logging_example.py index 9d9e790a..ca4e59d2 100644 --- a/examples/python/tool_data_logging_example.py +++ b/examples/python/tool_data_logging_example.py @@ -1,4 +1,4 @@ -from flo_ai.callbacks.flo_execution_logger import FloChainExecutionLogger +from flo_ai.callbacks import FloExecutionLogger from flo_ai.storage.data_collector import JSONLFileCollector from langchain_openai import AzureChatOpenAI import os @@ -68,7 +68,7 @@ def mul_tool(numbers: List[int]) -> str: log_file_path = os.path.join(current_dir, 'my_llm_logs.jsonl') file_collector = JSONLFileCollector(log_file_path) -local_tracker = FloChainExecutionLogger(file_collector) +local_tracker = FloExecutionLogger(file_collector) session.register_callback(local_tracker) diff --git a/flo_ai/callbacks/__init__.py b/flo_ai/callbacks/__init__.py index 69fba216..03eb1a64 100644 --- a/flo_ai/callbacks/__init__.py +++ b/flo_ai/callbacks/__init__.py @@ -4,10 +4,12 @@ flo_tool_callback, flo_call_back, ) +from flo_ai.callbacks.flo_execution_logger import FloExecutionLogger __all__ = [ 'flo_agent_callback', 'flo_router_callback', 'flo_tool_callback', 'flo_call_back', + 'FloExecutionLogger', ] diff --git a/flo_ai/callbacks/flo_execution_logger.py b/flo_ai/callbacks/flo_execution_logger.py index b675a746..f0cb04bd 100644 --- a/flo_ai/callbacks/flo_execution_logger.py +++ b/flo_ai/callbacks/flo_execution_logger.py @@ -45,7 +45,7 @@ def default(self, obj): return super().default(obj) -class FloChainExecutionLogger(BaseCallbackHandler): +class FloExecutionLogger(BaseCallbackHandler): def __init__(self, data_collector: DataCollector): self.data_collector = data_collector self.runs = {} @@ -59,7 +59,7 @@ def _store_entry(self, entry: Dict[str, Any]) -> None: encoded_entry = self._encode_entry(entry) self.data_collector.store_entry(encoded_entry) except Exception as e: - get_logger().error(f'Error storing entry in FloChainExecutionLogger: {e}') + get_logger().error(f'Error storing entry in FloExecutionLogger: {e}') def on_chain_start( self, From 64effd28a62209339b5e361e19d87ee4ba2e1f5f Mon Sep 17 00:00:00 2001 From: thomastomy5 <69713148+thomastomy5@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:54:35 +0530 Subject: [PATCH 8/8] updated readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c5b926ff..eac4a92c 100644 --- a/README.md +++ b/README.md @@ -235,7 +235,7 @@ session.register_tool(name='Adder', tool=addition_tool) ## 📊 Tool Logging and Data Collection -FloAI provides built-in capabilities for logging tool calls and collecting data through the `FloExecutionLogger` and `DataCollector` classes. +FloAI provides built-in capabilities for logging tool calls and collecting data through the `FloExecutionLogger` and `DataCollector` classes, facilitating the creation of valuable training data. You can customize `DataCollector` implementation according to your database. A sample implementation where logs are stored locally as JSON files is implemented in `JSONLFileCollector`. ### Quick Setup