"""flo_json_output_collector.py — final state of the 4-patch series, reconstructed.

Critical fix relative to PATCH 4/4: that patch swapped ``import regex`` for the
standard-library ``re`` module but kept the recursive pattern
``r'\\{(?:[^{}]|(?R))*\\}'``.  ``(?R)`` is a third-party ``regex`` extension that
stdlib ``re`` cannot compile — ``re.findall`` raises ``re.error: unknown
extension ?R`` the first time it runs, so every ``append()`` would crash.
The recursive regex is replaced below with a small stdlib-only scanner that
extracts balanced ``{ … }`` blocks, preserving the intended behavior without
the third-party dependency.
"""
import json
from typing import Any, Callable, Dict, List, Optional

from flo_ai.error.flo_exception import FloException
from flo_ai.common.flo_logger import get_logger
from flo_ai.state.flo_output_collector import FloOutputCollector, CollectionStatus


class FloJsonOutputCollector(FloOutputCollector):
    """
    FloJsonOutputCollector — collects JSON payloads from LLM/agent outputs,
    gracefully handles comments, and offers "Flo" Q-promise looping.

    Key Features:
    - Strips out // and /*…*/ comments before parsing
    - Extracts balanced { … } blocks with a stdlib scanner (no recursive regex)
    - Strict mode: raises FloException if no JSON found
    - peek, pop, fetch to manage collected data
    - rewind(): recursive promise-then replay, newest-first
    - iter_q(): while–for hybrid iterator over memory steps
    """

    def __init__(self, strict: bool = False):
        super().__init__()
        self.strict = strict  # Raise when append() finds no JSON?
        self.status = CollectionStatus.success  # success, partial, or error
        self.data: List[Dict[str, Any]] = []  # Stored JSON dictionaries

    def append(self, agent_output: str) -> None:
        """Extract JSON from `agent_output` and append the resulting dict."""
        self.data.append(self.__extract_jsons(agent_output))

    def __strip_comments(self, json_str: str) -> str:
        """Remove JS-style comments (// and /*…*/) so json.loads() will succeed.

        String literals are copied verbatim, so comment markers inside quoted
        values are preserved.
        NOTE(review): the backslash check below does not recognize an escaped
        backslash immediately before a closing quote (e.g. ``"a\\\\"``); the
        original code had the same limitation.
        """
        cleaned = []
        i = 0
        length = len(json_str)
        while i < length:
            char = json_str[i]

            if char not in '"/*':
                cleaned.append(char)
                i += 1
                continue

            if char == '"':
                # Copy the whole string literal through the closing quote.
                cleaned.append(char)
                i += 1
                while i < length:
                    char = json_str[i]
                    cleaned.append(char)
                    i += 1
                    if char == '"' and (i < 2 or json_str[i - 2] != '\\'):
                        break
                continue

            if char == '/' and i + 1 < length:
                next_char = json_str[i + 1]
                if next_char == '/':
                    # Line comment: skip to end of line (newline is kept).
                    i += 2
                    while i < length and json_str[i] != '\n':
                        i += 1
                    continue
                elif next_char == '*':
                    # Block comment: skip through the closing */.
                    i += 2
                    while i + 1 < length:
                        if json_str[i] == '*' and json_str[i + 1] == '/':
                            i += 2
                            break
                        i += 1
                    continue

            cleaned.append(char)
            i += 1

        return ''.join(cleaned)

    @staticmethod
    def _find_json_blocks(text: str) -> List[str]:
        """Return the top-level balanced ``{ … }`` substrings of *text*.

        Stdlib replacement for the third-party recursive pattern
        ``r'\\{(?:[^{}]|(?R))*\\}'`` (``re`` cannot compile ``(?R)``).
        Braces inside string literals within a block are ignored; unbalanced
        trailing blocks are dropped, matching regex.findall's behavior of
        only returning complete matches.
        """
        blocks: List[str] = []
        depth = 0  # Current brace nesting level
        start = -1  # Index of the '{' opening the current top-level block
        in_string = False
        escaped = False
        for i, ch in enumerate(text):
            if in_string:
                if escaped:
                    escaped = False
                elif ch == '\\':
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"':
                # Only track strings while inside a candidate block.
                if depth > 0:
                    in_string = True
            elif ch == '{':
                if depth == 0:
                    start = i
                depth += 1
            elif ch == '}':
                if depth > 0:
                    depth -= 1
                    if depth == 0:
                        blocks.append(text[start:i + 1])
        return blocks

    def __extract_jsons(self, llm_response: str) -> Dict[str, Any]:
        """
        1) Find all balanced `{ … }` blocks via the stdlib scanner
        2) Strip comments and json.loads() each
        3) Merge into one dict (later keys override earlier)
        4) On strict mode, raise FloException if no JSON found
        """
        matches = self._find_json_blocks(llm_response)
        merged: Dict[str, Any] = {}

        for json_str in matches:
            try:
                cleaned = self.__strip_comments(json_str)
                obj = json.loads(cleaned)
                merged.update(obj)
            except json.JSONDecodeError as e:
                # Keep going: one bad block downgrades status to partial.
                self.status = CollectionStatus.partial
                get_logger().error(f'Invalid JSON in response: {json_str}, {e}')

        if self.strict and not matches:
            self.status = CollectionStatus.error
            get_logger().error(f'No JSON found in strict mode: {llm_response}')
            raise FloException(
                'JSON response expected in collector model: strict', error_code=1099
            )

        return merged

    # ———————————————————————————————
    # Standard Data Management
    # ———————————————————————————————

    def pop(self) -> Dict[str, Any]:
        """Remove and return the last collected JSON dict (IndexError if empty)."""
        return self.data.pop()

    def peek(self) -> Optional[Dict[str, Any]]:
        """View the last collected JSON dict without removing it, or None."""
        return self.data[-1] if self.data else None

    def fetch(self) -> Dict[str, Any]:
        """Merge all collected dicts into one and return it."""
        return self.__merge_data()

    def __merge_data(self) -> Dict[str, Any]:
        """Helper method to merge all collected dicts (later entries win)."""
        merged: Dict[str, Any] = {}
        for d in self.data:
            merged.update(d)
        return merged

    # ———————————————————————————————
    # Flo Q-Promise Looping Methods
    # ———————————————————————————————

    def rewind(
        self,
        then_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
        depth: Optional[int] = None,
    ) -> None:
        """
        Recursively replay memory entries newest→oldest, invoking
        `then_callback` per step.  Mirrors JS Promise.then chaining in
        reverse order.

        :param then_callback: function to handle each entry
        :param depth: max number of entries to process (0 processes none)

        NOTE: recursion depth equals the number of replayed entries, so very
        large histories may hit Python's recursion limit.
        """
        if not self.data:
            get_logger().warn("No memory to rewind.")
            return

        entries = self.data[::-1]  # Reverse: newest first
        # `is not None` so that an explicit depth=0 limits to zero entries
        # instead of silently meaning "unlimited".
        if depth is not None:
            entries = entries[:depth]

        def _recursive(idx: int) -> None:
            if idx >= len(entries):
                return
            entry = entries[idx]
            if then_callback:
                then_callback(entry)
            _recursive(idx + 1)

        _recursive(0)

    def iter_q(self, depth: Optional[int] = None) -> "FloIterator":
        """
        Return a FloIterator that yields one-item lists of entries,
        enabling a while–for hybrid loop over memory steps.
        """
        return FloIterator(self, depth)


class FloIterator:
    """
    Hybrid while–for iterator over FloJsonOutputCollector data.
    Newest entries first, depth-limited.
    """

    def __init__(self, collector: FloJsonOutputCollector, depth: Optional[int] = None):
        self.entries = collector.data[::-1]  # Snapshot, newest first
        # Clamp so has_next() never points past the snapshot.
        self.limit = min(depth, len(self.entries)) if depth is not None else len(self.entries)
        self.index = 0

    def has_next(self) -> bool:
        """True if more entries remain."""
        return self.index < self.limit

    def next(self) -> List[Dict[str, Any]]:
        """
        Return the next "batch" of entries (here, a single-item list).
        Returns [] when exhausted.
        """
        if not self.has_next():
            return []
        entry = self.entries[self.index]
        self.index += 1
        return [entry]