21 changes: 21 additions & 0 deletions livekit-plugins/livekit-plugins-liquidai/README.md
@@ -0,0 +1,21 @@
# LiquidAI Audio plugin for LiveKit Agents

Support for speech-to-text (STT) and text-to-speech (TTS) using LiquidAI's LFM2.5-Audio family of models.

See [https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B-GGUF](https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B-GGUF) for more information.


## Installation

```bash
pip install livekit-plugins-liquidai
```

## Prerequisites

Start the audio server. The `llama-liquid-audio-server` binary is located in the `runners` folder of the LFM2.5-Audio-1.5B-GGUF download.

```bash
export CKPT=/path/to/LFM2.5-Audio-1.5B-GGUF
./llama-liquid-audio-server -m $CKPT/LFM2.5-Audio-1.5B-Q4_0.gguf -mm $CKPT/mmproj-LFM2.5-Audio-1.5B-Q4_0.gguf -mv $CKPT/vocoder-LFM2.5-Audio-1.5B-Q4_0.gguf --tts-speaker-file $CKPT/tokenizer-LFM2.5-Audio-1.5B-Q4_0.gguf
```
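
## Usage

A minimal sketch of wiring the plugin into an agent session. It assumes the server above is running at its default address, and that `TTS` mirrors the defaults of `STT` (the `TTS` constructor is not shown in this PR, so treat its arguments as assumptions):

```python
from livekit.agents import AgentSession
from livekit.plugins import liquidai

# Both plugins are assumed to default to the local
# llama-liquid-audio-server at http://127.0.0.1:8080/v1.
session = AgentSession(
    stt=liquidai.STT(),
    tts=liquidai.TTS(),
)
```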
45 changes: 45 additions & 0 deletions livekit-plugins/livekit-plugins-liquidai/livekit/plugins/liquidai/__init__.py
@@ -0,0 +1,45 @@
# Copyright 2023 LiveKit, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""LiquidAI LFM2.5-Audio plugin for LiveKit Agents

Provides STT and TTS capabilities using the LFM2.5-Audio model with OpenAI-compatible API.
"""

from .stt import STT
from .tts import TTS
from .version import __version__

__all__ = ["STT", "TTS", "__version__"]

from livekit.agents import Plugin

from .log import logger


class LiquidAIPlugin(Plugin):
def __init__(self) -> None:
super().__init__(__name__, __version__, __package__, logger)


Plugin.register_plugin(LiquidAIPlugin())

# Cleanup docs of unexported modules
_module = dir()
NOT_IN_ALL = [m for m in _module if m not in __all__]

__pdoc__ = {}

for n in NOT_IN_ALL:
__pdoc__[n] = False
3 changes: 3 additions & 0 deletions livekit-plugins/livekit-plugins-liquidai/livekit/plugins/liquidai/log.py
@@ -0,0 +1,3 @@
import logging

logger = logging.getLogger("livekit.plugins.liquidai")
164 changes: 164 additions & 0 deletions livekit-plugins/livekit-plugins-liquidai/livekit/plugins/liquidai/stt.py
@@ -0,0 +1,164 @@
from __future__ import annotations

import asyncio
import base64
from dataclasses import dataclass
from typing import cast

import httpx
import openai
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk

from livekit import rtc
from livekit.agents import APIConnectionError, APIConnectOptions, stt
from livekit.agents.stt import SpeechEventType, STTCapabilities
from livekit.agents.types import NOT_GIVEN, NotGivenOr
from livekit.agents.utils import AudioBuffer, is_given

from .log import logger

DEFAULT_BASE_URL = "http://127.0.0.1:8080/v1"
DEFAULT_API_KEY = "dummy"
DEFAULT_SYSTEM_PROMPT = "Perform ASR."


@dataclass
class _STTOptions:
language: str
system_prompt: str


class STT(stt.STT):
"""Speech-to-Text using LiquidAI LFM2.5-Audio model."""

def __init__(
self,
*,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
system_prompt: NotGivenOr[str] = NOT_GIVEN,
) -> None:
"""
Create a new instance of LiquidAI STT.

Args:
base_url: The base URL of the LFM2.5-Audio server (default: http://127.0.0.1:8080/v1)
api_key: API key for authentication (default: "dummy")
language: Language code for transcription (default: "en")
system_prompt: System prompt for ASR (default: "Perform ASR.")
"""
super().__init__(
capabilities=STTCapabilities(
streaming=False, interim_results=False, aligned_transcript=False
)
)

self._opts = _STTOptions(
language=language if is_given(language) else "en",
system_prompt=system_prompt if is_given(system_prompt) else DEFAULT_SYSTEM_PROMPT,
)

self._client = openai.AsyncClient(
max_retries=0,
api_key=api_key if is_given(api_key) else DEFAULT_API_KEY,
base_url=base_url if is_given(base_url) else DEFAULT_BASE_URL,
http_client=httpx.AsyncClient(
timeout=httpx.Timeout(connect=15.0, read=60.0, write=5.0, pool=5.0),
follow_redirects=True,
limits=httpx.Limits(
max_connections=50, max_keepalive_connections=50, keepalive_expiry=120
),
),
)

@property
def model(self) -> str:
return "LFM2.5-Audio"

@property
def provider(self) -> str:
return "LiquidAI"

def update_options(
self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
system_prompt: NotGivenOr[str] = NOT_GIVEN,
) -> None:
if is_given(language):
self._opts.language = language
if is_given(system_prompt):
self._opts.system_prompt = system_prompt

async def _recognize_impl(
self,
buffer: AudioBuffer,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions,
) -> stt.SpeechEvent:
try:
# Use local variable to avoid mutating instance state
effective_language = language if is_given(language) else self._opts.language

# Convert audio buffer to WAV bytes and base64 encode
wav_bytes = rtc.combine_audio_frames(buffer).to_wav_bytes()
encoded_audio = base64.b64encode(wav_bytes).decode("utf-8")

# Create messages for the API
messages = [
{"role": "system", "content": self._opts.system_prompt},
{
"role": "user",
"content": [
{
"type": "input_audio",
"input_audio": {"data": encoded_audio, "format": "wav"},
}
],
},
]

# Call the streaming chat completion API
response = await self._client.chat.completions.create(
model="LFM2.5-Audio",
messages=messages, # type: ignore
stream=True,
max_tokens=512,
extra_body={"reset_context": True},
timeout=conn_options.timeout,
)
# When stream=True, the response is always an AsyncStream
stream = cast(AsyncStream[ChatCompletionChunk], response)

# Collect text from the stream
text_chunks: list[str] = []
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
text_chunks.append(chunk.choices[0].delta.content)

text = "".join(text_chunks)
logger.debug(f"STT transcription: {text}")

return self._transcription_to_speech_event(text=text, language=effective_language)

except openai.APITimeoutError as e:
raise APIConnectionError() from e
except asyncio.CancelledError:
raise
except openai.APIStatusError as e:
raise APIConnectionError() from e
except Exception as e:
logger.error(f"STT error: {e}")
raise APIConnectionError() from e

def _transcription_to_speech_event(self, text: str, language: str) -> stt.SpeechEvent:
return stt.SpeechEvent(
type=SpeechEventType.FINAL_TRANSCRIPT,
alternatives=[stt.SpeechData(text=text, language=language)],
)

async def aclose(self) -> None:
await self._client.close()
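
For reference, a minimal standalone sketch of exercising this STT class outside an agent session, assuming the local server from the README is running; `sample.wav` is a hypothetical 16-bit PCM recording and the scaffolding around `recognize()` is illustrative, not part of this PR:

```python
import asyncio
import wave

from livekit import rtc
from livekit.plugins import liquidai


async def main() -> None:
    stt = liquidai.STT()  # defaults to http://127.0.0.1:8080/v1

    # Load a WAV file into a single rtc.AudioFrame (hypothetical input file;
    # assumes 16-bit PCM, matching AudioFrame's expected sample format)
    with wave.open("sample.wav", "rb") as f:
        frame = rtc.AudioFrame(
            data=f.readframes(f.getnframes()),
            sample_rate=f.getframerate(),
            num_channels=f.getnchannels(),
            samples_per_channel=f.getnframes(),
        )

    # recognize() is the public entry point that dispatches to _recognize_impl
    event = await stt.recognize(frame)
    print(event.alternatives[0].text)

    await stt.aclose()


asyncio.run(main())
```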