Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions MULTIPROCESSING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Multiprocessing Support

## Overview

The Python Protocol Gateway now supports **automatic multiprocessing** when multiple transports are configured. This provides true concurrency and complete isolation between transports, solving the "transport busy" issues that can occur with single-threaded operation.

## How It Works

### Automatic Detection
- **Single Transport**: Uses the original single-threaded approach
- **Multiple Transports**: Automatically switches to multiprocessing mode

### Process Isolation
Each transport runs in its own separate process, providing:
- **Complete isolation** of resources and state
- **True concurrent operation** - no waiting for other transports
- **Independent error handling** - one transport failure doesn't affect others
- **Automatic restart** of failed processes

### Transport Types

#### Input Transports (read_interval > 0)
- Modbus RTU, TCP, etc.
- Actively read data from devices
- Send data to output transports via bridging

#### Output Transports (read_interval <= 0)
- InfluxDB, MQTT, etc.
- Receive data from input transports via bridging
- Process and forward data to external systems

## Configuration Example

```ini
[transport.0]
transport = modbus_rtu
protocol_version = eg4_v58
address = 1
port = /dev/ttyUSB0
baudrate = 19200
bridge = influxdb_output
read_interval = 10

[transport.1]
transport = modbus_rtu
protocol_version = eg4_v58
address = 1
port = /dev/ttyUSB1
baudrate = 19200
bridge = influxdb_output
read_interval = 10

[influxdb_output]
transport = influxdb_out
host = influxdb.example.com
port = 8086
database = solar
measurement = eg4_data
```

## Inter-Process Communication

### Bridging
- Uses `multiprocessing.Queue` for communication
- Automatic message routing between processes
- Non-blocking communication
- Source transport information preserved

### Message Format
```python
{
'source_transport': 'transport.0',
'target_transport': 'influxdb_output',
'data': {...},
'source_transport_info': {
'transport_name': 'transport.0',
'device_identifier': '...',
'device_manufacturer': '...',
'device_model': '...',
'device_serial_number': '...'
}
}
```

## Benefits

### Performance
- **True concurrency** - no serialization delays
- **Independent timing** - each transport runs at its own interval
- **No resource contention** - each process has isolated resources

### Reliability
- **Process isolation** - one transport failure doesn't affect others
- **Automatic restart** - failed processes are automatically restarted
- **Independent error handling** - each process handles its own errors

### Scalability
- **Linear scaling** - performance scales with number of CPU cores
- **Resource efficiency** - only uses multiprocessing when needed
- **Memory isolation** - each process has its own memory space

## Troubleshooting

### Common Issues

#### "Register is Empty; transport busy?"
- **Cause**: Shared state between transports in single-threaded mode
- **Solution**: Use multiprocessing mode (automatic with multiple transports)

#### InfluxDB not receiving data
- **Cause**: Output transport not properly configured or started
- **Solution**: Ensure `influxdb_output` section has `transport = influxdb_out`

#### Process restarting frequently
- **Cause**: Transport configuration error or device connection issue
- **Solution**: Check logs for specific error messages

### Debugging

#### Enable Debug Logging
```ini
[general]
log_level = DEBUG
```

#### Monitor Process Status
The gateway logs process creation and status:
```
[2025-06-22 19:30:45] Starting multiprocessing mode with 3 transports
[2025-06-22 19:30:45] Input transports: 2, Output transports: 1
[2025-06-22 19:30:45] Bridging detected - enabling inter-process communication
[2025-06-22 19:30:45] Started process for transport.0 (PID: 12345)
[2025-06-22 19:30:45] Started process for transport.1 (PID: 12346)
[2025-06-22 19:30:45] Started process for influxdb_output (PID: 12347)
```

## Testing

Run the test script to verify multiprocessing functionality:
```bash
python pytests/test_multiprocessing.py
```

This will:
- Load your configuration
- Display transport information
- Run for 30 seconds to verify operation
- Show any errors or issues

## Limitations

1. **Memory Usage**: Each process uses additional memory
2. **Startup Time**: Slight delay when starting multiple processes
3. **Inter-Process Communication**: Bridge messages have small overhead
4. **Debugging**: More complex debugging due to multiple processes

## Migration

No migration required! Existing configurations will automatically benefit from multiprocessing when multiple transports are present.

