diff --git a/classes/transports/influxdb_out.py b/classes/transports/influxdb_out.py
index 01c698a..0e61032 100644
--- a/classes/transports/influxdb_out.py
+++ b/classes/transports/influxdb_out.py
@@ -1,4 +1,7 @@
 import sys
+import os
+import json
+import pickle
 from configparser import SectionProxy
 from typing import TextIO
 import time
@@ -24,9 +27,36 @@ class influxdb_out(transport_base):
     batch_timeout: float = 10.0
     force_float: bool = True # Force all numeric fields to be floats to avoid InfluxDB type conflicts
 
+    # Connection monitoring settings
+    reconnect_attempts: int = 5
+    reconnect_delay: float = 5.0
+    connection_timeout: int = 10
+
+    # Exponential backoff settings
+    use_exponential_backoff: bool = True
+    max_reconnect_delay: float = 300.0 # 5 minutes max delay
+
+    # Persistent storage settings
+    enable_persistent_storage: bool = True
+    persistent_storage_path: str = "influxdb_backlog"
+    max_backlog_size: int = 10000 # Maximum number of points to store
+    max_backlog_age: int = 86400 # 24 hours in seconds
+
+    # Periodic reconnection settings
+    periodic_reconnect_interval: float = 14400.0 # 4 hours in seconds
+
     client = None
     batch_points = []
     last_batch_time = 0
+    last_connection_check = 0
+    connection_check_interval = 300 # Check connection every 300 seconds
+
+    # Periodic reconnection state
+    last_periodic_reconnect_attempt = 0
+
+    # Persistent storage
+    backlog_file = None
+    backlog_points = []
 
     def __init__(self, settings: SectionProxy):
         self.host = settings.get("host", fallback=self.host)
@@ -41,8 +71,134 @@ def __init__(self, settings: SectionProxy):
         self.batch_timeout = settings.getfloat("batch_timeout", fallback=self.batch_timeout)
         self.force_float = strtobool(settings.get("force_float", fallback=self.force_float))
 
+        # Connection monitoring settings
+        self.reconnect_attempts = settings.getint("reconnect_attempts", fallback=self.reconnect_attempts)
+        self.reconnect_delay = settings.getfloat("reconnect_delay", fallback=self.reconnect_delay)
+        self.connection_timeout = settings.getint("connection_timeout", fallback=self.connection_timeout)
+
+        # Exponential backoff settings
+        self.use_exponential_backoff = strtobool(settings.get("use_exponential_backoff", fallback=self.use_exponential_backoff))
+        self.max_reconnect_delay = settings.getfloat("max_reconnect_delay", fallback=self.max_reconnect_delay)
+
+        # Persistent storage settings
+        self.enable_persistent_storage = strtobool(settings.get("enable_persistent_storage", fallback=self.enable_persistent_storage))
+        self.persistent_storage_path = settings.get("persistent_storage_path", fallback=self.persistent_storage_path)
+        self.max_backlog_size = settings.getint("max_backlog_size", fallback=self.max_backlog_size)
+        self.max_backlog_age = settings.getint("max_backlog_age", fallback=self.max_backlog_age)
+
+        # Periodic reconnection settings
+        self.periodic_reconnect_interval = settings.getfloat("periodic_reconnect_interval", fallback=self.periodic_reconnect_interval)
+
         self.write_enabled = True # InfluxDB output is always write-enabled
         super().__init__(settings)
+
+        # Initialize persistent storage
+        if self.enable_persistent_storage:
+            self._init_persistent_storage()
+
+    def _init_persistent_storage(self):
+        """Initialize persistent storage for data backlog"""
+        try:
+            # Create storage directory if it doesn't exist
+            if not os.path.exists(self.persistent_storage_path):
+                os.makedirs(self.persistent_storage_path)
+
+            # Create backlog file path
+            self.backlog_file = os.path.join(
+                self.persistent_storage_path,
f"influxdb_backlog_{self.transport_name}.pkl" + ) + + # Load existing backlog + self._load_backlog() + + self._log.info(f"Persistent storage initialized: {self.backlog_file}") + self._log.info(f"Loaded {len(self.backlog_points)} points from backlog") + + except Exception as e: + self._log.error(f"Failed to initialize persistent storage: {e}") + self.enable_persistent_storage = False + + def _load_backlog(self): + """Load backlog points from persistent storage""" + if not self.backlog_file or not os.path.exists(self.backlog_file): + self.backlog_points = [] + return + + try: + with open(self.backlog_file, 'rb') as f: + self.backlog_points = pickle.load(f) + + # Clean old points based on age + current_time = time.time() + original_count = len(self.backlog_points) + self.backlog_points = [ + point for point in self.backlog_points + if current_time - point.get('_backlog_time', 0) < self.max_backlog_age + ] + + if len(self.backlog_points) < original_count: + self._log.info(f"Cleaned {original_count - len(self.backlog_points)} old points from backlog") + self._save_backlog() + + except Exception as e: + self._log.error(f"Failed to load backlog: {e}") + self.backlog_points = [] + + def _save_backlog(self): + """Save backlog points to persistent storage""" + if not self.backlog_file or not self.enable_persistent_storage: + return + + try: + with open(self.backlog_file, 'wb') as f: + pickle.dump(self.backlog_points, f) + except Exception as e: + self._log.error(f"Failed to save backlog: {e}") + + def _add_to_backlog(self, point): + """Add a point to the backlog""" + if not self.enable_persistent_storage: + return + + # Add timestamp for age tracking + point['_backlog_time'] = time.time() + + self.backlog_points.append(point) + + # Limit backlog size + if len(self.backlog_points) > self.max_backlog_size: + removed = self.backlog_points.pop(0) # Remove oldest point + self._log.warning(f"Backlog full, removed oldest point: {removed.get('measurement', 'unknown')}") + + self._save_backlog() + self._log.debug(f"Added point to backlog. 
Backlog size: {len(self.backlog_points)}") + + def _flush_backlog(self): + """Flush backlog points to InfluxDB""" + if not self.backlog_points or not self.connected: + return + + self._log.info(f"Flushing {len(self.backlog_points)} backlog points to InfluxDB") + + try: + # Remove internal timestamp before sending to InfluxDB + points_to_send = [] + for point in self.backlog_points: + point_copy = point.copy() + point_copy.pop('_backlog_time', None) # Remove internal timestamp + points_to_send.append(point_copy) + + self.client.write_points(points_to_send) + self._log.info(f"Successfully wrote {len(points_to_send)} backlog points to InfluxDB") + + # Clear backlog after successful write + self.backlog_points = [] + self._save_backlog() + + except Exception as e: + self._log.error(f"Failed to flush backlog to InfluxDB: {e}") + # Don't clear backlog on failure - will retry later def connect(self): """Initialize the InfluxDB client connection""" @@ -51,13 +207,14 @@ def connect(self): try: from influxdb import InfluxDBClient - # Create InfluxDB client + # Create InfluxDB client with timeout settings self.client = InfluxDBClient( host=self.host, port=self.port, username=self.username if self.username else None, password=self.password if self.password else None, - database=self.database + database=self.database, + timeout=self.connection_timeout ) # Test connection @@ -70,8 +227,14 @@ def connect(self): self.client.create_database(self.database) self.connected = True + self.last_connection_check = time.time() + self.last_periodic_reconnect_attempt = time.time() self._log.info(f"Connected to InfluxDB at {self.host}:{self.port}") + # Flush any backlog after successful connection + if self.enable_persistent_storage: + self._flush_backlog() + except ImportError: self._log.error("InfluxDB client not installed. 
Please install with: pip install influxdb") self.connected = False @@ -79,14 +242,149 @@ def connect(self): self._log.error(f"Failed to connect to InfluxDB: {e}") self.connected = False + def _check_connection(self): + """Check if the connection is still alive and reconnect if necessary""" + current_time = time.time() + + # Check for periodic reconnection (even if connected) + if (self.periodic_reconnect_interval > 0 and + current_time - self.last_periodic_reconnect_attempt >= self.periodic_reconnect_interval): + + self.last_periodic_reconnect_attempt = current_time + self._log.info(f"Periodic reconnection check (every {self.periodic_reconnect_interval} seconds)") + + # Force a reconnection attempt to refresh the connection + if self.connected and self.client: + try: + # Test current connection + self.client.ping() + self._log.debug("Periodic connection check: connection is healthy") + except Exception as e: + self._log.warning(f"Periodic connection check failed: {e}") + return self._attempt_reconnect() + else: + # Not connected, attempt reconnection + return self._attempt_reconnect() + + # Only check connection periodically to avoid excessive ping calls + if current_time - self.last_connection_check < self.connection_check_interval: + return self.connected + + self.last_connection_check = current_time + + if not self.connected or not self.client: + return self._attempt_reconnect() + + try: + # Test connection with ping + self.client.ping() + return True + except Exception as e: + self._log.warning(f"Connection check failed: {e}") + return self._attempt_reconnect() + + def _attempt_reconnect(self): + """Attempt to reconnect to InfluxDB with exponential backoff""" + self._log.info(f"Attempting to reconnect to InfluxDB at {self.host}:{self.port}") + + for attempt in range(self.reconnect_attempts): + try: + self._log.info(f"Reconnection attempt {attempt + 1}/{self.reconnect_attempts}") + + # Close existing client if it exists + if self.client: + try: + self.client.close() + except Exception: + pass + + # Create new client + from influxdb import InfluxDBClient + self.client = InfluxDBClient( + host=self.host, + port=self.port, + username=self.username if self.username else None, + password=self.password if self.password else None, + database=self.database, + timeout=self.connection_timeout + ) + + # Test connection + self.client.ping() + + self.connected = True + self.last_periodic_reconnect_attempt = time.time() + self._log.info(f"Successfully reconnected to InfluxDB") + + # Flush any backlog after successful reconnection + if self.enable_persistent_storage: + self._flush_backlog() + + return True + + except Exception as e: + self._log.warning(f"Reconnection attempt {attempt + 1} failed: {e}") + if attempt < self.reconnect_attempts - 1: + # Calculate delay with exponential backoff + if self.use_exponential_backoff: + delay = min(self.reconnect_delay * (2 ** attempt), self.max_reconnect_delay) + self._log.info(f"Waiting {delay:.1f} seconds before next attempt (exponential backoff)") + else: + delay = self.reconnect_delay + self._log.info(f"Waiting {delay:.1f} seconds before next attempt") + + time.sleep(delay) + + self._log.error(f"Failed to reconnect after {self.reconnect_attempts} attempts") + self.connected = False + return False + + def trigger_periodic_reconnect(self): + """Manually trigger a periodic reconnection check""" + self.last_periodic_reconnect_attempt = 0 # Reset timer to force immediate check + return self._check_connection() + def write_data(self, data: dict[str, str], 
         """Write data to InfluxDB"""
-        if not self.write_enabled or not self.connected:
+        if not self.write_enabled:
             return
 
-        self._log.info(f"write data from [{from_transport.transport_name}] to influxdb_out transport")
-        self._log.info(data)
+        # Check connection status before processing data
+        if not self._check_connection():
+            self._log.warning("Not connected to InfluxDB, storing data in backlog")
+            # Store data in backlog instead of skipping
+            self._process_and_store_data(data, from_transport)
+            return
 
+        self._log.debug(f"write data from [{from_transport.transport_name}] to influxdb_out transport")
+        self._log.debug(f"Data: {data}")
+
+        # Process and write data
+        self._process_and_write_data(data, from_transport)
+
+    def _process_and_store_data(self, data: dict[str, str], from_transport: transport_base):
+        """Process data and store in backlog when not connected"""
+        if not self.enable_persistent_storage:
+            self._log.warning("Persistent storage disabled, data will be lost")
+            return
+
+        # Create InfluxDB point
+        point = self._create_influxdb_point(data, from_transport)
+
+        # Add to backlog; the backlog is flushed automatically on reconnect,
+        # so the point is deliberately not also queued in the in-memory batch
+        # (queuing it in both places would duplicate it in the backlog when
+        # the still-disconnected batch is flushed)
+        self._add_to_backlog(point)
+
+    def _process_and_write_data(self, data: dict[str, str], from_transport: transport_base):
+        """Process data and write to InfluxDB when connected"""
         # Prepare tags for InfluxDB
         tags = {}
 
@@ -100,6 +398,7 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
             "device_serial_number": from_transport.device_serial_number,
             "transport": from_transport.transport_name
         })
+        self._log.debug(f"Tags: {tags}")
 
         # Prepare fields (the actual data values)
         fields = {}
@@ -153,6 +452,78 @@ def write_data(self, data: dict[str, str], from_transport: transport_base):
                     fields[key] = str(value)
                     self._log.debug(f"Field {key}: {value} -> string (conversion failed)")
 
+        # Create InfluxDB point
+        point = self._create_influxdb_point(data, from_transport)
+
+        # Add to batch
+        self.batch_points.append(point)
+        self._log.debug(f"Added point to batch. Batch size: {len(self.batch_points)}")
Batch size: {len(self.batch_points)}") + + # Check if we should flush the batch + current_time = time.time() + if (len(self.batch_points) >= self.batch_size or + (current_time - self.last_batch_time) >= self.batch_timeout): + self._log.debug(f"Flushing batch: size={len(self.batch_points)}, timeout={current_time - self.last_batch_time:.1f}s") + self._flush_batch() + + def _create_influxdb_point(self, data: dict[str, str], from_transport: transport_base): + """Create an InfluxDB point from data""" + # Prepare tags for InfluxDB + tags = {} + + # Add device information as tags if enabled + if self.include_device_info: + tags.update({ + "device_identifier": from_transport.device_identifier, + "device_name": from_transport.device_name, + "device_manufacturer": from_transport.device_manufacturer, + "device_model": from_transport.device_model, + "device_serial_number": from_transport.device_serial_number, + "transport": from_transport.transport_name + }) + + # Prepare fields (the actual data values) + fields = {} + for key, value in data.items(): + # Check if we should force float formatting based on protocol settings + should_force_float = False + unit_mod_found = None + + # Try to get registry entry from protocol settings to check unit_mod + if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings: + # Check both input and holding registries + for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]: + registry_map = from_transport.protocolSettings.get_registry_map(registry_type) + for entry in registry_map: + # Match by variable_name (which is lowercase) + if entry.variable_name.lower() == key.lower(): + unit_mod_found = entry.unit_mod + # If unit_mod is not 1.0, this value should be treated as float + if entry.unit_mod != 1.0: + should_force_float = True + break + if should_force_float: + break + + # Try to convert to numeric values for InfluxDB + try: + # Try to convert to float first + float_val = float(value) + + # Always use float for InfluxDB to avoid type conflicts + if self.force_float: + fields[key] = float_val + else: + # Only use integer if it's actually an integer and we're not forcing floats + if float_val.is_integer(): + fields[key] = int(float_val) + else: + fields[key] = float_val + + except (ValueError, TypeError): + # If conversion fails, store as string + fields[key] = str(value) + # Create InfluxDB point point = { "measurement": self.measurement, @@ -164,20 +535,22 @@ def write_data(self, data: dict[str, str], from_transport: transport_base): if self.include_timestamp: point["time"] = int(time.time() * 1e9) # Convert to nanoseconds - # Add to batch - self.batch_points.append(point) - - # Check if we should flush the batch - current_time = time.time() - if (len(self.batch_points) >= self.batch_size or - (current_time - self.last_batch_time) >= self.batch_timeout): - self._flush_batch() + return point def _flush_batch(self): """Flush the batch of points to InfluxDB""" if not self.batch_points: return + # Check connection before attempting to write + if not self._check_connection(): + self._log.warning("Not connected to InfluxDB, storing batch in backlog") + # Store all points in backlog + for point in self.batch_points: + self._add_to_backlog(point) + self.batch_points = [] + return + try: self.client.write_points(self.batch_points) self._log.info(f"Wrote {len(self.batch_points)} points to InfluxDB") @@ -185,7 +558,27 @@ def _flush_batch(self): self.last_batch_time = time.time() except Exception as e: self._log.error(f"Failed to write batch 
to InfluxDB: {e}") - self.connected = False + # Don't immediately mark as disconnected, try to reconnect first + if self._attempt_reconnect(): + # If reconnection successful, try to write again + try: + self.client.write_points(self.batch_points) + self._log.info(f"Successfully wrote {len(self.batch_points)} points to InfluxDB after reconnection") + self.batch_points = [] + self.last_batch_time = time.time() + except Exception as retry_e: + self._log.error(f"Failed to write batch after reconnection: {retry_e}") + # Store failed points in backlog + for point in self.batch_points: + self._add_to_backlog(point) + self.batch_points = [] + self.connected = False + else: + # Store failed points in backlog + for point in self.batch_points: + self._add_to_backlog(point) + self.batch_points = [] + self.connected = False def init_bridge(self, from_transport: transport_base): """Initialize bridge - not needed for InfluxDB output""" diff --git a/config.influxdb.example b/config.influxdb.example index c3a07a8..da44363 100644 --- a/config.influxdb.example +++ b/config.influxdb.example @@ -13,6 +13,24 @@ batch_size = 100 batch_timeout = 10.0 log_level = INFO +# Connection monitoring settings (optional) +reconnect_attempts = 5 +reconnect_delay = 5.0 +connection_timeout = 10 + +# Exponential backoff settings (optional) +use_exponential_backoff = true +max_reconnect_delay = 300.0 + +# Periodic reconnection settings (optional) +periodic_reconnect_interval = 14400.0 + +# Persistent storage for long-term outages (optional) +enable_persistent_storage = true +persistent_storage_path = influxdb_backlog +max_backlog_size = 10000 +max_backlog_age = 86400 + # Example bridge configuration [modbus_rtu_source] type = modbus_rtu diff --git a/documentation/usage/configuration_examples/influxdb_example.md b/documentation/usage/configuration_examples/influxdb_example.md index 56a500a..eeeb4e9 100644 --- a/documentation/usage/configuration_examples/influxdb_example.md +++ b/documentation/usage/configuration_examples/influxdb_example.md @@ -9,6 +9,8 @@ The InfluxDB output transport allows you to send data from your devices directly - **Device Information Tags**: Includes device metadata as InfluxDB tags for easy querying - **Flexible Data Types**: Automatically converts data to appropriate InfluxDB field types - **Configurable Timeouts**: Adjustable batch size and timeout settings +- **Connection Monitoring**: Automatic connection health checks and reconnection logic +- **Robust Error Handling**: Retries failed writes after reconnection attempts ## Configuration @@ -39,6 +41,11 @@ include_device_info = true batch_size = 100 batch_timeout = 10.0 log_level = INFO + +# Connection monitoring settings +reconnect_attempts = 5 +reconnect_delay = 5.0 +connection_timeout = 10 ``` ### Configuration Options @@ -55,6 +62,29 @@ log_level = INFO | `include_device_info` | `true` | Include device information as tags | | `batch_size` | `100` | Number of points to batch before writing | | `batch_timeout` | `10.0` | Maximum time (seconds) to wait before flushing batch | +| `reconnect_attempts` | `5` | Number of reconnection attempts before giving up | +| `reconnect_delay` | `5.0` | Delay between reconnection attempts (seconds) | +| `connection_timeout` | `10` | Connection timeout for InfluxDB client (seconds) | + +## Connection Monitoring + +The InfluxDB transport includes robust connection monitoring to handle network issues and server restarts: + +### Automatic Health Checks +- Performs connection health checks every 30 seconds +- Uses 
+
+### Reconnection Logic
+- Attempts reconnection up to `reconnect_attempts` times
+- Waits `reconnect_delay` seconds between attempts
+- Preserves buffered data during reconnection attempts
+- Retries failed writes after successful reconnection
+
+### Error Recovery
+- Gracefully handles network timeouts and connection drops
+- Maintains data integrity by not losing buffered points
+- Provides detailed logging for troubleshooting
 
 ## Data Structure
 
@@ -161,6 +191,7 @@ InfluxDB data can be easily visualized in Grafana:
 - Verify InfluxDB is running: `systemctl status influxdb`
 - Check firewall settings for port 8086
 - Verify host and port configuration
+- Check connection timeout settings if using slow networks
 
 ### Authentication Issues
 - Ensure username/password are correct
@@ -170,8 +201,17 @@ InfluxDB data can be easily visualized in Grafana:
 - Check log levels for detailed error messages
 - Verify database exists and is accessible
 - Check batch settings - data may be buffered
+- Look for reconnection messages in logs
+
+### Data Stops After Some Time
+- **Most Common Issue**: Network connectivity problems or InfluxDB server restarts
+- Check logs for reconnection attempts and failures
+- Verify InfluxDB server is stable and not restarting
+- Consider increasing `reconnect_attempts` and `reconnect_delay` for unstable networks
+- Monitor network connectivity between gateway and InfluxDB server
 
 ### Performance
 - Adjust `batch_size` and `batch_timeout` for your use case
 - Larger batches reduce network overhead but increase memory usage
-- Shorter timeouts provide more real-time data but increase network traffic
\ No newline at end of file
+- Shorter timeouts provide more real-time data but increase network traffic
+- Increase `connection_timeout` for slow networks
\ No newline at end of file
diff --git a/documentation/usage/influxdb_advanced_features.md b/documentation/usage/influxdb_advanced_features.md
new file mode 100644
index 0000000..a997d94
--- /dev/null
+++ b/documentation/usage/influxdb_advanced_features.md
@@ -0,0 +1,413 @@
+# InfluxDB Advanced Features: Exponential Backoff & Persistent Storage
+
+## Overview
+
+The InfluxDB transport now includes advanced features to handle network instability and long-term outages:
+
+1. **Exponential Backoff**: Intelligent reconnection timing to avoid overwhelming the server
+2. **Persistent Storage**: Local data storage to prevent data loss during extended outages
+3. **Periodic Reconnection**: Regular connection health checks even during quiet periods
+
+## Exponential Backoff
+
+### How It Works
+
+Instead of using a fixed delay between reconnection attempts, exponential backoff doubles the delay after each failure. With the defaults (`reconnect_delay = 5.0`, `reconnect_attempts = 5`), the first attempt runs immediately and the delays between attempts are:
+
+- After attempt 1 fails: 5 second delay
+- After attempt 2 fails: 10 second delay
+- After attempt 3 fails: 20 second delay
+- After attempt 4 fails: 40 second delay (every delay is capped at `max_reconnect_delay`)
+- Attempt 5 is the final attempt; if it also fails, incoming data is diverted to the backlog
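+
+The schedule above follows the transport's formula, `reconnect_delay * 2 ** attempt`, capped at `max_reconnect_delay`. A minimal standalone sketch of that calculation, for illustration only:
+
+```python
+def backoff_delay(attempt: int, base: float = 5.0, cap: float = 300.0) -> float:
+    """Delay applied after failed attempt number `attempt` (0-indexed)."""
+    return min(base * (2 ** attempt), cap)
+
+# Delays between five attempts; the final attempt has no delay after it
+print([backoff_delay(a) for a in range(4)])  # [5.0, 10.0, 20.0, 40.0]
+```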
+
+### Configuration
+
+```ini
+[influxdb_output]
+# Enable exponential backoff
+use_exponential_backoff = true
+
+# Base delay between attempts (seconds)
+reconnect_delay = 5.0
+
+# Maximum delay cap (seconds)
+max_reconnect_delay = 300.0
+
+# Number of reconnection attempts
+reconnect_attempts = 5
+```
+
+### Benefits
+
+- **Reduces Server Load**: Prevents overwhelming the InfluxDB server during recovery
+- **Network Friendly**: Respects network conditions and server capacity
+- **Configurable**: Adjust timing based on your environment
+
+### Example Scenarios
+
+#### Short Network Glitch
+```
+Attempt 1 → Success
+Total time: immediate
+```
+
+#### Server Restart
+```
+Attempt 1 → Fail, wait 5s
+Attempt 2 → Fail, wait 10s
+Attempt 3 → Success
+Total backoff: ~15 seconds
+```
+
+#### Extended Outage
+```
+Attempt 1 → Fail, wait 5s
+Attempt 2 → Fail, wait 10s
+Attempt 3 → Fail, wait 20s
+Attempt 4 → Fail, wait 40s
+Attempt 5 → Fail
+Total backoff: ~75 seconds, then data stored in backlog
+```
+
+## Periodic Reconnection
+
+### How It Works
+
+Periodic reconnection ensures the connection to InfluxDB remains healthy even during periods when no data is being written:
+
+- **Regular Health Checks**: Performs connection tests at configurable intervals
+- **Connection Refresh**: Pings the server on schedule and re-establishes the connection if the check fails
+- **Quiet Period Handling**: Maintains the connection during low-activity periods
+- **Proactive Recovery**: Detects and fixes connection issues before data loss
+
+### Configuration
+
+```ini
+[influxdb_output]
+# Periodic reconnection interval (seconds)
+periodic_reconnect_interval = 14400.0 # 4 hours (default)
+
+# Set to 0 to disable periodic reconnection
+#periodic_reconnect_interval = 0
+```
+
+### Benefits
+
+- **Connection Stability**: Prevents connection timeouts during quiet periods
+- **Proactive Monitoring**: Detects issues before they affect data transmission
+- **Network Resilience**: Handles network changes and server restarts
+- **Configurable**: Adjust interval based on your environment
+
+### Example Scenarios
+
+The timelines below assume `periodic_reconnect_interval = 3600.0` (hourly checks):
+
+#### Quiet Periods (No Data)
+```
+10:00 AM: Last data written
+11:00 AM: Periodic reconnection check → Connection healthy
+12:00 PM: Periodic reconnection check → Connection healthy
+01:00 PM: Periodic reconnection check → Connection healthy
+02:00 PM: New data arrives → Immediate transmission
+```
+
+#### Network Issues During Quiet Period
+```
+10:00 AM: Last data written
+11:00 AM: Periodic reconnection check → Connection failed
+11:00 AM: Attempting reconnection → Success
+12:00 PM: Periodic reconnection check → Connection healthy
+```
+
+#### Server Restart During Quiet Period
+```
+10:00 AM: Last data written
+11:00 AM: Periodic reconnection check → Connection failed
+11:00 AM: Attempting reconnection → Success (server restarted)
+12:00 PM: Periodic reconnection check → Connection healthy
+```
+
+## Persistent Storage (Data Backlog)
+
+### How It Works
+
+When InfluxDB is unavailable, data is stored locally in pickle files:
+
+1. **Data Collection**: Points are stored in memory and on disk
+2. **Automatic Cleanup**: Old data is removed based on age limits
+3. **Recovery**: When the connection is restored, the backlog is flushed to InfluxDB
+4. **Size Management**: The backlog is capped to prevent disk space issues
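+
+A minimal sketch of what one stored point looks like on disk, assuming the pickle layout used by the transport (a list of point dicts carrying an internal `_backlog_time` key; the field values and file name below are examples):
+
+```python
+import pickle
+import time
+
+point = {
+    "measurement": "device_data",
+    "tags": {"transport": "influxdb_output"},
+    "fields": {"battery_voltage": 52.1},
+    "_backlog_time": time.time(),  # internal age-tracking key, stripped before upload
+}
+
+with open("influxdb_backlog/influxdb_backlog_influxdb_output.pkl", "wb") as f:
+    pickle.dump([point], f)
+```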
+
+### Configuration
+
+```ini
+[influxdb_output]
+# Enable persistent storage
+enable_persistent_storage = true
+
+# Storage directory (relative to gateway directory)
+persistent_storage_path = influxdb_backlog
+
+# Maximum number of points to store
+max_backlog_size = 10000
+
+# Maximum age of points in seconds (24 hours)
+max_backlog_age = 86400
+```
+
+### Storage Structure
+
+```
+influxdb_backlog/
+├── influxdb_backlog_influxdb_output.pkl
+├── influxdb_backlog_another_transport.pkl
+└── ...
+```
+
+### Data Recovery Process
+
+1. **Connection Lost**: Data continues to be collected and stored locally
+2. **Reconnection**: When InfluxDB becomes available, the backlog is detected
+3. **Batch Upload**: All stored points are sent to InfluxDB in batches
+4. **Cleanup**: The backlog is cleared after a successful upload
+
+### Example Recovery Log
+
+```
+[2024-01-15 10:30:00] Connection check failed: Connection refused
+[2024-01-15 10:30:00] Not connected to InfluxDB, storing data in backlog
+[2024-01-15 10:30:00] Added point to backlog. Backlog size: 1
+...
+[2024-01-15 18:45:00] Attempting to reconnect to InfluxDB at localhost:8086
+[2024-01-15 18:45:00] Successfully reconnected to InfluxDB
+[2024-01-15 18:45:00] Flushing 2847 backlog points to InfluxDB
+[2024-01-15 18:45:00] Successfully wrote 2847 backlog points to InfluxDB
+```
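+
+A simplified sketch of the recovery step, assuming an already-connected `influxdb` client (the transport's actual flush also rewrites the on-disk file):
+
+```python
+def flush_backlog(client, backlog_points):
+    # Drop the internal age-tracking key before sending to InfluxDB
+    points = [{k: v for k, v in p.items() if k != "_backlog_time"}
+              for p in backlog_points]
+    client.write_points(points)
+    return []  # the backlog is cleared only after a successful write
+```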
+
+## Configuration Examples
+
+### For Stable Networks (Local InfluxDB)
+
+```ini
+[influxdb_output]
+transport = influxdb_out
+host = localhost
+port = 8086
+database = solar
+
+# Standard reconnection
+reconnect_attempts = 3
+reconnect_delay = 2.0
+use_exponential_backoff = false
+
+# Periodic reconnection
+periodic_reconnect_interval = 1800.0 # 30 minutes
+
+# Minimal persistent storage
+enable_persistent_storage = true
+max_backlog_size = 1000
+max_backlog_age = 3600 # 1 hour
+```
+
+### For Unstable Networks (Remote InfluxDB)
+
+```ini
+[influxdb_output]
+transport = influxdb_out
+host = remote.influxdb.com
+port = 8086
+database = solar
+
+# Aggressive reconnection with exponential backoff
+reconnect_attempts = 10
+reconnect_delay = 5.0
+use_exponential_backoff = true
+max_reconnect_delay = 600.0 # 10 minutes
+
+# Frequent periodic reconnection
+periodic_reconnect_interval = 900.0 # 15 minutes
+
+# Large persistent storage for extended outages
+enable_persistent_storage = true
+max_backlog_size = 50000
+max_backlog_age = 604800 # 1 week
+```
+
+### For High-Volume Data
+
+```ini
+[influxdb_output]
+transport = influxdb_out
+host = localhost
+port = 8086
+database = solar
+
+# Fast reconnection for high availability
+reconnect_attempts = 5
+reconnect_delay = 1.0
+use_exponential_backoff = true
+max_reconnect_delay = 60.0
+
+# Less frequent periodic reconnection (data keeps connection alive)
+periodic_reconnect_interval = 14400.0 # 4 hours (default)
+
+# Large backlog for high data rates
+enable_persistent_storage = true
+max_backlog_size = 100000
+max_backlog_age = 86400 # 24 hours
+
+# Optimized batching
+batch_size = 500
+batch_timeout = 5.0
+```
+
+## Monitoring and Maintenance
+
+### Check Backlog Status
+
+```bash
+# Check backlog file sizes
+ls -lh influxdb_backlog/
+
+# Check backlog contents (Python script)
+python3 -c "
+import pickle
+import os
+for file in os.listdir('influxdb_backlog'):
+    if file.endswith('.pkl'):
+        with open(f'influxdb_backlog/{file}', 'rb') as f:
+            data = pickle.load(f)
+            print(f'{file}: {len(data)} points')
+"
+```
+
+### Monitor Logs
+
+```bash
+# Monitor backlog activity
+grep -i "backlog\|persistent" /var/log/protocol_gateway.log
+
+# Monitor reconnection attempts
+grep -i "reconnect\|exponential" /var/log/protocol_gateway.log
+
+# Monitor periodic reconnection
+grep -i "periodic.*reconnect" /var/log/protocol_gateway.log
+```
+
+### Cleanup Old Backlog Files
+
+```bash
+# Remove backlog files older than 7 days
+find influxdb_backlog/ -name "*.pkl" -mtime +7 -delete
+```
+
+## Performance Considerations
+
+### Memory Usage
+
+- **Backlog Storage**: Each point uses ~200-500 bytes in memory
+- **10,000 points**: ~2-5 MB memory usage
+- **100,000 points**: ~20-50 MB memory usage
+
+### Disk Usage
+
+- **Backlog Files**: Python pickle format (uncompressed)
+- **10,000 points**: ~1-2 MB disk space
+- **100,000 points**: ~10-20 MB disk space
+
+### Network Impact
+
+- **Recovery Upload**: Large batches may take time to upload
+- **Bandwidth**: Consider network capacity during recovery
+- **Server Load**: InfluxDB may experience high load during recovery
+
+## Troubleshooting
+
+### Backlog Not Flushing
+
+**Symptoms:**
+- Backlog points remain after reconnection
+- No "Flushing X backlog points" messages
+
+**Solutions:**
+- Check InfluxDB server capacity
+- Verify database permissions
+- Monitor InfluxDB logs for errors
+
+### Excessive Memory Usage
+
+**Symptoms:**
+- High memory consumption
+- Slow performance
+
+**Solutions:**
+- Reduce `max_backlog_size`
+- Decrease `max_backlog_age`
+- Monitor system resources
+
+### Disk Space Issues
+
+**Symptoms:**
+- "Backlog full" warnings
+- Disk space running low
+
+**Solutions:**
+- Clean up old backlog files
+- Reduce `max_backlog_size`
+- Move `persistent_storage_path` to a larger disk
+
+### Reconnection Too Aggressive
+
+**Symptoms:**
+- High CPU usage during outages
+- Network congestion
+
+**Solutions:**
+- Increase `reconnect_delay`
+- Reduce `reconnect_attempts`
+- Enable `use_exponential_backoff`
+
+## Best Practices
+
+### 1. Size Your Backlog Appropriately
+
+```ini
+# For 1-minute intervals, 24-hour outage
+max_backlog_size = 1440 # 24 * 60
+
+# For 5-minute intervals, 1-week outage
+max_backlog_size = 2016 # 7 * 24 * 12
+```
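+
+The same sizing arithmetic as a small helper, for any interval and outage window (the values below are examples):
+
+```python
+def backlog_size(interval_seconds: int, outage_seconds: int) -> int:
+    """Number of points needed to cover an outage at a given write interval."""
+    return outage_seconds // interval_seconds
+
+print(backlog_size(60, 86400))    # 1440 (1-minute data, 24-hour outage)
+print(backlog_size(300, 604800))  # 2016 (5-minute data, 1-week outage)
+```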
+
+### 2. Monitor and Clean
+
+- Regularly check backlog file sizes
+- Clean up old files automatically
+- Monitor disk space usage
+
+### 3. Test Recovery
+
+- Simulate outages to test recovery
+- Verify data integrity after recovery
+- Monitor performance during recovery
+
+### 4. Plan for Scale
+
+- Estimate data volume and outage duration
+- Size backlog accordingly
+- Monitor system resources
+
+## Migration from Previous Version
+
+If upgrading from a version without these features:
+
+1. **No Configuration Changes Required**: Features are enabled by default with sensible defaults
+2. **Backward Compatible**: Existing configurations continue to work
+3. **Gradual Adoption**: Disable features if not needed:
+
+```ini
+[influxdb_output]
+# Disable exponential backoff
+use_exponential_backoff = false
+
+# Disable persistent storage
+enable_persistent_storage = false
+```
\ No newline at end of file
diff --git a/documentation/usage/troubleshooting_influxdb.md b/documentation/usage/troubleshooting_influxdb.md
new file mode 100644
index 0000000..abfc759
--- /dev/null
+++ b/documentation/usage/troubleshooting_influxdb.md
@@ -0,0 +1,340 @@
+# InfluxDB Troubleshooting Guide
+
+## Common Issue: Data Stops Being Written to InfluxDB
+
+This guide helps you diagnose and fix the issue where data stops being written to InfluxDB after some time.
+
+## Quick Diagnosis
+
+### 1. Check Logs
+First, enable debug logging to see what's happening:
+
+```ini
+[influxdb_output]
+transport = influxdb_out
+host = localhost
+port = 8086
+database = solar
+log_level = DEBUG
+```
+
+Look for these log messages:
+- `"Not connected to InfluxDB, storing data in backlog"`
+- `"Connection check failed"`
+- `"Attempting to reconnect to InfluxDB"`
+- `"Failed to write batch to InfluxDB"`
+
+### 2. Check InfluxDB Server
+Verify InfluxDB is running and accessible:
+
+```bash
+# Check if InfluxDB is running
+systemctl status influxdb
+
+# Test connection
+curl -i http://localhost:8086/ping
+
+# Check if database exists
+echo "SHOW DATABASES" | influx
+```
+
+### 3. Check Network Connectivity
+Test network connectivity between your gateway and InfluxDB:
+
+```bash
+# Test basic connectivity
+ping your_influxdb_host
+
+# Test port connectivity
+telnet your_influxdb_host 8086
+```
+
+## Root Causes and Solutions
+
+### 1. Network Connectivity Issues
+
+**Symptoms:**
+- Connection timeouts
+- Intermittent data loss
+- Reconnection attempts in logs
+
+**Solutions:**
+```ini
+[influxdb_output]
+# Increase timeouts for slow networks
+connection_timeout = 30
+reconnect_attempts = 10
+reconnect_delay = 10.0
+```
+
+### 2. InfluxDB Server Restarts
+
+**Symptoms:**
+- Connection refused errors
+- Sudden data gaps
+- Reconnection success after delays
+
+**Solutions:**
+- Monitor InfluxDB server stability
+- Check InfluxDB logs for crashes
+- Consider using InfluxDB clustering for high availability
+
+### 3. Memory/Resource Issues
+
+**Symptoms:**
+- Slow response times
+- Connection hangs
+- Batch write failures
+
+**Solutions:**
+```ini
+[influxdb_output]
+# Reduce batch size to lower memory usage
+batch_size = 50
+batch_timeout = 5.0
+```
+
+### 4. Authentication Issues
+
+**Symptoms:**
+- Authentication errors in logs
+- Connection succeeds but writes fail
+
+**Solutions:**
+- Verify username/password in configuration
+- Check InfluxDB user permissions
+- Test authentication manually:
+
+```bash
+curl -i -u username:password http://localhost:8086/query?q=SHOW%20DATABASES
+```
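+
+The same credential check can be run from Python; a minimal sketch using the `influxdb` client (the credentials are placeholders):
+
+```python
+from influxdb import InfluxDBClient
+
+client = InfluxDBClient(host="localhost", port=8086,
+                        username="username", password="password")
+# Raises influxdb.exceptions.InfluxDBClientError on bad credentials
+print(client.get_list_database())
+```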
+
+### 5. Database/Measurement Issues
+
+**Symptoms:**
+- Data appears in InfluxDB but not in expected measurement
+- Type conflicts in logs
+
+**Solutions:**
+- Verify database and measurement names
+- Check for field type conflicts
+- Use `force_float = true` to avoid type issues
+
+## Configuration Best Practices
+
+### Recommended Configuration
+```ini
+[influxdb_output]
+transport = influxdb_out
+host = localhost
+port = 8086
+database = solar
+measurement = device_data
+include_timestamp = true
+include_device_info = true
+
+# Connection monitoring
+reconnect_attempts = 5
+reconnect_delay = 5.0
+connection_timeout = 10
+
+# Batching (adjust based on your data rate)
+batch_size = 100
+batch_timeout = 10.0
+
+# Data handling
+force_float = true
+log_level = INFO
+```
+
+### For Unstable Networks
+```ini
+[influxdb_output]
+# More aggressive reconnection
+reconnect_attempts = 10
+reconnect_delay = 10.0
+connection_timeout = 30
+
+# Smaller batches for faster recovery
+batch_size = 50
+batch_timeout = 5.0
+```
+
+### For High-Volume Data
+```ini
+[influxdb_output]
+# Larger batches for efficiency
+batch_size = 500
+batch_timeout = 30.0
+
+# Faster reconnection
+reconnect_attempts = 3
+reconnect_delay = 2.0
+```
+
+## Monitoring and Alerts
+
+### 1. Monitor Connection Status
+Add this to your monitoring system:
+```bash
+# Check if gateway is writing data
+curl -s "http://localhost:8086/query?db=solar&q=SELECT%20count(*)%20FROM%20device_data%20WHERE%20time%20%3E%20now()%20-%201h"
+```
+
+### 2. Set Up Alerts
+Monitor these conditions:
+- No data points in the last hour
+- Reconnection attempts > 5 in 10 minutes
+- Connection failures > 3 in 5 minutes
+
+### 3. Log Monitoring
+Watch for these log patterns:
+```bash
+# Monitor for connection issues
+grep -i "connection\|reconnect\|failed" /var/log/protocol_gateway.log
+
+# Monitor for data flow
+grep -i "wrote.*points\|batch.*flush" /var/log/protocol_gateway.log
+```
+
+## Testing Your Setup
+
+### 1. Test Connection Monitoring
+Run the connection test script:
+```bash
+python test_influxdb_connection.py
+```
+
+### 2. Test Data Flow
+Create a simple test configuration:
+```ini
+[test_source]
+transport = modbus_rtu
+port = /dev/ttyUSB0
+baudrate = 9600
+protocol_version = test_protocol
+read_interval = 5
+bridge = influxdb_output
+
+[influxdb_output]
+transport = influxdb_out
+host = localhost
+port = 8086
+database = test
+measurement = test_data
+log_level = DEBUG
+```
+
+### 3. Verify Data in InfluxDB
+```sql
+-- Check if data is being written
+SELECT * FROM test_data ORDER BY time DESC LIMIT 10
+
+-- Check data rate
+SELECT count(*) FROM test_data WHERE time > now() - 1h
+```
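+
+The same freshness check can be scripted; a minimal sketch with the Python client (the database and measurement names match the test configuration above):
+
+```python
+from influxdb import InfluxDBClient
+
+client = InfluxDBClient(host="localhost", port=8086, database="test")
+result = client.query("SELECT count(*) FROM test_data WHERE time > now() - 1h")
+print(list(result.get_points()))
+```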
+
+## Advanced Troubleshooting
+
+### 1. Enable Verbose Logging
+```ini
+[general]
+log_level = DEBUG
+
+[influxdb_output]
+log_level = DEBUG
+```
+
+### 2. Check Multiprocessing Issues
+If using multiple transports, verify bridge configuration:
+```ini
+# Ensure bridge names match exactly
+[source_transport]
+bridge = influxdb_output
+
+[influxdb_output]
+transport = influxdb_out
+# No bridge needed for output transports
+```
+
+### 3. Monitor System Resources
+```bash
+# Check memory usage
+free -h
+
+# Check disk space
+df -h
+
+# Check network connections
+netstat -an | grep 8086
+```
+
+### 4. InfluxDB Performance Tuning
+```ini
+# InfluxDB configuration (influxdb.conf)
+[data]
+wal-fsync-delay = "1s"
+cache-max-memory-size = "1g"
+series-id-set-cache-size = 100
+```
+
+## Common Error Messages
+
+### "Failed to connect to InfluxDB"
+- Check if InfluxDB is running
+- Verify host and port
+- Check firewall settings
+
+### "Failed to write batch to InfluxDB"
+- Check InfluxDB server resources
+- Verify database permissions
+- Check for field type conflicts
+
+### "Not connected to InfluxDB, storing data in backlog"
+- Connection was lost; reconnection is in progress
+- Data is preserved in the backlog and flushed after reconnection
+- Check network connectivity
+- Monitor reconnection attempts
+
+### "Connection check failed"
+- Network issue or InfluxDB restart
+- Check InfluxDB server status
+- Verify network connectivity
+
+## Getting Help
+
+If you're still experiencing issues:
+
+1. **Collect Information:**
+   - Gateway logs with DEBUG level
+   - InfluxDB server logs
+   - Network connectivity test results
+   - Configuration file (remove sensitive data)
+
+2. **Test Steps:**
+   - Run the connection test script
+   - Verify InfluxDB is accessible manually
+   - Test with a simple configuration
+
+3. **Provide Details:**
+   - Operating system and version
+   - Python version
+   - InfluxDB version
+   - Network setup (local/remote InfluxDB)
+   - Data volume and frequency
+
+## Prevention
+
+### 1. Regular Monitoring
+- Set up automated monitoring for data flow
+- Monitor InfluxDB server health
+- Check network connectivity regularly
+
+### 2. Configuration Validation
+- Test configurations before deployment
+- Use connection monitoring settings
+- Validate InfluxDB permissions
+
+### 3. Backup Strategies
+- Consider multiple InfluxDB instances
+- Implement data backup procedures
+- Use InfluxDB clustering for high availability
\ No newline at end of file