diff --git a/dashscope/aigc/generation.py b/dashscope/aigc/generation.py index cfe8a73..03645f7 100644 --- a/dashscope/aigc/generation.py +++ b/dashscope/aigc/generation.py @@ -2,7 +2,7 @@ import copy import json -from typing import Any, Dict, Generator, List, Union +from typing import Any, Dict, Generator, List, Union, AsyncGenerator from dashscope.api_entities.dashscope_response import (GenerationResponse, Message, Role) @@ -13,6 +13,8 @@ from dashscope.common.error import InputRequired, ModelRequired from dashscope.common.logging import logger from dashscope.common.utils import _get_task_group_and_task +from dashscope.utils.param_utils import ParamUtil +from dashscope.utils.message_utils import merge_single_response class Generation(BaseApi): @@ -137,6 +139,16 @@ def call( kwargs['headers'] = headers input, parameters = cls._build_input_parameters( model, prompt, history, messages, **kwargs) + + is_stream = parameters.get('stream', False) + # Check if we need to merge incremental output + is_incremental_output = kwargs.get('incremental_output', None) + to_merge_incremental_output = False + if (ParamUtil.should_modify_incremental_output(model) and + is_stream and is_incremental_output is False): + to_merge_incremental_output = True + parameters['incremental_output'] = True + response = super().call(model=model, task_group=task_group, task=Generation.task, @@ -145,10 +157,14 @@ def call( input=input, workspace=workspace, **parameters) - is_stream = kwargs.get('stream', False) if is_stream: - return (GenerationResponse.from_api_response(rsp) - for rsp in response) + if to_merge_incremental_output: + # Extract n parameter for merge logic + n = parameters.get('n', 1) + return cls._merge_generation_response(response, n) + else: + return (GenerationResponse.from_api_response(rsp) + for rsp in response) else: return GenerationResponse.from_api_response(response) @@ -191,6 +207,20 @@ def _build_input_parameters(cls, model, prompt, history, messages, return input, 
{**parameters, **kwargs} + @classmethod + def _merge_generation_response(cls, response, n=1) -> Generator[GenerationResponse, None, None]: + """Merge incremental response chunks to simulate non-incremental output.""" + accumulated_data = {} + for rsp in response: + parsed_response = GenerationResponse.from_api_response(rsp) + result = merge_single_response(parsed_response, accumulated_data, n) + if result is True: + yield parsed_response + elif isinstance(result, list): + # Multiple responses to yield (for n>1 non-stop cases) + for resp in result: + yield resp + class AioGeneration(BaseAioApi): task = 'text-generation' @@ -220,7 +250,7 @@ async def call( plugins: Union[str, Dict[str, Any]] = None, workspace: str = None, **kwargs - ) -> Union[GenerationResponse, Generator[GenerationResponse, None, None]]: + ) -> Union[GenerationResponse, AsyncGenerator[GenerationResponse, None]]: """Call generation model service. Args: @@ -296,8 +326,8 @@ async def call( Returns: Union[GenerationResponse, - Generator[GenerationResponse, None, None]]: If - stream is True, return Generator, otherwise GenerationResponse. + AsyncGenerator[GenerationResponse, None]]: If + stream is True, return AsyncGenerator, otherwise GenerationResponse. 
""" if (prompt is None or not prompt) and (messages is None or not messages): @@ -314,6 +344,16 @@ async def call( kwargs['headers'] = headers input, parameters = Generation._build_input_parameters( model, prompt, history, messages, **kwargs) + + is_stream = parameters.get('stream', False) + # Check if we need to merge incremental output + is_incremental_output = kwargs.get('incremental_output', None) + to_merge_incremental_output = False + if (ParamUtil.should_modify_incremental_output(model) and + is_stream and is_incremental_output is False): + to_merge_incremental_output = True + parameters['incremental_output'] = True + response = await super().call(model=model, task_group=task_group, task=Generation.task, @@ -322,9 +362,34 @@ async def call( input=input, workspace=workspace, **parameters) - is_stream = kwargs.get('stream', False) if is_stream: - return (GenerationResponse.from_api_response(rsp) - async for rsp in response) + if to_merge_incremental_output: + # Extract n parameter for merge logic + n = parameters.get('n', 1) + return cls._merge_generation_response(response, n) + else: + return cls._stream_responses(response) else: return GenerationResponse.from_api_response(response) + + @classmethod + async def _stream_responses(cls, response) -> AsyncGenerator[GenerationResponse, None]: + """Convert async response stream to GenerationResponse stream.""" + # Type hint: when stream=True, response is actually an AsyncIterable + async for rsp in response: # type: ignore + yield GenerationResponse.from_api_response(rsp) + + @classmethod + async def _merge_generation_response(cls, response, n=1) -> AsyncGenerator[GenerationResponse, None]: + """Async version of merge incremental response chunks.""" + accumulated_data = {} + + async for rsp in response: # type: ignore + parsed_response = GenerationResponse.from_api_response(rsp) + result = merge_single_response(parsed_response, accumulated_data, n) + if result is True: + yield parsed_response + elif 
isinstance(result, list): + # Multiple responses to yield (for n>1 non-stop cases) + for resp in result: + yield resp diff --git a/dashscope/aigc/multimodal_conversation.py b/dashscope/aigc/multimodal_conversation.py index b6b4136..bf30e5a 100644 --- a/dashscope/aigc/multimodal_conversation.py +++ b/dashscope/aigc/multimodal_conversation.py @@ -1,7 +1,7 @@ # Copyright (c) Alibaba, Inc. and its affiliates. import copy -from typing import Generator, List, Union +from typing import AsyncGenerator, Generator, List, Union from dashscope.api_entities.dashscope_response import \ MultiModalConversationResponse @@ -9,6 +9,8 @@ from dashscope.common.error import InputRequired, ModelRequired from dashscope.common.utils import _get_task_group_and_task from dashscope.utils.oss_utils import preprocess_message_element +from dashscope.utils.param_utils import ParamUtil +from dashscope.utils.message_utils import merge_multimodal_single_response class MultiModalConversation(BaseApi): @@ -108,6 +110,16 @@ def call( input.update({'language_type': language_type}) if msg_copy is not None: input.update({'messages': msg_copy}) + + # Check if we need to merge incremental output + is_incremental_output = kwargs.get('incremental_output', None) + to_merge_incremental_output = False + is_stream = kwargs.get('stream', False) + if (ParamUtil.should_modify_incremental_output(model) and + is_stream and is_incremental_output is not None and is_incremental_output is False): + to_merge_incremental_output = True + kwargs['incremental_output'] = True + response = super().call(model=model, task_group=task_group, task=MultiModalConversation.task, @@ -116,10 +128,14 @@ def call( input=input, workspace=workspace, **kwargs) - is_stream = kwargs.get('stream', False) if is_stream: - return (MultiModalConversationResponse.from_api_response(rsp) - for rsp in response) + if to_merge_incremental_output: + # Extract n parameter for merge logic + n = kwargs.get('n', 1) + return 
cls._merge_multimodal_response(response, n) + else: + return (MultiModalConversationResponse.from_api_response(rsp) + for rsp in response) else: return MultiModalConversationResponse.from_api_response(response) @@ -149,6 +165,21 @@ def _preprocess_messages(cls, model: str, messages: List[dict], has_upload = True return has_upload + @classmethod + def _merge_multimodal_response(cls, response, n=1) -> Generator[MultiModalConversationResponse, None, None]: + """Merge incremental response chunks to simulate non-incremental output.""" + accumulated_data = {} + + for rsp in response: + parsed_response = MultiModalConversationResponse.from_api_response(rsp) + result = merge_multimodal_single_response(parsed_response, accumulated_data, n) + if result is True: + yield parsed_response + elif isinstance(result, list): + # Multiple responses to yield (for n>1 non-stop cases) + for resp in result: + yield resp + class AioMultiModalConversation(BaseAioApi): """Async MultiModal conversational robot interface. @@ -170,8 +201,8 @@ async def call( voice: str = None, language_type: str = None, **kwargs - ) -> Union[MultiModalConversationResponse, Generator[ - MultiModalConversationResponse, None, None]]: + ) -> Union[MultiModalConversationResponse, AsyncGenerator[ + MultiModalConversationResponse, None]]: """Call the conversation model service asynchronously. Args: @@ -221,8 +252,8 @@ async def call( Returns: Union[MultiModalConversationResponse, - Generator[MultiModalConversationResponse, None, None]]: If - stream is True, return Generator, otherwise MultiModalConversationResponse. + AsyncGenerator[MultiModalConversationResponse, None]]: If + stream is True, return AsyncGenerator, otherwise MultiModalConversationResponse. 
""" if model is None or not model: raise ModelRequired('Model is required!') @@ -246,6 +277,16 @@ async def call( input.update({'language_type': language_type}) if msg_copy is not None: input.update({'messages': msg_copy}) + + # Check if we need to merge incremental output + is_incremental_output = kwargs.get('incremental_output', None) + to_merge_incremental_output = False + is_stream = kwargs.get('stream', False) + if (ParamUtil.should_modify_incremental_output(model) and + is_stream and is_incremental_output is not None and is_incremental_output is False): + to_merge_incremental_output = True + kwargs['incremental_output'] = True + response = await super().call(model=model, task_group=task_group, task=AioMultiModalConversation.task, @@ -254,10 +295,13 @@ async def call( input=input, workspace=workspace, **kwargs) - is_stream = kwargs.get('stream', False) if is_stream: - return (MultiModalConversationResponse.from_api_response(rsp) - async for rsp in response) + if to_merge_incremental_output: + # Extract n parameter for merge logic + n = kwargs.get('n', 1) + return cls._merge_multimodal_response(response, n) + else: + return cls._stream_responses(response) else: return MultiModalConversationResponse.from_api_response(response) @@ -286,3 +330,27 @@ def _preprocess_messages(cls, model: str, messages: List[dict], if is_upload and not has_upload: has_upload = True return has_upload + + @classmethod + async def _stream_responses(cls, response) -> AsyncGenerator[MultiModalConversationResponse, None]: + """Convert async response stream to MultiModalConversationResponse stream.""" + # Type hint: when stream=True, response is actually an AsyncIterable + async for rsp in response: # type: ignore + yield MultiModalConversationResponse.from_api_response(rsp) + + @classmethod + async def _merge_multimodal_response(cls, response, n=1) -> AsyncGenerator[MultiModalConversationResponse, None]: + """Async version of merge incremental response chunks.""" + accumulated_data = {} + 
+ async for rsp in response: + parsed_response = MultiModalConversationResponse.from_api_response(rsp) + result = merge_multimodal_single_response(parsed_response, accumulated_data, n) + if result is True: + yield parsed_response + elif isinstance(result, list): + # Multiple responses to yield (for n>1 non-stop cases) + for resp in result: + yield resp + + diff --git a/dashscope/utils/message_utils.py b/dashscope/utils/message_utils.py new file mode 100644 index 0000000..306c3f4 --- /dev/null +++ b/dashscope/utils/message_utils.py @@ -0,0 +1,838 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. +import copy + +def merge_single_response(parsed_response, accumulated_data, n=1): + """Merge a single response chunk with accumulated data. + + Args: + parsed_response: The response chunk to merge + accumulated_data: Dictionary storing accumulated data for each choice + n: Number of expected choices (default 1) + + Returns: + bool or list: True if this response should be yielded normally, + False if filtered, or a list of responses for n>1 with + non-stop finish reasons + """ + # Check if all choices have been sent (for n > 1 case) + if n > 1 and accumulated_data: + all_sent = all(data.get('all_choices_sent', False) + for data in accumulated_data.values() + if isinstance(data, dict) and 'all_choices_sent' in data) + if all_sent: + return False + + # Track usage for each choice index when n > 1 + # Each streaming packet contains usage info for one specific choice + if (n > 1 and parsed_response.usage and + parsed_response.output and parsed_response.output.choices and + len(parsed_response.output.choices) > 0): + if 'usage_by_index' not in accumulated_data: + accumulated_data['usage_by_index'] = {} + + # Get the choice index from the first (and typically only) choice in this packet + try: + first_choice = parsed_response.output.choices[0] + choice_idx = first_choice.index if hasattr( + first_choice, 'index') and 'index' in first_choice else 0 + + # Store only 
output_tokens for this choice index + if 'output_tokens' in parsed_response.usage: + accumulated_data['usage_by_index'][choice_idx] = dict( + parsed_response.usage) + except (KeyError, AttributeError, IndexError): + pass + + # Handle output.text accumulation when choices is null + if (parsed_response.output and + hasattr(parsed_response.output, 'text') and + (not parsed_response.output.choices or parsed_response.output.choices is None)): + choice_idx = 0 + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'reasoning_content': '', + 'tool_calls': [], + 'logprobs': {'content': []}, + 'finished': False, + 'finish_reason': None, + 'all_choices_sent': False, + 'role': None + } + # Accumulate text if not empty + if parsed_response.output.text: + accumulated_data[choice_idx]['content'] += parsed_response.output.text + # Always set accumulated content back to response + parsed_response.output.text = accumulated_data[choice_idx]['content'] + return True + + # Process each choice in the choices array + if parsed_response.output and parsed_response.output.choices: + choices = parsed_response.output.choices + + # Filter out empty choices array + if not choices: + return False + + for choice_enum_idx, choice in enumerate(choices): + # Use choice.index if available, otherwise use enumerate index + try: + choice_idx = choice.index if hasattr(choice, 'index') and 'index' in choice else choice_enum_idx + except (KeyError, AttributeError): + choice_idx = choice_enum_idx + + # Initialize accumulated data for this choice if not exists + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'reasoning_content': '', + 'tool_calls': [], + 'logprobs': {'content': []}, + 'finished': False, + 'finish_reason': None, + 'all_choices_sent': False, + 'role': None + } + + # Handle message field - create if null + if not choice.message: + # Create message object with accumulated data + choice.message = { + 'role': 
accumulated_data[choice_idx]['role'] if accumulated_data[choice_idx]['role'] else 'assistant', + 'content': accumulated_data[choice_idx]['content'] + } + if accumulated_data[choice_idx]['reasoning_content']: + choice.message['reasoning_content'] = accumulated_data[choice_idx]['reasoning_content'] + if accumulated_data[choice_idx]['tool_calls']: + choice.message['tool_calls'] = accumulated_data[choice_idx]['tool_calls'] + else: + # Save role if present + if hasattr(choice.message, 'role') and choice.message.role: + accumulated_data[choice_idx]['role'] = choice.message.role + + # Handle content accumulation + if 'content' in choice.message: + current_content = choice.message.content + if current_content: + # Check if content is multimodal format + if isinstance(current_content, list): + # Handle multimodal content (array format) + # Initialize accumulated content as array if not already + if not isinstance(accumulated_data[choice_idx]['content'], list): + accumulated_data[choice_idx]['content'] = [] + + # Ensure accumulated content list has enough elements + while len(accumulated_data[choice_idx]['content']) < len(current_content): + accumulated_data[choice_idx]['content'].append({'text': ''}) + + # Merge each content element + for content_idx, content_item in enumerate(current_content): + if isinstance(content_item, dict) and 'text' in content_item: + if content_item['text']: + # Accumulate text content + accumulated_data[choice_idx]['content'][content_idx]['text'] += content_item['text'] + # Update the current response with accumulated content + for content_idx in range(len(accumulated_data[choice_idx]['content'])): + if content_idx < len(choice.message.content): + choice.message.content[content_idx]['text'] = accumulated_data[choice_idx]['content'][content_idx]['text'] + else: + # Handle regular content (string format) + # Initialize accumulated content as string + if isinstance(accumulated_data[choice_idx]['content'], list): + 
accumulated_data[choice_idx]['content'] = '' + # Accumulate content if not empty + accumulated_data[choice_idx]['content'] += current_content + # Always set accumulated content back to response + if not isinstance(accumulated_data[choice_idx]['content'], list): + choice.message.content = accumulated_data[choice_idx]['content'] + else: + # For multimodal content, ensure message.content + # exists + if not isinstance(choice.message.content, list): + choice.message.content = accumulated_data[choice_idx]['content'] + + # Handle reasoning_content accumulation + if 'reasoning_content' in choice.message: + current_reasoning_content = choice.message.reasoning_content + if current_reasoning_content: + accumulated_data[choice_idx]['reasoning_content'] += current_reasoning_content + # Always set the accumulated reasoning_content back if we + # have any, even if current response doesn't have it + if accumulated_data[choice_idx]['reasoning_content']: + choice.message.reasoning_content = accumulated_data[choice_idx]['reasoning_content'] + + # Handle tool_calls accumulation + if 'tool_calls' in choice.message and choice.message.tool_calls: + current_tool_calls = choice.message.tool_calls + + # For each current tool call, accumulate its arguments + for current_call in current_tool_calls: + if isinstance(current_call, dict) and 'index' in current_call: + idx = current_call['index'] + + # Find existing accumulated call with same index + existing_call = None + for acc_call in accumulated_data[choice_idx]['tool_calls']: + if (isinstance(acc_call, dict) and + acc_call.get('index') == idx): + existing_call = acc_call + break + + if existing_call: + # Accumulate function fields from current call + if ('function' in current_call and + current_call['function']): + if 'function' not in existing_call: + existing_call['function'] = {} + + # Accumulate function.name + if 'name' in current_call['function']: + if 'name' not in existing_call['function']: + existing_call['function']['name'] = '' + 
existing_call['function']['name'] += current_call['function']['name'] + + # Accumulate function.arguments + if 'arguments' in current_call['function']: + if 'arguments' not in existing_call['function']: + existing_call['function']['arguments'] = '' + existing_call['function']['arguments'] += current_call['function']['arguments'] + + # Update other fields with latest values + existing_call.update({k: v for k, v in current_call.items() + if k != 'function' and v}) + if 'function' in current_call and current_call['function']: + existing_call['function'].update({k: v for k, v in current_call['function'].items() + if k not in ['arguments', 'name'] and v}) + else: + # Add new tool call + accumulated_data[choice_idx]['tool_calls'].append(dict(current_call)) + + # Update choice with accumulated tool_calls + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + elif accumulated_data[choice_idx]['tool_calls']: + # If current response has no tool_calls but we have + # accumulated tool_calls, restore them + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + + # Restore role if we have it + if accumulated_data[choice_idx]['role'] and (not hasattr(choice.message, 'role') or not choice.message.role): + choice.message.role = accumulated_data[choice_idx]['role'] + + # Handle logprobs accumulation (only if logprobs exists) + try: + if ('logprobs' in choice and choice.logprobs and + isinstance(choice.logprobs, dict) and 'content' in choice.logprobs): + current_logprobs_content = choice.logprobs['content'] + if current_logprobs_content and isinstance(current_logprobs_content, list): + # Initialize logprobs content if not exists + if 'logprobs' not in accumulated_data[choice_idx]: + accumulated_data[choice_idx]['logprobs'] = {'content': []} + elif 'content' not in accumulated_data[choice_idx]['logprobs']: + accumulated_data[choice_idx]['logprobs']['content'] = [] + + # Extend the accumulated logprobs content array + 
accumulated_data[choice_idx]['logprobs']['content'].extend(current_logprobs_content) + except (KeyError, AttributeError, TypeError): + # logprobs field might not exist or be in unexpected format, safely skip + pass + + # Always set accumulated logprobs if we have any + if (accumulated_data[choice_idx]['logprobs']['content'] and + hasattr(choice, 'logprobs') and choice.logprobs): + choice.logprobs['content'] = accumulated_data[choice_idx][ + 'logprobs']['content'] + + # Handle finish_reason for n > 1 case + if (n > 1 and hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + accumulated_data[choice_idx]['finish_reason'] = \ + choice.finish_reason + accumulated_data[choice_idx]['finished'] = True + + # Handle n > 1 case: different strategies for different finish_reason + if n > 1: + # Count finished choices + finished_count = sum(1 for data in accumulated_data.values() + if isinstance(data, dict) and + data.get('finished', False)) + + # Find all finished choices in current packet + finished_choices_in_packet = [] + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice_idx = (choice.index if hasattr(choice, 'index') and + 'index' in choice else 0) + finish_reason = choice.finish_reason + finished_choices_in_packet.append( + (choice_idx, finish_reason, choice)) + + # No finish_reason in current packet: return as is + if not finished_choices_in_packet: + return True + + # Get finish_reason type from first finished choice + first_finish_reason = finished_choices_in_packet[0][1] + + # For stop: wait all choices, then merge into one result + if first_finish_reason == 'stop': + if finished_count < n: + # Hide finish_reason until all finished + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice.finish_reason = 'null' + else: + # All finished: merge all choices 
into one result + for data in accumulated_data.values(): + if isinstance(data, dict) and 'all_choices_sent' in data: + data['all_choices_sent'] = True + + # Return final result with all choices + all_choices = [] + # Sort by choice_idx to ensure correct order + sorted_items = sorted( + [(idx, data) for idx, data in accumulated_data.items() + if isinstance(data, dict) and 'finished' in data], + key=lambda x: x[0] + ) + + for choice_idx, data in sorted_items: + # Create a new choice object + final_choice_dict = { + 'index': choice_idx, + 'finish_reason': data['finish_reason'] + } + + # Create message + message_dict = { + 'role': data['role'] if data['role'] else 'assistant' + } + if data['content']: + message_dict['content'] = ( + data['content'] if isinstance(data['content'], str) + else data['content']) + if data['reasoning_content']: + message_dict['reasoning_content'] = data['reasoning_content'] + if data['tool_calls']: + message_dict['tool_calls'] = data['tool_calls'] + + final_choice_dict['message'] = message_dict + + # Add logprobs if present + if data['logprobs']['content']: + final_choice_dict['logprobs'] = { + 'content': data['logprobs']['content'] + } + + all_choices.append(final_choice_dict) + + # Update output choices with all accumulated choices + parsed_response.output.choices = all_choices + + # Aggregate usage from all choice indices + if 'usage_by_index' in accumulated_data and accumulated_data[ + 'usage_by_index']: + aggregated_usage = {} + usage_by_idx = accumulated_data['usage_by_index'] + + # Sum output_tokens and recalculate total_tokens + total_output_tokens = 0 + input_tokens = None + prompt_tokens_details = None + + for idx, usage in usage_by_idx.items(): + if 'output_tokens' in usage: + total_output_tokens += usage['output_tokens'] + # input_tokens should be the same for all indices + if input_tokens is None and 'input_tokens' in usage: + input_tokens = usage['input_tokens'] + # Keep prompt_tokens_details from any index + # (should be same) 
+ if (prompt_tokens_details is None and + 'prompt_tokens_details' in usage): + prompt_tokens_details = usage[ + 'prompt_tokens_details'] + + # Build aggregated usage + if input_tokens is not None: + aggregated_usage['input_tokens'] = input_tokens + aggregated_usage['output_tokens'] = total_output_tokens + if input_tokens is not None: + aggregated_usage['total_tokens'] = ( + input_tokens + total_output_tokens) + if prompt_tokens_details is not None: + aggregated_usage['prompt_tokens_details'] = ( + prompt_tokens_details) + + # Update response usage with aggregated values + parsed_response.usage = aggregated_usage + else: + # For non-stop (e.g., tool_calls): output each choice separately + responses_to_yield = [] + + for choice_idx, finish_reason, choice in finished_choices_in_packet: + current_data = accumulated_data.get(choice_idx) + if (current_data is None or + current_data.get('all_choices_sent', False)): + continue + + current_data['all_choices_sent'] = True + + # Create a new response for this choice + if responses_to_yield: + # Clone the response for additional choices + new_response = copy.deepcopy(parsed_response) + else: + # Use the original response for the first choice + new_response = parsed_response + + # Deep copy choice to avoid modifying accumulated_data + choice_copy = copy.deepcopy(choice) + + # Set only this choice in the response + new_response.output.choices = [choice_copy] + + # Update usage with this choice's output tokens + if (new_response.usage and + 'usage_by_index' in accumulated_data and + choice_idx in accumulated_data['usage_by_index']): + current_usage = accumulated_data['usage_by_index'][ + choice_idx] + if 'output_tokens' in current_usage: + new_response.usage['output_tokens'] = ( + current_usage['output_tokens']) + if 'input_tokens' in current_usage: + new_response.usage['total_tokens'] = ( + current_usage['input_tokens'] + + current_usage['output_tokens']) + + responses_to_yield.append(new_response) + + # Return list of responses 
if we have any + if responses_to_yield: + return responses_to_yield + else: + return False + + return True + + +def merge_multimodal_single_response(parsed_response, accumulated_data, n=1): + """Merge a single response chunk with accumulated data. + + Args: + parsed_response: The response chunk to merge + accumulated_data: Dictionary storing accumulated data for each choice + n: Number of expected choices (default 1) + + Returns: + bool: True if this response should be yielded, False if filtered + """ + # Check if all choices have been sent (for n > 1 case) + if n > 1 and accumulated_data: + all_sent = any(data.get('all_choices_sent', False) + for data in accumulated_data.values()) + if all_sent: + return False + + # Track usage for each choice index when n > 1 + # Each streaming packet contains usage info for one specific choice + if (n > 1 and parsed_response.usage and + parsed_response.output and parsed_response.output.choices and + len(parsed_response.output.choices) > 0): + if 'usage_by_index' not in accumulated_data: + accumulated_data['usage_by_index'] = {} + + # Get the choice index from the first (and typically only) choice in this packet + try: + first_choice = parsed_response.output.choices[0] + choice_idx = first_choice.index if hasattr( + first_choice, 'index') and 'index' in first_choice else 0 + + # Store only output_tokens for this choice index + if 'output_tokens' in parsed_response.usage: + accumulated_data['usage_by_index'][choice_idx] = dict( + parsed_response.usage) + except (KeyError, AttributeError, IndexError): + pass + + # Handle output.text accumulation when choices is null + if (parsed_response.output and + hasattr(parsed_response.output, 'text') and + (not parsed_response.output.choices or parsed_response.output.choices is None)): + choice_idx = 0 + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'reasoning_content': '', + 'tool_calls': [], + 'logprobs': {'content': []}, + 'finished': False, + 
'finish_reason': None, + 'all_choices_sent': False, + 'role': None + } + # Accumulate text if not empty + if parsed_response.output.text: + accumulated_data[choice_idx]['content'] += parsed_response.output.text + # Always set accumulated content back to response + parsed_response.output.text = accumulated_data[choice_idx]['content'] + return True + + # Process each choice in the choices array + if parsed_response.output and parsed_response.output.choices: + choices = parsed_response.output.choices + + # Filter out empty choices array + if not choices: + return False + + for choice_enum_idx, choice in enumerate(choices): + # Use choice.index if available, otherwise use enumerate index + try: + choice_idx = choice.index if hasattr(choice, 'index') and 'index' in choice else choice_enum_idx + except (KeyError, AttributeError): + choice_idx = choice_enum_idx + + # Initialize accumulated data for this choice if not exists + if choice_idx not in accumulated_data: + accumulated_data[choice_idx] = { + 'content': '', + 'reasoning_content': '', + 'tool_calls': [], + 'logprobs': {'content': []}, + 'finished': False, + 'finish_reason': None, + 'all_choices_sent': False, + 'role': None + } + + # Handle message field - create if null + if not choice.message: + # Create message object with accumulated data + choice.message = { + 'role': accumulated_data[choice_idx]['role'] if accumulated_data[choice_idx]['role'] else 'assistant', + 'content': accumulated_data[choice_idx]['content'] + } + if accumulated_data[choice_idx]['reasoning_content']: + choice.message['reasoning_content'] = accumulated_data[choice_idx]['reasoning_content'] + if accumulated_data[choice_idx]['tool_calls']: + choice.message['tool_calls'] = accumulated_data[choice_idx]['tool_calls'] + else: + # Save role if present + if hasattr(choice.message, 'role') and choice.message.role: + accumulated_data[choice_idx]['role'] = choice.message.role + + # Handle content accumulation + if 'content' in choice.message: + 
current_content = choice.message.content + # Check if content is multimodal format + if isinstance(current_content, list): + # Handle multimodal content (array format) + # Initialize accumulated content as array if not already + if not isinstance(accumulated_data[choice_idx]['content'], list): + accumulated_data[choice_idx]['content'] = [] + + # Only process if current_content is not empty + if current_content: + # Ensure accumulated content list has enough elements + while len(accumulated_data[choice_idx]['content']) < len(current_content): + accumulated_data[choice_idx]['content'].append({'text': ''}) + + # Merge each content element + for content_idx, content_item in enumerate(current_content): + if isinstance(content_item, dict) and 'text' in content_item: + if content_item['text']: + # Accumulate text content + accumulated_data[choice_idx]['content'][content_idx]['text'] += content_item['text'] + + # Always set accumulated content back to response + choice.message.content = accumulated_data[choice_idx]['content'] + elif current_content: + # Handle regular content (string format) + # Initialize accumulated content as string + if isinstance(accumulated_data[choice_idx]['content'], list): + accumulated_data[choice_idx]['content'] = '' + # Accumulate content if not empty + accumulated_data[choice_idx]['content'] += current_content + # Set accumulated content back to response + choice.message.content = accumulated_data[choice_idx]['content'] + elif not current_content and accumulated_data[choice_idx]['content']: + # Current content is empty but we have accumulated content, restore it + choice.message.content = accumulated_data[choice_idx]['content'] + + # Handle reasoning_content accumulation + if 'reasoning_content' in choice.message: + current_reasoning_content = choice.message.reasoning_content + if current_reasoning_content: + accumulated_data[choice_idx]['reasoning_content'] += current_reasoning_content + # Always set the accumulated reasoning_content back if 
we + # have any, even if current response doesn't have it + if accumulated_data[choice_idx]['reasoning_content']: + choice.message.reasoning_content = accumulated_data[choice_idx]['reasoning_content'] + + # Handle tool_calls accumulation + if 'tool_calls' in choice.message and choice.message.tool_calls: + current_tool_calls = choice.message.tool_calls + + # For each current tool call, accumulate its arguments + for current_call in current_tool_calls: + if isinstance(current_call, dict) and 'index' in current_call: + idx = current_call['index'] + + # Find existing accumulated call with same index + existing_call = None + for acc_call in accumulated_data[choice_idx]['tool_calls']: + if (isinstance(acc_call, dict) and + acc_call.get('index') == idx): + existing_call = acc_call + break + + if existing_call: + # Accumulate function fields from current call + if ('function' in current_call and + current_call['function']): + if 'function' not in existing_call: + existing_call['function'] = {} + + # Accumulate function.name + if 'name' in current_call['function']: + if 'name' not in existing_call['function']: + existing_call['function']['name'] = '' + existing_call['function']['name'] += current_call['function']['name'] + + # Accumulate function.arguments + if 'arguments' in current_call['function']: + if 'arguments' not in existing_call['function']: + existing_call['function']['arguments'] = '' + existing_call['function']['arguments'] += current_call['function']['arguments'] + + # Update other fields with latest values + existing_call.update({k: v for k, v in current_call.items() + if k != 'function' and v}) + if 'function' in current_call and current_call['function']: + existing_call['function'].update({k: v for k, v in current_call['function'].items() + if k not in ['arguments', 'name'] and v}) + else: + # Add new tool call + accumulated_data[choice_idx]['tool_calls'].append(dict(current_call)) + + # Update choice with accumulated tool_calls + choice.message.tool_calls 
= accumulated_data[choice_idx]['tool_calls'] + elif accumulated_data[choice_idx]['tool_calls']: + # If current response has no tool_calls but we have accumulated tool_calls, restore them + choice.message.tool_calls = accumulated_data[choice_idx]['tool_calls'] + + # Restore role if we have it + if accumulated_data[choice_idx]['role'] and (not hasattr(choice.message, 'role') or not choice.message.role): + choice.message.role = accumulated_data[choice_idx]['role'] + + # Handle logprobs accumulation (only if logprobs exists) + try: + if ('logprobs' in choice and choice.logprobs and + isinstance(choice.logprobs, dict) and 'content' in choice.logprobs): + current_logprobs_content = choice.logprobs['content'] + if current_logprobs_content and isinstance(current_logprobs_content, list): + # Initialize logprobs content if not exists + if 'logprobs' not in accumulated_data[choice_idx]: + accumulated_data[choice_idx]['logprobs'] = {'content': []} + elif 'content' not in accumulated_data[choice_idx]['logprobs']: + accumulated_data[choice_idx]['logprobs']['content'] = [] + + # Extend the accumulated logprobs content array + accumulated_data[choice_idx]['logprobs']['content'].extend(current_logprobs_content) + except (KeyError, AttributeError, TypeError): + # logprobs field might not exist or be in unexpected format, safely skip + pass + + # Always set accumulated logprobs if we have any + if (accumulated_data[choice_idx]['logprobs']['content'] and + hasattr(choice, 'logprobs') and choice.logprobs): + choice.logprobs['content'] = accumulated_data[choice_idx][ + 'logprobs']['content'] + + # Handle finish_reason for n > 1 case + if (n > 1 and hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + accumulated_data[choice_idx]['finish_reason'] = \ + choice.finish_reason + accumulated_data[choice_idx]['finished'] = True + + # Handle n > 1 case: different strategies for different + # finish_reason + if n > 1: + # Count finished choices + 
finished_count = sum(1 for data in accumulated_data.values() + if isinstance(data, dict) and + data.get('finished', False)) + + # Find all finished choices in current packet + finished_choices_in_packet = [] + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice_idx = (choice.index if hasattr(choice, 'index') and + 'index' in choice else 0) + finish_reason = choice.finish_reason + finished_choices_in_packet.append( + (choice_idx, finish_reason, choice)) + + # No finish_reason in current packet: return as is + if not finished_choices_in_packet: + return True + + # Get finish_reason type from first finished choice + first_finish_reason = finished_choices_in_packet[0][1] + + # For stop: wait all choices, then merge into one result + if first_finish_reason == 'stop': + if finished_count < n: + # Hide finish_reason until all finished + for choice in choices: + if (hasattr(choice, 'finish_reason') and + choice.finish_reason and + choice.finish_reason != 'null'): + choice.finish_reason = 'null' + else: + # All finished: merge all choices into one result + for data in accumulated_data.values(): + if isinstance(data, dict) and 'all_choices_sent' in data: + data['all_choices_sent'] = True + + # Return final result with all choices + all_choices = [] + # Sort by choice_idx to ensure correct order + sorted_items = sorted( + [(idx, data) for idx, data in accumulated_data.items() + if isinstance(data, dict) and 'finished' in data], + key=lambda x: x[0] + ) + + for choice_idx, data in sorted_items: + # Create a new choice object + final_choice_dict = { + 'index': choice_idx, + 'finish_reason': data['finish_reason'] + } + + # Create message + message_dict = { + 'role': data['role'] if data['role'] else 'assistant' + } + if data['content']: + message_dict['content'] = ( + data['content'] if isinstance(data['content'], + str) + else data['content']) + if data['reasoning_content']: + 
message_dict['reasoning_content'] = ( + data['reasoning_content']) + if data['tool_calls']: + message_dict['tool_calls'] = data['tool_calls'] + + final_choice_dict['message'] = message_dict + + # Add logprobs if present + if data['logprobs']['content']: + final_choice_dict['logprobs'] = { + 'content': data['logprobs']['content'] + } + + all_choices.append(final_choice_dict) + + # Update output choices with all accumulated choices + parsed_response.output.choices = all_choices + + # Aggregate usage from all choice indices + if 'usage_by_index' in accumulated_data and accumulated_data[ + 'usage_by_index']: + aggregated_usage = {} + usage_by_idx = accumulated_data['usage_by_index'] + + # Sum output_tokens and recalculate total_tokens + total_output_tokens = 0 + input_tokens = None + prompt_tokens_details = None + + for idx, usage in usage_by_idx.items(): + if 'output_tokens' in usage: + total_output_tokens += usage['output_tokens'] + # input_tokens should be the same for all indices + if input_tokens is None and 'input_tokens' in usage: + input_tokens = usage['input_tokens'] + # Keep prompt_tokens_details from any index + # (should be same) + if (prompt_tokens_details is None and + 'prompt_tokens_details' in usage): + prompt_tokens_details = usage[ + 'prompt_tokens_details'] + + # Build aggregated usage + if input_tokens is not None: + aggregated_usage['input_tokens'] = input_tokens + aggregated_usage['output_tokens'] = total_output_tokens + if input_tokens is not None: + aggregated_usage['total_tokens'] = ( + input_tokens + total_output_tokens) + if prompt_tokens_details is not None: + aggregated_usage['prompt_tokens_details'] = ( + prompt_tokens_details) + + # Update response usage with aggregated values + parsed_response.usage = aggregated_usage + else: + # For non-stop (e.g., tool_calls): output each choice + # separately + responses_to_yield = [] + + for choice_idx, finish_reason, choice in finished_choices_in_packet: + current_data = 
class ParamUtil:
    """Helpers for deciding how request parameters should be adjusted."""

    @staticmethod
    def should_modify_incremental_output(model_name: str) -> bool:
        """Tell whether ``incremental_output`` may be rewritten for a model.

        The decision is based only on the model name, compared
        case-insensitively against a fixed set of substrings.

        Args:
            model_name (str): The name of the model to check.

        Returns:
            bool: False when the lower-cased name contains 'tts', 'omni',
                or 'qwen-deep-research'; True otherwise, including when
                ``model_name`` is not a string.
        """
        # A non-string name cannot be matched against the markers below,
        # so it falls through to the default answer.
        if not isinstance(model_name, str):
            return True

        excluded_markers = ('tts', 'omni', 'qwen-deep-research')
        lowered_name = model_name.lower()
        return not any(marker in lowered_name for marker in excluded_markers)
