Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 230 additions & 19 deletions luxtronik/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import logging
import socket
import struct
import threading
import time

from luxtronik.common import get_host_lock
from luxtronik.calculations import Calculations
from luxtronik.parameters import Parameters
from luxtronik.visibilities import Visibilities
Expand All @@ -24,6 +24,13 @@
LUXTRONIK_SOCKET_READ_SIZE_INTEGER,
LUXTRONIK_SOCKET_READ_SIZE_CHAR,
)
from luxtronik.shi import resolve_version
from luxtronik.shi.modbus import LuxtronikModbusTcpInterface
from luxtronik.shi.holdings import Holdings
from luxtronik.shi.interface import LuxtronikSmartHomeData, LuxtronikSmartHomeInterface
from luxtronik.shi.constants import (
LUXTRONIK_DEFAULT_MODBUS_PORT,
)

# endregion Imports

Expand Down Expand Up @@ -73,7 +80,9 @@ class LuxtronikSocketInterface:
"""Luxtronik read/write interface via socket."""

def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT):
self._lock = threading.Lock()
# Acquire a lock object for this host to ensure thread safety
self._lock = get_host_lock(host)

self._host = host
self._port = port
self._socket = None
Expand All @@ -82,6 +91,10 @@ def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT):
def __del__(self):
self._disconnect()

@property
def lock(self):
return self._lock

def _connect(self):
"""Connect the socket if not already done."""
is_none = self._socket is None
Expand All @@ -108,7 +121,7 @@ def _with_lock_and_connect(self, func, *args, **kwargs):
performed at any point in time. This helps to avoid issues with the
Luxtronik controller, which seems unstable otherwise.
"""
with self._lock:
with self.lock:
self._connect()
ret_val = func(*args, **kwargs)
return ret_val
Expand Down Expand Up @@ -278,38 +291,236 @@ def _read_char(self):
return struct.unpack(">b", reading)[0]


class Luxtronik(LuxtronikData):
class LuxtronikAllData(LuxtronikData, LuxtronikSmartHomeData):
"""
Data-vector collection for all luxtronik data vectors.

The collection currently consists of:
- `parameters`
- `calculations`
- `visibilities`
- `holdings`
- `inputs`
"""

def __init__(
self,
parameters=None,
calculations=None,
visibilities=None,
holdings=None,
inputs=None,
version=None,
safe=True
):
"""
Initialize a LuxtronikAllData instance.

Args:
parameters (Parameters): Optional parameters data vector. If not provided,
a new `Parameters` instance is created.
calculations (Calculations): Optional calculations data vector. If not provided,
a new `Calculations` instance is created.
visibilities (Visibilities): Optional visibilities data vector. If not provided,
a new `Visibilities` instance is created.
holdings (Holdings): Optional holdings data vector. If not provided,
a new `Holdings` instance is created.
inputs (Inputs): Optional inputs data vector. If not provided,
a new `Inputs` instance is created.
version (tuple[int] | None): Version to be used for creating the data vectors.
This ensures that the data vectors only contain valid fields.
If None is passed, all available fields are added.
safe (bool): If true, prevent parameter and holding fields marked as
not secure from being written to.
"""
LuxtronikData.__init__(self, parameters, calculations, visibilities, safe)
LuxtronikSmartHomeData.__init__(self, holdings, inputs, version, safe)

class LuxtronikInterface(LuxtronikSocketInterface, LuxtronikSmartHomeInterface):
"""
Combined interface that can be used to control both
the configuration interface and the smart home interface.

For simplicity, only the basic functions are offered.

Attention! It must be ensured that `LuxtronikSocketInterface` and
`LuxtronikSmartHomeInterface` do not instantiate the same fields.
Otherwise, the derivations will overwrite each other.
"""

def __init__(
self,
host,
port_config=LUXTRONIK_DEFAULT_PORT,
port_shi=LUXTRONIK_DEFAULT_MODBUS_PORT
):
"""
Initialize the "combined" luxtronik interface.

