diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index cc5e74e..4b7ef02 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -1149,41 +1149,82 @@ def process_registery(self, registry : Union[dict[int, int], dict[int, bytes]] , concatenate_registry : dict = {} info = {} + + # First pass: process all non-concatenated entries for entry in map: - if entry.register not in registry: continue - value = "" - - if isinstance(registry[entry.register], bytes): - value = self.process_register_bytes(registry, entry) - else: - value = self.process_register_ushort(registry, entry) - - #if item.unit: - # value = str(value) + item.unit + + if not entry.concatenate: + value = "" + if isinstance(registry[entry.register], bytes): + value = self.process_register_bytes(registry, entry) + else: + value = self.process_register_ushort(registry, entry) + info[entry.variable_name] = value + + # Second pass: process concatenated entries + for entry in map: + if entry.register not in registry: + continue + if entry.concatenate: - concatenate_registry[entry.register] = value - - all_exist = True - for key in entry.concatenate_registers: - if key not in concatenate_registry: - all_exist = False - break - if all_exist: - #if all(key in concatenate_registry for key in item.concatenate_registers): - concatenated_value = "" - for key in entry.concatenate_registers: - concatenated_value = concatenated_value + str(concatenate_registry[key]) - del concatenate_registry[key] - - #replace null characters with spaces and trim + # For concatenated entries, we need to process each register in the concatenate_registers list + concatenated_value = "" + all_registers_exist = True + + # For ASCII concatenated variables, extract 8-bit characters from 16-bit registers + if entry.data_type == Data_Type.ASCII: + for reg in entry.concatenate_registers: + if reg not in registry: + all_registers_exist = False + break + + reg_value = registry[reg] + # Extract high byte (bits 8-15) and low byte (bits 0-7) + high_byte = (reg_value >> 8) & 0xFF + low_byte = reg_value & 0xFF + + # Convert each byte to ASCII character (low byte first, then high byte) + low_char = chr(low_byte) + high_char = chr(high_byte) + concatenated_value += low_char + high_char + else: + for reg in entry.concatenate_registers: + if reg not in registry: + all_registers_exist = False + break + + # Create a temporary entry for this register to process it + temp_entry = registry_map_entry( + registry_type=entry.registry_type, + register=reg, + register_bit=0, + register_byte=0, + variable_name=f"temp_{reg}", + documented_name=f"temp_{reg}", + unit="", + unit_mod=1.0, + concatenate=False, + concatenate_registers=[], + values=[], + data_type=entry.data_type, + data_type_size=entry.data_type_size + ) + + if isinstance(registry[reg], bytes): + value = self.process_register_bytes(registry, temp_entry) + else: + value = self.process_register_ushort(registry, temp_entry) + + concatenated_value += str(value) + + if all_registers_exist: + # Replace null characters with spaces and trim for ASCII if entry.data_type == Data_Type.ASCII: concatenated_value = concatenated_value.replace("\x00", " ").strip() - + info[entry.variable_name] = concatenated_value - else: - info[entry.variable_name] = value return info diff --git a/classes/transports/canbus.py b/classes/transports/canbus.py index 7bc1a62..72bec8f 100644 --- a/classes/transports/canbus.py +++ b/classes/transports/canbus.py @@ -240,6 +240,18 @@ def read_data(self) -> dict[str, str]: 
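A minimal standalone sketch (helper name and register numbers are illustrative, not taken from the codebase) of the low-byte-first ASCII decoding that the reworked process_registery() above applies to concatenated registers:

```python
# Each 16-bit register holds two ASCII characters; the new code emits the
# low byte (bits 0-7) first, then the high byte (bits 8-15), then replaces
# NUL padding with spaces and trims the result.
def decode_ascii_registers(registry: dict[int, int], concatenate_registers: list[int]) -> str:
    chars = []
    for reg in concatenate_registers:
        value = registry.get(reg, 0)
        chars.append(chr(value & 0xFF))          # low byte first
        chars.append(chr((value >> 8) & 0xFF))   # then high byte
    return "".join(chars).replace("\x00", " ").strip()

# Register numbers and contents below are arbitrary example values.
assert decode_ascii_registers({115: 0x4241, 116: 0x3231, 117: 0x0033}, [115, 116, 117]) == "AB123"
```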
info.update(new_info) + # Check for serial number variables and promote to device_serial_number + if info: + # Look for serial number variable + if "serial_number" in info: + value = info["serial_number"] + if value and value != "None" and str(value).strip(): + # Found a valid serial number, promote it + if self.device_serial_number != str(value): + self._log.info(f"Promoting parsed serial number: {value} (from variable: serial_number)") + self.device_serial_number = str(value) + self.update_identifier() + currentTime = time.time() if not info: diff --git a/classes/transports/influxdb_out.py b/classes/transports/influxdb_out.py new file mode 100644 index 0000000..d0a9589 --- /dev/null +++ b/classes/transports/influxdb_out.py @@ -0,0 +1,182 @@ +import sys +from configparser import SectionProxy +from typing import TextIO +import time + +from defs.common import strtobool + +from ..protocol_settings import Registry_Type, WriteMode, registry_map_entry +from .transport_base import transport_base + + +class influxdb_out(transport_base): + ''' InfluxDB v1 output transport that writes data to an InfluxDB server ''' + host: str = "localhost" + port: int = 8086 + database: str = "solar" + username: str = "" + password: str = "" + measurement: str = "device_data" + include_timestamp: bool = True + include_device_info: bool = True + batch_size: int = 100 + batch_timeout: float = 10.0 + + client = None + batch_points = [] + last_batch_time = 0 + + def __init__(self, settings: SectionProxy): + self.host = settings.get("host", fallback=self.host) + self.port = settings.getint("port", fallback=self.port) + self.database = settings.get("database", fallback=self.database) + self.username = settings.get("username", fallback=self.username) + self.password = settings.get("password", fallback=self.password) + self.measurement = settings.get("measurement", fallback=self.measurement) + self.include_timestamp = strtobool(settings.get("include_timestamp", fallback=self.include_timestamp)) + self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info)) + self.batch_size = settings.getint("batch_size", fallback=self.batch_size) + self.batch_timeout = settings.getfloat("batch_timeout", fallback=self.batch_timeout) + + self.write_enabled = True # InfluxDB output is always write-enabled + super().__init__(settings) + + def connect(self): + """Initialize the InfluxDB client connection""" + self._log.info("influxdb_out connect") + + try: + from influxdb import InfluxDBClient + + # Create InfluxDB client + self.client = InfluxDBClient( + host=self.host, + port=self.port, + username=self.username if self.username else None, + password=self.password if self.password else None, + database=self.database + ) + + # Test connection + self.client.ping() + + # Create database if it doesn't exist + databases = self.client.get_list_database() + if not any(db['name'] == self.database for db in databases): + self._log.info(f"Creating database: {self.database}") + self.client.create_database(self.database) + + self.connected = True + self._log.info(f"Connected to InfluxDB at {self.host}:{self.port}") + + except ImportError: + self._log.error("InfluxDB client not installed. 
Please install with: pip install influxdb") + self.connected = False + except Exception as e: + self._log.error(f"Failed to connect to InfluxDB: {e}") + self.connected = False + + def write_data(self, data: dict[str, str], from_transport: transport_base): + """Write data to InfluxDB""" + if not self.write_enabled or not self.connected: + return + + self._log.info(f"write data from [{from_transport.transport_name}] to influxdb_out transport") + self._log.info(data) + + # Prepare tags for InfluxDB + tags = {} + + # Add device information as tags if enabled + if self.include_device_info: + tags.update({ + "device_identifier": from_transport.device_identifier, + "device_name": from_transport.device_name, + "device_manufacturer": from_transport.device_manufacturer, + "device_model": from_transport.device_model, + "device_serial_number": from_transport.device_serial_number, + "transport": from_transport.transport_name + }) + + # Prepare fields (the actual data values) + fields = {} + for key, value in data.items(): + # Check if we should force float formatting based on protocol settings + should_force_float = False + + # Try to get registry entry from protocol settings to check unit_mod + if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings: + # Check both input and holding registries + for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]: + registry_map = from_transport.protocolSettings.get_registry_map(registry_type) + for entry in registry_map: + if entry.variable_name == key: + # If unit_mod is not 1.0, this value should be treated as float + if entry.unit_mod != 1.0: + should_force_float = True + self._log.debug(f"Variable {key} has unit_mod {entry.unit_mod}, forcing float format") + break + if should_force_float: + break + + # Try to convert to numeric values for InfluxDB + try: + # Try to convert to float first + float_val = float(value) + + # If it's an integer but should be forced to float, or if it's already a float + if should_force_float or not float_val.is_integer(): + fields[key] = float_val + else: + fields[key] = int(float_val) + except (ValueError, TypeError): + # If conversion fails, store as string + fields[key] = str(value) + + # Create InfluxDB point + point = { + "measurement": self.measurement, + "tags": tags, + "fields": fields + } + + # Add timestamp if enabled + if self.include_timestamp: + point["time"] = int(time.time() * 1e9) # Convert to nanoseconds + + # Add to batch + self.batch_points.append(point) + + # Check if we should flush the batch + current_time = time.time() + if (len(self.batch_points) >= self.batch_size or + (current_time - self.last_batch_time) >= self.batch_timeout): + self._flush_batch() + + def _flush_batch(self): + """Flush the batch of points to InfluxDB""" + if not self.batch_points: + return + + try: + self.client.write_points(self.batch_points) + self._log.info(f"Wrote {len(self.batch_points)} points to InfluxDB") + self.batch_points = [] + self.last_batch_time = time.time() + except Exception as e: + self._log.error(f"Failed to write batch to InfluxDB: {e}") + self.connected = False + + def init_bridge(self, from_transport: transport_base): + """Initialize bridge - not needed for InfluxDB output""" + pass + + def __del__(self): + """Cleanup on destruction - flush any remaining points""" + if self.batch_points: + self._flush_batch() + if self.client: + try: + self.client.close() + except Exception: + pass \ No newline at end of file diff --git a/classes/transports/json_out.py 
b/classes/transports/json_out.py new file mode 100644 index 0000000..51fe6d4 --- /dev/null +++ b/classes/transports/json_out.py @@ -0,0 +1,109 @@ +import json +import sys +from configparser import SectionProxy +from typing import TextIO + +from defs.common import strtobool + +from ..protocol_settings import Registry_Type, WriteMode, registry_map_entry +from .transport_base import transport_base + + +class json_out(transport_base): + ''' JSON output transport that writes data to a file or stdout ''' + output_file: str = "stdout" + pretty_print: bool = True + append_mode: bool = False + include_timestamp: bool = True + include_device_info: bool = True + + file_handle: TextIO = None + + def __init__(self, settings: SectionProxy): + self.output_file = settings.get("output_file", fallback=self.output_file) + self.pretty_print = strtobool(settings.get("pretty_print", fallback=self.pretty_print)) + self.append_mode = strtobool(settings.get("append_mode", fallback=self.append_mode)) + self.include_timestamp = strtobool(settings.get("include_timestamp", fallback=self.include_timestamp)) + self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info)) + + self.write_enabled = True # JSON output is always write-enabled + super().__init__(settings) + + def connect(self): + """Initialize the output file handle""" + self._log.info("json_out connect") + + if self.output_file.lower() == "stdout": + self.file_handle = sys.stdout + else: + try: + mode = "a" if self.append_mode else "w" + self.file_handle = open(self.output_file, mode, encoding='utf-8') + self.connected = True + except Exception as e: + self._log.error(f"Failed to open output file {self.output_file}: {e}") + self.connected = False + return + + self.connected = True + + def write_data(self, data: dict[str, str], from_transport: transport_base): + """Write data as JSON to the output file""" + if not self.write_enabled or not self.connected: + return + + self._log.info(f"write data from [{from_transport.transport_name}] to json_out transport") + self._log.info(data) + + # Prepare the JSON output structure + output_data = {} + + # Add device information if enabled + if self.include_device_info: + output_data["device"] = { + "identifier": from_transport.device_identifier, + "name": from_transport.device_name, + "manufacturer": from_transport.device_manufacturer, + "model": from_transport.device_model, + "serial_number": from_transport.device_serial_number, + "transport": from_transport.transport_name + } + + # Add timestamp if enabled + if self.include_timestamp: + import time + output_data["timestamp"] = time.time() + + # Add the actual data + output_data["data"] = data + + # Convert to JSON + if self.pretty_print: + json_string = json.dumps(output_data, indent=2, ensure_ascii=False) + else: + json_string = json.dumps(output_data, ensure_ascii=False) + + # Write to file + try: + if self.output_file.lower() != "stdout": + # For files, add a newline and flush + self.file_handle.write(json_string + "\n") + self.file_handle.flush() + else: + # For stdout, just print + print(json_string) + except Exception as e: + self._log.error(f"Failed to write to output: {e}") + self.connected = False + + def init_bridge(self, from_transport: transport_base): + """Initialize bridge - not needed for JSON output""" + pass + + def __del__(self): + """Cleanup file handle on destruction""" + if self.file_handle and self.output_file.lower() != "stdout": + try: + self.file_handle.close() + except: + pass \ No newline at end 
of file diff --git a/classes/transports/modbus_base.py b/classes/transports/modbus_base.py index d040746..1745cb5 100644 --- a/classes/transports/modbus_base.py +++ b/classes/transports/modbus_base.py @@ -56,8 +56,7 @@ def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_setti self.analyze_protocol_enabled = settings.getboolean("analyze_protocol", fallback=self.analyze_protocol_enabled) self.analyze_protocol_save_load = settings.getboolean("analyze_protocol_save_load", fallback=self.analyze_protocol_save_load) - - #get defaults from protocol settings + # get defaults from protocol settings if "send_input_register" in self.protocolSettings.settings: self.send_input_register = strtobool(self.protocolSettings.settings["send_input_register"]) @@ -67,17 +66,13 @@ def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_setti if "batch_delay" in self.protocolSettings.settings: self.modbus_delay = float(self.protocolSettings.settings["batch_delay"]) - #allow enable/disable of which registers to send + # allow enable/disable of which registers to send self.send_holding_register = settings.getboolean("send_holding_register", fallback=self.send_holding_register) self.send_input_register = settings.getboolean("send_input_register", fallback=self.send_input_register) self.modbus_delay = settings.getfloat(["batch_delay", "modbus_delay"], fallback=self.modbus_delay) self.modbus_delay_setting = self.modbus_delay - - if self.analyze_protocol_enabled: - self.connect() - self.analyze_protocol() - quit() + # Note: Connection and analyze_protocol will be called after subclass initialization is complete def init_after_connect(self): #from transport_base settings @@ -90,14 +85,23 @@ def init_after_connect(self): self.update_identifier() def connect(self): - if self.connected and self.first_connect: - self.first_connect = False - self.init_after_connect() + # Base class connect method - subclasses should override this + # to establish the actual hardware connection + pass def read_serial_number(self) -> str: + # First try to read "Serial Number" from input registers (for protocols like EG4 v58) + self._log.info("Looking for serial_number variable in input registers...") + serial_number = str(self.read_variable("Serial Number", Registry_Type.INPUT)) + self._log.info("read SN from input registers: " + serial_number) + if serial_number and serial_number != "None": + return serial_number + + # Then try holding registers (for other protocols) + self._log.info("Looking for serial_number variable in holding registers...") serial_number = str(self.read_variable("Serial Number", Registry_Type.HOLDING)) - self._log.info("read SN: " +serial_number) - if serial_number: + self._log.info("read SN from holding registers: " + serial_number) + if serial_number and serial_number != "None": return serial_number sn2 = "" @@ -121,8 +125,8 @@ def read_serial_number(self) -> str: time.sleep(self.modbus_delay*2) #sleep inbetween requests so modbus can rest - print(sn2) - print(sn3) + self._log.debug(f"Serial number sn2: {sn2}") + self._log.debug(f"Serial number sn3: {sn3}") if not re.search("[^a-zA-Z0-9_]", sn2) : serial_number = sn2 @@ -136,15 +140,27 @@ def enable_write(self): self._log.info("Validating Protocol for Writing") self.write_enabled = False - score_percent = self.validate_protocol(Registry_Type.HOLDING) - if(score_percent > 90): - self.write_enabled = True - self._log.warning("enable write - validation passed") - elif self.write_mode == TransportWriteMode.RELAXED: - 
self.write_enabled = True - self._log.warning("enable write - WARNING - RELAXED MODE") - else: - self._log.error("enable write FAILED - WRITE DISABLED") + + # Add a small delay to ensure device is ready, especially during initialization + time.sleep(self.modbus_delay * 2) + + try: + score_percent = self.validate_protocol(Registry_Type.HOLDING) + if(score_percent > 90): + self.write_enabled = True + self._log.warning("enable write - validation passed") + elif self.write_mode == TransportWriteMode.RELAXED: + self.write_enabled = True + self._log.warning("enable write - WARNING - RELAXED MODE") + else: + self._log.error("enable write FAILED - WRITE DISABLED") + except Exception as e: + self._log.error(f"enable write FAILED due to error: {str(e)}") + if self.write_mode == TransportWriteMode.RELAXED: + self.write_enabled = True + self._log.warning("enable write - WARNING - RELAXED MODE (due to validation error)") + else: + self._log.error("enable write FAILED - WRITE DISABLED") @@ -187,6 +203,18 @@ def read_data(self) -> dict[str, str]: info.update(new_info) + # Check for serial number variables and promote to device_serial_number + if info: + # Look for serial number variable + if "serial_number" in info: + value = info["serial_number"] + if value and value != "None" and str(value).strip(): + # Found a valid serial number, promote it + if self.device_serial_number != str(value): + self._log.info(f"Promoting parsed serial number: {value} (from variable: serial_number)") + self.device_serial_number = str(value) + self.update_identifier() + if not info: self._log.info("Register is Empty; transport busy?") @@ -232,23 +260,34 @@ def analyze_protocol(self, settings_dir : str = "protocols"): print(file) protocol_names.append(file) - max_input_register : int = 0 - max_holding_register : int = 0 + # Use the configured protocol's register ranges instead of maximum from all protocols + # This prevents trying to read non-existent registers from other protocols + if hasattr(self, 'protocolSettings') and self.protocolSettings: + max_input_register = self.protocolSettings.registry_map_size[Registry_Type.INPUT] + max_holding_register = self.protocolSettings.registry_map_size[Registry_Type.HOLDING] + self._log.debug(f"Using configured protocol register ranges: input={max_input_register}, holding={max_holding_register}") + + # Use the configured protocol for analysis + protocols[self.protocolSettings.protocol] = self.protocolSettings + else: + # Fallback to calculating max from all protocols (original behavior) + max_input_register : int = 0 + max_holding_register : int = 0 - for name in protocol_names: - protocols[name] = protocol_settings(name) + for name in protocol_names: + protocols[name] = protocol_settings(name) - if protocols[name].registry_map_size[Registry_Type.INPUT] > max_input_register: - max_input_register = protocols[name].registry_map_size[Registry_Type.INPUT] + if protocols[name].registry_map_size[Registry_Type.INPUT] > max_input_register: + max_input_register = protocols[name].registry_map_size[Registry_Type.INPUT] - if protocols[name].registry_map_size[Registry_Type.HOLDING] > max_holding_register: - max_holding_register = protocols[name].registry_map_size[Registry_Type.HOLDING] + if protocols[name].registry_map_size[Registry_Type.HOLDING] > max_holding_register: + max_holding_register = protocols[name].registry_map_size[Registry_Type.HOLDING] - print("max input register: ", max_input_register) - print("max holding register: ", max_holding_register) + self._log.debug(f"max input register: 
{max_input_register}") + self._log.debug(f"max holding register: {max_holding_register}") self.modbus_delay = self.modbus_delay #decrease delay because can probably get away with it due to lots of small reads - print("read INPUT Registers: ") + self._log.debug("read INPUT Registers: ") input_save_path = "input_registry.json" holding_save_path = "holding_registry.json" @@ -267,8 +306,8 @@ def analyze_protocol(self, settings_dir : str = "protocols"): else: #perform registry scan ##batch_size = 1, read registers one by one; if out of bound. it just returns error - input_registry = self.read_modbus_registers(start=0, end=max_input_register, batch_size=45, registry_type=Registry_Type.INPUT) - holding_registry = self.read_modbus_registers(start=0, end=max_holding_register, batch_size=45, registry_type=Registry_Type.HOLDING) + input_registry = self.read_modbus_registers(start=0, end=max_input_register, registry_type=Registry_Type.INPUT) + holding_registry = self.read_modbus_registers(start=0, end=max_holding_register, registry_type=Registry_Type.HOLDING) if self.analyze_protocol_save_load: #save results if enabled with open(input_save_path, "w") as file: @@ -278,14 +317,14 @@ def analyze_protocol(self, settings_dir : str = "protocols"): json.dump(holding_registry, file) #print results for debug - print("=== START INPUT REGISTER ===") + self._log.debug("=== START INPUT REGISTER ===") if input_registry: - print([(key, value) for key, value in input_registry.items()]) - print("=== END INPUT REGISTER ===") - print("=== START HOLDING REGISTER ===") + self._log.debug([(key, value) for key, value in input_registry.items()]) + self._log.debug("=== END INPUT REGISTER ===") + self._log.debug("=== START HOLDING REGISTER ===") if holding_registry: - print([(key, value) for key, value in holding_registry.items()]) - print("=== END HOLDING REGISTER ===") + self._log.debug([(key, value) for key, value in holding_registry.items()]) + self._log.debug("=== END HOLDING REGISTER ===") #very well possible the registers will be incomplete due to different hardware sizes #so dont assume they are set / complete @@ -369,9 +408,9 @@ def evaluate_score(entry : registry_map_entry, val): #print scores for name in sorted(protocol_scores, key=protocol_scores.get, reverse=True): - print("=== "+str(name)+" - "+str(protocol_scores[name])+" ===") - print("input register score: " + str(input_register_score[name]) + "; valid registers: "+str(input_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.INPUT)))) - print("holding register score : " + str(holding_register_score[name]) + "; valid registers: "+str(holding_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.HOLDING)))) + self._log.debug("=== "+str(name)+" - "+str(protocol_scores[name])+" ===") + self._log.debug("input register score: " + str(input_register_score[name]) + "; valid registers: "+str(input_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.INPUT)))) + self._log.debug("holding register score : " + str(holding_register_score[name]) + "; valid registers: "+str(holding_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.HOLDING)))) def write_variable(self, entry : registry_map_entry, value : str, registry_type : Registry_Type = Registry_Type.HOLDING): @@ -497,16 +536,57 @@ def read_variable(self, variable_name : str, registry_type : Registry_Type, entr start = entry.register end = entry.register else: - start = entry.register + start = 
min(entry.concatenate_registers) end = max(entry.concatenate_registers) registers = self.read_modbus_registers(start=start, end=end, registry_type=registry_type) - results = self.protocolSettings.process_registery(registers, registry_map) - return results[entry.variable_name] + + # Special handling for concatenated ASCII variables (like serial numbers) + if entry.concatenate and entry.data_type == Data_Type.ASCII: + concatenated_value = "" + + # For serial numbers, we need to extract 8-bit ASCII characters from 16-bit registers + # Each register contains two ASCII characters (low byte and high byte) + for reg in entry.concatenate_registers: + if reg in registers: + reg_value = registers[reg] + # Extract low byte (bits 0-7) and high byte (bits 8-15) + low_byte = reg_value & 0xFF + high_byte = (reg_value >> 8) & 0xFF + + # Convert each byte to ASCII character + low_char = chr(low_byte) + high_char = chr(high_byte) + + concatenated_value += low_char + high_char + else: + self._log.warning(f"Register {reg} not found in registry") + + result = concatenated_value.replace("\x00", " ").strip() + return result + + # Only process the specific entry, not the entire registry map + results = self.protocolSettings.process_registery(registers, [entry]) + result = results.get(entry.variable_name) + return result + else: + self._log.warning(f"Entry not found for variable: {variable_name}") + return None - def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, end : int = None, batch_size : int = 45, registry_type : Registry_Type = Registry_Type.INPUT ) -> dict: + def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, end : int = None, batch_size : int = None, registry_type : Registry_Type = Registry_Type.INPUT ) -> dict: ''' maybe move this to transport_base ?''' + # Get batch_size from protocol settings if not provided + if batch_size is None: + if hasattr(self, 'protocolSettings') and self.protocolSettings: + batch_size = self.protocolSettings.settings.get("batch_size", 45) + try: + batch_size = int(batch_size) + except (ValueError, TypeError): + batch_size = 45 + else: + batch_size = 45 + if not ranges: #ranges is empty, use min max if start == 0 and end is None: return {} #empty @@ -533,22 +613,24 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en time.sleep(self.modbus_delay) #sleep for 1ms to give bus a rest #manual recommends 1s between commands isError = False + register = None # Initialize register variable try: register = self.read_registers(range[0], range[1], registry_type=registry_type) except ModbusIOException as e: - self._log.error("ModbusIOException : ", e.error_code) - if e.error_code == 4: #if no response; probably time out. retry with increased delay - isError = True - else: - isError = True #other erorrs. 
ie Failed to connect[ModbusSerialClient(rtu baud[9600])] + self._log.error("ModbusIOException: " + str(e)) + # In pymodbus 3.7+, ModbusIOException doesn't have error_code attribute + # Treat all ModbusIOException as retryable errors + isError = True - if isinstance(register, bytes) or register.isError() or isError: #sometimes weird errors are handled incorrectly and response is a ascii error string - if isinstance(register, bytes): + if register is None or isinstance(register, bytes) or (hasattr(register, 'isError') and register.isError()) or isError: #sometimes weird errors are handled incorrectly and response is a ascii error string + if register is None: + self._log.error("No response received from modbus device") + elif isinstance(register, bytes): self._log.error(register.decode("utf-8")) else: - self._log.error(register.__str__) + self._log.error(str(register)) self.modbus_delay += self.modbus_delay_increament #increase delay, error is likely due to modbus being busy if self.modbus_delay > 60: #max delay. 60 seconds between requests should be way over kill if it happens @@ -573,12 +655,13 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en if retry < 0: retry = 0 - - #combine registers into "registry" - i = -1 - while(i := i + 1 ) < range[1]: - #print(str(i) + " => " + str(i+range[0])) - registry[i+range[0]] = register.registers[i] + # Only process registers if we have a valid response + if register is not None and hasattr(register, 'registers') and register.registers is not None: + #combine registers into "registry" + i = -1 + while(i := i + 1 ) < range[1]: + #print(str(i) + " => " + str(i+range[0])) + registry[i+range[0]] = register.registers[i] return registry diff --git a/classes/transports/modbus_rtu.py b/classes/transports/modbus_rtu.py index d4f9147..3d2eb71 100644 --- a/classes/transports/modbus_rtu.py +++ b/classes/transports/modbus_rtu.py @@ -24,63 +24,134 @@ class modbus_rtu(modbus_base): pymodbus_slave_arg = "unit" def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None): - super().__init__(settings, protocolSettings=protocolSettings) - - self.port = settings.get("port", "") - if not self.port: - raise ValueError("Port is not set") - - self.port = find_usb_serial_port(self.port) - if not self.port: - raise ValueError("Port is not valid / not found") - - print("Serial Port : " + self.port + " = ", get_usb_serial_port_info(self.port)) #print for config convience - - if "baud" in self.protocolSettings.settings: - self.baudrate = strtoint(self.protocolSettings.settings["baud"]) - - self.baudrate = settings.getint("baudrate", self.baudrate) - - address : int = settings.getint("address", 0) - self.addresses = [address] - - # pymodbus compatability; unit was renamed to address - if "slave" in inspect.signature(ModbusSerialClient.read_holding_registers).parameters: - self.pymodbus_slave_arg = "slave" - - - # Get the signature of the __init__ method - init_signature = inspect.signature(ModbusSerialClient.__init__) - - client_str = self.port+"("+str(self.baudrate)+")" - - if client_str in modbus_base.clients: - self.client = modbus_base.clients[client_str] - return - - if "method" in init_signature.parameters: - self.client = ModbusSerialClient(method="rtu", port=self.port, - baudrate=int(self.baudrate), - stopbits=1, parity="N", bytesize=8, timeout=2 - ) - else: - self.client = ModbusSerialClient( - port=self.port, - baudrate=int(self.baudrate), - stopbits=1, parity="N", bytesize=8, timeout=2 - ) - - #add to clients - 
modbus_base.clients[client_str] = self.client + try: + super().__init__(settings, protocolSettings=protocolSettings) + self._log.debug("modbus_rtu.__init__ starting") + self._log.debug("super().__init__ completed") + + self.port = settings.get("port", "") + self._log.debug(f"Port from settings: '{self.port}'") + if not self.port: + raise ValueError("Port is not set") + + self.port = find_usb_serial_port(self.port) + self._log.debug(f"Port after find_usb_serial_port: '{self.port}'") + if not self.port: + raise ValueError("Port is not valid / not found") + + print("Serial Port : " + self.port + " = ", get_usb_serial_port_info(self.port)) #print for config convience + + if "baud" in self.protocolSettings.settings: + self.baudrate = strtoint(self.protocolSettings.settings["baud"]) + + # Check for baud rate in config settings (look for both 'baud' and 'baudrate') + if "baud" in settings: + self.baudrate = settings.getint("baud") + self._log.debug(f"Using baud rate from config 'baud': {self.baudrate}") + elif "baudrate" in settings: + self.baudrate = settings.getint("baudrate") + self._log.debug(f"Using baud rate from config 'baudrate': {self.baudrate}") + else: + self._log.debug(f"Using default baud rate: {self.baudrate}") + + address : int = settings.getint("address", 0) + self.addresses = [address] + + # Get the signature of the __init__ method + init_signature = inspect.signature(ModbusSerialClient.__init__) + + client_str = self.port+"("+str(self.baudrate)+")" + self._log.debug(f"Client cache key: {client_str}") + self._log.debug(f"Existing clients in cache: {list(modbus_base.clients.keys())}") + + if client_str in modbus_base.clients: + self._log.debug(f"Using existing client from cache: {client_str}") + self.client = modbus_base.clients[client_str] + # Set compatibility flag based on existing client + self._set_compatibility_flag() + else: + self._log.debug(f"Creating new client with baud rate: {self.baudrate}") + if "method" in init_signature.parameters: + self.client = ModbusSerialClient(method="rtu", port=self.port, + baudrate=int(self.baudrate), + stopbits=1, parity="N", bytesize=8, timeout=2 + ) + else: + self.client = ModbusSerialClient( + port=self.port, + baudrate=int(self.baudrate), + stopbits=1, parity="N", bytesize=8, timeout=2 + ) + + # Set compatibility flag based on created client + self._set_compatibility_flag() + + #add to clients + modbus_base.clients[client_str] = self.client + self._log.debug(f"Added client to cache: {client_str}") + + self._log.debug("modbus_rtu.__init__ completed successfully") + + # Handle analyze_protocol after initialization is complete + if self.analyze_protocol_enabled: + self._log.debug("analyze_protocol enabled, connecting and analyzing...") + # Connect to the device first + self.connect() + + # Call init_after_connect after connection + if self.connected and self.first_connect: + self.first_connect = False + self.init_after_connect() + + # Now run protocol analysis + self.analyze_protocol() + quit() + + except Exception as e: + if hasattr(self, '_log') and self._log: + self._log.debug(f"Exception in modbus_rtu.__init__: {e}") + else: + print(f"Exception in modbus_rtu.__init__: {e}") + import traceback + traceback.print_exc() + raise + + def _set_compatibility_flag(self): + """Determine the correct parameter name for slave/unit based on pymodbus version""" + self.pymodbus_slave_arg = None + + try: + # For pymodbus 3.7+, we don't need unit/slave parameter + import pymodbus + version = pymodbus.__version__ + + # pymodbus 3.7+ doesn't need 
slave/unit parameter for most operations + if version.startswith('3.'): + self.pymodbus_slave_arg = None + else: + # Fallback for any other versions - assume newer API + self.pymodbus_slave_arg = None + + except (ImportError, AttributeError): + # If we can't determine version, assume newer API (3.7+) + self.pymodbus_slave_arg = None def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): - if "unit" not in kwargs: - kwargs = {"unit": int(self.addresses[0]), **kwargs} - - #compatability - if self.pymodbus_slave_arg != "unit": - kwargs["slave"] = kwargs.pop("unit") + # Only add unit/slave parameter if the pymodbus version supports it + if self.pymodbus_slave_arg is not None: + if self.pymodbus_slave_arg not in kwargs: + # Ensure addresses is initialized + if not hasattr(self, 'addresses') or not self.addresses: + # Try to get address from settings if not already set + if hasattr(self, 'settings'): + address = self.settings.getint("address", 0) + self.addresses = [address] + else: + # Fallback to default address + self.addresses = [1] + + kwargs[self.pymodbus_slave_arg] = int(self.addresses[0]) if registry_type == Registry_Type.INPUT: return self.client.read_input_registers(address=start, count=count, **kwargs) @@ -91,15 +162,56 @@ def write_register(self, register : int, value : int, **kwargs): if not self.write_enabled: return - if "unit" not in kwargs: - kwargs = {"unit": self.addresses[0], **kwargs} - - #compatability - if self.pymodbus_slave_arg != "unit": - kwargs["slave"] = kwargs.pop("unit") + # Only add unit/slave parameter if the pymodbus version supports it + if self.pymodbus_slave_arg is not None: + if self.pymodbus_slave_arg not in kwargs: + # Ensure addresses is initialized + if not hasattr(self, 'addresses') or not self.addresses: + # Try to get address from settings if not already set + if hasattr(self, 'settings'): + address = self.settings.getint("address", 0) + self.addresses = [address] + else: + # Fallback to default address + self.addresses = [1] + + kwargs[self.pymodbus_slave_arg] = self.addresses[0] self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register def connect(self): + self._log.debug("modbus_rtu.connect() called") + # Ensure client is initialized before trying to connect + if not hasattr(self, 'client') or self.client is None: + self._log.debug("Client not found, re-initializing...") + # Re-initialize the client if it wasn't set properly + client_str = self.port+"("+str(self.baudrate)+")" + + if client_str in modbus_base.clients: + self.client = modbus_base.clients[client_str] + else: + # Get the signature of the __init__ method + init_signature = inspect.signature(ModbusSerialClient.__init__) + + if "method" in init_signature.parameters: + self.client = ModbusSerialClient(method="rtu", port=self.port, + baudrate=int(self.baudrate), + stopbits=1, parity="N", bytesize=8, timeout=2 + ) + else: + self.client = ModbusSerialClient( + port=self.port, + baudrate=int(self.baudrate), + stopbits=1, parity="N", bytesize=8, timeout=2 + ) + + #add to clients + modbus_base.clients[client_str] = self.client + + # Set compatibility flag + self._set_compatibility_flag() + + self._log.debug(f"Attempting to connect to {self.port} at {self.baudrate} baud...") self.connected = self.client.connect() + self._log.debug(f"Connection result: {self.connected}") super().connect() diff --git a/classes/transports/modbus_tcp.py b/classes/transports/modbus_tcp.py index 594dda9..ee0cd71 100644 --- 
a/classes/transports/modbus_tcp.py +++ b/classes/transports/modbus_tcp.py @@ -26,44 +26,62 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings self.port = settings.getint("port", self.port) - # pymodbus compatability; unit was renamed to address - if "slave" in inspect.signature(ModbusTcpClient.read_holding_registers).parameters: - self.pymodbus_slave_arg = "slave" - client_str = self.host+"("+str(self.port)+")" #check if client is already initialied if client_str in modbus_base.clients: self.client = modbus_base.clients[client_str] + # Set compatibility flag based on existing client + self._set_compatibility_flag() + super().__init__(settings, protocolSettings=protocolSettings) return self.client = ModbusTcpClient(host=self.host, port=self.port, timeout=7, retries=3) + # Set compatibility flag based on created client + self._set_compatibility_flag() + #add to clients modbus_base.clients[client_str] = self.client super().__init__(settings, protocolSettings=protocolSettings) + def _set_compatibility_flag(self): + """Determine the correct parameter name for slave/unit based on pymodbus version""" + self.pymodbus_slave_arg = None + + try: + # For pymodbus 3.7+, we don't need unit/slave parameter + import pymodbus + version = pymodbus.__version__ + + # pymodbus 3.7+ doesn't need slave/unit parameter for most operations + if version.startswith('3.'): + self.pymodbus_slave_arg = None + else: + # Fallback for any other versions - assume newer API + self.pymodbus_slave_arg = None + + except (ImportError, AttributeError): + # If we can't determine version, assume newer API (3.7+) + self.pymodbus_slave_arg = None + def write_register(self, register : int, value : int, **kwargs): if not self.write_enabled: return - if "unit" not in kwargs: - kwargs = {"unit": 1, **kwargs} - - #compatability - if self.pymodbus_slave_arg != "unit": - kwargs["slave"] = kwargs.pop("unit") + # Only add unit/slave parameter if the pymodbus version supports it + if self.pymodbus_slave_arg is not None: + if self.pymodbus_slave_arg not in kwargs: + kwargs[self.pymodbus_slave_arg] = 1 self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): - if "unit" not in kwargs: - kwargs = {"unit": 1, **kwargs} - - #compatability - if self.pymodbus_slave_arg != "unit": - kwargs["slave"] = kwargs.pop("unit") + # Only add unit/slave parameter if the pymodbus version supports it + if self.pymodbus_slave_arg is not None: + if self.pymodbus_slave_arg not in kwargs: + kwargs[self.pymodbus_slave_arg] = 1 if registry_type == Registry_Type.INPUT: return self.client.read_input_registers(start, count, **kwargs ) diff --git a/classes/transports/serial_pylon.py b/classes/transports/serial_pylon.py index fe42ef9..21a1e23 100644 --- a/classes/transports/serial_pylon.py +++ b/classes/transports/serial_pylon.py @@ -120,6 +120,18 @@ def read_data(self): info = self.protocolSettings.process_registery({entry.register : raw}, map=registry_map) + # Check for serial number variables and promote to device_serial_number + if info: + # Look for serial number variable + if "serial_number" in info: + value = info["serial_number"] + if value and value != "None" and str(value).strip(): + # Found a valid serial number, promote it + if self.device_serial_number != str(value): + self._log.info(f"Promoting parsed serial number: {value} (from variable: serial_number)") + 
+                        self.device_serial_number = str(value)
+                        self.update_identifier()
+
         if not info:
             self._log.info("Data is Empty; Serial Pylon Transport busy?")
diff --git a/config.influxdb.example b/config.influxdb.example
new file mode 100644
index 0000000..c3a07a8
--- /dev/null
+++ b/config.influxdb.example
@@ -0,0 +1,24 @@
+#
+[influxdb_output]
+transport = influxdb_out
+host = localhost
+port = 8086
+database = solar
+username =
+password =
+measurement = device_data
+include_timestamp = true
+include_device_info = true
+batch_size = 100
+batch_timeout = 10.0
+log_level = INFO
+
+# Example bridge configuration
+[modbus_rtu_source]
+type = modbus_rtu
+port = /dev/ttyUSB0
+baudrate = 9600
+protocol_version = growatt_2020_v1.24
+device_serial_number = 123456789
+bridge = influxdb_output
+#
diff --git a/config.json_out.example b/config.json_out.example
new file mode 100644
index 0000000..9772b9a
--- /dev/null
+++ b/config.json_out.example
@@ -0,0 +1,43 @@
+[general]
+log_level = INFO
+
+[transport.modbus_input]
+# Modbus input transport - reads data from device
+protocol_version = v0.14
+address = 1
+port = /dev/ttyUSB0
+baudrate = 9600
+bridge = transport.json_output
+read_interval = 10
+
+manufacturer = TestDevice
+model = Test Model
+serial_number = TEST123
+
+[transport.json_output]
+# JSON output transport - writes data to stdout
+transport = json_out
+output_file = stdout
+pretty_print = true
+include_timestamp = true
+include_device_info = true
+
+# Alternative configurations (uncomment to use):
+
+# [transport.json_file]
+# # JSON output to file
+# transport = json_out
+# output_file = /var/log/inverter_data.json
+# pretty_print = false
+# append_mode = true
+# include_timestamp = true
+# include_device_info = false
+
+# [transport.json_compact]
+# # Compact JSON output
+# transport = json_out
+# output_file = /tmp/compact_data.json
+# pretty_print = false
+# append_mode = false
+# include_timestamp = true
+# include_device_info = false
\ No newline at end of file
diff --git a/documentation/README.md b/documentation/README.md
index f756603..53ce9b3 100644
--- a/documentation/README.md
+++ b/documentation/README.md
@@ -32,6 +32,7 @@ This README file contains an index of all files in the documentation directory.
 - [modbus_rtu_to_modbus_tcp.md](usage/configuration_examples/modbus_rtu_to_modbus_tcp.md) - ModBus RTU to ModBus TCP
 - [modbus_rtu_to_mqtt.md](usage/configuration_examples/modbus_rtu_to_mqtt.md) - ModBus RTU to MQTT
+- [influxdb_example.md](usage/configuration_examples/influxdb_example.md) - ModBus RTU to InfluxDB
 
 **3rdparty**
diff --git a/documentation/usage/configuration_examples/influxdb_example.md b/documentation/usage/configuration_examples/influxdb_example.md
new file mode 100644
index 0000000..56a500a
--- /dev/null
+++ b/documentation/usage/configuration_examples/influxdb_example.md
@@ -0,0 +1,177 @@
+# InfluxDB Output Transport
+
+The InfluxDB output transport allows you to send data from your devices directly to an InfluxDB v1 server for time-series data storage and visualization.
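+
+Below is a minimal standalone sketch (host, database, tag and field values are illustrative) of the point structure this transport builds and hands to the InfluxDB v1 Python client:
+
+```python
+from influxdb import InfluxDBClient
+
+# Connect to a local InfluxDB v1 server (credentials omitted for brevity)
+client = InfluxDBClient(host="localhost", port=8086, database="solar")
+
+# One point: tags carry device metadata, fields carry the sampled values
+point = {
+    "measurement": "device_data",
+    "tags": {"device_identifier": "hdm1365a1234", "transport": "growatt_inverter"},
+    "fields": {"battery_voltage": 48.5, "battery_current": 2.1},
+}
+
+client.write_points([point])
+```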
+ +## Features + +- **Batch Writing**: Efficiently batches data points to reduce network overhead +- **Automatic Database Creation**: Creates the database if it doesn't exist +- **Device Information Tags**: Includes device metadata as InfluxDB tags for easy querying +- **Flexible Data Types**: Automatically converts data to appropriate InfluxDB field types +- **Configurable Timeouts**: Adjustable batch size and timeout settings + +## Configuration + +### Basic Configuration + +```ini +[influxdb_output] +type = influxdb_out +host = localhost +port = 8086 +database = solar +measurement = device_data +``` + +### Advanced Configuration + +```ini +[influxdb_output] +type = influxdb_out +host = localhost +port = 8086 +database = solar +username = admin +password = your_password +measurement = device_data +include_timestamp = true +include_device_info = true +batch_size = 100 +batch_timeout = 10.0 +log_level = INFO +``` + +### Configuration Options + +| Option | Default | Description | +|--------|---------|-------------| +| `host` | `localhost` | InfluxDB server hostname or IP address | +| `port` | `8086` | InfluxDB server port | +| `database` | `solar` | Database name (will be created if it doesn't exist) | +| `username` | `` | Username for authentication (optional) | +| `password` | `` | Password for authentication (optional) | +| `measurement` | `device_data` | InfluxDB measurement name | +| `include_timestamp` | `true` | Include timestamp in data points | +| `include_device_info` | `true` | Include device information as tags | +| `batch_size` | `100` | Number of points to batch before writing | +| `batch_timeout` | `10.0` | Maximum time (seconds) to wait before flushing batch | + +## Data Structure + +The InfluxDB output creates data points with the following structure: + +### Tags (if `include_device_info = true`) +- `device_identifier`: Device serial number (lowercase) +- `device_name`: Device name +- `device_manufacturer`: Device manufacturer +- `device_model`: Device model +- `device_serial_number`: Device serial number +- `transport`: Source transport name + +### Fields +All device data values are stored as fields. The transport automatically converts: +- Numeric strings to integers or floats +- Non-numeric strings remain as strings + +### Time +- Uses current timestamp in nanoseconds (if `include_timestamp = true`) +- Can be disabled for custom timestamp handling + +## Example Bridge Configuration + +```ini +# Source device (e.g., Modbus RTU) +[growatt_inverter] +type = modbus_rtu +port = /dev/ttyUSB0 +baudrate = 9600 +protocol_version = growatt_2020_v1.24 +device_serial_number = 123456789 +device_manufacturer = Growatt +device_model = SPH3000 +bridge = influxdb_output + +# InfluxDB output +[influxdb_output] +type = influxdb_out +host = localhost +port = 8086 +database = solar +measurement = inverter_data +``` + +## Installation + +1. Install the required dependency: + ```bash + pip install influxdb + ``` + +2. Or add to your requirements.txt: + ``` + influxdb + ``` + +## InfluxDB Setup + +1. Install InfluxDB v1: + ```bash + # Ubuntu/Debian + sudo apt install influxdb influxdb-client + sudo systemctl enable influxdb + sudo systemctl start influxdb + + # Or download from https://portal.influxdata.com/downloads/ + ``` + +2. 
Create a database (optional - will be created automatically): + ```bash + echo "CREATE DATABASE solar" | influx + ``` + +## Querying Data + +Once data is flowing, you can query it using InfluxDB's SQL-like query language: + +```sql +-- Show all measurements +SHOW MEASUREMENTS + +-- Query recent data +SELECT * FROM device_data WHERE time > now() - 1h + +-- Query specific device +SELECT * FROM device_data WHERE device_identifier = '123456789' + +-- Aggregate data +SELECT mean(value) FROM device_data WHERE field_name = 'battery_voltage' GROUP BY time(5m) +``` + +## Integration with Grafana + +InfluxDB data can be easily visualized in Grafana: + +1. Add InfluxDB as a data source in Grafana +2. Use the same connection details as your configuration +3. Create dashboards using InfluxDB queries + +## Troubleshooting + +### Connection Issues +- Verify InfluxDB is running: `systemctl status influxdb` +- Check firewall settings for port 8086 +- Verify host and port configuration + +### Authentication Issues +- Ensure username/password are correct +- Check InfluxDB user permissions + +### Data Not Appearing +- Check log levels for detailed error messages +- Verify database exists and is accessible +- Check batch settings - data may be buffered + +### Performance +- Adjust `batch_size` and `batch_timeout` for your use case +- Larger batches reduce network overhead but increase memory usage +- Shorter timeouts provide more real-time data but increase network traffic \ No newline at end of file diff --git a/documentation/usage/configuration_examples/json_out_example.md b/documentation/usage/configuration_examples/json_out_example.md new file mode 100644 index 0000000..14d04af --- /dev/null +++ b/documentation/usage/configuration_examples/json_out_example.md @@ -0,0 +1,144 @@ +# JSON Output Transport + +The `json_out` transport outputs data in JSON format to either a file or stdout. This is useful for logging, debugging, or integrating with other systems that consume JSON data. + +## Configuration + +### Basic Configuration + +```ini +[transport.json_output] +transport = json_out +# Output to stdout (default) +output_file = stdout +# Pretty print the JSON (default: true) +pretty_print = true +# Include timestamp in output (default: true) +include_timestamp = true +# Include device information (default: true) +include_device_info = true +``` + +### File Output Configuration + +```ini +[transport.json_output] +transport = json_out +# Output to a file +output_file = /path/to/output.json +# Append to file instead of overwriting (default: false) +append_mode = false +pretty_print = true +include_timestamp = true +include_device_info = true +``` + +### Bridged Configuration Example + +```ini +[transport.modbus_input] +# Modbus input transport +protocol_version = v0.14 +address = 1 +port = /dev/ttyUSB0 +baudrate = 9600 +bridge = transport.json_output +read_interval = 10 + +[transport.json_output] +# JSON output transport +transport = json_out +output_file = /var/log/inverter_data.json +pretty_print = false +append_mode = true +include_timestamp = true +include_device_info = true +``` + +## Configuration Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `output_file` | string | `stdout` | Output destination. 
Use `stdout` for console output or a file path | +| `pretty_print` | boolean | `true` | Whether to format JSON with indentation | +| `append_mode` | boolean | `false` | Whether to append to file instead of overwriting | +| `include_timestamp` | boolean | `true` | Whether to include Unix timestamp in output | +| `include_device_info` | boolean | `true` | Whether to include device metadata in output | + +## Output Format + +The JSON output includes the following structure: + +```json +{ + "device": { + "identifier": "device_serial", + "name": "Device Name", + "manufacturer": "Manufacturer", + "model": "Model", + "serial_number": "Serial Number", + "transport": "transport_name" + }, + "timestamp": 1703123456.789, + "data": { + "variable_name": "value", + "another_variable": "another_value" + } +} +``` + +### Compact Output Example + +With `pretty_print = false` and `include_device_info = false`: + +```json +{"timestamp":1703123456.789,"data":{"battery_voltage":"48.5","battery_current":"2.1"}} +``` + +### File Output with Append Mode + +When using `append_mode = true`, each data read will be written as a separate JSON object on a new line, making it suitable for log files or streaming data processing. + +## Use Cases + +1. **Debugging**: Output data to console for real-time monitoring +2. **Logging**: Write data to log files for historical analysis +3. **Integration**: Feed data to other systems that consume JSON +4. **Data Collection**: Collect data for analysis or backup purposes + +## Examples + +### Console Output for Debugging + +```ini +[transport.debug_output] +transport = json_out +output_file = stdout +pretty_print = true +include_timestamp = true +include_device_info = true +``` + +### Log File for Data Collection + +```ini +[transport.data_log] +transport = json_out +output_file = /var/log/inverter_data.log +pretty_print = false +append_mode = true +include_timestamp = true +include_device_info = false +``` + +### Compact File Output + +```ini +[transport.compact_output] +transport = json_out +output_file = /tmp/inverter_data.json +pretty_print = false +append_mode = false +include_timestamp = true +include_device_info = false +``` \ No newline at end of file diff --git a/documentation/usage/transports.md b/documentation/usage/transports.md index 8e516c5..8819d03 100644 --- a/documentation/usage/transports.md +++ b/documentation/usage/transports.md @@ -159,6 +159,212 @@ the writable topics are given a prefix of "/write/" ## MQTT Write by default mqtt writes data from the bridged transport. +# JSON Output +``` +###required +transport = json_out +``` + +``` +###optional +output_file = stdout +pretty_print = true +append_mode = false +include_timestamp = true +include_device_info = true +``` + +## JSON Output Configuration + +### output_file +Specifies the output destination. Use `stdout` for console output or provide a file path. +``` +output_file = stdout +output_file = /var/log/inverter_data.json +``` + +### pretty_print +Whether to format JSON with indentation for readability. +``` +pretty_print = true +``` + +### append_mode +Whether to append to file instead of overwriting. Useful for log files. +``` +append_mode = false +``` + +### include_timestamp +Whether to include Unix timestamp in the JSON output. +``` +include_timestamp = true +``` + +### include_device_info +Whether to include device metadata (identifier, name, manufacturer, etc.) in the JSON output. 
+``` +include_device_info = true +``` + +## JSON Output Format + +The JSON output includes the following structure: + +```json +{ + "device": { + "identifier": "device_serial", + "name": "Device Name", + "manufacturer": "Manufacturer", + "model": "Model", + "serial_number": "Serial Number", + "transport": "transport_name" + }, + "timestamp": 1703123456.789, + "data": { + "variable_name": "value", + "another_variable": "another_value" + } +} +``` + +## JSON Output Use Cases + +1. **Debugging**: Output data to console for real-time monitoring +2. **Logging**: Write data to log files for historical analysis +3. **Integration**: Feed data to other systems that consume JSON +4. **Data Collection**: Collect data for analysis or backup purposes + +# InfluxDB Output +``` +###required +transport = influxdb_out +host = +port = +database = +``` + +``` +###optional +username = +password = +measurement = device_data +include_timestamp = true +include_device_info = true +batch_size = 100 +batch_timeout = 10.0 +``` + +## InfluxDB Output Configuration + +### host +InfluxDB server hostname or IP address. +``` +host = localhost +host = 192.168.1.100 +``` + +### port +InfluxDB server port (default: 8086). +``` +port = 8086 +``` + +### database +Database name. Will be created automatically if it doesn't exist. +``` +database = solar +database = inverter_data +``` + +### username +Username for authentication (optional). +``` +username = admin +``` + +### password +Password for authentication (optional). +``` +password = your_password +``` + +### measurement +InfluxDB measurement name for storing data points. +``` +measurement = device_data +measurement = inverter_metrics +``` + +### include_timestamp +Whether to include timestamp in data points. +``` +include_timestamp = true +``` + +### include_device_info +Whether to include device metadata as InfluxDB tags. +``` +include_device_info = true +``` + +### batch_size +Number of data points to batch before writing to InfluxDB. +``` +batch_size = 100 +``` + +### batch_timeout +Maximum time (seconds) to wait before flushing batch. +``` +batch_timeout = 10.0 +``` + +## InfluxDB Data Structure + +The InfluxDB output creates data points with the following structure: + +### Tags (if `include_device_info = true`) +- `device_identifier`: Device serial number (lowercase) +- `device_name`: Device name +- `device_manufacturer`: Device manufacturer +- `device_model`: Device model +- `device_serial_number`: Device serial number +- `transport`: Source transport name + +### Fields +All device data values are stored as fields. The transport automatically converts: +- Numeric strings to integers or floats +- Non-numeric strings remain as strings + +### Time +- Uses current timestamp in nanoseconds (if `include_timestamp = true`) +- Can be disabled for custom timestamp handling + +## InfluxDB Output Use Cases + +1. **Time-Series Data Storage**: Store historical device data for analysis +2. **Grafana Integration**: Visualize data with Grafana dashboards +3. **Data Analytics**: Perform time-series analysis and trending +4. 
**Monitoring**: Set up alerts and monitoring based on data thresholds + +## Example InfluxDB Queries + +```sql +-- Show all measurements +SHOW MEASUREMENTS + +-- Query recent data +SELECT * FROM device_data WHERE time > now() - 1h + +-- Query specific device +SELECT * FROM device_data WHERE device_identifier = '123456789' + +-- Aggregate data +SELECT mean(value) FROM device_data WHERE field_name = 'battery_voltage' GROUP BY time(5m) +``` + # ModBus_RTU ``` ###required diff --git a/pytests/test_influxdb_out.py b/pytests/test_influxdb_out.py new file mode 100644 index 0000000..d94f4e6 --- /dev/null +++ b/pytests/test_influxdb_out.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +""" +Test for InfluxDB output transport +""" + +import unittest +from unittest.mock import Mock, patch, MagicMock +from configparser import ConfigParser + +from classes.transports.influxdb_out import influxdb_out + + +class TestInfluxDBOut(unittest.TestCase): + """Test cases for InfluxDB output transport""" + + def setUp(self): + """Set up test fixtures""" + self.config = ConfigParser() + self.config.add_section('influxdb_output') + self.config.set('influxdb_output', 'type', 'influxdb_out') + self.config.set('influxdb_output', 'host', 'localhost') + self.config.set('influxdb_output', 'port', '8086') + self.config.set('influxdb_output', 'database', 'test_db') + + @patch('classes.transports.influxdb_out.InfluxDBClient') + def test_connect_success(self, mock_influxdb_client): + """Test successful connection to InfluxDB""" + # Mock the InfluxDB client + mock_client = Mock() + mock_influxdb_client.return_value = mock_client + mock_client.get_list_database.return_value = [{'name': 'test_db'}] + + transport = influxdb_out(self.config['influxdb_output']) + transport.connect() + + self.assertTrue(transport.connected) + mock_influxdb_client.assert_called_once_with( + host='localhost', + port=8086, + username=None, + password=None, + database='test_db' + ) + + @patch('classes.transports.influxdb_out.InfluxDBClient') + def test_connect_database_creation(self, mock_influxdb_client): + """Test database creation when it doesn't exist""" + # Mock the InfluxDB client + mock_client = Mock() + mock_influxdb_client.return_value = mock_client + mock_client.get_list_database.return_value = [{'name': 'other_db'}] + + transport = influxdb_out(self.config['influxdb_output']) + transport.connect() + + self.assertTrue(transport.connected) + mock_client.create_database.assert_called_once_with('test_db') + + @patch('classes.transports.influxdb_out.InfluxDBClient') + def test_write_data_batching(self, mock_influxdb_client): + """Test data writing and batching""" + # Mock the InfluxDB client + mock_client = Mock() + mock_influxdb_client.return_value = mock_client + mock_client.get_list_database.return_value = [{'name': 'test_db'}] + + transport = influxdb_out(self.config['influxdb_output']) + transport.connect() + + # Mock source transport + source_transport = Mock() + source_transport.transport_name = 'test_source' + source_transport.device_identifier = 'test123' + source_transport.device_name = 'Test Device' + source_transport.device_manufacturer = 'Test Manufacturer' + source_transport.device_model = 'Test Model' + source_transport.device_serial_number = '123456' + + # Test data + test_data = {'battery_voltage': '48.5', 'battery_current': '10.2'} + + transport.write_data(test_data, source_transport) + + # Check that data was added to batch + self.assertEqual(len(transport.batch_points), 1) + point = transport.batch_points[0] + + 
self.assertEqual(point['measurement'], 'device_data') + self.assertIn('device_identifier', point['tags']) + self.assertIn('battery_voltage', point['fields']) + self.assertIn('battery_current', point['fields']) + + # Check data type conversion + self.assertEqual(point['fields']['battery_voltage'], 48.5) + self.assertEqual(point['fields']['battery_current'], 10.2) + + def test_configuration_options(self): + """Test configuration option parsing""" + # Add more configuration options + self.config.set('influxdb_output', 'username', 'admin') + self.config.set('influxdb_output', 'password', 'secret') + self.config.set('influxdb_output', 'measurement', 'custom_measurement') + self.config.set('influxdb_output', 'batch_size', '50') + self.config.set('influxdb_output', 'batch_timeout', '5.0') + + transport = influxdb_out(self.config['influxdb_output']) + + self.assertEqual(transport.username, 'admin') + self.assertEqual(transport.password, 'secret') + self.assertEqual(transport.measurement, 'custom_measurement') + self.assertEqual(transport.batch_size, 50) + self.assertEqual(transport.batch_timeout, 5.0) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 2158c18..af0cebf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ pymodbus==3.7.0 paho-mqtt pyserial python-can +influxdb
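For reference, a condensed sketch (function name is illustrative; behaviour mirrors influxdb_out.write_data above) of how values are coerced into InfluxDB field types, which is what test_write_data_batching asserts on:

```python
def to_influx_field(value: str, force_float: bool = False):
    """Convert a data value to an InfluxDB field type.

    Numeric strings become ints or floats; anything else stays a string.
    force_float mirrors the unit_mod != 1.0 case in influxdb_out.write_data.
    """
    try:
        as_float = float(value)
    except (ValueError, TypeError):
        return str(value)
    if force_float or not as_float.is_integer():
        return as_float
    return int(as_float)

assert to_influx_field("48.5") == 48.5                  # float string -> float
assert to_influx_field("100") == 100                    # integer string -> int
assert to_influx_field("100", force_float=True) == 100.0
assert to_influx_field("OK") == "OK"                    # non-numeric -> string
```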