## Performance Tips

1. **Stagger Read Intervals**: Use different read intervals to avoid resource contention
2. **Optimize Batch Sizes**: Adjust batch sizes for faster individual reads
3. **Monitor Logs**: Watch for process restarts indicating issues
4. **Resource Limits**: Ensure sufficient system resources for multiple processes
3 changes: 2 additions & 1 deletion classes/protocol_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class protocol_settings:
_log : logging.Logger = None


def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols"):
def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols", unique_id : str = None):

#apply log level to logger
self._log_level = getattr(logging, logging.getLevelName(logging.getLogger().getEffectiveLevel()), logging.INFO)
Expand All @@ -275,6 +275,7 @@ def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, s
self.protocol = protocol
self.settings_dir = settings_dir
self.transport_settings = transport_settings
self.unique_id = unique_id # Store unique identifier for this instance

#load variable mask
self.variable_mask = []
Expand Down
28 changes: 24 additions & 4 deletions classes/transports/influxdb_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from configparser import SectionProxy
from typing import TextIO
import time
import logging

from defs.common import strtobool

Expand All @@ -21,6 +22,7 @@ class influxdb_out(transport_base):
include_device_info: bool = True
batch_size: int = 100
batch_timeout: float = 10.0
force_float: bool = True # Force all numeric fields to be floats to avoid InfluxDB type conflicts

client = None
batch_points = []
Expand All @@ -37,6 +39,7 @@ def __init__(self, settings: SectionProxy):
self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info))
self.batch_size = settings.getint("batch_size", fallback=self.batch_size)
self.batch_timeout = settings.getfloat("batch_timeout", fallback=self.batch_timeout)
self.force_float = strtobool(settings.get("force_float", fallback=self.force_float))

self.write_enabled = True # InfluxDB output is always write-enabled
super().__init__(settings)
Expand Down Expand Up @@ -103,14 +106,17 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
for key, value in data.items():
# Check if we should force float formatting based on protocol settings
should_force_float = False
unit_mod_found = None

# Try to get registry entry from protocol settings to check unit_mod
if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings:
# Check both input and holding registries
for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]:
registry_map = from_transport.protocolSettings.get_registry_map(registry_type)
for entry in registry_map:
if entry.variable_name == key:
# Match by variable_name (which is lowercase)
if entry.variable_name.lower() == key.lower():
unit_mod_found = entry.unit_mod
# If unit_mod is not 1.0, this value should be treated as float
if entry.unit_mod != 1.0:
should_force_float = True
Expand All @@ -124,14 +130,28 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
# Try to convert to float first
float_val = float(value)

# If it's an integer but should be forced to float, or if it's already a float
if should_force_float or not float_val.is_integer():
# Always use float for InfluxDB to avoid type conflicts
# InfluxDB is strict about field types - once a field is created as integer,
# it must always be integer. Using float avoids this issue.
if self.force_float:
fields[key] = float_val
else:
fields[key] = int(float_val)
# Only use integer if it's actually an integer and we're not forcing floats
if float_val.is_integer():
fields[key] = int(float_val)
else:
fields[key] = float_val

# Log data type conversion for debugging
if self._log.isEnabledFor(logging.DEBUG):
original_type = type(value).__name__
final_type = type(fields[key]).__name__
self._log.debug(f"Field {key}: {value} ({original_type}) -> {fields[key]} ({final_type}) [unit_mod: {unit_mod_found}]")

except (ValueError, TypeError):
# If conversion fails, store as string
fields[key] = str(value)
self._log.debug(f"Field {key}: {value} -> string (conversion failed)")

# Create InfluxDB point
point = {
Expand Down
5 changes: 4 additions & 1 deletion classes/transports/transport_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ def __init__(self, settings : "SectionProxy") -> None:
#must load after settings
self.protocol_version = settings.get("protocol_version")
if self.protocol_version:
self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings)
# Create a unique protocol settings instance for each transport to avoid shared state
unique_id = f"{self.transport_name}_{self.protocol_version}"
self._log.debug(f"Creating protocol settings with unique_id: {unique_id}")
self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings, unique_id=unique_id)

if self.protocolSettings:
self.protocol_version = self.protocolSettings.protocol
Expand Down
Loading