Args:
host (str): Hostname or IP address of the heat pump.
port_config (int): TCP port for the config interface
(default: LUXTRONIK_DEFAULT_PORT).
port_shi (int): TCP port for the smart home interface (via modbusTCP)
(default: LUXTRONIK_DEFAULT_MODBUS_PORT).
"""
self._lock = get_host_lock(host)

self._host = host
LuxtronikSocketInterface.__init__(self, host, port_config)
modbus_interface = LuxtronikModbusTcpInterface(host, port_shi)
resolved_version = resolve_version(modbus_interface)
LuxtronikSmartHomeInterface.__init__(self, modbus_interface, resolved_version)

@property
def lock(self):
return self._lock

def create_all_data(self, safe=True):
"""
Create a data vector collection only with fields that match the stored version.

Args:
safe (bool): If true, prevent holding fields marked as
not secure from being written to.

Returns:
LuxtronikAllData: The created data-collection.
"""
return LuxtronikAllData(None, None, None, None, None, self._version, safe)

def read_all(self, data=None):
"""
Read the data of all fields within the data vector collection
that are supported by the controller.

Args:
data (LuxtronikAllData | None): Optional existing data vector collection.
If None is provided, a new instance is created.

Returns:
LuxtronikAllData: The passed / created data vector collection.
"""
if not isinstance(data, LuxtronikAllData):
data = self.create_all_data(True)

with self.lock:
LuxtronikSocketInterface.read(self, data)
LuxtronikSmartHomeInterface.read(self, data)
return data

def read(self, data=None):
"""
Calls `read_all()`. Please check its documentation.
Exists mainly to standardize the various interfaces.
"""
return self.read_all(data)

def write_all(self, data):
"""
Write the data of all fields within the data vector (collection)
that are supported by the controller.

Args:
data (LuxtronikAllData | Parameters | Holdings): The data vector (collection) containing field data.
If None is provided, the write is aborted.

Returns:
bool: True if no errors occurred, otherwise False.
"""
if isinstance(data, Parameters):
with self.lock:
LuxtronikSocketInterface.write(self, data)
shi_result = True
elif isinstance(data, Holdings):
with self.lock:
shi_result = LuxtronikSmartHomeInterface.write_holdings(self, data)
# Because of LuxtronikAllData(LuxtronikSmartHomeData) we must use type(..)
elif type(data) is LuxtronikSmartHomeData:
with self.lock:
shi_result = LuxtronikSmartHomeInterface.write(self, data)
elif isinstance(data, LuxtronikAllData):
with self.lock:
LuxtronikSocketInterface.write(self, data.parameters)
shi_result = LuxtronikSmartHomeInterface.write(self, data)
else:
LOGGER.warning("Abort write! No data to write provided.")
return False
return shi_result

def write(self, data):
"""
Calls `write_all()`. Please check its documentation.
Exists mainly to standardize the various interfaces.
"""
return self.write_all(data)

def write_and_read(self, write_data, read_data=None):
"""
Write and then read the data of all fields within the data vector collection
that are supported by the controller.

Args:
data (LuxtronikAllData): The data vector collection containing field data.
If None is provided, the write and read is aborted.

