From d889fa61ee6d765a05004a2c437d096ea852c07e Mon Sep 17 00:00:00 2001
From: HotNoob
Date: Sun, 22 Jun 2025 21:52:39 -0500
Subject: [PATCH 1/2] Revert "Improve support of two rs485 inputs at once and single output transport"

---
 MULTIPROCESSING.md                   | 166 ----------------
 classes/protocol_settings.py         |   3 +-
 classes/transports/influxdb_out.py   |  28 +--
 classes/transports/transport_base.py |   5 +-
 protocol_gateway.py                  | 276 +--------------------------
 pytests/test_multiprocessing.py      |  71 -------
 6 files changed, 10 insertions(+), 539 deletions(-)
 delete mode 100644 MULTIPROCESSING.md
 delete mode 100644 pytests/test_multiprocessing.py

diff --git a/MULTIPROCESSING.md b/MULTIPROCESSING.md
deleted file mode 100644
index bbd703b..0000000
--- a/MULTIPROCESSING.md
+++ /dev/null
@@ -1,166 +0,0 @@
-# Multiprocessing Support
-
-## Overview
-
-The Python Protocol Gateway now supports **automatic multiprocessing** when multiple transports are configured. This provides true concurrency and complete isolation between transports, solving the "transport busy" issues that can occur with single-threaded operation.
-
-## How It Works
-
-### Automatic Detection
-- **Single Transport**: Uses the original single-threaded approach
-- **Multiple Transports**: Automatically switches to multiprocessing mode
-
-### Process Isolation
-Each transport runs in its own separate process, providing:
-- **Complete isolation** of resources and state
-- **True concurrent operation** - no waiting for other transports
-- **Independent error handling** - one transport failure doesn't affect others
-- **Automatic restart** of failed processes
-
-### Transport Types
-
-#### Input Transports (read_interval > 0)
-- Modbus RTU, TCP, etc.
-- Actively read data from devices
-- Send data to output transports via bridging
-
-#### Output Transports (read_interval <= 0)
-- InfluxDB, MQTT, etc.
-- Receive data from input transports via bridging
-- Process and forward data to external systems
-
-## Configuration Example
-
-```ini
-[transport.0]
-transport = modbus_rtu
-protocol_version = eg4_v58
-address = 1
-port = /dev/ttyUSB0
-baudrate = 19200
-bridge = influxdb_output
-read_interval = 10
-
-[transport.1]
-transport = modbus_rtu
-protocol_version = eg4_v58
-address = 1
-port = /dev/ttyUSB1
-baudrate = 19200
-bridge = influxdb_output
-read_interval = 10
-
-[influxdb_output]
-transport = influxdb_out
-host = influxdb.example.com
-port = 8086
-database = solar
-measurement = eg4_data
-```
-
-## Inter-Process Communication
-
-### Bridging
-- Uses `multiprocessing.Queue` for communication
-- Automatic message routing between processes
-- Non-blocking communication
-- Source transport information preserved
-
-### Message Format
-```python
-{
-    'source_transport': 'transport.0',
-    'target_transport': 'influxdb_output',
-    'data': {...},
-    'source_transport_info': {
-        'transport_name': 'transport.0',
-        'device_identifier': '...',
-        'device_manufacturer': '...',
-        'device_model': '...',
-        'device_serial_number': '...'
-    }
-}
-```
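Editor's note: the message format in the (removed) section above is a plain dictionary passed over a `multiprocessing.Queue`. A minimal, runnable sketch of that producer/consumer flow is shown below; `make_bridge_message`, the sample field names, and the printouts are illustrative only, not the gateway's actual classes.

```python
# Illustrative sketch of the bridging pattern described above (hypothetical
# helper names; the real gateway builds these messages inside its transport
# processes).
import multiprocessing
import queue


def make_bridge_message(source: str, target: str, data: dict) -> dict:
    """Build a message in the documented bridge format."""
    return {
        "source_transport": source,
        "target_transport": target,
        "data": data,
        "source_transport_info": {"transport_name": source},
    }


def producer(bridge_queue: multiprocessing.Queue) -> None:
    # An input transport would do this after each successful read.
    msg = make_bridge_message("transport.0", "influxdb_output", {"battery_voltage": 53.2})
    bridge_queue.put(msg)


def consumer(bridge_queue: multiprocessing.Queue) -> None:
    # An output transport drains the queue without blocking its main loop.
    try:
        while True:
            msg = bridge_queue.get(timeout=1.0)
            print(f"{msg['target_transport']} received {msg['data']} from {msg['source_transport']}")
    except queue.Empty:
        pass


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()
    consumer(q)
    p.join()
```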
-
-## Benefits
-
-### Performance
-- **True concurrency** - no serialization delays
-- **Independent timing** - each transport runs at its own interval
-- **No resource contention** - each process has isolated resources
-
-### Reliability
-- **Process isolation** - one transport failure doesn't affect others
-- **Automatic restart** - failed processes are automatically restarted
-- **Independent error handling** - each process handles its own errors
-
-### Scalability
-- **Linear scaling** - performance scales with number of CPU cores
-- **Resource efficiency** - only uses multiprocessing when needed
-- **Memory isolation** - each process has its own memory space
-
-## Troubleshooting
-
-### Common Issues
-
-#### "Register is Empty; transport busy?"
-- **Cause**: Shared state between transports in single-threaded mode
-- **Solution**: Use multiprocessing mode (automatic with multiple transports)
-
-#### InfluxDB not receiving data
-- **Cause**: Output transport not properly configured or started
-- **Solution**: Ensure `influxdb_output` section has `transport = influxdb_out`
-
-#### Process restarting frequently
-- **Cause**: Transport configuration error or device connection issue
-- **Solution**: Check logs for specific error messages
-
-### Debugging
-
-#### Enable Debug Logging
-```ini
-[general]
-log_level = DEBUG
-```
-
-#### Monitor Process Status
-The gateway logs process creation and status:
-```
-[2025-06-22 19:30:45] Starting multiprocessing mode with 3 transports
-[2025-06-22 19:30:45] Input transports: 2, Output transports: 1
-[2025-06-22 19:30:45] Bridging detected - enabling inter-process communication
-[2025-06-22 19:30:45] Started process for transport.0 (PID: 12345)
-[2025-06-22 19:30:45] Started process for transport.1 (PID: 12346)
-[2025-06-22 19:30:45] Started process for influxdb_output (PID: 12347)
-```
-
-## Testing
-
-Run the test script to verify multiprocessing functionality:
-```bash
-python pytests/test_multiprocessing.py
-```
-
-This will:
-- Load your configuration
-- Display transport information
-- Run for 30 seconds to verify operation
-- Show any errors or issues
-
-## Limitations
-
-1. **Memory Usage**: Each process uses additional memory
-2. **Startup Time**: Slight delay when starting multiple processes
-3. **Inter-Process Communication**: Bridge messages have small overhead
-4. **Debugging**: More complex debugging due to multiple processes
-
-## Migration
-
-No migration required! Existing configurations will automatically benefit from multiprocessing when multiple transports are present.
-
-## Performance Tips
-
-1. **Stagger Read Intervals**: Use different read intervals to avoid resource contention
-2. **Optimize Batch Sizes**: Adjust batch sizes for faster individual reads
-3. **Monitor Logs**: Watch for process restarts indicating issues
-4. **Resource Limits**: Ensure sufficient system resources for multiple processes
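Editor's note: the "automatic detection" and input/output split described in the deleted document reduce to a couple of predicates on `read_interval` and the transport count. A rough sketch of those rules under the documented assumptions (a stand-in `Transport` dataclass, not the gateway's transport_base objects):

```python
# Simplified sketch of the detection rules described in MULTIPROCESSING.md;
# `Transport` here is a stand-in for the gateway's transport_base objects.
from dataclasses import dataclass


@dataclass
class Transport:
    transport_name: str
    read_interval: float  # seconds; <= 0 marks an output-only transport
    bridge: str = ""


def plan_run_mode(transports: list[Transport]) -> dict:
    inputs = [t for t in transports if t.read_interval > 0]
    outputs = [t for t in transports if t.read_interval <= 0]
    return {
        "mode": "multiprocessing" if len(transports) > 1 else "single-threaded",
        "inputs": [t.transport_name for t in inputs],
        "outputs": [t.transport_name for t in outputs],
        "bridging": any(t.bridge for t in transports),
    }


print(plan_run_mode([
    Transport("transport.0", 10, bridge="influxdb_output"),
    Transport("transport.1", 10, bridge="influxdb_output"),
    Transport("influxdb_output", 0),
]))
# {'mode': 'multiprocessing', 'inputs': ['transport.0', 'transport.1'],
#  'outputs': ['influxdb_output'], 'bridging': True}
```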
diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py
index eb71a65..cc5e74e 100644
--- a/classes/protocol_settings.py
+++ b/classes/protocol_settings.py
@@ -265,7 +265,7 @@ class protocol_settings:

     _log : logging.Logger = None

-    def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols", unique_id : str = None):
+    def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols"):

         #apply log level to logger
         self._log_level = getattr(logging, logging.getLevelName(logging.getLogger().getEffectiveLevel()), logging.INFO)
@@ -275,7 +275,6 @@ def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, s
         self.protocol = protocol
         self.settings_dir = settings_dir
         self.transport_settings = transport_settings
-        self.unique_id = unique_id # Store unique identifier for this instance

         #load variable mask
         self.variable_mask = []
diff --git a/classes/transports/influxdb_out.py b/classes/transports/influxdb_out.py
index 01c698a..d0a9589 100644
--- a/classes/transports/influxdb_out.py
+++ b/classes/transports/influxdb_out.py
@@ -2,7 +2,6 @@
 from configparser import SectionProxy
 from typing import TextIO
 import time
-import logging

 from defs.common import strtobool

@@ -22,7 +21,6 @@ class influxdb_out(transport_base):
     include_device_info: bool = True
     batch_size: int = 100
     batch_timeout: float = 10.0
-    force_float: bool = True # Force all numeric fields to be floats to avoid InfluxDB type conflicts

     client = None
     batch_points = []
@@ -39,7 +37,6 @@ def __init__(self, settings: SectionProxy):
         self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info))
         self.batch_size = settings.getint("batch_size", fallback=self.batch_size)
         self.batch_timeout = settings.getfloat("batch_timeout", fallback=self.batch_timeout)
-        self.force_float = strtobool(settings.get("force_float", fallback=self.force_float))

         self.write_enabled = True # InfluxDB output is always write-enabled
         super().__init__(settings)
@@ -106,7 +103,6 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
         for key, value in data.items():
             # Check if we should force float formatting based on protocol settings
             should_force_float = False
-            unit_mod_found = None

             # Try to get registry entry from protocol settings to check unit_mod
             if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings:
@@ -114,9 +110,7 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
                 for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]:
                     registry_map = from_transport.protocolSettings.get_registry_map(registry_type)
                     for entry in registry_map:
-                        # Match by variable_name (which is lowercase)
-                        if entry.variable_name.lower() == key.lower():
-                            unit_mod_found = entry.unit_mod
+                        if entry.variable_name == key:
                             # If unit_mod is not 1.0, this value should be treated as float
                             if entry.unit_mod != 1.0:
                                 should_force_float = True
@@ -130,28 +124,14 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
                 # Try to convert to float first
                 float_val = float(value)

-                # Always use float for InfluxDB to avoid type conflicts
-                # InfluxDB is strict about field types - once a field is created as integer,
-                # it must always be integer. Using float avoids this issue.
-                if self.force_float:
+                # If it's an integer but should be forced to float, or if it's already a float
+                if should_force_float or not float_val.is_integer():
                     fields[key] = float_val
                 else:
-                    # Only use integer if it's actually an integer and we're not forcing floats
-                    if float_val.is_integer():
-                        fields[key] = int(float_val)
-                    else:
-                        fields[key] = float_val
-
-                # Log data type conversion for debugging
-                if self._log.isEnabledFor(logging.DEBUG):
-                    original_type = type(value).__name__
-                    final_type = type(fields[key]).__name__
-                    self._log.debug(f"Field {key}: {value} ({original_type}) -> {fields[key]} ({final_type}) [unit_mod: {unit_mod_found}]")
-
+                    fields[key] = int(float_val)
             except (ValueError, TypeError):
                 # If conversion fails, store as string
                 fields[key] = str(value)
-                self._log.debug(f"Field {key}: {value} -> string (conversion failed)")

             # Create InfluxDB point
             point = {
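Editor's note: both the removed and the restored versions of this block are working around the same constraint, namely that InfluxDB fixes a field's type on first write (the removed comment says as much), so mixing int and float writes to one field fails. A standalone sketch of that normalization idea, not the transport's actual code:

```python
# Standalone sketch of the numeric normalization both variants implement:
# InfluxDB fixes a field's type on first write, so writing 12 (int) and later
# 12.5 (float) to the same field is rejected. Forcing floats sidesteps that.
def normalize_field(value, force_float: bool = True):
    """Return an int, float, or str suitable for an InfluxDB field."""
    try:
        float_val = float(value)
    except (ValueError, TypeError):
        return str(value)  # non-numeric values fall back to strings
    if force_float or not float_val.is_integer():
        return float_val
    return int(float_val)


assert normalize_field("42") == 42.0             # forced to float by default
assert normalize_field("42", force_float=False) == 42
assert normalize_field("3.14") == 3.14
assert normalize_field("on") == "on"
```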
diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py
index aef23f9..e747102 100644
--- a/classes/transports/transport_base.py
+++ b/classes/transports/transport_base.py
@@ -112,10 +112,7 @@ def __init__(self, settings : "SectionProxy") -> None:
         #must load after settings
         self.protocol_version = settings.get("protocol_version")
         if self.protocol_version:
-            # Create a unique protocol settings instance for each transport to avoid shared state
-            unique_id = f"{self.transport_name}_{self.protocol_version}"
-            self._log.debug(f"Creating protocol settings with unique_id: {unique_id}")
-            self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings, unique_id=unique_id)
+            self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings)

             if self.protocolSettings:
                 self.protocol_version = self.protocolSettings.protocol
diff --git a/protocol_gateway.py b/protocol_gateway.py
index 7be2f5c..dc3c4fa 100644
--- a/protocol_gateway.py
+++ b/protocol_gateway.py
@@ -23,15 +23,11 @@
 import os
 import sys
 import traceback
-import multiprocessing
 from configparser import ConfigParser, NoOptionError

 from classes.protocol_settings import protocol_settings, registry_map_entry
 from classes.transports.transport_base import transport_base

-# Global queue for inter-process communication
-bridge_queue = None
-

 __logo = """
 ██████╗ ██╗ ██╗████████╗██╗ ██╗ ██████╗ ███╗ ██╗
@@ -94,183 +90,6 @@ def getfloat(self, section, option, *args, **kwargs): #bypass fallback bug
         return float(value) if value is not None else None


-class SingleTransportGateway:
-    """
-    Gateway class for running a single transport in its own process
-    """
-    __log = None
-    __running = False
-    __transport = None
-    config_file = ""
-    __bridge_queue = None
-
-    def __init__(self, config_file: str, transport_name: str, bridge_queue=None):
-        self.config_file = config_file
-        self.__bridge_queue = bridge_queue
-
-        # Set up logging for this process
-        self.__log = logging.getLogger(f"single_transport_{transport_name}")
-        handler = logging.StreamHandler(sys.stdout)
-        formatter = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
-        handler.setFormatter(formatter)
-        self.__log.addHandler(handler)
-        self.__log.setLevel(logging.INFO)
-
-        self.__log.info(f"Initializing single transport gateway for {transport_name}")
-
-        # Load configuration
-        self.__settings = CustomConfigParser()
-        self.__settings.read(self.config_file)
-
-        # Find and initialize the specific transport
-        if transport_name in self.__settings.sections():
-            transport_cfg = self.__settings[transport_name]
-            transport_type = transport_cfg.get("transport", fallback="")
-            protocol_version = transport_cfg.get("protocol_version", fallback="")
-
-            if not transport_type and not protocol_version:
-                raise ValueError("Missing Transport / Protocol Version")
-
-            if not transport_type and protocol_version:
-                protocolSettings = protocol_settings(protocol_version)
-                if not transport_type and not protocolSettings.transport:
-                    raise ValueError("Missing Transport")
-                if not transport_type:
-                    transport_type = protocolSettings.transport
-
-            # Import the module
-            module = importlib.import_module("classes.transports." + transport_type)
-            # Get the class from the module
-            cls = getattr(module, transport_type)
-            self.__transport = cls(transport_cfg)
-
-            self.__log.info(f"Created transport: {self.__transport.type}:{self.__transport.transport_name}")
-
-            # Connect the transport
-            self.__log.info(f"Connecting to {self.__transport.type}:{self.__transport.transport_name}...")
-            self.__transport.connect()
-
-        else:
-            raise ValueError(f"Transport section '{transport_name}' not found in config")
-
-    def handle_bridge_message(self, message):
-        """
-        Handle bridge messages from other transports
-        """
-        try:
-            source_transport = message.get('source_transport')
-            target_transport = message.get('target_transport')
-            data = message.get('data')
-            source_transport_info = message.get('source_transport_info', {})
-
-            # Check if this transport is the target
-            if target_transport == self.__transport.transport_name:
-                self.__log.debug(f"Received bridge message from {source_transport} with {len(data)} items")
-
-                # Forward the data to this transport
-                if hasattr(self.__transport, 'write_data'):
-                    # Create a mock transport object with the source transport info
-                    class MockSourceTransport:
-                        def __init__(self, info):
-                            self.transport_name = info.get('transport_name', '')
-                            self.device_identifier = info.get('device_identifier', '')
-                            self.device_name = info.get('device_name', '')
-                            self.device_manufacturer = info.get('device_manufacturer', '')
-                            self.device_model = info.get('device_model', '')
-                            self.device_serial_number = info.get('device_serial_number', '')
-
-                    source_transport_obj = MockSourceTransport(source_transport_info)
-
-                    # Call write_data with the correct parameters
-                    self.__transport.write_data(data, source_transport_obj)
-                else:
-                    self.__log.warning(f"Transport {self.__transport.transport_name} does not support write_data")
-
-        except Exception as err:
-            self.__log.error(f"Error handling bridge message: {err}")
-            traceback.print_exc()
-
-    def run(self):
-        """
-        Run the single transport
-        """
-        self.__running = True
-        self.__log.info(f"Starting single transport: {self.__transport.transport_name}")
-
-        # Check if this is an output transport (no read_interval)
-        is_output_transport = (self.__transport.read_interval <= 0)
-
-        if is_output_transport:
-            self.__log.info(f"Running output transport: {self.__transport.transport_name}")
-            # For output transports, just handle bridge messages
-            while self.__running:
-                try:
-                    # Check for bridge messages
-                    if self.__bridge_queue:
-                        try:
-                            while not self.__bridge_queue.empty():
-                                message = self.__bridge_queue.get_nowait()
-                                self.handle_bridge_message(message)
-                        except:
-                            pass # Queue is empty or other error
-
-                    time.sleep(0.1) # Short sleep for output transports
-
-                except Exception as err:
-                    self.__log.error(f"Error in output transport {self.__transport.transport_name}: {err}")
-                    traceback.print_exc()
-        else:
-            # For input transports, handle both reading and bridging
-            while self.__running:
-                try:
-                    # Check for bridge messages
-                    if self.__bridge_queue:
-                        try:
-                            while not self.__bridge_queue.empty():
-                                message = self.__bridge_queue.get_nowait()
-                                self.handle_bridge_message(message)
-                        except:
-                            pass # Queue is empty or other error
-
-                    now = time.time()
-                    if self.__transport.read_interval > 0 and now - self.__transport.last_read_time > self.__transport.read_interval:
-                        self.__transport.last_read_time = now
-
-                        if not self.__transport.connected:
-                            self.__transport.connect()
-                        else:
-                            info = self.__transport.read_data()
-
-                            if info:
-                                self.__log.debug(f"Read data from {self.__transport.transport_name}: {len(info)} items")
-
-                                # Handle bridging if configured
-                                if self.__transport.bridge and self.__bridge_queue:
-                                    self.__log.debug(f"Sending bridge message from {self.__transport.transport_name} to {self.__transport.bridge}")
-                                    bridge_message = {
-                                        'source_transport': self.__transport.transport_name,
-                                        'target_transport': self.__transport.bridge,
-                                        'data': info,
-                                        'source_transport_info': {
-                                            'transport_name': self.__transport.transport_name,
-                                            'device_identifier': getattr(self.__transport, 'device_identifier', ''),
-                                            'device_name': getattr(self.__transport, 'device_name', ''),
-                                            'device_manufacturer': getattr(self.__transport, 'device_manufacturer', ''),
-                                            'device_model': getattr(self.__transport, 'device_model', ''),
-                                            'device_serial_number': getattr(self.__transport, 'device_serial_number', '')
-                                        }
-                                    }
-                                    self.__bridge_queue.put(bridge_message)
-                            else:
-                                self.__log.debug(f"No data read from {self.__transport.transport_name}")
-
-                except Exception as err:
-                    self.__log.error(f"Error in transport {self.__transport.transport_name}: {err}")
-                    traceback.print_exc()
-
-                time.sleep(0.7)
-
-
 class Protocol_Gateway:
     """
     Main class, implementing the Growatt / Inverters to MQTT functionality
@@ -318,12 +137,11 @@ def __init__(self, config_file : str):
             logging.basicConfig(level=log_level)

         for section in self.__settings.sections():
-            transport_cfg = self.__settings[section]
-            transport_type = transport_cfg.get("transport", fallback="")
-            protocol_version = transport_cfg.get("protocol_version", fallback="")
+            if section.startswith("transport"):
+                transport_cfg = self.__settings[section]
+                transport_type = transport_cfg.get("transport", fallback="")
+                protocol_version = transport_cfg.get("protocol_version", fallback="")

-            # Process sections that either start with "transport" OR have a transport field
-            if section.startswith("transport") or transport_type:
                 if not transport_type and not protocol_version:
                     raise ValueError("Missing Transport / Protocol Version")

@@ -369,33 +187,11 @@ def on_message(self, transport : transport_base, entry : registry_map_entry, dat
                     to_transport.write_data({entry.variable_name : data}, transport)
                     break

-    def run_single_transport(self, transport_name: str, config_file: str, bridge_queue=None):
-        """
-        Run a single transport in its own process
-        """
-        try:
-            # Create a new gateway instance for this transport
-            single_gateway = SingleTransportGateway(config_file, transport_name, bridge_queue)
-            single_gateway.run()
-        except Exception as err:
-            print(f"Error in transport {transport_name}: {err}")
-            traceback.print_exc()
-
     def run(self):
         """
         run method, starts ModBus connection and mqtt connection
         """
-        if len(self.__transports) <= 1:
-            # Use single-threaded approach for 1 or fewer transports
-            self.__run_single_threaded()
-        else:
-            # Use multiprocessing approach for multiple transports
-            self.__run_multiprocess()
-
-    def __run_single_threaded(self):
-        """
-        Original single-threaded implementation
-        """
         self.__running = True

         if False:
@@ -430,70 +226,6 @@ def __run_single_threaded(self):

             time.sleep(0.07) #change this in future. probably reduce to allow faster reads.

-    def __run_multiprocess(self):
-        """
-        Multiprocessing implementation for multiple transports
-        """
-        self.__log.info(f"Starting multiprocessing mode with {len(self.__transports)} transports")
-
-        # Separate input and output transports
-        input_transports = [t for t in self.__transports if t.read_interval > 0]
-        output_transports = [t for t in self.__transports if t.read_interval <= 0]
-
-        self.__log.info(f"Input transports: {len(input_transports)}, Output transports: {len(output_transports)}")
-
-        # Check for bridging configuration
-        has_bridging = any(transport.bridge for transport in self.__transports)
-        if has_bridging:
-            self.__log.info("Bridging detected - enabling inter-process communication")
-        else:
-            self.__log.info("No bridging configured - transports will run independently")
-
-        # Create a shared queue for inter-process communication
-        bridge_queue = multiprocessing.Queue() if has_bridging else None
-
-        # Create processes for each transport
-        processes = []
-        for transport in self.__transports:
-            process = multiprocessing.Process(
-                target=self.run_single_transport,
-                args=(transport.transport_name, self.config_file, bridge_queue),
-                name=f"transport_{transport.transport_name}"
-            )
-            process.start()
-            processes.append(process)
-            self.__log.info(f"Started process for {transport.transport_name} (PID: {process.pid})")
-
-        # Monitor processes
-        try:
-            while True:
-                # Check if any process has died
-                for i, process in enumerate(processes):
-                    if not process.is_alive():
-                        transport_name = self.__transports[i].transport_name
-                        self.__log.warning(f"Process for {transport_name} died, restarting...")
-
-                        # Restart the process
-                        new_process = multiprocessing.Process(
-                            target=self.run_single_transport,
-                            args=(transport_name, self.config_file, bridge_queue),
-                            name=f"transport_{transport_name}"
-                        )
-                        new_process.start()
-                        processes[i] = new_process
-                        self.__log.info(f"Restarted process for {transport_name} (PID: {new_process.pid})")
-
-                time.sleep(5) # Check every 5 seconds
-
-        except KeyboardInterrupt:
-            self.__log.info("Shutting down multiprocessing mode...")
-            for process in processes:
-                process.terminate()
-                process.join(timeout=5)
-                if process.is_alive():
-                    process.kill()
-            self.__log.info("All processes terminated")
-
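Editor's note: the supervision loop removed here follows a common pattern: spawn one process per worker, poll `is_alive()`, and restart anything that has died. A generic, runnable sketch of that pattern is given below with illustrative names; it is not the gateway's API, and a real supervisor would loop until shutdown rather than for a fixed number of checks.

```python
# Generic supervise-and-restart sketch (illustrative worker, not the gateway's API).
import multiprocessing
import time


def worker(name: str) -> None:
    print(f"{name} doing work")
    time.sleep(1)  # simulate a short-lived task; a real transport would loop forever


def supervise(names: list[str], checks: int = 3) -> None:
    procs = {n: multiprocessing.Process(target=worker, args=(n,), name=n) for n in names}
    for p in procs.values():
        p.start()
    for _ in range(checks):              # a real supervisor would loop until shutdown
        for name, proc in procs.items():
            if not proc.is_alive():      # died or finished -> restart it
                procs[name] = multiprocessing.Process(target=worker, args=(name,), name=name)
                procs[name].start()
        time.sleep(2)
    for proc in procs.values():
        proc.terminate()
        proc.join(timeout=5)


if __name__ == "__main__":
    supervise(["transport.0", "transport.1"])
```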
diff --git a/pytests/test_multiprocessing.py b/pytests/test_multiprocessing.py
deleted file mode 100644
index 8b32c8d..0000000
--- a/pytests/test_multiprocessing.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#!/usr/bin/env python3
-"""
-Test script for multiprocessing implementation
-"""
-
-import os
-import sys
-import time
-
-# Add the current directory to the path so we can import the gateway
-sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
-
-from protocol_gateway import Protocol_Gateway
-
-def test_multiprocessing():
-    """
-    Test the multiprocessing implementation
-    """
-    print("Testing multiprocessing implementation...")
-
-    # Test with a config file that has multiple transports
-    config_file = "config.cfg"
-
-    if not os.path.exists(config_file):
-        print(f"Config file {config_file} not found. Please create a config with multiple transports.")
-        return
-
-    try:
-        # Create the gateway
-        gateway = Protocol_Gateway(config_file)
-
-        print(f"Found {len(gateway._Protocol_Gateway__transports)} transports:")
-        for transport in gateway._Protocol_Gateway__transports:
-            transport_type = "INPUT" if transport.read_interval > 0 else "OUTPUT"
-            print(f" - {transport.transport_name}: {transport_type} transport")
-            if hasattr(transport, 'bridge') and transport.bridge:
-                print(f" Bridges to: {transport.bridge}")
-
-        # Test the multiprocessing mode
-        print("\nStarting multiprocessing test (will run for 30 seconds)...")
-        print("Press Ctrl+C to stop early")
-
-        # Start the gateway in a separate thread so we can monitor it
-        import threading
-        import signal
-
-        def run_gateway():
-            try:
-                gateway.run()
-            except KeyboardInterrupt:
-                print("Gateway stopped by user")
-
-        gateway_thread = threading.Thread(target=run_gateway)
-        gateway_thread.daemon = True
-        gateway_thread.start()
-
-        # Monitor for 30 seconds
-        start_time = time.time()
-        while time.time() - start_time < 30:
-            time.sleep(1)
-            print(f"Running... ({int(time.time() - start_time)}s elapsed)")
-
-        print("Test completed successfully!")
-
-    except Exception as err:
-        print(f"Error during test: {err}")
-        import traceback
-        traceback.print_exc()
-
-if __name__ == "__main__":
-    test_multiprocessing()
\ No newline at end of file

From 42bc7c94fbbe7cf414d2feebc021bcf5c03dd008 Mon Sep 17 00:00:00 2001
From: HotNoob
Date: Sun, 22 Jun 2025 21:56:40 -0500
Subject: [PATCH 2/2] add

---
 classes/transports/influxdb_out.py | 28 ++++++++++++++++++++++++----
 protocol_gateway.py                |  9 +++++----
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/classes/transports/influxdb_out.py b/classes/transports/influxdb_out.py
index d0a9589..01c698a 100644
--- a/classes/transports/influxdb_out.py
+++ b/classes/transports/influxdb_out.py
@@ -2,6 +2,7 @@
 from configparser import SectionProxy
 from typing import TextIO
 import time
+import logging

 from defs.common import strtobool

@@ -21,6 +22,7 @@ class influxdb_out(transport_base):
     include_device_info: bool = True
     batch_size: int = 100
     batch_timeout: float = 10.0
+    force_float: bool = True # Force all numeric fields to be floats to avoid InfluxDB type conflicts

     client = None
     batch_points = []
@@ -37,6 +39,7 @@ def __init__(self, settings: SectionProxy):
         self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info))
         self.batch_size = settings.getint("batch_size", fallback=self.batch_size)
         self.batch_timeout = settings.getfloat("batch_timeout", fallback=self.batch_timeout)
+        self.force_float = strtobool(settings.get("force_float", fallback=self.force_float))

         self.write_enabled = True # InfluxDB output is always write-enabled
         super().__init__(settings)
@@ -103,6 +106,7 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
         for key, value in data.items():
             # Check if we should force float formatting based on protocol settings
             should_force_float = False
+            unit_mod_found = None

             # Try to get registry entry from protocol settings to check unit_mod
             if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings:
@@ -110,7 +114,9 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
                 for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]:
                     registry_map = from_transport.protocolSettings.get_registry_map(registry_type)
                     for entry in registry_map:
-                        if entry.variable_name == key:
+                        # Match by variable_name (which is lowercase)
+                        if entry.variable_name.lower() == key.lower():
+                            unit_mod_found = entry.unit_mod
                             # If unit_mod is not 1.0, this value should be treated as float
                             if entry.unit_mod != 1.0:
                                 should_force_float = True
@@ -124,14 +130,28 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
                 # Try to convert to float first
                 float_val = float(value)

-                # If it's an integer but should be forced to float, or if it's already a float
-                if should_force_float or not float_val.is_integer():
+                # Always use float for InfluxDB to avoid type conflicts
+                # InfluxDB is strict about field types - once a field is created as integer,
+                # it must always be integer. Using float avoids this issue.
+                if self.force_float:
                     fields[key] = float_val
                 else:
-                    fields[key] = int(float_val)
+                    # Only use integer if it's actually an integer and we're not forcing floats
+                    if float_val.is_integer():
+                        fields[key] = int(float_val)
+                    else:
+                        fields[key] = float_val
+
+                # Log data type conversion for debugging
+                if self._log.isEnabledFor(logging.DEBUG):
+                    original_type = type(value).__name__
+                    final_type = type(fields[key]).__name__
+                    self._log.debug(f"Field {key}: {value} ({original_type}) -> {fields[key]} ({final_type}) [unit_mod: {unit_mod_found}]")
+
             except (ValueError, TypeError):
                 # If conversion fails, store as string
                 fields[key] = str(value)
+                self._log.debug(f"Field {key}: {value} -> string (conversion failed)")

             # Create InfluxDB point
             point = {
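Editor's note: the lookup re-added in this hunk matches data keys against registry entries case-insensitively and remembers the entry's `unit_mod` so scaled values can be treated as floats. A condensed sketch of that matching rule, using a stand-in dataclass rather than the real registry_map_entry:

```python
# Condensed sketch of the case-insensitive unit_mod lookup added above
# (RegistryEntry is a stand-in for the real registry_map_entry class).
from dataclasses import dataclass
from typing import Optional


@dataclass
class RegistryEntry:
    variable_name: str      # stored lowercase in the protocol files
    unit_mod: float = 1.0   # scaling factor; != 1.0 implies fractional values


def find_unit_mod(key: str, registry_map: list[RegistryEntry]) -> Optional[float]:
    for entry in registry_map:
        if entry.variable_name.lower() == key.lower():
            return entry.unit_mod
    return None


registry = [RegistryEntry("battery_voltage", 0.1), RegistryEntry("status", 1.0)]
assert find_unit_mod("Battery_Voltage", registry) == 0.1  # scaled -> treat as float
assert find_unit_mod("unknown_field", registry) is None
```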
diff --git a/protocol_gateway.py b/protocol_gateway.py
index dc3c4fa..c2f6a3a 100644
--- a/protocol_gateway.py
+++ b/protocol_gateway.py
@@ -137,11 +137,12 @@ def __init__(self, config_file : str):
             logging.basicConfig(level=log_level)

         for section in self.__settings.sections():
-            if section.startswith("transport"):
-                transport_cfg = self.__settings[section]
-                transport_type = transport_cfg.get("transport", fallback="")
-                protocol_version = transport_cfg.get("protocol_version", fallback="")
+            transport_cfg = self.__settings[section]
+            transport_type = transport_cfg.get("transport", fallback="")
+            protocol_version = transport_cfg.get("protocol_version", fallback="")

+            # Process sections that either start with "transport" OR have a transport field
+            if section.startswith("transport") or transport_type:
                 if not transport_type and not protocol_version:
                     raise ValueError("Missing Transport / Protocol Version")
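Editor's note: the behavioral change in this last hunk is the section filter: previously only sections whose name starts with "transport" were considered, while the new rule also accepts any section that defines a `transport` key. A small sketch of the two rules side by side, using a hypothetical inline config string rather than the gateway's .cfg file:

```python
# Sketch of the old vs. new section-selection rule from the hunk above
# (hypothetical inline config; the gateway reads a .cfg file instead).
from configparser import ConfigParser

cfg = ConfigParser()
cfg.read_string("""
[transport.0]
transport = modbus_rtu
read_interval = 10

[influxdb_output]
transport = influxdb_out
host = localhost

[general]
log_level = DEBUG
""")

old_rule = [s for s in cfg.sections() if s.startswith("transport")]
new_rule = [s for s in cfg.sections()
            if s.startswith("transport") or cfg[s].get("transport", fallback="")]

print(old_rule)  # ['transport.0'] - the influxdb_output section was skipped
print(new_rule)  # ['transport.0', 'influxdb_output'] - [general] is still ignored
```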