Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions pi-fallback/ble_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ def load_config(self) -> bool:
logger.error(f"Missing required config section: {section}")
return False

# Validate required sensor keys early to catch config errors at startup
required_sensor_keys = ["mac_address", "device_id", "local_key"]
for i, sensor in enumerate(self.config.get("sensors", [])):
for key in required_sensor_keys:
if key not in sensor:
logger.error(
f"Sensor {i} ('{sensor.get('name', 'unnamed')}') "
f"missing required key: '{key}'"
)
return False

# Set logging level
log_level = self.config.get("logging", {}).get("level", "INFO")
logging.getLogger().setLevel(getattr(logging, log_level.upper()))
Expand All @@ -79,7 +90,7 @@ def load_config(self) -> bool:
logger.error(f"Config load error: {e}")
return False

def connect_mqtt(self) -> bool:
async def connect_mqtt(self) -> bool:
"""Initialize and connect MQTT publisher."""
mqtt_config = self.config["mqtt"]
ha_config = self.config.get("homeassistant", {})
Expand All @@ -96,7 +107,7 @@ def connect_mqtt(self) -> bool:
)

self.mqtt = HADiscoveryPublisher(config)
return self.mqtt.connect()
return await self.mqtt.connect()

async def poll_sensor(self, sensor_config: dict) -> Optional[SensorData]:
"""
Expand Down Expand Up @@ -283,32 +294,31 @@ def main():
# Initialize poller
poller = BLEPoller(args.config)

# Load config
# Load config (synchronous)
if not poller.load_config():
return 1

# Connect MQTT
if not poller.connect_mqtt():
logger.error("Failed to connect to MQTT broker")
return 1

# Setup signal handlers
def signal_handler(sig, frame):
poller.shutdown()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Run poller
try:
asyncio.run(poller.run(once=args.once))
except KeyboardInterrupt:
pass
finally:
if poller.mqtt:
poller.mqtt.disconnect()
async def _run():
if not await poller.connect_mqtt():
logger.error("Failed to connect to MQTT broker")
return 1
try:
await poller.run(once=args.once)
except KeyboardInterrupt:
pass
finally:
if poller.mqtt:
poller.mqtt.disconnect()
return 0

return 0
return asyncio.run(_run())


if __name__ == "__main__":
Expand Down
16 changes: 12 additions & 4 deletions pi-fallback/mqtt_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Publishes sensor data to MQTT with automatic Home Assistant entity configuration.
"""

import asyncio
import json
import logging
import ssl
Expand Down Expand Up @@ -42,7 +43,7 @@ def __init__(self, config: MQTTConfig):
self._connected = False
self._discovery_sent: set = set()

def connect(self) -> bool:
async def connect(self) -> bool:
"""Connect to MQTT broker."""
try:
# Use MQTT v5 protocol
Expand Down Expand Up @@ -73,12 +74,11 @@ def connect(self) -> bool:
self._client.connect(self.config.host, self.config.port, keepalive=60)
self._client.loop_start()

# Wait for connection
import time
# Wait for connection without blocking the event loop
for _ in range(50): # 5 seconds timeout
if self._connected:
return True
time.sleep(0.1)
await asyncio.sleep(0.1)

logger.error("MQTT connection timeout")
return False
Expand Down Expand Up @@ -215,6 +215,10 @@ def publish_state(self, sensor_config: dict, data: dict):
sensor_config: Sensor configuration from config file
data: Sensor data dictionary (from SensorData.to_dict())
"""
if not self._connected or not self._client:
logger.error("Not connected to MQTT broker, dropping publish_state")
return

unique_id = sensor_config["unique_id"]
state_topic = f"sgs01/{unique_id}/state"

Expand All @@ -233,6 +237,10 @@ def publish_state(self, sensor_config: dict, data: dict):

def publish_availability(self, sensor_config: dict, available: bool):
"""Publish sensor availability status."""
if not self._connected or not self._client:
logger.error("Not connected to MQTT broker, dropping publish_availability")
return

unique_id = sensor_config["unique_id"]
availability_topic = f"sgs01/{unique_id}/availability"

Expand Down
22 changes: 17 additions & 5 deletions pi-fallback/tuya_ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ def _decrypt(self, data: bytes) -> bytes:

# Remove PKCS7 padding
padding_len = decrypted[-1]
if padding_len <= 16:
return decrypted[:-padding_len]
return decrypted
if padding_len == 0 or padding_len > 16:
raise ValueError(f"Invalid PKCS7 padding length: {padding_len}")
return decrypted[:-padding_len]

def _build_packet(self, command: int, data: bytes = b"") -> bytes:
"""Build a Tuya BLE packet."""
Expand Down Expand Up @@ -203,7 +203,7 @@ def _parse_dps(self, data: bytes) -> dict:
if dp_type == DPType.BOOL:
value = bool(dp_data[0]) if dp_data else False
elif dp_type == DPType.VALUE:
value = struct.unpack(">i", dp_data.ljust(4, b'\x00')[:4])[0]
value = struct.unpack(">i", dp_data.rjust(4, b'\x00')[:4])[0]
elif dp_type == DPType.STRING:
value = dp_data.decode("utf-8", errors="replace")
elif dp_type == DPType.ENUM:
Expand Down Expand Up @@ -359,8 +359,16 @@ async def read_sensors(self) -> Optional[SensorData]:

# If no DPs received, try triggering by writing temp unit
if not self._received_dps:
self._response_event.clear()
await self._trigger_update()
await asyncio.sleep(2)
if not self._received_dps:
# Clear stale signal from _trigger_update's DP_WRITE ack before
# waiting for the actual DP_REPORT notification
self._response_event.clear()
try:
await asyncio.wait_for(self._response_event.wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning("No DPs received after trigger update")

# Parse received DPs into SensorData
return self._parse_sensor_data()
Expand All @@ -371,6 +379,10 @@ async def read_sensors(self) -> Optional[SensorData]:

async def _trigger_update(self):
"""Trigger sensor to send data by writing temperature unit."""
if not self._client or not self._client.is_connected:
logger.error("Device disconnected, cannot trigger update")
return

# Write DP 9 (temp_unit) = 0 (celsius)
dp_data = struct.pack(">BBHB", 9, DPType.ENUM, 1, 0)

Expand Down