Returns:
bool: True if no errors occurred, otherwise False.
"""
with self.lock:
self.write_all(write_data)
data = self.read_all(read_data)
return data


class Luxtronik(LuxtronikAllData):
"""
Wrapper around the data and the read/write interface.
Mainly to ensure backwards compatibility
of the read/write interface to other projects.
"""

def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT, safe=True):
super().__init__(safe=safe)
self.interface = LuxtronikSocketInterface(host, port)
def __init__(
self,
host,
port=LUXTRONIK_DEFAULT_PORT,
safe=True,
port_shi=LUXTRONIK_DEFAULT_MODBUS_PORT
):
self._interface = LuxtronikInterface(host, port, port_shi)
super().__init__(version=self._interface.version, safe=safe)
self.read()

@property
def interface(self):
return self._interface

def read(self):
return self.interface.read(self)
return self._interface.read(self)

def read_parameters(self):
return self.interface.read_parameters(self.parameters)
return self._interface.read_parameters(self.parameters)

def read_calculations(self):
return self.interface.read_calculations(self.calculations)
return self._interface.read_calculations(self.calculations)

def read_visibilities(self):
return self.interface.read_visibilities(self.visibilities)
return self._interface.read_visibilities(self.visibilities)

def write(self, parameters=None):
if parameters is None:
self.interface.write(self.parameters)
def read_holdings(self):
return self._interface.read_holdings(self.holdings)

def read_inputs(self):
return self._interface.read_inputs(self.inputs)

def write(self, data=None):
if data is None:
return self._interface.write(self)
else:
self.interface.write(parameters)
return self._interface.write(data)

def write_and_read(self, parameters=None):
if parameters is None:
return self.interface.write_and_read(self.parameters, self)
def write_and_read(self, data=None):
if data is None:
return self._interface.write_and_read(self, self)
else:
return self.interface.write_and_read(parameters, self)
return self._interface.write_and_read(data, self)
8 changes: 4 additions & 4 deletions luxtronik/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ class ControlMode(SelectionBase):
0: "Off", # System value is used
1: "Setpoint", # Setpoint register value is used
2: "Offset", # System values + offset register value is used
3: "Level", # System values + smart-home-interface-settings
3: "Level", # System values + smart home interface settings
# register value is used
}

Expand Down Expand Up @@ -959,13 +959,13 @@ class LevelMode(SelectionBase):
codes = {
0: "Normal", # No correction
1: "Increased", # Increase the temperature by the values
# within the smart-home-interface-settings
# within the smart home interface settings
# TODO: Function unknown – requires further analysis
2: "Increased2", # Increase the temperature by the values
# within the smart-home-interface-settings
# within the smart home interface settings
# TODO: Function unknown – requires further analysis
3: "Decreased", # Decrease the temperature by the values
# within the smart-home-interface-settings
# within the smart home interface settings
# TODO: Function unknown – requires further analysis
}

Expand Down
2 changes: 1 addition & 1 deletion luxtronik/definitions/holdings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
Constant list containing all 'holdings' definitions
used by the Smart Home Interface (SHI) of the Luxtronik controller.
used by the smart home interface (SHI) of the Luxtronik controller.

Unlike the setting registers, these SHI register are volatile and intended for
communication with smart home systems. 'Holding' registers are readable
Expand Down
2 changes: 1 addition & 1 deletion luxtronik/definitions/inputs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
Constant list containing all 'inputs' definitions
used by the Smart Home Interface (SHI) of the Luxtronik controller.
used by the smart home interface (SHI) of the Luxtronik controller.

Unlike the setting registers, these SHI register are volatile and intended for
communication with smart home systems. 'Input' register are read-only
Expand Down
29 changes: 28 additions & 1 deletion luxtronik/scripts/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,28 @@
"""Luxtronik helper scripts."""
"""Luxtronik script helper."""

import argparse

def create_default_args_parser(func_desc, default_port):
parser = argparse.ArgumentParser(description=func_desc)
parser.add_argument("ip", help="IP address of Luxtronik controller to connect to")
parser.add_argument(
"port",
nargs="?",
type=int,
default=default_port,
help="Port to use to connect to Luxtronik controller",
)
return parser

def print_dump_header(caption):
print("=" * 130)
print(f"{' ' + caption + ' ': ^120}")
print("=" * 130)

def print_dump_row(number, field):
print(f"Number: {number:<5} Name: {field.name:<60} " + f"Type: {field.__class__.__name__:<20} Value: {field}")

def print_watch_header(caption):
print("=" * 130)
print(caption)
print("=" * 130)
Loading