From ac34f269f15323772430d8ca638a271642f89ab9 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Tue, 23 Dec 2025 10:45:26 +0100 Subject: [PATCH 1/4] Integrate the smart-home-interface into the luxtronik object --- luxtronik/__init__.py | 249 +++++++++++++++++++++++++++++++++++--- luxtronik/shi/__init__.py | 6 +- 2 files changed, 235 insertions(+), 20 deletions(-) diff --git a/luxtronik/__init__.py b/luxtronik/__init__.py index 3ae68af9..e5a68491 100755 --- a/luxtronik/__init__.py +++ b/luxtronik/__init__.py @@ -7,9 +7,9 @@ import logging import socket import struct -import threading import time +from luxtronik.common import get_host_lock from luxtronik.calculations import Calculations from luxtronik.parameters import Parameters from luxtronik.visibilities import Visibilities @@ -24,6 +24,13 @@ LUXTRONIK_SOCKET_READ_SIZE_INTEGER, LUXTRONIK_SOCKET_READ_SIZE_CHAR, ) +from luxtronik.shi import resolve_version +from luxtronik.shi.modbus import LuxtronikModbusTcpInterface +from luxtronik.shi.holdings import Holdings +from luxtronik.shi.interface import LuxtronikSmartHomeData, LuxtronikSmartHomeInterface +from luxtronik.shi.constants import ( + LUXTRONIK_DEFAULT_MODBUS_PORT, +) # endregion Imports @@ -73,7 +80,9 @@ class LuxtronikSocketInterface: """Luxtronik read/write interface via socket.""" def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT): - self._lock = threading.Lock() + # Acquire a lock object for this host to ensure thread safety + self._lock = get_host_lock(host) + self._host = host self._port = port self._socket = None @@ -82,6 +91,10 @@ def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT): def __del__(self): self._disconnect() + @property + def lock(self): + return self._lock + def _connect(self): """Connect the socket if not already done.""" is_none = self._socket is None @@ -108,7 +121,7 @@ def _with_lock_and_connect(self, func, *args, **kwargs): performed at any point in time. This helps to avoid issues with the Luxtronik controller, which seems unstable otherwise. """ - with self._lock: + with self.lock: self._connect() ret_val = func(*args, **kwargs) return ret_val @@ -278,38 +291,236 @@ def _read_char(self): return struct.unpack(">b", reading)[0] -class Luxtronik(LuxtronikData): +class LuxtronikAllData(LuxtronikData, LuxtronikSmartHomeData): + """ + Data-vector collection for all luxtronik data vectors. + + The collection currently consists of: + - `parameters` + - `calculations` + - `visibilities` + - `holdings` + - `inputs` + """ + + def __init__( + self, + parameters=None, + calculations=None, + visibilities=None, + holdings=None, + inputs=None, + version=None, + safe=True + ): + """ + Initialize a LuxtronikAllData instance. + + Args: + parameters (Parameters): Optional parameters data vector. If not provided, + a new `Parameters` instance is created. + calculations (Calculations): Optional calculations data vector. If not provided, + a new `Calculations` instance is created. + visibilities (Visibilities): Optional visibilities data vector. If not provided, + a new `Visibilities` instance is created. + holdings (Holdings): Optional holdings data vector. If not provided, + a new `Holdings` instance is created. + inputs (Inputs): Optional inputs data vector. If not provided, + a new `Inputs` instance is created. + version (tuple[int] | None): Version to be used for creating the data vectors. + This ensures that the data vectors only contain valid fields. + If None is passed, all available fields are added. + safe (bool): If true, prevent parameter and holding fields marked as + not secure from being written to. + """ + LuxtronikData.__init__(self, parameters, calculations, visibilities, safe) + LuxtronikSmartHomeData.__init__(self, holdings, inputs, version, safe) + +class LuxtronikInterface(LuxtronikSocketInterface, LuxtronikSmartHomeInterface): + """ + Combined interface that can be used to control both + the configuration interface and the smart home interface. + + For simplicity, only the basic functions are offered. + + Attention! It must be ensured that `LuxtronikSocketInterface` and + `LuxtronikSmartHomeInterface` do not instantiate the same fields. + Otherwise, the derivations will overwrite each other. + """ + + def __init__( + self, + host, + port_config=LUXTRONIK_DEFAULT_PORT, + port_shi=LUXTRONIK_DEFAULT_MODBUS_PORT + ): + """ + Initialize the "combined" luxtronik interface. + + Args: + host (str): Hostname or IP address of the heat pump. + port_config (int): TCP port for the config interface + (default: LUXTRONIK_DEFAULT_PORT). + port_shi (int): TCP port for the smart-home interface (via modbusTCP) + (default: LUXTRONIK_DEFAULT_MODBUS_PORT). + """ + self._lock = get_host_lock(host) + + self._host = host + LuxtronikSocketInterface.__init__(self, host, port_config) + modbus_interface = LuxtronikModbusTcpInterface(host, port_shi) + resolved_version = resolve_version(modbus_interface) + LuxtronikSmartHomeInterface.__init__(self, modbus_interface, resolved_version) + + @property + def lock(self): + return self._lock + + def create_all_data(self, safe=True): + """ + Create a data vector collection only with fields that match the stored version. + + Args: + safe (bool): If true, prevent holding fields marked as + not secure from being written to. + + Returns: + LuxtronikAllData: The created data-collection. + """ + return LuxtronikAllData(None, None, None, None, None, self._version, safe) + + def read_all(self, data=None): + """ + Read the data of all fields within the data vector collection + that are supported by the controller. + + Args: + data (LuxtronikAllData | None): Optional existing data vector collection. + If None is provided, a new instance is created. + + Returns: + LuxtronikAllData: The passed / created data vector collection. + """ + if not isinstance(data, LuxtronikAllData): + data = self.create_all_data(True) + + with self.lock: + LuxtronikSocketInterface.read(self, data) + LuxtronikSmartHomeInterface.read(self, data) + return data + + def read(self, data=None): + """ + Calls `read_all()`. Please check its documentation. + Exists mainly to standardize the various interfaces. + """ + return self.read_all(data) + + def write_all(self, data): + """ + Write the data of all fields within the data vector (collection) + that are supported by the controller. + + Args: + data (LuxtronikAllData | Parameters | Holdings): The data vector (collection) containing field data. + If None is provided, the write is aborted. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + if isinstance(data, Parameters): + with self.lock: + LuxtronikSocketInterface.write(self, data) + shi_result = True + elif isinstance(data, Holdings): + with self.lock: + shi_result = LuxtronikSmartHomeInterface.write_holdings(self, data) + # Because of LuxtronikAllData(LuxtronikSmartHomeData) we must use type(..) + elif type(data) is LuxtronikSmartHomeData: + with self.lock: + shi_result = LuxtronikSmartHomeInterface.write(self, data) + elif isinstance(data, LuxtronikAllData): + with self.lock: + LuxtronikSocketInterface.write(self, data.parameters) + shi_result = LuxtronikSmartHomeInterface.write(self, data) + else: + LOGGER.warning("Abort write! No data to write provided.") + return False + return shi_result + + def write(self, data): + """ + Calls `write_all()`. Please check its documentation. + Exists mainly to standardize the various interfaces. + """ + return self.write_all(data) + + def write_and_read(self, write_data, read_data=None): + """ + Write and then read the data of all fields within the data vector collection + that are supported by the controller. + + Args: + data (LuxtronikAllData): The data vector collection containing field data. + If None is provided, the write and read is aborted. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + with self.lock: + self.write_all(write_data) + data = self.read_all(read_data) + return data + + +class Luxtronik(LuxtronikAllData): """ Wrapper around the data and the read/write interface. Mainly to ensure backwards compatibility of the read/write interface to other projects. """ - def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT, safe=True): - super().__init__(safe=safe) - self.interface = LuxtronikSocketInterface(host, port) + def __init__( + self, + host, + port=LUXTRONIK_DEFAULT_PORT, + safe=True, + port_shi=LUXTRONIK_DEFAULT_MODBUS_PORT + ): + self._interface = LuxtronikInterface(host, port, port_shi) + super().__init__(version=self._interface.version, safe=safe) self.read() + @property + def interface(self): + return self._interface + def read(self): - return self.interface.read(self) + return self._interface.read(self) def read_parameters(self): - return self.interface.read_parameters(self.parameters) + return self._interface.read_parameters(self.parameters) def read_calculations(self): - return self.interface.read_calculations(self.calculations) + return self._interface.read_calculations(self.calculations) def read_visibilities(self): - return self.interface.read_visibilities(self.visibilities) + return self._interface.read_visibilities(self.visibilities) - def write(self, parameters=None): - if parameters is None: - self.interface.write(self.parameters) + def read_holdings(self): + return self._interface.read_holdings(self.holdings) + + def read_inputs(self): + return self._interface.read_inputs(self.inputs) + + def write(self, data=None): + if data is None: + return self._interface.write(self) else: - self.interface.write(parameters) + return self._interface.write(data) - def write_and_read(self, parameters=None): - if parameters is None: - return self.interface.write_and_read(self.parameters, self) + def write_and_read(self, data=None): + if data is None: + return self._interface.write_and_read(self, self) else: - return self.interface.write_and_read(parameters, self) + return self._interface.write_and_read(data, self) \ No newline at end of file diff --git a/luxtronik/shi/__init__.py b/luxtronik/shi/__init__.py index d37ae089..18157ce2 100644 --- a/luxtronik/shi/__init__.py +++ b/luxtronik/shi/__init__.py @@ -10,7 +10,11 @@ LUXTRONIK_LATEST_SHI_VERSION, ) from luxtronik.shi.common import LOGGER, parse_version -from luxtronik.shi.inputs import INPUTS_DEFINITIONS +# Skip ruff unused-import (F401) by using "as" +from luxtronik.shi.inputs import Inputs as Inputs +from luxtronik.shi.inputs import INPUTS_DEFINITIONS as INPUTS_DEFINITIONS +from luxtronik.shi.holdings import Holdings as Holdings +from luxtronik.shi.holdings import HOLDINGS_DEFINITIONS as HOLDINGS_DEFINITIONS from luxtronik.shi.modbus import LuxtronikModbusTcpInterface from luxtronik.shi.interface import LuxtronikSmartHomeInterface From 20f995625a8394b0728c67635f8846ff254c8eb1 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Tue, 23 Dec 2025 10:45:48 +0100 Subject: [PATCH 2/4] Add scripts for the smart-home-interface --- luxtronik/scripts/__init__.py | 29 +++++++- luxtronik/scripts/discover_shi.py | 98 ++++++++++++++++++++++++++++ luxtronik/scripts/dump_shi.py | 40 ++++++++++++ luxtronik/scripts/performance_shi.py | 89 +++++++++++++++++++++++++ luxtronik/scripts/watch_shi.py | 85 ++++++++++++++++++++++++ 5 files changed, 340 insertions(+), 1 deletion(-) create mode 100644 luxtronik/scripts/discover_shi.py create mode 100644 luxtronik/scripts/dump_shi.py create mode 100644 luxtronik/scripts/performance_shi.py create mode 100644 luxtronik/scripts/watch_shi.py diff --git a/luxtronik/scripts/__init__.py b/luxtronik/scripts/__init__.py index aab72124..d5ff75b5 100644 --- a/luxtronik/scripts/__init__.py +++ b/luxtronik/scripts/__init__.py @@ -1 +1,28 @@ -"""Luxtronik helper scripts.""" +"""Luxtronik script helper.""" + +import argparse + +def create_default_args_parser(func_desc, default_port): + parser = argparse.ArgumentParser(description=func_desc) + parser.add_argument("ip", help="IP address of Luxtronik controller to connect to") + parser.add_argument( + "port", + nargs="?", + type=int, + default=default_port, + help="Port to use to connect to Luxtronik controller", + ) + return parser + +def print_dump_header(caption): + print("=" * 130) + print(f"{' ' + caption + ' ': ^120}") + print("=" * 130) + +def print_dump_row(number, field): + print(f"Number: {number:<5} Name: {field.name:<60} " + f"Type: {field.__class__.__name__:<20} Value: {field}") + +def print_watch_header(caption): + print("=" * 130) + print(caption) + print("=" * 130) diff --git a/luxtronik/scripts/discover_shi.py b/luxtronik/scripts/discover_shi.py new file mode 100644 index 00000000..a9d539f5 --- /dev/null +++ b/luxtronik/scripts/discover_shi.py @@ -0,0 +1,98 @@ +#! /usr/bin/env python3 +# pylint: disable=invalid-name +""" +Script to scan all inputs/holdings of the Smart-Home-Interface. +Only undefined but existing fields will be dumped. +""" + +import logging + +from luxtronik.scripts import ( + create_default_args_parser, + print_dump_header, + print_dump_row +) +from luxtronik.datatypes import Unknown +from luxtronik.shi.constants import LUXTRONIK_DEFAULT_MODBUS_PORT +from luxtronik.shi.modbus import LuxtronikModbusTcpInterface +from luxtronik.shi.inputs import INPUTS_DEFINITIONS +from luxtronik.shi.holdings import HOLDINGS_DEFINITIONS + +logging.disable(logging.CRITICAL) + +def get_undefined(definitions, start, count): + skip_count = 0 + undefined = [] + for i in range(start, start + count): + # Skip addresses that belongs to a previous field + if skip_count > 0: + skip_count -= 1 + continue + definition = definitions[i] + # Add unknown + if definition is None: + undefined.append((i, None, Unknown(f"unknown_{definitions.name}_{i}", False))) + else: + skip_count = definition.count - 1 + return undefined + +def get_defined(definitions): + defined = [] + for definition in definitions: + defined.append((definition.index, definition, definition.create_field())) + return defined + +def dump_undefined(undefined, offset, read_cb): + for number, _, field in undefined: + print(f"Number: {number:<5}", end="\r") + data = read_cb(number + offset, 1) + if data is not None: + field.raw = data[0] + print_dump_row(number, field) + +def dump_defined(defined, offset, read_cb): + for number, definition, field in defined: + print(f"Number: {number:<5}", end="\r") + data = read_cb(number + offset, definition.count) + if data is None: + field.raw = None + print_dump_row(number, field) + +def discover_fields(definitions, start, count, read_cb): + print_dump_header(f"Undefined but existing {definitions.name}s") + undefined = get_undefined(definitions, start, count) + dump_undefined(undefined, definitions.offset, read_cb) + +def discontinue_fields(definitions, read_cb): + print_dump_header(f"Defined but not existing {definitions.name}s") + defined = get_defined(definitions) + dump_defined(defined, definitions.offset, read_cb) + +def discover_shi(): + parser = create_default_args_parser( + "Dumps all undefined but existing fields of the Smart-Home-Interface.", + LUXTRONIK_DEFAULT_MODBUS_PORT + ) + parser.add_argument( + "count", + nargs="?", + type=int, + default=1000, + help="Total number of registers to check", + ) + args = parser.parse_args() + print(f"Discover SHI of {args.ip}:{args.port}") + + client = LuxtronikModbusTcpInterface(args.ip, args.port) + + discover_fields(INPUTS_DEFINITIONS, 0, args.count, client.read_inputs) + discontinue_fields(INPUTS_DEFINITIONS, client.read_inputs) + discover_fields(HOLDINGS_DEFINITIONS, 0, args.count, client.read_holdings) + discontinue_fields(HOLDINGS_DEFINITIONS, client.read_holdings) + + # Clear last line if nothing was found + print(' '*100) + + +if __name__ == "__main__": + discover_shi() diff --git a/luxtronik/scripts/dump_shi.py b/luxtronik/scripts/dump_shi.py new file mode 100644 index 00000000..42b9452e --- /dev/null +++ b/luxtronik/scripts/dump_shi.py @@ -0,0 +1,40 @@ +#! /usr/bin/env python3 +# pylint: disable=invalid-name +""" +Script to dump all available Smart-Home-Interface values from Luxtronik controller. +""" + +import logging + +from luxtronik.scripts import ( + create_default_args_parser, + print_dump_header, + print_dump_row +) +from luxtronik.shi import create_modbus_tcp +from luxtronik.shi.constants import LUXTRONIK_DEFAULT_MODBUS_PORT + +logging.disable(logging.CRITICAL) + +def dump_fields(read_cb): + data_vector = read_cb() + print_dump_header(f"{data_vector.name}s") + for definition, field in data_vector.items(): + print_dump_row(definition.index, field) + +def dump_shi(): + parser = create_default_args_parser( + "Dumps all Smart-Home-Interface values from Luxtronik controller.", + LUXTRONIK_DEFAULT_MODBUS_PORT + ) + args = parser.parse_args() + print(f"Dump SHI of {args.ip}:{args.port}") + + shi = create_modbus_tcp(args.ip, args.port) + + dump_fields(shi.read_inputs) + dump_fields(shi.read_holdings) + + +if __name__ == "__main__": + dump_shi() diff --git a/luxtronik/scripts/performance_shi.py b/luxtronik/scripts/performance_shi.py new file mode 100644 index 00000000..7530200a --- /dev/null +++ b/luxtronik/scripts/performance_shi.py @@ -0,0 +1,89 @@ +#! /usr/bin/env python3 +# pylint: disable=invalid-name +""" +Script to measure different access methods of the Smart-Home-Interface. +""" + +import time + +from luxtronik.scripts import ( + create_default_args_parser +) +from luxtronik.shi import create_modbus_tcp +from luxtronik.shi.constants import LUXTRONIK_DEFAULT_MODBUS_PORT +from luxtronik.shi.common import LuxtronikSmartHomeReadInputsTelegram + +class TimeMeasurement: + def __init__(self): + self.duration = 0 + self._start = 0 + + def __enter__(self): + self._start = time.perf_counter() + return self + + def __exit__(self, exc_type, exc, tb): + end = time.perf_counter() + self.duration = end - self._start + +def performance_shi(): + parser = create_default_args_parser( + "Measure different access methods of the Smart-Home-Interface.", + LUXTRONIK_DEFAULT_MODBUS_PORT + ) + args = parser.parse_args() + print(f"Measure SHI performance of {args.ip}:{args.port}") + + shi = create_modbus_tcp(args.ip, args.port) + client = shi._interface + num_inputs = len(shi.inputs) + + with TimeMeasurement() as t: + client._connect() + for _ in range(0, 100): + client._client.read_input_registers(10002, 1) + client._disconnect() + print(f"Read inputs with bare modbus interface: {100 / t.duration:.1f} fields/s") + + with TimeMeasurement() as t: + for _ in range(0, 100): + client.read_inputs(10002, 1) + print(f"Read inputs one after another with re-connect every time: {100 / t.duration:.1f} fields/s") + + with TimeMeasurement() as t: + telegrams = [] + for _ in range(0, 100): + telegrams.append(LuxtronikSmartHomeReadInputsTelegram(10002, 1)) + client.send(telegrams) + print(f"Read inputs within one telegram list: {100 / t.duration:.1f} fields/s") + + with TimeMeasurement() as t: + telegrams = [] + for definition in shi.inputs: + telegrams.append(LuxtronikSmartHomeReadInputsTelegram(definition.addr, definition.count)) + for _ in range(0, 10): + client.send(telegrams) + print(f"Read whole input vector field by field: {(10 * num_inputs) / t.duration:.1f} fields/s") + + with TimeMeasurement() as t: + inputs = shi.create_inputs() + for _ in range(0, 10): + shi.read_inputs(inputs) + print(f"Read whole input vector with data blocks (same data vector): {(10 * num_inputs) / t.duration:.1f} fields/s") + + with TimeMeasurement() as t: + for _ in range(0, 10): + shi.read_inputs() + print(f"Read whole input vector with data blocks (create new data vectors): {(10 * num_inputs) / t.duration:.1f} fields/s") + + with TimeMeasurement() as t: + inputs = shi.create_inputs() + for _ in range(0, 5): + for _ in range(0, 5): + shi.collect_inputs(inputs) + shi.send() + print(f"Collect and send multiple input vectors: {(5 * 5 * num_inputs) / t.duration:.1f} fields/s") + + +if __name__ == "__main__": + performance_shi() diff --git a/luxtronik/scripts/watch_shi.py b/luxtronik/scripts/watch_shi.py new file mode 100644 index 00000000..713ff5b4 --- /dev/null +++ b/luxtronik/scripts/watch_shi.py @@ -0,0 +1,85 @@ +#! /usr/bin/env python3 +# pylint: disable=invalid-name, disable=too-many-locals +""" +Script to watch all value changes from the Smart-Home-Interface of the Luxtronik controller +""" + +from collections import OrderedDict +import logging +#import select +#import sys +import time + +from luxtronik.scripts import ( + create_default_args_parser, + print_watch_header +) +from luxtronik.shi import create_modbus_tcp +from luxtronik.shi.constants import LUXTRONIK_DEFAULT_MODBUS_PORT + +logging.disable(logging.CRITICAL) + +def update_changes(changes, this_data_vector, prev_data_vector): + for definition, this_field in this_data_vector.items(): + short_name = this_data_vector.name[:4] + number = definition.index + key = f"{short_name}_{str(number).zfill(5)}" + prev_field = prev_data_vector.get(number) + if this_field.raw != prev_field.raw: + changes[key] = ( + f"{short_name}: Number: {number:<5} Name: {prev_field.name:<60} " + f"Value: {prev_field} -> {this_field}" + ) + elif key in changes: + changes[key] = ( + f"{short_name}: Number: {number:<5} Name: {prev_field.name:<60} " + f"Value: {prev_field}" + ) + +def watch_shi(): + parser = create_default_args_parser( + "Watch all value changes from the Smart-Home-Interface of the Luxtronik controller.", + LUXTRONIK_DEFAULT_MODBUS_PORT + ) + args = parser.parse_args() + + shi = create_modbus_tcp(args.ip, args.port) + + prev_data = shi.read() + this_data = shi.create_data() + changes = {} + + print("\033[2J") # clear screen + + while True: + # Get new data + shi.read(this_data) + + # Compare this values with the initial values + # and add changes to dictionary + update_changes(changes, this_data.inputs, prev_data.inputs) + update_changes(changes, this_data.holdings, prev_data.holdings) + + # Print changes + print("\033[H") # Go-to home, line 0 + #print_watch_header(f"Watch SHI of {args.ip}:{args.port}: Press a key and enter to: q = quit; r = reset") + print_watch_header(f"Watch SHI of {args.ip}:{args.port}") + sorted_changes = OrderedDict(sorted(changes.items())) + for key, values in sorted_changes.items(): + print(values + "\033[0K") # clear residual line + print("\n") + + # Read stdin + # input, _, _ = select.select([sys.stdin], [], [], 0.1) + # if input: + # key = sys.stdin.read(1) + # if key == 'q': + # break + # elif key == 'r': + # prev_data = client.read() + # changes = {} + # print("\033[2J") # clear screen + + time.sleep(1) + + +if __name__ == "__main__": + watch_shi() From ad55b8712c9ff449ece9a4c4fdb68df048c97c54 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Tue, 23 Dec 2025 10:46:15 +0100 Subject: [PATCH 3/4] Add unit tests for the smart-home-interface integration --- tests/test_Luxtronik.py | 326 +++++++++++++++++++++++++++++++ tests/test_LuxtronikData.py | 45 ++++- tests/test_socket_interaction.py | 18 ++ 3 files changed, 388 insertions(+), 1 deletion(-) create mode 100644 tests/test_Luxtronik.py diff --git a/tests/test_Luxtronik.py b/tests/test_Luxtronik.py new file mode 100644 index 00000000..96322063 --- /dev/null +++ b/tests/test_Luxtronik.py @@ -0,0 +1,326 @@ +from unittest.mock import patch + +from luxtronik.shi import LuxtronikSmartHomeInterface +from luxtronik.shi.interface import LuxtronikSmartHomeData +from luxtronik import ( + Parameters, + Holdings, + LuxtronikSocketInterface, + LuxtronikAllData, + LuxtronikInterface, + Luxtronik +) + +############################################################################### +# Fake interfaces +############################################################################### + +class FakeSocketInterface(LuxtronikSocketInterface): + + write_counter = 0 + read_counter = 0 + + @classmethod + def reset(cls): + FakeSocketInterface.write_counter = 0 + FakeSocketInterface.read_counter = 0 + + def _connect(self): + pass + + def _disconnect(self): + pass + + def read(self, data): + FakeSocketInterface.read_parameters(self, data.parameters) + FakeSocketInterface.read_visibilities(self, data.visibilities) + FakeSocketInterface.read_calculations(self, data.calculations) + + def read_parameters(self, parameters): + parameters.get(0).raw = 2 + FakeSocketInterface.read_counter += 1 + + def read_visibilities(self, visibilities): + visibilities.get(0).raw = 4 + FakeSocketInterface.read_counter += 1 + + def read_calculations(self, calculations): + calculations.get(0).raw = 6 + FakeSocketInterface.read_counter += 1 + + def write(self, data): + FakeSocketInterface.write_counter += 1 + +class FakeShiInterface(LuxtronikSmartHomeInterface): + + write_counter = 0 + read_counter = 0 + + @classmethod + def reset(cls): + FakeShiInterface.write_counter = 0 + FakeShiInterface.read_counter = 0 + + def read(self, data): + FakeShiInterface.read_inputs(self, data.inputs) + FakeShiInterface.read_holdings(self, data.holdings) + + def read_inputs(self, inputs): + inputs[0].raw = 3 + FakeShiInterface.read_counter += 1 + + def read_holdings(self, holdings): + holdings[1].raw = 5 + FakeShiInterface.read_counter += 1 + + def write(self, data): + return FakeShiInterface.write_holdings(self, data.holdings) + + def write_holdings(self, holdings): + FakeShiInterface.write_counter += 1 + return True + +def fake_resolve_version(modbus_interface): + return (3, 99, 11, 0) + +############################################################################### +# Tests +############################################################################### + +@patch("luxtronik.LuxtronikSocketInterface", FakeSocketInterface) +@patch("luxtronik.LuxtronikSocketInterface._connect", FakeSocketInterface._connect) +@patch("luxtronik.LuxtronikSocketInterface._disconnect", FakeSocketInterface._disconnect) +@patch("luxtronik.LuxtronikSocketInterface.read_parameters", FakeSocketInterface.read_parameters) +@patch("luxtronik.LuxtronikSocketInterface.read_visibilities", FakeSocketInterface.read_visibilities) +@patch("luxtronik.LuxtronikSocketInterface.read_calculations", FakeSocketInterface.read_calculations) +@patch("luxtronik.LuxtronikSmartHomeInterface", FakeShiInterface) +@patch("luxtronik.LuxtronikSmartHomeInterface.read_inputs", FakeShiInterface.read_inputs) +@patch("luxtronik.LuxtronikSmartHomeInterface.read_holdings", FakeShiInterface.read_holdings) +@patch("luxtronik.resolve_version", fake_resolve_version) +class TestLuxtronik: + + def test_if_init(self): + lux = LuxtronikInterface('host', 1234, 5678) + + assert lux._host == 'host' + assert lux._port == 1234 + assert lux._interface._client._port == 5678 + assert lux.version == (3, 99, 11, 0) + + def test_if_lock(self): + lux = LuxtronikInterface('host', 1234, 5678) + lux.lock.acquire(blocking=False) + lux.lock.acquire(blocking=False) + lux.lock.release() + lux.lock.release() + + def test_if_create_all_data(self): + lux = LuxtronikInterface('host', 1234, 5678) + + data = lux.create_all_data() + assert type(data) is LuxtronikAllData + + def test_if_read_all(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = LuxtronikInterface('host', 1234, 5678) + + assert FakeSocketInterface.read_counter == 0 + assert FakeShiInterface.read_counter == 0 + + data1 = lux.read_all() + assert type(data1) is LuxtronikAllData + assert data1.parameters.get(0).raw == 2 + assert data1.inputs[0].raw == 3 + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 2 + + data2 = lux.read(data1) + assert data1 == data2 + assert FakeSocketInterface.read_counter == 6 + assert FakeShiInterface.read_counter == 4 + + def test_if_write_all(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = LuxtronikInterface('host', 1234, 5678) + + assert FakeSocketInterface.write_counter == 0 + assert FakeShiInterface.write_counter == 0 + + p = Parameters() + result = lux.write_all(p) + assert result + assert FakeSocketInterface.write_counter == 1 + assert FakeShiInterface.write_counter == 0 + + d = LuxtronikAllData() + data = lux.write_and_read(p, d) + assert data == d + assert data.inputs[0].raw == 3 + assert FakeSocketInterface.write_counter == 2 + assert FakeShiInterface.write_counter == 0 + + h = Holdings() + result = lux.write_all(h) + assert result + assert FakeSocketInterface.write_counter == 2 + assert FakeShiInterface.write_counter == 1 + + s = LuxtronikSmartHomeData() + result = lux.write(s) + assert result + assert FakeSocketInterface.write_counter == 2 + assert FakeShiInterface.write_counter == 2 + + result = lux.write_all(None) + assert not result + assert FakeSocketInterface.write_counter == 2 + assert FakeShiInterface.write_counter == 2 + + d = LuxtronikAllData() + data = lux.write_and_read(d, d) + assert data == d + assert data.inputs[0].raw == 3 + assert FakeSocketInterface.write_counter == 3 + assert FakeShiInterface.write_counter == 3 + + def test_lux_init(self): + lux = Luxtronik('host', 1234, 5678) + + assert isinstance(lux, LuxtronikAllData) + assert isinstance(lux.interface, LuxtronikInterface) + + def test_read(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = Luxtronik('host', 1234, 5678) + + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 2 + + lux.read() + assert lux.parameters.get(0).raw == 2 + assert lux.inputs[0].raw == 3 + assert FakeSocketInterface.read_counter == 6 + assert FakeShiInterface.read_counter == 4 + + def test_read_parameters(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = Luxtronik('host', 1234, 5678) + + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 2 + + lux.read_parameters() + assert lux.parameters.get(0).raw == 2 + assert lux.inputs[0].raw == 3 + assert FakeSocketInterface.read_counter == 4 + assert FakeShiInterface.read_counter == 2 + + def test_read_visibilities(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = Luxtronik('host', 1234, 5678) + + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 2 + + lux.read_visibilities() + assert lux.visibilities.get(0).raw == 4 + assert lux.inputs[0].raw == 3 + assert FakeSocketInterface.read_counter == 4 + assert FakeShiInterface.read_counter == 2 + + def test_read_calculations(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = Luxtronik('host', 1234, 5678) + + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 2 + + lux.read_calculations() + assert lux.calculations.get(0).raw == 6 + assert lux.inputs[0].raw == 3 + assert FakeSocketInterface.read_counter == 4 + assert FakeShiInterface.read_counter == 2 + + def test_read_inputs(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = Luxtronik('host', 1234, 5678) + + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 2 + + lux.read_inputs() + assert lux.parameters.get(0).raw == 2 + assert lux.inputs[0].raw == 3 + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 3 + + def test_read_holdings(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = Luxtronik('host', 1234, 5678) + + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 2 + + lux.read_holdings() + assert lux.parameters.get(0).raw == 2 + assert lux.holdings[1].raw == 5 + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.read_counter == 3 + + def test_write(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = Luxtronik('host', 1234, 5678) + + assert FakeSocketInterface.write_counter == 0 + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.write_counter == 0 + assert FakeShiInterface.read_counter == 2 + + result = lux.write() + assert result + assert FakeSocketInterface.write_counter == 1 + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.write_counter == 1 + assert FakeShiInterface.read_counter == 2 + + data = LuxtronikAllData() + result = lux.write(data) + assert result + assert FakeSocketInterface.write_counter == 2 + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.write_counter == 2 + assert FakeShiInterface.read_counter == 2 + + def test_write_and_read(self): + FakeSocketInterface.reset() + FakeShiInterface.reset() + lux = Luxtronik('host', 1234, 5678) + + assert FakeSocketInterface.write_counter == 0 + assert FakeSocketInterface.read_counter == 3 + assert FakeShiInterface.write_counter == 0 + assert FakeShiInterface.read_counter == 2 + + result = lux.write_and_read() + assert result + assert FakeSocketInterface.write_counter == 1 + assert FakeSocketInterface.read_counter == 6 + assert FakeShiInterface.write_counter == 1 + assert FakeShiInterface.read_counter == 4 + + data = LuxtronikAllData() + result = lux.write_and_read(data) + assert result + assert FakeSocketInterface.write_counter == 2 + assert FakeSocketInterface.read_counter == 9 + assert FakeShiInterface.write_counter == 2 + assert FakeShiInterface.read_counter == 6 diff --git a/tests/test_LuxtronikData.py b/tests/test_LuxtronikData.py index 2ea4c40f..6189b0ca 100644 --- a/tests/test_LuxtronikData.py +++ b/tests/test_LuxtronikData.py @@ -1,6 +1,16 @@ """Test suite for LuxtronikData""" -from luxtronik import LuxtronikData, Parameters, Calculations, Visibilities +from luxtronik import ( + LuxtronikData, + LuxtronikAllData, + Parameters, + Calculations, + Visibilities +) +from luxtronik.shi import ( + Inputs, + Holdings +) class TestLuxtronikData: @@ -47,3 +57,36 @@ def test_get_firmware_version(self): a.calculations.get(84).raw = ord("1") assert a.get_firmware_version() == "V3.1" + + + +class TestLuxtronikAllData: + """Test suite for LuxtronikAllData datatype""" + + def test_init(self): + """Test cases for __init__""" + + a = LuxtronikAllData() + assert a.parameters.safe + + b = LuxtronikAllData(safe=False) + assert not b.parameters.safe + + para = Parameters() + hold = Holdings() + c = LuxtronikAllData(para, holdings=hold) + assert c.parameters == para + assert c.holdings == hold + assert a.parameters != para + assert a.holdings != hold + + calc = Calculations() + visi = Visibilities() + inpu = Inputs() + d = LuxtronikAllData(calculations=calc, visibilities=visi, inputs=inpu) + assert d.calculations == calc + assert d.visibilities == visi + assert d.inputs == inpu + assert c.calculations != calc + assert c.visibilities != visi + assert c.inputs != inpu diff --git a/tests/test_socket_interaction.py b/tests/test_socket_interaction.py index e7e2f60b..2a7816d9 100644 --- a/tests/test_socket_interaction.py +++ b/tests/test_socket_interaction.py @@ -144,6 +144,23 @@ def recv(self, cnt, flag=0): return data +class FakeModbus: + + def __init__( + self, + host, + port=0, + timeout=0 + ): + self._connected = False + self._blocking = False + + def read_inputs(self, addr, count): + return [addr + i for i in range(count)] + + def send(self, data): + return True + class TestSocketInteraction: def check_luxtronik_data(self, lux, check_for_true=True): @@ -239,6 +256,7 @@ def test_luxtronik_socket_interface(self): assert len(p.queue) == 0 assert self.check_luxtronik_data(d) + @mock.patch("luxtronik.LuxtronikModbusTcpInterface", FakeModbus) def test_luxtronik(self): host = "my_heatpump" port = 4711 From ddf0d0734b28a1b73893489290356ccabd074a6d Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Sat, 27 Dec 2025 18:25:12 +0100 Subject: [PATCH 4/4] Use the spelling "smart home interface" consistently --- luxtronik/__init__.py | 2 +- luxtronik/datatypes.py | 8 ++++---- luxtronik/definitions/holdings.py | 2 +- luxtronik/definitions/inputs.py | 2 +- luxtronik/scripts/discover_shi.py | 4 ++-- luxtronik/scripts/dump_shi.py | 4 ++-- luxtronik/scripts/performance_shi.py | 4 ++-- luxtronik/scripts/watch_shi.py | 4 ++-- luxtronik/shi/README.md | 2 +- luxtronik/shi/__init__.py | 2 +- luxtronik/shi/common.py | 2 +- luxtronik/shi/constants.py | 4 ++-- luxtronik/shi/interface.py | 2 +- luxtronik/shi/vector.py | 2 +- 14 files changed, 22 insertions(+), 22 deletions(-) diff --git a/luxtronik/__init__.py b/luxtronik/__init__.py index e5a68491..cfd93e3c 100755 --- a/luxtronik/__init__.py +++ b/luxtronik/__init__.py @@ -361,7 +361,7 @@ def __init__( host (str): Hostname or IP address of the heat pump. port_config (int): TCP port for the config interface (default: LUXTRONIK_DEFAULT_PORT). - port_shi (int): TCP port for the smart-home interface (via modbusTCP) + port_shi (int): TCP port for the smart home interface (via modbusTCP) (default: LUXTRONIK_DEFAULT_MODBUS_PORT). """ self._lock = get_host_lock(host) diff --git a/luxtronik/datatypes.py b/luxtronik/datatypes.py index 492f1a87..90e992fe 100755 --- a/luxtronik/datatypes.py +++ b/luxtronik/datatypes.py @@ -914,7 +914,7 @@ class ControlMode(SelectionBase): 0: "Off", # System value is used 1: "Setpoint", # Setpoint register value is used 2: "Offset", # System values + offset register value is used - 3: "Level", # System values + smart-home-interface-settings + 3: "Level", # System values + smart home interface settings # register value is used } @@ -959,13 +959,13 @@ class LevelMode(SelectionBase): codes = { 0: "Normal", # No correction 1: "Increased", # Increase the temperature by the values - # within the smart-home-interface-settings + # within the smart home interface settings # TODO: Function unknown – requires further analysis 2: "Increased2", # Increase the temperature by the values - # within the smart-home-interface-settings + # within the smart home interface settings # TODO: Function unknown – requires further analysis 3: "Decreased", # Decrease the temperature by the values - # within the smart-home-interface-settings + # within the smart home interface settings # TODO: Function unknown – requires further analysis } diff --git a/luxtronik/definitions/holdings.py b/luxtronik/definitions/holdings.py index c5b765fa..2f10fb4e 100644 --- a/luxtronik/definitions/holdings.py +++ b/luxtronik/definitions/holdings.py @@ -1,6 +1,6 @@ """ Constant list containing all 'holdings' definitions -used by the Smart Home Interface (SHI) of the Luxtronik controller. +used by the smart home interface (SHI) of the Luxtronik controller. Unlike the setting registers, these SHI register are volatile and intended for communication with smart home systems. 'Holding' registers are readable diff --git a/luxtronik/definitions/inputs.py b/luxtronik/definitions/inputs.py index c983955a..57db3954 100644 --- a/luxtronik/definitions/inputs.py +++ b/luxtronik/definitions/inputs.py @@ -1,6 +1,6 @@ """ Constant list containing all 'inputs' definitions -used by the Smart Home Interface (SHI) of the Luxtronik controller. +used by the smart home interface (SHI) of the Luxtronik controller. Unlike the setting registers, these SHI register are volatile and intended for communication with smart home systems. 'Input' register are read-only diff --git a/luxtronik/scripts/discover_shi.py b/luxtronik/scripts/discover_shi.py index a9d539f5..146379a4 100644 --- a/luxtronik/scripts/discover_shi.py +++ b/luxtronik/scripts/discover_shi.py @@ -1,7 +1,7 @@ #! /usr/bin/env python3 # pylint: disable=invalid-name """ -Script to scan all inputs/holdings of the Smart-Home-Interface. +Script to scan all inputs/holdings of the smart home interface. Only undefined but existing fields will be dumped. """ @@ -70,7 +70,7 @@ def discontinue_fields(definitions, read_cb): def discover_shi(): parser = create_default_args_parser( - "Dumps all undefined but existing fields of the Smart-Home-Interface.", + "Dumps all undefined but existing fields of the smart home interface.", LUXTRONIK_DEFAULT_MODBUS_PORT ) parser.add_argument( diff --git a/luxtronik/scripts/dump_shi.py b/luxtronik/scripts/dump_shi.py index 42b9452e..38290f41 100644 --- a/luxtronik/scripts/dump_shi.py +++ b/luxtronik/scripts/dump_shi.py @@ -1,7 +1,7 @@ #! /usr/bin/env python3 # pylint: disable=invalid-name """ -Script to dump all available Smart-Home-Interface values from Luxtronik controller. +Script to dump all available smart home interface values from Luxtronik controller. """ import logging @@ -24,7 +24,7 @@ def dump_fields(read_cb): def dump_shi(): parser = create_default_args_parser( - "Dumps all Smart-Home-Interface values from Luxtronik controller.", + "Dumps all smart home interface values from Luxtronik controller.", LUXTRONIK_DEFAULT_MODBUS_PORT ) args = parser.parse_args() diff --git a/luxtronik/scripts/performance_shi.py b/luxtronik/scripts/performance_shi.py index 7530200a..a4415a64 100644 --- a/luxtronik/scripts/performance_shi.py +++ b/luxtronik/scripts/performance_shi.py @@ -1,7 +1,7 @@ #! /usr/bin/env python3 # pylint: disable=invalid-name """ -Script to measure different access methods of the Smart-Home-Interface. +Script to measure different access methods of the smart home interface. """ import time @@ -28,7 +28,7 @@ def __exit__(self, exc_type, exc, tb): def performance_shi(): parser = create_default_args_parser( - "Measure different access methods of the Smart-Home-Interface.", + "Measure different access methods of the smart home interface.", LUXTRONIK_DEFAULT_MODBUS_PORT ) args = parser.parse_args() diff --git a/luxtronik/scripts/watch_shi.py b/luxtronik/scripts/watch_shi.py index 713ff5b4..b3d443f1 100644 --- a/luxtronik/scripts/watch_shi.py +++ b/luxtronik/scripts/watch_shi.py @@ -1,7 +1,7 @@ #! /usr/bin/env python3 # pylint: disable=invalid-name, disable=too-many-locals """ -Script to watch all value changes from the Smart-Home-Interface of the Luxtronik controller +Script to watch all value changes from the smart home interface of the Luxtronik controller """ from collections import OrderedDict @@ -36,7 +36,7 @@ def update_changes(changes, this_data_vector, prev_data_vector): def watch_shi(): parser = create_default_args_parser( - "Watch all value changes from the Smart-Home-Interface of the Luxtronik controller.", + "Watch all value changes from the smart home interface of the Luxtronik controller.", LUXTRONIK_DEFAULT_MODBUS_PORT ) args = parser.parse_args() diff --git a/luxtronik/shi/README.md b/luxtronik/shi/README.md index c34d289f..65f6eb95 100644 --- a/luxtronik/shi/README.md +++ b/luxtronik/shi/README.md @@ -1,5 +1,5 @@ -# Smart-Home-Interface +# Smart home interface ## Introduction diff --git a/luxtronik/shi/__init__.py b/luxtronik/shi/__init__.py index 18157ce2..dea8b0df 100644 --- a/luxtronik/shi/__init__.py +++ b/luxtronik/shi/__init__.py @@ -141,6 +141,6 @@ def create_modbus_tcp( """ modbus_interface = LuxtronikModbusTcpInterface(host, port, timeout) resolved_version = resolve_version(modbus_interface, version) - LOGGER.info(f"Create smart-home-interface via modbus-TCP on {host}:{port}" + LOGGER.info(f"Create smart home interface via modbus-TCP on {host}:{port}" + f" for version {resolved_version}") return LuxtronikSmartHomeInterface(modbus_interface, resolved_version) \ No newline at end of file diff --git a/luxtronik/shi/common.py b/luxtronik/shi/common.py index 91bddfc0..b5acf196 100644 --- a/luxtronik/shi/common.py +++ b/luxtronik/shi/common.py @@ -1,6 +1,6 @@ """ Commonly used methods and classes throughout -the Luxtronik Smart Home Interface (SHI) module. +the Luxtronik smart home interface (SHI) module. """ import logging diff --git a/luxtronik/shi/constants.py b/luxtronik/shi/constants.py index cc1702d9..338539d1 100644 --- a/luxtronik/shi/constants.py +++ b/luxtronik/shi/constants.py @@ -1,4 +1,4 @@ -"""Constants used throughout the Luxtronik Smart Home Interface (SHI) module.""" +"""Constants used throughout the Luxtronik smart home interface (SHI) module.""" from typing import Final @@ -25,7 +25,7 @@ # have been returning this value (0x7FFF) LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE: Final = 32767 -# First Luxtronik firmware version that supports the Smart Home Interface (SHI) +# First Luxtronik firmware version that supports the smart home interface (SHI) LUXTRONIK_FIRST_VERSION_WITH_SHI: Final = (3, 90, 1, 0) # Latest supported Luxtronik firmware version diff --git a/luxtronik/shi/interface.py b/luxtronik/shi/interface.py index c68eae00..88ac6032 100644 --- a/luxtronik/shi/interface.py +++ b/luxtronik/shi/interface.py @@ -32,7 +32,7 @@ class LuxtronikSmartHomeData: Data-vector collection for all smart home interface data vectors. Holds both the `holdings` and `inputs` data structures that represent - the smart-home data exposed by the Luxtronik controller. + the smart home data exposed by the Luxtronik controller. """ def __init__( diff --git a/luxtronik/shi/vector.py b/luxtronik/shi/vector.py index 6ea85b05..510f0df2 100644 --- a/luxtronik/shi/vector.py +++ b/luxtronik/shi/vector.py @@ -16,7 +16,7 @@ class DataVectorSmartHome(DataVector): """ - Specialized DataVector for Luxtronik Smart Home fields. + Specialized DataVector for Luxtronik smart home fields. Provides access to fields by name, index or alias. To use aliases, they must first be registered here (locally = only valid