diff --git a/MULTIPROCESSING.md b/MULTIPROCESSING.md
new file mode 100644
index 0000000..bbd703b
--- /dev/null
+++ b/MULTIPROCESSING.md
@@ -0,0 +1,166 @@
+# Multiprocessing Support
+
+## Overview
+
+The Python Protocol Gateway now supports **automatic multiprocessing** when multiple transports are configured. This provides true concurrency and complete isolation between transports, solving the "transport busy" issues that can occur with single-threaded operation.
+
+## How It Works
+
+### Automatic Detection
+- **Single Transport**: Uses the original single-threaded approach
+- **Multiple Transports**: Automatically switches to multiprocessing mode
+
+### Process Isolation
+Each transport runs in its own separate process, providing:
+- **Complete isolation** of resources and state
+- **True concurrent operation** - no waiting for other transports
+- **Independent error handling** - one transport failure doesn't affect others
+- **Automatic restart** of failed processes
+
+### Transport Types
+
+#### Input Transports (read_interval > 0)
+- Modbus RTU, TCP, etc.
+- Actively read data from devices
+- Send data to output transports via bridging
+
+#### Output Transports (read_interval <= 0)
+- InfluxDB, MQTT, etc.
+- Receive data from input transports via bridging
+- Process and forward data to external systems
+
+## Configuration Example
+
+```ini
+[transport.0]
+transport = modbus_rtu
+protocol_version = eg4_v58
+address = 1
+port = /dev/ttyUSB0
+baudrate = 19200
+bridge = influxdb_output
+read_interval = 10
+
+[transport.1]
+transport = modbus_rtu
+protocol_version = eg4_v58
+address = 1
+port = /dev/ttyUSB1
+baudrate = 19200
+bridge = influxdb_output
+read_interval = 10
+
+[influxdb_output]
+transport = influxdb_out
+host = influxdb.example.com
+port = 8086
+database = solar
+measurement = eg4_data
+```
+
+## Inter-Process Communication
+
+### Bridging
+- Uses `multiprocessing.Queue` for communication (see the sketch after the message format below)
+- Automatic message routing between processes
+- Non-blocking communication
+- Source transport information preserved
+
+### Message Format
+```python
+{
+    'source_transport': 'transport.0',
+    'target_transport': 'influxdb_output',
+    'data': {...},
+    'source_transport_info': {
+        'transport_name': 'transport.0',
+        'device_identifier': '...',
+        'device_name': '...',
+        'device_manufacturer': '...',
+        'device_model': '...',
+        'device_serial_number': '...'
+    }
+}
+```
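+
+The following is a minimal, self-contained sketch of how a bridge message in the format above travels over a `multiprocessing.Queue`. It does not use the gateway's classes; the transport names and field values are illustrative only:
+
+```python
+import multiprocessing
+
+
+def input_transport(q):
+    # An input transport process would build a message like this after a successful read
+    q.put({
+        'source_transport': 'transport.0',
+        'target_transport': 'influxdb_output',
+        'data': {'battery_voltage': 52.1},
+        'source_transport_info': {'transport_name': 'transport.0'},
+    })
+
+
+if __name__ == '__main__':
+    bridge_queue = multiprocessing.Queue()
+    producer = multiprocessing.Process(target=input_transport, args=(bridge_queue,))
+    producer.start()
+
+    # An output transport process drains the queue without blocking its main loop for long
+    message = bridge_queue.get(timeout=5)
+    print(message['target_transport'], message['data'])
+    producer.join()
+```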
+
+## Benefits
+
+### Performance
+- **True concurrency** - no serialization delays
+- **Independent timing** - each transport runs at its own interval
+- **No resource contention** - each process has isolated resources
+
+### Reliability
+- **Process isolation** - one transport failure doesn't affect others
+- **Automatic restart** - failed processes are automatically restarted
+- **Independent error handling** - each process handles its own errors
+
+### Scalability
+- **Linear scaling** - performance scales with the number of CPU cores
+- **Resource efficiency** - multiprocessing is only used when needed
+- **Memory isolation** - each process has its own memory space
+
+## Troubleshooting
+
+### Common Issues
+
+#### "Register is Empty; transport busy?"
+- **Cause**: Shared state between transports in single-threaded mode
+- **Solution**: Use multiprocessing mode (automatic with multiple transports)
+
+#### InfluxDB not receiving data
+- **Cause**: Output transport not properly configured or started
+- **Solution**: Ensure the `influxdb_output` section has `transport = influxdb_out`
+
+#### Process restarting frequently
+- **Cause**: Transport configuration error or device connection issue
+- **Solution**: Check the logs for the specific error messages
+
+### Debugging
+
+#### Enable Debug Logging
+```ini
+[general]
+log_level = DEBUG
+```
+
+#### Monitor Process Status
+The gateway logs process creation and status:
+```
+[2025-06-22 19:30:45] Starting multiprocessing mode with 3 transports
+[2025-06-22 19:30:45] Input transports: 2, Output transports: 1
+[2025-06-22 19:30:45] Bridging detected - enabling inter-process communication
+[2025-06-22 19:30:45] Started process for transport.0 (PID: 12345)
+[2025-06-22 19:30:45] Started process for transport.1 (PID: 12346)
+[2025-06-22 19:30:45] Started process for influxdb_output (PID: 12347)
+```
+
+## Testing
+
+Run the test script to verify multiprocessing functionality:
+```bash
+python pytests/test_multiprocessing.py
+```
+
+This will:
+- Load your configuration
+- Display transport information
+- Run for 30 seconds to verify operation
+- Show any errors or issues
+
+## Limitations
+
+1. **Memory Usage**: Each process uses additional memory
+2. **Startup Time**: There is a slight delay when starting multiple processes
+3. **Inter-Process Communication**: Bridge messages add a small overhead
+4. **Debugging**: Debugging is more complex because multiple processes are involved
+
+## Migration
+
+No migration is required. Existing configurations automatically benefit from multiprocessing when multiple transports are present.
+
+## Performance Tips
+
+1. **Stagger Read Intervals**: Use different read intervals so transports do not all poll at the same moment (see the sketch after this list)
+2. **Optimize Batch Sizes**: Adjust batch sizes so individual reads complete faster
+3. **Monitor Logs**: Watch for process restarts, which indicate recurring issues
+4. **Resource Limits**: Ensure sufficient system resources for multiple processes
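+
+A minimal sketch of tip 1, reusing the hypothetical sections from the configuration example above. Offsetting the intervals keeps the two serial reads (and the resulting bridge traffic) from always landing at the same moment:
+
+```ini
+[transport.0]
+transport = modbus_rtu
+protocol_version = eg4_v58
+port = /dev/ttyUSB0
+bridge = influxdb_output
+read_interval = 15
+
+[transport.1]
+transport = modbus_rtu
+protocol_version = eg4_v58
+port = /dev/ttyUSB1
+bridge = influxdb_output
+read_interval = 20
+```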
diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py
index 4b7ef02..7dcda7e1 100644
--- a/classes/protocol_settings.py
+++ b/classes/protocol_settings.py
@@ -265,7 +265,7 @@ class protocol_settings:
 
     _log : logging.Logger = None
 
-    def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols"):
+    def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols", unique_id : str = None):
         #apply log level to logger
         self._log_level = getattr(logging, logging.getLevelName(logging.getLogger().getEffectiveLevel()), logging.INFO)
 
@@ -275,6 +275,7 @@ def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, s
         self.protocol = protocol
         self.settings_dir = settings_dir
         self.transport_settings = transport_settings
+        self.unique_id = unique_id  # Store unique identifier for this instance
 
         #load variable mask
         self.variable_mask = []
diff --git a/classes/transports/influxdb_out.py b/classes/transports/influxdb_out.py
index d0a9589..01c698a 100644
--- a/classes/transports/influxdb_out.py
+++ b/classes/transports/influxdb_out.py
@@ -2,6 +2,7 @@
 from configparser import SectionProxy
 from typing import TextIO
 import time
+import logging
 
 from defs.common import strtobool
 
@@ -21,6 +22,7 @@ class influxdb_out(transport_base):
     include_device_info: bool = True
     batch_size: int = 100
     batch_timeout: float = 10.0
+    force_float: bool = True  # Force all numeric fields to be floats to avoid InfluxDB type conflicts
 
     client = None
     batch_points = []
@@ -37,6 +39,7 @@ def __init__(self, settings: SectionProxy):
         self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info))
         self.batch_size = settings.getint("batch_size", fallback=self.batch_size)
         self.batch_timeout = settings.getfloat("batch_timeout", fallback=self.batch_timeout)
+        self.force_float = strtobool(settings.get("force_float", fallback=self.force_float))
 
         self.write_enabled = True  # InfluxDB output is always write-enabled
         super().__init__(settings)
@@ -103,6 +106,7 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
         for key, value in data.items():
             # Check if we should force float formatting based on protocol settings
             should_force_float = False
+            unit_mod_found = None
 
             # Try to get registry entry from protocol settings to check unit_mod
             if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings:
@@ -110,7 +114,9 @@
                 for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]:
                     registry_map = from_transport.protocolSettings.get_registry_map(registry_type)
                     for entry in registry_map:
-                        if entry.variable_name == key:
+                        # Match by variable_name (which is lowercase)
+                        if entry.variable_name.lower() == key.lower():
+                            unit_mod_found = entry.unit_mod
                             # If unit_mod is not 1.0, this value should be treated as float
                             if entry.unit_mod != 1.0:
                                 should_force_float = True
@@ -124,14 +130,28 @@
                 # Try to convert to float first
                 float_val = float(value)
 
-                # If it's an integer but should be forced to float, or if it's already a float
-                if should_force_float or not float_val.is_integer():
+                # Always use float for InfluxDB to avoid type conflicts
+                # InfluxDB is strict about field types - once a field is created as integer,
+                # it must always be integer. Using float avoids this issue.
+                if self.force_float:
                     fields[key] = float_val
                 else:
-                    fields[key] = int(float_val)
+                    # Only use integer if it's actually an integer and we're not forcing floats
+                    if float_val.is_integer():
+                        fields[key] = int(float_val)
+                    else:
+                        fields[key] = float_val
+
+                # Log data type conversion for debugging
+                if self._log.isEnabledFor(logging.DEBUG):
+                    original_type = type(value).__name__
+                    final_type = type(fields[key]).__name__
+                    self._log.debug(f"Field {key}: {value} ({original_type}) -> {fields[key]} ({final_type}) [unit_mod: {unit_mod_found}]")
+
             except (ValueError, TypeError):
                 # If conversion fails, store as string
                 fields[key] = str(value)
+                self._log.debug(f"Field {key}: {value} -> string (conversion failed)")
 
         # Create InfluxDB point
         point = {
diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py
index e747102..aef23f9 100644
--- a/classes/transports/transport_base.py
+++ b/classes/transports/transport_base.py
@@ -112,7 +112,10 @@ def __init__(self, settings : "SectionProxy") -> None:
         #must load after settings
         self.protocol_version = settings.get("protocol_version")
         if self.protocol_version:
-            self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings)
+            # Create a unique protocol settings instance for each transport to avoid shared state
+            unique_id = f"{self.transport_name}_{self.protocol_version}"
+            self._log.debug(f"Creating protocol settings with unique_id: {unique_id}")
+            self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings, unique_id=unique_id)
 
         if self.protocolSettings:
             self.protocol_version = self.protocolSettings.protocol
diff --git a/protocol_gateway.py b/protocol_gateway.py
index 652be08..c515b4d 100644
--- a/protocol_gateway.py
+++ b/protocol_gateway.py
@@ -23,11 +23,15 @@
 import os
 import sys
 import traceback
+import multiprocessing
 
 from configparser import ConfigParser, NoOptionError
 
 from classes.protocol_settings import protocol_settings, registry_map_entry
 from classes.transports.transport_base import transport_base
 
+# Global queue for inter-process communication
+bridge_queue = None
+
 __logo = """
 ██████╗ ██╗   ██╗████████╗██╗    ██╗    ██████╗ ███╗   ██╗
@@ -90,6 +94,183 @@ def getfloat(self, section, option, *args, **kwargs): #bypass fallback bug
         return float(value) if value is not None else None
 
 
+class SingleTransportGateway:
+    """
+    Gateway class for running a single transport in its own process
+    """
+    __log = None
+    __running = False
+    __transport = None
+    config_file = ""
+    __bridge_queue = None
+
+    def __init__(self, config_file: str, transport_name: str, bridge_queue=None):
+        self.config_file = config_file
+        self.__bridge_queue = bridge_queue
+
+        # Set up logging for this process
+        self.__log = logging.getLogger(f"single_transport_{transport_name}")
+        handler = logging.StreamHandler(sys.stdout)
+        formatter = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
+        handler.setFormatter(formatter)
+        self.__log.addHandler(handler)
+        self.__log.setLevel(logging.INFO)
+
+        self.__log.info(f"Initializing single transport gateway for {transport_name}")
+
+        # Load configuration
+        self.__settings = CustomConfigParser()
+        self.__settings.read(self.config_file)
+
+        # Find and initialize the specific transport
+        if transport_name in self.__settings.sections():
+            transport_cfg = self.__settings[transport_name]
+            transport_type = transport_cfg.get("transport", fallback="")
+            protocol_version = transport_cfg.get("protocol_version", fallback="")
+
+            if not transport_type and not protocol_version:
+                raise ValueError("Missing Transport / Protocol Version")
+
+            if not transport_type and protocol_version:
+                protocolSettings = protocol_settings(protocol_version)
+                if not transport_type and not protocolSettings.transport:
+                    raise ValueError("Missing Transport")
+                if not transport_type:
+                    transport_type = protocolSettings.transport
+
+            # Import the module
+            module = importlib.import_module("classes.transports." + transport_type)
+            # Get the class from the module
+            cls = getattr(module, transport_type)
+            self.__transport = cls(transport_cfg)
+
+            self.__log.info(f"Created transport: {self.__transport.type}:{self.__transport.transport_name}")
+
+            # Connect the transport
+            self.__log.info(f"Connecting to {self.__transport.type}:{self.__transport.transport_name}...")
+            self.__transport.connect()
+
+        else:
+            raise ValueError(f"Transport section '{transport_name}' not found in config")
+
+    def handle_bridge_message(self, message):
+        """
+        Handle bridge messages from other transports
+        """
+        try:
+            source_transport = message.get('source_transport')
+            target_transport = message.get('target_transport')
+            data = message.get('data')
+            source_transport_info = message.get('source_transport_info', {})
+
+            # Check if this transport is the target
+            if target_transport == self.__transport.transport_name:
+                self.__log.debug(f"Received bridge message from {source_transport} with {len(data)} items")
+
+                # Forward the data to this transport
+                if hasattr(self.__transport, 'write_data'):
+                    # Create a mock transport object with the source transport info
+                    class MockSourceTransport:
+                        def __init__(self, info):
+                            self.transport_name = info.get('transport_name', '')
+                            self.device_identifier = info.get('device_identifier', '')
+                            self.device_name = info.get('device_name', '')
+                            self.device_manufacturer = info.get('device_manufacturer', '')
+                            self.device_model = info.get('device_model', '')
+                            self.device_serial_number = info.get('device_serial_number', '')
+
+                    source_transport_obj = MockSourceTransport(source_transport_info)
+
+                    # Call write_data with the correct parameters
+                    self.__transport.write_data(data, source_transport_obj)
+                else:
+                    self.__log.warning(f"Transport {self.__transport.transport_name} does not support write_data")
+
+        except Exception as err:
+            self.__log.error(f"Error handling bridge message: {err}")
+            traceback.print_exc()
+
+    def run(self):
+        """
+        Run the single transport
+        """
+        self.__running = True
+        self.__log.info(f"Starting single transport: {self.__transport.transport_name}")
+
+        # Check if this is an output transport (no read_interval)
+        is_output_transport = (self.__transport.read_interval <= 0)
+
+        if is_output_transport:
+            self.__log.info(f"Running output transport: {self.__transport.transport_name}")
+            # For output transports, just handle bridge messages
+            while self.__running:
+                try:
+                    # Check for bridge messages
+                    if self.__bridge_queue:
+                        try:
+                            while not self.__bridge_queue.empty():
+                                message = self.__bridge_queue.get_nowait()
+                                self.handle_bridge_message(message)
+                        except Exception:
+                            pass  # Queue drained between empty() and get_nowait(), or other transient error
+
+                    time.sleep(0.1)  # Short sleep for output transports
+
+                except Exception as err:
+                    self.__log.error(f"Error in output transport {self.__transport.transport_name}: {err}")
+                    traceback.print_exc()
+        else:
+            # For input transports, handle both reading and bridging
+            while self.__running:
+                try:
+                    # Check for bridge messages
+                    if self.__bridge_queue:
+                        try:
+                            while not self.__bridge_queue.empty():
+                                message = self.__bridge_queue.get_nowait()
+                                self.handle_bridge_message(message)
+                        except Exception:
+                            pass  # Queue drained between empty() and get_nowait(), or other transient error
+
+                    now = time.time()
+                    if self.__transport.read_interval > 0 and now - self.__transport.last_read_time > self.__transport.read_interval:
+                        self.__transport.last_read_time = now
+
+                        if not self.__transport.connected:
+                            self.__transport.connect()
+                        else:
+                            info = self.__transport.read_data()
+
+                            if info:
+                                self.__log.debug(f"Read data from {self.__transport.transport_name}: {len(info)} items")
+
+                                # Handle bridging if configured
+                                if self.__transport.bridge and self.__bridge_queue:
+                                    self.__log.debug(f"Sending bridge message from {self.__transport.transport_name} to {self.__transport.bridge}")
+                                    bridge_message = {
+                                        'source_transport': self.__transport.transport_name,
+                                        'target_transport': self.__transport.bridge,
+                                        'data': info,
+                                        'source_transport_info': {
+                                            'transport_name': self.__transport.transport_name,
+                                            'device_identifier': getattr(self.__transport, 'device_identifier', ''),
+                                            'device_name': getattr(self.__transport, 'device_name', ''),
+                                            'device_manufacturer': getattr(self.__transport, 'device_manufacturer', ''),
+                                            'device_model': getattr(self.__transport, 'device_model', ''),
+                                            'device_serial_number': getattr(self.__transport, 'device_serial_number', '')
+                                        }
+                                    }
+                                    self.__bridge_queue.put(bridge_message)
+                            else:
+                                self.__log.debug(f"No data read from {self.__transport.transport_name}")
+
+                except Exception as err:
+                    self.__log.error(f"Error in transport {self.__transport.transport_name}: {err}")
+                    traceback.print_exc()
+
+                time.sleep(0.7)
+
+
 class Protocol_Gateway:
     """
     Main class, implementing the Growatt / Inverters to MQTT functionality
@@ -137,11 +318,12 @@ def __init__(self, config_file : str):
         logging.basicConfig(level=log_level)
 
         for section in self.__settings.sections():
-            if section.startswith("transport"):
-                transport_cfg = self.__settings[section]
-                transport_type = transport_cfg.get("transport", fallback="")
-                protocol_version = transport_cfg.get("protocol_version", fallback="")
+            transport_cfg = self.__settings[section]
+            transport_type = transport_cfg.get("transport", fallback="")
+            protocol_version = transport_cfg.get("protocol_version", fallback="")
 
+            # Process sections that either start with "transport" OR have a transport field
+            if section.startswith("transport") or transport_type:
                 if not transport_type and not protocol_version:
                     raise ValueError("Missing Transport / Protocol Version")
 
@@ -187,11 +369,33 @@ def on_message(self, transport : transport_base, entry : registry_map_entry, dat
                 to_transport.write_data({entry.variable_name : data}, transport)
                 break
 
+    def run_single_transport(self, transport_name: str, config_file: str, bridge_queue=None):
+        """
+        Run a single transport in its own process
+        """
+        try:
+            # Create a new gateway instance for this transport
+            single_gateway = SingleTransportGateway(config_file, transport_name, bridge_queue)
+            single_gateway.run()
+        except Exception as err:
+            print(f"Error in transport {transport_name}: {err}")
+            traceback.print_exc()
+
     def run(self):
         """
        run method, starts ModBus connection and mqtt connection
        """
+        if len(self.__transports) <= 1:
+            # Use single-threaded approach for 1 or fewer transports
+            self.__run_single_threaded()
+        else:
+            # Use multiprocessing approach for multiple transports
+            self.__run_multiprocess()
+
+    def __run_single_threaded(self):
+        """
+        Original single-threaded implementation
+        """
         self.__running = True
 
         if False:
@@ -226,6 +430,70 @@ def run(self):
             time.sleep(0.7)  #change this in future. probably reduce to allow faster reads.
 
+    def __run_multiprocess(self):
+        """
+        Multiprocessing implementation for multiple transports
+        """
+        self.__log.info(f"Starting multiprocessing mode with {len(self.__transports)} transports")
+
+        # Separate input and output transports
+        input_transports = [t for t in self.__transports if t.read_interval > 0]
+        output_transports = [t for t in self.__transports if t.read_interval <= 0]
+
+        self.__log.info(f"Input transports: {len(input_transports)}, Output transports: {len(output_transports)}")
+
+        # Check for bridging configuration
+        has_bridging = any(transport.bridge for transport in self.__transports)
+        if has_bridging:
+            self.__log.info("Bridging detected - enabling inter-process communication")
+        else:
+            self.__log.info("No bridging configured - transports will run independently")
+
+        # Create a shared queue for inter-process communication
+        bridge_queue = multiprocessing.Queue() if has_bridging else None
+
+        # Create processes for each transport
+        processes = []
+        for transport in self.__transports:
+            process = multiprocessing.Process(
+                target=self.run_single_transport,
+                args=(transport.transport_name, self.config_file, bridge_queue),
+                name=f"transport_{transport.transport_name}"
+            )
+            process.start()
+            processes.append(process)
+            self.__log.info(f"Started process for {transport.transport_name} (PID: {process.pid})")
+
+        # Monitor processes
+        try:
+            while True:
+                # Check if any process has died
+                for i, process in enumerate(processes):
+                    if not process.is_alive():
+                        transport_name = self.__transports[i].transport_name
+                        self.__log.warning(f"Process for {transport_name} died, restarting...")
+
+                        # Restart the process
+                        new_process = multiprocessing.Process(
+                            target=self.run_single_transport,
+                            args=(transport_name, self.config_file, bridge_queue),
+                            name=f"transport_{transport_name}"
+                        )
+                        new_process.start()
+                        processes[i] = new_process
+                        self.__log.info(f"Restarted process for {transport_name} (PID: {new_process.pid})")
+
+                time.sleep(5)  # Check every 5 seconds
+
+        except KeyboardInterrupt:
+            self.__log.info("Shutting down multiprocessing mode...")
+            for process in processes:
+                process.terminate()
+                process.join(timeout=5)
+                if process.is_alive():
+                    process.kill()
+            self.__log.info("All processes terminated")
+
diff --git a/pytests/test_multiprocessing.py b/pytests/test_multiprocessing.py
new file mode 100644
index 0000000..8b32c8d
--- /dev/null
+++ b/pytests/test_multiprocessing.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+"""
+Test script for multiprocessing implementation
+"""
+
+import os
+import sys
+import time
+
+# Add the repository root to the path so we can import the gateway
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from protocol_gateway import Protocol_Gateway
+
+def test_multiprocessing():
+    """
+    Test the multiprocessing implementation
+    """
+    print("Testing multiprocessing implementation...")
+
+    # Test with a config file that has multiple transports
+    config_file = "config.cfg"
+
+    if not os.path.exists(config_file):
+        print(f"Config file {config_file} not found. Please create a config with multiple transports.")
+        return
+
+    try:
+        # Create the gateway
+        gateway = Protocol_Gateway(config_file)
+
+        print(f"Found {len(gateway._Protocol_Gateway__transports)} transports:")
+        for transport in gateway._Protocol_Gateway__transports:
+            transport_type = "INPUT" if transport.read_interval > 0 else "OUTPUT"
+            print(f"  - {transport.transport_name}: {transport_type} transport")
+            if hasattr(transport, 'bridge') and transport.bridge:
+                print(f"    Bridges to: {transport.bridge}")
+
+        # Test the multiprocessing mode
+        print("\nStarting multiprocessing test (will run for 30 seconds)...")
+        print("Press Ctrl+C to stop early")
+
+        # Start the gateway in a separate thread so we can monitor it
+        import threading
+
+        def run_gateway():
+            try:
+                gateway.run()
+            except KeyboardInterrupt:
+                print("Gateway stopped by user")
+
+        gateway_thread = threading.Thread(target=run_gateway)
+        gateway_thread.daemon = True
+        gateway_thread.start()
+
+        # Monitor for 30 seconds
+        start_time = time.time()
+        while time.time() - start_time < 30:
+            time.sleep(1)
+            print(f"Running... ({int(time.time() - start_time)}s elapsed)")
+
+        print("Test completed successfully!")
+
+    except Exception as err:
+        print(f"Error during test: {err}")
+        import traceback
+        traceback.print_exc()
+
+if __name__ == "__main__":
+    test_multiprocessing()
\ No newline at end of file