From 6f8153f743cf1c8b38e33f71d1368be6850f322b Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Fri, 21 Nov 2025 17:52:31 +0800 Subject: [PATCH 01/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 73 +++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 13 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index dcc1326b33bc34..fe24a27491d39e 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -265,7 +265,10 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU } tool_responses.append(tool_response) - if tool_response["tool_response"] is not None: + # check direct return flag + direct_flag = (tool_invoke_meta.extra or {}).get("return_direct", False) + + if tool_response["tool_response"] is not None and not direct_flag: self._current_thoughts.append( ToolPromptMessage( content=str(tool_response["tool_response"]), @@ -274,6 +277,28 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU ) ) + if direct_flag: + # save agent thought for this tool call + self.save_agent_thought( + agent_thought_id=agent_thought_id, + tool_name=tool_call_name, + tool_input=tool_call_args, + thought=llm_result.message.content or "", + tool_invoke_meta={tool_call_name: tool_invoke_meta.to_dict()}, + observation={tool_call_name: tool_invoke_response}, + answer=str(tool_invoke_response or ""), + messages_ids=message_file_ids, + ) + self.queue_manager.publish( + QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + ) + + # publish end event immediately and return + final_answer = str(tool_invoke_response or "") + llm_final_usage = llm_usage.get("usage") or LLMUsage.empty_usage() + yield from self._yield_final_answer(prompt_messages, final_answer, llm_final_usage) + return + if len(tool_responses) > 0: # save agent thought self.save_agent_thought( @@ -301,18 +326,10 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU iteration_step += 1 - # publish end event - self.queue_manager.publish( - QueueMessageEndEvent( - llm_result=LLMResult( - model=model_instance.model, - prompt_messages=prompt_messages, - message=AssistantPromptMessage(content=final_answer), - usage=llm_usage["usage"] or LLMUsage.empty_usage(), - system_fingerprint="", - ) - ), - PublishFrom.APPLICATION_MANAGER, + yield from self._yield_final_answer( + prompt_messages, + final_answer, + llm_usage["usage"] or LLMUsage.empty_usage(), ) def check_tool_calls(self, llm_result_chunk: LLMResultChunk) -> bool: @@ -377,6 +394,36 @@ def extract_blocking_tool_calls(self, llm_result: LLMResult) -> list[tuple[str, return tool_calls + def _yield_final_answer( + self, + prompt_messages: list, + final_answer: str, + usage: LLMUsage, + ) -> Generator[LLMResultChunk, None, None]: + self.queue_manager.publish( + QueueMessageEndEvent( + llm_result=LLMResult( + model=self.model_instance.model, + prompt_messages=prompt_messages, + message=AssistantPromptMessage(content=final_answer), + usage=usage, + system_fingerprint="", + ) + ), + PublishFrom.APPLICATION_MANAGER, + ) + + yield LLMResultChunk( + model=self.model_instance.model, + prompt_messages=prompt_messages, + system_fingerprint="", + delta=LLMResultChunkDelta( + index=0, + message=AssistantPromptMessage(content=final_answer), + usage=usage, + ), + ) + def _init_system_message(self, prompt_template: str, prompt_messages: list[PromptMessage]) -> 
list[PromptMessage]: """ Initialize system message From 8f13ea07fd7c7688db83b5b0331ded33e176b2cc Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Fri, 21 Nov 2025 17:53:49 +0800 Subject: [PATCH 02/43] Update cot_agent_runner.py --- api/core/agent/cot_agent_runner.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/api/core/agent/cot_agent_runner.py b/api/core/agent/cot_agent_runner.py index 25ad6dc06017db..eea40821282c28 100644 --- a/api/core/agent/cot_agent_runner.py +++ b/api/core/agent/cot_agent_runner.py @@ -202,7 +202,6 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU except TypeError: final_answer = f"{scratchpad.action.action_input}" else: - function_call_state = True # action is tool call, invoke tool tool_invoke_response, tool_invoke_meta = self._handle_invoke_action( action=scratchpad.action, @@ -213,6 +212,9 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU scratchpad.observation = tool_invoke_response scratchpad.agent_response = tool_invoke_response + # detect direct return + direct_flag = (tool_invoke_meta.extra or {}).get("return_direct", False) + self.save_agent_thought( agent_thought_id=agent_thought_id, tool_name=scratchpad.action.action_name, @@ -229,6 +231,12 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER ) + if direct_flag: + final_answer = str(tool_invoke_response or "") + # keep function_call_state as False to end iterations + else: + function_call_state = True + # update prompt tool message for prompt_tool in self._prompt_messages_tools: self.update_prompt_message_tool(tool_instances[prompt_tool.name], prompt_tool) From ac6845637e0ba5553ecc1c4a48298950a41bf2e7 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Fri, 21 Nov 2025 17:55:15 +0800 Subject: [PATCH 03/43] Update tool_entities.py --- api/core/tools/entities/tool_entities.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/core/tools/entities/tool_entities.py b/api/core/tools/entities/tool_entities.py index 353f3a646a9542..23e6c0c5e11140 100644 --- a/api/core/tools/entities/tool_entities.py +++ b/api/core/tools/entities/tool_entities.py @@ -427,6 +427,7 @@ class ToolInvokeMeta(BaseModel): time_cost: float = Field(..., description="The time cost of the tool invoke") error: str | None = None tool_config: dict | None = None + extra: dict | None = None @classmethod def empty(cls) -> "ToolInvokeMeta": @@ -447,6 +448,7 @@ def to_dict(self): "time_cost": self.time_cost, "error": self.error, "tool_config": self.tool_config, + "extra": self.extra, } From c3e09b42edd60a8f2f0c46d86035c1d78570919d Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Fri, 21 Nov 2025 17:55:44 +0800 Subject: [PATCH 04/43] Update tool_engine.py --- api/core/tools/tool_engine.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/api/core/tools/tool_engine.py b/api/core/tools/tool_engine.py index 13fd579e20aa01..0c49ffda2e8440 100644 --- a/api/core/tools/tool_engine.py +++ b/api/core/tools/tool_engine.py @@ -107,9 +107,19 @@ def message_callback( tool_messages=binary_files, agent_message=message, invoke_from=invoke_from, user_id=user_id ) + # detect return_direct signal from variable messages (short-circuit) + return_direct = False + for m in message_list: + if m.type == 
ToolInvokeMessage.MessageType.VARIABLE: + variable = cast(ToolInvokeMessage.VariableMessage, m.message) + if variable.variable_name == "return_direct" and bool(variable.variable_value): + return_direct = True + break + plain_text = ToolEngine._convert_tool_response_to_str(message_list) meta = invocation_meta_dict["meta"] + meta.extra = {"return_direct": return_direct} # hit the callback handler agent_tool_callback.on_tool_end( @@ -254,6 +264,9 @@ def _convert_tool_response_to_str(tool_response: list[ToolInvokeMessage]) -> str ensure_ascii=False, ) ) + elif response.type == ToolInvokeMessage.MessageType.VARIABLE: + # internal variable messages should not be surfaced into plain text + continue else: parts.append(str(response.message)) From 00f49e1f45fed9868db2d96715a6776ea1099843 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Fri, 21 Nov 2025 17:59:51 +0800 Subject: [PATCH 05/43] Update api/core/agent/fc_agent_runner.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- api/core/agent/fc_agent_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index fe24a27491d39e..3c61284404c553 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -283,7 +283,7 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU agent_thought_id=agent_thought_id, tool_name=tool_call_name, tool_input=tool_call_args, - thought=llm_result.message.content or "", + thought=response or "", tool_invoke_meta={tool_call_name: tool_invoke_meta.to_dict()}, observation={tool_call_name: tool_invoke_response}, answer=str(tool_invoke_response or ""), From f51e8b0d1dc1123ac92cd6bf254e1c20bb7d1f6c Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Fri, 21 Nov 2025 18:01:53 +0800 Subject: [PATCH 06/43] Update api/core/agent/fc_agent_runner.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- api/core/agent/fc_agent_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 3c61284404c553..31dbf5dc807899 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -329,7 +329,7 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU yield from self._yield_final_answer( prompt_messages, final_answer, - llm_usage["usage"] or LLMUsage.empty_usage(), + llm_usage.get("usage") or LLMUsage.empty_usage(), ) def check_tool_calls(self, llm_result_chunk: LLMResultChunk) -> bool: From 94bad9cb31a229384e0adb54b5941e05ffa261b9 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Fri, 21 Nov 2025 18:07:50 +0800 Subject: [PATCH 07/43] Update api/core/agent/fc_agent_runner.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- api/core/agent/fc_agent_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 31dbf5dc807899..3e640e4eabb2de 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -396,7 +396,7 @@ def extract_blocking_tool_calls(self, llm_result: LLMResult) -> list[tuple[str, def _yield_final_answer( self, - prompt_messages: list, + prompt_messages: 
list[PromptMessage], final_answer: str, usage: LLMUsage, ) -> Generator[LLMResultChunk, None, None]: From 9a8b33372480a78e85600231ecb17c987538836b Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:33:15 +0800 Subject: [PATCH 08/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index 5703c19c880221..2750133902f4c0 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -114,10 +114,24 @@ def _invoke( for file in files: yield self.create_file_message(file) # type: ignore + # detect return_direct flag from workflow outputs + return_direct_flag = False + if isinstance(outputs, dict) and "return_direct" in outputs: + raw_flag = outputs.pop("return_direct") + if isinstance(raw_flag, str): + return_direct_flag = raw_flag.strip().lower() in {"true", "1", "yes", "y"} + else: + try: + return_direct_flag = bool(raw_flag) + except Exception: + return_direct_flag = False + self._latest_usage = self._derive_usage_from_result(data) yield self.create_text_message(json.dumps(outputs, ensure_ascii=False)) yield self.create_json_message(outputs, suppress_output=True) + if return_direct_flag: + yield self.create_variable_message("return_direct", True) @property def latest_usage(self) -> LLMUsage: From f48435fb91dd92d4838e9f08f2fae81d2e342921 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:00:02 +0800 Subject: [PATCH 09/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index 2750133902f4c0..57db271ac8e73e 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -121,10 +121,7 @@ def _invoke( if isinstance(raw_flag, str): return_direct_flag = raw_flag.strip().lower() in {"true", "1", "yes", "y"} else: - try: - return_direct_flag = bool(raw_flag) - except Exception: - return_direct_flag = False + return_direct_flag = bool(raw_flag) self._latest_usage = self._derive_usage_from_result(data) From 925aef3d06c83d0a6c611fe68478874d048eb4bd Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:01:22 +0800 Subject: [PATCH 10/43] Apply suggestions from code review Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- api/core/tools/tool_engine.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/api/core/tools/tool_engine.py b/api/core/tools/tool_engine.py index 0c49ffda2e8440..2ca5fd9ae2fd1d 100644 --- a/api/core/tools/tool_engine.py +++ b/api/core/tools/tool_engine.py @@ -108,13 +108,14 @@ def message_callback( ) # detect return_direct signal from variable messages (short-circuit) - return_direct = False - for m in message_list: - if m.type == ToolInvokeMessage.MessageType.VARIABLE: - variable = cast(ToolInvokeMessage.VariableMessage, m.message) - if variable.variable_name == "return_direct" and bool(variable.variable_value): - return_direct = True - break + variable_messages = ( + cast(ToolInvokeMessage.VariableMessage, m.message) + for m in message_list if m.type == ToolInvokeMessage.MessageType.VARIABLE + ) + return_direct = any( + vm.variable_name == 
"return_direct" and bool(vm.variable_value) + for vm in variable_messages + ) plain_text = ToolEngine._convert_tool_response_to_str(message_list) From ee9380545e37acacf812a7cbd587b01a1f476e47 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:05:17 +0800 Subject: [PATCH 11/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 57 +++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 3e640e4eabb2de..8fea383ca7d696 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -278,25 +278,18 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU ) if direct_flag: - # save agent thought for this tool call - self.save_agent_thought( - agent_thought_id=agent_thought_id, - tool_name=tool_call_name, - tool_input=tool_call_args, - thought=response or "", - tool_invoke_meta={tool_call_name: tool_invoke_meta.to_dict()}, - observation={tool_call_name: tool_invoke_response}, - answer=str(tool_invoke_response or ""), - messages_ids=message_file_ids, - ) - self.queue_manager.publish( - QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER - ) - - # publish end event immediately and return - final_answer = str(tool_invoke_response or "") llm_final_usage = llm_usage.get("usage") or LLMUsage.empty_usage() - yield from self._yield_final_answer(prompt_messages, final_answer, llm_final_usage) + yield from self._handle_direct_return( + agent_thought_id, + tool_call_name, + tool_call_args, + response or "", + tool_invoke_meta, + tool_invoke_response, + message_file_ids, + prompt_messages, + llm_final_usage, + ) return if len(tool_responses) > 0: @@ -424,6 +417,34 @@ def _yield_final_answer( ), ) + def _handle_direct_return( + self, + agent_thought_id: str, + tool_call_name: str, + tool_call_args: dict[str, Any], + response: str, + tool_invoke_meta: ToolInvokeMeta, + tool_invoke_response: str | None, + message_file_ids: list[str], + prompt_messages: list[PromptMessage], + usage: LLMUsage, + ) -> Generator[LLMResultChunk, None, None]: + self.save_agent_thought( + agent_thought_id=agent_thought_id, + tool_name=tool_call_name, + tool_input=tool_call_args, + thought=response, + tool_invoke_meta={tool_call_name: tool_invoke_meta.to_dict()}, + observation={tool_call_name: tool_invoke_response}, + answer=str(tool_invoke_response or ""), + messages_ids=message_file_ids, + ) + self.queue_manager.publish( + QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + ) + final_answer = str(tool_invoke_response or "") + yield from self._yield_final_answer(prompt_messages, final_answer, usage) + def _init_system_message(self, prompt_template: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]: """ Initialize system message From faa979b48db0403bdd27016716b8cb3a2cd5cc9f Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:19:08 +0800 Subject: [PATCH 12/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 8fea383ca7d696..c7058bab4ddee1 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -281,10 +281,8 @@ def increase_usage(final_llm_usage_dict: 
dict[str, LLMUsage | None], usage: LLMU llm_final_usage = llm_usage.get("usage") or LLMUsage.empty_usage() yield from self._handle_direct_return( agent_thought_id, - tool_call_name, - tool_call_args, + tool_responses, response or "", - tool_invoke_meta, tool_invoke_response, message_file_ids, prompt_messages, @@ -420,10 +418,8 @@ def _yield_final_answer( def _handle_direct_return( self, agent_thought_id: str, - tool_call_name: str, - tool_call_args: dict[str, Any], + tool_responses: list[dict[str, Any]], response: str, - tool_invoke_meta: ToolInvokeMeta, tool_invoke_response: str | None, message_file_ids: list[str], prompt_messages: list[PromptMessage], @@ -431,11 +427,11 @@ def _handle_direct_return( ) -> Generator[LLMResultChunk, None, None]: self.save_agent_thought( agent_thought_id=agent_thought_id, - tool_name=tool_call_name, - tool_input=tool_call_args, + tool_name="", + tool_input="", thought=response, - tool_invoke_meta={tool_call_name: tool_invoke_meta.to_dict()}, - observation={tool_call_name: tool_invoke_response}, + tool_invoke_meta={tr["tool_call_name"]: tr["meta"] for tr in tool_responses}, + observation={tr["tool_call_name"]: tr["tool_response"] for tr in tool_responses}, answer=str(tool_invoke_response or ""), messages_ids=message_file_ids, ) From 944ba64264da4ab047c2dc0e4beca2e6d86496f5 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:22:13 +0800 Subject: [PATCH 13/43] Update api/core/agent/fc_agent_runner.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- api/core/agent/fc_agent_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index c7058bab4ddee1..c48f7a10beed61 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -429,7 +429,7 @@ def _handle_direct_return( agent_thought_id=agent_thought_id, tool_name="", tool_input="", - thought=response, + thought="", tool_invoke_meta={tr["tool_call_name"]: tr["meta"] for tr in tool_responses}, observation={tr["tool_call_name"]: tr["tool_response"] for tr in tool_responses}, answer=str(tool_invoke_response or ""), From 5687103c0e7fae7c1f93dd85d599bc2a5ca0c452 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:33:27 +0800 Subject: [PATCH 14/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index c48f7a10beed61..98528ccb9e7527 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -282,7 +282,6 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU yield from self._handle_direct_return( agent_thought_id, tool_responses, - response or "", tool_invoke_response, message_file_ids, prompt_messages, @@ -419,7 +418,6 @@ def _handle_direct_return( self, agent_thought_id: str, tool_responses: list[dict[str, Any]], - response: str, tool_invoke_response: str | None, message_file_ids: list[str], prompt_messages: list[PromptMessage], From f164760dcbc570f287e9ebed8a2696fd66ae3590 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 10:17:06 +0800 Subject: [PATCH 15/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git 
a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index 57db271ac8e73e..fefde7fda84dd4 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -121,11 +121,24 @@ def _invoke( if isinstance(raw_flag, str): return_direct_flag = raw_flag.strip().lower() in {"true", "1", "yes", "y"} else: - return_direct_flag = bool(raw_flag) + try: + return_direct_flag = bool(raw_flag) + except Exception: + return_direct_flag = False self._latest_usage = self._derive_usage_from_result(data) - yield self.create_text_message(json.dumps(outputs, ensure_ascii=False)) + direct_text = None + if return_direct_flag and isinstance(outputs, dict): + v = outputs.get("text") + if isinstance(v, str) and v.strip(): + direct_text = v + + if direct_text is not None: + yield self.create_text_message(direct_text) + else: + yield self.create_text_message(json.dumps(outputs, ensure_ascii=False)) + yield self.create_json_message(outputs, suppress_output=True) if return_direct_flag: yield self.create_variable_message("return_direct", True) From ff108d4d765c86b14abd492d87385a9615dccfa8 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 10:55:07 +0800 Subject: [PATCH 16/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index fefde7fda84dd4..950760f3e63b1a 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -114,17 +114,14 @@ def _invoke( for file in files: yield self.create_file_message(file) # type: ignore - # detect return_direct flag from workflow outputs + # detect return_direct flag from workflow outputs (strict boolean only) return_direct_flag = False if isinstance(outputs, dict) and "return_direct" in outputs: raw_flag = outputs.pop("return_direct") - if isinstance(raw_flag, str): - return_direct_flag = raw_flag.strip().lower() in {"true", "1", "yes", "y"} + if isinstance(raw_flag, bool): + return_direct_flag = raw_flag else: - try: - return_direct_flag = bool(raw_flag) - except Exception: - return_direct_flag = False + return_direct_flag = False self._latest_usage = self._derive_usage_from_result(data) From 32d08f721158e67a2f8013419dee7962a4b498dc Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 10:55:34 +0800 Subject: [PATCH 17/43] Update tool_engine.py --- api/core/tools/tool_engine.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/api/core/tools/tool_engine.py b/api/core/tools/tool_engine.py index 2ca5fd9ae2fd1d..b23d04824dd7d6 100644 --- a/api/core/tools/tool_engine.py +++ b/api/core/tools/tool_engine.py @@ -107,15 +107,18 @@ def message_callback( tool_messages=binary_files, agent_message=message, invoke_from=invoke_from, user_id=user_id ) - # detect return_direct signal from variable messages (short-circuit) - variable_messages = ( - cast(ToolInvokeMessage.VariableMessage, m.message) - for m in message_list if m.type == ToolInvokeMessage.MessageType.VARIABLE - ) - return_direct = any( - vm.variable_name == "return_direct" and bool(vm.variable_value) - for vm in variable_messages - ) + # detect return_direct signal from variable messages (strict boolean short-circuit) + return_direct = False + for m in message_list: + if m.type == ToolInvokeMessage.MessageType.VARIABLE: + variable 
= cast(ToolInvokeMessage.VariableMessage, m.message) + if ( + variable.variable_name == "return_direct" + and isinstance(variable.variable_value, bool) + and variable.variable_value is True + ): + return_direct = True + break plain_text = ToolEngine._convert_tool_response_to_str(message_list) From 81a95f564ef7c9043ac3f306c6438e7a6abe6b0b Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 11:03:21 +0800 Subject: [PATCH 18/43] Update tool_engine.py --- api/core/tools/tool_engine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/api/core/tools/tool_engine.py b/api/core/tools/tool_engine.py index b23d04824dd7d6..7674730fbc2937 100644 --- a/api/core/tools/tool_engine.py +++ b/api/core/tools/tool_engine.py @@ -114,7 +114,6 @@ def message_callback( variable = cast(ToolInvokeMessage.VariableMessage, m.message) if ( variable.variable_name == "return_direct" - and isinstance(variable.variable_value, bool) and variable.variable_value is True ): return_direct = True From d237d8b189caaeaf7845e591ce291cdb1b39e0f2 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 11:03:47 +0800 Subject: [PATCH 19/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index 950760f3e63b1a..c32538265a2b97 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -118,10 +118,8 @@ def _invoke( return_direct_flag = False if isinstance(outputs, dict) and "return_direct" in outputs: raw_flag = outputs.pop("return_direct") - if isinstance(raw_flag, bool): - return_direct_flag = raw_flag - else: - return_direct_flag = False + if raw_flag is True: + return_direct_flag = True self._latest_usage = self._derive_usage_from_result(data) From 3f7998689d4c70e9b40cf3ad0371f23a320a1ebe Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 13:52:06 +0800 Subject: [PATCH 20/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 32 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 98528ccb9e7527..13c62787b9a95a 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -267,29 +267,30 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU tool_responses.append(tool_response) # check direct return flag direct_flag = (tool_invoke_meta.extra or {}).get("return_direct", False) + tool_response["direct_flag"] = direct_flag - if tool_response["tool_response"] is not None and not direct_flag: - self._current_thoughts.append( - ToolPromptMessage( - content=str(tool_response["tool_response"]), - tool_call_id=tool_call_id, - name=tool_call_name, - ) - ) - - if direct_flag: + if len(tool_responses) > 0: + all_direct = all(tr.get("direct_flag") is True for tr in tool_responses) + if all_direct: llm_final_usage = llm_usage.get("usage") or LLMUsage.empty_usage() yield from self._handle_direct_return( agent_thought_id, tool_responses, - tool_invoke_response, message_file_ids, prompt_messages, llm_final_usage, ) return - if len(tool_responses) > 0: + for tr in tool_responses: + if tr["tool_response"] is not None: + self._current_thoughts.append( + ToolPromptMessage( + content=str(tr["tool_response"]), + 
tool_call_id=tr["tool_call_id"], + name=tr["tool_call_name"], + ) + ) # save agent thought self.save_agent_thought( agent_thought_id=agent_thought_id, @@ -418,11 +419,13 @@ def _handle_direct_return( self, agent_thought_id: str, tool_responses: list[dict[str, Any]], - tool_invoke_response: str | None, message_file_ids: list[str], prompt_messages: list[PromptMessage], usage: LLMUsage, ) -> Generator[LLMResultChunk, None, None]: + final_answer = "\n".join( + [str(tr["tool_response"]) for tr in tool_responses if tr.get("tool_response") is not None] + ) self.save_agent_thought( agent_thought_id=agent_thought_id, tool_name="", @@ -430,13 +433,12 @@ def _handle_direct_return( thought="", tool_invoke_meta={tr["tool_call_name"]: tr["meta"] for tr in tool_responses}, observation={tr["tool_call_name"]: tr["tool_response"] for tr in tool_responses}, - answer=str(tool_invoke_response or ""), + answer=final_answer, messages_ids=message_file_ids, ) self.queue_manager.publish( QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER ) - final_answer = str(tool_invoke_response or "") yield from self._yield_final_answer(prompt_messages, final_answer, usage) def _init_system_message(self, prompt_template: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]: From 1c113b48c9cfeb69cb5d6cea6fcdcfd0420d0d4d Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:00:57 +0800 Subject: [PATCH 21/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index c32538265a2b97..d586aec6ac6d1f 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -126,7 +126,7 @@ def _invoke( direct_text = None if return_direct_flag and isinstance(outputs, dict): v = outputs.get("text") - if isinstance(v, str) and v.strip(): + if isinstance(v, str): direct_text = v if direct_text is not None: From 72e928857fe528464e045cfa59456dde90920a56 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:28:59 +0800 Subject: [PATCH 22/43] Update cot_agent_runner.py --- api/core/agent/cot_agent_runner.py | 720 ++++++++++++++++------------- 1 file changed, 410 insertions(+), 310 deletions(-) diff --git a/api/core/agent/cot_agent_runner.py b/api/core/agent/cot_agent_runner.py index eea40821282c28..f95511bdf3e618 100644 --- a/api/core/agent/cot_agent_runner.py +++ b/api/core/agent/cot_agent_runner.py @@ -1,79 +1,63 @@ import json -from abc import ABC, abstractmethod -from collections.abc import Generator, Mapping, Sequence -from typing import Any +import logging +from collections.abc import Generator +from copy import deepcopy +from typing import Any, Union from core.agent.base_agent_runner import BaseAgentRunner -from core.agent.entities import AgentScratchpadUnit -from core.agent.output_parser.cot_output_parser import CotAgentOutputParser from core.app.apps.base_app_queue_manager import PublishFrom from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent -from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage -from core.model_runtime.entities.message_entities import ( +from core.file import file_manager +from core.model_runtime.entities import ( AssistantPromptMessage, + LLMResult, + LLMResultChunk, + 
LLMResultChunkDelta, + LLMUsage, PromptMessage, - PromptMessageTool, + PromptMessageContentType, + SystemPromptMessage, + TextPromptMessageContent, ToolPromptMessage, UserPromptMessage, ) -from core.ops.ops_trace_manager import TraceQueueManager +from core.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform -from core.tools.__base.tool import Tool from core.tools.entities.tool_entities import ToolInvokeMeta from core.tools.tool_engine import ToolEngine from models.model import Message +logger = logging.getLogger(__name__) -class CotAgentRunner(BaseAgentRunner, ABC): - _is_first_iteration = True - _ignore_observation_providers = ["wenxin"] - _historic_prompt_messages: list[PromptMessage] - _agent_scratchpad: list[AgentScratchpadUnit] - _instruction: str - _query: str - _prompt_messages_tools: Sequence[PromptMessageTool] - def run( - self, - message: Message, - query: str, - inputs: Mapping[str, str], - ) -> Generator: +class FunctionCallAgentRunner(BaseAgentRunner): + def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]: """ - Run Cot agent application + Run FunctionCall agent application """ - + self.query = query app_generate_entity = self.application_generate_entity - self._repack_app_generate_entity(app_generate_entity) - self._init_react_state(query) - trace_manager = app_generate_entity.trace_manager + app_config = self.app_config + assert app_config is not None, "app_config is required" + assert app_config.agent is not None, "app_config.agent is required" - # check model mode - if "Observation" not in app_generate_entity.model_conf.stop: - if app_generate_entity.model_conf.provider not in self._ignore_observation_providers: - app_generate_entity.model_conf.stop.append("Observation") + # convert tools into ModelRuntime Tool format + tool_instances, prompt_messages_tools = self._init_prompt_tools() - app_config = self.app_config assert app_config.agent - # init instruction - inputs = inputs or {} - instruction = app_config.prompt_template.simple_prompt_template or "" - self._instruction = self._fill_in_inputs_from_external_data_tools(instruction, inputs) - iteration_step = 1 max_iteration_steps = min(app_config.agent.max_iteration, 99) + 1 - # convert tools into ModelRuntime Tool format - tool_instances, prompt_messages_tools = self._init_prompt_tools() - self._prompt_messages_tools = prompt_messages_tools - + # continue to run until there is not any tool call function_call_state = True llm_usage: dict[str, LLMUsage | None] = {"usage": None} final_answer = "" prompt_messages: list = [] # Initialize prompt_messages - agent_thought_id = "" # Initialize agent_thought_id + + # get tracing instance + trace_manager = app_generate_entity.trace_manager def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMUsage): if not final_llm_usage_dict["usage"]: @@ -90,347 +74,463 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU model_instance = self.model_instance while function_call_state and iteration_step <= max_iteration_steps: - # continue to run until there is not any tool call function_call_state = False if iteration_step == max_iteration_steps: # the last iteration, remove all tools - self._prompt_messages_tools = [] + prompt_messages_tools = [] message_file_ids: list[str] = [] - agent_thought_id = self.create_agent_thought( message_id=message.id, 
message="", tool_name="", tool_input="", messages_ids=message_file_ids ) - if iteration_step > 1: - self.queue_manager.publish( - QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER - ) - # recalc llm max tokens prompt_messages = self._organize_prompt_messages() self.recalc_llm_max_tokens(self.model_config, prompt_messages) # invoke model - chunks = model_instance.invoke_llm( + chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = model_instance.invoke_llm( prompt_messages=prompt_messages, model_parameters=app_generate_entity.model_conf.parameters, - tools=[], + tools=prompt_messages_tools, stop=app_generate_entity.model_conf.stop, - stream=True, + stream=self.stream_tool_call, user=self.user_id, callbacks=[], ) - usage_dict: dict[str, LLMUsage | None] = {} - react_chunks = CotAgentOutputParser.handle_react_stream_output(chunks, usage_dict) - scratchpad = AgentScratchpadUnit( - agent_response="", - thought="", - action_str="", - observation="", - action=None, - ) + tool_calls: list[tuple[str, str, dict[str, Any]]] = [] + + # save full response + response = "" + + # save tool call names and inputs + tool_call_names = "" + tool_call_inputs = "" + + current_llm_usage = None + + if isinstance(chunks, Generator): + is_first_chunk = True + for chunk in chunks: + if is_first_chunk: + self.queue_manager.publish( + QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + ) + is_first_chunk = False + # check if there is any tool call + if self.check_tool_calls(chunk): + function_call_state = True + tool_calls.extend(self.extract_tool_calls(chunk) or []) + tool_call_names = ";".join([tool_call[1] for tool_call in tool_calls]) + try: + tool_call_inputs = json.dumps( + {tool_call[1]: tool_call[2] for tool_call in tool_calls}, ensure_ascii=False + ) + except TypeError: + # fallback: force ASCII to handle non-serializable objects + tool_call_inputs = json.dumps({tool_call[1]: tool_call[2] for tool_call in tool_calls}) + + if chunk.delta.message and chunk.delta.message.content: + if isinstance(chunk.delta.message.content, list): + for content in chunk.delta.message.content: + response += content.data + else: + response += str(chunk.delta.message.content) + + if chunk.delta.usage: + increase_usage(llm_usage, chunk.delta.usage) + current_llm_usage = chunk.delta.usage + + yield chunk + else: + result = chunks + # check if there is any tool call + if self.check_blocking_tool_calls(result): + function_call_state = True + tool_calls.extend(self.extract_blocking_tool_calls(result) or []) + tool_call_names = ";".join([tool_call[1] for tool_call in tool_calls]) + try: + tool_call_inputs = json.dumps( + {tool_call[1]: tool_call[2] for tool_call in tool_calls}, ensure_ascii=False + ) + except TypeError: + # fallback: force ASCII to handle non-serializable objects + tool_call_inputs = json.dumps({tool_call[1]: tool_call[2] for tool_call in tool_calls}) + + if result.usage: + increase_usage(llm_usage, result.usage) + current_llm_usage = result.usage + + if result.message and result.message.content: + if isinstance(result.message.content, list): + for content in result.message.content: + response += content.data + else: + response += str(result.message.content) + + if not result.message.content: + result.message.content = "" - # publish agent thought if it's first iteration - if iteration_step == 1: self.queue_manager.publish( QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER ) - for chunk in 
react_chunks: - if isinstance(chunk, AgentScratchpadUnit.Action): - action = chunk - # detect action - assert scratchpad.agent_response is not None - scratchpad.agent_response += json.dumps(chunk.model_dump()) - scratchpad.action_str = json.dumps(chunk.model_dump()) - scratchpad.action = action - else: - assert scratchpad.agent_response is not None - scratchpad.agent_response += chunk - assert scratchpad.thought is not None - scratchpad.thought += chunk - yield LLMResultChunk( - model=self.model_config.model, - prompt_messages=prompt_messages, - system_fingerprint="", - delta=LLMResultChunkDelta(index=0, message=AssistantPromptMessage(content=chunk), usage=None), - ) - - assert scratchpad.thought is not None - scratchpad.thought = scratchpad.thought.strip() or "I am thinking about how to help you" - self._agent_scratchpad.append(scratchpad) + yield LLMResultChunk( + model=model_instance.model, + prompt_messages=result.prompt_messages, + system_fingerprint=result.system_fingerprint, + delta=LLMResultChunkDelta( + index=0, + message=result.message, + usage=result.usage, + ), + ) - # get llm usage - if "usage" in usage_dict: - if usage_dict["usage"] is not None: - increase_usage(llm_usage, usage_dict["usage"]) + assistant_message = AssistantPromptMessage(content="", tool_calls=[]) + if tool_calls: + assistant_message.tool_calls = [ + AssistantPromptMessage.ToolCall( + id=tool_call[0], + type="function", + function=AssistantPromptMessage.ToolCall.ToolCallFunction( + name=tool_call[1], arguments=json.dumps(tool_call[2], ensure_ascii=False) + ), + ) + for tool_call in tool_calls + ] else: - usage_dict["usage"] = LLMUsage.empty_usage() + assistant_message.content = response + self._current_thoughts.append(assistant_message) + + # save thought self.save_agent_thought( agent_thought_id=agent_thought_id, - tool_name=(scratchpad.action.action_name if scratchpad.action and not scratchpad.is_final() else ""), - tool_input={scratchpad.action.action_name: scratchpad.action.action_input} if scratchpad.action else {}, - tool_invoke_meta={}, - thought=scratchpad.thought or "", - observation="", - answer=scratchpad.agent_response or "", + tool_name=tool_call_names, + tool_input=tool_call_inputs, + thought=response, + tool_invoke_meta=None, + observation=None, + answer=response, messages_ids=[], - llm_usage=usage_dict["usage"], + llm_usage=current_llm_usage, + ) + self.queue_manager.publish( + QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER ) - if not scratchpad.is_final(): - self.queue_manager.publish( - QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER - ) - - if not scratchpad.action: - # failed to extract action, return final answer directly - final_answer = "" - else: - if scratchpad.action.action_name.lower() == "final answer": - # action is final answer, return final answer directly - try: - if isinstance(scratchpad.action.action_input, dict): - final_answer = json.dumps(scratchpad.action.action_input, ensure_ascii=False) - elif isinstance(scratchpad.action.action_input, str): - final_answer = scratchpad.action.action_input - else: - final_answer = f"{scratchpad.action.action_input}" - except TypeError: - final_answer = f"{scratchpad.action.action_input}" + final_answer += response + "\n" + + # call tools + tool_responses = [] + for tool_call_id, tool_call_name, tool_call_args in tool_calls: + tool_instance = tool_instances.get(tool_call_name) + if not tool_instance: + tool_response = { + "tool_call_id": tool_call_id, + 
"tool_call_name": tool_call_name, + "tool_response": f"there is not a tool named {tool_call_name}", + "meta": ToolInvokeMeta.error_instance(f"there is not a tool named {tool_call_name}").to_dict(), + } else: - # action is tool call, invoke tool - tool_invoke_response, tool_invoke_meta = self._handle_invoke_action( - action=scratchpad.action, - tool_instances=tool_instances, - message_file_ids=message_file_ids, + # invoke tool + tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke( + tool=tool_instance, + tool_parameters=tool_call_args, + user_id=self.user_id, + tenant_id=self.tenant_id, + message=self.message, + invoke_from=self.application_generate_entity.invoke_from, + agent_tool_callback=self.agent_callback, trace_manager=trace_manager, + app_id=self.application_generate_entity.app_config.app_id, + message_id=self.message.id, + conversation_id=self.conversation.id, ) - scratchpad.observation = tool_invoke_response - scratchpad.agent_response = tool_invoke_response - - # detect direct return - direct_flag = (tool_invoke_meta.extra or {}).get("return_direct", False) - - self.save_agent_thought( - agent_thought_id=agent_thought_id, - tool_name=scratchpad.action.action_name, - tool_input={scratchpad.action.action_name: scratchpad.action.action_input}, - thought=scratchpad.thought or "", - observation={scratchpad.action.action_name: tool_invoke_response}, - tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()}, - answer=scratchpad.agent_response, - messages_ids=message_file_ids, - llm_usage=usage_dict["usage"], - ) - - self.queue_manager.publish( - QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + # publish files + for message_file_id in message_files: + # publish message file + self.queue_manager.publish( + QueueMessageFileEvent(message_file_id=message_file_id), PublishFrom.APPLICATION_MANAGER + ) + # add message file ids + message_file_ids.append(message_file_id) + + tool_response = { + "tool_call_id": tool_call_id, + "tool_call_name": tool_call_name, + "tool_response": tool_invoke_response, + "meta": tool_invoke_meta.to_dict(), + } + + tool_responses.append(tool_response) + # check direct return flag + direct_flag = (tool_invoke_meta.extra or {}).get("return_direct", False) + tool_response["direct_flag"] = direct_flag + + if len(tool_responses) > 0: + all_direct = all(tr.get("direct_flag") is True for tr in tool_responses) + if all_direct: + llm_final_usage = llm_usage.get("usage") or LLMUsage.empty_usage() + yield from self._handle_direct_return( + agent_thought_id, + tool_responses, + message_file_ids, + prompt_messages, + llm_final_usage, ) + return + + for tr in tool_responses: + if tr["tool_response"] is not None: + self._current_thoughts.append( + ToolPromptMessage( + content=str(tr["tool_response"]), + tool_call_id=tr["tool_call_id"], + name=tr["tool_call_name"], + ) + ) + # save agent thought + self.save_agent_thought( + agent_thought_id=agent_thought_id, + tool_name="", + tool_input="", + thought="", + tool_invoke_meta={ + tool_response["tool_call_name"]: tool_response["meta"] for tool_response in tool_responses + }, + observation={ + tool_response["tool_call_name"]: tool_response["tool_response"] + for tool_response in tool_responses + }, + answer="", + messages_ids=message_file_ids, + ) + self.queue_manager.publish( + QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + ) - if direct_flag: - final_answer = str(tool_invoke_response or "") - # keep 
function_call_state as False to end iterations - else: - function_call_state = True - - # update prompt tool message - for prompt_tool in self._prompt_messages_tools: - self.update_prompt_message_tool(tool_instances[prompt_tool.name], prompt_tool) + # update prompt tool + for prompt_tool in prompt_messages_tools: + self.update_prompt_message_tool(tool_instances[prompt_tool.name], prompt_tool) iteration_step += 1 - yield LLMResultChunk( - model=model_instance.model, - prompt_messages=prompt_messages, - delta=LLMResultChunkDelta( - index=0, message=AssistantPromptMessage(content=final_answer), usage=llm_usage["usage"] - ), - system_fingerprint="", + yield from self._yield_final_answer( + prompt_messages, + final_answer, + llm_usage.get("usage") or LLMUsage.empty_usage(), ) - # save agent thought - self.save_agent_thought( - agent_thought_id=agent_thought_id, - tool_name="", - tool_input={}, - tool_invoke_meta={}, - thought=final_answer, - observation={}, - answer=final_answer, - messages_ids=[], - ) - # publish end event + def check_tool_calls(self, llm_result_chunk: LLMResultChunk) -> bool: + """ + Check if there is any tool call in llm result chunk + """ + if llm_result_chunk.delta.message.tool_calls: + return True + return False + + def check_blocking_tool_calls(self, llm_result: LLMResult) -> bool: + """ + Check if there is any blocking tool call in llm result + """ + if llm_result.message.tool_calls: + return True + return False + + def extract_tool_calls(self, llm_result_chunk: LLMResultChunk) -> list[tuple[str, str, dict[str, Any]]]: + """ + Extract tool calls from llm result chunk + + Returns: + List[Tuple[str, str, Dict[str, Any]]]: [(tool_call_id, tool_call_name, tool_call_args)] + """ + tool_calls = [] + for prompt_message in llm_result_chunk.delta.message.tool_calls: + args = {} + if prompt_message.function.arguments != "": + args = json.loads(prompt_message.function.arguments) + + tool_calls.append( + ( + prompt_message.id, + prompt_message.function.name, + args, + ) + ) + + return tool_calls + + def extract_blocking_tool_calls(self, llm_result: LLMResult) -> list[tuple[str, str, dict[str, Any]]]: + """ + Extract blocking tool calls from llm result + + Returns: + List[Tuple[str, str, Dict[str, Any]]]: [(tool_call_id, tool_call_name, tool_call_args)] + """ + tool_calls = [] + for prompt_message in llm_result.message.tool_calls: + args = {} + if prompt_message.function.arguments != "": + args = json.loads(prompt_message.function.arguments) + + tool_calls.append( + ( + prompt_message.id, + prompt_message.function.name, + args, + ) + ) + + return tool_calls + + def _yield_final_answer( + self, + prompt_messages: list[PromptMessage], + final_answer: str, + usage: LLMUsage, + ) -> Generator[LLMResultChunk, None, None]: self.queue_manager.publish( QueueMessageEndEvent( llm_result=LLMResult( - model=model_instance.model, + model=self.model_instance.model, prompt_messages=prompt_messages, message=AssistantPromptMessage(content=final_answer), - usage=llm_usage["usage"] or LLMUsage.empty_usage(), + usage=usage, system_fingerprint="", ) ), PublishFrom.APPLICATION_MANAGER, ) - def _handle_invoke_action( + yield LLMResultChunk( + model=self.model_instance.model, + prompt_messages=prompt_messages, + system_fingerprint="", + delta=LLMResultChunkDelta( + index=0, + message=AssistantPromptMessage(content=final_answer), + usage=usage, + ), + ) + + def _handle_direct_return( self, - action: AgentScratchpadUnit.Action, - tool_instances: Mapping[str, Tool], + agent_thought_id: str, + tool_responses: 
list[dict[str, Any]], message_file_ids: list[str], - trace_manager: TraceQueueManager | None = None, - ) -> tuple[str, ToolInvokeMeta]: - """ - handle invoke action - :param action: action - :param tool_instances: tool instances - :param message_file_ids: message file ids - :param trace_manager: trace manager - :return: observation, meta - """ - # action is tool call, invoke tool - tool_call_name = action.action_name - tool_call_args = action.action_input - tool_instance = tool_instances.get(tool_call_name) - - if not tool_instance: - answer = f"there is not a tool named {tool_call_name}" - return answer, ToolInvokeMeta.error_instance(answer) - - if isinstance(tool_call_args, str): - try: - tool_call_args = json.loads(tool_call_args) - except json.JSONDecodeError: - pass - - # invoke tool - tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke( - tool=tool_instance, - tool_parameters=tool_call_args, - user_id=self.user_id, - tenant_id=self.tenant_id, - message=self.message, - invoke_from=self.application_generate_entity.invoke_from, - agent_tool_callback=self.agent_callback, - trace_manager=trace_manager, + prompt_messages: list[PromptMessage], + usage: LLMUsage, + ) -> Generator[LLMResultChunk, None, None]: + final_answer = "\n".join( + [str(tr["tool_response"]) for tr in tool_responses if tr.get("tool_response") is not None] ) + tool_invoke_meta_agg: dict[str, list[Any]] = {} + observation_agg: dict[str, list[Any]] = {} + for tr in tool_responses: + tool_invoke_meta_agg.setdefault(tr["tool_call_name"], []).append(tr["meta"]) + observation_agg.setdefault(tr["tool_call_name"], []).append(tr["tool_response"]) + tool_invoke_meta = {k: (v[0] if len(v) == 1 else v) for k, v in tool_invoke_meta_agg.items()} + observation = {k: (v[0] if len(v) == 1 else v) for k, v in observation_agg.items()} + self.save_agent_thought( + agent_thought_id=agent_thought_id, + tool_name="", + tool_input="", + thought="", + tool_invoke_meta=tool_invoke_meta, + observation=observation, + answer=final_answer, + messages_ids=message_file_ids, + ) + self.queue_manager.publish( + QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + ) + yield from self._yield_final_answer(prompt_messages, final_answer, usage) - # publish files - for message_file_id in message_files: - # publish message file - self.queue_manager.publish( - QueueMessageFileEvent(message_file_id=message_file_id), PublishFrom.APPLICATION_MANAGER - ) - # add message file ids - message_file_ids.append(message_file_id) - - return tool_invoke_response, tool_invoke_meta - - def _convert_dict_to_action(self, action: dict) -> AgentScratchpadUnit.Action: - """ - convert dict to action - """ - return AgentScratchpadUnit.Action(action_name=action["action"], action_input=action["action_input"]) - - def _fill_in_inputs_from_external_data_tools(self, instruction: str, inputs: Mapping[str, Any]) -> str: + def _init_system_message(self, prompt_template: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]: """ - fill in inputs from external data tools + Initialize system message """ - for key, value in inputs.items(): - try: - instruction = instruction.replace(f"{{{{{key}}}}}", str(value)) - except Exception: - continue + if not prompt_messages and prompt_template: + return [ + SystemPromptMessage(content=prompt_template), + ] - return instruction + if prompt_messages and not isinstance(prompt_messages[0], SystemPromptMessage) and prompt_template: + prompt_messages.insert(0, 
SystemPromptMessage(content=prompt_template)) - def _init_react_state(self, query): - """ - init agent scratchpad - """ - self._query = query - self._agent_scratchpad = [] - self._historic_prompt_messages = self._organize_historic_prompt_messages() + return prompt_messages or [] - @abstractmethod - def _organize_prompt_messages(self) -> list[PromptMessage]: + def _organize_user_query(self, query: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]: """ - organize prompt messages + Organize user query """ + if self.files: + # get image detail config + image_detail_config = ( + self.application_generate_entity.file_upload_config.image_config.detail + if ( + self.application_generate_entity.file_upload_config + and self.application_generate_entity.file_upload_config.image_config + ) + else None + ) + image_detail_config = image_detail_config or ImagePromptMessageContent.DETAIL.LOW + + prompt_message_contents: list[PromptMessageContentUnionTypes] = [] + for file in self.files: + prompt_message_contents.append( + file_manager.to_prompt_message_content( + file, + image_detail_config=image_detail_config, + ) + ) + prompt_message_contents.append(TextPromptMessageContent(data=query)) - def _format_assistant_message(self, agent_scratchpad: list[AgentScratchpadUnit]) -> str: - """ - format assistant message - """ - message = "" - for scratchpad in agent_scratchpad: - if scratchpad.is_final(): - message += f"Final Answer: {scratchpad.agent_response}" - else: - message += f"Thought: {scratchpad.thought}\n\n" - if scratchpad.action_str: - message += f"Action: {scratchpad.action_str}\n\n" - if scratchpad.observation: - message += f"Observation: {scratchpad.observation}\n\n" + prompt_messages.append(UserPromptMessage(content=prompt_message_contents)) + else: + prompt_messages.append(UserPromptMessage(content=query)) - return message + return prompt_messages - def _organize_historic_prompt_messages( - self, current_session_messages: list[PromptMessage] | None = None - ) -> list[PromptMessage]: + def _clear_user_prompt_image_messages(self, prompt_messages: list[PromptMessage]) -> list[PromptMessage]: """ - organize historic prompt messages + As for now, gpt supports both fc and vision at the first iteration. + We need to remove the image messages from the prompt messages at the first iteration. 
""" - result: list[PromptMessage] = [] - scratchpads: list[AgentScratchpadUnit] = [] - current_scratchpad: AgentScratchpadUnit | None = None - - for message in self.history_prompt_messages: - if isinstance(message, AssistantPromptMessage): - if not current_scratchpad: - assert isinstance(message.content, str) - current_scratchpad = AgentScratchpadUnit( - agent_response=message.content, - thought=message.content or "I am thinking about how to help you", - action_str="", - action=None, - observation=None, + prompt_messages = deepcopy(prompt_messages) + + for prompt_message in prompt_messages: + if isinstance(prompt_message, UserPromptMessage): + if isinstance(prompt_message.content, list): + prompt_message.content = "\n".join( + [ + content.data + if content.type == PromptMessageContentType.TEXT + else "[image]" + if content.type == PromptMessageContentType.IMAGE + else "[file]" + for content in prompt_message.content + ] ) - scratchpads.append(current_scratchpad) - if message.tool_calls: - try: - current_scratchpad.action = AgentScratchpadUnit.Action( - action_name=message.tool_calls[0].function.name, - action_input=json.loads(message.tool_calls[0].function.arguments), - ) - current_scratchpad.action_str = json.dumps(current_scratchpad.action.to_dict()) - except: - pass - elif isinstance(message, ToolPromptMessage): - if current_scratchpad: - assert isinstance(message.content, str) - current_scratchpad.observation = message.content - else: - raise NotImplementedError("expected str type") - elif isinstance(message, UserPromptMessage): - if scratchpads: - result.append(AssistantPromptMessage(content=self._format_assistant_message(scratchpads))) - scratchpads = [] - current_scratchpad = None - result.append(message) + return prompt_messages - if scratchpads: - result.append(AssistantPromptMessage(content=self._format_assistant_message(scratchpads))) + def _organize_prompt_messages(self): + prompt_template = self.app_config.prompt_template.simple_prompt_template or "" + self.history_prompt_messages = self._init_system_message(prompt_template, self.history_prompt_messages) + query_prompt_messages = self._organize_user_query(self.query or "", []) - historic_prompts = AgentHistoryPromptTransform( + self.history_prompt_messages = AgentHistoryPromptTransform( model_config=self.model_config, - prompt_messages=current_session_messages or [], - history_messages=result, + prompt_messages=[*query_prompt_messages, *self._current_thoughts], + history_messages=self.history_prompt_messages, memory=self.memory, ).get_prompt() - return historic_prompts + + prompt_messages = [*self.history_prompt_messages, *query_prompt_messages, *self._current_thoughts] + if len(self._current_thoughts) != 0: + # clear messages after the first iteration + prompt_messages = self._clear_user_prompt_image_messages(prompt_messages) + return prompt_messages From 8fe4fbcef757696296ff7a26fe5cd38dcaa70325 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:31:09 +0800 Subject: [PATCH 23/43] Update cot_agent_runner.py --- api/core/agent/cot_agent_runner.py | 720 +++++++++++++---------------- 1 file changed, 310 insertions(+), 410 deletions(-) diff --git a/api/core/agent/cot_agent_runner.py b/api/core/agent/cot_agent_runner.py index f95511bdf3e618..eea40821282c28 100644 --- a/api/core/agent/cot_agent_runner.py +++ b/api/core/agent/cot_agent_runner.py @@ -1,63 +1,79 @@ import json -import logging -from collections.abc import Generator -from copy import deepcopy -from typing import Any, 
Union +from abc import ABC, abstractmethod +from collections.abc import Generator, Mapping, Sequence +from typing import Any from core.agent.base_agent_runner import BaseAgentRunner +from core.agent.entities import AgentScratchpadUnit +from core.agent.output_parser.cot_output_parser import CotAgentOutputParser from core.app.apps.base_app_queue_manager import PublishFrom from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent -from core.file import file_manager -from core.model_runtime.entities import ( +from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage +from core.model_runtime.entities.message_entities import ( AssistantPromptMessage, - LLMResult, - LLMResultChunk, - LLMResultChunkDelta, - LLMUsage, PromptMessage, - PromptMessageContentType, - SystemPromptMessage, - TextPromptMessageContent, + PromptMessageTool, ToolPromptMessage, UserPromptMessage, ) -from core.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes +from core.ops.ops_trace_manager import TraceQueueManager from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform +from core.tools.__base.tool import Tool from core.tools.entities.tool_entities import ToolInvokeMeta from core.tools.tool_engine import ToolEngine from models.model import Message -logger = logging.getLogger(__name__) +class CotAgentRunner(BaseAgentRunner, ABC): + _is_first_iteration = True + _ignore_observation_providers = ["wenxin"] + _historic_prompt_messages: list[PromptMessage] + _agent_scratchpad: list[AgentScratchpadUnit] + _instruction: str + _query: str + _prompt_messages_tools: Sequence[PromptMessageTool] -class FunctionCallAgentRunner(BaseAgentRunner): - def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]: + def run( + self, + message: Message, + query: str, + inputs: Mapping[str, str], + ) -> Generator: """ - Run FunctionCall agent application + Run Cot agent application """ - self.query = query + app_generate_entity = self.application_generate_entity + self._repack_app_generate_entity(app_generate_entity) + self._init_react_state(query) - app_config = self.app_config - assert app_config is not None, "app_config is required" - assert app_config.agent is not None, "app_config.agent is required" + trace_manager = app_generate_entity.trace_manager - # convert tools into ModelRuntime Tool format - tool_instances, prompt_messages_tools = self._init_prompt_tools() + # check model mode + if "Observation" not in app_generate_entity.model_conf.stop: + if app_generate_entity.model_conf.provider not in self._ignore_observation_providers: + app_generate_entity.model_conf.stop.append("Observation") + app_config = self.app_config assert app_config.agent + # init instruction + inputs = inputs or {} + instruction = app_config.prompt_template.simple_prompt_template or "" + self._instruction = self._fill_in_inputs_from_external_data_tools(instruction, inputs) + iteration_step = 1 max_iteration_steps = min(app_config.agent.max_iteration, 99) + 1 - # continue to run until there is not any tool call + # convert tools into ModelRuntime Tool format + tool_instances, prompt_messages_tools = self._init_prompt_tools() + self._prompt_messages_tools = prompt_messages_tools + function_call_state = True llm_usage: dict[str, LLMUsage | None] = {"usage": None} final_answer = "" prompt_messages: list = [] # Initialize prompt_messages - - # get 
tracing instance - trace_manager = app_generate_entity.trace_manager + agent_thought_id = "" # Initialize agent_thought_id def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMUsage): if not final_llm_usage_dict["usage"]: @@ -74,463 +90,347 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU model_instance = self.model_instance while function_call_state and iteration_step <= max_iteration_steps: + # continue to run until there is not any tool call function_call_state = False if iteration_step == max_iteration_steps: # the last iteration, remove all tools - prompt_messages_tools = [] + self._prompt_messages_tools = [] message_file_ids: list[str] = [] + agent_thought_id = self.create_agent_thought( message_id=message.id, message="", tool_name="", tool_input="", messages_ids=message_file_ids ) + if iteration_step > 1: + self.queue_manager.publish( + QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + ) + # recalc llm max tokens prompt_messages = self._organize_prompt_messages() self.recalc_llm_max_tokens(self.model_config, prompt_messages) # invoke model - chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = model_instance.invoke_llm( + chunks = model_instance.invoke_llm( prompt_messages=prompt_messages, model_parameters=app_generate_entity.model_conf.parameters, - tools=prompt_messages_tools, + tools=[], stop=app_generate_entity.model_conf.stop, - stream=self.stream_tool_call, + stream=True, user=self.user_id, callbacks=[], ) - tool_calls: list[tuple[str, str, dict[str, Any]]] = [] - - # save full response - response = "" - - # save tool call names and inputs - tool_call_names = "" - tool_call_inputs = "" - - current_llm_usage = None - - if isinstance(chunks, Generator): - is_first_chunk = True - for chunk in chunks: - if is_first_chunk: - self.queue_manager.publish( - QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER - ) - is_first_chunk = False - # check if there is any tool call - if self.check_tool_calls(chunk): - function_call_state = True - tool_calls.extend(self.extract_tool_calls(chunk) or []) - tool_call_names = ";".join([tool_call[1] for tool_call in tool_calls]) - try: - tool_call_inputs = json.dumps( - {tool_call[1]: tool_call[2] for tool_call in tool_calls}, ensure_ascii=False - ) - except TypeError: - # fallback: force ASCII to handle non-serializable objects - tool_call_inputs = json.dumps({tool_call[1]: tool_call[2] for tool_call in tool_calls}) - - if chunk.delta.message and chunk.delta.message.content: - if isinstance(chunk.delta.message.content, list): - for content in chunk.delta.message.content: - response += content.data - else: - response += str(chunk.delta.message.content) - - if chunk.delta.usage: - increase_usage(llm_usage, chunk.delta.usage) - current_llm_usage = chunk.delta.usage - - yield chunk - else: - result = chunks - # check if there is any tool call - if self.check_blocking_tool_calls(result): - function_call_state = True - tool_calls.extend(self.extract_blocking_tool_calls(result) or []) - tool_call_names = ";".join([tool_call[1] for tool_call in tool_calls]) - try: - tool_call_inputs = json.dumps( - {tool_call[1]: tool_call[2] for tool_call in tool_calls}, ensure_ascii=False - ) - except TypeError: - # fallback: force ASCII to handle non-serializable objects - tool_call_inputs = json.dumps({tool_call[1]: tool_call[2] for tool_call in tool_calls}) - - if result.usage: - increase_usage(llm_usage, 
result.usage) - current_llm_usage = result.usage - - if result.message and result.message.content: - if isinstance(result.message.content, list): - for content in result.message.content: - response += content.data - else: - response += str(result.message.content) - - if not result.message.content: - result.message.content = "" + usage_dict: dict[str, LLMUsage | None] = {} + react_chunks = CotAgentOutputParser.handle_react_stream_output(chunks, usage_dict) + scratchpad = AgentScratchpadUnit( + agent_response="", + thought="", + action_str="", + observation="", + action=None, + ) + # publish agent thought if it's first iteration + if iteration_step == 1: self.queue_manager.publish( QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER ) - yield LLMResultChunk( - model=model_instance.model, - prompt_messages=result.prompt_messages, - system_fingerprint=result.system_fingerprint, - delta=LLMResultChunkDelta( - index=0, - message=result.message, - usage=result.usage, - ), - ) - - assistant_message = AssistantPromptMessage(content="", tool_calls=[]) - if tool_calls: - assistant_message.tool_calls = [ - AssistantPromptMessage.ToolCall( - id=tool_call[0], - type="function", - function=AssistantPromptMessage.ToolCall.ToolCallFunction( - name=tool_call[1], arguments=json.dumps(tool_call[2], ensure_ascii=False) - ), + for chunk in react_chunks: + if isinstance(chunk, AgentScratchpadUnit.Action): + action = chunk + # detect action + assert scratchpad.agent_response is not None + scratchpad.agent_response += json.dumps(chunk.model_dump()) + scratchpad.action_str = json.dumps(chunk.model_dump()) + scratchpad.action = action + else: + assert scratchpad.agent_response is not None + scratchpad.agent_response += chunk + assert scratchpad.thought is not None + scratchpad.thought += chunk + yield LLMResultChunk( + model=self.model_config.model, + prompt_messages=prompt_messages, + system_fingerprint="", + delta=LLMResultChunkDelta(index=0, message=AssistantPromptMessage(content=chunk), usage=None), ) - for tool_call in tool_calls - ] - else: - assistant_message.content = response - self._current_thoughts.append(assistant_message) + assert scratchpad.thought is not None + scratchpad.thought = scratchpad.thought.strip() or "I am thinking about how to help you" + self._agent_scratchpad.append(scratchpad) + + # get llm usage + if "usage" in usage_dict: + if usage_dict["usage"] is not None: + increase_usage(llm_usage, usage_dict["usage"]) + else: + usage_dict["usage"] = LLMUsage.empty_usage() - # save thought self.save_agent_thought( agent_thought_id=agent_thought_id, - tool_name=tool_call_names, - tool_input=tool_call_inputs, - thought=response, - tool_invoke_meta=None, - observation=None, - answer=response, + tool_name=(scratchpad.action.action_name if scratchpad.action and not scratchpad.is_final() else ""), + tool_input={scratchpad.action.action_name: scratchpad.action.action_input} if scratchpad.action else {}, + tool_invoke_meta={}, + thought=scratchpad.thought or "", + observation="", + answer=scratchpad.agent_response or "", messages_ids=[], - llm_usage=current_llm_usage, - ) - self.queue_manager.publish( - QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + llm_usage=usage_dict["usage"], ) - final_answer += response + "\n" - - # call tools - tool_responses = [] - for tool_call_id, tool_call_name, tool_call_args in tool_calls: - tool_instance = tool_instances.get(tool_call_name) - if not tool_instance: - tool_response = { - 
"tool_call_id": tool_call_id, - "tool_call_name": tool_call_name, - "tool_response": f"there is not a tool named {tool_call_name}", - "meta": ToolInvokeMeta.error_instance(f"there is not a tool named {tool_call_name}").to_dict(), - } - else: - # invoke tool - tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke( - tool=tool_instance, - tool_parameters=tool_call_args, - user_id=self.user_id, - tenant_id=self.tenant_id, - message=self.message, - invoke_from=self.application_generate_entity.invoke_from, - agent_tool_callback=self.agent_callback, - trace_manager=trace_manager, - app_id=self.application_generate_entity.app_config.app_id, - message_id=self.message.id, - conversation_id=self.conversation.id, - ) - # publish files - for message_file_id in message_files: - # publish message file - self.queue_manager.publish( - QueueMessageFileEvent(message_file_id=message_file_id), PublishFrom.APPLICATION_MANAGER - ) - # add message file ids - message_file_ids.append(message_file_id) - - tool_response = { - "tool_call_id": tool_call_id, - "tool_call_name": tool_call_name, - "tool_response": tool_invoke_response, - "meta": tool_invoke_meta.to_dict(), - } - - tool_responses.append(tool_response) - # check direct return flag - direct_flag = (tool_invoke_meta.extra or {}).get("return_direct", False) - tool_response["direct_flag"] = direct_flag - - if len(tool_responses) > 0: - all_direct = all(tr.get("direct_flag") is True for tr in tool_responses) - if all_direct: - llm_final_usage = llm_usage.get("usage") or LLMUsage.empty_usage() - yield from self._handle_direct_return( - agent_thought_id, - tool_responses, - message_file_ids, - prompt_messages, - llm_final_usage, - ) - return - - for tr in tool_responses: - if tr["tool_response"] is not None: - self._current_thoughts.append( - ToolPromptMessage( - content=str(tr["tool_response"]), - tool_call_id=tr["tool_call_id"], - name=tr["tool_call_name"], - ) - ) - # save agent thought - self.save_agent_thought( - agent_thought_id=agent_thought_id, - tool_name="", - tool_input="", - thought="", - tool_invoke_meta={ - tool_response["tool_call_name"]: tool_response["meta"] for tool_response in tool_responses - }, - observation={ - tool_response["tool_call_name"]: tool_response["tool_response"] - for tool_response in tool_responses - }, - answer="", - messages_ids=message_file_ids, - ) + if not scratchpad.is_final(): self.queue_manager.publish( QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER ) - # update prompt tool - for prompt_tool in prompt_messages_tools: - self.update_prompt_message_tool(tool_instances[prompt_tool.name], prompt_tool) - - iteration_step += 1 - - yield from self._yield_final_answer( - prompt_messages, - final_answer, - llm_usage.get("usage") or LLMUsage.empty_usage(), - ) - - def check_tool_calls(self, llm_result_chunk: LLMResultChunk) -> bool: - """ - Check if there is any tool call in llm result chunk - """ - if llm_result_chunk.delta.message.tool_calls: - return True - return False - - def check_blocking_tool_calls(self, llm_result: LLMResult) -> bool: - """ - Check if there is any blocking tool call in llm result - """ - if llm_result.message.tool_calls: - return True - return False - - def extract_tool_calls(self, llm_result_chunk: LLMResultChunk) -> list[tuple[str, str, dict[str, Any]]]: - """ - Extract tool calls from llm result chunk + if not scratchpad.action: + # failed to extract action, return final answer directly + final_answer = "" + else: + if 
scratchpad.action.action_name.lower() == "final answer": + # action is final answer, return final answer directly + try: + if isinstance(scratchpad.action.action_input, dict): + final_answer = json.dumps(scratchpad.action.action_input, ensure_ascii=False) + elif isinstance(scratchpad.action.action_input, str): + final_answer = scratchpad.action.action_input + else: + final_answer = f"{scratchpad.action.action_input}" + except TypeError: + final_answer = f"{scratchpad.action.action_input}" + else: + # action is tool call, invoke tool + tool_invoke_response, tool_invoke_meta = self._handle_invoke_action( + action=scratchpad.action, + tool_instances=tool_instances, + message_file_ids=message_file_ids, + trace_manager=trace_manager, + ) + scratchpad.observation = tool_invoke_response + scratchpad.agent_response = tool_invoke_response + + # detect direct return + direct_flag = (tool_invoke_meta.extra or {}).get("return_direct", False) + + self.save_agent_thought( + agent_thought_id=agent_thought_id, + tool_name=scratchpad.action.action_name, + tool_input={scratchpad.action.action_name: scratchpad.action.action_input}, + thought=scratchpad.thought or "", + observation={scratchpad.action.action_name: tool_invoke_response}, + tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()}, + answer=scratchpad.agent_response, + messages_ids=message_file_ids, + llm_usage=usage_dict["usage"], + ) - Returns: - List[Tuple[str, str, Dict[str, Any]]]: [(tool_call_id, tool_call_name, tool_call_args)] - """ - tool_calls = [] - for prompt_message in llm_result_chunk.delta.message.tool_calls: - args = {} - if prompt_message.function.arguments != "": - args = json.loads(prompt_message.function.arguments) - - tool_calls.append( - ( - prompt_message.id, - prompt_message.function.name, - args, - ) - ) + self.queue_manager.publish( + QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + ) - return tool_calls + if direct_flag: + final_answer = str(tool_invoke_response or "") + # keep function_call_state as False to end iterations + else: + function_call_state = True - def extract_blocking_tool_calls(self, llm_result: LLMResult) -> list[tuple[str, str, dict[str, Any]]]: - """ - Extract blocking tool calls from llm result + # update prompt tool message + for prompt_tool in self._prompt_messages_tools: + self.update_prompt_message_tool(tool_instances[prompt_tool.name], prompt_tool) - Returns: - List[Tuple[str, str, Dict[str, Any]]]: [(tool_call_id, tool_call_name, tool_call_args)] - """ - tool_calls = [] - for prompt_message in llm_result.message.tool_calls: - args = {} - if prompt_message.function.arguments != "": - args = json.loads(prompt_message.function.arguments) - - tool_calls.append( - ( - prompt_message.id, - prompt_message.function.name, - args, - ) - ) + iteration_step += 1 - return tool_calls + yield LLMResultChunk( + model=model_instance.model, + prompt_messages=prompt_messages, + delta=LLMResultChunkDelta( + index=0, message=AssistantPromptMessage(content=final_answer), usage=llm_usage["usage"] + ), + system_fingerprint="", + ) - def _yield_final_answer( - self, - prompt_messages: list[PromptMessage], - final_answer: str, - usage: LLMUsage, - ) -> Generator[LLMResultChunk, None, None]: + # save agent thought + self.save_agent_thought( + agent_thought_id=agent_thought_id, + tool_name="", + tool_input={}, + tool_invoke_meta={}, + thought=final_answer, + observation={}, + answer=final_answer, + messages_ids=[], + ) + # publish end event 
self.queue_manager.publish( QueueMessageEndEvent( llm_result=LLMResult( - model=self.model_instance.model, + model=model_instance.model, prompt_messages=prompt_messages, message=AssistantPromptMessage(content=final_answer), - usage=usage, + usage=llm_usage["usage"] or LLMUsage.empty_usage(), system_fingerprint="", ) ), PublishFrom.APPLICATION_MANAGER, ) - yield LLMResultChunk( - model=self.model_instance.model, - prompt_messages=prompt_messages, - system_fingerprint="", - delta=LLMResultChunkDelta( - index=0, - message=AssistantPromptMessage(content=final_answer), - usage=usage, - ), - ) - - def _handle_direct_return( + def _handle_invoke_action( self, - agent_thought_id: str, - tool_responses: list[dict[str, Any]], + action: AgentScratchpadUnit.Action, + tool_instances: Mapping[str, Tool], message_file_ids: list[str], - prompt_messages: list[PromptMessage], - usage: LLMUsage, - ) -> Generator[LLMResultChunk, None, None]: - final_answer = "\n".join( - [str(tr["tool_response"]) for tr in tool_responses if tr.get("tool_response") is not None] - ) - tool_invoke_meta_agg: dict[str, list[Any]] = {} - observation_agg: dict[str, list[Any]] = {} - for tr in tool_responses: - tool_invoke_meta_agg.setdefault(tr["tool_call_name"], []).append(tr["meta"]) - observation_agg.setdefault(tr["tool_call_name"], []).append(tr["tool_response"]) - tool_invoke_meta = {k: (v[0] if len(v) == 1 else v) for k, v in tool_invoke_meta_agg.items()} - observation = {k: (v[0] if len(v) == 1 else v) for k, v in observation_agg.items()} - self.save_agent_thought( - agent_thought_id=agent_thought_id, - tool_name="", - tool_input="", - thought="", - tool_invoke_meta=tool_invoke_meta, - observation=observation, - answer=final_answer, - messages_ids=message_file_ids, - ) - self.queue_manager.publish( - QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER + trace_manager: TraceQueueManager | None = None, + ) -> tuple[str, ToolInvokeMeta]: + """ + handle invoke action + :param action: action + :param tool_instances: tool instances + :param message_file_ids: message file ids + :param trace_manager: trace manager + :return: observation, meta + """ + # action is tool call, invoke tool + tool_call_name = action.action_name + tool_call_args = action.action_input + tool_instance = tool_instances.get(tool_call_name) + + if not tool_instance: + answer = f"there is not a tool named {tool_call_name}" + return answer, ToolInvokeMeta.error_instance(answer) + + if isinstance(tool_call_args, str): + try: + tool_call_args = json.loads(tool_call_args) + except json.JSONDecodeError: + pass + + # invoke tool + tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke( + tool=tool_instance, + tool_parameters=tool_call_args, + user_id=self.user_id, + tenant_id=self.tenant_id, + message=self.message, + invoke_from=self.application_generate_entity.invoke_from, + agent_tool_callback=self.agent_callback, + trace_manager=trace_manager, ) - yield from self._yield_final_answer(prompt_messages, final_answer, usage) - def _init_system_message(self, prompt_template: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]: + # publish files + for message_file_id in message_files: + # publish message file + self.queue_manager.publish( + QueueMessageFileEvent(message_file_id=message_file_id), PublishFrom.APPLICATION_MANAGER + ) + # add message file ids + message_file_ids.append(message_file_id) + + return tool_invoke_response, tool_invoke_meta + + def _convert_dict_to_action(self, action: 
dict) -> AgentScratchpadUnit.Action: """ - Initialize system message + convert dict to action """ - if not prompt_messages and prompt_template: - return [ - SystemPromptMessage(content=prompt_template), - ] + return AgentScratchpadUnit.Action(action_name=action["action"], action_input=action["action_input"]) - if prompt_messages and not isinstance(prompt_messages[0], SystemPromptMessage) and prompt_template: - prompt_messages.insert(0, SystemPromptMessage(content=prompt_template)) + def _fill_in_inputs_from_external_data_tools(self, instruction: str, inputs: Mapping[str, Any]) -> str: + """ + fill in inputs from external data tools + """ + for key, value in inputs.items(): + try: + instruction = instruction.replace(f"{{{{{key}}}}}", str(value)) + except Exception: + continue - return prompt_messages or [] + return instruction - def _organize_user_query(self, query: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]: + def _init_react_state(self, query): """ - Organize user query + init agent scratchpad + """ + self._query = query + self._agent_scratchpad = [] + self._historic_prompt_messages = self._organize_historic_prompt_messages() + + @abstractmethod + def _organize_prompt_messages(self) -> list[PromptMessage]: + """ + organize prompt messages """ - if self.files: - # get image detail config - image_detail_config = ( - self.application_generate_entity.file_upload_config.image_config.detail - if ( - self.application_generate_entity.file_upload_config - and self.application_generate_entity.file_upload_config.image_config - ) - else None - ) - image_detail_config = image_detail_config or ImagePromptMessageContent.DETAIL.LOW - - prompt_message_contents: list[PromptMessageContentUnionTypes] = [] - for file in self.files: - prompt_message_contents.append( - file_manager.to_prompt_message_content( - file, - image_detail_config=image_detail_config, - ) - ) - prompt_message_contents.append(TextPromptMessageContent(data=query)) - prompt_messages.append(UserPromptMessage(content=prompt_message_contents)) - else: - prompt_messages.append(UserPromptMessage(content=query)) + def _format_assistant_message(self, agent_scratchpad: list[AgentScratchpadUnit]) -> str: + """ + format assistant message + """ + message = "" + for scratchpad in agent_scratchpad: + if scratchpad.is_final(): + message += f"Final Answer: {scratchpad.agent_response}" + else: + message += f"Thought: {scratchpad.thought}\n\n" + if scratchpad.action_str: + message += f"Action: {scratchpad.action_str}\n\n" + if scratchpad.observation: + message += f"Observation: {scratchpad.observation}\n\n" - return prompt_messages + return message - def _clear_user_prompt_image_messages(self, prompt_messages: list[PromptMessage]) -> list[PromptMessage]: + def _organize_historic_prompt_messages( + self, current_session_messages: list[PromptMessage] | None = None + ) -> list[PromptMessage]: """ - As for now, gpt supports both fc and vision at the first iteration. - We need to remove the image messages from the prompt messages at the first iteration. 
+ organize historic prompt messages """ - prompt_messages = deepcopy(prompt_messages) - - for prompt_message in prompt_messages: - if isinstance(prompt_message, UserPromptMessage): - if isinstance(prompt_message.content, list): - prompt_message.content = "\n".join( - [ - content.data - if content.type == PromptMessageContentType.TEXT - else "[image]" - if content.type == PromptMessageContentType.IMAGE - else "[file]" - for content in prompt_message.content - ] + result: list[PromptMessage] = [] + scratchpads: list[AgentScratchpadUnit] = [] + current_scratchpad: AgentScratchpadUnit | None = None + + for message in self.history_prompt_messages: + if isinstance(message, AssistantPromptMessage): + if not current_scratchpad: + assert isinstance(message.content, str) + current_scratchpad = AgentScratchpadUnit( + agent_response=message.content, + thought=message.content or "I am thinking about how to help you", + action_str="", + action=None, + observation=None, ) + scratchpads.append(current_scratchpad) + if message.tool_calls: + try: + current_scratchpad.action = AgentScratchpadUnit.Action( + action_name=message.tool_calls[0].function.name, + action_input=json.loads(message.tool_calls[0].function.arguments), + ) + current_scratchpad.action_str = json.dumps(current_scratchpad.action.to_dict()) + except: + pass + elif isinstance(message, ToolPromptMessage): + if current_scratchpad: + assert isinstance(message.content, str) + current_scratchpad.observation = message.content + else: + raise NotImplementedError("expected str type") + elif isinstance(message, UserPromptMessage): + if scratchpads: + result.append(AssistantPromptMessage(content=self._format_assistant_message(scratchpads))) + scratchpads = [] + current_scratchpad = None - return prompt_messages + result.append(message) - def _organize_prompt_messages(self): - prompt_template = self.app_config.prompt_template.simple_prompt_template or "" - self.history_prompt_messages = self._init_system_message(prompt_template, self.history_prompt_messages) - query_prompt_messages = self._organize_user_query(self.query or "", []) + if scratchpads: + result.append(AssistantPromptMessage(content=self._format_assistant_message(scratchpads))) - self.history_prompt_messages = AgentHistoryPromptTransform( + historic_prompts = AgentHistoryPromptTransform( model_config=self.model_config, - prompt_messages=[*query_prompt_messages, *self._current_thoughts], - history_messages=self.history_prompt_messages, + prompt_messages=current_session_messages or [], + history_messages=result, memory=self.memory, ).get_prompt() - - prompt_messages = [*self.history_prompt_messages, *query_prompt_messages, *self._current_thoughts] - if len(self._current_thoughts) != 0: - # clear messages after the first iteration - prompt_messages = self._clear_user_prompt_image_messages(prompt_messages) - return prompt_messages + return historic_prompts From f390578c390f5f76eee4426435a34e0917059667 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:31:48 +0800 Subject: [PATCH 24/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 13c62787b9a95a..f95511bdf3e618 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -426,13 +426,20 @@ def _handle_direct_return( final_answer = "\n".join( [str(tr["tool_response"]) for tr in tool_responses if 
tr.get("tool_response") is not None] ) + tool_invoke_meta_agg: dict[str, list[Any]] = {} + observation_agg: dict[str, list[Any]] = {} + for tr in tool_responses: + tool_invoke_meta_agg.setdefault(tr["tool_call_name"], []).append(tr["meta"]) + observation_agg.setdefault(tr["tool_call_name"], []).append(tr["tool_response"]) + tool_invoke_meta = {k: (v[0] if len(v) == 1 else v) for k, v in tool_invoke_meta_agg.items()} + observation = {k: (v[0] if len(v) == 1 else v) for k, v in observation_agg.items()} self.save_agent_thought( agent_thought_id=agent_thought_id, tool_name="", tool_input="", thought="", - tool_invoke_meta={tr["tool_call_name"]: tr["meta"] for tr in tool_responses}, - observation={tr["tool_call_name"]: tr["tool_response"] for tr in tool_responses}, + tool_invoke_meta=tool_invoke_meta, + observation=observation, answer=final_answer, messages_ids=message_file_ids, ) From 02e2d30c2913a34d88a722da340e26f916072189 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:32:55 +0800 Subject: [PATCH 25/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index d586aec6ac6d1f..e482110797ddc8 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -127,7 +127,7 @@ def _invoke( if return_direct_flag and isinstance(outputs, dict): v = outputs.get("text") if isinstance(v, str): - direct_text = v + direct_text = outputs.pop("text") if direct_text is not None: yield self.create_text_message(direct_text) From 8b8423ab43c836f4c00a398ade0c992531aa40ba Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:42:08 +0800 Subject: [PATCH 26/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index f95511bdf3e618..df358edaf2b94f 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -257,12 +257,13 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU # add message file ids message_file_ids.append(message_file_id) - tool_response = { - "tool_call_id": tool_call_id, - "tool_call_name": tool_call_name, - "tool_response": tool_invoke_response, - "meta": tool_invoke_meta.to_dict(), - } + tool_response = { + "tool_call_id": tool_call_id, + "tool_call_name": tool_call_name, + "tool_response": tool_invoke_response, + "tool_call_args": tool_call_args, + "meta": tool_invoke_meta.to_dict(), + } tool_responses.append(tool_response) # check direct return flag @@ -276,6 +277,7 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU yield from self._handle_direct_return( agent_thought_id, tool_responses, + response or "", message_file_ids, prompt_messages, llm_final_usage, @@ -419,6 +421,7 @@ def _handle_direct_return( self, agent_thought_id: str, tool_responses: list[dict[str, Any]], + thought: str, message_file_ids: list[str], prompt_messages: list[PromptMessage], usage: LLMUsage, @@ -428,16 +431,20 @@ def _handle_direct_return( ) tool_invoke_meta_agg: dict[str, list[Any]] = {} observation_agg: dict[str, list[Any]] = {} + tool_input_agg: dict[str, list[Any]] = {} for tr in tool_responses: tool_invoke_meta_agg.setdefault(tr["tool_call_name"], 
[]).append(tr["meta"]) observation_agg.setdefault(tr["tool_call_name"], []).append(tr["tool_response"]) + tool_input_agg.setdefault(tr["tool_call_name"], []).append(tr.get("tool_call_args", {})) tool_invoke_meta = {k: (v[0] if len(v) == 1 else v) for k, v in tool_invoke_meta_agg.items()} observation = {k: (v[0] if len(v) == 1 else v) for k, v in observation_agg.items()} + tool_input = {k: (v[0] if len(v) == 1 else v) for k, v in tool_input_agg.items()} + tool_name = ";".join(sorted({tr["tool_call_name"] for tr in tool_responses})) self.save_agent_thought( agent_thought_id=agent_thought_id, - tool_name="", - tool_input="", - thought="", + tool_name=tool_name, + tool_input=tool_input, + thought=thought, tool_invoke_meta=tool_invoke_meta, observation=observation, answer=final_answer, From 528f74005a2dde026f3844a1150ccaa8aa1d0f05 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 15:05:34 +0800 Subject: [PATCH 27/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index df358edaf2b94f..11ef0964513a87 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -426,6 +426,9 @@ def _handle_direct_return( prompt_messages: list[PromptMessage], usage: LLMUsage, ) -> Generator[LLMResultChunk, None, None]: + def _flatten(agg_dict: dict[str, list[Any]]) -> dict[str, Any]: + return {k: (v[0] if len(v) == 1 else v) for k, v in agg_dict.items()} + final_answer = "\n".join( [str(tr["tool_response"]) for tr in tool_responses if tr.get("tool_response") is not None] ) @@ -436,9 +439,9 @@ def _handle_direct_return( tool_invoke_meta_agg.setdefault(tr["tool_call_name"], []).append(tr["meta"]) observation_agg.setdefault(tr["tool_call_name"], []).append(tr["tool_response"]) tool_input_agg.setdefault(tr["tool_call_name"], []).append(tr.get("tool_call_args", {})) - tool_invoke_meta = {k: (v[0] if len(v) == 1 else v) for k, v in tool_invoke_meta_agg.items()} - observation = {k: (v[0] if len(v) == 1 else v) for k, v in observation_agg.items()} - tool_input = {k: (v[0] if len(v) == 1 else v) for k, v in tool_input_agg.items()} + tool_invoke_meta = _flatten(tool_invoke_meta_agg) + observation = _flatten(observation_agg) + tool_input = _flatten(tool_input_agg) tool_name = ";".join(sorted({tr["tool_call_name"] for tr in tool_responses})) self.save_agent_thought( agent_thought_id=agent_thought_id, From 1baa4d4ee27d8c97c1b66b2322179cab71c0cc21 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 15:06:10 +0800 Subject: [PATCH 28/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index e482110797ddc8..c83bc90cac4911 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -114,12 +114,7 @@ def _invoke( for file in files: yield self.create_file_message(file) # type: ignore - # detect return_direct flag from workflow outputs (strict boolean only) - return_direct_flag = False - if isinstance(outputs, dict) and "return_direct" in outputs: - raw_flag = outputs.pop("return_direct") - if raw_flag is True: - return_direct_flag = True + return_direct_flag = isinstance(outputs, dict) and outputs.pop("return_direct", None) is True 
self._latest_usage = self._derive_usage_from_result(data) From 203383146eeae82b2d2f7bc6f2a159501dae1be0 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 15:14:31 +0800 Subject: [PATCH 29/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 33 ++++++++++++++++++------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 11ef0964513a87..fe854c7d166870 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -227,12 +227,19 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU for tool_call_id, tool_call_name, tool_call_args in tool_calls: tool_instance = tool_instances.get(tool_call_name) if not tool_instance: + tool_invoke_meta = ToolInvokeMeta.error_instance( + f"there is not a tool named {tool_call_name}" + ) + tool_invoke_response = f"there is not a tool named {tool_call_name}" tool_response = { "tool_call_id": tool_call_id, "tool_call_name": tool_call_name, - "tool_response": f"there is not a tool named {tool_call_name}", - "meta": ToolInvokeMeta.error_instance(f"there is not a tool named {tool_call_name}").to_dict(), + "tool_response": tool_invoke_response, + "tool_call_args": tool_call_args, + "meta": tool_invoke_meta.to_dict(), } + tool_response["direct_flag"] = False + tool_responses.append(tool_response) else: # invoke tool tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke( @@ -257,18 +264,16 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU # add message file ids message_file_ids.append(message_file_id) - tool_response = { - "tool_call_id": tool_call_id, - "tool_call_name": tool_call_name, - "tool_response": tool_invoke_response, - "tool_call_args": tool_call_args, - "meta": tool_invoke_meta.to_dict(), - } - - tool_responses.append(tool_response) - # check direct return flag - direct_flag = (tool_invoke_meta.extra or {}).get("return_direct", False) - tool_response["direct_flag"] = direct_flag + tool_response = { + "tool_call_id": tool_call_id, + "tool_call_name": tool_call_name, + "tool_response": tool_invoke_response, + "tool_call_args": tool_call_args, + "meta": tool_invoke_meta.to_dict(), + } + direct_flag = bool((tool_invoke_meta.extra or {}).get("return_direct", False)) + tool_response["direct_flag"] = direct_flag + tool_responses.append(tool_response) if len(tool_responses) > 0: all_direct = all(tr.get("direct_flag") is True for tr in tool_responses) From 3c8ea64ef3b16978ee44cbcc588089d03e8473f3 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 15:23:40 +0800 Subject: [PATCH 30/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index fe854c7d166870..245c6608776b5c 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -227,10 +227,9 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU for tool_call_id, tool_call_name, tool_call_args in tool_calls: tool_instance = tool_instances.get(tool_call_name) if not tool_instance: - tool_invoke_meta = ToolInvokeMeta.error_instance( - f"there is not a tool named {tool_call_name}" - ) - tool_invoke_response = f"there is not a tool named {tool_call_name}" + error_message = f"there 
is not a tool named {tool_call_name}" + tool_invoke_meta = ToolInvokeMeta.error_instance(error_message) + tool_invoke_response = error_message tool_response = { "tool_call_id": tool_call_id, "tool_call_name": tool_call_name, From 924a8e6ab64c638a6d504285200759467cb5d9d7 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 15:24:12 +0800 Subject: [PATCH 31/43] Update tool_engine.py --- api/core/tools/tool_engine.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/api/core/tools/tool_engine.py b/api/core/tools/tool_engine.py index 7674730fbc2937..dee9da53966b5e 100644 --- a/api/core/tools/tool_engine.py +++ b/api/core/tools/tool_engine.py @@ -108,16 +108,13 @@ def message_callback( ) # detect return_direct signal from variable messages (strict boolean short-circuit) - return_direct = False - for m in message_list: - if m.type == ToolInvokeMessage.MessageType.VARIABLE: - variable = cast(ToolInvokeMessage.VariableMessage, m.message) - if ( - variable.variable_name == "return_direct" - and variable.variable_value is True - ): - return_direct = True - break + return_direct = any( + m.type == ToolInvokeMessage.MessageType.VARIABLE + and (variable := cast(ToolInvokeMessage.VariableMessage, m.message)) + and variable.variable_name == "return_direct" + and variable.variable_value is True + for m in message_list + ) plain_text = ToolEngine._convert_tool_response_to_str(message_list) From 07c9652ceeedaf4d0c15cfbac8e997d8b0cb6ed6 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 15:47:52 +0800 Subject: [PATCH 32/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 50 +++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 245c6608776b5c..adae02699919c7 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -230,14 +230,14 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU error_message = f"there is not a tool named {tool_call_name}" tool_invoke_meta = ToolInvokeMeta.error_instance(error_message) tool_invoke_response = error_message - tool_response = { - "tool_call_id": tool_call_id, - "tool_call_name": tool_call_name, - "tool_response": tool_invoke_response, - "tool_call_args": tool_call_args, - "meta": tool_invoke_meta.to_dict(), - } - tool_response["direct_flag"] = False + tool_response = self._create_tool_response( + tool_call_id, + tool_call_name, + tool_call_args, + tool_invoke_response, + tool_invoke_meta, + False, + ) tool_responses.append(tool_response) else: # invoke tool @@ -263,15 +263,15 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU # add message file ids message_file_ids.append(message_file_id) - tool_response = { - "tool_call_id": tool_call_id, - "tool_call_name": tool_call_name, - "tool_response": tool_invoke_response, - "tool_call_args": tool_call_args, - "meta": tool_invoke_meta.to_dict(), - } direct_flag = bool((tool_invoke_meta.extra or {}).get("return_direct", False)) - tool_response["direct_flag"] = direct_flag + tool_response = self._create_tool_response( + tool_call_id, + tool_call_name, + tool_call_args, + tool_invoke_response, + tool_invoke_meta, + direct_flag, + ) tool_responses.append(tool_response) if len(tool_responses) > 0: @@ -421,6 +421,24 @@ def _yield_final_answer( ), ) + def _create_tool_response( + 
self, + tool_call_id: str, + tool_call_name: str, + tool_call_args: dict[str, Any], + tool_invoke_response: str | None, + tool_invoke_meta: ToolInvokeMeta, + direct_flag: bool, + ) -> dict[str, Any]: + return { + "tool_call_id": tool_call_id, + "tool_call_name": tool_call_name, + "tool_response": tool_invoke_response, + "tool_call_args": tool_call_args, + "meta": tool_invoke_meta.to_dict(), + "direct_flag": direct_flag, + } + def _handle_direct_return( self, agent_thought_id: str, From 6ca8e9282bc309e57f04648119f56557dd7ac3a5 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 15:48:29 +0800 Subject: [PATCH 33/43] Simplify condition for direct text retrieval --- api/core/tools/workflow_as_tool/tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index c83bc90cac4911..6abd0b9b502f22 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -119,7 +119,7 @@ def _invoke( self._latest_usage = self._derive_usage_from_result(data) direct_text = None - if return_direct_flag and isinstance(outputs, dict): + if return_direct_flag: v = outputs.get("text") if isinstance(v, str): direct_text = outputs.pop("text") From 4d64aca875bd6df69508bc713b3406fc07eb0172 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 16:45:28 +0800 Subject: [PATCH 34/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index adae02699919c7..bda66d5025ed3b 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -397,6 +397,17 @@ def _yield_final_answer( final_answer: str, usage: LLMUsage, ) -> Generator[LLMResultChunk, None, None]: + yield LLMResultChunk( + model=self.model_instance.model, + prompt_messages=prompt_messages, + system_fingerprint="", + delta=LLMResultChunkDelta( + index=0, + message=AssistantPromptMessage(content=final_answer), + usage=usage, + ), + ) + self.queue_manager.publish( QueueMessageEndEvent( llm_result=LLMResult( @@ -410,17 +421,6 @@ def _yield_final_answer( PublishFrom.APPLICATION_MANAGER, ) - yield LLMResultChunk( - model=self.model_instance.model, - prompt_messages=prompt_messages, - system_fingerprint="", - delta=LLMResultChunkDelta( - index=0, - message=AssistantPromptMessage(content=final_answer), - usage=usage, - ), - ) - def _create_tool_response( self, tool_call_id: str, From c85ad29567d17c59b74ee3ff5ca3ae99c0fc27f4 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 17:14:26 +0800 Subject: [PATCH 35/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index 6abd0b9b502f22..906d46aabde8ae 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -120,9 +120,13 @@ def _invoke( direct_text = None if return_direct_flag: - v = outputs.get("text") - if isinstance(v, str): - direct_text = outputs.pop("text") + if isinstance(outputs, str): + direct_text = outputs + outputs = {} + elif isinstance(outputs, dict): + string_values = [v for v in outputs.values() if isinstance(v, str)] + if 
string_values: + direct_text = "\n".join(string_values) if direct_text is not None: yield self.create_text_message(direct_text) From 398ab387373ec43c754f33c7f9a3d6cce6041a75 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Wed, 26 Nov 2025 17:22:25 +0800 Subject: [PATCH 36/43] Update tool.py --- api/core/tools/workflow_as_tool/tool.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index 906d46aabde8ae..9cb2bae634897f 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -120,13 +120,9 @@ def _invoke( direct_text = None if return_direct_flag: - if isinstance(outputs, str): - direct_text = outputs - outputs = {} - elif isinstance(outputs, dict): - string_values = [v for v in outputs.values() if isinstance(v, str)] - if string_values: - direct_text = "\n".join(string_values) + string_values = [v for v in outputs.values() if isinstance(v, str)] + if string_values: + direct_text = "\n".join(string_values) if direct_text is not None: yield self.create_text_message(direct_text) From 2725ea7cc2fc87c5600dd1f2ceae92f0a45ef94e Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Thu, 27 Nov 2025 17:36:38 +0800 Subject: [PATCH 37/43] Update cot_agent_runner.py --- api/core/agent/cot_agent_runner.py | 52 +++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/api/core/agent/cot_agent_runner.py b/api/core/agent/cot_agent_runner.py index eea40821282c28..35fe1b62505a29 100644 --- a/api/core/agent/cot_agent_runner.py +++ b/api/core/agent/cot_agent_runner.py @@ -74,6 +74,7 @@ def run( final_answer = "" prompt_messages: list = [] # Initialize prompt_messages agent_thought_id = "" # Initialize agent_thought_id + is_final_answer_from_tool = False def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMUsage): if not final_llm_usage_dict["usage"]: @@ -233,7 +234,33 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU if direct_flag: final_answer = str(tool_invoke_response or "") - # keep function_call_state as False to end iterations + is_final_answer_from_tool = True + + # emit final answer immediately to preserve UI order (tool usage → answer) + yield LLMResultChunk( + model=model_instance.model, + prompt_messages=prompt_messages, + delta=LLMResultChunkDelta( + index=0, + message=AssistantPromptMessage(content=final_answer), + usage=llm_usage["usage"], + ), + system_fingerprint="", + ) + + self.queue_manager.publish( + QueueMessageEndEvent( + llm_result=LLMResult( + model=model_instance.model, + prompt_messages=prompt_messages, + message=AssistantPromptMessage(content=final_answer), + usage=llm_usage["usage"] or LLMUsage.empty_usage(), + system_fingerprint="", + ) + ), + PublishFrom.APPLICATION_MANAGER, + ) + return else: function_call_state = True @@ -252,17 +279,18 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU system_fingerprint="", ) - # save agent thought - self.save_agent_thought( - agent_thought_id=agent_thought_id, - tool_name="", - tool_input={}, - tool_invoke_meta={}, - thought=final_answer, - observation={}, - answer=final_answer, - messages_ids=[], - ) + # save agent thought only when final answer is NOT directly from tool + if not is_final_answer_from_tool: + self.save_agent_thought( + agent_thought_id=agent_thought_id, + tool_name="", + 
tool_input={}, + tool_invoke_meta={}, + thought=final_answer, + observation={}, + answer=final_answer, + messages_ids=[], + ) # publish end event self.queue_manager.publish( QueueMessageEndEvent( From a70e5f2dae49912f3726a0a9c26a01455bbea0f6 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Thu, 27 Nov 2025 17:51:07 +0800 Subject: [PATCH 38/43] Update cot_agent_runner.py --- api/core/agent/cot_agent_runner.py | 60 ++++++++++++++---------------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/api/core/agent/cot_agent_runner.py b/api/core/agent/cot_agent_runner.py index 35fe1b62505a29..20a386a60fbfb5 100644 --- a/api/core/agent/cot_agent_runner.py +++ b/api/core/agent/cot_agent_runner.py @@ -235,30 +235,11 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU if direct_flag: final_answer = str(tool_invoke_response or "") is_final_answer_from_tool = True - - # emit final answer immediately to preserve UI order (tool usage → answer) - yield LLMResultChunk( - model=model_instance.model, + + yield from self._yield_final_answer( prompt_messages=prompt_messages, - delta=LLMResultChunkDelta( - index=0, - message=AssistantPromptMessage(content=final_answer), - usage=llm_usage["usage"], - ), - system_fingerprint="", - ) - - self.queue_manager.publish( - QueueMessageEndEvent( - llm_result=LLMResult( - model=model_instance.model, - prompt_messages=prompt_messages, - message=AssistantPromptMessage(content=final_answer), - usage=llm_usage["usage"] or LLMUsage.empty_usage(), - system_fingerprint="", - ) - ), - PublishFrom.APPLICATION_MANAGER, + final_answer=final_answer, + usage=llm_usage["usage"], ) return else: @@ -270,13 +251,10 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU iteration_step += 1 - yield LLMResultChunk( - model=model_instance.model, + yield from self._yield_final_answer( prompt_messages=prompt_messages, - delta=LLMResultChunkDelta( - index=0, message=AssistantPromptMessage(content=final_answer), usage=llm_usage["usage"] - ), - system_fingerprint="", + final_answer=final_answer, + usage=llm_usage["usage"], ) # save agent thought only when final answer is NOT directly from tool @@ -291,14 +269,32 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU answer=final_answer, messages_ids=[], ) - # publish end event + + def _yield_final_answer( + self, + prompt_messages: list, + final_answer: str, + usage: LLMUsage | None, + ) -> Generator[LLMResultChunk, None, None]: + """Yields the final answer chunk and publishes the end event.""" + yield LLMResultChunk( + model=self.model_instance.model, + prompt_messages=prompt_messages, + delta=LLMResultChunkDelta( + index=0, + message=AssistantPromptMessage(content=final_answer), + usage=usage, + ), + system_fingerprint="", + ) + self.queue_manager.publish( QueueMessageEndEvent( llm_result=LLMResult( - model=model_instance.model, + model=self.model_instance.model, prompt_messages=prompt_messages, message=AssistantPromptMessage(content=final_answer), - usage=llm_usage["usage"] or LLMUsage.empty_usage(), + usage=usage or LLMUsage.empty_usage(), system_fingerprint="", ) ), From 4dce6c8603a41d1356d464d58754453344411948 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Thu, 27 Nov 2025 18:22:28 +0800 Subject: [PATCH 39/43] Update cot_agent_runner.py --- api/core/agent/cot_agent_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/api/core/agent/cot_agent_runner.py b/api/core/agent/cot_agent_runner.py index 20a386a60fbfb5..caf61c4af83ec7 100644 --- a/api/core/agent/cot_agent_runner.py +++ b/api/core/agent/cot_agent_runner.py @@ -239,7 +239,7 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU yield from self._yield_final_answer( prompt_messages=prompt_messages, final_answer=final_answer, - usage=llm_usage["usage"], + usage=llm_usage["usage"] or LLMUsage.empty_usage(), ) return else: @@ -254,7 +254,7 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU yield from self._yield_final_answer( prompt_messages=prompt_messages, final_answer=final_answer, - usage=llm_usage["usage"], + usage=llm_usage["usage"] or LLMUsage.empty_usage(), ) # save agent thought only when final answer is NOT directly from tool From 990cf84c10feca85b6c9e3b1c17b2538f468640f Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Thu, 27 Nov 2025 18:50:29 +0800 Subject: [PATCH 40/43] Update fc_agent_runner.py --- api/core/agent/fc_agent_runner.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index bda66d5025ed3b..bb06803773309f 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -229,12 +229,11 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMU if not tool_instance: error_message = f"there is not a tool named {tool_call_name}" tool_invoke_meta = ToolInvokeMeta.error_instance(error_message) - tool_invoke_response = error_message tool_response = self._create_tool_response( tool_call_id, tool_call_name, tool_call_args, - tool_invoke_response, + error_message, tool_invoke_meta, False, ) @@ -439,6 +438,11 @@ def _create_tool_response( "direct_flag": direct_flag, } + @staticmethod + def _flatten(agg_dict: dict[str, list[Any]]) -> dict[str, Any]: + """Flattens a dictionary of lists, keeping single-item lists as values.""" + return {k: (v[0] if len(v) == 1 else v) for k, v in agg_dict.items()} + def _handle_direct_return( self, agent_thought_id: str, @@ -448,9 +452,6 @@ def _handle_direct_return( prompt_messages: list[PromptMessage], usage: LLMUsage, ) -> Generator[LLMResultChunk, None, None]: - def _flatten(agg_dict: dict[str, list[Any]]) -> dict[str, Any]: - return {k: (v[0] if len(v) == 1 else v) for k, v in agg_dict.items()} - final_answer = "\n".join( [str(tr["tool_response"]) for tr in tool_responses if tr.get("tool_response") is not None] ) @@ -461,9 +462,9 @@ def _flatten(agg_dict: dict[str, list[Any]]) -> dict[str, Any]: tool_invoke_meta_agg.setdefault(tr["tool_call_name"], []).append(tr["meta"]) observation_agg.setdefault(tr["tool_call_name"], []).append(tr["tool_response"]) tool_input_agg.setdefault(tr["tool_call_name"], []).append(tr.get("tool_call_args", {})) - tool_invoke_meta = _flatten(tool_invoke_meta_agg) - observation = _flatten(observation_agg) - tool_input = _flatten(tool_input_agg) + tool_invoke_meta = self._flatten(tool_invoke_meta_agg) + observation = self._flatten(observation_agg) + tool_input = self._flatten(tool_input_agg) tool_name = ";".join(sorted({tr["tool_call_name"] for tr in tool_responses})) self.save_agent_thought( agent_thought_id=agent_thought_id, From 9917609b910479f34b4efc8d6676d6bc5709ae85 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Thu, 27 Nov 2025 18:51:08 +0800 Subject: [PATCH 41/43] Apply suggestion from 
@gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- api/core/tools/workflow_as_tool/tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index 9cb2bae634897f..9d350b42cdb7be 100644 --- a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -120,7 +120,7 @@ def _invoke( direct_text = None if return_direct_flag: - string_values = [v for v in outputs.values() if isinstance(v, str)] + string_values = [v for k, v in sorted(outputs.items()) if isinstance(v, str)] if string_values: direct_text = "\n".join(string_values) From dc5bc33030ce9703657b822b9c2557fbc85e353d Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Thu, 27 Nov 2025 19:10:31 +0800 Subject: [PATCH 42/43] Update tool_engine.py --- api/core/tools/tool_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/api/core/tools/tool_engine.py b/api/core/tools/tool_engine.py index dee9da53966b5e..d73feb0cc0dffe 100644 --- a/api/core/tools/tool_engine.py +++ b/api/core/tools/tool_engine.py @@ -110,6 +110,7 @@ def message_callback( # detect return_direct signal from variable messages (strict boolean short-circuit) return_direct = any( m.type == ToolInvokeMessage.MessageType.VARIABLE + and m.message is not None and (variable := cast(ToolInvokeMessage.VariableMessage, m.message)) and variable.variable_name == "return_direct" and variable.variable_value is True From 29d31930c0fa4223a4c6467a79b34c3f92419e38 Mon Sep 17 00:00:00 2001 From: Cursx <33718736+Cursx@users.noreply.github.com> Date: Thu, 27 Nov 2025 19:11:45 +0800 Subject: [PATCH 43/43] Refactor final_answer construction using list append --- api/core/agent/fc_agent_runner.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index bb06803773309f..a3a7a030dd06df 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -452,16 +452,19 @@ def _handle_direct_return( prompt_messages: list[PromptMessage], usage: LLMUsage, ) -> Generator[LLMResultChunk, None, None]: - final_answer = "\n".join( - [str(tr["tool_response"]) for tr in tool_responses if tr.get("tool_response") is not None] - ) + final_answer_parts = [] tool_invoke_meta_agg: dict[str, list[Any]] = {} observation_agg: dict[str, list[Any]] = {} tool_input_agg: dict[str, list[Any]] = {} + for tr in tool_responses: + if tr.get("tool_response") is not None: + final_answer_parts.append(str(tr["tool_response"])) tool_invoke_meta_agg.setdefault(tr["tool_call_name"], []).append(tr["meta"]) observation_agg.setdefault(tr["tool_call_name"], []).append(tr["tool_response"]) tool_input_agg.setdefault(tr["tool_call_name"], []).append(tr.get("tool_call_args", {})) + + final_answer = "\n".join(final_answer_parts) tool_invoke_meta = self._flatten(tool_invoke_meta_agg) observation = self._flatten(observation_agg) tool_input = self._flatten(tool_input_agg)
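
Taken together, the tool-side patches above define a single opt-in contract: a workflow's end node includes a strict-boolean "return_direct" output next to its string outputs, WorkflowTool._invoke pops the flag and joins the remaining string values (in key order) into one direct text message, and ToolEngine independently honors a VariableMessage named "return_direct" whose value is the boolean True. The sketch below is illustrative only — the helper name extract_direct_text and the order-lookup outputs are hypothetical — but the flag handling mirrors the patched WorkflowTool._invoke hunks.

    def extract_direct_text(outputs: dict) -> str | None:
        # Pop the flag so it never leaks into the tool observation; only a
        # strict boolean True activates direct return (any other value,
        # including truthy strings, is ignored).
        return_direct_flag = isinstance(outputs, dict) and outputs.pop("return_direct", None) is True
        if not return_direct_flag:
            return None
        # Join string outputs in key order so the direct text is deterministic
        # regardless of dict insertion order (per the gemini-code-assist change).
        string_values = [v for k, v in sorted(outputs.items()) if isinstance(v, str)]
        return "\n".join(string_values) if string_values else None

    outputs = {"text": "Your order #42 has shipped.", "return_direct": True}
    print(extract_direct_text(outputs))  # -> Your order #42 has shipped.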
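On the runner side, both agents read the same signal back off ToolInvokeMeta.extra via (tool_invoke_meta.extra or {}).get("return_direct", False). The function-calling runner short-circuits only when every call in the round is direct (all_direct), aggregating tool inputs, observations, and metadata into one saved agent thought; the CoT runner sets is_final_answer_from_tool, emits the tool output as the final answer, and returns without another iteration. Below is a minimal sketch of that decision, again with hypothetical response data — the real runners additionally save the agent thought and publish the queue events shown in the hunks above.

    def should_return_direct(tool_responses: list[dict]) -> bool:
        # Mirrors the function-calling runner: short-circuit only when every
        # tool call in this round carried direct_flag=True.
        return bool(tool_responses) and all(tr.get("direct_flag") is True for tr in tool_responses)

    responses = [
        {"tool_call_name": "order_lookup",          # hypothetical tool name
         "tool_response": "Your order #42 has shipped.",
         "direct_flag": True},
    ]
    if should_return_direct(responses):
        # Non-None tool responses are joined verbatim and streamed as the
        # final chunk before QueueMessageEndEvent is published.
        final_answer = "\n".join(
            str(tr["tool_response"]) for tr in responses if tr.get("tool_response") is not None
        )
        print(final_answer)  # -> Your order #42 has shipped.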