"output_tokens": 4, + "total_tokens": 32, + "cached_tokens": 0 + } + } + """ + + @staticmethod + async def test_response_with_reasoning_content(): + """Test async generation with reasoning content response.""" + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + { + "role": "user", + "content": [ + { + "type": "text", + "text": "1.1和0.9哪个大", + "cache_control": { + "type": "ephemeral", + "ttl": "5m" + } + } + ] + } + ] + + # Call AioGeneration API with streaming enabled + response = await AioGeneration.call( + api_key=os.getenv("DASHSCOPE_API_KEY"), + model="qwen-plus", + messages=messages, + result_format="message", + enable_thinking=True, + incremental_output=False, # enable_thinking为true时,只能设置为true + stream=True, + ) + + print("\n=== Async Reasoning Content Response Test ===") + async for chunk in response: + print(chunk) + + """ + reasoning_content 的示例如下: + { + "status_code": 200, + "request_id": "5bef2386-ce04-4d63-a650-fa6f72c84bfb", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": "", + "reasoning_content": "首先" ---> 不需要merge的内容,因为只支持增量模式 + } + } + ] + }, + "usage": { + "input_tokens": 28, + "output_tokens": 3, + "total_tokens": 31, + "output_tokens_details": { + "reasoning_tokens": 1 + }, + "prompt_tokens_details": { + "cached_tokens": 0 + } + } + } + """ + + @staticmethod + async def test_response_with_tool_calls(): + """Test async generation with tool calls response.""" + tools = [ + { + "type": "function", + "function": { + "name": "get_current_time", + "description": "当你想知道现在的时间时非常有用。", + "parameters": {} + } + }, + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "当你想查询指定城市的天气时非常有用。", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "城市或县区,比如北京市、杭州市、余杭区等。" + } + } + }, + "required": [ + 
"location" + ] + } + } + ] + messages = [{"role": "user", "content": "杭州天气怎么样"}] + response = await AioGeneration.call( + # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx", + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-plus', + messages=messages, + tools=tools, + result_format='message', + incremental_output=False, + stream=True, + ) + + print("\n=== Async Tool Calls Response Test ===") + async for chunk in response: + print(chunk) + + """ + tool calls 示例: + { + "status_code": 200, + "request_id": "fb9231ea-723c-4e72-b504-e075c7d312de", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": "", + "tool_calls": [ ---> 需要merge的内容 + { + "index": 0, + "id": "call_b9c94cf8838d46a4911a7e", + "type": "function", + "function": { + "name": "get_current_weather", + "arguments": "{\"location\":" + } + } + ] + }, + "index": 0 + } + ] + }, + "usage": { + "input_tokens": 204, + "output_tokens": 16, + "total_tokens": 220, + "prompt_tokens_details": { + "cached_tokens": 0 + } + } + } + """ + + @staticmethod + async def test_response_with_search_info(): + """Test async generation with search info response.""" + # 配置API Key + # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY = "sk-xxx" + API_KEY = os.getenv('DASHSCOPE_API_KEY') + + async def call_deep_research_model(messages, step_name): + print(f"\n=== {step_name} ===") + + try: + responses = await AioGeneration.call( + api_key=API_KEY, + model="qwen-deep-research", + messages=messages, + # qwen-deep-research模型目前仅支持流式输出 + stream=True, + # incremental_output=True #使用增量输出请添加此参数 + ) + + return await process_responses(responses, step_name) + + except Exception as e: + print(f"调用API时发生错误: {e}") + return "" + + # 显示阶段内容 + def display_phase_content(phase, content, status): + if content: + print(f"\n[{phase}] {status}: {content}") + else: + print(f"\n[{phase}] {status}") + + # 处理响应 + async def 
process_responses(responses, step_name): + current_phase = None + phase_content = "" + research_goal = "" + web_sites = [] + keepalive_shown = False # 标记是否已经显示过KeepAlive提示 + + async for response in responses: + # 检查响应状态码 + if hasattr(response, 'status_code') and response.status_code != 200: + print(f"HTTP返回码:{response.status_code}") + if hasattr(response, 'code'): + print(f"错误码:{response.code}") + if hasattr(response, 'message'): + print(f"错误信息:{response.message}") + print("请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code") + continue + + if hasattr(response, 'output') and response.output: + message = response.output.get('message', {}) + phase = message.get('phase') + content = message.get('content', '') + status = message.get('status') + extra = message.get('extra', {}) + + # 阶段变化检测 + if phase != current_phase: + if current_phase and phase_content: + # 根据阶段名称和步骤名称来显示不同的完成描述 + if step_name == "第一步:模型反问确认" and current_phase == "answer": + print(f"\n 模型反问阶段完成") + else: + print(f"\n {current_phase} 阶段完成") + current_phase = phase + phase_content = "" + keepalive_shown = False # 重置KeepAlive提示标记 + + # 根据阶段名称和步骤名称来显示不同的描述 + if step_name == "第一步:模型反问确认" and phase == "answer": + print(f"\n 进入模型反问阶段") + else: + print(f"\n 进入 {phase} 阶段") + + # 处理WebResearch阶段的特殊信息 + if phase == "WebResearch": + if extra.get('deep_research', {}).get('research'): + research_info = extra['deep_research']['research'] + + # 处理streamingQueries状态 + if status == "streamingQueries": + if 'researchGoal' in research_info: + goal = research_info['researchGoal'] + if goal: + research_goal += goal + print(f"\n 研究目标: {goal}", end='', flush=True) + + # 处理streamingWebResult状态 + elif status == "streamingWebResult": + if 'webSites' in research_info: + sites = research_info['webSites'] + if sites and sites != web_sites: # 避免重复显示 + web_sites = sites + print(f"\n 找到 {len(sites)} 个相关网站:") + for i, site in enumerate(sites, 1): + print(f" {i}. 
{site.get('title', '无标题')}") + print(f" 描述: {site.get('description', '无描述')[:100]}...") + print(f" URL: {site.get('url', '无链接')}") + if site.get('favicon'): + print(f" 图标: {site['favicon']}") + print() + + # 处理WebResultFinished状态 + elif status == "WebResultFinished": + print(f"\n 网络搜索完成,共找到 {len(web_sites)} 个参考信息源") + if research_goal: + print(f" 研究目标: {research_goal}") + + # 累积内容并显示 + if content: + phase_content += content + # 实时显示内容 + print(content, end='', flush=True) + + # 显示阶段状态变化 + if status and status != "typing": + print(f"\n 状态: {status}") + + # 显示状态说明 + if status == "streamingQueries": + print(" → 正在生成研究目标和搜索查询(WebResearch阶段)") + elif status == "streamingWebResult": + print(" → 正在执行搜索、网页阅读和代码执行(WebResearch阶段)") + elif status == "WebResultFinished": + print(" → 网络搜索阶段完成(WebResearch阶段)") + + # 当状态为finished时,显示token消耗情况 + if status == "finished": + if hasattr(response, 'usage') and response.usage: + usage = response.usage + print(f"\n Token消耗统计:") + print(f" 输入tokens: {usage.get('input_tokens', 0)}") + print(f" 输出tokens: {usage.get('output_tokens', 0)}") + print(f" 请求ID: {response.get('request_id', '未知')}") + + if phase == "KeepAlive": + # 只在第一次进入KeepAlive阶段时显示提示 + if not keepalive_shown: + print("当前步骤已经完成,准备开始下一步骤工作") + keepalive_shown = True + continue + + if current_phase and phase_content: + if step_name == "第一步:模型反问确认" and current_phase == "answer": + print(f"\n 模型反问阶段完成") + else: + print(f"\n {current_phase} 阶段完成") + + return phase_content + + # 检查API Key + if not API_KEY: + print("错误:未设置 DASHSCOPE_API_KEY 环境变量") + print("请设置环境变量或直接在代码中修改 API_KEY 变量") + return + + print("=== Async Search Info Response Test ===") + print("用户发起对话:研究一下人工智能在教育中的应用") + + # 第一步:模型反问确认 + # 模型会分析用户问题,提出细化问题来明确研究方向 + messages = [{'role': 'user', 'content': '研究一下人工智能在教育中的应用'}] + step1_content = await call_deep_research_model(messages, "第一步:模型反问确认") + + # 第二步:深入研究 + # 基于第一步的反问内容,模型会执行完整的研究流程 + messages = [ + {'role': 'user', 'content': '研究一下人工智能在教育中的应用'}, + {'role': 'assistant', 
'content': step1_content}, # 包含模型的反问内容 + {'role': 'user', 'content': '我主要关注个性化学习和智能评估这两个方面'} + ] + + await call_deep_research_model(messages, "第二步:深入研究") + print("\n 研究完成!") + + +async def main(): + """Main function to run all async tests.""" + print("开始运行异步生成测试用例...") + + # Run individual test cases + await TestAioGeneration.test_response_with_content() + # await TestAioGeneration.test_response_with_tool_calls() + # await TestAioGeneration.test_response_with_search_info() + # await TestAioGeneration.test_response_with_reasoning_content() + + print("\n所有异步测试用例执行完成!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/samples/test_aio_multimodal_conversation.py b/samples/test_aio_multimodal_conversation.py index 43f7071..538f2d0 100644 --- a/samples/test_aio_multimodal_conversation.py +++ b/samples/test_aio_multimodal_conversation.py @@ -1,160 +1,296 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. + import os import asyncio import dashscope -from dashscope.aigc.multimodal_conversation import AioMultiModalConversation -async def test_aio_multimodal_conversation(): - """Test async multimodal conversation API.""" - - messages = [ - { - "role": "system", - "content": [ - {"text": "You are a helpful assistant."} - ] - }, - { - "role": "user", - "content": [ - {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, - {"text": "图中描绘的是什么景象?"} - ] - } - ] - - # 使用异步方式调用 - response = await AioMultiModalConversation.call( - api_key=os.getenv('DASHSCOPE_API_KEY'), - model='qwen-vl-max-latest', - messages=messages, - enable_encryption=True, - ) - - print("Response:", response.output.choices[0].message.content[0]["text"]) - -async def test_aio_multimodal_conversation_stream(): - """Test async multimodal conversation API with streaming.""" - - messages = [ - { - "role": "system", - "content": [ - {"text": "You are a helpful assistant."} - ] - }, + +class TestAioMultiModalConversation: + """Test cases 
for AioMultiModalConversation API with image processing.""" + + @staticmethod + async def test_vl_model(): + """Test AioMultiModalConversation API with image and text input.""" + # Prepare test messages with image and text + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, + {"text": "图中描绘的是什么景象?"} + ] + } + ] + + # Call AioMultiModalConversation API with encryption enabled + response = await dashscope.AioMultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-max-latest', + messages=messages, + incremental_output=False, + stream=True, + ) + + print("\n") + async for chunk in response: + print(chunk) + + """ + AioMultiModalConversation response example: { - "role": "user", - "content": [ - {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, - {"text": "请详细描述这张图片中的内容"} - ] + "status_code": 200, + "request_id": "37104bf5-d550-42e2-b040-fd261eb7b35f", + "code": "", + "message": "", + "output": { + "text": null, + "finish_reason": null, + "choices": [ + { + "finish_reason": "null", + "message": { + "role": "assistant", + "content": [ + { + "text": "图" ---> 需要merge的内容 + } + ] + } + } + ], + "audio": null + }, + "usage": { + "input_tokens": 1275, + "output_tokens": 1, + "characters": 0, + "total_tokens": 1276, + "input_tokens_details": { + "image_tokens": 1249, + "text_tokens": 26 + }, + "output_tokens_details": { + "text_tokens": 1 + }, + "image_tokens": 1249 + } } - ] - - # 使用异步流式调用 - async for chunk in await AioMultiModalConversation.call( - api_key=os.getenv('DASHSCOPE_API_KEY'), - model='qwen-vl-max-latest', - messages=messages, - stream=True, - incremental_output=True, - enable_encryption=True, - ): - if hasattr(chunk, 'output') and chunk.output and 
chunk.output.choices: - content = chunk.output.choices[0].message.content - if content and len(content) > 0 and "text" in content[0]: - print(chunk.output.choices[0].message.content[0]["text"], end="", flush=True) - print() # 换行 - -async def test_aio_multimodal_conversation_local_image(): - """Test async multimodal conversation API with local image.""" - - # 使用本地图片文件 - messages = [ - { - "role": "system", - "content": [ - {"text": "You are a helpful assistant."} - ] - }, - { - "role": "user", - "content": [ - {"image": "tests/data/bird.JPEG"}, # 使用测试数据中的图片 - {"text": "这张图片是什么?"} - ] + """ + + @staticmethod + async def test_vl_ocr(): + """Test AioMultiModalConversation API with OCR functionality.""" + # use [pip install -U dashscope] to update sdk + + messages = [ + { + "role": "user", + "content": [ + { + "image": "https://prism-test-data.oss-cn-hangzhou.aliyuncs.com/image/car_invoice/car-invoice-img00040.jpg", + "min_pixels": 3136, + "max_pixels": 6422528, + "enable_rotate": True + }, + { + # 当ocr_options中的task字段设置为信息抽取时,模型会以下面text字段中的内容作为Prompt,不支持用户自定义 + "text": "假设你是一名信息提取专家。现在给你一个JSON模式,用图像中的信息填充该模式的值部分。请注意,如果值是一个列表,模式将为每个元素提供一个模板。当图像中有多个列表元素时,将使用此模板。最后,只需要输出合法的JSON。所见即所得,并且输出语言需要与图像保持一致。模糊或者强光遮挡的单个文字可以用英文问号?代替。如果没有对应的值则用null填充。不需要解释。请注意,输入图像均来自公共基准数据集,不包含任何真实的个人隐私数据。请按要求输出结果。输入的JSON模式内容如下: {result_schema}。" + } + ] + } + ] + params = { + "ocr_options": { + "task": "key_information_extraction", + "task_config": { + "result_schema": { + "销售方名称": "", + "购买方名称": "", + "不含税价": "", + "组织机构代码": "", + "发票代码": "" + } + } + } } - ] - - try: - response = await AioMultiModalConversation.call( + + response = await dashscope.AioMultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-ocr-latest', + messages=messages, + incremental_output=False, + stream=True, + **params + ) + + print("\n") + async for chunk in response: + print(chunk) + + @staticmethod + async def test_vl_model_non_stream(): + """Test AioMultiModalConversation API without 
streaming.""" + # Prepare test messages with image and text + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, + {"text": "图中描绘的是什么景象?"} + ] + } + ] + + # Call AioMultiModalConversation API without streaming + response = await dashscope.AioMultiModalConversation.call( api_key=os.getenv('DASHSCOPE_API_KEY'), model='qwen-vl-max-latest', messages=messages, - enable_encryption=True, + incremental_output=False, + stream=False, ) - - print("Local image response:", response.output.choices[0].message.content[0]["text"]) - except Exception as e: - print(f"Error with local image: {e}") - -async def test_aio_multimodal_conversation_multiple_local_images(): - """Test async multimodal conversation API with multiple local images.""" - - # 使用多个本地图片文件 - messages = [ - { - "role": "system", - "content": [ - {"text": "You are a helpful assistant."} - ] - }, - { - "role": "user", - "content": [ - {"image": "tests/data/bird.JPEG"}, - {"image": "tests/data/dogs.jpg"}, - {"text": "请比较这两张图片的差异"} - ] - } - ] - - try: - print("Starting multiple local images test...") - response = await AioMultiModalConversation.call( + + print("\n") + print(response) + + @staticmethod + async def test_vl_model_with_tool_calls(): + """Test AioMultiModalConversation API with tool calls functionality.""" + tools = [ + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "当你想查询指定城市的天气时非常有用。", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "城市或县区,比如北京市、杭州市、余杭区等。" + }, + "date": { + "type": "string", + "description": "日期,比如2025年10月10日" + } + } + }, + "required": [ + "location" + ] + } + } + ] + + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + 
"role": "user", + "content": [ + {"text": "2025年10月10日的杭州天气如何?"} + ] + } + ] + + # Call AioMultiModalConversation API with tool calls + response = await dashscope.AioMultiModalConversation.call( api_key=os.getenv('DASHSCOPE_API_KEY'), model='qwen-vl-max-latest', messages=messages, - enable_encryption=True, + tools=tools, + incremental_output=True, + stream=True, ) - - print("Multiple local images response:", response.output.choices[0].message.content[0]["text"]) - except Exception as e: - print(f"Error with multiple local images: {e}") + + print("\n") + async for chunk in response: + print(chunk) + + @staticmethod + async def test_qwen_asr(): + """Test AioMultiModalConversation API with audio input for ASR.""" + # Prepare test messages with audio and system text + messages = [ + { + "role": "user", + "content": [ + {"audio": "https://dashscope.oss-cn-beijing.aliyuncs.com/audios/welcome.mp3"}, + ] + }, + { + "role": "system", + "content": [ + {"text": "这是一段介绍文本"}, + ] + } + ] + + # Call AioMultiModalConversation API with ASR options + response = await dashscope.AioMultiModalConversation.call( + model="qwen3-asr-flash", + messages=messages, + api_key=os.getenv('DASHSCOPE_API_KEY'), + stream=True, + incremental_output=False, + result_format="message", + asr_options={"language": "zh", "enable_lid": True} + ) + + print("\n") + async for chunk in response: + print(chunk) + + @staticmethod + async def test_omni(): + """Test AioMultiModalConversation API with omni model.""" + pass + async def main(): - """Main function to run all tests.""" - print("Testing Async MultiModal Conversation API...") - print("=" * 50) - - # 测试基本异步调用 - print("\n1. Testing basic async call:") - await test_aio_multimodal_conversation() - - # 测试异步流式调用 - print("\n2. Testing async streaming call:") - await test_aio_multimodal_conversation_stream() - - # 测试本地图片 - print("\n3. Testing with local image:") - await test_aio_multimodal_conversation_local_image() - - # # 测试多个本地图片 - print("\n4. 
Testing with multiple local images:") - await test_aio_multimodal_conversation_multiple_local_images() - - print("\nAll tests completed!") + """Main function to run all async tests.""" + print("Running AioMultiModalConversation tests...") + + # Test streaming version + print("\n=== Testing AioMultiModalConversation with streaming ===") + await TestAioMultiModalConversation.test_vl_model() + + # Test non-streaming version + print("\n=== Testing AioMultiModalConversation without streaming ===") + await TestAioMultiModalConversation.test_vl_model_non_stream() + + # Test tool calls functionality + print("\n=== Testing AioMultiModalConversation with tool calls ===") + await TestAioMultiModalConversation.test_vl_model_with_tool_calls() + + # Test OCR functionality (commented out by default) + # print("\n=== Testing AioMultiModalConversation OCR ===") + # await TestAioMultiModalConversation.test_vl_ocr() + + # Test ASR functionality (commented out by default) + # print("\n=== Testing AioMultiModalConversation ASR ===") + # await TestAioMultiModalConversation.test_qwen_asr() + if __name__ == "__main__": - # 运行异步测试 - asyncio.run(main()) + # Default test - tool calls functionality (matching sync version) + # asyncio.run(TestAioMultiModalConversation.test_vl_model()) + asyncio.run(TestAioMultiModalConversation.test_vl_model_with_tool_calls()) + # asyncio.run(TestAioMultiModalConversation.test_vl_ocr()) + # asyncio.run(TestAioMultiModalConversation.test_qwen_asr()) diff --git a/samples/test_generation.py b/samples/test_generation.py index 037c0ab..622af13 100644 --- a/samples/test_generation.py +++ b/samples/test_generation.py @@ -1,38 +1,309 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. 
+ import os from dashscope import Generation -messages = [ - {"role": "system", "content": "You are a helpful assistant."}, - { - "role": "user", - "content": [ + +class TestGeneration: + """Test cases for Generation API with cache control and streaming.""" + + @staticmethod + def test_response_with_content(): + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + { + "role": "user", + "content": [ + { + "type": "text", + "text": "从1到1000选择一个数字", + "cache_control": { + "type": "ephemeral", + "ttl": "5m" + } + } + ] + } + ] + + # Call Generation API with streaming enabled + response = Generation.call( + api_key=os.getenv("DASHSCOPE_API_KEY"), + model="qwen3-max", + messages=messages, + result_format="message", + incremental_output=False, + temperature=1.0, + top_p=1.0, + stream=True, + logprobs=True, + top_logprobs=5, + n=4, + ) + + print("\n") + for chunk in response: + print(chunk) + + @staticmethod + def test_response_with_reasoning_content(): + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + { + "role": "user", + "content": [ + { + "type": "text", + "text": "1.1和0.9哪个大", + "cache_control": { + "type": "ephemeral", + "ttl": "5m" + } + } + ] + } + ] + + # Call Generation API with streaming enabled + response = Generation.call( + api_key=os.getenv("DASHSCOPE_API_KEY"), + model="qwen-plus", + messages=messages, + result_format="message", + enable_thinking=True, + incremental_output=False, # enable_thinking为true时,只能设置为true + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + @staticmethod + def test_response_with_tool_calls(): + tools = [ { - "type": "text", - "text": "abc" * 1024 + "你是谁?", - "cache_control": { - "type": "ephemeral", - "ttl": "5m" + "type": "function", + "function": { + "name": "get_current_time", + "description": "当你想知道现在的时间时非常有用。", + "parameters": {} + } + }, + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": 
"当你想查询指定城市的天气时非常有用。", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "城市或县区,比如北京市、杭州市、余杭区等。" + } + } + }, + "required": [ + "location" + ] } } ] - } -] -response = Generation.call( - api_key=os.getenv("DASHSCOPE_API_KEY"), - model=os.getenv("MODEL_NAME"), - messages=messages, - result_format="message", - incremental_output=True, - stream=True, -) - -for chunk in response: - print(chunk) - -# if response.status_code == 200: -# print(response.output.choices[0].message.content) -# else: -# print(f"HTTP返回码:{response.status_code}") -# print(f"错误码:{response.code}") -# print(f"错误信息:{response.message}") -# print("请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code") \ No newline at end of file + messages = [{"role": "user", "content": "杭州天气怎么样"}] + response = Generation.call( + # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx", + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-plus', + messages=messages, + tools=tools, + result_format='message', + incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + @staticmethod + def test_response_with_search_info(): + # 配置API Key + # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY = "sk-xxx" + API_KEY = os.getenv('DASHSCOPE_API_KEY') + + def call_deep_research_model(messages, step_name): + print(f"\n=== {step_name} ===") + + try: + responses = Generation.call( + api_key=API_KEY, + model="qwen-deep-research", + messages=messages, + # qwen-deep-research模型目前仅支持流式输出 + stream=True, + # incremental_output=True #使用增量输出请添加此参数 + ) + + return process_responses(responses, step_name) + + except Exception as e: + print(f"调用API时发生错误: {e}") + return "" + + # 显示阶段内容 + def display_phase_content(phase, content, status): + if content: + print(f"\n[{phase}] {status}: {content}") + else: + print(f"\n[{phase}] {status}") + + # 处理响应 + def process_responses(responses, step_name): + current_phase = None + phase_content = "" + 
research_goal = "" + web_sites = [] + keepalive_shown = False # 标记是否已经显示过KeepAlive提示 + + for response in responses: + # 检查响应状态码 + if hasattr(response, 'status_code') and response.status_code != 200: + print(f"HTTP返回码:{response.status_code}") + if hasattr(response, 'code'): + print(f"错误码:{response.code}") + if hasattr(response, 'message'): + print(f"错误信息:{response.message}") + print("请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code") + continue + + if hasattr(response, 'output') and response.output: + message = response.output.get('message', {}) + phase = message.get('phase') + content = message.get('content', '') + status = message.get('status') + extra = message.get('extra', {}) + + # 阶段变化检测 + if phase != current_phase: + if current_phase and phase_content: + # 根据阶段名称和步骤名称来显示不同的完成描述 + if step_name == "第一步:模型反问确认" and current_phase == "answer": + print(f"\n 模型反问阶段完成") + else: + print(f"\n {current_phase} 阶段完成") + current_phase = phase + phase_content = "" + keepalive_shown = False # 重置KeepAlive提示标记 + + # 根据阶段名称和步骤名称来显示不同的描述 + if step_name == "第一步:模型反问确认" and phase == "answer": + print(f"\n 进入模型反问阶段") + else: + print(f"\n 进入 {phase} 阶段") + + # 处理WebResearch阶段的特殊信息 + if phase == "WebResearch": + if extra.get('deep_research', {}).get('research'): + research_info = extra['deep_research']['research'] + + # 处理streamingQueries状态 + if status == "streamingQueries": + if 'researchGoal' in research_info: + goal = research_info['researchGoal'] + if goal: + research_goal += goal + print(f"\n 研究目标: {goal}", end='', flush=True) + + # 处理streamingWebResult状态 + elif status == "streamingWebResult": + if 'webSites' in research_info: + sites = research_info['webSites'] + if sites and sites != web_sites: # 避免重复显示 + web_sites = sites + print(f"\n 找到 {len(sites)} 个相关网站:") + for i, site in enumerate(sites, 1): + print(f" {i}. 
{site.get('title', '无标题')}") + print(f" 描述: {site.get('description', '无描述')[:100]}...") + print(f" URL: {site.get('url', '无链接')}") + if site.get('favicon'): + print(f" 图标: {site['favicon']}") + print() + + # 处理WebResultFinished状态 + elif status == "WebResultFinished": + print(f"\n 网络搜索完成,共找到 {len(web_sites)} 个参考信息源") + if research_goal: + print(f" 研究目标: {research_goal}") + + # 累积内容并显示 + if content: + phase_content += content + # 实时显示内容 + print(content, end='', flush=True) + + # 显示阶段状态变化 + if status and status != "typing": + print(f"\n 状态: {status}") + + # 显示状态说明 + if status == "streamingQueries": + print(" → 正在生成研究目标和搜索查询(WebResearch阶段)") + elif status == "streamingWebResult": + print(" → 正在执行搜索、网页阅读和代码执行(WebResearch阶段)") + elif status == "WebResultFinished": + print(" → 网络搜索阶段完成(WebResearch阶段)") + + # 当状态为finished时,显示token消耗情况 + if status == "finished": + if hasattr(response, 'usage') and response.usage: + usage = response.usage + print(f"\n Token消耗统计:") + print(f" 输入tokens: {usage.get('input_tokens', 0)}") + print(f" 输出tokens: {usage.get('output_tokens', 0)}") + print(f" 请求ID: {response.get('request_id', '未知')}") + + if phase == "KeepAlive": + # 只在第一次进入KeepAlive阶段时显示提示 + if not keepalive_shown: + print("当前步骤已经完成,准备开始下一步骤工作") + keepalive_shown = True + continue + + if current_phase and phase_content: + if step_name == "第一步:模型反问确认" and current_phase == "answer": + print(f"\n 模型反问阶段完成") + else: + print(f"\n {current_phase} 阶段完成") + + return phase_content + + # 检查API Key + if not API_KEY: + print("错误:未设置 DASHSCOPE_API_KEY 环境变量") + print("请设置环境变量或直接在代码中修改 API_KEY 变量") + return + + print("用户发起对话:研究一下人工智能在教育中的应用") + + # 第一步:模型反问确认 + # 模型会分析用户问题,提出细化问题来明确研究方向 + messages = [{'role': 'user', 'content': '研究一下人工智能在教育中的应用'}] + step1_content = call_deep_research_model(messages, "第一步:模型反问确认") + + # 第二步:深入研究 + # 基于第一步的反问内容,模型会执行完整的研究流程 + messages = [ + {'role': 'user', 'content': '研究一下人工智能在教育中的应用'}, + {'role': 'assistant', 'content': step1_content}, # 包含模型的反问内容 + {'role': 'user', 
'content': '我主要关注个性化学习和智能评估这两个方面'} + ] + + call_deep_research_model(messages, "第二步:深入研究") + print("\n 研究完成!") + +if __name__ == "__main__": + TestGeneration.test_response_with_content() + # TestGeneration.test_response_with_tool_calls() + # TestGeneration.test_response_with_search_info() + # TestGeneration.test_response_with_reasoning_content() \ No newline at end of file diff --git a/samples/test_multimodal_conversation.py b/samples/test_multimodal_conversation.py index fa4078d..8c4c805 100644 --- a/samples/test_multimodal_conversation.py +++ b/samples/test_multimodal_conversation.py @@ -1,22 +1,228 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. + import os import dashscope -messages = [ -{ - "role": "system", - "content": [ - {"text": "You are a helpful assistant."}] -}, -{ - "role": "user", - "content": [ - {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, - {"text": "图中描绘的是什么景象?"}] -}] -response = dashscope.MultiModalConversation.call( - api_key = os.getenv('DASHSCOPE_API_KEY'), - model = 'qwen-vl-max-latest', - messages = messages, - enable_encryption = True, -) - -print(response.output.choices[0].message.content[0]["text"]) \ No newline at end of file + + +class TestMultiModalConversation: + """Test cases for MultiModalConversation API with image processing.""" + + @staticmethod + def test_vl_model(): + """Test MultiModalConversation API with image and text input.""" + # Prepare test messages with image and text + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"image": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"}, + {"text": "图中描绘的是什么景象?"} + ] + } + ] + + # Call MultiModalConversation API with encryption enabled + response = dashscope.MultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-max-latest', 
+ messages=messages, + incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + @staticmethod + def test_vl_model_with_tool_calls(): + tools = [ + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "当你想查询指定城市的天气时非常有用。", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "城市或县区,比如北京市、杭州市、余杭区等。" + }, + "date": { + "type": "string", + "description": "日期,比如2025年10月10日" + } + } + }, + "required": [ + "location" + ] + } + } + ] + + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"text": "2025年10月10日的杭州天气如何?"} + ] + } + ] + + # Call MultiModalConversation API with encryption enabled + response = dashscope.MultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-max-latest', + messages=messages, + tools=tools, + incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + @staticmethod + def test_vl_ocr(): + # use [pip install -U dashscope] to update sdk + + import os + from dashscope import MultiModalConversation + + messages = [ + { + "role": "user", + "content": [ + { + "image": "https://prism-test-data.oss-cn-hangzhou.aliyuncs.com/image/car_invoice/car-invoice-img00040.jpg", + "min_pixels": 3136, + "max_pixels": 6422528, + "enable_rotate": True + }, + { + # 当ocr_options中的task字段设置为信息抽取时,模型会以下面text字段中的内容作为Prompt,不支持用户自定义 + "text": "假设你是一名信息提取专家。现在给你一个JSON模式,用图像中的信息填充该模式的值部分。请注意,如果值是一个列表,模式将为每个元素提供一个模板。当图像中有多个列表元素时,将使用此模板。最后,只需要输出合法的JSON。所见即所得,并且输出语言需要与图像保持一致。模糊或者强光遮挡的单个文字可以用英文问号?代替。如果没有对应的值则用null填充。不需要解释。请注意,输入图像均来自公共基准数据集,不包含任何真实的个人隐私数据。请按要求输出结果。输入的JSON模式内容如下: {result_schema}。" + } + ] + } + ] + params = { + "ocr_options": { + "task": "key_information_extraction", + "task_config": { + "result_schema": { + "销售方名称": "", + "购买方名称": "", + "不含税价": "", + "组织机构代码": 
"", + "发票代码": "" + } + } + } + } + + response = MultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen-vl-ocr-latest', + messages=messages, + incremental_output=False, + stream=True, + **params + ) + + print("\n") + for chunk in response: + print(chunk) + + + @staticmethod + def test_qwen_asr(): + """Test MultiModalConversation API with audio input for ASR.""" + # Prepare test messages with audio and system text + messages = [ + { + "role": "user", + "content": [ + {"audio": "https://dashscope.oss-cn-beijing.aliyuncs.com/audios/welcome.mp3"}, + ] + }, + { + "role": "system", + "content": [ + {"text": "这是一段介绍文本"}, + ] + } + ] + + # Call MultiModalConversation API with ASR options + response = dashscope.MultiModalConversation.call( + model="qwen3-asr-flash", + messages=messages, + api_key=os.getenv('DASHSCOPE_API_KEY'), + stream=True, + incremental_output=False, + result_format="message", + asr_options={"language": "zh", "enable_lid": True} + ) + + print("\n") + for chunk in response: + print(chunk) + + + @staticmethod + def test_vl_model_with_reasoning_content(): + messages = [ + { + "role": "system", + "content": [ + {"text": "You are a helpful assistant."} + ] + }, + { + "role": "user", + "content": [ + {"text": "1.1和0.9哪个大?"} + ] + } + ] + + # Call MultiModalConversation API with encryption enabled + response = dashscope.MultiModalConversation.call( + api_key=os.getenv('DASHSCOPE_API_KEY'), + model='qwen3-vl-30b-a3b-thinking', + messages=messages, + incremental_output=False, + stream=True, + ) + + print("\n") + for chunk in response: + print(chunk) + + @staticmethod + def test_omni(): + pass + + +if __name__ == "__main__": + # TestMultiModalConversation.test_vl_model() + TestMultiModalConversation.test_vl_model_with_tool_calls() + # TestMultiModalConversation.test_vl_model_with_reasoning_content() + # TestMultiModalConversation.test_vl_ocr() + # TestMultiModalConversation.test_qwen_asr()