Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions linux_voice_assistant/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@
PingRequest,
PingResponse,
)
# >>> PATCH: imports for entity support
from aioesphomeapi.api_pb2 import ( # type: ignore[attr-defined]
DeviceInfoRequest,
DeviceInfoResponse,
ListEntitiesDoneResponse,
ListEntitiesRequest,
ListEntitiesTextSensorResponse,
SubscribeStatesRequest,
TextSensorStateResponse,
)
import time
from dataclasses import dataclass
# <<< PATCH END

from aioesphomeapi.core import MESSAGE_TYPE_TO_PROTO
from google.protobuf import message

Expand All @@ -28,9 +42,57 @@

class APIServer(asyncio.Protocol):

# >>> PATCH: minimal entity registry for TextSensor
@dataclass
class _TextSensor:
key: int
object_id: str
name: str
unique_id: str
icon: str = "mdi:robot"

def _init_entity_registry(self) -> None:
self._entities_initialized = True
self._text_sensors = {}
self._next_entity_key = 1
# default wakeword engine sensor (can be overwritten later)
self.register_text_sensor(
object_id="wakeword_engine",
name="Wakeword Engine",
unique_id=f"{self.name.replace(' ', '_').lower()}_wakeword_engine",
icon="mdi:robot"
)

def register_text_sensor(self, object_id: str, name: str, unique_id: str, icon: str = "mdi:robot") -> int:
if not hasattr(self, "_entities_initialized"):
self._init_entity_registry()
key = self._next_entity_key
self._next_entity_key += 1
self._text_sensors[object_id] = self._TextSensor(key=key, object_id=object_id, name=name, unique_id=unique_id, icon=icon)
return key

def publish_text_sensor(self, object_id: str, state: str) -> Optional[TextSensorStateResponse]:
if not hasattr(self, "_text_sensors") or object_id not in self._text_sensors:
return None
sensor = self._text_sensors[object_id]
resp = TextSensorStateResponse()
resp.key = sensor.key
resp.state = state
# Remember last to replay on SubscribeStates
if not hasattr(self, "_last_text_states"):
self._last_text_states = {}
self._last_text_states[sensor.key] = state
return resp
# <<< PATCH END

def __init__(self, name: str) -> None:
self.name = name

# >>> PATCH: init entity registry
self._init_entity_registry()
self._last_text_states = {}
# <<< PATCH END

self._buffer: Optional[bytes] = None
self._buffer_len: int = 0
self._pos: int = 0
Expand Down
20 changes: 20 additions & 0 deletions linux_voice_assistant/wakeword_indicator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Wakeword Engine indicator helper.

Provides a simple function to update the text sensor state via APIServer.
"""
from .api_server import APIServer

SENSOR_OBJECT_ID = "wakeword_engine"

def set_wakeword_engine(server: APIServer, engine_name: str) -> None:
"""Publish the current wakeword engine name to Home Assistant.

Call this after the engine is selected and whenever it changes.
"""
msg = server.publish_text_sensor(SENSOR_OBJECT_ID, engine_name)
if msg is not None:
# Enqueue/send happens in the server run loop; callers should hand this to the transport.
# For convenience, APIServer.handle_message() returns messages to write downstream.
# Here we directly push by using APIServer._writelines if available.
if getattr(server, "_writelines", None) is not None:
server._writelines([msg]) # type: ignore[attr-defined]