From e9ab2eccb80330b5b91ed58cebf3f928c3e836c2 Mon Sep 17 00:00:00 2001 From: DevOldSchool Date: Sun, 28 Dec 2025 16:55:29 +1000 Subject: [PATCH 1/2] Added energy monthly reset option and history retention. --- .github/workflows/tests.yml | 6 +- docker-compose.yml | 6 +- hub-server/aggregator.py | 10 ++ hub-server/config.py | 5 + hub-server/database.py | 71 +++++++++++++- hub-server/requirements-dev.txt | 3 + hub-server/tests/test_aggregator.py | 92 +++++++++++++++++++ hub-server/tests/test_database.py | 117 ++++++++++++++++++++++++ hub-server/tests/test_hub_server.py | 109 ++++++++++++++++++++++ hub-server/tests/test_parser.py | 74 --------------- hub-server/tests/test_payload_parser.py | 79 ++++++++++++++++ 11 files changed, 492 insertions(+), 80 deletions(-) create mode 100644 hub-server/requirements-dev.txt create mode 100644 hub-server/tests/test_aggregator.py create mode 100644 hub-server/tests/test_database.py create mode 100644 hub-server/tests/test_hub_server.py delete mode 100644 hub-server/tests/test_parser.py create mode 100644 hub-server/tests/test_payload_parser.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ae741bd..deba919 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -25,9 +25,9 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + if [ -f hub-server/requirements-dev.txt ]; then pip install -r hub-server/requirements-dev.txt; fi - - name: Run unit tests + - name: Run pytest working-directory: hub-server run: | - python -m unittest discover -s tests + pytest -v --cov=. --cov-report=term-missing diff --git a/docker-compose.yml b/docker-compose.yml index dfc02d5..2f3f91b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,14 +11,18 @@ services: TZ: Australia/Brisbane # Logging LOG_LEVEL: INFO + # How many months of readings and aggregated values to keep, 0 = keep everything + HISTORY_RETENTION_MONTHS: 0 # MQTT MQTT_ENABLED: true MQTT_BROKER: homeassistant.local MQTT_PORT: 1883 MQTT_USER: mqtt-broker-username-here MQTT_PASS: your-password-here - # Home Assistant Discovery + # Home Assistant HA_DISCOVERY: true + # Setting to true will reset the cumulative energy value each month + ENERGY_MONTHLY_RESET: false legacy-nginx: # build: ./legacy-nginx diff --git a/hub-server/aggregator.py b/hub-server/aggregator.py index cdc86ac..2b28ecb 100644 --- a/hub-server/aggregator.py +++ b/hub-server/aggregator.py @@ -1,7 +1,9 @@ import logging import threading +import time from database import Database from mqtt_manager import MQTTManager +from config import HISTORY_RETENTION_MONTHS class Aggregator: @@ -11,6 +13,7 @@ def __init__(self, database: Database, mqtt_manager: MQTTManager, interval_sec=3 self.interval_sec = interval_sec self._stop_event = threading.Event() self._thread = None + self._last_truncation_ts = 0 def aggregate_loop(self): @@ -21,6 +24,13 @@ def aggregate_loop(self): """ while not self._stop_event.is_set(): try: + # Perform history truncation check once per day + if HISTORY_RETENTION_MONTHS > 0: + now = time.time() + if now - self._last_truncation_ts >= 86400: + self.database.truncate_old_data(HISTORY_RETENTION_MONTHS) + self._last_truncation_ts = now + processed = self.database.aggregate_hours(limit_hours=1000) logging.debug(f"Aggregator processed {processed} hours") diff --git a/hub-server/config.py b/hub-server/config.py index e0bd19c..1a68e2b 100644 --- a/hub-server/config.py +++ b/hub-server/config.py @@ -52,6 +52,11 @@ ENERGY_VALUE_TEMPLATE = os.getenv("ENERGY_VALUE_TEMPLATE", "{{ value_json.value }}") ENERGY_UNIT_OF_MEASUREMENT = os.getenv("ENERGY_UNIT_OF_MEASUREMENT", "kWh") +ENERGY_MONTHLY_RESET = os.getenv("ENERGY_MONTHLY_RESET", "false").lower() in ("true", "1", "yes", "on") + +# History retention in months (0 means keep everything) +HISTORY_RETENTION_MONTHS = int(os.getenv("HISTORY_RETENTION_MONTHS", "0")) + DEVICE_NAME = os.getenv("DEVICE_NAME", "Efergy Hub") DEVICE_IDENTIFIERS = os.getenv("DEVICE_IDENTIFIERS", ["efergy"]) DEVICE_MANUFACTURER = os.getenv("DEVICE_MANUFACTURER", "Efergy") diff --git a/hub-server/database.py b/hub-server/database.py index f6b9957..54e829e 100755 --- a/hub-server/database.py +++ b/hub-server/database.py @@ -2,10 +2,11 @@ import threading import time import sqlite3 +from datetime import datetime from pathlib import Path from typing import Optional, Dict, Union from config import ( - SQLITE_TIMEOUT, POWER_FACTOR, MAINS_VOLTAGE + SQLITE_TIMEOUT, POWER_FACTOR, MAINS_VOLTAGE, ENERGY_MONTHLY_RESET ) @@ -180,10 +181,26 @@ def get_all_labels(self): def get_total_energy(self) -> float: + """ + Return the sum of energy. + If ENERGY_MONTHLY_RESET is True, it returns the sum for the current month only. + Otherwise, it returns the absolute total energy. + """ try: + query = "SELECT SUM(kwh) FROM energy_hourly" + params = () + + if ENERGY_MONTHLY_RESET: + # Calculate start of current month + now = datetime.now() + first_day_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + month_start_ts = int(first_day_of_month.timestamp()) + query += " WHERE hour_start >= ?" + params = (month_start_ts,) + with sqlite3.connect(self.db_path, timeout=SQLITE_TIMEOUT) as conn: cursor = conn.cursor() - cursor.execute("SELECT SUM(kwh) FROM energy_hourly") + cursor.execute(query, params) row = cursor.fetchone() return float(row[0]) if row and row[0] else 0.0 except Exception as e: @@ -191,6 +208,55 @@ def get_total_energy(self) -> float: return 0.0 + def truncate_old_data(self, months: int) -> int: + """ + Truncates data older than the specified number of months. + Deletes from 'readings' and 'energy_hourly'. + + Args: + months: Number of months of history to keep. + + Returns: + Number of rows deleted (total). + """ + if months <= 0: + return 0 + + try: + now = datetime.now() + year, month = divmod(now.month - months - 1, 12) + month += 1 + year = now.year + year + + cutoff_date = now.replace(year=year, month=month, day=1, hour=0, minute=0, second=0, microsecond=0) + cutoff_ts = int(cutoff_date.timestamp()) + + deleted_count = 0 + with sqlite3.connect(self.db_path, timeout=SQLITE_TIMEOUT) as conn: + cursor = conn.cursor() + + # Delete from readings + cursor.execute("DELETE FROM readings WHERE timestamp < ?", (cutoff_ts,)) + deleted_count += cursor.rowcount + + # Delete from energy_hourly + cursor.execute("DELETE FROM energy_hourly WHERE hour_start < ?", (cutoff_ts,)) + deleted_count += cursor.rowcount + + conn.commit() + + # Reclaim space + cursor.execute("VACUUM") + + if deleted_count > 0: + logging.info(f"Truncated {deleted_count} old records (older than {cutoff_date.strftime('%Y-%m-%d')})") + + return deleted_count + except Exception as e: + logging.error(f"Failed to truncate old data: {e}") + return 0 + + # ---------------- Aggregation logic ---------------- def fetch_hour_range_to_process(self, cursor: sqlite3.Cursor) -> Optional[int]: """ @@ -219,6 +285,7 @@ def fetch_hour_range_to_process(self, cursor: sqlite3.Cursor) -> Optional[int]: return last_hour_done + 3600 + def aggregate_one_hour(self, cursor: sqlite3.Cursor, hour_start: int) -> Optional[float]: """ Aggregate a single hour [hour_start, hour_start+3600) and return kwh inserted, diff --git a/hub-server/requirements-dev.txt b/hub-server/requirements-dev.txt new file mode 100644 index 0000000..daa4fe4 --- /dev/null +++ b/hub-server/requirements-dev.txt @@ -0,0 +1,3 @@ +paho-mqtt >= 2.1.0 +pytest >= 8.4.1 +pytest-cov >= 7.0.0 \ No newline at end of file diff --git a/hub-server/tests/test_aggregator.py b/hub-server/tests/test_aggregator.py new file mode 100644 index 0000000..4a125e5 --- /dev/null +++ b/hub-server/tests/test_aggregator.py @@ -0,0 +1,92 @@ +import pytest +import time +from unittest.mock import MagicMock, patch +from aggregator import Aggregator + +@pytest.fixture +def mock_db(): + return MagicMock() + +@pytest.fixture +def mock_mqtt(): + return MagicMock() + + +def test_aggregator_loop_truncation(mock_db, mock_mqtt): + with patch('aggregator.HISTORY_RETENTION_MONTHS', 1): + aggregator = Aggregator(mock_db, mock_mqtt, interval_sec=0.1) + + mock_db.get_total_energy.return_value = 12.34 + + def side_effect(*args, **kwargs): + if mock_db.truncate_old_data.call_count >= 1: + aggregator._stop_event.set() + return 1 + + mock_db.aggregate_hours.side_effect = side_effect + + aggregator.start() + time.sleep(0.5) + aggregator.stop() + + assert mock_db.truncate_old_data.called + assert mock_db.aggregate_hours.called + assert mock_mqtt.publish_energy.called + + + +def test_aggregator_no_truncation_when_disabled(mock_db, mock_mqtt): + # Set HISTORY_RETENTION_MONTHS = 0 + with patch('aggregator.HISTORY_RETENTION_MONTHS', 0): + aggregator = Aggregator(mock_db, mock_mqtt, interval_sec=0.1) + + # Stop without joining + mock_db.aggregate_hours.side_effect = lambda **kwargs: aggregator._stop_event.set() or 1 + + aggregator.start() + time.sleep(0.3) + aggregator.stop() + + assert not mock_db.truncate_old_data.called + assert mock_db.aggregate_hours.called + assert mock_mqtt.publish_energy.called + + +def test_aggregator_truncation_interval(mock_db, mock_mqtt): + # Re-testing logic by calling aggregate_loop once and checking state + with patch('aggregator.HISTORY_RETENTION_MONTHS', 1): + aggregator = Aggregator(mock_db, mock_mqtt, interval_sec=0.1) + + # Patching wait so it doesn't sleep + aggregator._stop_event.wait = MagicMock() + + # Mocking time to test daily truncation + # The loop calls time.time() at the beginning of each iteration. + # Iteration 1: time.time() -> 1000. (now - self._last_truncation_ts) = 1000 - 0 >= 86400 is True if we consider initial last_truncation_ts=0 + # Wait... the code has self._last_truncation_ts = 0. + # If now = 1000, then 1000 - 0 = 1000. 1000 >= 86400 is FALSE. + + # Let's check aggregator.py: + # if now - self._last_truncation_ts >= 86400: + + # So it won't truncate on the very first run if now < 86400. + + with patch('time.time', side_effect=[100000, 100000, 100000 + 86400 + 1, 300000]): + # Iteration 1: now = 100000. 100000 - 0 = 100000 >= 86400. TRUNCATE. _last_truncation_ts = 100000. + # Iteration 2: now = 100000. 100000 - 100000 = 0 < 86400. NO TRUNCATE. + # Iteration 3: now = 100000 + 86400 + 1 = 186401. 186401 - 100000 = 86401 >= 86400. TRUNCATE. + + # Use a side effect to stop after 3 iterations + count = 0 + def stop_side_effect(*args, **kwargs): + nonlocal count + count += 1 + if count >= 3: + aggregator._stop_event.set() + return 1 + mock_db.aggregate_hours.side_effect = stop_side_effect + + aggregator.aggregate_loop() + + assert mock_db.truncate_old_data.call_count == 2 + assert aggregator._last_truncation_ts == 186401 diff --git a/hub-server/tests/test_database.py b/hub-server/tests/test_database.py new file mode 100644 index 0000000..1fd1003 --- /dev/null +++ b/hub-server/tests/test_database.py @@ -0,0 +1,117 @@ +import pytest +import sqlite3 +import time +from database import Database + +@pytest.fixture +def db_path(tmp_path): + return tmp_path / "test_readings.db" + +@pytest.fixture +def db(db_path): + database = Database(db_path) + database.setup() + return database + +def test_database_setup(db_path): + db = Database(db_path) + db.setup() + assert db_path.exists() + + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = {row[0] for row in cursor.fetchall()} + assert "labels" in tables + assert "readings" in tables + assert "energy_hourly" in tables + + +def test_log_data_and_labels(db): + db.log_data("test_label", 100.0, timestamp=1000) + db.log_data("test_label", 200.0, timestamp=1100) + db.log_data("another_label", 50.0, timestamp=1200) + + labels = db.get_all_labels() + assert "test_label" in labels + assert "another_label" in labels + assert len(labels) == 2 + + with sqlite3.connect(db.db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT value FROM readings ORDER BY timestamp") + readings = [row[0] for row in cursor.fetchall()] + assert readings == [100.0, 200.0, 50.0] + + +def test_aggregate_one_hour(db): + # Hour starts at 3600 + hour_start = 3600 + + # Add some readings in this hour + # efergy_h1 uses (PF * V * I/1000) / 1000 for kW + db.log_data("efergy_h1_test", 1000.0, timestamp=hour_start) # 1000mA -> (0.6 * 230 * 1) / 1000 = 0.138 kW + db.log_data("efergy_h1_test", 2000.0, timestamp=hour_start + 1800) # 2000mA -> 0.276 kW + db.log_data("efergy_h1_test", 1000.0, timestamp=hour_start + 3600) # Next hour, shouldn't be included in this aggregation + + with sqlite3.connect(db.db_path) as conn: + cursor = conn.cursor() + kwh = db.aggregate_one_hour(cursor, hour_start) + + # Calculation: + # 0.138 kW for 1800s = 0.138 * (1800/3600) = 0.069 kWh + # 0.276 kW for 1800s = 0.276 * (1800/3600) = 0.138 kWh + # Total = 0.069 + 0.138 = 0.207 kWh + # Note: the code uses (rows[i+1][0] - rows[i][0]) for interval. + # So for row 0 (ts=3600), interval = 5400 - 3600 = 1800. + # For row 1 (ts=5400), it's the last row, so it uses the same interval as previous: 1800. + + expected_kwh = (0.138 * 0.5) + (0.276 * 0.5) + assert kwh == pytest.approx(expected_kwh) + + +def test_aggregate_hours(db): + now = int(time.time()) + # Round to start of 2 hours ago to ensure we have a full hour to process + hour1_start = (now - 7200) - (now % 3600) + hour2_start = hour1_start + 3600 + + db.log_data("efergy_h3_test", 100.0, timestamp=hour1_start) # h3 uses value/10/1000 = 0.01 kW + db.log_data("efergy_h3_test", 100.0, timestamp=hour1_start + 3599) + + processed = db.aggregate_hours() + assert processed >= 1 + + total_energy = db.get_total_energy() + assert total_energy > 0 + + +def test_truncate_old_data(db): + # Log some "old" data + now = int(time.time()) + two_months_ago = now - (62 * 24 * 3600) + + db.log_data("old_label", 100.0, timestamp=two_months_ago) + db.log_data("new_label", 100.0, timestamp=now) + + # Aggregate to have something in energy_hourly to + # Need to make sure we have a full hour for the old data + old_hour_start = two_months_ago - (two_months_ago % 3600) + db.log_data("old_label", 100.0, timestamp=old_hour_start) + db.log_data("old_label", 100.0, timestamp=old_hour_start + 3599) + + with sqlite3.connect(db.db_path) as conn: + db.aggregate_one_hour(conn.cursor(), old_hour_start) + conn.commit() + + # Truncate to 1 month + deleted = db.truncate_old_data(1) + assert deleted > 0 + + with sqlite3.connect(db.db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT count(*) FROM readings WHERE timestamp < ?", (now - 31*24*3600,)) + assert cursor.fetchone()[0] == 0 + + cursor.execute("SELECT count(*) FROM energy_hourly WHERE hour_start < ?", (now - 31*24*3600,)) + assert cursor.fetchone()[0] == 0 diff --git a/hub-server/tests/test_hub_server.py b/hub-server/tests/test_hub_server.py new file mode 100644 index 0000000..de56703 --- /dev/null +++ b/hub-server/tests/test_hub_server.py @@ -0,0 +1,109 @@ +import pytest +import http.client +import threading +from unittest.mock import MagicMock +from hub_server import EfergyHTTPServer, FakeEfergyServer + +@pytest.fixture +def mock_db(): + return MagicMock() + +@pytest.fixture +def mock_mqtt(): + return MagicMock() + +@pytest.fixture +def test_server(mock_db, mock_mqtt): + # Use a random free port + server_address = ('127.0.0.1', 0) + httpd = EfergyHTTPServer(server_address, FakeEfergyServer, mock_db, mock_mqtt) + port = httpd.server_port + + thread = threading.Thread(target=httpd.serve_forever) + thread.daemon = True + thread.start() + + yield ('127.0.0.1', port) + + httpd.shutdown() + httpd.server_close() + thread.join() + + +def test_get_key(test_server): + host, port = test_server + conn = http.client.HTTPConnection(host, port) + conn.request("GET", "/get_key.html") + response = conn.getresponse() + assert response.status == 200 + assert response.read() == b"TT|a1bCDEFGHa1zZ\n" + + +def test_check_key(test_server): + host, port = test_server + conn = http.client.HTTPConnection(host, port) + conn.request("GET", "/check_key.html") + response = conn.getresponse() + assert response.status == 200 + assert response.read() == b"success" + + +def test_404(test_server): + host, port = test_server + conn = http.client.HTTPConnection(host, port) + conn.request("GET", "/unknown") + response = conn.getresponse() + assert response.status == 404 + + +def test_post_h2(test_server, mock_db, mock_mqtt): + host, port = test_server + conn = http.client.HTTPConnection(host, port) + + # Example h2 payload (from test_payload_parser.py or similar) + # The payload parser expects a certain format. + payload = '741459|1|EFCT|P1,2479.98' + headers = {"Content-Type": "text/plain", "Content-Length": len(payload)} + + conn.request("POST", "/h2", body=payload, headers=headers) + response = conn.getresponse() + + assert response.status == 200 + assert response.read() == b"success" + + # Verify DB call + # parse_sensor_payload("741459|1|EFCT|P1,2479.98", "h2") should return something that results in log_data + assert mock_db.log_data.called + assert mock_mqtt.publish_power.called + + +def test_post_recjson_h1(test_server, mock_db, mock_mqtt): + host, port = test_server + conn = http.client.HTTPConnection(host, port) + + # h1 sends json= + payload = 'json=AABBCCDDDDDD|694851F9|v1.0.1|{"data":[[610965,"mA","E1",33314,0,0,65535]]}|39ef0bdc14b52df375b79555f059b52f' + headers = {"Content-Type": "application/x-www-form-urlencoded", "Content-Length": len(payload)} + + conn.request("POST", "/recjson", body=payload, headers=headers) + response = conn.getresponse() + + assert response.status == 200 + assert response.read() == b"success" + + assert mock_db.log_data.called + assert mock_mqtt.publish_power.called + + +def test_post_ping(test_server): + host, port = test_server + conn = http.client.HTTPConnection(host, port) + + payload = '123456|789012' + headers = {"Content-Type": "application/eh-ping", "Content-Length": len(payload)} + + conn.request("POST", "/any", body=payload, headers=headers) + response = conn.getresponse() + + assert response.status == 200 + assert response.read() == b"success" diff --git a/hub-server/tests/test_parser.py b/hub-server/tests/test_parser.py deleted file mode 100644 index 06569d1..0000000 --- a/hub-server/tests/test_parser.py +++ /dev/null @@ -1,74 +0,0 @@ -import unittest -from payload_parser import parse_sensor_line, parse_sensor_payload - - -class TestParser(unittest.TestCase): - def test_h1_payload(self): - line = 'MAC123|694851F9|v1.0.1|{"data":[[610965,"mA","E1",33314,0,0,65535]]}|39ef0bdc14b52df375b79555f059b52f' - hub_version = "h1" - - result = parse_sensor_line(line, hub_version) - - self.assertIsNotNone(result) - self.assertEqual(result["type"], "CT") - self.assertEqual(result["sid"], "MAC123") - self.assertEqual(result["label"], "efergy_h1_MAC123") - self.assertEqual(result["value"], 33314.0) - - def test_h2_single_sensor_payload(self): - line = "741459|1|EFCT|P1,2479.98" - hub_version = "h2" - - result = parse_sensor_line(line, hub_version) - - self.assertIsNotNone(result) - self.assertEqual(result["type"], "CT") - self.assertEqual(result["sid"], "741459") - self.assertEqual(result["label"], "efergy_h2_741459") - self.assertEqual(result["value"], 2479.98) - self.assertEqual(result["hub_version"], "h2") - - def test_h2_multi_sensor_example(self): - line = "747952|0|EFMS1|M,96.00&T,0.00&L,0.00|-67" - hub_version = "h2" - result = parse_sensor_line(line, hub_version) - self.assertIsNotNone(result) - self.assertEqual(result["type"], "EFMS") - self.assertEqual(result["sid"], "747952") - self.assertEqual(result["rssi"], -67.0) - self.assertIn(("M", 96.00), result["metrics"]) - - def test_h3_example(self): - line = "815751|1|EFCT|P1,391.86|-66" - hub_version = "h3" - result = parse_sensor_line(line, hub_version) - self.assertIsNotNone(result) - self.assertEqual(result["type"], "CT") - self.assertEqual(result["sid"], "815751") - self.assertEqual(result["value"], 391.86) - self.assertEqual(result["rssi"], -66.0) - - def test_efms_payload(self): - line = "741459|1|EFMS|M,64.00&T,22.50&L,100.00|85" - hub_version = "h2" - - result = parse_sensor_line(line, hub_version) - - self.assertIsNotNone(result) - self.assertEqual(result["type"], "EFMS") - self.assertEqual(result["sid"], "741459") - self.assertEqual(len(result["metrics"]), 3) - self.assertIn(("M", 64.00), result["metrics"]) - self.assertIn(("T", 22.50), result["metrics"]) - self.assertIn(("L", 100.00), result["metrics"]) - self.assertEqual(result["rssi"], 85.0) - - def test_hub_status_line(self): - line = "0|1|STATUS|OK" - hub_version = "h2" - result = parse_sensor_line(line, hub_version) - self.assertIsNone(result) - - -if __name__ == "__main__": - unittest.main() diff --git a/hub-server/tests/test_payload_parser.py b/hub-server/tests/test_payload_parser.py new file mode 100644 index 0000000..1e3ce3b --- /dev/null +++ b/hub-server/tests/test_payload_parser.py @@ -0,0 +1,79 @@ +from payload_parser import parse_sensor_line + + +def test_h1_payload(): + line = 'MAC123|694851F9|v1.0.1|{"data":[[610965,"mA","E1",33314,0,0,65535]]}|39ef0bdc14b52df375b79555f059b52f' + hub_version = "h1" + + result = parse_sensor_line(line, hub_version) + + assert result is not None + assert result["type"] == "CT" + assert result["sid"] == "MAC123" + assert result["label"] == "efergy_h1_MAC123" + assert result["value"] == 33314.0 + + +def test_h2_single_sensor_payload(): + line = "741459|1|EFCT|P1,2479.98" + hub_version = "h2" + + result = parse_sensor_line(line, hub_version) + + assert result is not None + assert result["type"] == "CT" + assert result["sid"] == "741459" + assert result["label"] == "efergy_h2_741459" + assert result["value"] == 2479.98 + assert result["hub_version"] == "h2" + + +def test_h2_multi_sensor_example(): + line = "747952|0|EFMS1|M,96.00&T,0.00&L,0.00|-67" + hub_version = "h2" + + result = parse_sensor_line(line, hub_version) + + assert result is not None + assert result["type"] == "EFMS" + assert result["sid"] == "747952" + assert result["rssi"] == -67.0 + assert ("M", 96.00) in result["metrics"] + + +def test_h3_example(): + line = "815751|1|EFCT|P1,391.86|-66" + hub_version = "h3" + + result = parse_sensor_line(line, hub_version) + + assert result is not None + assert result["type"] == "CT" + assert result["sid"] == "815751" + assert result["value"] == 391.86 + assert result["rssi"] == -66.0 + + +def test_efms_payload(): + line = "741459|1|EFMS|M,64.00&T,22.50&L,100.00|85" + hub_version = "h2" + + result = parse_sensor_line(line, hub_version) + + assert result is not None + assert result["type"] == "EFMS" + assert result["sid"] == "741459" + assert len(result["metrics"]) == 3 + assert ("M", 64.00) in result["metrics"] + assert ("T", 22.50) in result["metrics"] + assert ("L", 100.00) in result["metrics"] + assert result["rssi"] == 85.0 + + +def test_hub_status_line(): + line = "0|1|STATUS|OK" + hub_version = "h2" + + result = parse_sensor_line(line, hub_version) + + assert result is None From a410533ba0a3c2585042daca44a332274cf0c1e1 Mon Sep 17 00:00:00 2001 From: DevOldSchool Date: Sun, 28 Dec 2025 17:10:14 +1000 Subject: [PATCH 2/2] Update hub server tests to resolve warnings. --- .github/workflows/tests.yml | 2 +- hub-server/tests/test_hub_server.py | 115 ++++++++++++++-------------- 2 files changed, 60 insertions(+), 57 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index deba919..df7c98b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -30,4 +30,4 @@ jobs: - name: Run pytest working-directory: hub-server run: | - pytest -v --cov=. --cov-report=term-missing + pytest --cov=. --cov-report=xml --cov-report=term-missing tests/ diff --git a/hub-server/tests/test_hub_server.py b/hub-server/tests/test_hub_server.py index de56703..73c6c25 100644 --- a/hub-server/tests/test_hub_server.py +++ b/hub-server/tests/test_hub_server.py @@ -1,6 +1,7 @@ import pytest import http.client import threading +import socket from unittest.mock import MagicMock from hub_server import EfergyHTTPServer, FakeEfergyServer @@ -14,96 +15,98 @@ def mock_mqtt(): @pytest.fixture def test_server(mock_db, mock_mqtt): - # Use a random free port - server_address = ('127.0.0.1', 0) + """ + Starts the HTTP server in a background thread and ensures clean shutdown. + """ + server_address = ('127.0.0.1', 0) # 0 = pick a free port httpd = EfergyHTTPServer(server_address, FakeEfergyServer, mock_db, mock_mqtt) port = httpd.server_port - + thread = threading.Thread(target=httpd.serve_forever) thread.daemon = True thread.start() - + + # Wait until the server socket is ready + timeout = 1.0 + while timeout > 0: + try: + with socket.create_connection(('127.0.0.1', port), timeout=0.1): + break + except ConnectionRefusedError: + timeout -= 0.1 + else: + raise RuntimeError("Server failed to start") + yield ('127.0.0.1', port) - + + # Clean shutdown httpd.shutdown() httpd.server_close() thread.join() +def http_request(host, port, method, path, body=None, headers=None): + conn = http.client.HTTPConnection(host, port, timeout=5) + try: + conn.request(method, path, body=body, headers=headers or {}) + resp = conn.getresponse() + data = resp.read() + return resp.status, data + finally: + conn.close() + + def test_get_key(test_server): host, port = test_server - conn = http.client.HTTPConnection(host, port) - conn.request("GET", "/get_key.html") - response = conn.getresponse() - assert response.status == 200 - assert response.read() == b"TT|a1bCDEFGHa1zZ\n" + status, data = http_request(host, port, "GET", "/get_key.html") + assert status == 200 + assert data == b"TT|a1bCDEFGHa1zZ\n" def test_check_key(test_server): host, port = test_server - conn = http.client.HTTPConnection(host, port) - conn.request("GET", "/check_key.html") - response = conn.getresponse() - assert response.status == 200 - assert response.read() == b"success" + status, data = http_request(host, port, "GET", "/check_key.html") + assert status == 200 + assert data == b"success" def test_404(test_server): host, port = test_server - conn = http.client.HTTPConnection(host, port) - conn.request("GET", "/unknown") - response = conn.getresponse() - assert response.status == 404 + status, data = http_request(host, port, "GET", "/unknown") + assert status == 404 def test_post_h2(test_server, mock_db, mock_mqtt): host, port = test_server - conn = http.client.HTTPConnection(host, port) - - # Example h2 payload (from test_payload_parser.py or similar) - # The payload parser expects a certain format. - payload = '741459|1|EFCT|P1,2479.98' - headers = {"Content-Type": "text/plain", "Content-Length": len(payload)} - - conn.request("POST", "/h2", body=payload, headers=headers) - response = conn.getresponse() - - assert response.status == 200 - assert response.read() == b"success" - - # Verify DB call - # parse_sensor_payload("741459|1|EFCT|P1,2479.98", "h2") should return something that results in log_data + payload = b"741459|1|EFCT|P1,2479.98" + headers = {"Content-Type": "text/plain", "Content-Length": str(len(payload))} + + status, data = http_request(host, port, "POST", "/h2", body=payload, headers=headers) + assert status == 200 + assert data == b"success" + assert mock_db.log_data.called assert mock_mqtt.publish_power.called def test_post_recjson_h1(test_server, mock_db, mock_mqtt): host, port = test_server - conn = http.client.HTTPConnection(host, port) - - # h1 sends json= - payload = 'json=AABBCCDDDDDD|694851F9|v1.0.1|{"data":[[610965,"mA","E1",33314,0,0,65535]]}|39ef0bdc14b52df375b79555f059b52f' - headers = {"Content-Type": "application/x-www-form-urlencoded", "Content-Length": len(payload)} - - conn.request("POST", "/recjson", body=payload, headers=headers) - response = conn.getresponse() - - assert response.status == 200 - assert response.read() == b"success" - + payload = b'json=AABBCCDDDDDD|694851F9|v1.0.1|{"data":[[610965,"mA","E1",33314,0,0,65535]]}|39ef0bdc14b52df375b79555f059b52f' + headers = {"Content-Type": "application/x-www-form-urlencoded", "Content-Length": str(len(payload))} + + status, data = http_request(host, port, "POST", "/recjson", body=payload, headers=headers) + assert status == 200 + assert data == b"success" + assert mock_db.log_data.called assert mock_mqtt.publish_power.called def test_post_ping(test_server): host, port = test_server - conn = http.client.HTTPConnection(host, port) - - payload = '123456|789012' - headers = {"Content-Type": "application/eh-ping", "Content-Length": len(payload)} - - conn.request("POST", "/any", body=payload, headers=headers) - response = conn.getresponse() - - assert response.status == 200 - assert response.read() == b"success" + payload = b"123456|789012" + headers = {"Content-Type": "application/eh-ping", "Content-Length": str(len(payload))} + + status, data = http_request(host, port, "POST", "/any", body=payload, headers=headers) + assert status == 200 + assert data == b"success"