diff --git a/classes/Object.py b/classes/Object.py new file mode 100644 index 0000000..bd0ccfb --- /dev/null +++ b/classes/Object.py @@ -0,0 +1,3 @@ +#hack to allow generic objects +class Object(object): + pass diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index eeddfb4..1163b1a 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -1,11 +1,13 @@ import csv from dataclasses import dataclass from enum import Enum +from defs.common import strtoint import itertools import json import re import os - +import math +import ast class Data_Type(Enum): BYTE = 1 @@ -111,6 +113,9 @@ def fromString(cls, name : str): return getattr(cls, name) class Registry_Type(Enum): + ZERO = 0x00 + ''' for protocols that don't have a command / registry type ''' + HOLDING = 0x03 INPUT = 0x04 @@ -128,7 +133,6 @@ class registry_map_entry: concatenate : bool concatenate_registers : list[int] - values : list value_regex : str = "" @@ -139,7 +143,12 @@ class registry_map_entry: ''' if value needs to be concatenated with other registers''' data_type : Data_Type = Data_Type.USHORT + data_type_size : int = -1 + ''' for non-fixed size types like ASCII''' + read_command : bytes = None + ''' for transports/protocols that require sending a command ontop of "register" ''' + write_mode : WriteMode = WriteMode.READ ''' enable disable reading/writing ''' @@ -197,7 +206,7 @@ def __init__(self, protocol : str, settings_dir : str = 'protocols'): for registry_type in Registry_Type: self.load_registry_map(registry_type) - def get_registry_map(self, registry_type : Registry_Type) -> list[registry_map_entry]: + def get_registry_map(self, registry_type : Registry_Type = Registry_Type.ZERO) -> list[registry_map_entry]: return self.registry_map[registry_type] def get_registry_ranges(self, registry_type : Registry_Type) -> list[registry_map_entry]: @@ -243,30 +252,40 @@ def load__json(self, file : str = '', settings_dir : str = ''): def load__registry(self, path, registry_type : Registry_Type = Registry_Type.INPUT) -> list[registry_map_entry]: registry_map : list[registry_map_entry] = [] - register_regex = re.compile(r'(?P\d+)\.b(?P\d{1,2})') + register_regex = re.compile(r'(?Px?\d+)\.(b(?Px?\d{1,2})|(?Px?\d{1,2}))') + + data_type_regex = re.compile(r'(?P\w+)\.(?P\d+)') - range_regex = re.compile(r'(?Pr|)(?P\d+)[\-~](?P\d+)') + range_regex = re.compile(r'(?Pr|)(?Px?\d+)[\-~](?Px?\d+)') ascii_value_regex = re.compile(r'(?P^\[.+\]$)') - list_regex = re.compile(r'\s*(?:(?P\d+)-(?P\d+)|(?P[^,\s][^,]*?))\s*(?:,|$)') + list_regex = re.compile(r'\s*(?:(?Px?\d+)-(?Px?\d+)|(?P[^,\s][^,]*?))\s*(?:,|$)') if not os.path.exists(path): #return empty is file doesnt exist. return registry_map - def clean_header(iterator): - # Lowercase and strip whitespace from each item in the first row - first_row = next(iterator).lower().replace('_', ' ') - first_row = re.sub(r"\s+;|;\s+", ";", first_row) #trim values - return itertools.chain([first_row], iterator) + def determine_delimiter(first_row) -> str: + if first_row.count(';') > first_row.count(','): + return ';' + else: + return ',' with open(path, newline='', encoding='latin-1') as csvfile: #clean column names before passing to csv dict reader - csvfile = clean_header(csvfile) + + delimeter = ';' + first_row = next(csvfile).lower().replace('_', ' ') + if first_row.count(';') < first_row.count(','): + delimeter = ',' + + first_row = re.sub(r"\s+" + re.escape(delimeter) +"|" + re.escape(delimeter) +"\s+", delimeter, first_row) #trim values + + csvfile = itertools.chain([first_row], csvfile) #add clean header to begining of iterator # Create a CSV reader object - reader = csv.DictReader(clean_header(csvfile), delimiter=';') #compensate for openoffice + reader = csv.DictReader(csvfile, delimiter=delimeter) # Iterate over each row in the CSV file for row in reader: @@ -316,9 +335,15 @@ def clean_header(iterator): row['values'] = "" print("WARNING No Value Column : path: " + str(path)) + data_type_len : int = -1 #optional row, only needed for non-default data types if 'data type' in row and row['data type']: - data_type = Data_Type.fromString(row['data type']) + matches = data_type_regex.search(row['data type']) + if matches: + data_type_len = int(matches.group('length')) + data_type = Data_Type.fromString(matches.group('datatype')) + else: + data_type = Data_Type.fromString(row['data type']) #get value range for protocol analyzer @@ -348,8 +373,8 @@ def clean_header(iterator): for match in matches: groups = match.groupdict() if groups['range_start'] and groups['range_end']: - start = int(groups['range_start']) - end = int(groups['range_end']) + start = strtoint(groups['range_start']) + end = strtoint(groups['range_end']) values.extend(range(start, end + 1)) else: values.append(groups['element']) @@ -376,19 +401,32 @@ def clean_header(iterator): register : int = -1 register_bit : int = 0 + register_byte : int = -1 match = register_regex.search(row['register']) if match: - register = int(match.group('register')) - register_bit = int(match.group('bit')) + register = strtoint(match.group('register')) + + register_bit = match.group('bit') + if register_bit: + register_bit = strtoint(register_bit) + else: + register_bit = 0 + + register_byte = match.group('byte') + if register_byte: + register_byte = strtoint(register_byte) + else: + register_byte = 0 + #print("register: " + str(register) + " bit : " + str(register_bit)) else: range_match = range_regex.search(row['register']) if not range_match: - register = int(row['register']) + register = strtoint(row['register']) else: reverse = range_match.group('reverse') - start = int(range_match.group('start')) - end = int(range_match.group('end')) + start = strtoint(range_match.group('start')) + end = strtoint(range_match.group('end')) register = start if end > start: concatenate = True @@ -404,6 +442,13 @@ def clean_header(iterator): else: r = range(1) + read_command = None + if "read command" in row and row['read command']: + if row['read command'][0] == 'x': + read_command = bytes.fromhex(row['read command'][1:]) + else: + read_command = row['read command'].encode('utf-8') + writeMode : WriteMode = WriteMode.READ if "writable" in row: writeMode = WriteMode.fromString(row['writable']) @@ -413,18 +458,20 @@ def clean_header(iterator): registry_type = registry_type, register= register, register_bit=register_bit, - register_byte= -1, + register_byte= register_byte, variable_name= variable_name, documented_name = row['documented name'], unit= str(character_part), unit_mod= numeric_part, data_type= data_type, + data_type_size = data_type_len, concatenate = concatenate, concatenate_registers = concatenate_registers, values=values, value_min=value_min, value_max=value_max, value_regex=value_regex, + read_command = read_command, write_mode=writeMode ) registry_map.append(item) @@ -504,7 +551,10 @@ def load_registry_map(self, registry_type : Registry_Type, file : str = '', sett settings_dir = self.settings_dir if not file: - file = self.protocol + '.'+registry_type.name.lower()+'_registry_map.csv' + if registry_type == Registry_Type.ZERO: + file = self.protocol + '.registry_map.csv' + else: + file = self.protocol + '.'+registry_type.name.lower()+'_registry_map.csv' path = settings_dir + '/' + file @@ -520,6 +570,197 @@ def load_registry_map(self, registry_type : Registry_Type, file : str = '', sett self.registry_map_size[registry_type] = size self.registry_map_ranges[registry_type] = self.calculate_registry_ranges(self.registry_map[registry_type], self.registry_map_size[registry_type]) + def process_register_bytes(self, registry : dict[int,bytes], entry : registry_map_entry): + ''' process bytes into data''' + + register = registry[entry.register] + if entry.register_byte > 0: + register = register[entry.register_byte:] + + if entry.data_type_size > 0: + register = register[:entry.data_type_size] + + if entry.data_type == Data_Type.UINT: + value = int.from_bytes(register[:4], byteorder='big', signed=False) + elif entry.data_type == Data_Type.INT: + value = int.from_bytes(register[:4], byteorder='big', signed=True) + elif entry.data_type == Data_Type.USHORT: + value = int.from_bytes(register[:2], byteorder='big', signed=False) + elif entry.data_type == Data_Type.SHORT: + value = int.from_bytes(register[:2], byteorder='big', signed=True) + elif entry.data_type == Data_Type._16BIT_FLAGS or entry.data_type == Data_Type._8BIT_FLAGS: + #16 bit flags + start_bit : int = 0 + if entry.data_type == Data_Type._8BIT_FLAGS: + start_bit = 8 + + if entry.documented_name+'_codes' in self.protocolSettings.codes: + flags : list[str] = [] + for i in range(start_bit, 16): # Iterate over each bit position (0 to 15) + byte = i // 8 + bit = i % 8 + val = register[byte] + # Check if the i-th bit is set + if (val >> bit) & 1: + flag_index = "b"+str(i) + if flag_index in self.protocolSettings.codes[entry.documented_name+'_codes']: + flags.append(self.protocolSettings.codes[entry.documented_name+'_codes'][flag_index]) + + value = ",".join(flags) + else: + flags : list[str] = [] + for i in range(start_bit, 16): # Iterate over each bit position (0 to 15) + # Check if the i-th bit is set + if (val >> i) & 1: + flags.append("1") + else: + flags.append("0") + value = ''.join(flags) + elif entry.data_type.value > 200 or entry.data_type == Data_Type.BYTE: #bit types + bit_size = Data_Type.getSize(entry.data_type) + bit_mask = (1 << bit_size) - 1 # Create a mask for extracting X bits + bit_index = entry.register_bit + value = (register >> bit_index) & bit_mask + elif entry.data_type == Data_Type.ASCII: + try: + value = register.decode("utf-8") #convert bytes to ascii + except UnicodeDecodeError as e: + print("UnicodeDecodeError:", e) + + return value + + + def process_register_ushort(self, registry : dict[int, int], entry : registry_map_entry ): + ''' process ushort type registry into data''' + if entry.data_type == Data_Type.UINT: #read uint + if entry.register + 1 not in registry: + return + + value = float((registry[entry.register] << 16) + registry[entry.register + 1]) + elif entry.data_type == Data_Type.SHORT: #read signed short + val = registry[entry.register] + + # Convert the combined unsigned value to a signed integer if necessary + if val & (1 << 15): # Check if the sign bit (bit 31) is set + # Perform two's complement conversion to get the signed integer + value = val - (1 << 16) + else: + value = val + value = -value + elif entry.data_type == Data_Type.INT: #read int + if entry.register + 1 not in registry: + return + + combined_value_unsigned = (registry[entry.register] << 16) + registry[entry.register + 1] + + # Convert the combined unsigned value to a signed integer if necessary + if combined_value_unsigned & (1 << 31): # Check if the sign bit (bit 31) is set + # Perform two's complement conversion to get the signed integer + value = combined_value_unsigned - (1 << 32) + else: + value = combined_value_unsigned + value = -value + #value = struct.unpack('> i) & 1: + flag_index = "b"+str(i) + if flag_index in self.codes[entry.documented_name+'_codes']: + flags.append(self.codes[entry.documented_name+'_codes'][flag_index]) + + value = ",".join(flags) + else: + flags : list[str] = [] + for i in range(start_bit, 16): # Iterate over each bit position (0 to 15) + # Check if the i-th bit is set + if (val >> i) & 1: + flags.append("1") + else: + flags.append("0") + value = ''.join(flags) + elif entry.data_type.value > 200 or entry.data_type == Data_Type.BYTE: #bit types + bit_size = Data_Type.getSize(entry.data_type) + bit_mask = (1 << bit_size) - 1 # Create a mask for extracting X bits + bit_index = entry.register_bit + value = (registry[entry.register] >> bit_index) & bit_mask + elif entry.data_type == Data_Type.ASCII: + value = registry[entry.register].to_bytes((16 + 7) // 8, byteorder='big') #convert to ushort to bytes + try: + value = value.decode("utf-8") #convert bytes to ascii + except UnicodeDecodeError as e: + print("UnicodeDecodeError:", e) + + else: #default, Data_Type.USHORT + value = float(registry[entry.register]) + + if entry.unit_mod != float(1): + value = value * entry.unit_mod + + #move this to transport level + #if isinstance(value, float) and self.max_precision > -1: + # value = round(value, self.max_precision) + + if (entry.data_type != Data_Type._16BIT_FLAGS and + entry.documented_name+'_codes' in self.codes): + try: + cleanval = str(int(value)) + + if cleanval in self.codes[entry.documented_name+'_codes']: + value = self.codes[entry.documented_name+'_codes'][cleanval] + except: + #do nothing; try is for intval + value = value + + return value + + def process_registery(self, registry : dict[int,int] | dict[int,bytes] , map : list[registry_map_entry]) -> dict[str,str]: + '''process registry into appropriate datatypes and names -- maybe add func for single entry later?''' + + concatenate_registry : dict = {} + info = {} + for entry in map: + + if entry.register not in registry: + continue + value = '' + + if isinstance(registry[entry.register], bytes): + value = self.process_register_bytes(registry, entry) + else: + value = self.process_register_ushort(registry, entry) + + #if item.unit: + # value = str(value) + item.unit + if entry.concatenate: + concatenate_registry[entry.register] = value + + all_exist = True + for key in entry.concatenate_registers: + if key not in concatenate_registry: + all_exist = False + break + if all_exist: + #if all(key in concatenate_registry for key in item.concatenate_registers): + concatenated_value = "" + for key in entry.concatenate_registers: + concatenated_value = concatenated_value + str(concatenate_registry[key]) + del concatenate_registry[key] + + info[entry.variable_name] = concatenated_value + else: + info[entry.variable_name] = value + + return info def validate_registry_entry(self, entry : registry_map_entry, val) -> int: #if code, validate first. @@ -540,5 +781,95 @@ def validate_registry_entry(self, entry : registry_map_entry, val) -> int: if int(val) >= entry.value_min and int(val) <= entry.value_max: return 1 - return 0 + return 0 + + def evaluate_expressions(self, expression, variables : dict[str,str]): + # Define the register string + register = "x4642.[ 1 + ((( [battery 1 number of cells] *2 )+ (1~[battery 1 number of temperature] *2)) ) ]" + + # Define variables + vars = {"battery 1 number of cells": 8, "battery 1 number of temperature": 2} + + # Function to evaluate mathematical expressions + def evaluate_variables(expression): + # Define a regular expression pattern to match variables + var_pattern = re.compile(r'\[([^\[\]]+)\]') + + # Replace variables in the expression with their values + def replace_vars(match): + var_name = match.group(1) + if var_name in vars: + return str(vars[var_name]) + else: + return match.group(0) + + # Replace variables with their values + return var_pattern.sub(replace_vars, expression) + + def evaluate_ranges(expression): + # Define a regular expression pattern to match ranges + range_pattern = re.compile(r'\[.*?((?P\d+)\s?\~\s?(?P\d+)).*?\]') + + # Find all ranges in the expression + ranges = range_pattern.findall(expression) + + # If there are no ranges, return the expression as is + if not ranges: + return [expression] + + # Initialize list to store results + results = [] + + # Iterate over each range found in the expression + for group, range_start, range_end in ranges: + range_start = int(range_start) + range_end = int(range_end) + if range_start > range_end: + range_start, range_end = range_end, range_start #swap + + # Generate duplicate entries for each value in the range + for i in range(range_start, range_end + 1): + replaced_expression = expression.replace(group, str(i)) + results.append(replaced_expression) + + return results + + def evaluate_expression(expression): + # Define a regular expression pattern to match "maths" + var_pattern = re.compile(r'\[(?P.*?)\]') + + # Replace variables in the expression with their values + def replace_vars(match): + try: + maths = match.group("maths") + maths = re.sub(r'\s', '', maths) #remove spaces, because ast.parse doesnt like them + + # Parse the expression safely + tree = ast.parse(maths, mode='eval') + + # Evaluate the expression + end_value = eval(compile(tree, filename='', mode='eval')) + + return str(end_value) + except : + return match.group(0) + + # Replace variables with their values + return var_pattern.sub(replace_vars, expression) + + + # Evaluate the register string + result = evaluate_variables(register) + print("Result:", result) + + result = evaluate_ranges(result) + print("Result:", result) + + results = [] + for r in result: + results.extend(evaluate_ranges(r)) + + for r in results: + print(evaluate_expression(r)) + #settings = protocol_settings('v0.14') \ No newline at end of file diff --git a/classes/transports/modbus_base.py b/classes/transports/modbus_base.py index 7b51250..8e345c2 100644 --- a/classes/transports/modbus_base.py +++ b/classes/transports/modbus_base.py @@ -107,7 +107,7 @@ def read_data(self) -> dict[str, str]: info = {} for registry_type in Registry_Type: registry = self.read_modbus_registers(ranges=self.protocolSettings.get_registry_ranges(registry_type=registry_type), registry_type=registry_type) - new_info = self.process_registery(registry, self.protocolSettings.get_registry_map(registry_type)) + new_info = self.protocolSettings.process_registery(registry, self.protocolSettings.get_registry_map(registry_type)) if False: new_info = {self.__input_register_prefix + key: value for key, value in new_info.items()} @@ -260,8 +260,8 @@ def evaluate_score(entry : registry_map_entry, val): holding_valid_count[name] = 0 #process registry based on protocol - input_info = self.process_registery(input_registry, protocol.registry_map[Registry_Type.INPUT]) - holding_info = self.process_registery(input_registry, protocol.registry_map[Registry_Type.HOLDING]) + input_info = protocol.process_registery(input_registry, protocol.registry_map[Registry_Type.INPUT]) + holding_info = protocol.process_registery(input_registry, protocol.registry_map[Registry_Type.HOLDING]) for entry in protocol.registry_map[Registry_Type.INPUT]: @@ -302,7 +302,7 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type #read current value current_registers = self.read_modbus_registers(start=entry.register, end=entry.register, registry_type=registry_type) - results = self.process_registery(current_registers, self.protocolSettings.get_registry_map(registry_type)) + results = self.protocolSettings.process_registery(current_registers, self.protocolSettings.get_registry_map(registry_type)) current_value = current_registers[entry.register] @@ -381,7 +381,7 @@ def read_variable(self, variable_name : str, registry_type : Registry_Type, entr end = max(entry.concatenate_registers) registers = self.read_modbus_registers(start=start, end=end, registry_type=registry_type) - results = self.process_registery(registers, registry_map) + results = self.protocolSettings.process_registery(registers, registry_map) return results[entry.variable_name] def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, end : int = None, batch_size : int = 45, registry_type : Registry_Type = Registry_Type.INPUT ) -> dict: @@ -452,128 +452,6 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en registry[i+range[0]] = register.registers[i] return registry - - def process_registery(self, registry : dict, map : list[registry_map_entry]) -> dict[str,str]: - '''process registry into appropriate datatypes and names''' - - concatenate_registry : dict = {} - info = {} - for item in map: - - if item.register not in registry: - continue - value = '' - - if item.data_type == Data_Type.UINT: #read uint - if item.register + 1 not in registry: - continue - value = float((registry[item.register] << 16) + registry[item.register + 1]) - elif item.data_type == Data_Type.SHORT: #read signed short - val = registry[item.register] - - # Convert the combined unsigned value to a signed integer if necessary - if val & (1 << 15): # Check if the sign bit (bit 31) is set - # Perform two's complement conversion to get the signed integer - value = val - (1 << 16) - else: - value = val - value = -value - elif item.data_type == Data_Type.INT: #read int - if item.register + 1 not in registry: - continue - - combined_value_unsigned = (registry[item.register] << 16) + registry[item.register + 1] - - # Convert the combined unsigned value to a signed integer if necessary - if combined_value_unsigned & (1 << 31): # Check if the sign bit (bit 31) is set - # Perform two's complement conversion to get the signed integer - value = combined_value_unsigned - (1 << 32) - else: - value = combined_value_unsigned - value = -value - #value = struct.unpack('> i) & 1: - flag_index = "b"+str(i) - if flag_index in self.protocolSettings.codes[item.documented_name+'_codes']: - flags.append(self.protocolSettings.codes[item.documented_name+'_codes'][flag_index]) - - value = ",".join(flags) - else: - flags : list[str] = [] - for i in range(start_bit, 16): # Iterate over each bit position (0 to 15) - # Check if the i-th bit is set - if (val >> i) & 1: - flags.append("1") - else: - flags.append("0") - value = ''.join(flags) - elif item.data_type.value > 200 or item.data_type == Data_Type.BYTE: #bit types - bit_size = Data_Type.getSize(item.data_type) - bit_mask = (1 << bit_size) - 1 # Create a mask for extracting X bits - bit_index = item.register_bit - value = (registry[item.register] >> bit_index) & bit_mask - elif item.data_type == Data_Type.ASCII: - value = registry[item.register].to_bytes((16 + 7) // 8, byteorder='big') #convert to ushort to bytes - try: - value = value.decode("utf-8") #convert bytes to ascii - except UnicodeDecodeError as e: - print("UnicodeDecodeError:", e) - - else: #default, Data_Type.USHORT - value = float(registry[item.register]) - - if item.unit_mod != float(1): - value = value * item.unit_mod - - #move this to transport level - #if isinstance(value, float) and self.max_precision > -1: - # value = round(value, self.max_precision) - - if (item.data_type != Data_Type._16BIT_FLAGS and - item.documented_name+'_codes' in self.protocolSettings.codes): - try: - cleanval = str(int(value)) - - if cleanval in self.protocolSettings.codes[item.documented_name+'_codes']: - value = self.protocolSettings.codes[item.documented_name+'_codes'][cleanval] - except: - #do nothing; try is for intval - value = value - - #if item.unit: - # value = str(value) + item.unit - if item.concatenate: - concatenate_registry[item.register] = value - - all_exist = True - for key in item.concatenate_registers: - if key not in concatenate_registry: - all_exist = False - break - if all_exist: - #if all(key in concatenate_registry for key in item.concatenate_registers): - concatenated_value = "" - for key in item.concatenate_registers: - concatenated_value = concatenated_value + str(concatenate_registry[key]) - del concatenate_registry[key] - - info[item.variable_name] = concatenated_value - else: - info[item.variable_name] = value - - return info def read_registry(self, registry_type : Registry_Type = Registry_Type.INPUT) -> dict[str,str]: map = self.protocolSettings.get_registry_map(registry_type) @@ -581,5 +459,5 @@ def read_registry(self, registry_type : Registry_Type = Registry_Type.INPUT) -> return {} registry = self.read_modbus_registers(self.protocolSettings.get_registry_ranges(registry_type), registry_type=registry_type) - info = self.process_registery(registry, map) + info = self.protocolSettings.process_registery(registry, map) return info \ No newline at end of file diff --git a/classes/transports/modbus_rtu.py b/classes/transports/modbus_rtu.py index 30516cc..80fec51 100644 --- a/classes/transports/modbus_rtu.py +++ b/classes/transports/modbus_rtu.py @@ -20,7 +20,7 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings if not self.port: raise ValueError("Port is not set") - self.baudrate = settings.getint("buadrate", 9600) + self.baudrate = settings.getint("baudrate", 9600) address : int = settings.getint("address", 0) self.addresses = [address] diff --git a/classes/transports/mqtt.py b/classes/transports/mqtt.py index 4a1afcf..d0b146c 100644 --- a/classes/transports/mqtt.py +++ b/classes/transports/mqtt.py @@ -9,7 +9,7 @@ import paho.mqtt.properties import paho.mqtt.packettypes -from paho.mqtt.client import Client as MQTTClient +from paho.mqtt.client import Client as MQTTClient, MQTT_ERR_NO_CONN from defs.common import strtobool from .transport_base import transport_base @@ -42,7 +42,7 @@ class mqtt(transport_base): __first_connection : bool = True __reconnecting : bool = False - __connected : bool = False + connected : bool = False def __init__(self, settings : SectionProxy): self.host = settings.get('host', fallback="") @@ -50,8 +50,8 @@ def __init__(self, settings : SectionProxy): raise ValueError("Host is not set") self.port = settings.getint('port', fallback=self.port) - self.base_topic = settings.get('base_topic', fallback=self.base_topic) - self.error_topic = settings.get('error_topic', fallback=self.error_topic) + self.base_topic = settings.get('base_topic', fallback=self.base_topic).rstrip('/') + self.error_topic = settings.get('error_topic', fallback=self.error_topic).rstrip('/') self.discovery_topic = settings.get('discovery_topic', fallback=self.discovery_topic) self.discovery_enabled = strtobool(settings.get('discovery_enabled', self.discovery_enabled)) self.json = strtobool(settings.get('json', self.json)) @@ -131,7 +131,7 @@ def mqtt_reconnect(self): #sleep to give a chance to reconnect. time.sleep(self.reconnect_delay) - if self.__connected: + if self.connected: self.__reconnecting = 0 return except: @@ -144,12 +144,12 @@ def mqtt_reconnect(self): quit() #exit, service should restart entire script def on_disconnect(self, client, userdata, rc): - self.mqtt_reconnect() + self.connected = False def on_connect(self, client, userdata, flags, rc): """ The callback for when the client receives a CONNACK response from the server. """ self._log.info("Connected with result code %s\n",str(rc)) - self.__connected = True + self.connected = True __write_topics : dict[str, registry_map_entry] = {} @@ -157,10 +157,15 @@ def write_data(self, data : dict[str, str]): if not self.write_enabled: return + if self.connected: + self.connected = self.client.is_connected() + self._log.info("write data to mqtt transport") self._log.info(data) #have to send this every loop, because mqtt doesnt disconnect when HA restarts. HA bug. - self.client.publish(self.base_topic + "/availability","online", qos=0,retain=True) + info = self.client.publish(self.base_topic + "/availability","online", qos=0,retain=True) + if info.rc == MQTT_ERR_NO_CONN: + self.connected = False if(self.json): # Serializing json @@ -204,7 +209,7 @@ def mqtt_discovery(self, from_transport : transport_base): device = {} device['manufacturer'] = from_transport.device_manufacturer device['model'] = from_transport.device_model - device['identifiers'] = "hotnoob_" + from_transport.device_serial_number + device['identifiers'] = "hotnoob_" + from_transport.device_model + "_" + from_transport.device_serial_number device['name'] = from_transport.device_name registry_map : list[registry_map_entry] = [] diff --git a/classes/transports/pace.py b/classes/transports/pace.py index ce33d5b..0bc0c48 100644 --- a/classes/transports/pace.py +++ b/classes/transports/pace.py @@ -306,8 +306,8 @@ def __init__(self, settings : dict[str,str]): if "port" in settings: self.port = settings["port"] - if "buadrate" in settings: - self.baudrate = settings["buadrate"] + if "baudrate" in settings: + self.baudrate = settings["baudrate"] self.client = CustomModbusSerialClient(method='binary', port=self.port, baudrate=int(self.baudrate), diff --git a/classes/transports/serial_frame_client.py b/classes/transports/serial_frame_client.py index 9e9ef55..30aec00 100644 --- a/classes/transports/serial_frame_client.py +++ b/classes/transports/serial_frame_client.py @@ -1,3 +1,4 @@ +from typing import Callable import serial import threading import time @@ -6,31 +7,121 @@ class serial_frame_client(): ''' basic serial client implenting a empty SOI/EOI frame''' client : serial.Serial - running : bool + running : bool = False soi : bytes '''start of information''' eoi : bytes '''end of information''' - pending_frames : list[bytes] + pending_frames : list[bytes] = [] max_frame_size : int = 256 + port : str = '/dev/ttyUSB0' + baud : int = 9600 - def __init__(self) -> None: - self.client = serial.Serial('/dev/ttyUSB0', 9600) - pass + timeout : float = 5 + ''' timeout in seconds ''' - def checksum(self, data : bytes) -> bytes: - return b'' + #region asyncronous + asynchronous : bool = False + ''' if set, runs main loop''' + + on_message : Callable[[bytes], None] = None + ''' async mode only''' + + thread : threading.Thread + ''' main thread for read loop''' + + callback_lock : threading.Lock = threading.Lock() + '''lock for callback''' + #endregion asyncronous + + + def __init__(self, port : str , baud : int , soi : bytes, eoi : bytes, **kwrgs) -> None: + self.soi = soi + self.eoi = eoi + self.port = port + self.baud = baud + self.client = serial.Serial(port, baud, **kwrgs) + + def connect(self): + if self.asynchronous: + self.running = True + self.pending_frames = [] + self.thread = threading.Thread(target=self.read_thread) + self.thread.daemon = True + self.thread.start() + return True def write(self, data : bytes): ''' write data, excluding SOI and EOI bytes''' data = self.soi + data + self.eoi - self.client.write() + self.client.write(data) - def read(self): + def read(self, reset_buffer = True, frames = 1) -> list[bytes] | bytes: + ''' returns list of frames, if frames > 1 ''' buffer = bytearray() + self.pending_frames.clear() + + #for shatty order sensitive protocols. + # Clear input buffers + if reset_buffer: + self.client.reset_input_buffer() + + timedout = time.time() + self.timeout + self.client.timeout = self.timeout + frameCount = 0 + + while time.time() < timedout: + # Read data from serial port + data = self.client.read() + + # Check if data is available + if data: + # Append data to buffer + buffer += data + + # Find SOI index in buffer + soi_index = buffer.find(self.soi) + + # Process all occurrences of SOI in buffer + while soi_index != -1: + # Remove data before SOI sequence + buffer = buffer[soi_index:] + + # Find EOI index in buffer + eoi_index = buffer.find(self.eoi) + + if eoi_index != -1: + + frame = buffer[len(self.soi):eoi_index] + if frames == 1: + return frame + + if frameCount > 1: + # Extract and store the complete frame + self.pending_frames.append(frame) + + if self.pending_frames.count() == frames: + return self.pending_frames + + # Remove the processed data from the buffer + buffer = buffer[eoi_index + len(self.eoi) : ] + + # Find next SOI index in the remaining buffer + soi_index = buffer.find(self.soi) + + else: + # If no EOI is found and buffer size exceeds max_frame_size, clear buffer + if len(buffer) > self.max_frame_size: + buffer.clear() + break #no eoi, continue waiting + + time.sleep(0.01) + def read_thread(self): + buffer = bytearray() + self.running = True while self.running: # Read data from serial port data = self.client.read() @@ -66,6 +157,12 @@ def read(self): buffer.clear() break #no eoi, continue waiting + #can probably be in the loop, but being cautious + for frame in self.pending_frames: + with self.callback_lock: + if self.on_message: + self.on_message(frame) + time.sleep(0.01) diff --git a/classes/transports/serial_pylon.py b/classes/transports/serial_pylon.py index 0b0df41..e898652 100644 --- a/classes/transports/serial_pylon.py +++ b/classes/transports/serial_pylon.py @@ -1,28 +1,234 @@ +from enum import Enum +import struct + +import serial + +from ..Object import Object + from .serial_frame_client import serial_frame_client from .transport_base import transport_base + + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from configparser import SectionProxy + from classes.protocol_settings import protocol_settings, registry_map_entry + + +class return_codes(Enum): + NORMAL = 0x00 + VERSION_ERROR = 0x01 + CHECKSUM_ERROR = 0x02 + LCHECKSUM_ERROR = 0x03 + INVALID_CID2 = 0x04 + COMMAND_FORMAT_ERROR = 0X05 + INVALID_DATA = 0X06 + ADDRESS_ERROR = 0X90 + COMMUNICATION_ERROR = 0X91 + UNKNOWN_ERROR = -1 + + @classmethod + def fromByte(cls, value : bytes): + try: + return cls(int(value, 16)) # Attempt to access the Enum member + except ValueError: + return return_codes.UNKNOWN_ERROR + class serial_pylon(transport_base): + ''' for a lack of a better name''' + + port : str = "/dev/ttyUSB0" + addresses : list[int] = [] + baudrate : int = 9600 + client : serial_frame_client #this format is pretty common; i need a name for it. - SOI : bytes = b'\x7e' - VER : bytes = b'\x00' + SOI : bytes = b'\x7e' # aka b"~" + VER : bytes = b'\x00' ''' version has to be fetched first ''' ADR : bytes CID1 : bytes CID2 : bytes LENGTH : bytes + ''' 2 bytes - include LENID & LCHKSUM''' INFO : bytes CHKSUM : bytes - EOI : bytes = b'\x0d' + EOI : bytes = b'\x0d' # aka b"\r" - ''' for a lack of a better name''' - def __init__(self) -> None: + def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_settings' = None): + super().__init__(settings, protocolSettings=protocolSettings) '''address is required to be specified ''' + self.port = settings.get("port", "") + if not self.port: + raise ValueError("Port is not set") + + self.baudrate = settings.getint("baudrate", 9600) + + address : int = settings.getint("address", 0) + self.addresses = [address] + + self.ADR = struct.pack('B', address) + #todo, multi address support later + + self.client = serial_frame_client(self.port, + self.baudrate, + self.SOI, + self.EOI, + bytesize=8, parity=serial.PARITY_NONE, stopbits=1, exclusive=True) + + pass def connect(self): + self.client.connect() #3.1 Get protocol version + if self.VER == b'\x00': + #get VER for communicating + #SOI VER ADR 46H 4FH LENGT INFO CHKSUM EOI + version = self.read_variable('version', attribute='ver') + if version: + self.connected = True + print("pylon protocol version is "+str(version)) + self.VER = version + + name = self.read_variable('battery_name') + print(name) + pass + + def read_data(self): + info = {} + registry_map = self.protocolSettings.get_registry_map() + - pass - \ No newline at end of file + data : dict [int, bytes] = {} + for entry in registry_map: + + if entry.register not in data: #todo: need to check send data. later. + command = entry.register #CID1 and CID2 combined creates a single ushort + self.send_command(command) + frame = self.client.read() + if frame: #decode info to ascii: bytes.fromhex(name.decode("utf-8")).decode("ascii") + raw = getattr(self.decode_frame(frame), 'info') + if raw: + raw = bytes.fromhex(raw.decode("utf8")) #because protocol is in "ascii" + data[entry.register] = raw + + info = self.protocolSettings.process_registery({entry.register : raw}, map=registry_map) + + if not info: + self._log.info("Data is Empty; Serial Pylon Transport busy?") + + return info + + def read_variable(self, variable_name : str, entry : 'registry_map_entry' = None, attribute : str = 'info'): + ##clean for convinecne + if variable_name: + variable_name = variable_name.strip().lower().replace(' ', '_') + + registry_map = self.protocolSettings.get_registry_map() + + if entry == None: + for e in registry_map: + if e.variable_name == variable_name: + entry = e + break + + + if entry: + #entry.concatenate this protocol probably doesnt require concatenate, since info is variable length. + command = entry.register #CID1 and CID2 combined creates a single ushort + self.send_command(command) + frame = self.client.read() + if frame: #decode info to ascii: bytes.fromhex(name.decode("utf-8")).decode("ascii") + raw = getattr(self.decode_frame(frame), attribute) + if raw and attribute == 'info': + raw = bytes.fromhex(raw.decode("utf8")) #because protocol is in "ascii" + raw = self.protocolSettings.process_registery({entry.register : raw}, map=registry_map) + return raw + + + return None + + def calculate_checksum(self, data): + # Calculate the sum of all characters in ASCII value + ascii_sum = sum(data) + + # Take modulus 65536 + remainder = ascii_sum % 65536 + + # Bitwise invert the remainder and add 1 + checksum = ~remainder & 0xFFFF + checksum += 1 + + return checksum + + def decode_frame(self, raw_frame: bytes) -> bytes: + b4 = raw_frame + raw_frame = bytes(raw_frame) + + frame_data = raw_frame[0:-4] + frame_checksum = raw_frame[-4:] + + calc_checksum = struct.pack('>H', self.calculate_checksum(raw_frame[0:-4])).hex().upper().encode() + if calc_checksum != frame_checksum: + self._log.warning(f"Serial Pylon checksum error, got {calc_checksum}, expected {frame_checksum}") + + data = Object() + data.ver = frame_data[0:2] + data.adr = frame_data[2:4] + data.cid1 = frame_data[4:6] + data.cid2 = frame_data[6:8] + data.infolength = frame_data[8:12] + data.info = frame_data[12:] + + #on return, cid2 holds a return error code. so reads are time sensitive. will have to write syncronis functions in client + #fromByte + returnCode = return_codes.fromByte(data.cid2) + if returnCode != return_codes.NORMAL: + self._log.warning(f"Serial Pylon Error code {returnCode}") + + #todo, process info + return data + + + def build_frame(self, command : int, info: bytes = b''): + ''' builds frame without soi and eoi; that is left for frame client''' + + info_length = 0 + + lenid = len(info) + if lenid != 0: + lenid_sum = (lenid & 0xf) + ((lenid >> 4) & 0xf) + ((lenid >> 8) & 0xf) + lenid_modulo = lenid_sum % 16 + lenid_invert_plus_one = 0b1111 - lenid_modulo + 1 + + info_length = (lenid_invert_plus_one << 12) + lenid + + + self.VER = b'\x20' + + #protocol is in ASCII hex. :facepalm: + frame : str = self.VER.hex().upper() + frame = frame + self.ADR.hex().upper() + frame = frame + struct.pack('>H', command).hex().upper() + frame = frame + struct.pack('>H', info_length).hex().upper() + frame = frame + info.hex().upper() + + frame = frame.encode() + + frame_chksum = self.calculate_checksum(frame) + frame = frame + struct.pack('>H', frame_chksum).hex().upper().encode() + + #test frame + #self.decode_frame(frame) + + return frame + + + def send_command(self, cmd, info: bytes = b''): + data = self.build_frame(cmd, info) + self.client.write(data) + + \ No newline at end of file diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py index 1693835..db0e563 100644 --- a/classes/transports/transport_base.py +++ b/classes/transports/transport_base.py @@ -14,7 +14,7 @@ class transport_base: protocol_version : str = '' transport_name : str = '' device_name : str = '' - device_serial_number : str = 'hotnoob' + device_serial_number : str = '' device_manufacturer : str = 'hotnoob' device_model : str = 'hotnoob' bridge : str = '' @@ -51,9 +51,9 @@ def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_setti #todo, reimplement default settings from protocolsettings if settings: - self.device_serial_number = settings.get("serial_number", self.device_serial_number) - self.device_manufacturer = settings.get("manufacturer", self.device_manufacturer) - self.device_name = settings.get("device_name", fallback=self.device_manufacturer+" "+self.device_name) + self.device_serial_number = settings.get(["device_serial_number", "serial_number"], self.device_serial_number) + self.device_manufacturer = settings.get(["device_manufacturer", "manufacturer"], self.device_manufacturer) + self.device_name = settings.get(['device_name', 'name'], fallback=self.device_manufacturer+"_"+self.device_serial_number) self.bridge = settings.get("bridge", self.bridge) self.read_interval = settings.getfloat("read_interval", self.read_interval) if "write_enabled" in settings: diff --git a/defs/common.py b/defs/common.py index 5eb9a61..6ea2cf7 100644 --- a/defs/common.py +++ b/defs/common.py @@ -6,4 +6,11 @@ def strtobool (val): if val in ('y', 'yes', 't', 'true', 'on', '1'): return 1 - return 0 \ No newline at end of file + return 0 + +def strtoint(val : str) -> int: + ''' converts str to int, but allows for hex string input, identified by x prefix''' + if val and val[0] == 'x': + return int.from_bytes(bytes.fromhex(val[1:]), byteorder='big') + + return int(val) \ No newline at end of file diff --git a/docs/RS485-protocol-pylon-low-voltage-V3.3-20180821.pdf b/docs/RS485-protocol-pylon-low-voltage-V3.3-20180821.pdf new file mode 100644 index 0000000..28cf614 Binary files /dev/null and b/docs/RS485-protocol-pylon-low-voltage-V3.3-20180821.pdf differ diff --git a/docs/Victron VE-Bus-products-MK2-Protocol-3-14.pdf b/docs/Victron VE-Bus-products-MK2-Protocol-3-14.pdf new file mode 100644 index 0000000..589613c Binary files /dev/null and b/docs/Victron VE-Bus-products-MK2-Protocol-3-14.pdf differ diff --git a/protocol_gateway.py b/protocol_gateway.py index d16ae63..89ce072 100644 --- a/protocol_gateway.py +++ b/protocol_gateway.py @@ -25,7 +25,7 @@ import logging import sys import traceback -from configparser import RawConfigParser, ConfigParser +from configparser import ConfigParser, NoOptionError from classes.protocol_settings import protocol_settings,Data_Type,registry_map_entry,Registry_Type,WriteMode from classes.transports.transport_base import transport_base @@ -50,6 +50,29 @@ """ +class CustomConfigParser(ConfigParser): + def get(self, section, option, *args, **kwargs): + if isinstance(option, list): + fallback = None + + if 'fallback' in kwargs: #override kwargs fallback, for manually handling here + fallback = kwargs['fallback'] + kwargs['fallback'] = None + + for name in option: + value = super().get(section, name, *args, **kwargs) + if value: + break + + if not value: + value = fallback + + if value == None: + raise NoOptionError(option[0], section) + else: + value = super().get(section, option, *args, **kwargs) + return value.strip() if value is not None else value + class Protocol_Gateway: """ Main class, implementing the Growatt / Inverters to MQTT functionality @@ -86,7 +109,7 @@ def __init__(self, config_file : str): self.__log.info("Loading...") - self.__settings = ConfigParser() + self.__settings = CustomConfigParser() self.__settings.read(self.config_file) ##[general] diff --git a/protocols/pylon_rs485_v3.3.json b/protocols/pylon_rs485_v3.3.json new file mode 100644 index 0000000..9994868 --- /dev/null +++ b/protocols/pylon_rs485_v3.3.json @@ -0,0 +1,4 @@ +{ + "transport" : "serial_pylon", + "baud" : 115200 +} \ No newline at end of file diff --git a/protocols/pylon_rs485_v3.3.registry_map.csv b/protocols/pylon_rs485_v3.3.registry_map.csv new file mode 100644 index 0000000..259d8e7 --- /dev/null +++ b/protocols/pylon_rs485_v3.3.registry_map.csv @@ -0,0 +1,25 @@ +variable name,data type,register,read command,documented name,description,writable,values,unit,note +,BYTE,x464F,,version,protocol version,,0~255,, +,ASCII.10,x4651,,battery name,,,,, +,USHORT,x4651.10,,software version,,,,, +,ASCII.20,x4651.12,,manufacturer name,,,,, +,BYTE,x4642,0x01,Battery 1 number of cells,,,,,todo +,,,,,,,,, +,USHORT,x4647,,cell high voltage limit ,,,,0.01v, +,USHORT,x4647.2,,cell low voltage limit ,,,,0.01v, +,USHORT,X4642.4,,cell under voltage limit,,,,0.01v, +,USHORT,X4642.6,,charge high temperature limit ,,,,K, +,USHORT,X4642.8,,charge low temperature limit ,,,,K, +,USHORT,X4642.10,,charge current limit ,,,,0.01A, +,USHORT,X4642.12,,module high voltage limit,,,,0.01v, +,USHORT,X4642.14,,module low voltage limit ,,,,0.01v, +,USHORT,X4642.16,,module under voltage limit ,,,,0.01v, +,USHORT,X4642.18,,discharge high temperature limit ,,,,K, +,USHORT,X4642.20,,discharge low temperature limit ,,,,K, +,USHORT,X4642.22,,discharge current limit ,,,,0.01A, +,ASCII,x4644,0x01,Battery 1 alarm,,,,,todo +,,x4692,,Battery 1 charge info,,,,,todo +,,x4693,,Battery 1 Serial Number,,,,,todo +,,x4694,,Battery 1 charge settings,,,,,todo +,,x4695,,Battery 1 Turn Off,,RD,,,todo +,,x4696,,Battery 1 Software Info,,,,,todo diff --git a/test.py b/test.py index b6b0850..f7ee1f7 100644 --- a/test.py +++ b/test.py @@ -1,25 +1,91 @@ -class Base: - top_class_name = None +import re +import ast - def __init__(self): - self.top_class_name = self.__class__.__name__ - print("Subclass top class name:", self.top_class_name) - +# Define the register string +register = "x4642.[ 1 + ((( [battery 1 number of cells] *2 )+ (1~[battery 1 number of temperature] *2)) ) ]" -# Example usage -class Subclass(Base): - def __init__(self): - super().__init__() # Call the __init__ method of the base class - print("Subclass top class name:", self.top_class_name) +# Define variables +vars = {"battery 1 number of cells": 8, "battery 1 number of temperature": 2} -# Example usage -class Subclass2(Base): - def __init__(self): - super().__init__() # Call the __init__ method of the base class - print("Subclass top class name:", self.top_class_name) +# Function to evaluate mathematical expressions +def evaluate_variables(expression): + # Define a regular expression pattern to match variables + var_pattern = re.compile(r'\[([^\[\]]+)\]') + # Replace variables in the expression with their values + def replace_vars(match): + var_name = match.group(1) + if var_name in vars: + return str(vars[var_name]) + else: + return match.group(0) -# Create an instance of SubSubclass -sub_sub_instance = Subclass() -sub_sub_instance = Subclass2() + # Replace variables with their values + return var_pattern.sub(replace_vars, expression) + +def evaluate_ranges(expression): + # Define a regular expression pattern to match ranges + range_pattern = re.compile(r'\[.*?((?P\d+)\s?\~\s?(?P\d+)).*?\]') + + # Find all ranges in the expression + ranges = range_pattern.findall(expression) + + # If there are no ranges, return the expression as is + if not ranges: + return [expression] + + # Initialize list to store results + results = [] + + # Iterate over each range found in the expression + for group, range_start, range_end in ranges: + range_start = int(range_start) + range_end = int(range_end) + if range_start > range_end: + range_start, range_end = range_end, range_start #swap + + # Generate duplicate entries for each value in the range + for i in range(range_start, range_end + 1): + replaced_expression = expression.replace(group, str(i)) + results.append(replaced_expression) + + return results + +def evaluate_expression(expression): + # Define a regular expression pattern to match "maths" + var_pattern = re.compile(r'\[(?P.*?)\]') + + # Replace variables in the expression with their values + def replace_vars(match): + try: + maths = match.group("maths") + maths = re.sub(r'\s', '', maths) #remove spaces, because ast.parse doesnt like them + + # Parse the expression safely + tree = ast.parse(maths, mode='eval') + + # Evaluate the expression + end_value = eval(compile(tree, filename='', mode='eval')) + + return str(end_value) + except : + return match.group(0) + + # Replace variables with their values + return var_pattern.sub(replace_vars, expression) + + +# Evaluate the register string +result = evaluate_variables(register) +print("Result:", result) + +result = evaluate_ranges(result) +print("Result:", result) + +results = [] +for r in result: + results.extend(evaluate_ranges(r)) + +for r in results: + print(evaluate_expression(r))