From 510fb7fdbf73b42e6c8160cd2f0202e9e99e4493 Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Mon, 29 Sep 2025 16:31:38 +0800 Subject: [PATCH 01/12] feat: support advanced non-incremental output --- dashscope/aigc/generation.py | 152 ++++++- dashscope/aigc/multimodal_conversation.py | 153 ++++++- dashscope/utils/param_utils.py | 29 ++ samples/test_aio_generation.py | 428 +++++++++++++++++++ samples/test_aio_multimodal_conversation.py | 418 ++++++++++++------- samples/test_generation.py | 437 ++++++++++++++++++-- samples/test_multimodal_conversation.py | 257 +++++++++++- 7 files changed, 1661 insertions(+), 213 deletions(-) create mode 100644 dashscope/utils/param_utils.py create mode 100644 samples/test_aio_generation.py diff --git a/dashscope/aigc/generation.py b/dashscope/aigc/generation.py index cfe8a73..3782a59 100644 --- a/dashscope/aigc/generation.py +++ b/dashscope/aigc/generation.py @@ -2,7 +2,7 @@ import copy import json -from typing import Any, Dict, Generator, List, Union +from typing import Any, Dict, Generator, List, Union, AsyncGenerator from dashscope.api_entities.dashscope_response import (GenerationResponse, Message, Role) @@ -13,6 +13,7 @@ from dashscope.common.error import InputRequired, ModelRequired from dashscope.common.logging import logger from dashscope.common.utils import _get_task_group_and_task +from dashscope.utils.param_utils import ParamUtil class Generation(BaseApi): @@ -137,6 +138,16 @@ def call( kwargs['headers'] = headers input, parameters = cls._build_input_parameters( model, prompt, history, messages, **kwargs) + + is_stream = parameters.get('stream', False) + # Check if we need to merge incremental output + is_incremental_output = kwargs.get('incremental_output', None) + to_merge_incremental_output = False + if (ParamUtil.should_modify_incremental_output(model) and + is_stream and is_incremental_output is False): + to_merge_incremental_output = True + parameters['incremental_output'] = True + response = super().call(model=model, 
task_group=task_group, task=Generation.task, @@ -145,10 +156,12 @@ def call( input=input, workspace=workspace, **parameters) - is_stream = kwargs.get('stream', False) if is_stream: - return (GenerationResponse.from_api_response(rsp) - for rsp in response) + if to_merge_incremental_output: + return cls._merge_generation_response(response) + else: + return (GenerationResponse.from_api_response(rsp) + for rsp in response) else: return GenerationResponse.from_api_response(response) @@ -191,6 +204,15 @@ def _build_input_parameters(cls, model, prompt, history, messages, return input, {**parameters, **kwargs} + @classmethod + def _merge_generation_response(cls, response) -> Generator[GenerationResponse, None, None]: + """Merge incremental response chunks to simulate non-incremental output.""" + accumulated_data = {} + for rsp in response: + parsed_response = GenerationResponse.from_api_response(rsp) + _merge_single_response(parsed_response, accumulated_data) + yield parsed_response + class AioGeneration(BaseAioApi): task = 'text-generation' @@ -220,7 +242,7 @@ async def call( plugins: Union[str, Dict[str, Any]] = None, workspace: str = None, **kwargs - ) -> Union[GenerationResponse, Generator[GenerationResponse, None, None]]: + ) -> Union[GenerationResponse, AsyncGenerator[GenerationResponse, None]]: """Call generation model service. Args: @@ -296,8 +318,8 @@ async def call( Returns: Union[GenerationResponse, - Generator[GenerationResponse, None, None]]: If - stream is True, return Generator, otherwise GenerationResponse. + AsyncGenerator[GenerationResponse, None]]: If + stream is True, return AsyncGenerator, otherwise GenerationResponse. 
""" if (prompt is None or not prompt) and (messages is None or not messages): @@ -314,6 +336,16 @@ async def call( kwargs['headers'] = headers input, parameters = Generation._build_input_parameters( model, prompt, history, messages, **kwargs) + + is_stream = parameters.get('stream', False) + # Check if we need to merge incremental output + is_incremental_output = kwargs.get('incremental_output', None) + to_merge_incremental_output = False + if (ParamUtil.should_modify_incremental_output(model) and + is_stream and is_incremental_output is False): + to_merge_incremental_output = True + parameters['incremental_output'] = True + response = await super().call(model=model, task_group=task_group, task=Generation.task, @@ -322,9 +354,109 @@ async def call( input=input, workspace=workspace, **parameters) - is_stream = kwargs.get('stream', False) if is_stream: - return (GenerationResponse.from_api_response(rsp) - async for rsp in response) + if to_merge_incremental_output: + return cls._merge_generation_response(response) + else: + return cls._stream_responses(response) else: return GenerationResponse.from_api_response(response) + + @classmethod + async def _stream_responses(cls, response) -> AsyncGenerator[GenerationResponse, None]: + """Convert async response stream to GenerationResponse stream.""" + # Type hint: when stream=True, response is actually an AsyncIterable + async for rsp in response: # type: ignore + yield GenerationResponse.from_api_response(rsp) + + @classmethod + async def _merge_generation_response(cls, response) -> AsyncGenerator[GenerationResponse, None]: + """Async version of merge incremental response chunks.""" + accumulated_data = {} + + async for rsp in response: # type: ignore + parsed_response = GenerationResponse.from_api_response(rsp) + _merge_single_response(parsed_response, accumulated_data) + yield parsed_response + +def _merge_single_response(parsed_response, accumulated_data): + """Merge a single response chunk with accumulated data.""" + # 
Process each choice in the choices array + if parsed_response.output: + if parsed_response.output.choices: + for choice_idx, choice in enumerate(parsed_response.output.choices): + # Initialize accumulated data for this choice if not exists + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'tool_calls': [] + } + + if choice.message: + # Handle content accumulation + if 'content' in choice.message and choice.message.content: + current_content = choice.message.content + if current_content: + accumulated_data[choice_idx]['content'] += current_content + choice.message.content = accumulated_data[choice_idx]['content'] + + # Handle tool_calls accumulation + if 'tool_calls' in choice.message and choice.message.tool_calls: + current_tool_calls = choice.message.tool_calls + + # For each current tool call, accumulate its arguments + for current_call in current_tool_calls: + if isinstance(current_call, dict) and 'index' in current_call: + idx = current_call['index'] + + # Find existing accumulated call with same index + existing_call = None + for acc_call in accumulated_data[choice_idx]['tool_calls']: + if (isinstance(acc_call, dict) and + acc_call.get('index') == idx): + existing_call = acc_call + break + + if existing_call: + # Accumulate function fields from current call + if ('function' in current_call and + current_call['function']): + if 'function' not in existing_call: + existing_call['function'] = {} + + # Accumulate function.name + if 'name' in current_call['function']: + if 'name' not in existing_call['function']: + existing_call['function']['name'] = '' + existing_call['function']['name'] += current_call['function']['name'] + + # Accumulate function.arguments + if 'arguments' in current_call['function']: + if 'arguments' not in existing_call['function']: + existing_call['function']['arguments'] = '' + existing_call['function']['arguments'] += current_call['function']['arguments'] + + # Update other fields with latest 
values + existing_call.update({k: v for k, v in current_call.items() + if k != 'function' and v}) + if 'function' in current_call and current_call['function']: + existing_call['function'].update({k: v for k, v in current_call['function'].items() + if k not in ['arguments', 'name'] and v}) + else: + # Add new tool call + accumulated_data[choice_idx]['tool_calls'].append(dict(current_call)) + + # Update choice with accumulated tool_calls + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + elif 'text' in parsed_response.output and parsed_response.output.text: + # Handle output.text accumulation (when choices is null) + # Use choice_idx 0 for output.text content to reuse existing structure + choice_idx = 0 + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'tool_calls': [] + } + + accumulated_data[choice_idx]['content'] += parsed_response.output.text + parsed_response.output.text = accumulated_data[choice_idx]['content'] \ No newline at end of file diff --git a/dashscope/aigc/multimodal_conversation.py b/dashscope/aigc/multimodal_conversation.py index b6b4136..7648714 100644 --- a/dashscope/aigc/multimodal_conversation.py +++ b/dashscope/aigc/multimodal_conversation.py @@ -1,7 +1,7 @@ # Copyright (c) Alibaba, Inc. and its affiliates. 
import copy -from typing import Generator, List, Union +from typing import AsyncGenerator, Generator, List, Union from dashscope.api_entities.dashscope_response import \ MultiModalConversationResponse @@ -9,6 +9,7 @@ from dashscope.common.error import InputRequired, ModelRequired from dashscope.common.utils import _get_task_group_and_task from dashscope.utils.oss_utils import preprocess_message_element +from dashscope.utils.param_utils import ParamUtil class MultiModalConversation(BaseApi): @@ -108,6 +109,16 @@ def call( input.update({'language_type': language_type}) if msg_copy is not None: input.update({'messages': msg_copy}) + + # Check if we need to merge incremental output + is_incremental_output = kwargs.get('incremental_output', None) + to_merge_incremental_output = False + is_stream = kwargs.get('stream', False) + if (ParamUtil.should_modify_incremental_output(model) and + is_stream and is_incremental_output is not None and is_incremental_output is False): + to_merge_incremental_output = True + kwargs['incremental_output'] = True + response = super().call(model=model, task_group=task_group, task=MultiModalConversation.task, @@ -116,10 +127,12 @@ def call( input=input, workspace=workspace, **kwargs) - is_stream = kwargs.get('stream', False) if is_stream: - return (MultiModalConversationResponse.from_api_response(rsp) - for rsp in response) + if to_merge_incremental_output: + return cls._merge_multimodal_response(response) + else: + return (MultiModalConversationResponse.from_api_response(rsp) + for rsp in response) else: return MultiModalConversationResponse.from_api_response(response) @@ -149,6 +162,16 @@ def _preprocess_messages(cls, model: str, messages: List[dict], has_upload = True return has_upload + @classmethod + def _merge_multimodal_response(cls, response) -> Generator[MultiModalConversationResponse, None, None]: + """Merge incremental response chunks to simulate non-incremental output.""" + accumulated_data = {} + + for rsp in response: + 
parsed_response = MultiModalConversationResponse.from_api_response(rsp) + _merge_multimodal_single_response(parsed_response, accumulated_data) + yield parsed_response + class AioMultiModalConversation(BaseAioApi): """Async MultiModal conversational robot interface. @@ -170,8 +193,8 @@ async def call( voice: str = None, language_type: str = None, **kwargs - ) -> Union[MultiModalConversationResponse, Generator[ - MultiModalConversationResponse, None, None]]: + ) -> Union[MultiModalConversationResponse, AsyncGenerator[ + MultiModalConversationResponse, None]]: """Call the conversation model service asynchronously. Args: @@ -221,8 +244,8 @@ async def call( Returns: Union[MultiModalConversationResponse, - Generator[MultiModalConversationResponse, None, None]]: If - stream is True, return Generator, otherwise MultiModalConversationResponse. + AsyncGenerator[MultiModalConversationResponse, None]]: If + stream is True, return AsyncGenerator, otherwise MultiModalConversationResponse. """ if model is None or not model: raise ModelRequired('Model is required!') @@ -246,6 +269,16 @@ async def call( input.update({'language_type': language_type}) if msg_copy is not None: input.update({'messages': msg_copy}) + + # Check if we need to merge incremental output + is_incremental_output = kwargs.get('incremental_output', None) + to_merge_incremental_output = False + is_stream = kwargs.get('stream', False) + if (ParamUtil.should_modify_incremental_output(model) and + is_stream and is_incremental_output is not None and is_incremental_output is False): + to_merge_incremental_output = True + kwargs['incremental_output'] = True + response = await super().call(model=model, task_group=task_group, task=AioMultiModalConversation.task, @@ -254,10 +287,11 @@ async def call( input=input, workspace=workspace, **kwargs) - is_stream = kwargs.get('stream', False) if is_stream: - return (MultiModalConversationResponse.from_api_response(rsp) - async for rsp in response) + if 
to_merge_incremental_output: + return cls._merge_multimodal_response(response) + else: + return cls._stream_responses(response) else: return MultiModalConversationResponse.from_api_response(response) @@ -286,3 +320,100 @@ def _preprocess_messages(cls, model: str, messages: List[dict], if is_upload and not has_upload: has_upload = True return has_upload + + @classmethod + async def _stream_responses(cls, response) -> AsyncGenerator[MultiModalConversationResponse, None]: + """Convert async response stream to MultiModalConversationResponse stream.""" + # Type hint: when stream=True, response is actually an AsyncIterable + async for rsp in response: # type: ignore + yield MultiModalConversationResponse.from_api_response(rsp) + + @classmethod + async def _merge_multimodal_response(cls, response) -> AsyncGenerator[MultiModalConversationResponse, None]: + """Async version of merge incremental response chunks.""" + accumulated_data = {} + + async for rsp in response: + parsed_response = MultiModalConversationResponse.from_api_response(rsp) + _merge_multimodal_single_response(parsed_response, accumulated_data) + yield parsed_response + + +def _merge_multimodal_single_response(parsed_response, accumulated_data): + """Merge a single multimodal response chunk with accumulated data.""" + # Process each choice in the choices array + if parsed_response.output and parsed_response.output.choices: + for choice_idx, choice in enumerate(parsed_response.output.choices): + # Initialize accumulated data for this choice if not exists + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': [], + 'tool_calls': [] + } + + if choice.message: + # Handle content accumulation for multimodal content + if choice.message.content: + current_content = choice.message.content + + # Ensure accumulated content list has enough elements + while len(accumulated_data[choice_idx]['content']) < len(current_content): + accumulated_data[choice_idx]['content'].append({'text': ''}) 
+ + # Merge each content element + for content_idx, content_item in enumerate(current_content): + if isinstance(content_item, dict) and 'text' in content_item: + if content_item['text']: + # Accumulate text content + accumulated_data[choice_idx]['content'][content_idx]['text'] += content_item['text'] + # Update the current response with accumulated content + choice.message.content[content_idx]['text'] = accumulated_data[choice_idx]['content'][content_idx]['text'] + + # Handle tool_calls accumulation + if 'tool_calls' in choice.message and choice.message.tool_calls: + current_tool_calls = choice.message.tool_calls + + # For each current tool call, accumulate its arguments + for current_call in current_tool_calls: + if isinstance(current_call, dict) and 'index' in current_call: + idx = current_call['index'] + + # Find existing accumulated call with same index + existing_call = None + for acc_call in accumulated_data[choice_idx]['tool_calls']: + if (isinstance(acc_call, dict) and + acc_call.get('index') == idx): + existing_call = acc_call + break + + if existing_call: + # Accumulate function fields from current call + if ('function' in current_call and + current_call['function']): + if 'function' not in existing_call: + existing_call['function'] = {} + + # Accumulate function.name + if 'name' in current_call['function']: + if 'name' not in existing_call['function']: + existing_call['function']['name'] = '' + existing_call['function']['name'] += current_call['function']['name'] + + # Accumulate function.arguments + if 'arguments' in current_call['function']: + if 'arguments' not in existing_call['function']: + existing_call['function']['arguments'] = '' + existing_call['function']['arguments'] += current_call['function']['arguments'] + + # Update other fields with latest values + existing_call.update({k: v for k, v in current_call.items() + if k != 'function' and v}) + if 'function' in current_call and current_call['function']: + existing_call['function'].update({k: v 
for k, v in current_call['function'].items() + if k not in ['arguments', 'name'] and v}) + else: + # Add new tool call + accumulated_data[choice_idx]['tool_calls'].append(dict(current_call)) + + # Update choice with accumulated tool_calls + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] diff --git a/dashscope/utils/param_utils.py b/dashscope/utils/param_utils.py new file mode 100644 index 0000000..007ad20 --- /dev/null +++ b/dashscope/utils/param_utils.py @@ -0,0 +1,29 @@ + +class ParamUtil: + @staticmethod + def should_modify_incremental_output(model_name: str) -> bool: + """ + Determine if increment_output parameter needs to be modified based on + model name. + + Args: + model_name (str): The name of the model to check + + Returns: + bool: False if model contains 'tts', 'omni', or + 'qwen-deep-research', True otherwise + """ + if not isinstance(model_name, str): + return True + + model_name_lower = model_name.lower() + + # Check for conditions that return False + if 'tts' in model_name_lower: + return False + if 'omni' in model_name_lower: + return False + if 'qwen-deep-research' in model_name_lower: + return False + + return True \ No newline at end of file diff --git a/samples/test_aio_generation.py b/samples/test_aio_generation.py new file mode 100644 index 0000000..b106b2b --- /dev/null +++ b/samples/test_aio_generation.py @@ -0,0 +1,428 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. 
+ +import asyncio +import os +from dashscope.aigc.generation import AioGeneration + + +class TestAioGeneration: + """Test cases for AioGeneration API with cache control and streaming.""" + + @staticmethod + async def test_response_with_content(): + """Test async generation with content response.""" + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + { + "role": "user", + "content": [ + { + "type": "text", + "text": "你好", + "cache_control": { + "type": "ephemeral", + "ttl": "5m" + } + } + ] + } + ] + + # Call AioGeneration API with streaming enabled + response = await AioGeneration.call( + api_key=os.getenv("DASHSCOPE_API_KEY"), + model="qwen3-max", + messages=messages, + result_format="message", + incremental_output=False, + stream=True, + ) + + print("\n=== Async Content Response Test ===") + async for chunk in response: + print(chunk) + + """ + content的示例如下: + { + "status_code": 200, + "request_id": "a0700200-eb09-4d6c-ae76-d891ec1ae77b", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": "你好" ---> 需要merge的内容 + } + } + ] + }, + "usage": { + "input_tokens": 28, + "output_tokens": 4, + "total_tokens": 32, + "cached_tokens": 0 + } + } + """ + + @staticmethod + async def test_response_with_reasoning_content(): + """Test async generation with reasoning content response.""" + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + { + "role": "user", + "content": [ + { + "type": "text", + "text": "1.1和0.9哪个大", + "cache_control": { + "type": "ephemeral", + "ttl": "5m" + } + } + ] + } + ] + + # Call AioGeneration API with streaming enabled + response = await AioGeneration.call( + api_key=os.getenv("DASHSCOPE_API_KEY"), + model="qwen-plus", + messages=messages, + result_format="message", + enable_thinking=True, + incremental_output=False, # enable_thinking为true时,只能设置为true + stream=True, + 
) + + print("\n=== Async Reasoning Content Response Test ===") + async for chunk in response: + print(chunk) + + """ + reasoning_content 的示例如下: + { + "status_code": 200, + "request_id": "5bef2386-ce04-4d63-a650-fa6f72c84bfb", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": "", + "reasoning_content": "首先" ---> 不需要merge的内容,因为只支持增量模式 + } + } + ] + }, + "usage": { + "input_tokens": 28, + "output_tokens": 3, + "total_tokens": 31, + "output_tokens_details": { + "reasoning_tokens": 1 + }, + "prompt_tokens_details": { + "cached_tokens": 0 + } + } + } + """ + + @staticmethod + async def test_response_with_tool_calls(): + """Test async generation with tool calls response.""" + tools = [ + { + "type": "function", + "function": { + "name": "get_current_time", + "description": "当你想知道现在的时间时非常有用。", + "parameters": {} + } + }, + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "当你想查询指定城市的天气时非常有用。", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "城市或县区,比如北京市、杭州市、余杭区等。" + } + } + }, + "required": [ + "location" + ] + } + } + ] + messages = [{"role": "user", "content": "杭州天气怎么样"}] + response = await AioGeneration.call( + # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx", + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-plus', + messages=messages, + tools=tools, + result_format='message', + incremental_output=False, + stream=True, + ) + + print("\n=== Async Tool Calls Response Test ===") + async for chunk in response: + print(chunk) + + """ + tool calls 示例: + { + "status_code": 200, + "request_id": "fb9231ea-723c-4e72-b504-e075c7d312de", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": "", + "tool_calls": [ ---> 
需要merge的内容 + { + "index": 0, + "id": "call_b9c94cf8838d46a4911a7e", + "type": "function", + "function": { + "name": "get_current_weather", + "arguments": "{\"location\":" + } + } + ] + }, + "index": 0 + } + ] + }, + "usage": { + "input_tokens": 204, + "output_tokens": 16, + "total_tokens": 220, + "prompt_tokens_details": { + "cached_tokens": 0 + } + } + } + """ + + @staticmethod + async def test_response_with_search_info(): + """Test async generation with search info response.""" + # 配置API Key + # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY = "sk-xxx" + API_KEY = os.getenv('DASHSCOPE_API_KEY') + + async def call_deep_research_model(messages, step_name): + print(f"\n=== {step_name} ===") + + try: + responses = await AioGeneration.call( + api_key=API_KEY, + model="qwen-deep-research", + messages=messages, + # qwen-deep-research模型目前仅支持流式输出 + stream=True, + # incremental_output=True #使用增量输出请添加此参数 + ) + + return await process_responses(responses, step_name) + + except Exception as e: + print(f"调用API时发生错误: {e}") + return "" + + # 显示阶段内容 + def display_phase_content(phase, content, status): + if content: + print(f"\n[{phase}] {status}: {content}") + else: + print(f"\n[{phase}] {status}") + + # 处理响应 + async def process_responses(responses, step_name): + current_phase = None + phase_content = "" + research_goal = "" + web_sites = [] + keepalive_shown = False # 标记是否已经显示过KeepAlive提示 + + async for response in responses: + # 检查响应状态码 + if hasattr(response, 'status_code') and response.status_code != 200: + print(f"HTTP返回码:{response.status_code}") + if hasattr(response, 'code'): + print(f"错误码:{response.code}") + if hasattr(response, 'message'): + print(f"错误信息:{response.message}") + print("请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code") + continue + + if hasattr(response, 'output') and response.output: + message = response.output.get('message', {}) + phase = message.get('phase') + content = message.get('content', '') + status = message.get('status') + extra = 
message.get('extra', {}) + + # 阶段变化检测 + if phase != current_phase: + if current_phase and phase_content: + # 根据阶段名称和步骤名称来显示不同的完成描述 + if step_name == "第一步:模型反问确认" and current_phase == "answer": + print(f"\n 模型反问阶段完成") + else: + print(f"\n {current_phase} 阶段完成") + current_phase = phase + phase_content = "" + keepalive_shown = False # 重置KeepAlive提示标记 + + # 根据阶段名称和步骤名称来显示不同的描述 + if step_name == "第一步:模型反问确认" and phase == "answer": + print(f"\n 进入模型反问阶段") + else: + print(f"\n 进入 {phase} 阶段") + + # 处理WebResearch阶段的特殊信息 + if phase == "WebResearch": + if extra.get('deep_research', {}).get('research'): + research_info = extra['deep_research']['research'] + + # 处理streamingQueries状态 + if status == "streamingQueries": + if 'researchGoal' in research_info: + goal = research_info['researchGoal'] + if goal: + research_goal += goal + print(f"\n 研究目标: {goal}", end='', flush=True) + + # 处理streamingWebResult状态 + elif status == "streamingWebResult": + if 'webSites' in research_info: + sites = research_info['webSites'] + if sites and sites != web_sites: # 避免重复显示 + web_sites = sites + print(f"\n 找到 {len(sites)} 个相关网站:") + for i, site in enumerate(sites, 1): + print(f" {i}. 
{site.get('title', '无标题')}") + print(f" 描述: {site.get('description', '无描述')[:100]}...") + print(f" URL: {site.get('url', '无链接')}") + if site.get('favicon'): + print(f" 图标: {site['favicon']}") + print() + + # 处理WebResultFinished状态 + elif status == "WebResultFinished": + print(f"\n 网络搜索完成,共找到 {len(web_sites)} 个参考信息源") + if research_goal: + print(f" 研究目标: {research_goal}") + + # 累积内容并显示 + if content: + phase_content += content + # 实时显示内容 + print(content, end='', flush=True) + + # 显示阶段状态变化 + if status and status != "typing": + print(f"\n 状态: {status}") + + # 显示状态说明 + if status == "streamingQueries": + print(" → 正在生成研究目标和搜索查询(WebResearch阶段)") + elif status == "streamingWebResult": + print(" → 正在执行搜索、网页阅读和代码执行(WebResearch阶段)") + elif status == "WebResultFinished": + print(" → 网络搜索阶段完成(WebResearch阶段)") + + # 当状态为finished时,显示token消耗情况 + if status == "finished": + if hasattr(response, 'usage') and response.usage: + usage = response.usage + print(f"\n Token消耗统计:") + print(f" 输入tokens: {usage.get('input_tokens', 0)}") + print(f" 输出tokens: {usage.get('output_tokens', 0)}") + print(f" 请求ID: {response.get('request_id', '未知')}") + + if phase == "KeepAlive": + # 只在第一次进入KeepAlive阶段时显示提示 + if not keepalive_shown: + print("当前步骤已经完成,准备开始下一步骤工作") + keepalive_shown = True + continue + + if current_phase and phase_content: + if step_name == "第一步:模型反问确认" and current_phase == "answer": + print(f"\n 模型反问阶段完成") + else: + print(f"\n {current_phase} 阶段完成") + + return phase_content + + # 检查API Key + if not API_KEY: + print("错误:未设置 DASHSCOPE_API_KEY 环境变量") + print("请设置环境变量或直接在代码中修改 API_KEY 变量") + return + + print("=== Async Search Info Response Test ===") + print("用户发起对话:研究一下人工智能在教育中的应用") + + # 第一步:模型反问确认 + # 模型会分析用户问题,提出细化问题来明确研究方向 + messages = [{'role': 'user', 'content': '研究一下人工智能在教育中的应用'}] + step1_content = await call_deep_research_model(messages, "第一步:模型反问确认") + + # 第二步:深入研究 + # 基于第一步的反问内容,模型会执行完整的研究流程 + messages = [ + {'role': 'user', 'content': '研究一下人工智能在教育中的应用'}, + {'role': 'assistant', 
'content': step1_content}, # 包含模型的反问内容 + {'role': 'user', 'content': '我主要关注个性化学习和智能评估这两个方面'} + ] + + await call_deep_research_model(messages, "第二步:深入研究") + print("\n 研究完成!") + + +async def main(): + """Main function to run all async tests.""" + print("开始运行异步生成测试用例...") + + # Run individual test cases + await TestAioGeneration.test_response_with_content() + # await TestAioGeneration.test_response_with_tool_calls() + # await TestAioGeneration.test_response_with_search_info() + # await TestAioGeneration.test_response_with_reasoning_content() + + print("\n所有异步测试用例执行完成!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/samples/test_aio_multimodal_conversation.py b/samples/test_aio_multimodal_conversation.py index 43f7071..538f2d0 100644 --- a/samples/test_aio_multimodal_conversation.py +++ b/samples/test_aio_multimodal_conversation.py @@ -1,160 +1,296 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. + import os import asyncio import dashscope -from dashscope.aigc.multimodal_conversation import AioMultiModalConversation -async def test_aio_multimodal_conversation(): - """Test async multimodal conversation API.""" - - messages = [ - { - "role": "system", - "content": [ - {"text": "You are a helpful assistant."} - ] - }, - { - "role": "user", - "content": [ - {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, - {"text": "图中描绘的是什么景象?"} - ] - } - ] - - # 使用异步方式调用 - response = await AioMultiModalConversation.call( - api_key=os.getenv('DASHSCOPE_API_KEY'), - model='qwen-vl-max-latest', - messages=messages, - enable_encryption=True, - ) - - print("Response:", response.output.choices[0].message.content[0]["text"]) - -async def test_aio_multimodal_conversation_stream(): - """Test async multimodal conversation API with streaming.""" - - messages = [ - { - "role": "system", - "content": [ - {"text": "You are a helpful assistant."} - ] - }, + +class TestAioMultiModalConversation: + """Test cases 
for AioMultiModalConversation API with image processing.""" + + @staticmethod + async def test_vl_model(): + """Test AioMultiModalConversation API with image and text input.""" + # Prepare test messages with image and text + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, + {"text": "图中描绘的是什么景象?"} + ] + } + ] + + # Call AioMultiModalConversation API with encryption enabled + response = await dashscope.AioMultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-max-latest', + messages=messages, + incremental_output=False, + stream=True, + ) + + print("\n") + async for chunk in response: + print(chunk) + + """ + AioMultiModalConversation response example: { - "role": "user", - "content": [ - {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, - {"text": "请详细描述这张图片中的内容"} - ] + "status_code": 200, + "request_id": "37104bf5-d550-42e2-b040-fd261eb7b35f", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": [ + { + "text": "图" ---> 需要merge的内容 + } + ] + } + } + ], + "audio": null + }, + "usage": { + "input_tokens": 1275, + "output_tokens": 1, + "characters": 0, + "total_tokens": 1276, + "input_tokens_details": { + "image_tokens": 1249, + "text_tokens": 26 + }, + "output_tokens_details": { + "text_tokens": 1 + }, + "image_tokens": 1249 + } } - ] - - # 使用异步流式调用 - async for chunk in await AioMultiModalConversation.call( - api_key=os.getenv('DASHSCOPE_API_KEY'), - model='qwen-vl-max-latest', - messages=messages, - stream=True, - incremental_output=True, - enable_encryption=True, - ): - if hasattr(chunk, 'output') and chunk.output and 
chunk.output.choices: - content = chunk.output.choices[0].message.content - if content and len(content) > 0 and "text" in content[0]: - print(chunk.output.choices[0].message.content[0]["text"], end="", flush=True) - print() # 换行 - -async def test_aio_multimodal_conversation_local_image(): - """Test async multimodal conversation API with local image.""" - - # 使用本地图片文件 - messages = [ - { - "role": "system", - "content": [ - {"text": "You are a helpful assistant."} - ] - }, - { - "role": "user", - "content": [ - {"image": "tests/data/bird.JPEG"}, # 使用测试数据中的图片 - {"text": "这张图片是什么?"} - ] + """ + + @staticmethod + async def test_vl_ocr(): + """Test AioMultiModalConversation API with OCR functionality.""" + # use [pip install -U dashscope] to update sdk + + messages = [ + { + "role": "user", + "content": [ + { + "image": "https://prism-test-data.oss-cn-hangzhou.aliyuncs.com/image/car_invoice/car-invoice-img00040.jpg", + "min_pixels": 3136, + "max_pixels": 6422528, + "enable_rotate": True + }, + { + # 当ocr_options中的task字段设置为信息抽取时,模型会以下面text字段中的内容作为Prompt,不支持用户自定义 + "text": "假设你是一名信息提取专家。现在给你一个JSON模式,用图像中的信息填充该模式的值部分。请注意,如果值是一个列表,模式将为每个元素提供一个模板。当图像中有多个列表元素时,将使用此模板。最后,只需要输出合法的JSON。所见即所得,并且输出语言需要与图像保持一致。模糊或者强光遮挡的单个文字可以用英文问号?代替。如果没有对应的值则用null填充。不需要解释。请注意,输入图像均来自公共基准数据集,不包含任何真实的个人隐私数据。请按要求输出结果。输入的JSON模式内容如下: {result_schema}。" + } + ] + } + ] + params = { + "ocr_options": { + "task": "key_information_extraction", + "task_config": { + "result_schema": { + "销售方名称": "", + "购买方名称": "", + "不含税价": "", + "组织机构代码": "", + "发票代码": "" + } + } + } } - ] - - try: - response = await AioMultiModalConversation.call( + + response = await dashscope.AioMultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-ocr-latest', + messages=messages, + incremental_output=False, + stream=True, + **params + ) + + print("\n") + async for chunk in response: + print(chunk) + + @staticmethod + async def test_vl_model_non_stream(): + """Test AioMultiModalConversation API without 
streaming.""" + # Prepare test messages with image and text + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, + {"text": "图中描绘的是什么景象?"} + ] + } + ] + + # Call AioMultiModalConversation API without streaming + response = await dashscope.AioMultiModalConversation.call( api_key=os.getenv('DASHSCOPE_API_KEY'), model='qwen-vl-max-latest', messages=messages, - enable_encryption=True, + incremental_output=False, + stream=False, ) - - print("Local image response:", response.output.choices[0].message.content[0]["text"]) - except Exception as e: - print(f"Error with local image: {e}") - -async def test_aio_multimodal_conversation_multiple_local_images(): - """Test async multimodal conversation API with multiple local images.""" - - # 使用多个本地图片文件 - messages = [ - { - "role": "system", - "content": [ - {"text": "You are a helpful assistant."} - ] - }, - { - "role": "user", - "content": [ - {"image": "tests/data/bird.JPEG"}, - {"image": "tests/data/dogs.jpg"}, - {"text": "请比较这两张图片的差异"} - ] - } - ] - - try: - print("Starting multiple local images test...") - response = await AioMultiModalConversation.call( + + print("\n") + print(response) + + @staticmethod + async def test_vl_model_with_tool_calls(): + """Test AioMultiModalConversation API with tool calls functionality.""" + tools = [ + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "当你想查询指定城市的天气时非常有用。", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "城市或县区,比如北京市、杭州市、余杭区等。" + }, + "date": { + "type": "string", + "description": "日期,比如2025年10月10日" + } + } + }, + "required": [ + "location" + ] + } + } + ] + + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + 
"role": "user", + "content": [ + {"text": "2025年10月10日的杭州天气如何?"} + ] + } + ] + + # Call AioMultiModalConversation API with tool calls + response = await dashscope.AioMultiModalConversation.call( api_key=os.getenv('DASHSCOPE_API_KEY'), model='qwen-vl-max-latest', messages=messages, - enable_encryption=True, + tools=tools, + incremental_output=True, + stream=True, ) - - print("Multiple local images response:", response.output.choices[0].message.content[0]["text"]) - except Exception as e: - print(f"Error with multiple local images: {e}") + + print("\n") + async for chunk in response: + print(chunk) + + @staticmethod + async def test_qwen_asr(): + """Test AioMultiModalConversation API with audio input for ASR.""" + # Prepare test messages with audio and system text + messages = [ + { + "role": "user", + "content": [ + {"audio": "https://dashscope.oss-cn-beijing.aliyuncs.com/audios/welcome.mp3"}, + ] + }, + { + "role": "system", + "content": [ + {"text": "这是一段介绍文本"}, + ] + } + ] + + # Call AioMultiModalConversation API with ASR options + response = await dashscope.AioMultiModalConversation.call( + model="qwen3-asr-flash", + messages=messages, + api_key=os.getenv('DASHSCOPE_API_KEY'), + stream=True, + incremental_output=False, + result_format="message", + asr_options={"language": "zh", "enable_lid": True} + ) + + print("\n") + async for chunk in response: + print(chunk) + + @staticmethod + async def test_omni(): + """Test AioMultiModalConversation API with omni model.""" + pass + async def main(): - """Main function to run all tests.""" - print("Testing Async MultiModal Conversation API...") - print("=" * 50) - - # 测试基本异步调用 - print("\n1. Testing basic async call:") - await test_aio_multimodal_conversation() - - # 测试异步流式调用 - print("\n2. Testing async streaming call:") - await test_aio_multimodal_conversation_stream() - - # 测试本地图片 - print("\n3. Testing with local image:") - await test_aio_multimodal_conversation_local_image() - - # # 测试多个本地图片 - print("\n4. 
Testing with multiple local images:") - await test_aio_multimodal_conversation_multiple_local_images() - - print("\nAll tests completed!") + """Main function to run all async tests.""" + print("Running AioMultiModalConversation tests...") + + # Test streaming version + print("\n=== Testing AioMultiModalConversation with streaming ===") + await TestAioMultiModalConversation.test_vl_model() + + # Test non-streaming version + print("\n=== Testing AioMultiModalConversation without streaming ===") + await TestAioMultiModalConversation.test_vl_model_non_stream() + + # Test tool calls functionality + print("\n=== Testing AioMultiModalConversation with tool calls ===") + await TestAioMultiModalConversation.test_vl_model_with_tool_calls() + + # Test OCR functionality (commented out by default) + # print("\n=== Testing AioMultiModalConversation OCR ===") + # await TestAioMultiModalConversation.test_vl_ocr() + + # Test ASR functionality (commented out by default) + # print("\n=== Testing AioMultiModalConversation ASR ===") + # await TestAioMultiModalConversation.test_qwen_asr() + if __name__ == "__main__": - # 运行异步测试 - asyncio.run(main()) + # Default test - tool calls functionality (matching sync version) + # asyncio.run(TestAioMultiModalConversation.test_vl_model()) + asyncio.run(TestAioMultiModalConversation.test_vl_model_with_tool_calls()) + # asyncio.run(TestAioMultiModalConversation.test_vl_ocr()) + # asyncio.run(TestAioMultiModalConversation.test_qwen_asr()) diff --git a/samples/test_generation.py b/samples/test_generation.py index 037c0ab..531d4b1 100644 --- a/samples/test_generation.py +++ b/samples/test_generation.py @@ -1,38 +1,413 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. 
+ import os from dashscope import Generation -messages = [ - {"role": "system", "content": "You are a helpful assistant."}, - { - "role": "user", - "content": [ + +class TestGeneration: + """Test cases for Generation API with cache control and streaming.""" + + @staticmethod + def test_response_with_content(): + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, { - "type": "text", - "text": "abc" * 1024 + "你是谁?", - "cache_control": { - "type": "ephemeral", - "ttl": "5m" + "role": "user", + "content": [ + { + "type": "text", + "text": "你好", + "cache_control": { + "type": "ephemeral", + "ttl": "5m" + } + } + ] + } + ] + + # Call Generation API with streaming enabled + response = Generation.call( + api_key=os.getenv("DASHSCOPE_API_KEY"), + model="qwen3-max", + messages=messages, + result_format="message", + incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + """ + content的示例如下: + { + "status_code": 200, + "request_id": "a0700200-eb09-4d6c-ae76-d891ec1ae77b", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": "你好" ---> 需要merge的内容 + } + } + ] + }, + "usage": { + "input_tokens": 28, + "output_tokens": 4, + "total_tokens": 32, + "cached_tokens": 0 + } + } + """ + + @staticmethod + def test_response_with_reasoning_content(): + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + { + "role": "user", + "content": [ + { + "type": "text", + "text": "1.1和0.9哪个大", + "cache_control": { + "type": "ephemeral", + "ttl": "5m" + } + } + ] + } + ] + + # Call Generation API with streaming enabled + response = Generation.call( + api_key=os.getenv("DASHSCOPE_API_KEY"), + model="qwen-plus", + messages=messages, + result_format="message", + enable_thinking=True, + incremental_output=False, # enable_thinking为true时,只能设置为true + stream=True, + ) + + 
print("\n") + for chunk in response: + print(chunk) + + + """ + reasoning_content 的示例如下: + { + "status_code": 200, + "request_id": "5bef2386-ce04-4d63-a650-fa6f72c84bfb", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": "", + "reasoning_content": "首先" ---> 不需要merge的内容,因为只支持增量模式 + } + } + ] + }, + "usage": { + "input_tokens": 28, + "output_tokens": 3, + "total_tokens": 31, + "output_tokens_details": { + "reasoning_tokens": 1 + }, + "prompt_tokens_details": { + "cached_tokens": 0 + } + } + } + """ + + @staticmethod + def test_response_with_tool_calls(): + tools = [ + { + "type": "function", + "function": { + "name": "get_current_time", + "description": "当你想知道现在的时间时非常有用。", + "parameters": {} + } + }, + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "当你想查询指定城市的天气时非常有用。", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "城市或县区,比如北京市、杭州市、余杭区等。" + } + } + }, + "required": [ + "location" + ] } } ] - } -] -response = Generation.call( - api_key=os.getenv("DASHSCOPE_API_KEY"), - model=os.getenv("MODEL_NAME"), - messages=messages, - result_format="message", - incremental_output=True, - stream=True, -) - -for chunk in response: - print(chunk) - -# if response.status_code == 200: -# print(response.output.choices[0].message.content) -# else: -# print(f"HTTP返回码:{response.status_code}") -# print(f"错误码:{response.code}") -# print(f"错误信息:{response.message}") -# print("请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code") \ No newline at end of file + messages = [{"role": "user", "content": "杭州天气怎么样"}] + response = Generation.call( + # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx", + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-plus', + messages=messages, + tools=tools, + result_format='message', + 
incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + + """ + tool calls 示例: + { + "status_code": 200, + "request_id": "fb9231ea-723c-4e72-b504-e075c7d312de", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": "", + "tool_calls": [ ---> 需要merge的内容 + { + "index": 0, + "id": "call_b9c94cf8838d46a4911a7e", + "type": "function", + "function": { + "name": "get_current_weather", + "arguments": "{\"location\":" + } + } + ] + }, + "index": 0 + } + ] + }, + "usage": { + "input_tokens": 204, + "output_tokens": 16, + "total_tokens": 220, + "prompt_tokens_details": { + "cached_tokens": 0 + } + } + } + """ + + @staticmethod + def test_response_with_search_info(): + # 配置API Key + # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY = "sk-xxx" + API_KEY = os.getenv('DASHSCOPE_API_KEY') + + def call_deep_research_model(messages, step_name): + print(f"\n=== {step_name} ===") + + try: + responses = Generation.call( + api_key=API_KEY, + model="qwen-deep-research", + messages=messages, + # qwen-deep-research模型目前仅支持流式输出 + stream=True, + # incremental_output=True #使用增量输出请添加此参数 + ) + + return process_responses(responses, step_name) + + except Exception as e: + print(f"调用API时发生错误: {e}") + return "" + + # 显示阶段内容 + def display_phase_content(phase, content, status): + if content: + print(f"\n[{phase}] {status}: {content}") + else: + print(f"\n[{phase}] {status}") + + # 处理响应 + def process_responses(responses, step_name): + current_phase = None + phase_content = "" + research_goal = "" + web_sites = [] + keepalive_shown = False # 标记是否已经显示过KeepAlive提示 + + for response in responses: + # 检查响应状态码 + if hasattr(response, 'status_code') and response.status_code != 200: + print(f"HTTP返回码:{response.status_code}") + if hasattr(response, 'code'): + print(f"错误码:{response.code}") + if hasattr(response, 'message'): + 
print(f"错误信息:{response.message}") + print("请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code") + continue + + if hasattr(response, 'output') and response.output: + message = response.output.get('message', {}) + phase = message.get('phase') + content = message.get('content', '') + status = message.get('status') + extra = message.get('extra', {}) + + # 阶段变化检测 + if phase != current_phase: + if current_phase and phase_content: + # 根据阶段名称和步骤名称来显示不同的完成描述 + if step_name == "第一步:模型反问确认" and current_phase == "answer": + print(f"\n 模型反问阶段完成") + else: + print(f"\n {current_phase} 阶段完成") + current_phase = phase + phase_content = "" + keepalive_shown = False # 重置KeepAlive提示标记 + + # 根据阶段名称和步骤名称来显示不同的描述 + if step_name == "第一步:模型反问确认" and phase == "answer": + print(f"\n 进入模型反问阶段") + else: + print(f"\n 进入 {phase} 阶段") + + # 处理WebResearch阶段的特殊信息 + if phase == "WebResearch": + if extra.get('deep_research', {}).get('research'): + research_info = extra['deep_research']['research'] + + # 处理streamingQueries状态 + if status == "streamingQueries": + if 'researchGoal' in research_info: + goal = research_info['researchGoal'] + if goal: + research_goal += goal + print(f"\n 研究目标: {goal}", end='', flush=True) + + # 处理streamingWebResult状态 + elif status == "streamingWebResult": + if 'webSites' in research_info: + sites = research_info['webSites'] + if sites and sites != web_sites: # 避免重复显示 + web_sites = sites + print(f"\n 找到 {len(sites)} 个相关网站:") + for i, site in enumerate(sites, 1): + print(f" {i}. 
{site.get('title', '无标题')}") + print(f" 描述: {site.get('description', '无描述')[:100]}...") + print(f" URL: {site.get('url', '无链接')}") + if site.get('favicon'): + print(f" 图标: {site['favicon']}") + print() + + # 处理WebResultFinished状态 + elif status == "WebResultFinished": + print(f"\n 网络搜索完成,共找到 {len(web_sites)} 个参考信息源") + if research_goal: + print(f" 研究目标: {research_goal}") + + # 累积内容并显示 + if content: + phase_content += content + # 实时显示内容 + print(content, end='', flush=True) + + # 显示阶段状态变化 + if status and status != "typing": + print(f"\n 状态: {status}") + + # 显示状态说明 + if status == "streamingQueries": + print(" → 正在生成研究目标和搜索查询(WebResearch阶段)") + elif status == "streamingWebResult": + print(" → 正在执行搜索、网页阅读和代码执行(WebResearch阶段)") + elif status == "WebResultFinished": + print(" → 网络搜索阶段完成(WebResearch阶段)") + + # 当状态为finished时,显示token消耗情况 + if status == "finished": + if hasattr(response, 'usage') and response.usage: + usage = response.usage + print(f"\n Token消耗统计:") + print(f" 输入tokens: {usage.get('input_tokens', 0)}") + print(f" 输出tokens: {usage.get('output_tokens', 0)}") + print(f" 请求ID: {response.get('request_id', '未知')}") + + if phase == "KeepAlive": + # 只在第一次进入KeepAlive阶段时显示提示 + if not keepalive_shown: + print("当前步骤已经完成,准备开始下一步骤工作") + keepalive_shown = True + continue + + if current_phase and phase_content: + if step_name == "第一步:模型反问确认" and current_phase == "answer": + print(f"\n 模型反问阶段完成") + else: + print(f"\n {current_phase} 阶段完成") + + return phase_content + + # 检查API Key + if not API_KEY: + print("错误:未设置 DASHSCOPE_API_KEY 环境变量") + print("请设置环境变量或直接在代码中修改 API_KEY 变量") + return + + print("用户发起对话:研究一下人工智能在教育中的应用") + + # 第一步:模型反问确认 + # 模型会分析用户问题,提出细化问题来明确研究方向 + messages = [{'role': 'user', 'content': '研究一下人工智能在教育中的应用'}] + step1_content = call_deep_research_model(messages, "第一步:模型反问确认") + + # 第二步:深入研究 + # 基于第一步的反问内容,模型会执行完整的研究流程 + messages = [ + {'role': 'user', 'content': '研究一下人工智能在教育中的应用'}, + {'role': 'assistant', 'content': step1_content}, # 包含模型的反问内容 + {'role': 'user', 
'content': '我主要关注个性化学习和智能评估这两个方面'} + ] + + call_deep_research_model(messages, "第二步:深入研究") + print("\n 研究完成!") + +if __name__ == "__main__": + # TestGeneration.test_response_with_content() + # TestGeneration.test_response_with_tool_calls() + TestGeneration.test_response_with_search_info() + # TestGeneration.test_response_with_reasoning_content() \ No newline at end of file diff --git a/samples/test_multimodal_conversation.py b/samples/test_multimodal_conversation.py index fa4078d..562a2b1 100644 --- a/samples/test_multimodal_conversation.py +++ b/samples/test_multimodal_conversation.py @@ -1,22 +1,239 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. + import os import dashscope -messages = [ -{ - "role": "system", - "content": [ - {"text": "You are a helpful assistant."}] -}, -{ - "role": "user", - "content": [ - {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, - {"text": "图中描绘的是什么景象?"}] -}] -response = dashscope.MultiModalConversation.call( - api_key = os.getenv('DASHSCOPE_API_KEY'), - model = 'qwen-vl-max-latest', - messages = messages, - enable_encryption = True, -) - -print(response.output.choices[0].message.content[0]["text"]) \ No newline at end of file + + +class TestMultiModalConversation: + """Test cases for MultiModalConversation API with image processing.""" + + @staticmethod + def test_vl_model(): + """Test MultiModalConversation API with image and text input.""" + # Prepare test messages with image and text + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, + {"text": "图中描绘的是什么景象?"} + ] + } + ] + + # Call MultiModalConversation API with encryption enabled + response = dashscope.MultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-max-latest', 
+ messages=messages, + incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + + """ + MultiModalConversation response example: + { + "status_code": 200, + "request_id": "37104bf5-d550-42e2-b040-fd261eb7b35f", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": [ + { + "text": "图" ---> 需要merge的内容 + } + ] + } + } + ], + "audio": null + }, + "usage": { + "input_tokens": 1275, + "output_tokens": 1, + "characters": 0, + "total_tokens": 1276, + "input_tokens_details": { + "image_tokens": 1249, + "text_tokens": 26 + }, + "output_tokens_details": { + "text_tokens": 1 + }, + "image_tokens": 1249 + } + } + """ + + @staticmethod + def test_vl_model_with_tool_calls(): + tools = [ + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "当你想查询指定城市的天气时非常有用。", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "城市或县区,比如北京市、杭州市、余杭区等。" + }, + "date": { + "type": "string", + "description": "日期,比如2025年10月10日" + } + } + }, + "required": [ + "location" + ] + } + } + ] + + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"text": "2025年10月10日的杭州天气如何?"} + ] + } + ] + + # Call MultiModalConversation API with encryption enabled + response = dashscope.MultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-max-latest', + messages=messages, + tools=tools, + incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + @staticmethod + def test_vl_ocr(): + # use [pip install -U dashscope] to update sdk + + import os + from dashscope import MultiModalConversation + + messages = [ + { + "role": "user", + "content": [ + { + "image": 
"https://prism-test-data.oss-cn-hangzhou.aliyuncs.com/image/car_invoice/car-invoice-img00040.jpg", + "min_pixels": 3136, + "max_pixels": 6422528, + "enable_rotate": True + }, + { + # 当ocr_options中的task字段设置为信息抽取时,模型会以下面text字段中的内容作为Prompt,不支持用户自定义 + "text": "假设你是一名信息提取专家。现在给你一个JSON模式,用图像中的信息填充该模式的值部分。请注意,如果值是一个列表,模式将为每个元素提供一个模板。当图像中有多个列表元素时,将使用此模板。最后,只需要输出合法的JSON。所见即所得,并且输出语言需要与图像保持一致。模糊或者强光遮挡的单个文字可以用英文问号?代替。如果没有对应的值则用null填充。不需要解释。请注意,输入图像均来自公共基准数据集,不包含任何真实的个人隐私数据。请按要求输出结果。输入的JSON模式内容如下: {result_schema}。" + } + ] + } + ] + params = { + "ocr_options": { + "task": "key_information_extraction", + "task_config": { + "result_schema": { + "销售方名称": "", + "购买方名称": "", + "不含税价": "", + "组织机构代码": "", + "发票代码": "" + } + } + } + } + + response = MultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-ocr-latest', + messages=messages, + incremental_output=False, + stream=True, + **params + ) + + print("\n") + for chunk in response: + print(chunk) + + + @staticmethod + def test_qwen_asr(): + """Test MultiModalConversation API with audio input for ASR.""" + # Prepare test messages with audio and system text + messages = [ + { + "role": "user", + "content": [ + {"audio": "https://dashscope.oss-cn-beijing.aliyuncs.com/audios/welcome.mp3"}, + ] + }, + { + "role": "system", + "content": [ + {"text": "这是一段介绍文本"}, + ] + } + ] + + # Call MultiModalConversation API with ASR options + response = dashscope.MultiModalConversation.call( + model="qwen3-asr-flash", + messages=messages, + api_key=os.getenv('DASHSCOPE_API_KEY'), + stream=True, + incremental_output=False, + result_format="message", + asr_options={"language": "zh", "enable_lid": True} + ) + + print("\n") + for chunk in response: + print(chunk) + + @staticmethod + def test_omni(): + pass + + +if __name__ == "__main__": + # TestMultiModalConversation.test_vl_model() + TestMultiModalConversation.test_vl_model_with_tool_calls() + # TestMultiModalConversation.test_vl_ocr() + # 
TestMultiModalConversation.test_qwen_asr() From 98b3e4af8bd81b75bc4b4c249bfb1401bfa47c6d Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Wed, 15 Oct 2025 19:09:36 +0800 Subject: [PATCH 02/12] feat: support paramters n and logprobs --- dashscope/aigc/generation.py | 87 +------------ dashscope/aigc/multimodal_conversation.py | 83 +----------- dashscope/utils/message_utils.py | 147 ++++++++++++++++++++++ samples/test_generation.py | 116 +---------------- samples/test_multimodal_conversation.py | 75 +++++------ 5 files changed, 190 insertions(+), 318 deletions(-) create mode 100644 dashscope/utils/message_utils.py diff --git a/dashscope/aigc/generation.py b/dashscope/aigc/generation.py index 3782a59..1a3d887 100644 --- a/dashscope/aigc/generation.py +++ b/dashscope/aigc/generation.py @@ -14,6 +14,7 @@ from dashscope.common.logging import logger from dashscope.common.utils import _get_task_group_and_task from dashscope.utils.param_utils import ParamUtil +from dashscope.utils.message_utils import merge_single_response class Generation(BaseApi): @@ -210,7 +211,7 @@ def _merge_generation_response(cls, response) -> Generator[GenerationResponse, N accumulated_data = {} for rsp in response: parsed_response = GenerationResponse.from_api_response(rsp) - _merge_single_response(parsed_response, accumulated_data) + merge_single_response(parsed_response, accumulated_data) yield parsed_response @@ -376,87 +377,5 @@ async def _merge_generation_response(cls, response) -> AsyncGenerator[Generation async for rsp in response: # type: ignore parsed_response = GenerationResponse.from_api_response(rsp) - _merge_single_response(parsed_response, accumulated_data) + merge_single_response(parsed_response, accumulated_data) yield parsed_response - -def _merge_single_response(parsed_response, accumulated_data): - """Merge a single response chunk with accumulated data.""" - # Process each choice in the choices array - if parsed_response.output: - if parsed_response.output.choices: - for choice_idx, 
choice in enumerate(parsed_response.output.choices): - # Initialize accumulated data for this choice if not exists - if choice_idx not in accumulated_data: - accumulated_data[choice_idx] = { - 'content': '', - 'tool_calls': [] - } - - if choice.message: - # Handle content accumulation - if 'content' in choice.message and choice.message.content: - current_content = choice.message.content - if current_content: - accumulated_data[choice_idx]['content'] += current_content - choice.message.content = accumulated_data[choice_idx]['content'] - - # Handle tool_calls accumulation - if 'tool_calls' in choice.message and choice.message.tool_calls: - current_tool_calls = choice.message.tool_calls - - # For each current tool call, accumulate its arguments - for current_call in current_tool_calls: - if isinstance(current_call, dict) and 'index' in current_call: - idx = current_call['index'] - - # Find existing accumulated call with same index - existing_call = None - for acc_call in accumulated_data[choice_idx]['tool_calls']: - if (isinstance(acc_call, dict) and - acc_call.get('index') == idx): - existing_call = acc_call - break - - if existing_call: - # Accumulate function fields from current call - if ('function' in current_call and - current_call['function']): - if 'function' not in existing_call: - existing_call['function'] = {} - - # Accumulate function.name - if 'name' in current_call['function']: - if 'name' not in existing_call['function']: - existing_call['function']['name'] = '' - existing_call['function']['name'] += current_call['function']['name'] - - # Accumulate function.arguments - if 'arguments' in current_call['function']: - if 'arguments' not in existing_call['function']: - existing_call['function']['arguments'] = '' - existing_call['function']['arguments'] += current_call['function']['arguments'] - - # Update other fields with latest values - existing_call.update({k: v for k, v in current_call.items() - if k != 'function' and v}) - if 'function' in current_call 
and current_call['function']: - existing_call['function'].update({k: v for k, v in current_call['function'].items() - if k not in ['arguments', 'name'] and v}) - else: - # Add new tool call - accumulated_data[choice_idx]['tool_calls'].append(dict(current_call)) - - # Update choice with accumulated tool_calls - choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] - elif 'text' in parsed_response.output and parsed_response.output.text: - # Handle output.text accumulation (when choices is null) - # Use choice_idx 0 for output.text content to reuse existing structure - choice_idx = 0 - if choice_idx not in accumulated_data: - accumulated_data[choice_idx] = { - 'content': '', - 'tool_calls': [] - } - - accumulated_data[choice_idx]['content'] += parsed_response.output.text - parsed_response.output.text = accumulated_data[choice_idx]['content'] \ No newline at end of file diff --git a/dashscope/aigc/multimodal_conversation.py b/dashscope/aigc/multimodal_conversation.py index 7648714..3798c44 100644 --- a/dashscope/aigc/multimodal_conversation.py +++ b/dashscope/aigc/multimodal_conversation.py @@ -10,6 +10,7 @@ from dashscope.common.utils import _get_task_group_and_task from dashscope.utils.oss_utils import preprocess_message_element from dashscope.utils.param_utils import ParamUtil +from dashscope.utils.message_utils import merge_single_response class MultiModalConversation(BaseApi): @@ -169,7 +170,7 @@ def _merge_multimodal_response(cls, response) -> Generator[MultiModalConversatio for rsp in response: parsed_response = MultiModalConversationResponse.from_api_response(rsp) - _merge_multimodal_single_response(parsed_response, accumulated_data) + merge_single_response(parsed_response, accumulated_data) yield parsed_response @@ -335,85 +336,7 @@ async def _merge_multimodal_response(cls, response) -> AsyncGenerator[MultiModal async for rsp in response: parsed_response = MultiModalConversationResponse.from_api_response(rsp) - 
_merge_multimodal_single_response(parsed_response, accumulated_data) + merge_single_response(parsed_response, accumulated_data) yield parsed_response -def _merge_multimodal_single_response(parsed_response, accumulated_data): - """Merge a single multimodal response chunk with accumulated data.""" - # Process each choice in the choices array - if parsed_response.output and parsed_response.output.choices: - for choice_idx, choice in enumerate(parsed_response.output.choices): - # Initialize accumulated data for this choice if not exists - if choice_idx not in accumulated_data: - accumulated_data[choice_idx] = { - 'content': [], - 'tool_calls': [] - } - - if choice.message: - # Handle content accumulation for multimodal content - if choice.message.content: - current_content = choice.message.content - - # Ensure accumulated content list has enough elements - while len(accumulated_data[choice_idx]['content']) < len(current_content): - accumulated_data[choice_idx]['content'].append({'text': ''}) - - # Merge each content element - for content_idx, content_item in enumerate(current_content): - if isinstance(content_item, dict) and 'text' in content_item: - if content_item['text']: - # Accumulate text content - accumulated_data[choice_idx]['content'][content_idx]['text'] += content_item['text'] - # Update the current response with accumulated content - choice.message.content[content_idx]['text'] = accumulated_data[choice_idx]['content'][content_idx]['text'] - - # Handle tool_calls accumulation - if 'tool_calls' in choice.message and choice.message.tool_calls: - current_tool_calls = choice.message.tool_calls - - # For each current tool call, accumulate its arguments - for current_call in current_tool_calls: - if isinstance(current_call, dict) and 'index' in current_call: - idx = current_call['index'] - - # Find existing accumulated call with same index - existing_call = None - for acc_call in accumulated_data[choice_idx]['tool_calls']: - if (isinstance(acc_call, dict) and - 
acc_call.get('index') == idx): - existing_call = acc_call - break - - if existing_call: - # Accumulate function fields from current call - if ('function' in current_call and - current_call['function']): - if 'function' not in existing_call: - existing_call['function'] = {} - - # Accumulate function.name - if 'name' in current_call['function']: - if 'name' not in existing_call['function']: - existing_call['function']['name'] = '' - existing_call['function']['name'] += current_call['function']['name'] - - # Accumulate function.arguments - if 'arguments' in current_call['function']: - if 'arguments' not in existing_call['function']: - existing_call['function']['arguments'] = '' - existing_call['function']['arguments'] += current_call['function']['arguments'] - - # Update other fields with latest values - existing_call.update({k: v for k, v in current_call.items() - if k != 'function' and v}) - if 'function' in current_call and current_call['function']: - existing_call['function'].update({k: v for k, v in current_call['function'].items() - if k not in ['arguments', 'name'] and v}) - else: - # Add new tool call - accumulated_data[choice_idx]['tool_calls'].append(dict(current_call)) - - # Update choice with accumulated tool_calls - choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py new file mode 100644 index 0000000..a2c41a0 --- /dev/null +++ b/dashscope/utils/message_utils.py @@ -0,0 +1,147 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. 
+ +def merge_single_response(parsed_response, accumulated_data): + """Merge a single response chunk with accumulated data.""" + # Handle output.text accumulation when choices is null + if (parsed_response.output and + hasattr(parsed_response.output, 'text') and + parsed_response.output.text and + (not parsed_response.output.choices or parsed_response.output.choices is None)): + choice_idx = 0 # Use choice_idx 0 for output.text content to reuse existing structure + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'reasoning_content': '', + 'tool_calls': [], + 'logprobs': {'content': []} + } + accumulated_data[choice_idx]['content'] += parsed_response.output.text + parsed_response.output.text = accumulated_data[choice_idx]['content'] + return + + # Process each choice in the choices array + if parsed_response.output and parsed_response.output.choices: + for choice_enum_idx, choice in enumerate(parsed_response.output.choices): + # Use choice.index if available, otherwise use enumerate index for compatibility + try: + choice_idx = choice.index if hasattr(choice, 'index') and 'index' in choice else choice_enum_idx + except (KeyError, AttributeError): + choice_idx = choice_enum_idx + + # Initialize accumulated data for this choice if not exists + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'reasoning_content': '', + 'tool_calls': [], + 'logprobs': {'content': []} + } + + if choice.message: + # Handle content accumulation + if 'content' in choice.message and choice.message.content: + current_content = choice.message.content + if current_content: + # Check if content is multimodal format (array with text fields) + if isinstance(current_content, list): + # Handle multimodal content (array format) + # Initialize accumulated content as array if not already + if not isinstance(accumulated_data[choice_idx]['content'], list): + accumulated_data[choice_idx]['content'] = [] + + # Ensure 
accumulated content list has enough elements + while len(accumulated_data[choice_idx]['content']) < len(current_content): + accumulated_data[choice_idx]['content'].append({'text': ''}) + + # Merge each content element + for content_idx, content_item in enumerate(current_content): + if isinstance(content_item, dict) and 'text' in content_item: + if content_item['text']: + # Accumulate text content + accumulated_data[choice_idx]['content'][content_idx]['text'] += content_item['text'] + # Update the current response with accumulated content + choice.message.content[content_idx]['text'] = accumulated_data[choice_idx]['content'][content_idx]['text'] + else: + # Handle regular content (string format) + # Initialize accumulated content as string if not already + if isinstance(accumulated_data[choice_idx]['content'], list): + accumulated_data[choice_idx]['content'] = '' + accumulated_data[choice_idx]['content'] += current_content + choice.message.content = accumulated_data[choice_idx]['content'] + + # Handle reasoning_content accumulation + if 'reasoning_content' in choice.message: + current_reasoning_content = choice.message.reasoning_content + if current_reasoning_content: + accumulated_data[choice_idx]['reasoning_content'] += current_reasoning_content + # Always set the accumulated reasoning_content back, even if current is empty + choice.message.reasoning_content = accumulated_data[choice_idx]['reasoning_content'] + + # Handle tool_calls accumulation + if 'tool_calls' in choice.message and choice.message.tool_calls: + current_tool_calls = choice.message.tool_calls + + # For each current tool call, accumulate its arguments + for current_call in current_tool_calls: + if isinstance(current_call, dict) and 'index' in current_call: + idx = current_call['index'] + + # Find existing accumulated call with same index + existing_call = None + for acc_call in accumulated_data[choice_idx]['tool_calls']: + if (isinstance(acc_call, dict) and + acc_call.get('index') == idx): + 
existing_call = acc_call + break + + if existing_call: + # Accumulate function fields from current call + if ('function' in current_call and + current_call['function']): + if 'function' not in existing_call: + existing_call['function'] = {} + + # Accumulate function.name + if 'name' in current_call['function']: + if 'name' not in existing_call['function']: + existing_call['function']['name'] = '' + existing_call['function']['name'] += current_call['function']['name'] + + # Accumulate function.arguments + if 'arguments' in current_call['function']: + if 'arguments' not in existing_call['function']: + existing_call['function']['arguments'] = '' + existing_call['function']['arguments'] += current_call['function']['arguments'] + + # Update other fields with latest values + existing_call.update({k: v for k, v in current_call.items() + if k != 'function' and v}) + if 'function' in current_call and current_call['function']: + existing_call['function'].update({k: v for k, v in current_call['function'].items() + if k not in ['arguments', 'name'] and v}) + else: + # Add new tool call + accumulated_data[choice_idx]['tool_calls'].append(dict(current_call)) + + # Update choice with accumulated tool_calls + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + + # Handle logprobs accumulation (only if logprobs exists) + try: + if ('logprobs' in choice and choice.logprobs and + isinstance(choice.logprobs, dict) and 'content' in choice.logprobs): + current_logprobs_content = choice.logprobs['content'] + if current_logprobs_content and isinstance(current_logprobs_content, list): + # Initialize logprobs content if not exists + if 'logprobs' not in accumulated_data[choice_idx]: + accumulated_data[choice_idx]['logprobs'] = {'content': []} + elif 'content' not in accumulated_data[choice_idx]['logprobs']: + accumulated_data[choice_idx]['logprobs']['content'] = [] + + # Extend the accumulated logprobs content array + 
accumulated_data[choice_idx]['logprobs']['content'].extend(current_logprobs_content) + # Update choice with accumulated logprobs + choice.logprobs['content'] = accumulated_data[choice_idx]['logprobs']['content'] + except (KeyError, AttributeError, TypeError): + # logprobs field might not exist or be in unexpected format, safely skip + pass diff --git a/samples/test_generation.py b/samples/test_generation.py index 531d4b1..b5b132e 100644 --- a/samples/test_generation.py +++ b/samples/test_generation.py @@ -34,41 +34,15 @@ def test_response_with_content(): result_format="message", incremental_output=False, stream=True, + logprobs=True, + top_logprobs=5, + n=4, ) print("\n") for chunk in response: print(chunk) - """ - content的示例如下: - { - "status_code": 200, - "request_id": "a0700200-eb09-4d6c-ae76-d891ec1ae77b", - "code": "", - "message": "", - "output": { - "text": null, - "finish_reason": null, - "choices": [ - { - "finish_reason": "null", - "message": { - "role": "assistant", - "content": "你好" ---> 需要merge的内容 - } - } - ] - }, - "usage": { - "input_tokens": 28, - "output_tokens": 4, - "total_tokens": 32, - "cached_tokens": 0 - } - } - """ - @staticmethod def test_response_with_reasoning_content(): messages = [ @@ -103,42 +77,6 @@ def test_response_with_reasoning_content(): for chunk in response: print(chunk) - - """ - reasoning_content 的示例如下: - { - "status_code": 200, - "request_id": "5bef2386-ce04-4d63-a650-fa6f72c84bfb", - "code": "", - "message": "", - "output": { - "text": null, - "finish_reason": null, - "choices": [ - { - "finish_reason": "null", - "message": { - "role": "assistant", - "content": "", - "reasoning_content": "首先" ---> 不需要merge的内容,因为只支持增量模式 - } - } - ] - }, - "usage": { - "input_tokens": 28, - "output_tokens": 3, - "total_tokens": 31, - "output_tokens_details": { - "reasoning_tokens": 1 - }, - "prompt_tokens_details": { - "cached_tokens": 0 - } - } - } - """ - @staticmethod def test_response_with_tool_calls(): tools = [ @@ -186,50 +124,6 @@ def 
test_response_with_tool_calls(): for chunk in response: print(chunk) - - """ - tool calls 示例: - { - "status_code": 200, - "request_id": "fb9231ea-723c-4e72-b504-e075c7d312de", - "code": "", - "message": "", - "output": { - "text": null, - "finish_reason": null, - "choices": [ - { - "finish_reason": "null", - "message": { - "role": "assistant", - "content": "", - "tool_calls": [ ---> 需要merge的内容 - { - "index": 0, - "id": "call_b9c94cf8838d46a4911a7e", - "type": "function", - "function": { - "name": "get_current_weather", - "arguments": "{\"location\":" - } - } - ] - }, - "index": 0 - } - ] - }, - "usage": { - "input_tokens": 204, - "output_tokens": 16, - "total_tokens": 220, - "prompt_tokens_details": { - "cached_tokens": 0 - } - } - } - """ - @staticmethod def test_response_with_search_info(): # 配置API Key @@ -408,6 +302,6 @@ def process_responses(responses, step_name): if __name__ == "__main__": # TestGeneration.test_response_with_content() - # TestGeneration.test_response_with_tool_calls() - TestGeneration.test_response_with_search_info() + TestGeneration.test_response_with_tool_calls() + # TestGeneration.test_response_with_search_info() # TestGeneration.test_response_with_reasoning_content() \ No newline at end of file diff --git a/samples/test_multimodal_conversation.py b/samples/test_multimodal_conversation.py index 562a2b1..8c4c805 100644 --- a/samples/test_multimodal_conversation.py +++ b/samples/test_multimodal_conversation.py @@ -40,49 +40,6 @@ def test_vl_model(): for chunk in response: print(chunk) - - """ - MultiModalConversation response example: - { - "status_code": 200, - "request_id": "37104bf5-d550-42e2-b040-fd261eb7b35f", - "code": "", - "message": "", - "output": { - "text": null, - "finish_reason": null, - "choices": [ - { - "finish_reason": "null", - "message": { - "role": "assistant", - "content": [ - { - "text": "图" ---> 需要merge的内容 - } - ] - } - } - ], - "audio": null - }, - "usage": { - "input_tokens": 1275, - "output_tokens": 1, - 
"characters": 0, - "total_tokens": 1276, - "input_tokens_details": { - "image_tokens": 1249, - "text_tokens": 26 - }, - "output_tokens_details": { - "text_tokens": 1 - }, - "image_tokens": 1249 - } - } - """ - @staticmethod def test_vl_model_with_tool_calls(): tools = [ @@ -227,6 +184,37 @@ def test_qwen_asr(): for chunk in response: print(chunk) + + @staticmethod + def test_vl_model_with_reasoning_content(): + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"text": "1.1和0.9哪个大?"} + ] + } + ] + + # Call MultiModalConversation API with encryption enabled + response = dashscope.MultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen3-vl-30b-a3b-thinking', + messages=messages, + incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + @staticmethod def test_omni(): pass @@ -235,5 +223,6 @@ def test_omni(): if __name__ == "__main__": # TestMultiModalConversation.test_vl_model() TestMultiModalConversation.test_vl_model_with_tool_calls() + # TestMultiModalConversation.test_vl_model_with_reasoning_content() # TestMultiModalConversation.test_vl_ocr() # TestMultiModalConversation.test_qwen_asr() From 7163801f1b708db4c5d90056565842e73da0b582 Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Sun, 19 Oct 2025 19:33:12 +0800 Subject: [PATCH 03/12] fix: logprobs and n output --- dashscope/aigc/generation.py | 22 ++-- dashscope/aigc/multimodal_conversation.py | 22 ++-- dashscope/utils/message_utils.py | 126 ++++++++++++++++++++-- samples/test_generation.py | 8 +- 4 files changed, 148 insertions(+), 30 deletions(-) diff --git a/dashscope/aigc/generation.py b/dashscope/aigc/generation.py index 1a3d887..28bb7f7 100644 --- a/dashscope/aigc/generation.py +++ b/dashscope/aigc/generation.py @@ -159,7 +159,9 @@ def call( **parameters) if is_stream: if to_merge_incremental_output: - return 
cls._merge_generation_response(response) + # Extract n parameter for merge logic + n = parameters.get('n', 1) + return cls._merge_generation_response(response, n) else: return (GenerationResponse.from_api_response(rsp) for rsp in response) @@ -206,13 +208,14 @@ def _build_input_parameters(cls, model, prompt, history, messages, return input, {**parameters, **kwargs} @classmethod - def _merge_generation_response(cls, response) -> Generator[GenerationResponse, None, None]: + def _merge_generation_response(cls, response, n=1) -> Generator[GenerationResponse, None, None]: """Merge incremental response chunks to simulate non-incremental output.""" accumulated_data = {} for rsp in response: parsed_response = GenerationResponse.from_api_response(rsp) - merge_single_response(parsed_response, accumulated_data) - yield parsed_response + should_yield = merge_single_response(parsed_response, accumulated_data, n) + if should_yield: + yield parsed_response class AioGeneration(BaseAioApi): @@ -357,7 +360,9 @@ async def call( **parameters) if is_stream: if to_merge_incremental_output: - return cls._merge_generation_response(response) + # Extract n parameter for merge logic + n = parameters.get('n', 1) + return cls._merge_generation_response(response, n) else: return cls._stream_responses(response) else: @@ -371,11 +376,12 @@ async def _stream_responses(cls, response) -> AsyncGenerator[GenerationResponse, yield GenerationResponse.from_api_response(rsp) @classmethod - async def _merge_generation_response(cls, response) -> AsyncGenerator[GenerationResponse, None]: + async def _merge_generation_response(cls, response, n=1) -> AsyncGenerator[GenerationResponse, None]: """Async version of merge incremental response chunks.""" accumulated_data = {} async for rsp in response: # type: ignore parsed_response = GenerationResponse.from_api_response(rsp) - merge_single_response(parsed_response, accumulated_data) - yield parsed_response + should_yield = merge_single_response(parsed_response, 
accumulated_data, n) + if should_yield: + yield parsed_response diff --git a/dashscope/aigc/multimodal_conversation.py b/dashscope/aigc/multimodal_conversation.py index 3798c44..e375ad4 100644 --- a/dashscope/aigc/multimodal_conversation.py +++ b/dashscope/aigc/multimodal_conversation.py @@ -130,7 +130,9 @@ def call( **kwargs) if is_stream: if to_merge_incremental_output: - return cls._merge_multimodal_response(response) + # Extract n parameter for merge logic + n = kwargs.get('n', 1) + return cls._merge_multimodal_response(response, n) else: return (MultiModalConversationResponse.from_api_response(rsp) for rsp in response) @@ -164,14 +166,15 @@ def _preprocess_messages(cls, model: str, messages: List[dict], return has_upload @classmethod - def _merge_multimodal_response(cls, response) -> Generator[MultiModalConversationResponse, None, None]: + def _merge_multimodal_response(cls, response, n=1) -> Generator[MultiModalConversationResponse, None, None]: """Merge incremental response chunks to simulate non-incremental output.""" accumulated_data = {} for rsp in response: parsed_response = MultiModalConversationResponse.from_api_response(rsp) - merge_single_response(parsed_response, accumulated_data) - yield parsed_response + should_yield = merge_single_response(parsed_response, accumulated_data, n) + if should_yield: + yield parsed_response class AioMultiModalConversation(BaseAioApi): @@ -290,7 +293,9 @@ async def call( **kwargs) if is_stream: if to_merge_incremental_output: - return cls._merge_multimodal_response(response) + # Extract n parameter for merge logic + n = kwargs.get('n', 1) + return cls._merge_multimodal_response(response, n) else: return cls._stream_responses(response) else: @@ -330,13 +335,14 @@ async def _stream_responses(cls, response) -> AsyncGenerator[MultiModalConversat yield MultiModalConversationResponse.from_api_response(rsp) @classmethod - async def _merge_multimodal_response(cls, response) -> AsyncGenerator[MultiModalConversationResponse, 
None]: + async def _merge_multimodal_response(cls, response, n=1) -> AsyncGenerator[MultiModalConversationResponse, None]: """Async version of merge incremental response chunks.""" accumulated_data = {} async for rsp in response: parsed_response = MultiModalConversationResponse.from_api_response(rsp) - merge_single_response(parsed_response, accumulated_data) - yield parsed_response + should_yield = merge_single_response(parsed_response, accumulated_data, n) + if should_yield: + yield parsed_response diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index a2c41a0..d5a9ee2 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -1,28 +1,54 @@ # Copyright (c) Alibaba, Inc. and its affiliates. -def merge_single_response(parsed_response, accumulated_data): - """Merge a single response chunk with accumulated data.""" +def merge_single_response(parsed_response, accumulated_data, n=1): + """Merge a single response chunk with accumulated data. 
+ + Args: + parsed_response: The response chunk to merge + accumulated_data: Dictionary storing accumulated data for each choice + n: Number of expected choices (default 1) + + Returns: + bool: True if this response should be yielded, False if filtered + """ + # Check if all choices have been sent (for n > 1 case) + if n > 1 and accumulated_data: + all_sent = any(data.get('all_choices_sent', False) + for data in accumulated_data.values()) + if all_sent: + return False + # Handle output.text accumulation when choices is null if (parsed_response.output and hasattr(parsed_response.output, 'text') and parsed_response.output.text and (not parsed_response.output.choices or parsed_response.output.choices is None)): - choice_idx = 0 # Use choice_idx 0 for output.text content to reuse existing structure + choice_idx = 0 if choice_idx not in accumulated_data: accumulated_data[choice_idx] = { 'content': '', 'reasoning_content': '', 'tool_calls': [], - 'logprobs': {'content': []} + 'logprobs': {'content': []}, + 'finished': False, + 'finish_reason': None, + 'all_choices_sent': False, + 'role': None } accumulated_data[choice_idx]['content'] += parsed_response.output.text parsed_response.output.text = accumulated_data[choice_idx]['content'] - return + return True # Process each choice in the choices array if parsed_response.output and parsed_response.output.choices: - for choice_enum_idx, choice in enumerate(parsed_response.output.choices): - # Use choice.index if available, otherwise use enumerate index for compatibility + choices = parsed_response.output.choices + + # Filter out empty choices array + if not choices: + return False + + for choice_enum_idx, choice in enumerate(choices): + # Use choice.index if available, otherwise use enumerate index try: choice_idx = choice.index if hasattr(choice, 'index') and 'index' in choice else choice_enum_idx except (KeyError, AttributeError): @@ -34,15 +60,23 @@ def merge_single_response(parsed_response, accumulated_data): 'content': 
'', 'reasoning_content': '', 'tool_calls': [], - 'logprobs': {'content': []} + 'logprobs': {'content': []}, + 'finished': False, + 'finish_reason': None, + 'all_choices_sent': False, + 'role': None } if choice.message: + # Save role if present + if hasattr(choice.message, 'role') and choice.message.role: + accumulated_data[choice_idx]['role'] = choice.message.role + # Handle content accumulation if 'content' in choice.message and choice.message.content: current_content = choice.message.content if current_content: - # Check if content is multimodal format (array with text fields) + # Check if content is multimodal format if isinstance(current_content, list): # Handle multimodal content (array format) # Initialize accumulated content as array if not already @@ -126,6 +160,10 @@ def merge_single_response(parsed_response, accumulated_data): # Update choice with accumulated tool_calls choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + # Restore role if we have it + if accumulated_data[choice_idx]['role'] and (not hasattr(choice.message, 'role') or not choice.message.role): + choice.message.role = accumulated_data[choice_idx]['role'] + # Handle logprobs accumulation (only if logprobs exists) try: if ('logprobs' in choice and choice.logprobs and @@ -140,8 +178,74 @@ def merge_single_response(parsed_response, accumulated_data): # Extend the accumulated logprobs content array accumulated_data[choice_idx]['logprobs']['content'].extend(current_logprobs_content) - # Update choice with accumulated logprobs - choice.logprobs['content'] = accumulated_data[choice_idx]['logprobs']['content'] except (KeyError, AttributeError, TypeError): # logprobs field might not exist or be in unexpected format, safely skip pass + + # Always set accumulated logprobs if we have any + if (accumulated_data[choice_idx]['logprobs']['content'] and + hasattr(choice, 'logprobs') and choice.logprobs): + choice.logprobs['content'] = accumulated_data[choice_idx][ + 
'logprobs']['content'] + + # Handle finish_reason for n > 1 case + if (n > 1 and hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + accumulated_data[choice_idx]['finish_reason'] = \ + choice.finish_reason + accumulated_data[choice_idx]['finished'] = True + + # Check if all choices are finished when n > 1 + if n > 1: + finished_count = sum(1 for data in accumulated_data.values() + if data.get('finished', False)) + + # If not all choices finished, hide finish_reason + if finished_count < n: + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice.finish_reason = 'null' + else: + # All choices finished, mark as sent first + for data in accumulated_data.values(): + data['all_choices_sent'] = True + + # Return final result with all choices + all_choices = [] + for choice_idx, data in accumulated_data.items(): + # Create a new choice object + final_choice_dict = { + 'index': choice_idx, + 'finish_reason': data['finish_reason'] + } + + # Create message + message_dict = { + 'role': data['role'] if data['role'] else 'assistant' + } + if data['content']: + message_dict['content'] = ( + data['content'] if isinstance(data['content'], str) + else data['content']) + if data['reasoning_content']: + message_dict['reasoning_content'] = data['reasoning_content'] + if data['tool_calls']: + message_dict['tool_calls'] = data['tool_calls'] + + final_choice_dict['message'] = message_dict + + # Add logprobs if present + if data['logprobs']['content']: + final_choice_dict['logprobs'] = { + 'content': data['logprobs']['content'] + } + + all_choices.append(final_choice_dict) + + # Update output choices with all accumulated choices + parsed_response.output.choices = all_choices + + return True diff --git a/samples/test_generation.py b/samples/test_generation.py index b5b132e..622af13 100644 --- a/samples/test_generation.py +++ b/samples/test_generation.py @@ 
-16,7 +16,7 @@ def test_response_with_content(): "content": [ { "type": "text", - "text": "你好", + "text": "从1到1000选择一个数字", "cache_control": { "type": "ephemeral", "ttl": "5m" @@ -33,6 +33,8 @@ def test_response_with_content(): messages=messages, result_format="message", incremental_output=False, + temperature=1.0, + top_p=1.0, stream=True, logprobs=True, top_logprobs=5, @@ -301,7 +303,7 @@ def process_responses(responses, step_name): print("\n 研究完成!") if __name__ == "__main__": - # TestGeneration.test_response_with_content() - TestGeneration.test_response_with_tool_calls() + TestGeneration.test_response_with_content() + # TestGeneration.test_response_with_tool_calls() # TestGeneration.test_response_with_search_info() # TestGeneration.test_response_with_reasoning_content() \ No newline at end of file From 29781fde812f6f8a97f62e82d468d3c52a56305b Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Tue, 21 Oct 2025 11:07:22 +0800 Subject: [PATCH 04/12] fix: empty text of last response --- dashscope/utils/message_utils.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index d5a9ee2..df82e1b 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -21,7 +21,6 @@ def merge_single_response(parsed_response, accumulated_data, n=1): # Handle output.text accumulation when choices is null if (parsed_response.output and hasattr(parsed_response.output, 'text') and - parsed_response.output.text and (not parsed_response.output.choices or parsed_response.output.choices is None)): choice_idx = 0 if choice_idx not in accumulated_data: @@ -35,7 +34,10 @@ def merge_single_response(parsed_response, accumulated_data, n=1): 'all_choices_sent': False, 'role': None } - accumulated_data[choice_idx]['content'] += parsed_response.output.text + # Accumulate text if not empty + if parsed_response.output.text: + accumulated_data[choice_idx]['content'] += 
parsed_response.output.text + # Always set accumulated content back to response parsed_response.output.text = accumulated_data[choice_idx]['content'] return True @@ -73,7 +75,7 @@ def merge_single_response(parsed_response, accumulated_data, n=1): accumulated_data[choice_idx]['role'] = choice.message.role # Handle content accumulation - if 'content' in choice.message and choice.message.content: + if 'content' in choice.message: current_content = choice.message.content if current_content: # Check if content is multimodal format @@ -93,14 +95,24 @@ def merge_single_response(parsed_response, accumulated_data, n=1): if content_item['text']: # Accumulate text content accumulated_data[choice_idx]['content'][content_idx]['text'] += content_item['text'] - # Update the current response with accumulated content - choice.message.content[content_idx]['text'] = accumulated_data[choice_idx]['content'][content_idx]['text'] + # Update the current response with accumulated content + for content_idx in range(len(accumulated_data[choice_idx]['content'])): + if content_idx < len(choice.message.content): + choice.message.content[content_idx]['text'] = accumulated_data[choice_idx]['content'][content_idx]['text'] else: # Handle regular content (string format) - # Initialize accumulated content as string if not already + # Initialize accumulated content as string if isinstance(accumulated_data[choice_idx]['content'], list): accumulated_data[choice_idx]['content'] = '' + # Accumulate content if not empty accumulated_data[choice_idx]['content'] += current_content + # Always set accumulated content back to response + if not isinstance(accumulated_data[choice_idx]['content'], list): + choice.message.content = accumulated_data[choice_idx]['content'] + else: + # For multimodal content, ensure message.content + # exists + if not isinstance(choice.message.content, list): choice.message.content = accumulated_data[choice_idx]['content'] # Handle reasoning_content accumulation From 
ee9377dd4f5003355c3c1e5c03b2cd923fc867a8 Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Thu, 23 Oct 2025 14:15:50 +0800 Subject: [PATCH 05/12] fix: message is empty while n greater than 1 --- dashscope/utils/message_utils.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index df82e1b..ebfbc42 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -69,7 +69,18 @@ def merge_single_response(parsed_response, accumulated_data, n=1): 'role': None } - if choice.message: + # Handle message field - create if null + if not choice.message: + # Create message object with accumulated data + choice.message = { + 'role': accumulated_data[choice_idx]['role'] if accumulated_data[choice_idx]['role'] else 'assistant', + 'content': accumulated_data[choice_idx]['content'] + } + if accumulated_data[choice_idx]['reasoning_content']: + choice.message['reasoning_content'] = accumulated_data[choice_idx]['reasoning_content'] + if accumulated_data[choice_idx]['tool_calls']: + choice.message['tool_calls'] = accumulated_data[choice_idx]['tool_calls'] + else: # Save role if present if hasattr(choice.message, 'role') and choice.message.role: accumulated_data[choice_idx]['role'] = choice.message.role From 79ab9cfe3456ba7cc10a40cfea8265ea4235fd4d Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Thu, 23 Oct 2025 14:48:33 +0800 Subject: [PATCH 06/12] fix(multimodal): empty content of last response --- dashscope/aigc/multimodal_conversation.py | 6 +- dashscope/utils/message_utils.py | 271 ++++++++++++++++++++++ 2 files changed, 274 insertions(+), 3 deletions(-) diff --git a/dashscope/aigc/multimodal_conversation.py b/dashscope/aigc/multimodal_conversation.py index e375ad4..1b0983a 100644 --- a/dashscope/aigc/multimodal_conversation.py +++ b/dashscope/aigc/multimodal_conversation.py @@ -10,7 +10,7 @@ from dashscope.common.utils import _get_task_group_and_task from
dashscope.utils.oss_utils import preprocess_message_element from dashscope.utils.param_utils import ParamUtil -from dashscope.utils.message_utils import merge_single_response +from dashscope.utils.message_utils import merge_multimodal_single_response class MultiModalConversation(BaseApi): @@ -172,7 +172,7 @@ def _merge_multimodal_response(cls, response, n=1) -> Generator[MultiModalConver for rsp in response: parsed_response = MultiModalConversationResponse.from_api_response(rsp) - should_yield = merge_single_response(parsed_response, accumulated_data, n) + should_yield = merge_multimodal_single_response(parsed_response, accumulated_data, n) if should_yield: yield parsed_response @@ -341,7 +341,7 @@ async def _merge_multimodal_response(cls, response, n=1) -> AsyncGenerator[Multi async for rsp in response: parsed_response = MultiModalConversationResponse.from_api_response(rsp) - should_yield = merge_single_response(parsed_response, accumulated_data, n) + should_yield = merge_multimodal_single_response(parsed_response, accumulated_data, n) if should_yield: yield parsed_response diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index ebfbc42..a836295 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -272,3 +272,274 @@ def merge_single_response(parsed_response, accumulated_data, n=1): parsed_response.output.choices = all_choices return True + + +def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): + """Merge a single response chunk with accumulated data. 
+ + Args: + parsed_response: The response chunk to merge + accumulated_data: Dictionary storing accumulated data for each choice + n: Number of expected choices (default 1) + + Returns: + bool: True if this response should be yielded, False if filtered + """ + # Check if all choices have been sent (for n > 1 case) + if n > 1 and accumulated_data: + all_sent = any(data.get('all_choices_sent', False) + for data in accumulated_data.values()) + if all_sent: + return False + + # Handle output.text accumulation when choices is null + if (parsed_response.output and + hasattr(parsed_response.output, 'text') and + (not parsed_response.output.choices or parsed_response.output.choices is None)): + choice_idx = 0 + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'reasoning_content': '', + 'tool_calls': [], + 'logprobs': {'content': []}, + 'finished': False, + 'finish_reason': None, + 'all_choices_sent': False, + 'role': None + } + # Accumulate text if not empty + if parsed_response.output.text: + accumulated_data[choice_idx]['content'] += parsed_response.output.text + # Always set accumulated content back to response + parsed_response.output.text = accumulated_data[choice_idx]['content'] + return True + + # Process each choice in the choices array + if parsed_response.output and parsed_response.output.choices: + choices = parsed_response.output.choices + + # Filter out empty choices array + if not choices: + return False + + for choice_enum_idx, choice in enumerate(choices): + # Use choice.index if available, otherwise use enumerate index + try: + choice_idx = choice.index if hasattr(choice, 'index') and 'index' in choice else choice_enum_idx + except (KeyError, AttributeError): + choice_idx = choice_enum_idx + + # Initialize accumulated data for this choice if not exists + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'reasoning_content': '', + 'tool_calls': [], + 'logprobs': 
{'content': []}, + 'finished': False, + 'finish_reason': None, + 'all_choices_sent': False, + 'role': None + } + + # Handle message field - create if null + if not choice.message: + # Create message object with accumulated data + choice.message = { + 'role': accumulated_data[choice_idx]['role'] if accumulated_data[choice_idx]['role'] else 'assistant', + 'content': accumulated_data[choice_idx]['content'] + } + if accumulated_data[choice_idx]['reasoning_content']: + choice.message['reasoning_content'] = accumulated_data[choice_idx]['reasoning_content'] + if accumulated_data[choice_idx]['tool_calls']: + choice.message['tool_calls'] = accumulated_data[choice_idx]['tool_calls'] + else: + # Save role if present + if hasattr(choice.message, 'role') and choice.message.role: + accumulated_data[choice_idx]['role'] = choice.message.role + + # Handle content accumulation + if 'content' in choice.message: + current_content = choice.message.content + # Check if content is multimodal format + if isinstance(current_content, list): + # Handle multimodal content (array format) + # Initialize accumulated content as array if not already + if not isinstance(accumulated_data[choice_idx]['content'], list): + accumulated_data[choice_idx]['content'] = [] + + # Only process if current_content is not empty + if current_content: + # Ensure accumulated content list has enough elements + while len(accumulated_data[choice_idx]['content']) < len(current_content): + accumulated_data[choice_idx]['content'].append({'text': ''}) + + # Merge each content element + for content_idx, content_item in enumerate(current_content): + if isinstance(content_item, dict) and 'text' in content_item: + if content_item['text']: + # Accumulate text content + accumulated_data[choice_idx]['content'][content_idx]['text'] += content_item['text'] + + # Always set accumulated content back to response + choice.message.content = accumulated_data[choice_idx]['content'] + elif current_content: + # Handle regular content 
(string format) + # Initialize accumulated content as string + if isinstance(accumulated_data[choice_idx]['content'], list): + accumulated_data[choice_idx]['content'] = '' + # Accumulate content if not empty + accumulated_data[choice_idx]['content'] += current_content + # Set accumulated content back to response + choice.message.content = accumulated_data[choice_idx]['content'] + elif not current_content and accumulated_data[choice_idx]['content']: + # Current content is empty but we have accumulated content, restore it + choice.message.content = accumulated_data[choice_idx]['content'] + + # Handle reasoning_content accumulation + if 'reasoning_content' in choice.message: + current_reasoning_content = choice.message.reasoning_content + if current_reasoning_content: + accumulated_data[choice_idx]['reasoning_content'] += current_reasoning_content + # Always set the accumulated reasoning_content back, even if current is empty + choice.message.reasoning_content = accumulated_data[choice_idx]['reasoning_content'] + + # Handle tool_calls accumulation + if 'tool_calls' in choice.message and choice.message.tool_calls: + current_tool_calls = choice.message.tool_calls + + # For each current tool call, accumulate its arguments + for current_call in current_tool_calls: + if isinstance(current_call, dict) and 'index' in current_call: + idx = current_call['index'] + + # Find existing accumulated call with same index + existing_call = None + for acc_call in accumulated_data[choice_idx]['tool_calls']: + if (isinstance(acc_call, dict) and + acc_call.get('index') == idx): + existing_call = acc_call + break + + if existing_call: + # Accumulate function fields from current call + if ('function' in current_call and + current_call['function']): + if 'function' not in existing_call: + existing_call['function'] = {} + + # Accumulate function.name + if 'name' in current_call['function']: + if 'name' not in existing_call['function']: + existing_call['function']['name'] = '' + 
existing_call['function']['name'] += current_call['function']['name'] + + # Accumulate function.arguments + if 'arguments' in current_call['function']: + if 'arguments' not in existing_call['function']: + existing_call['function']['arguments'] = '' + existing_call['function']['arguments'] += current_call['function']['arguments'] + + # Update other fields with latest values + existing_call.update({k: v for k, v in current_call.items() + if k != 'function' and v}) + if 'function' in current_call and current_call['function']: + existing_call['function'].update({k: v for k, v in current_call['function'].items() + if k not in ['arguments', 'name'] and v}) + else: + # Add new tool call + accumulated_data[choice_idx]['tool_calls'].append(dict(current_call)) + + # Update choice with accumulated tool_calls + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + + # Restore role if we have it + if accumulated_data[choice_idx]['role'] and (not hasattr(choice.message, 'role') or not choice.message.role): + choice.message.role = accumulated_data[choice_idx]['role'] + + # Handle logprobs accumulation (only if logprobs exists) + try: + if ('logprobs' in choice and choice.logprobs and + isinstance(choice.logprobs, dict) and 'content' in choice.logprobs): + current_logprobs_content = choice.logprobs['content'] + if current_logprobs_content and isinstance(current_logprobs_content, list): + # Initialize logprobs content if not exists + if 'logprobs' not in accumulated_data[choice_idx]: + accumulated_data[choice_idx]['logprobs'] = {'content': []} + elif 'content' not in accumulated_data[choice_idx]['logprobs']: + accumulated_data[choice_idx]['logprobs']['content'] = [] + + # Extend the accumulated logprobs content array + accumulated_data[choice_idx]['logprobs']['content'].extend(current_logprobs_content) + except (KeyError, AttributeError, TypeError): + # logprobs field might not exist or be in unexpected format, safely skip + pass + + # Always set accumulated 
logprobs if we have any + if (accumulated_data[choice_idx]['logprobs']['content'] and + hasattr(choice, 'logprobs') and choice.logprobs): + choice.logprobs['content'] = accumulated_data[choice_idx][ + 'logprobs']['content'] + + # Handle finish_reason for n > 1 case + if (n > 1 and hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + accumulated_data[choice_idx]['finish_reason'] = \ + choice.finish_reason + accumulated_data[choice_idx]['finished'] = True + + # Check if all choices are finished when n > 1 + if n > 1: + finished_count = sum(1 for data in accumulated_data.values() + if data.get('finished', False)) + + # If not all choices finished, hide finish_reason + if finished_count < n: + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice.finish_reason = 'null' + else: + # All choices finished, mark as sent first + for data in accumulated_data.values(): + data['all_choices_sent'] = True + + # Return final result with all choices + all_choices = [] + for choice_idx, data in accumulated_data.items(): + # Create a new choice object + final_choice_dict = { + 'index': choice_idx, + 'finish_reason': data['finish_reason'] + } + + # Create message + message_dict = { + 'role': data['role'] if data['role'] else 'assistant' + } + if data['content']: + message_dict['content'] = ( + data['content'] if isinstance(data['content'], str) + else data['content']) + if data['reasoning_content']: + message_dict['reasoning_content'] = data['reasoning_content'] + if data['tool_calls']: + message_dict['tool_calls'] = data['tool_calls'] + + final_choice_dict['message'] = message_dict + + # Add logprobs if present + if data['logprobs']['content']: + final_choice_dict['logprobs'] = { + 'content': data['logprobs']['content'] + } + + all_choices.append(final_choice_dict) + + # Update output choices with all accumulated choices + parsed_response.output.choices = 
all_choices + + return True From 1ee1a618fff42b118222083ea235bcd76cda8d37 Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Thu, 23 Oct 2025 15:27:48 +0800 Subject: [PATCH 07/12] feat(multimodal): empty reasoning_content while content not empty --- dashscope/utils/message_utils.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index a836295..e92f881 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -131,7 +131,9 @@ def merge_single_response(parsed_response, accumulated_data, n=1): current_reasoning_content = choice.message.reasoning_content if current_reasoning_content: accumulated_data[choice_idx]['reasoning_content'] += current_reasoning_content - # Always set the accumulated reasoning_content back, even if current is empty + # Always set the accumulated reasoning_content back if we + # have any, even if current response doesn't have it + if accumulated_data[choice_idx]['reasoning_content']: choice.message.reasoning_content = accumulated_data[choice_idx]['reasoning_content'] # Handle tool_calls accumulation @@ -402,7 +404,9 @@ def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): current_reasoning_content = choice.message.reasoning_content if current_reasoning_content: accumulated_data[choice_idx]['reasoning_content'] += current_reasoning_content - # Always set the accumulated reasoning_content back, even if current is empty + # Always set the accumulated reasoning_content back if we + # have any, even if current response doesn't have it + if accumulated_data[choice_idx]['reasoning_content']: choice.message.reasoning_content = accumulated_data[choice_idx]['reasoning_content'] # Handle tool_calls accumulation From 1440118d7cfada11f1ebb32287c5a418f52cf1cd Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Thu, 23 Oct 2025 16:02:19 +0800 Subject: [PATCH 08/12] fix(multimodal): empty tool calls of last response --- 
dashscope/utils/message_utils.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index e92f881..b031b52 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -184,6 +184,10 @@ def merge_single_response(parsed_response, accumulated_data, n=1): # Update choice with accumulated tool_calls choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + elif accumulated_data[choice_idx]['tool_calls']: + # If current response has no tool_calls but we have + # accumulated tool_calls, restore them + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] # Restore role if we have it if accumulated_data[choice_idx]['role'] and (not hasattr(choice.message, 'role') or not choice.message.role): @@ -457,6 +461,9 @@ def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): # Update choice with accumulated tool_calls choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + elif accumulated_data[choice_idx]['tool_calls']: + # If current response has no tool_calls but we have accumulated tool_calls, restore them + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] # Restore role if we have it if accumulated_data[choice_idx]['role'] and (not hasattr(choice.message, 'role') or not choice.message.role): From 7cb79583b55edecf6f574f56661e6a997e9cc17d Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Mon, 27 Oct 2025 16:47:55 +0800 Subject: [PATCH 09/12] fix: wrong usage while n > 1 --- dashscope/utils/message_utils.py | 132 ++++++++++++++++++++++++++++++- 1 file changed, 130 insertions(+), 2 deletions(-) diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index b031b52..7f56bc5 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -18,6 +18,27 @@ def merge_single_response(parsed_response, accumulated_data, n=1): if all_sent: return False + 
# Track usage for each choice index when n > 1 + # Each streaming packet contains usage info for one specific choice + if (n > 1 and parsed_response.usage and + parsed_response.output and parsed_response.output.choices and + len(parsed_response.output.choices) > 0): + if 'usage_by_index' not in accumulated_data: + accumulated_data['usage_by_index'] = {} + + # Get the choice index from the first (and typically only) choice in this packet + try: + first_choice = parsed_response.output.choices[0] + choice_idx = first_choice.index if hasattr( + first_choice, 'index') and 'index' in first_choice else 0 + + # Store only output_tokens for this choice index + if 'output_tokens' in parsed_response.usage: + accumulated_data['usage_by_index'][choice_idx] = dict( + parsed_response.usage) + except (KeyError, AttributeError, IndexError): + pass + # Handle output.text accumulation when choices is null if (parsed_response.output and hasattr(parsed_response.output, 'text') and @@ -240,11 +261,16 @@ def merge_single_response(parsed_response, accumulated_data, n=1): else: # All choices finished, mark as sent first for data in accumulated_data.values(): - data['all_choices_sent'] = True + if isinstance(data, dict) and 'all_choices_sent' in data: + data['all_choices_sent'] = True # Return final result with all choices all_choices = [] for choice_idx, data in accumulated_data.items(): + # Skip non-choice data (like usage_by_index) + if not isinstance(data, dict) or 'finished' not in data: + continue + # Create a new choice object final_choice_dict = { 'index': choice_idx, @@ -277,6 +303,44 @@ def merge_single_response(parsed_response, accumulated_data, n=1): # Update output choices with all accumulated choices parsed_response.output.choices = all_choices + # Aggregate usage from all choice indices + if 'usage_by_index' in accumulated_data and accumulated_data[ + 'usage_by_index']: + aggregated_usage = {} + usage_by_idx = accumulated_data['usage_by_index'] + + # Sum output_tokens and 
recalculate total_tokens + total_output_tokens = 0 + input_tokens = None + prompt_tokens_details = None + + for idx, usage in usage_by_idx.items(): + if 'output_tokens' in usage: + total_output_tokens += usage['output_tokens'] + # input_tokens should be the same for all indices + if input_tokens is None and 'input_tokens' in usage: + input_tokens = usage['input_tokens'] + # Keep prompt_tokens_details from any index + # (should be same) + if (prompt_tokens_details is None and + 'prompt_tokens_details' in usage): + prompt_tokens_details = usage[ + 'prompt_tokens_details'] + + # Build aggregated usage + if input_tokens is not None: + aggregated_usage['input_tokens'] = input_tokens + aggregated_usage['output_tokens'] = total_output_tokens + if input_tokens is not None: + aggregated_usage['total_tokens'] = ( + input_tokens + total_output_tokens) + if prompt_tokens_details is not None: + aggregated_usage['prompt_tokens_details'] = ( + prompt_tokens_details) + + # Update response usage with aggregated values + parsed_response.usage = aggregated_usage + return True @@ -298,6 +362,27 @@ def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): if all_sent: return False + # Track usage for each choice index when n > 1 + # Each streaming packet contains usage info for one specific choice + if (n > 1 and parsed_response.usage and + parsed_response.output and parsed_response.output.choices and + len(parsed_response.output.choices) > 0): + if 'usage_by_index' not in accumulated_data: + accumulated_data['usage_by_index'] = {} + + # Get the choice index from the first (and typically only) choice in this packet + try: + first_choice = parsed_response.output.choices[0] + choice_idx = first_choice.index if hasattr( + first_choice, 'index') and 'index' in first_choice else 0 + + # Store only output_tokens for this choice index + if 'output_tokens' in parsed_response.usage: + accumulated_data['usage_by_index'][choice_idx] = dict( + parsed_response.usage) + except 
(KeyError, AttributeError, IndexError): + pass + # Handle output.text accumulation when choices is null if (parsed_response.output and hasattr(parsed_response.output, 'text') and @@ -516,11 +601,16 @@ def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): else: # All choices finished, mark as sent first for data in accumulated_data.values(): - data['all_choices_sent'] = True + if isinstance(data, dict) and 'all_choices_sent' in data: + data['all_choices_sent'] = True # Return final result with all choices all_choices = [] for choice_idx, data in accumulated_data.items(): + # Skip non-choice data (like usage_by_index) + if not isinstance(data, dict) or 'finished' not in data: + continue + # Create a new choice object final_choice_dict = { 'index': choice_idx, @@ -553,4 +643,42 @@ def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): # Update output choices with all accumulated choices parsed_response.output.choices = all_choices + # Aggregate usage from all choice indices + if 'usage_by_index' in accumulated_data and accumulated_data[ + 'usage_by_index']: + aggregated_usage = {} + usage_by_idx = accumulated_data['usage_by_index'] + + # Sum output_tokens and recalculate total_tokens + total_output_tokens = 0 + input_tokens = None + prompt_tokens_details = None + + for idx, usage in usage_by_idx.items(): + if 'output_tokens' in usage: + total_output_tokens += usage['output_tokens'] + # input_tokens should be the same for all indices + if input_tokens is None and 'input_tokens' in usage: + input_tokens = usage['input_tokens'] + # Keep prompt_tokens_details from any index + # (should be same) + if (prompt_tokens_details is None and + 'prompt_tokens_details' in usage): + prompt_tokens_details = usage[ + 'prompt_tokens_details'] + + # Build aggregated usage + if input_tokens is not None: + aggregated_usage['input_tokens'] = input_tokens + aggregated_usage['output_tokens'] = total_output_tokens + if input_tokens is not None: + 
aggregated_usage['total_tokens'] = ( + input_tokens + total_output_tokens) + if prompt_tokens_details is not None: + aggregated_usage['prompt_tokens_details'] = ( + prompt_tokens_details) + + # Update response usage with aggregated values + parsed_response.usage = aggregated_usage + return True From ee1dc4e4931bc3640be07f97e5dff83f14c71e80 Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Wed, 29 Oct 2025 19:21:32 +0800 Subject: [PATCH 10/12] fix: last response of tool calls --- dashscope/aigc/generation.py | 16 +- dashscope/aigc/multimodal_conversation.py | 16 +- dashscope/utils/message_utils.py | 516 ++++++++++++++-------- 3 files changed, 361 insertions(+), 187 deletions(-) diff --git a/dashscope/aigc/generation.py b/dashscope/aigc/generation.py index 28bb7f7..03645f7 100644 --- a/dashscope/aigc/generation.py +++ b/dashscope/aigc/generation.py @@ -213,9 +213,13 @@ def _merge_generation_response(cls, response, n=1) -> Generator[GenerationRespon accumulated_data = {} for rsp in response: parsed_response = GenerationResponse.from_api_response(rsp) - should_yield = merge_single_response(parsed_response, accumulated_data, n) - if should_yield: + result = merge_single_response(parsed_response, accumulated_data, n) + if result is True: yield parsed_response + elif isinstance(result, list): + # Multiple responses to yield (for n>1 non-stop cases) + for resp in result: + yield resp class AioGeneration(BaseAioApi): @@ -382,6 +386,10 @@ async def _merge_generation_response(cls, response, n=1) -> AsyncGenerator[Gener async for rsp in response: # type: ignore parsed_response = GenerationResponse.from_api_response(rsp) - should_yield = merge_single_response(parsed_response, accumulated_data, n) - if should_yield: + result = merge_single_response(parsed_response, accumulated_data, n) + if result is True: yield parsed_response + elif isinstance(result, list): + # Multiple responses to yield (for n>1 non-stop cases) + for resp in result: + yield resp diff --git 
a/dashscope/aigc/multimodal_conversation.py b/dashscope/aigc/multimodal_conversation.py index 1b0983a..bf30e5a 100644 --- a/dashscope/aigc/multimodal_conversation.py +++ b/dashscope/aigc/multimodal_conversation.py @@ -172,9 +172,13 @@ def _merge_multimodal_response(cls, response, n=1) -> Generator[MultiModalConver for rsp in response: parsed_response = MultiModalConversationResponse.from_api_response(rsp) - should_yield = merge_multimodal_single_response(parsed_response, accumulated_data, n) - if should_yield: + result = merge_multimodal_single_response(parsed_response, accumulated_data, n) + if result is True: yield parsed_response + elif isinstance(result, list): + # Multiple responses to yield (for n>1 non-stop cases) + for resp in result: + yield resp class AioMultiModalConversation(BaseAioApi): @@ -341,8 +345,12 @@ async def _merge_multimodal_response(cls, response, n=1) -> AsyncGenerator[Multi async for rsp in response: parsed_response = MultiModalConversationResponse.from_api_response(rsp) - should_yield = merge_multimodal_single_response(parsed_response, accumulated_data, n) - if should_yield: + result = merge_multimodal_single_response(parsed_response, accumulated_data, n) + if result is True: yield parsed_response + elif isinstance(result, list): + # Multiple responses to yield (for n>1 non-stop cases) + for resp in result: + yield resp diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index 7f56bc5..7153d3f 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -1,4 +1,5 @@ # Copyright (c) Alibaba, Inc. and its affiliates. +import copy def merge_single_response(parsed_response, accumulated_data, n=1): """Merge a single response chunk with accumulated data. 
@@ -9,12 +10,15 @@ def merge_single_response(parsed_response, accumulated_data, n=1): n: Number of expected choices (default 1) Returns: - bool: True if this response should be yielded, False if filtered + bool or list: True if this response should be yielded normally, + False if filtered, or a list of responses for n>1 with + non-stop finish reasons """ # Check if all choices have been sent (for n > 1 case) if n > 1 and accumulated_data: - all_sent = any(data.get('all_choices_sent', False) - for data in accumulated_data.values()) + all_sent = all(data.get('all_choices_sent', False) + for data in accumulated_data.values() + if isinstance(data, dict) and 'all_choices_sent' in data) if all_sent: return False @@ -246,100 +250,175 @@ def merge_single_response(parsed_response, accumulated_data, n=1): choice.finish_reason accumulated_data[choice_idx]['finished'] = True - # Check if all choices are finished when n > 1 + # Handle n > 1 case: different strategies for different finish_reason if n > 1: + # Count finished choices finished_count = sum(1 for data in accumulated_data.values() - if data.get('finished', False)) - - # If not all choices finished, hide finish_reason - if finished_count < n: - for choice in choices: - if (hasattr(choice, 'finish_reason') and - choice.finish_reason and - choice.finish_reason != 'null'): - choice.finish_reason = 'null' + if isinstance(data, dict) and + data.get('finished', False)) + + # Find all finished choices in current packet + finished_choices_in_packet = [] + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice_idx = (choice.index if hasattr(choice, 'index') and + 'index' in choice else 0) + finish_reason = choice.finish_reason + finished_choices_in_packet.append( + (choice_idx, finish_reason, choice)) + + # No finish_reason in current packet: return as is + if not finished_choices_in_packet: + return True + + # Get finish_reason type from first 
finished choice + first_finish_reason = finished_choices_in_packet[0][1] + + # For stop: wait all choices, then merge into one result + if first_finish_reason == 'stop': + if finished_count < n: + # Hide finish_reason until all finished + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice.finish_reason = 'null' + else: + # All finished: merge all choices into one result + for data in accumulated_data.values(): + if isinstance(data, dict) and 'all_choices_sent' in data: + data['all_choices_sent'] = True + + # Return final result with all choices + all_choices = [] + for choice_idx, data in accumulated_data.items(): + # Skip non-choice data (like usage_by_index) + if not isinstance(data, dict) or 'finished' not in data: + continue + + # Create a new choice object + final_choice_dict = { + 'index': choice_idx, + 'finish_reason': data['finish_reason'] + } + + # Create message + message_dict = { + 'role': data['role'] if data['role'] else 'assistant' + } + if data['content']: + message_dict['content'] = ( + data['content'] if isinstance(data['content'], str) + else data['content']) + if data['reasoning_content']: + message_dict['reasoning_content'] = data['reasoning_content'] + if data['tool_calls']: + message_dict['tool_calls'] = data['tool_calls'] + + final_choice_dict['message'] = message_dict + + # Add logprobs if present + if data['logprobs']['content']: + final_choice_dict['logprobs'] = { + 'content': data['logprobs']['content'] + } + + all_choices.append(final_choice_dict) + + # Update output choices with all accumulated choices + parsed_response.output.choices = all_choices + + # Aggregate usage from all choice indices + if 'usage_by_index' in accumulated_data and accumulated_data[ + 'usage_by_index']: + aggregated_usage = {} + usage_by_idx = accumulated_data['usage_by_index'] + + # Sum output_tokens and recalculate total_tokens + total_output_tokens = 0 + input_tokens = None + 
prompt_tokens_details = None + + for idx, usage in usage_by_idx.items(): + if 'output_tokens' in usage: + total_output_tokens += usage['output_tokens'] + # input_tokens should be the same for all indices + if input_tokens is None and 'input_tokens' in usage: + input_tokens = usage['input_tokens'] + # Keep prompt_tokens_details from any index + # (should be same) + if (prompt_tokens_details is None and + 'prompt_tokens_details' in usage): + prompt_tokens_details = usage[ + 'prompt_tokens_details'] + + # Build aggregated usage + if input_tokens is not None: + aggregated_usage['input_tokens'] = input_tokens + aggregated_usage['output_tokens'] = total_output_tokens + if input_tokens is not None: + aggregated_usage['total_tokens'] = ( + input_tokens + total_output_tokens) + if prompt_tokens_details is not None: + aggregated_usage['prompt_tokens_details'] = ( + prompt_tokens_details) + + # Update response usage with aggregated values + parsed_response.usage = aggregated_usage else: - # All choices finished, mark as sent first - for data in accumulated_data.values(): - if isinstance(data, dict) and 'all_choices_sent' in data: - data['all_choices_sent'] = True - - # Return final result with all choices - all_choices = [] - for choice_idx, data in accumulated_data.items(): - # Skip non-choice data (like usage_by_index) - if not isinstance(data, dict) or 'finished' not in data: + # For non-stop (e.g., tool_calls): output each choice separately + responses_to_yield = [] + + for choice_idx, finish_reason, choice in finished_choices_in_packet: + current_data = accumulated_data.get(choice_idx) + if (current_data is None or + current_data.get('all_choices_sent', False)): continue - # Create a new choice object - final_choice_dict = { - 'index': choice_idx, - 'finish_reason': data['finish_reason'] - } - - # Create message - message_dict = { - 'role': data['role'] if data['role'] else 'assistant' - } - if data['content']: - message_dict['content'] = ( - data['content'] if 
isinstance(data['content'], str) - else data['content']) - if data['reasoning_content']: - message_dict['reasoning_content'] = data['reasoning_content'] - if data['tool_calls']: - message_dict['tool_calls'] = data['tool_calls'] - - final_choice_dict['message'] = message_dict - - # Add logprobs if present - if data['logprobs']['content']: - final_choice_dict['logprobs'] = { - 'content': data['logprobs']['content'] - } + current_data['all_choices_sent'] = True - all_choices.append(final_choice_dict) - - # Update output choices with all accumulated choices - parsed_response.output.choices = all_choices - - # Aggregate usage from all choice indices - if 'usage_by_index' in accumulated_data and accumulated_data[ - 'usage_by_index']: - aggregated_usage = {} - usage_by_idx = accumulated_data['usage_by_index'] - - # Sum output_tokens and recalculate total_tokens - total_output_tokens = 0 - input_tokens = None - prompt_tokens_details = None - - for idx, usage in usage_by_idx.items(): - if 'output_tokens' in usage: - total_output_tokens += usage['output_tokens'] - # input_tokens should be the same for all indices - if input_tokens is None and 'input_tokens' in usage: - input_tokens = usage['input_tokens'] - # Keep prompt_tokens_details from any index - # (should be same) - if (prompt_tokens_details is None and - 'prompt_tokens_details' in usage): - prompt_tokens_details = usage[ - 'prompt_tokens_details'] - - # Build aggregated usage - if input_tokens is not None: - aggregated_usage['input_tokens'] = input_tokens - aggregated_usage['output_tokens'] = total_output_tokens - if input_tokens is not None: - aggregated_usage['total_tokens'] = ( - input_tokens + total_output_tokens) - if prompt_tokens_details is not None: - aggregated_usage['prompt_tokens_details'] = ( - prompt_tokens_details) - - # Update response usage with aggregated values - parsed_response.usage = aggregated_usage + # Create a new response for this choice + if responses_to_yield: + # Clone the response for 
additional choices + new_response = copy.deepcopy(parsed_response) + else: + # Use the original response for the first choice + new_response = parsed_response + + # Set only this choice in the response + new_response.output.choices = [choice] + + # Remove index field from tool_calls in final output + if (hasattr(choice, 'message') and choice.message and + 'tool_calls' in choice.message and + choice.message.tool_calls): + for tool_call in choice.message.tool_calls: + if isinstance(tool_call, dict) and 'index' in tool_call: + del tool_call['index'] + + # Update usage with this choice's output tokens + if (new_response.usage and + 'usage_by_index' in accumulated_data and + choice_idx in accumulated_data['usage_by_index']): + current_usage = accumulated_data['usage_by_index'][ + choice_idx] + if 'output_tokens' in current_usage: + new_response.usage['output_tokens'] = ( + current_usage['output_tokens']) + if 'input_tokens' in current_usage: + new_response.usage['total_tokens'] = ( + current_usage['input_tokens'] + + current_usage['output_tokens']) + + responses_to_yield.append(new_response) + + # Return list of responses if we have any + if responses_to_yield: + return responses_to_yield + else: + return False return True @@ -586,99 +665,178 @@ def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): choice.finish_reason accumulated_data[choice_idx]['finished'] = True - # Check if all choices are finished when n > 1 + # Handle n > 1 case: different strategies for different + # finish_reason if n > 1: + # Count finished choices finished_count = sum(1 for data in accumulated_data.values() - if data.get('finished', False)) - - # If not all choices finished, hide finish_reason - if finished_count < n: - for choice in choices: - if (hasattr(choice, 'finish_reason') and - choice.finish_reason and - choice.finish_reason != 'null'): - choice.finish_reason = 'null' + if isinstance(data, dict) and + data.get('finished', False)) + + # Find all finished 
choices in current packet + finished_choices_in_packet = [] + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice_idx = (choice.index if hasattr(choice, 'index') and + 'index' in choice else 0) + finish_reason = choice.finish_reason + finished_choices_in_packet.append( + (choice_idx, finish_reason, choice)) + + # No finish_reason in current packet: return as is + if not finished_choices_in_packet: + return True + + # Get finish_reason type from first finished choice + first_finish_reason = finished_choices_in_packet[0][1] + + # For stop: wait all choices, then merge into one result + if first_finish_reason == 'stop': + if finished_count < n: + # Hide finish_reason until all finished + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice.finish_reason = 'null' + else: + # All finished: merge all choices into one result + for data in accumulated_data.values(): + if isinstance(data, dict) and 'all_choices_sent' in data: + data['all_choices_sent'] = True + + # Return final result with all choices + all_choices = [] + for choice_idx, data in accumulated_data.items(): + # Skip non-choice data (like usage_by_index) + if not isinstance(data, dict) or 'finished' not in data: + continue + + # Create a new choice object + final_choice_dict = { + 'index': choice_idx, + 'finish_reason': data['finish_reason'] + } + + # Create message + message_dict = { + 'role': data['role'] if data['role'] else 'assistant' + } + if data['content']: + message_dict['content'] = ( + data['content'] if isinstance(data['content'], + str) + else data['content']) + if data['reasoning_content']: + message_dict['reasoning_content'] = ( + data['reasoning_content']) + if data['tool_calls']: + message_dict['tool_calls'] = data['tool_calls'] + + final_choice_dict['message'] = message_dict + + # Add logprobs if present + if 
data['logprobs']['content']: + final_choice_dict['logprobs'] = { + 'content': data['logprobs']['content'] + } + + all_choices.append(final_choice_dict) + + # Update output choices with all accumulated choices + parsed_response.output.choices = all_choices + + # Aggregate usage from all choice indices + if 'usage_by_index' in accumulated_data and accumulated_data[ + 'usage_by_index']: + aggregated_usage = {} + usage_by_idx = accumulated_data['usage_by_index'] + + # Sum output_tokens and recalculate total_tokens + total_output_tokens = 0 + input_tokens = None + prompt_tokens_details = None + + for idx, usage in usage_by_idx.items(): + if 'output_tokens' in usage: + total_output_tokens += usage['output_tokens'] + # input_tokens should be the same for all indices + if input_tokens is None and 'input_tokens' in usage: + input_tokens = usage['input_tokens'] + # Keep prompt_tokens_details from any index + # (should be same) + if (prompt_tokens_details is None and + 'prompt_tokens_details' in usage): + prompt_tokens_details = usage[ + 'prompt_tokens_details'] + + # Build aggregated usage + if input_tokens is not None: + aggregated_usage['input_tokens'] = input_tokens + aggregated_usage['output_tokens'] = total_output_tokens + if input_tokens is not None: + aggregated_usage['total_tokens'] = ( + input_tokens + total_output_tokens) + if prompt_tokens_details is not None: + aggregated_usage['prompt_tokens_details'] = ( + prompt_tokens_details) + + # Update response usage with aggregated values + parsed_response.usage = aggregated_usage else: - # All choices finished, mark as sent first - for data in accumulated_data.values(): - if isinstance(data, dict) and 'all_choices_sent' in data: - data['all_choices_sent'] = True - - # Return final result with all choices - all_choices = [] - for choice_idx, data in accumulated_data.items(): - # Skip non-choice data (like usage_by_index) - if not isinstance(data, dict) or 'finished' not in data: + # For non-stop (e.g., tool_calls): 
output each choice + # separately + responses_to_yield = [] + + for choice_idx, finish_reason, choice in finished_choices_in_packet: + current_data = accumulated_data.get(choice_idx) + if (current_data is None or + current_data.get('all_choices_sent', False)): continue - # Create a new choice object - final_choice_dict = { - 'index': choice_idx, - 'finish_reason': data['finish_reason'] - } - - # Create message - message_dict = { - 'role': data['role'] if data['role'] else 'assistant' - } - if data['content']: - message_dict['content'] = ( - data['content'] if isinstance(data['content'], str) - else data['content']) - if data['reasoning_content']: - message_dict['reasoning_content'] = data['reasoning_content'] - if data['tool_calls']: - message_dict['tool_calls'] = data['tool_calls'] - - final_choice_dict['message'] = message_dict - - # Add logprobs if present - if data['logprobs']['content']: - final_choice_dict['logprobs'] = { - 'content': data['logprobs']['content'] - } + current_data['all_choices_sent'] = True - all_choices.append(final_choice_dict) - - # Update output choices with all accumulated choices - parsed_response.output.choices = all_choices - - # Aggregate usage from all choice indices - if 'usage_by_index' in accumulated_data and accumulated_data[ - 'usage_by_index']: - aggregated_usage = {} - usage_by_idx = accumulated_data['usage_by_index'] - - # Sum output_tokens and recalculate total_tokens - total_output_tokens = 0 - input_tokens = None - prompt_tokens_details = None - - for idx, usage in usage_by_idx.items(): - if 'output_tokens' in usage: - total_output_tokens += usage['output_tokens'] - # input_tokens should be the same for all indices - if input_tokens is None and 'input_tokens' in usage: - input_tokens = usage['input_tokens'] - # Keep prompt_tokens_details from any index - # (should be same) - if (prompt_tokens_details is None and - 'prompt_tokens_details' in usage): - prompt_tokens_details = usage[ - 'prompt_tokens_details'] - - # Build 
aggregated usage - if input_tokens is not None: - aggregated_usage['input_tokens'] = input_tokens - aggregated_usage['output_tokens'] = total_output_tokens - if input_tokens is not None: - aggregated_usage['total_tokens'] = ( - input_tokens + total_output_tokens) - if prompt_tokens_details is not None: - aggregated_usage['prompt_tokens_details'] = ( - prompt_tokens_details) - - # Update response usage with aggregated values - parsed_response.usage = aggregated_usage + # Create a new response for this choice + if responses_to_yield: + # Clone the response for additional choices + new_response = copy.deepcopy(parsed_response) + else: + # Use the original response for the first choice + new_response = parsed_response + + # Set only this choice in the response + new_response.output.choices = [choice] + + # Remove index field from tool_calls in final output + if (hasattr(choice, 'message') and choice.message and + 'tool_calls' in choice.message and + choice.message.tool_calls): + for tool_call in choice.message.tool_calls: + if isinstance(tool_call, dict) and 'index' in tool_call: + del tool_call['index'] + + # Update usage with this choice's output tokens + if (new_response.usage and + 'usage_by_index' in accumulated_data and + choice_idx in accumulated_data['usage_by_index']): + current_usage = accumulated_data['usage_by_index'][ + choice_idx] + if 'output_tokens' in current_usage: + new_response.usage['output_tokens'] = ( + current_usage['output_tokens']) + if 'input_tokens' in current_usage: + new_response.usage['total_tokens'] = ( + current_usage['input_tokens'] + + current_usage['output_tokens']) + + responses_to_yield.append(new_response) + + # Return list of responses if we have any + if responses_to_yield: + return responses_to_yield + else: + return False return True From a0ebbc36069261bb5f8e5d9cd58f5c2ab24ca178 Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Wed, 29 Oct 2025 19:48:41 +0800 Subject: [PATCH 11/12] fix: incorrect order of last responses while n > 
1 --- dashscope/utils/message_utils.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index 7153d3f..bf1b479 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -293,11 +293,14 @@ def merge_single_response(parsed_response, accumulated_data, n=1): # Return final result with all choices all_choices = [] - for choice_idx, data in accumulated_data.items(): - # Skip non-choice data (like usage_by_index) - if not isinstance(data, dict) or 'finished' not in data: - continue - + # Sort by choice_idx to ensure correct order + sorted_items = sorted( + [(idx, data) for idx, data in accumulated_data.items() + if isinstance(data, dict) and 'finished' in data], + key=lambda x: x[0] + ) + + for choice_idx, data in sorted_items: # Create a new choice object final_choice_dict = { 'index': choice_idx, @@ -709,11 +712,14 @@ def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): # Return final result with all choices all_choices = [] - for choice_idx, data in accumulated_data.items(): - # Skip non-choice data (like usage_by_index) - if not isinstance(data, dict) or 'finished' not in data: - continue - + # Sort by choice_idx to ensure correct order + sorted_items = sorted( + [(idx, data) for idx, data in accumulated_data.items() + if isinstance(data, dict) and 'finished' in data], + key=lambda x: x[0] + ) + + for choice_idx, data in sorted_items: # Create a new choice object final_choice_dict = { 'index': choice_idx, From d70b2c10294e2d7d345a25fab90d5c254b093fba Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Tue, 4 Nov 2025 17:12:57 +0800 Subject: [PATCH 12/12] fix: missing index while n > 1 in tool calling --- dashscope/utils/message_utils.py | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py index 
bf1b479..306c3f4 100644 --- a/dashscope/utils/message_utils.py +++ b/dashscope/utils/message_utils.py @@ -390,16 +390,11 @@ def merge_single_response(parsed_response, accumulated_data, n=1): # Use the original response for the first choice new_response = parsed_response - # Set only this choice in the response - new_response.output.choices = [choice] + # Deep copy choice to avoid modifying accumulated_data + choice_copy = copy.deepcopy(choice) - # Remove index field from tool_calls in final output - if (hasattr(choice, 'message') and choice.message and - 'tool_calls' in choice.message and - choice.message.tool_calls): - for tool_call in choice.message.tool_calls: - if isinstance(tool_call, dict) and 'index' in tool_call: - del tool_call['index'] + # Set only this choice in the response + new_response.output.choices = [choice_copy] # Update usage with this choice's output tokens if (new_response.usage and @@ -812,16 +807,11 @@ def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): # Use the original response for the first choice new_response = parsed_response + # Deep copy choice to avoid modifying accumulated_data + choice_copy = copy.deepcopy(choice) + # Set only this choice in the response - new_response.output.choices = [choice] - - # Remove index field from tool_calls in final output - if (hasattr(choice, 'message') and choice.message and - 'tool_calls' in choice.message and - choice.message.tool_calls): - for tool_call in choice.message.tool_calls: - if isinstance(tool_call, dict) and 'index' in tool_call: - del tool_call['index'] + new_response.output.choices = [choice_copy] # Update usage with this choice's output tokens if (new_response.usage and