diff --git a/MULTIPROCESSING.md b/MULTIPROCESSING.md deleted file mode 100644 index bbd703b..0000000 --- a/MULTIPROCESSING.md +++ /dev/null @@ -1,166 +0,0 @@ -# Multiprocessing Support - -## Overview - -The Python Protocol Gateway now supports **automatic multiprocessing** when multiple transports are configured. This provides true concurrency and complete isolation between transports, solving the "transport busy" issues that can occur with single-threaded operation. - -## How It Works - -### Automatic Detection -- **Single Transport**: Uses the original single-threaded approach -- **Multiple Transports**: Automatically switches to multiprocessing mode - -### Process Isolation -Each transport runs in its own separate process, providing: -- **Complete isolation** of resources and state -- **True concurrent operation** - no waiting for other transports -- **Independent error handling** - one transport failure doesn't affect others -- **Automatic restart** of failed processes - -### Transport Types - -#### Input Transports (read_interval > 0) -- Modbus RTU, TCP, etc. -- Actively read data from devices -- Send data to output transports via bridging - -#### Output Transports (read_interval <= 0) -- InfluxDB, MQTT, etc. -- Receive data from input transports via bridging -- Process and forward data to external systems - -## Configuration Example - -```ini -[transport.0] -transport = modbus_rtu -protocol_version = eg4_v58 -address = 1 -port = /dev/ttyUSB0 -baudrate = 19200 -bridge = influxdb_output -read_interval = 10 - -[transport.1] -transport = modbus_rtu -protocol_version = eg4_v58 -address = 1 -port = /dev/ttyUSB1 -baudrate = 19200 -bridge = influxdb_output -read_interval = 10 - -[influxdb_output] -transport = influxdb_out -host = influxdb.example.com -port = 8086 -database = solar -measurement = eg4_data -``` - -## Inter-Process Communication - -### Bridging -- Uses `multiprocessing.Queue` for communication -- Automatic message routing between processes -- Non-blocking communication -- Source transport information preserved - -### Message Format -```python -{ - 'source_transport': 'transport.0', - 'target_transport': 'influxdb_output', - 'data': {...}, - 'source_transport_info': { - 'transport_name': 'transport.0', - 'device_identifier': '...', - 'device_manufacturer': '...', - 'device_model': '...', - 'device_serial_number': '...' - } -} -``` - -## Benefits - -### Performance -- **True concurrency** - no serialization delays -- **Independent timing** - each transport runs at its own interval -- **No resource contention** - each process has isolated resources - -### Reliability -- **Process isolation** - one transport failure doesn't affect others -- **Automatic restart** - failed processes are automatically restarted -- **Independent error handling** - each process handles its own errors - -### Scalability -- **Linear scaling** - performance scales with number of CPU cores -- **Resource efficiency** - only uses multiprocessing when needed -- **Memory isolation** - each process has its own memory space - -## Troubleshooting - -### Common Issues - -#### "Register is Empty; transport busy?" -- **Cause**: Shared state between transports in single-threaded mode -- **Solution**: Use multiprocessing mode (automatic with multiple transports) - -#### InfluxDB not receiving data -- **Cause**: Output transport not properly configured or started -- **Solution**: Ensure `influxdb_output` section has `transport = influxdb_out` - -#### Process restarting frequently -- **Cause**: Transport configuration error or device connection issue -- **Solution**: Check logs for specific error messages - -### Debugging - -#### Enable Debug Logging -```ini -[general] -log_level = DEBUG -``` - -#### Monitor Process Status -The gateway logs process creation and status: -``` -[2025-06-22 19:30:45] Starting multiprocessing mode with 3 transports -[2025-06-22 19:30:45] Input transports: 2, Output transports: 1 -[2025-06-22 19:30:45] Bridging detected - enabling inter-process communication -[2025-06-22 19:30:45] Started process for transport.0 (PID: 12345) -[2025-06-22 19:30:45] Started process for transport.1 (PID: 12346) -[2025-06-22 19:30:45] Started process for influxdb_output (PID: 12347) -``` - -## Testing - -Run the test script to verify multiprocessing functionality: -```bash -python pytests/test_multiprocessing.py -``` - -This will: -- Load your configuration -- Display transport information -- Run for 30 seconds to verify operation -- Show any errors or issues - -## Limitations - -1. **Memory Usage**: Each process uses additional memory -2. **Startup Time**: Slight delay when starting multiple processes -3. **Inter-Process Communication**: Bridge messages have small overhead -4. **Debugging**: More complex debugging due to multiple processes - -## Migration - -No migration required! Existing configurations will automatically benefit from multiprocessing when multiple transports are present. - -## Performance Tips - -1. **Stagger Read Intervals**: Use different read intervals to avoid resource contention -2. **Optimize Batch Sizes**: Adjust batch sizes for faster individual reads -3. **Monitor Logs**: Watch for process restarts indicating issues -4. **Resource Limits**: Ensure sufficient system resources for multiple processes diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index eb71a65..cc5e74e 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -265,7 +265,7 @@ class protocol_settings: _log : logging.Logger = None - def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols", unique_id : str = None): + def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols"): #apply log level to logger self._log_level = getattr(logging, logging.getLevelName(logging.getLogger().getEffectiveLevel()), logging.INFO) @@ -275,7 +275,6 @@ def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, s self.protocol = protocol self.settings_dir = settings_dir self.transport_settings = transport_settings - self.unique_id = unique_id # Store unique identifier for this instance #load variable mask self.variable_mask = [] diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py index aef23f9..e747102 100644 --- a/classes/transports/transport_base.py +++ b/classes/transports/transport_base.py @@ -112,10 +112,7 @@ def __init__(self, settings : "SectionProxy") -> None: #must load after settings self.protocol_version = settings.get("protocol_version") if self.protocol_version: - # Create a unique protocol settings instance for each transport to avoid shared state - unique_id = f"{self.transport_name}_{self.protocol_version}" - self._log.debug(f"Creating protocol settings with unique_id: {unique_id}") - self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings, unique_id=unique_id) + self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings) if self.protocolSettings: self.protocol_version = self.protocolSettings.protocol diff --git a/protocol_gateway.py b/protocol_gateway.py index 7be2f5c..c2f6a3a 100644 --- a/protocol_gateway.py +++ b/protocol_gateway.py @@ -23,15 +23,11 @@ import os import sys import traceback -import multiprocessing from configparser import ConfigParser, NoOptionError from classes.protocol_settings import protocol_settings, registry_map_entry from classes.transports.transport_base import transport_base -# Global queue for inter-process communication -bridge_queue = None - __logo = """ ██████╗ ██╗ ██╗████████╗██╗ ██╗ ██████╗ ███╗ ██╗ @@ -94,183 +90,6 @@ def getfloat(self, section, option, *args, **kwargs): #bypass fallback bug return float(value) if value is not None else None -class SingleTransportGateway: - """ - Gateway class for running a single transport in its own process - """ - __log = None - __running = False - __transport = None - config_file = "" - __bridge_queue = None - - def __init__(self, config_file: str, transport_name: str, bridge_queue=None): - self.config_file = config_file - self.__bridge_queue = bridge_queue - - # Set up logging for this process - self.__log = logging.getLogger(f"single_transport_{transport_name}") - handler = logging.StreamHandler(sys.stdout) - formatter = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s") - handler.setFormatter(formatter) - self.__log.addHandler(handler) - self.__log.setLevel(logging.INFO) - - self.__log.info(f"Initializing single transport gateway for {transport_name}") - - # Load configuration - self.__settings = CustomConfigParser() - self.__settings.read(self.config_file) - - # Find and initialize the specific transport - if transport_name in self.__settings.sections(): - transport_cfg = self.__settings[transport_name] - transport_type = transport_cfg.get("transport", fallback="") - protocol_version = transport_cfg.get("protocol_version", fallback="") - - if not transport_type and not protocol_version: - raise ValueError("Missing Transport / Protocol Version") - - if not transport_type and protocol_version: - protocolSettings = protocol_settings(protocol_version) - if not transport_type and not protocolSettings.transport: - raise ValueError("Missing Transport") - if not transport_type: - transport_type = protocolSettings.transport - - # Import the module - module = importlib.import_module("classes.transports." + transport_type) - # Get the class from the module - cls = getattr(module, transport_type) - self.__transport = cls(transport_cfg) - - self.__log.info(f"Created transport: {self.__transport.type}:{self.__transport.transport_name}") - - # Connect the transport - self.__log.info(f"Connecting to {self.__transport.type}:{self.__transport.transport_name}...") - self.__transport.connect() - - else: - raise ValueError(f"Transport section '{transport_name}' not found in config") - - def handle_bridge_message(self, message): - """ - Handle bridge messages from other transports - """ - try: - source_transport = message.get('source_transport') - target_transport = message.get('target_transport') - data = message.get('data') - source_transport_info = message.get('source_transport_info', {}) - - # Check if this transport is the target - if target_transport == self.__transport.transport_name: - self.__log.debug(f"Received bridge message from {source_transport} with {len(data)} items") - - # Forward the data to this transport - if hasattr(self.__transport, 'write_data'): - # Create a mock transport object with the source transport info - class MockSourceTransport: - def __init__(self, info): - self.transport_name = info.get('transport_name', '') - self.device_identifier = info.get('device_identifier', '') - self.device_name = info.get('device_name', '') - self.device_manufacturer = info.get('device_manufacturer', '') - self.device_model = info.get('device_model', '') - self.device_serial_number = info.get('device_serial_number', '') - - source_transport_obj = MockSourceTransport(source_transport_info) - - # Call write_data with the correct parameters - self.__transport.write_data(data, source_transport_obj) - else: - self.__log.warning(f"Transport {self.__transport.transport_name} does not support write_data") - - except Exception as err: - self.__log.error(f"Error handling bridge message: {err}") - traceback.print_exc() - - def run(self): - """ - Run the single transport - """ - self.__running = True - self.__log.info(f"Starting single transport: {self.__transport.transport_name}") - - # Check if this is an output transport (no read_interval) - is_output_transport = (self.__transport.read_interval <= 0) - - if is_output_transport: - self.__log.info(f"Running output transport: {self.__transport.transport_name}") - # For output transports, just handle bridge messages - while self.__running: - try: - # Check for bridge messages - if self.__bridge_queue: - try: - while not self.__bridge_queue.empty(): - message = self.__bridge_queue.get_nowait() - self.handle_bridge_message(message) - except: - pass # Queue is empty or other error - - time.sleep(0.1) # Short sleep for output transports - - except Exception as err: - self.__log.error(f"Error in output transport {self.__transport.transport_name}: {err}") - traceback.print_exc() - else: - # For input transports, handle both reading and bridging - while self.__running: - try: - # Check for bridge messages - if self.__bridge_queue: - try: - while not self.__bridge_queue.empty(): - message = self.__bridge_queue.get_nowait() - self.handle_bridge_message(message) - except: - pass # Queue is empty or other error - - now = time.time() - if self.__transport.read_interval > 0 and now - self.__transport.last_read_time > self.__transport.read_interval: - self.__transport.last_read_time = now - - if not self.__transport.connected: - self.__transport.connect() - else: - info = self.__transport.read_data() - - if info: - self.__log.debug(f"Read data from {self.__transport.transport_name}: {len(info)} items") - - # Handle bridging if configured - if self.__transport.bridge and self.__bridge_queue: - self.__log.debug(f"Sending bridge message from {self.__transport.transport_name} to {self.__transport.bridge}") - bridge_message = { - 'source_transport': self.__transport.transport_name, - 'target_transport': self.__transport.bridge, - 'data': info, - 'source_transport_info': { - 'transport_name': self.__transport.transport_name, - 'device_identifier': getattr(self.__transport, 'device_identifier', ''), - 'device_name': getattr(self.__transport, 'device_name', ''), - 'device_manufacturer': getattr(self.__transport, 'device_manufacturer', ''), - 'device_model': getattr(self.__transport, 'device_model', ''), - 'device_serial_number': getattr(self.__transport, 'device_serial_number', '') - } - } - self.__bridge_queue.put(bridge_message) - else: - self.__log.debug(f"No data read from {self.__transport.transport_name}") - - except Exception as err: - self.__log.error(f"Error in transport {self.__transport.transport_name}: {err}") - traceback.print_exc() - - time.sleep(0.7) - - class Protocol_Gateway: """ Main class, implementing the Growatt / Inverters to MQTT functionality @@ -369,33 +188,11 @@ def on_message(self, transport : transport_base, entry : registry_map_entry, dat to_transport.write_data({entry.variable_name : data}, transport) break - def run_single_transport(self, transport_name: str, config_file: str, bridge_queue=None): - """ - Run a single transport in its own process - """ - try: - # Create a new gateway instance for this transport - single_gateway = SingleTransportGateway(config_file, transport_name, bridge_queue) - single_gateway.run() - except Exception as err: - print(f"Error in transport {transport_name}: {err}") - traceback.print_exc() - def run(self): """ run method, starts ModBus connection and mqtt connection """ - if len(self.__transports) <= 1: - # Use single-threaded approach for 1 or fewer transports - self.__run_single_threaded() - else: - # Use multiprocessing approach for multiple transports - self.__run_multiprocess() - def __run_single_threaded(self): - """ - Original single-threaded implementation - """ self.__running = True if False: @@ -430,70 +227,6 @@ def __run_single_threaded(self): time.sleep(0.07) #change this in future. probably reduce to allow faster reads. - def __run_multiprocess(self): - """ - Multiprocessing implementation for multiple transports - """ - self.__log.info(f"Starting multiprocessing mode with {len(self.__transports)} transports") - - # Separate input and output transports - input_transports = [t for t in self.__transports if t.read_interval > 0] - output_transports = [t for t in self.__transports if t.read_interval <= 0] - - self.__log.info(f"Input transports: {len(input_transports)}, Output transports: {len(output_transports)}") - - # Check for bridging configuration - has_bridging = any(transport.bridge for transport in self.__transports) - if has_bridging: - self.__log.info("Bridging detected - enabling inter-process communication") - else: - self.__log.info("No bridging configured - transports will run independently") - - # Create a shared queue for inter-process communication - bridge_queue = multiprocessing.Queue() if has_bridging else None - - # Create processes for each transport - processes = [] - for transport in self.__transports: - process = multiprocessing.Process( - target=self.run_single_transport, - args=(transport.transport_name, self.config_file, bridge_queue), - name=f"transport_{transport.transport_name}" - ) - process.start() - processes.append(process) - self.__log.info(f"Started process for {transport.transport_name} (PID: {process.pid})") - - # Monitor processes - try: - while True: - # Check if any process has died - for i, process in enumerate(processes): - if not process.is_alive(): - transport_name = self.__transports[i].transport_name - self.__log.warning(f"Process for {transport_name} died, restarting...") - - # Restart the process - new_process = multiprocessing.Process( - target=self.run_single_transport, - args=(transport_name, self.config_file, bridge_queue), - name=f"transport_{transport_name}" - ) - new_process.start() - processes[i] = new_process - self.__log.info(f"Restarted process for {transport_name} (PID: {new_process.pid})") - - time.sleep(5) # Check every 5 seconds - - except KeyboardInterrupt: - self.__log.info("Shutting down multiprocessing mode...") - for process in processes: - process.terminate() - process.join(timeout=5) - if process.is_alive(): - process.kill() - self.__log.info("All processes terminated") - diff --git a/pytests/test_multiprocessing.py b/pytests/test_multiprocessing.py deleted file mode 100644 index 8b32c8d..0000000 --- a/pytests/test_multiprocessing.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script for multiprocessing implementation -""" - -import os -import sys -import time - -# Add the current directory to the path so we can import the gateway -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from protocol_gateway import Protocol_Gateway - -def test_multiprocessing(): - """ - Test the multiprocessing implementation - """ - print("Testing multiprocessing implementation...") - - # Test with a config file that has multiple transports - config_file = "config.cfg" - - if not os.path.exists(config_file): - print(f"Config file {config_file} not found. Please create a config with multiple transports.") - return - - try: - # Create the gateway - gateway = Protocol_Gateway(config_file) - - print(f"Found {len(gateway._Protocol_Gateway__transports)} transports:") - for transport in gateway._Protocol_Gateway__transports: - transport_type = "INPUT" if transport.read_interval > 0 else "OUTPUT" - print(f" - {transport.transport_name}: {transport_type} transport") - if hasattr(transport, 'bridge') and transport.bridge: - print(f" Bridges to: {transport.bridge}") - - # Test the multiprocessing mode - print("\nStarting multiprocessing test (will run for 30 seconds)...") - print("Press Ctrl+C to stop early") - - # Start the gateway in a separate thread so we can monitor it - import threading - import signal - - def run_gateway(): - try: - gateway.run() - except KeyboardInterrupt: - print("Gateway stopped by user") - - gateway_thread = threading.Thread(target=run_gateway) - gateway_thread.daemon = True - gateway_thread.start() - - # Monitor for 30 seconds - start_time = time.time() - while time.time() - start_time < 30: - time.sleep(1) - print(f"Running... ({int(time.time() - start_time)}s elapsed)") - - print("Test completed successfully!") - - except Exception as err: - print(f"Error during test: {err}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - test_multiprocessing() \ No newline at end of file