diff --git a/dashscope/audio/tts_v2/__init__.py b/dashscope/audio/tts_v2/__init__.py index be68540..45dc191 100644 --- a/dashscope/audio/tts_v2/__init__.py +++ b/dashscope/audio/tts_v2/__init__.py @@ -1,9 +1,12 @@ # Copyright (c) Alibaba, Inc. and its affiliates. from .enrollment import VoiceEnrollmentException, VoiceEnrollmentService -from .speech_synthesizer import AudioFormat, ResultCallback, SpeechSynthesizer +from .speech_synthesizer import (AudioFormat, ResultCallback, + SpeechSynthesizer, + SpeechSynthesizerObjectPool) __all__ = [ 'SpeechSynthesizer', 'ResultCallback', 'AudioFormat', - 'VoiceEnrollmentException', 'VoiceEnrollmentService' + 'VoiceEnrollmentException', 'VoiceEnrollmentService', + 'SpeechSynthesizerObjectPool' ] diff --git a/dashscope/audio/tts_v2/speech_synthesizer.py b/dashscope/audio/tts_v2/speech_synthesizer.py index f3627cd..d16dbfd 100644 --- a/dashscope/audio/tts_v2/speech_synthesizer.py +++ b/dashscope/audio/tts_v2/speech_synthesizer.py @@ -2,6 +2,7 @@ import json import platform +import random import threading import time import uuid @@ -266,7 +267,110 @@ def __init__( additional_params: Dict Additional parameters for the Dashscope API. """ + self.ws = None + self.start_event = threading.Event() + self.complete_event = threading.Event() + self._stopped = threading.Event() + self._audio_data: bytes = None + self._is_started = False + self._cancel = False + self._cancel_lock = threading.Lock() + self.async_call = True + self._is_first = True + self.async_call = True + # since dashscope sdk will send first text in run-task + self._start_stream_timestamp = -1 + self._first_package_timestamp = -1 + self._recv_audio_length = 0 + self.last_response = None + self._close_ws_after_use = True + self.__update_params(model, voice, format, volume, speech_rate, + pitch_rate, seed, synthesis_type, instruction, language_hints, headers, callback, workspace, url, + additional_params) + + def __send_str(self, data: str): + logger.debug('>>>send {}'.format(data)) + self.ws.send(data) + + def __connect(self, timeout_seconds=5) -> None: + """ + Establish a connection to the Bailian WebSocket server, + which can be used to pre-establish the connection and reduce interaction latency. + If this function is not used to create the connection, + it will be established when you first send text via call or streaming_call. + Parameters: + ----------- + timeout: int + Throws TimeoutError exception if the connection is not established after times out seconds. + """ + self.ws = websocket.WebSocketApp( + self.url, + header=self.request.getWebsocketHeaders(headers=self.headers, + workspace=self.workspace), + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close, + ) + self.thread = threading.Thread(target=self.ws.run_forever) + self.thread.daemon = True + self.thread.start() + # 等待连接建立 + start_time = time.time() + while (not (self.ws.sock and self.ws.sock.connected) + and (time.time() - start_time) < timeout_seconds): + time.sleep(0.1) # 短暂休眠,避免密集轮询 + if not (self.ws.sock and self.ws.sock.connected): + raise TimeoutError( + 'websocket connection could not established within 5s. ' + 'Please check your network connection, firewall settings, or server status.' + ) + def __is_connected(self) -> bool: + """ + Returns True if the connection is established and still exists; + otherwise, returns False. + """ + if not self.ws: + return False + if not (self.ws.sock and self.ws.sock.connected): + return False + return True + + def __reset(self): + self.start_event.clear() + self.complete_event.clear() + self._stopped.clear() + self._audio_data: bytes = None + self._is_started = False + self._cancel = False + self.async_call = True + self._is_first = True + self.async_call = True + # since dashscope sdk will send first text in run-task + self._start_stream_timestamp = -1 + self._first_package_timestamp = -1 + self._recv_audio_length = 0 + self.last_response = None + + def __update_params( + self, + model, + voice, + format: AudioFormat = AudioFormat.DEFAULT, + volume=50, + speech_rate=1.0, + pitch_rate=1.0, + seed=0, + synthesis_type=0, + instruction=None, + language_hints: list = None, + headers=None, + callback: ResultCallback = None, + workspace=None, + url=None, + additional_params=None, + close_ws_after_use=True, + ): if model is None: raise ModelRequired('Model is required!') if format is None: @@ -275,6 +379,8 @@ def __init__( url = dashscope.base_websocket_api_url self.url = url self.apikey = dashscope.api_key + if self.apikey is None: + raise InputRequired('apikey is required!') self.headers = headers self.workspace = workspace self.additional_params = additional_params @@ -287,6 +393,9 @@ def __init__( if (self.sample_rate == 0): self.sample_rate = 22050 + self.callback = callback + if not self.callback: + self.async_call = False self.request = Request( apikey=self.apikey, model=model, @@ -303,28 +412,12 @@ def __init__( language_hints=language_hints ) self.last_request_id = self.request.task_id - self.start_event = threading.Event() - self.complete_event = threading.Event() - self._stopped = threading.Event() - self._audio_data: bytes = None - self._is_started = False - self._cancel = False - self._cancel_lock = threading.Lock() - self.async_call = True - self.callback = callback - self._is_first = True - self.async_call = True - # since dashscope sdk will send first text in run-task - if not self.callback: - self.async_call = False - self._start_stream_timestamp = -1 - self._first_package_timestamp = -1 - self._recv_audio_length = 0 - self.last_response = None + self._close_ws_after_use = close_ws_after_use - def __send_str(self, data: str): - logger.debug('>>>send {}'.format(data)) - self.ws.send(data) + def __str__(self): + return '[SpeechSynthesizer {} desc] model:{}, voice:{}, format:{}, sample_rate:{}, connected:{}'.format( + self.__hash__(), self.model, self.voice, self.aformat, + self.sample_rate, self.__is_connected()) def __start_stream(self, ): self._start_stream_timestamp = time.time() * 1000 @@ -340,30 +433,11 @@ def __start_stream(self, ): if self._is_started: raise InvalidTask('task has already started.') - - self.ws = websocket.WebSocketApp( - self.url, - header=self.request.getWebsocketHeaders(headers=self.headers, - workspace=self.workspace), - on_message=self.on_message, - on_error=self.on_error, - on_close=self.on_close, - ) - self.thread = threading.Thread(target=self.ws.run_forever) - self.thread.daemon = True - self.thread.start() + # 建立ws连接 + if self.ws is None: + self.__connect(5) + # 发送run-task指令 request = self.request.getStartRequest(self.additional_params) - # 等待连接建立 - timeout = 5 # 最长等待时间(秒) - start_time = time.time() - while (not (self.ws.sock and self.ws.sock.connected) - and (time.time() - start_time) < timeout): - time.sleep(0.1) # 短暂休眠,避免密集轮询 - if not (self.ws.sock and self.ws.sock.connected): - raise TimeoutError( - 'websocket connection could not established within 5s. ' - 'Please check your network connection, firewall settings, or server status.' - ) self.__send_str(request) if not self.start_event.wait(10): raise TimeoutError('start speech synthesizer failed within 5s.') @@ -422,7 +496,8 @@ def streaming_complete(self, complete_timeout_millis=600000): complete_timeout_millis)) else: self.complete_event.wait() - self.close() + if self._close_ws_after_use: + self.close() self._stopped.set() self._is_started = False @@ -434,7 +509,8 @@ def __waiting_for_complete(self, timeout): ) else: self.complete_event.wait() - self.close() + if self._close_ws_after_use: + self.close() self._stopped.set() self._is_started = False @@ -474,7 +550,7 @@ def streaming_cancel(self): return request = self.request.getFinishRequest() self.__send_str(request) - self.close() + self.ws.close() self.start_event.set() self.complete_event.set() @@ -595,3 +671,216 @@ def get_first_package_delay(self): def get_response(self): return self.last_response + + +class SpeechSynthesizerObjectPool: + _instance_lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + if not hasattr(SpeechSynthesizerObjectPool, '_instance'): + with SpeechSynthesizerObjectPool._instance_lock: + if not hasattr(SpeechSynthesizerObjectPool, '_instance'): + SpeechSynthesizerObjectPool._instance = object.__new__(cls) + return SpeechSynthesizerObjectPool._instance + + class PoolObject: + def __init__(self, synthesizer): + self.synthesizer: SpeechSynthesizer = synthesizer + self.connect_time = -1 + + def __str__(self): + return f'synthesizer: {self.synthesizer}, connect_time: {self.connect_time}' + + def __init__(self, + max_size: int = 20, + url=None, + headers=None, + workspace=None): + """ + Speech synthesis object pool that follows the singleton pattern, + establishes WebSocket connections in advance to avoid connection overhead. + The connection pool will maintain a number of pre-created synthesizer objects + up to max_size; objects taken from the pool do not need to be returned, + and the pool will automatically replenish them. + + Parameters: + ----------- + max_size: int + Size of the object pool, with a value range of 1 to 100. + """ + self.DEFAULT_MODEL = 'cosyvoice-v1' + self.DEFAULT_VOICE = 'longxiaochun' + self.DEFAULT_RECONNECT_INTERVAL = 30 + self.DEFAULT_URL = url + self.DEFAUTL_HEADERS = headers + self.DEFAULT_WORKSPACE = workspace + if max_size <= 0: + raise ValueError('max_size must be greater than 0') + if max_size > 100: + raise ValueError('max_size must be less than 100') + self._pool = [] + # 如果重连中,则会将avaliable置为False,避免被使用 + self._avaliable = [] + self._pool_size = max_size + for i in range(self._pool_size): + synthesizer = self.__get_default_synthesizer() + tmpPoolObject = self.PoolObject(synthesizer) + tmpPoolObject.synthesizer._SpeechSynthesizer__connect() + tmpPoolObject.connect_time = time.time() + self._pool.append(tmpPoolObject) + self._avaliable.append(True) + self._borrowed_object_num = 0 + self._remain_object_num = max_size + self._lock = threading.Lock() + self._stop = False + self._stop_lock = threading.Lock() + self._working_thread = threading.Thread(target=self.__auto_reconnect, + args=()) + self._working_thread.start() + + def __get_default_synthesizer(self) -> SpeechSynthesizer: + return SpeechSynthesizer(model=self.DEFAULT_MODEL, + voice=self.DEFAULT_VOICE, + url=self.DEFAULT_URL, + headers=self.DEFAUTL_HEADERS, + workspace=self.DEFAULT_WORKSPACE) + + def __get_reconnect_interval(self): + return self.DEFAULT_RECONNECT_INTERVAL + random.random() * 10 - 5 + + def __auto_reconnect(self): + logger.debug( + 'speech synthesizer object pool auto reconnect thread start') + while True: + objects_need_to_connect = [] + objects_need_to_renew = [] + print('scanning queue borr: {}/{} remain: {}/{}'.format( + self._borrowed_object_num, self._pool_size, + self._remain_object_num, self._pool_size)) + with self._lock: + if self._stop: + return + + current_time = time.time() + for idx, poolObject in enumerate(self._pool): + # 如果超过固定时间没有使用对象,则重连 + if poolObject.connect_time == -1: + objects_need_to_connect.append(poolObject) + self._avaliable[idx] = False + elif (not poolObject.synthesizer. + _SpeechSynthesizer__is_connected()) or ( + current_time - poolObject.connect_time > + self.__get_reconnect_interval()): + objects_need_to_renew.append(poolObject) + self._avaliable[idx] = False + for poolObject in objects_need_to_connect: + logger.info( + '[SpeechSynthesizerObjectPool] pre-connect new synthesizer' + ) + poolObject.synthesizer._SpeechSynthesizer__connect() + poolObject.connect_time = time.time() + for poolObject in objects_need_to_renew: + logger.info( + '[SpeechSynthesizerObjectPool] renew synthesizer after {} s' + .format(current_time - poolObject.connect_time)) + poolObject.synthesizer = self.__get_default_synthesizer() + poolObject.synthesizer._SpeechSynthesizer__connect() + poolObject.connect_time = time.time() + with self._lock: + for i in range(len(self._avaliable)): + self._avaliable[i] = True + time.sleep(1) + + def shutdown(self): + """ + This is a ThreadSafe Method. + destroy the object pool + """ + logger.debug('[SpeechSynthesizerObjectPool] start shutdown') + with self._lock: + self._stop = True + self._pool = [] + self._working_thread.join() + logger.debug('[SpeechSynthesizerObjectPool] shutdown complete') + + def borrow_synthesizer( + self, + model, + voice, + format: AudioFormat = AudioFormat.DEFAULT, + volume=50, + speech_rate=1.0, + pitch_rate=1.0, + seed=0, + synthesis_type=0, + instruction=None, + language_hints: list = None, + headers=None, + callback: ResultCallback = None, + workspace=None, + url=None, + additional_params=None, + ): + """ + This is a ThreadSafe Method. + get a synthesizer object from the pool. + objects taken from the pool need to be returned, + and the pool will automatically replenish them. + If there is no synthesizer object in the pool, + a new synthesizer object will be created and returned. + """ + logger.debug('[SpeechSynthesizerObjectPool] get synthesizer') + synthesizer: SpeechSynthesizer = None + with self._lock: + # 遍历对象池,如果存在预建连的对象,则返回 + for idx, poolObject in enumerate(self._pool): + if self._avaliable[ + idx] and poolObject.synthesizer._SpeechSynthesizer__is_connected( + ): + synthesizer = poolObject.synthesizer + self._borrowed_object_num += 1 + self._remain_object_num -= 1 + self._pool.pop(idx) + self._avaliable.pop(idx) + break + + # 如果对象池不足,则返回未建连的新对象 + if synthesizer is None: + synthesizer = self.__get_default_synthesizer() + logger.warning( + '[SpeechSynthesizerObjectPool] object pool is exausted, create new synthesizer' + ) + synthesizer._SpeechSynthesizer__reset() + synthesizer._SpeechSynthesizer__update_params(model, voice, format, + volume, speech_rate, + pitch_rate, seed, synthesis_type, instruction, + language_hints, self.DEFAUTL_HEADERS, + callback, self.DEFAULT_WORKSPACE, self.DEFAULT_URL, + additional_params, False) + return synthesizer + + def return_synthesizer(self, synthesizer) -> bool: + """ + This is a ThreadSafe Method. + return a synthesizer object back to the pool. + """ + if not isinstance(synthesizer, SpeechSynthesizer): + logger.error( + '[SpeechSynthesizerObjectPool] return_synthesizer: synthesizer is not a SpeechSynthesizer object' + ) + return False + with self._lock: + if self._borrowed_object_num <= 0: + logger.debug( + '[SpeechSynthesizerObjectPool] pool is full, drop returned object' + ) + return False + poolObject = self.PoolObject(synthesizer) + poolObject.connect_time = time.time() + self._pool.append(poolObject) + self._avaliable.append(True) + self._borrowed_object_num -= 1 + self._remain_object_num += 1 + logger.debug( + '[SpeechSynthesizerObjectPool] return synthesizer back to pool' + )