Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f hub-server/requirements-dev.txt ]; then pip install -r hub-server/requirements-dev.txt; fi

- name: Run unit tests
- name: Run pytest
working-directory: hub-server
run: |
python -m unittest discover -s tests
pytest --cov=. --cov-report=xml --cov-report=term-missing tests/
6 changes: 5 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ services:
TZ: Australia/Brisbane
# Logging
LOG_LEVEL: INFO
# How many months of readings and aggregated values to keep, 0 = keep everything
HISTORY_RETENTION_MONTHS: 0
# MQTT
MQTT_ENABLED: true
MQTT_BROKER: homeassistant.local
MQTT_PORT: 1883
MQTT_USER: mqtt-broker-username-here
MQTT_PASS: your-password-here
# Home Assistant Discovery
# Home Assistant
HA_DISCOVERY: true
# Setting to true will reset the cumulative energy value each month
ENERGY_MONTHLY_RESET: false

legacy-nginx:
# build: ./legacy-nginx
Expand Down
10 changes: 10 additions & 0 deletions hub-server/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
import threading
import time
from database import Database
from mqtt_manager import MQTTManager
from config import HISTORY_RETENTION_MONTHS


class Aggregator:
Expand All @@ -11,6 +13,7 @@ def __init__(self, database: Database, mqtt_manager: MQTTManager, interval_sec=3
self.interval_sec = interval_sec
self._stop_event = threading.Event()
self._thread = None
self._last_truncation_ts = 0


def aggregate_loop(self):
Expand All @@ -21,6 +24,13 @@ def aggregate_loop(self):
"""
while not self._stop_event.is_set():
try:
# Perform history truncation check once per day
if HISTORY_RETENTION_MONTHS > 0:
now = time.time()
if now - self._last_truncation_ts >= 86400:
self.database.truncate_old_data(HISTORY_RETENTION_MONTHS)
self._last_truncation_ts = now

processed = self.database.aggregate_hours(limit_hours=1000)
logging.debug(f"Aggregator processed {processed} hours")

Expand Down
5 changes: 5 additions & 0 deletions hub-server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
ENERGY_VALUE_TEMPLATE = os.getenv("ENERGY_VALUE_TEMPLATE", "{{ value_json.value }}")
ENERGY_UNIT_OF_MEASUREMENT = os.getenv("ENERGY_UNIT_OF_MEASUREMENT", "kWh")

ENERGY_MONTHLY_RESET = os.getenv("ENERGY_MONTHLY_RESET", "false").lower() in ("true", "1", "yes", "on")

# History retention in months (0 means keep everything)
HISTORY_RETENTION_MONTHS = int(os.getenv("HISTORY_RETENTION_MONTHS", "0"))

DEVICE_NAME = os.getenv("DEVICE_NAME", "Efergy Hub")
DEVICE_IDENTIFIERS = os.getenv("DEVICE_IDENTIFIERS", ["efergy"])
DEVICE_MANUFACTURER = os.getenv("DEVICE_MANUFACTURER", "Efergy")
Expand Down
71 changes: 69 additions & 2 deletions hub-server/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import threading
import time
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Union
from config import (
SQLITE_TIMEOUT, POWER_FACTOR, MAINS_VOLTAGE
SQLITE_TIMEOUT, POWER_FACTOR, MAINS_VOLTAGE, ENERGY_MONTHLY_RESET
)


Expand Down Expand Up @@ -180,17 +181,82 @@ def get_all_labels(self):


def get_total_energy(self) -> float:
"""
Return the sum of energy.
If ENERGY_MONTHLY_RESET is True, it returns the sum for the current month only.
Otherwise, it returns the absolute total energy.
"""
try:
query = "SELECT SUM(kwh) FROM energy_hourly"
params = ()

if ENERGY_MONTHLY_RESET:
# Calculate start of current month
now = datetime.now()
first_day_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
month_start_ts = int(first_day_of_month.timestamp())
query += " WHERE hour_start >= ?"
params = (month_start_ts,)

with sqlite3.connect(self.db_path, timeout=SQLITE_TIMEOUT) as conn:
cursor = conn.cursor()
cursor.execute("SELECT SUM(kwh) FROM energy_hourly")
cursor.execute(query, params)
row = cursor.fetchone()
return float(row[0]) if row and row[0] else 0.0
except Exception as e:
logging.error(f"Failed to compute total energy: {e}")
return 0.0


def truncate_old_data(self, months: int) -> int:
"""
Truncates data older than the specified number of months.
Deletes from 'readings' and 'energy_hourly'.

Args:
months: Number of months of history to keep.

Returns:
Number of rows deleted (total).
"""
if months <= 0:
return 0

try:
now = datetime.now()
year, month = divmod(now.month - months - 1, 12)
month += 1
year = now.year + year

cutoff_date = now.replace(year=year, month=month, day=1, hour=0, minute=0, second=0, microsecond=0)
cutoff_ts = int(cutoff_date.timestamp())

deleted_count = 0
with sqlite3.connect(self.db_path, timeout=SQLITE_TIMEOUT) as conn:
cursor = conn.cursor()

# Delete from readings
cursor.execute("DELETE FROM readings WHERE timestamp < ?", (cutoff_ts,))
deleted_count += cursor.rowcount

# Delete from energy_hourly
cursor.execute("DELETE FROM energy_hourly WHERE hour_start < ?", (cutoff_ts,))
deleted_count += cursor.rowcount

conn.commit()

# Reclaim space
cursor.execute("VACUUM")

if deleted_count > 0:
logging.info(f"Truncated {deleted_count} old records (older than {cutoff_date.strftime('%Y-%m-%d')})")

return deleted_count
except Exception as e:
logging.error(f"Failed to truncate old data: {e}")
return 0


# ---------------- Aggregation logic ----------------
def fetch_hour_range_to_process(self, cursor: sqlite3.Cursor) -> Optional[int]:
"""
Expand Down Expand Up @@ -219,6 +285,7 @@ def fetch_hour_range_to_process(self, cursor: sqlite3.Cursor) -> Optional[int]:

return last_hour_done + 3600


def aggregate_one_hour(self, cursor: sqlite3.Cursor, hour_start: int) -> Optional[float]:
"""
Aggregate a single hour [hour_start, hour_start+3600) and return kwh inserted,
Expand Down
3 changes: 3 additions & 0 deletions hub-server/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
paho-mqtt >= 2.1.0
pytest >= 8.4.1
pytest-cov >= 7.0.0
92 changes: 92 additions & 0 deletions hub-server/tests/test_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import pytest
import time
from unittest.mock import MagicMock, patch
from aggregator import Aggregator

@pytest.fixture
def mock_db():
return MagicMock()

@pytest.fixture
def mock_mqtt():
return MagicMock()


def test_aggregator_loop_truncation(mock_db, mock_mqtt):
with patch('aggregator.HISTORY_RETENTION_MONTHS', 1):
aggregator = Aggregator(mock_db, mock_mqtt, interval_sec=0.1)

mock_db.get_total_energy.return_value = 12.34

def side_effect(*args, **kwargs):
if mock_db.truncate_old_data.call_count >= 1:
aggregator._stop_event.set()
return 1

mock_db.aggregate_hours.side_effect = side_effect

aggregator.start()
time.sleep(0.5)
aggregator.stop()

assert mock_db.truncate_old_data.called
assert mock_db.aggregate_hours.called
assert mock_mqtt.publish_energy.called



def test_aggregator_no_truncation_when_disabled(mock_db, mock_mqtt):
# Set HISTORY_RETENTION_MONTHS = 0
with patch('aggregator.HISTORY_RETENTION_MONTHS', 0):
aggregator = Aggregator(mock_db, mock_mqtt, interval_sec=0.1)

# Stop without joining
mock_db.aggregate_hours.side_effect = lambda **kwargs: aggregator._stop_event.set() or 1

aggregator.start()
time.sleep(0.3)
aggregator.stop()

assert not mock_db.truncate_old_data.called
assert mock_db.aggregate_hours.called
assert mock_mqtt.publish_energy.called


def test_aggregator_truncation_interval(mock_db, mock_mqtt):
# Re-testing logic by calling aggregate_loop once and checking state
with patch('aggregator.HISTORY_RETENTION_MONTHS', 1):
aggregator = Aggregator(mock_db, mock_mqtt, interval_sec=0.1)

# Patching wait so it doesn't sleep
aggregator._stop_event.wait = MagicMock()

# Mocking time to test daily truncation
# The loop calls time.time() at the beginning of each iteration.
# Iteration 1: time.time() -> 1000. (now - self._last_truncation_ts) = 1000 - 0 >= 86400 is True if we consider initial last_truncation_ts=0
# Wait... the code has self._last_truncation_ts = 0.
# If now = 1000, then 1000 - 0 = 1000. 1000 >= 86400 is FALSE.

# Let's check aggregator.py:
# if now - self._last_truncation_ts >= 86400:

# So it won't truncate on the very first run if now < 86400.

with patch('time.time', side_effect=[100000, 100000, 100000 + 86400 + 1, 300000]):
# Iteration 1: now = 100000. 100000 - 0 = 100000 >= 86400. TRUNCATE. _last_truncation_ts = 100000.
# Iteration 2: now = 100000. 100000 - 100000 = 0 < 86400. NO TRUNCATE.
# Iteration 3: now = 100000 + 86400 + 1 = 186401. 186401 - 100000 = 86401 >= 86400. TRUNCATE.

# Use a side effect to stop after 3 iterations
count = 0
def stop_side_effect(*args, **kwargs):
nonlocal count
count += 1
if count >= 3:
aggregator._stop_event.set()
return 1
mock_db.aggregate_hours.side_effect = stop_side_effect

aggregator.aggregate_loop()

assert mock_db.truncate_old_data.call_count == 2
assert aggregator._last_truncation_ts == 186401
117 changes: 117 additions & 0 deletions hub-server/tests/test_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import pytest
import sqlite3
import time
from database import Database

@pytest.fixture
def db_path(tmp_path):
return tmp_path / "test_readings.db"

@pytest.fixture
def db(db_path):
database = Database(db_path)
database.setup()
return database

def test_database_setup(db_path):
db = Database(db_path)
db.setup()
assert db_path.exists()

with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = {row[0] for row in cursor.fetchall()}
assert "labels" in tables
assert "readings" in tables
assert "energy_hourly" in tables


def test_log_data_and_labels(db):
db.log_data("test_label", 100.0, timestamp=1000)
db.log_data("test_label", 200.0, timestamp=1100)
db.log_data("another_label", 50.0, timestamp=1200)

labels = db.get_all_labels()
assert "test_label" in labels
assert "another_label" in labels
assert len(labels) == 2

with sqlite3.connect(db.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT value FROM readings ORDER BY timestamp")
readings = [row[0] for row in cursor.fetchall()]
assert readings == [100.0, 200.0, 50.0]


def test_aggregate_one_hour(db):
# Hour starts at 3600
hour_start = 3600

# Add some readings in this hour
# efergy_h1 uses (PF * V * I/1000) / 1000 for kW
db.log_data("efergy_h1_test", 1000.0, timestamp=hour_start) # 1000mA -> (0.6 * 230 * 1) / 1000 = 0.138 kW
db.log_data("efergy_h1_test", 2000.0, timestamp=hour_start + 1800) # 2000mA -> 0.276 kW
db.log_data("efergy_h1_test", 1000.0, timestamp=hour_start + 3600) # Next hour, shouldn't be included in this aggregation

with sqlite3.connect(db.db_path) as conn:
cursor = conn.cursor()
kwh = db.aggregate_one_hour(cursor, hour_start)

# Calculation:
# 0.138 kW for 1800s = 0.138 * (1800/3600) = 0.069 kWh
# 0.276 kW for 1800s = 0.276 * (1800/3600) = 0.138 kWh
# Total = 0.069 + 0.138 = 0.207 kWh
# Note: the code uses (rows[i+1][0] - rows[i][0]) for interval.
# So for row 0 (ts=3600), interval = 5400 - 3600 = 1800.
# For row 1 (ts=5400), it's the last row, so it uses the same interval as previous: 1800.

expected_kwh = (0.138 * 0.5) + (0.276 * 0.5)
assert kwh == pytest.approx(expected_kwh)


def test_aggregate_hours(db):
now = int(time.time())
# Round to start of 2 hours ago to ensure we have a full hour to process
hour1_start = (now - 7200) - (now % 3600)
hour2_start = hour1_start + 3600

db.log_data("efergy_h3_test", 100.0, timestamp=hour1_start) # h3 uses value/10/1000 = 0.01 kW
db.log_data("efergy_h3_test", 100.0, timestamp=hour1_start + 3599)

processed = db.aggregate_hours()
assert processed >= 1

total_energy = db.get_total_energy()
assert total_energy > 0


def test_truncate_old_data(db):
# Log some "old" data
now = int(time.time())
two_months_ago = now - (62 * 24 * 3600)

db.log_data("old_label", 100.0, timestamp=two_months_ago)
db.log_data("new_label", 100.0, timestamp=now)

# Aggregate to have something in energy_hourly to
# Need to make sure we have a full hour for the old data
old_hour_start = two_months_ago - (two_months_ago % 3600)
db.log_data("old_label", 100.0, timestamp=old_hour_start)
db.log_data("old_label", 100.0, timestamp=old_hour_start + 3599)

with sqlite3.connect(db.db_path) as conn:
db.aggregate_one_hour(conn.cursor(), old_hour_start)
conn.commit()

# Truncate to 1 month
deleted = db.truncate_old_data(1)
assert deleted > 0

with sqlite3.connect(db.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT count(*) FROM readings WHERE timestamp < ?", (now - 31*24*3600,))
assert cursor.fetchone()[0] == 0

cursor.execute("SELECT count(*) FROM energy_hourly WHERE hour_start < ?", (now - 31*24*3600,))
assert cursor.fetchone()[0] == 0
Loading