diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..2ba986f6 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "chrome", + "request": "launch", + "name": "Launch Chrome against localhost", + "url": "http://localhost:8080", + "webRoot": "${workspaceFolder}" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..d293ec5c --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,14 @@ +{ + "python.testing.unittestArgs": [ + "-v", + "-s", + "./flo_ai_tools", + "-p", + "*test.py" + ], + "python.testing.pytestEnabled": true, + "python.testing.unittestEnabled": false, + "python.testing.pytestArgs": [ + "flo_ai" + ] +} \ No newline at end of file diff --git a/flo_ai/flo_ai/state/flo_json_output_collector.py b/flo_ai/flo_ai/state/flo_json_output_collector.py index 40b88fc9..9df81bf7 100644 --- a/flo_ai/flo_ai/state/flo_json_output_collector.py +++ b/flo_ai/flo_ai/state/flo_json_output_collector.py @@ -1,25 +1,40 @@ import json -import regex +import regex as re +from typing import Dict, List, Any, Callable, Optional + from flo_ai.error.flo_exception import FloException -from typing import Dict, List, Any from flo_ai.common.flo_logger import get_logger from flo_ai.state.flo_output_collector import FloOutputCollector, CollectionStatus class FloJsonOutputCollector(FloOutputCollector): + """ + FloJsonOutputCollector — collects JSON payloads from LLM/agent outputs, + gracefully handles comments, and offers “Flo” Q-promise looping.
+ Key Features: + - Strips out // and /*…*/ comments before parsing + - Uses recursive regex to find balanced { … } blocks + - Strict mode: raises exception if no JSON found + - peek, pop, fetch to manage collected data + - rewind(): recursive promise-then replay, newest-first + - iter_q(): while–for hybrid iterator over memory steps + """ + def __init__(self, strict: bool = False): super().__init__() - self.strict = strict - self.status = CollectionStatus.success - self.data: List[Dict[str, Any]] = [] + self.strict = strict # Enforce JSON presence? + self.status = CollectionStatus.success # success, partial, or error + self.data: List[Dict[str, Any]] = [] # Stored JSON dictionaries - def append(self, agent_output): + def append(self, agent_output: str) -> None: + """Extracts JSON from `agent_output` and appends the resulting dict.""" self.data.append(self.__extract_jsons(agent_output)) def __strip_comments(self, json_str: str) -> str: + """Remove JS-style comments (// and /*…*/) so json.loads() will succeed.""" cleaned = [] - length = len(json_str) i = 0 + length = len(json_str) while i < length: char = json_str[i] @@ -32,18 +47,16 @@ def __strip_comments(self, json_str: str) -> str: if char == '"': cleaned.append(char) i += 1 - while i < length: char = json_str[i] cleaned.append(char) i += 1 - if char == '"' and json_str[i - 2] != '\\': + if char == '"' and (i < 2 or json_str[i - 2] != '\\'): break continue if char == '/' and i + 1 < length: next_char = json_str[i + 1] - if next_char == '/': i += 2 while i < length and json_str[i] != '\n': @@ -60,38 +73,126 @@ def __strip_comments(self, json_str: str) -> str: cleaned.append(char) i += 1 + return ''.join(cleaned) - def __extract_jsons(self, llm_response): - json_pattern = r'\{(?:[^{}]|(?R))*\}' - json_matches = regex.findall(json_pattern, llm_response) - json_object = {} - for json_str in json_matches: + def __extract_jsons(self, llm_response: str) -> Dict[str, Any]: + """ + 1) Find all balanced `{ … }` blocks via 
recursive regex + 2) Strip comments and json.loads() each + 3) Merge into one dict (later keys override earlier) + 4) On strict mode, raise FloException if no JSON found + """ + pattern = r'\{(?:[^{}]|(?R))*\}' + matches = re.findall(pattern, llm_response) + merged: Dict[str, Any] = {} + + for json_str in matches: try: - json_obj = json.loads(self.__strip_comments(json_str)) - json_object.update(json_obj) + cleaned = self.__strip_comments(json_str) + obj = json.loads(cleaned) + merged.update(obj) except json.JSONDecodeError as e: self.status = CollectionStatus.partial get_logger().error(f'Invalid JSON in response: {json_str}, {e}') - if self.strict and len(json_matches) == 0: + + if self.strict and not matches: self.status = CollectionStatus.error - get_logger().error(f'Error while finding json in -- {llm_response}') + get_logger().error(f'No JSON found in strict mode: {llm_response}') raise FloException( 'JSON response expected in collector model: strict', error_code=1099 ) - return json_object - def pop(self): + return merged + + # ——————————————————————————————— + # Standard Data Management + # ——————————————————————————————— + + def pop(self) -> Dict[str, Any]: + """Remove and return the last collected JSON dict.""" return self.data.pop() - def peek(self): - return self.data[-1] if len(self.data) > 0 else None + def peek(self) -> Optional[Dict[str, Any]]: + """View the last collected JSON dict without removing it.""" + return self.data[-1] if self.data else None - def fetch(self): + def fetch(self) -> Dict[str, Any]: + """Merge all collected dicts into one and return it.""" return self.__merge_data() - def __merge_data(self): - result = {} + def __merge_data(self) -> Dict[str, Any]: + """Helper method to merge all collected dicts.""" + merged = {} for d in self.data: - result.update(d) - return result + merged.update(d) + return merged + + # ——————————————————————————————— + # Flo Q-Promise Looping Methods + # ——————————————————————————————— + + def rewind( + 
self, + then_callback: Optional[Callable[[Dict[str, Any]], None]] = None, + depth: Optional[int] = None + ) -> None: + """ + Recursively replay memory entries newest→oldest, invoking `then_callback` per step. + Mirrors JS Promise.then chaining in reverse order. + :param then_callback: function to handle each entry + :param depth: max number of entries to process + """ + if not self.data: + get_logger().warn("No memory to rewind.") + return + + entries = self.data[::-1] # Reverse: newest first + if depth: + entries = entries[:depth] + + def _recursive(idx: int) -> None: + if idx >= len(entries): + return + entry = entries[idx] + if then_callback: + then_callback(entry) + _recursive(idx + 1) + + _recursive(0) + + def iter_q(self, depth: Optional[int] = None) -> "FloIterator": + """ + Return a FloIterator that yields one-item lists of entries, + enabling a while–for hybrid loop over memory steps. + """ + return FloIterator(self, depth) + + +class FloIterator: + """ + Hybrid while–for iterator over FloJsonOutputCollector data. + Newest entries first, depth-limited. + """ + + def __init__(self, collector: FloJsonOutputCollector, depth: Optional[int] = None): + self.entries = collector.data[::-1] + self.limit = min(depth, len(self.entries)) if depth is not None else len(self.entries) + self.index = 0 + + def has_next(self) -> bool: + """True if more entries remain.""" + return self.index < self.limit + + def next(self) -> List[Dict[str, Any]]: + """ + Return the next “batch” of entries (here, a single-item list). + Returns [] when exhausted. + """ + if not self.has_next(): + return [] + entry = self.entries[self.index] + self.index += 1 + return [entry] + +