tailucas/event-processor


About The Project

Overview

Note 1: See my write-up on Home Security Automation which provides an architectural overview of how this project works with others in my collection.

Note 2: While I use the word Automation in these projects, there is no integration with sensible frameworks like openHAB or Home Assistant, at least not yet. This project was a learning opportunity: the goal was to employ specific technologies and architectural patterns. The parts most likely to be useful are the integrations with libraries like Flask, FastAPI, ZeroMQ, and RabbitMQ, where seamless behavior came only after much trial and error.

Core Functionality

This project is a comprehensive event hub for home automation designed with resource constraints in mind (runs on Raspberry Pi). It extends base-app and takes a git submodule dependency on pylib. Related projects include Snapshot Processor and Remote Monitor.

Key Features:

  • Device Registration & Discovery: Automatically registers devices as input/output sources via RabbitMQ heartbeat messages. Supports multi-role devices (input and output on same source).
  • Web UI Dashboard (Flask + FastAPI):
    • Display available output devices with enable/disable controls
    • Schedule-based automation for device enable/disable
    • Network accessibility via ngrok tunnel or Tailscale
    • Login/authentication system with role-based access
  • Input-Output Linking: Create and configure input-to-output triggers
    • Configuration persisted to local SQLite database
    • Database schema available in config/db_schema.sql
    • Automated backups to Amazon S3 via cron job (backup_db.sh)
  • Message Processing: Routes messages from inputs to configured outputs according to rules
  • Telegram Bot Integration: Built-in SMS-like notifications via Telegram bot, with optional AWS SNS SMS fallback
  • MQTT Support: Full mDash device discovery integration for MQTT message handling
  • Metrics & Monitoring:
    • Audit trail in local database
    • Meter device updates sent to InfluxDB bucket
    • Sentry error tracking integration
  • High Availability: Optional leader election for multi-instance deployments
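
The input-output linking described above can be sketched with the standard-library sqlite3 module. This is only an illustration: the `link` table, its columns, and the helper names `open_db`, `add_link`, and `outputs_for` are hypothetical, and the project's real schema in config/db_schema.sql may look quite different.

```python
import sqlite3

# Hypothetical schema for illustration only; the real one is in
# config/db_schema.sql and may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS link (
    input_device  TEXT NOT NULL,
    output_device TEXT NOT NULL,
    enabled       INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (input_device, output_device)
);
"""


def open_db(path=":memory:"):
    """Open (or create) the database and make sure the table exists."""
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    return conn


def add_link(conn, input_device, output_device, enabled=True):
    """Create or update one input-to-output trigger."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO link (input_device, output_device, enabled) "
            "VALUES (?, ?, ?)",
            (input_device, output_device, int(enabled)),
        )


def outputs_for(conn, input_device):
    """Return the enabled outputs that an input should trigger, sorted by name."""
    rows = conn.execute(
        "SELECT output_device FROM link "
        "WHERE input_device = ? AND enabled = 1 ORDER BY output_device",
        (input_device,),
    )
    return [row[0] for row in rows]
```

A message from an input would then be routed by looking up `outputs_for(conn, input_name)` and forwarding the message to each result; disabled links are skipped by the query itself.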

Package Structure

This application has grown to 2273 lines in app/__main__.py, demonstrating patterns for complex, multi-threaded Python applications. Significant functionality has been factored into the pylib shared library.

Main Application Components (app/__main__.py):

  • EventProcessor (line 1356+): Core event processing thread responsible for the main application loop. Inherits from AppThread and manages RabbitMQ message routing through MQConnection. Implements the central business logic for connecting device inputs to configured outputs.

  • MQConnection: Manages RabbitMQ exchange connectivity, channel lifecycle, error recovery, and graceful shutdown. Inherits from AppThread for thread tracking and Closable for ZeroMQ socket management.

  • TBot (line 1786+): Telegram bot wrapper using asyncio for async message handling. Provides SMS-like notifications via Telegram with optional AWS SNS fallback. Handles commands, photo uploads, and multi-user access control.

  • HeartbeatFilter: Demonstrates ZeroMQ internal message relay pattern. Receives device heartbeat/status updates from MQTT/RabbitMQ sources, updates device inventory, and relays messages to EventProcessor for processing. See blog post for architectural details.

  • Flask Web Server: Multi-threaded synchronous web application for dashboard, configuration management, and user authentication. Templates render device controls, configuration forms, audit trails, and metrics visualization.

  • FastAPI/uvicorn: Modern async web framework for API endpoints and real-time capabilities. Provides an alternative to Flask for new API development.

  • SQLAlchemy ORM: Async database layer for SQLite persistence. Manages device configurations, input-output relationships, audit logs, and metrics.

  • Background Schedulers: Handles scheduled enable/disable of devices and periodic tasks (heartbeat reports, metrics collection, database backups).
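
The relay role described for HeartbeatFilter (update the device inventory, then pass the message on to the event processor) can be sketched as follows. To keep the example runnable without extra packages, `queue.Queue` stands in for the ZeroMQ sockets the project actually uses; the message fields (`device`, `roles`, `active`) and the `STOP` sentinel are assumptions made for this sketch, not the project's wire format.

```python
import queue
import threading
import time

STOP = object()  # sentinel that ends the relay loop (illustrative)


def heartbeat_filter(inbound, outbound, inventory):
    """Update the inventory from each message, then relay it downstream."""
    while True:
        message = inbound.get()
        if message is STOP:
            outbound.put(STOP)  # let the consumer know the stream has ended
            return
        device = message["device"]
        # A device may declare several roles (input and output on one source).
        entry = inventory.setdefault(device, {"roles": set(), "last_seen": 0.0})
        entry["roles"].update(message.get("roles", ()))
        entry["last_seen"] = time.monotonic()
        outbound.put(message)


def run_demo():
    """Push two messages through the relay and collect what comes out."""
    inbound, outbound, inventory = queue.Queue(), queue.Queue(), {}
    relay = threading.Thread(
        target=heartbeat_filter, args=(inbound, outbound, inventory)
    )
    relay.start()
    inbound.put({"device": "garage", "roles": ["input", "output"]})
    inbound.put({"device": "garage", "roles": ["input"], "active": True})
    inbound.put(STOP)
    relay.join()
    relayed = []
    while (message := outbound.get()) is not STOP:
        relayed.append(message)
    return inventory, relayed
```

With ZeroMQ the two queues would typically become a pair of in-process sockets, which is where the socket lifecycle handling mentioned above comes in.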

Web UI Templates (templates/):

  • login.html: User authentication
  • index.html: Main dashboard with real-time device controls
  • config.html: Application-level configuration
  • input_config.html / output_config.html: Device-specific settings
  • input_link.html / output_link.html: Input-output automation rules
  • event_log.html: Audit trail of all system events
  • metrics.html: Performance metrics and monitoring
  • layout.html: Base template with navigation and styling

Static Assets (static/):

  • Bootstrap CSS/JS for responsive UI
  • Font Awesome icons
  • Custom clock picker for schedule configuration
  • Favicon

Integration Architecture:

The application integrates the following technologies:

  • ZeroMQ: Thread-safe inter-component messaging with proper socket lifecycle
  • RabbitMQ: External device message queue with exchange-based routing
  • MQTT: IoT device connectivity with mDash automatic discovery
  • Flask/FastAPI: Dual web framework approach (legacy + modern async)
  • SQLAlchemy: Async ORM with relationship mapping and data validation
  • Telegram Bot Framework: Async command handling and media support
  • 1Password Secrets Automation: Encrypted credential and configuration management
  • Sentry SDK: Production error tracking with integrations for Flask, async, threading
  • Permit.io: Role-based access control (RBAC) for web UI
  • InfluxDB: Time-series storage for meter/sensor readings
  • AWS Services: S3 for database backups, SNS for SMS fallback
  • ngrok/Tailscale: Network tunneling for remote access

See tailucas-pylib for shared architectural patterns and utilities.

(back to top)

Built With

Technologies that help make this project useful:

1Password, AWS, Bootstrap, Font Awesome, Docker, InfluxDB, ngrok, MQTT, RabbitMQ, Poetry, Python, Flask, Sentry, SQLite, Telegram, ZeroMQ

(back to top)

Getting Started

Here is some detail about the intended use of this project.

Prerequisites

Beyond the Python dependencies defined in pyproject.toml, the project requires:

  • 1Password Secrets Automation: Runtime credential and configuration management (paid service with free tier)
  • Sentry: Error tracking and monitoring (free tier available)
  • RabbitMQ: Message broker for device communication (self-hosted or managed service)
  • MQTT Broker: IoT device messaging (self-hosted like Mosquitto or managed service)
  • InfluxDB: Time-series database for meter metrics (optional, self-hosted or cloud)
  • Telegram Bot: SMS-like notifications via Telegram (free)
  • mDash: Device discovery service (free tier available)

Optional services:

  • ngrok: Remote access tunnel (free tier available)
  • Tailscale: VPN-based remote access (free tier available)
  • AWS Services: S3 (database backups), SNS (SMS fallback)
  • InfluxDB Cloud: Managed metrics storage
  • Sentry: Production error tracking

Required Tools

Install these tools and ensure they're on your environment $PATH:

For local development (optional):

  • java and javac: Java 25+ runtime/compiler (Amazon Corretto recommended)
  • python3: Python 3.12+ runtime
  • poetry: Legacy dependency management (if not using uv)

Installation

  1. 🛑 Prerequisites - 1Password Secrets Automation Setup

    This project relies on 1Password Secrets Automation for configuration and credential management. A 1Password Connect server container must be running in your environment.

    Your 1Password Secrets Automation vault must contain an entry called ENV.event-processor with the following configuration variables:

    Variable | Purpose | Example
    -------- | ------- | -------
    APP_FLASK_DEBUG | Flask debug mode | false
    APP_FLASK_HTTP_PORT | Flask web server port | 8080
    APP_NAME | Application identifier | event-processor
    AWS_CONFIG_FILE | AWS configuration path | /home/app/.aws/config
    AWS_DEFAULT_REGION | AWS region for S3/SNS | us-east-1
    BACKUP_S3_BUCKET | S3 bucket for DB backups | my-backup-bucket
    CRONITOR_MONITOR_KEY | Cronitor health check key | specific to your account
    DEVICE_NAME | Container hostname | event-processor-a
    HC_PING_URL | Healthchecks.io URL | specific to your check
    HEALTHCHECKS_BADGE_CSV | Healthchecks badge URLs | project specific
    INFLUXDB_BUCKET | InfluxDB bucket name | meter
    LEADER_ELECTION_ENABLED | Enable leader election | false
    MDASH_API_BASE_URL | mDash discovery API | https://mdash.net/api/v2/devices
    MDASH_APP_CONFIG_MQTT_PUB_TOPIC | mDash config topic | app.mqtt_pub_topic
    MDASH_DEVICE_TAGS_CSV | Filter devices by tags | meter,sensor
    MQTT_METER_RESET_TOPIC | Meter reset control topic | meter/electricity/control
    MQTT_PUB_TOPIC_CSV | MQTT topics to subscribe | meter/electricity/#,sensor/#
    MQTT_SERVER_ADDRESS | MQTT broker IP/hostname | 192.168.1.100
    NGROK_CLIENT_API_PORT | ngrok management API port | 4040
    NGROK_ENABLED | Enable ngrok tunneling | true
    NGROK_TUNNEL_NAME | ngrok tunnel name | frontend
    OP_CONNECT_HOST | 1Password Connect server URL | http://1password-connect:8080
    OP_CONNECT_TOKEN | 1Password Connect token | specific to your server
    OP_VAULT | 1Password vault ID | specific to your vault
    OUTPUT_TYPE_BLUETOOTH | Bluetooth device type | l2ping
    OUTPUT_TYPE_SNAPSHOT | Snapshot device type | Camera
    OUTPUT_TYPE_SWITCH | Switch device types | switch,Buzzer
    OUTPUT_TYPE_TTS | Text-to-speech type | TTS
    RABBITMQ_EXCHANGE | RabbitMQ exchange name | home_automation
    RABBITMQ_SERVER_ADDRESS | RabbitMQ broker IP | 192.168.1.100
    SNS_CONTROL_ENABLED | Enable SQS control messages | false
    SQS_QUEUE | SQS queue name | automation-control
    TABLESPACE_PATH | SQLite database directory | /data
    TELEGRAM_CHAT_ROOM | Telegram chat ID | specific to your chat
    TELEGRAM_IMAGE_SEND_ONLY_WITH_PEOPLE | Filter image sending | true
    TELEGRAM_SMS_FALLBACK_ENABLED | Use SNS SMS fallback | false
    TELEGRAM_USERS_CSV | Authorized Telegram users | user1,user2,user3
    USER_TZ | Timezone override | America/New_York

    Additional runtime configuration (see app.conf) is automatically populated from 1Password.
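
    Several of these variables are booleans, integers, or comma-separated lists (the `_CSV` suffix). As a rough sketch of how such values could be read once they reach the process environment, the hypothetical helpers below (`env_bool`, `env_csv`, `env_int`) parse them with the standard library only. How the application itself parses them is not shown here, and the set of accepted "true" spellings is an assumption.

```python
import os

# Spellings treated as "true" in this sketch (an assumption, not project behavior).
TRUE_VALUES = {"1", "true", "yes", "on"}


def env_bool(name, default=False, env=os.environ):
    """Read a flag such as NGROK_ENABLED or LEADER_ELECTION_ENABLED."""
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in TRUE_VALUES


def env_csv(name, env=os.environ):
    """Read a comma-separated variable such as MQTT_PUB_TOPIC_CSV as a list."""
    raw = env.get(name, "")
    return [item.strip() for item in raw.split(",") if item.strip()]


def env_int(name, default, env=os.environ):
    """Read an integer such as APP_FLASK_HTTP_PORT, falling back to a default."""
    raw = env.get(name)
    return int(raw) if raw not in (None, "") else default
```

    Passing a plain dict as `env` makes the helpers easy to try without touching the real environment, for example `env_csv("TELEGRAM_USERS_CSV", env={"TELEGRAM_USERS_CSV": "user1,user2"})`.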

  2. Clone the Repository and Initialize Submodules

    git clone https://github.com/tailucas/event-processor.git
    cd event-processor
    git submodule init
    git submodule update
  3. Create Docker User and Set Directory Permissions

    task datadir

    Review the assumptions about Docker user IDs (UID/GID 999) before running this task; the Dockerfile runs the application as user app (UID 999).

  4. Configure Runtime Environment

    task configure

    This generates docker-compose.yml and .env from your 1Password secrets and base.env template.

  5. Build the Docker Image

    task build

    Multi-stage Docker build:

    • Builder stage: Compiles Java artifacts using Maven 3.9+, Java 25 (Amazon Corretto)
    • Runtime stage: Extends tailucas/base-app:latest with:
      • Additional system packages (sqlite3, html-xml-utils, wget)
      • Python 3.12+ with uv-managed dependencies
      • Compiled Java application (app.jar)
      • Flask web application
      • FastAPI/uvicorn endpoints
      • Telegram bot integration
      • Database backup scripts
      • Cron job configuration
  6. Run the Application

    Foreground (interactive, logs to console):

    task run

    Background (detached mode, logs to syslog):

    task rund

    The application will:

    • Start RabbitMQ client for device messaging
    • Launch broker client for external integrations
    • Initialize background scheduler for automation rules
    • Start main event processing loop
    • Discover MQTT sources via mDash
    • Start Telegram bot (asyncio)
    • Launch thread nanny (monitors thread health)
    • Start Flask web server (port 8080 by default)
    • Start FastAPI server (port 8085 by default)
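
    The thread nanny idea (a watcher that notices when a tracked thread has died) can be sketched with the standard library. The real implementation lives in the shared pylib library and is not reproduced here; the function names `dead_threads` and `nanny`, and the choice to request shutdown on the first failure, are assumptions made for illustration.

```python
import threading


def dead_threads(tracked):
    """Return the names of tracked threads that are no longer alive."""
    return sorted(t.name for t in tracked if not t.is_alive())


def nanny(tracked, shutdown, interval=1.0, on_failure=print):
    """Poll tracked threads until shutdown is requested or one of them dies.

    Returns the names of the dead threads, or an empty list on normal shutdown.
    """
    while not shutdown.is_set():
        dead = dead_threads(tracked)
        if dead:
            on_failure(dead)
            shutdown.set()  # ask the remaining threads to stop as well
            return dead
        shutdown.wait(interval)  # sleeps, but wakes early on shutdown
    return []
```

    Here `shutdown` is a `threading.Event` shared by all workers, so a single failure can bring the whole application down cleanly instead of leaving it half alive.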

(back to top)

Application Startup Sequence

When task run or task rund is executed, the application initializes in this order:

  1. Configuration Loading: Reads from 1Password Secrets Automation and config/app.conf
  2. Database Initialization: Creates/upgrades the SQLite schema from config/db_schema.sql
  3. Sentry Integration: Initializes error tracking with Flask and async integrations
  4. Message Queue Setup: Connects to RabbitMQ broker for device communication
  5. Scheduler Initialization: Loads scheduled automation rules and device enable/disable timers
  6. Threading & Signals: Starts thread nanny for health monitoring, installs signal handlers
  7. ZeroMQ Relay: Initializes HeartbeatFilter for device status message relay
  8. MQTT Discovery: Queries mDash API to discover and register MQTT devices
  9. Telegram Bot: Starts async Telegram bot with command handlers
  10. Flask Web Server: Launches synchronous web server (port 8080) with user authentication
  11. FastAPI Server: Starts modern async API endpoints (port 8085) via uvicorn
  12. Event Processing Loop: Main EventProcessor thread begins processing messages

All components run concurrently with proper error handling and graceful shutdown on system signals.
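
As a minimal sketch of ordered startup with graceful shutdown, the code below starts components in sequence, waits for a shutdown event, and then stops whatever was started. The `Component` class, the `run` function, and the reverse-order stop are illustrative assumptions; the application's own startup code is not reproduced here.

```python
import signal
import threading


class Component:
    """Stand-in for a service such as the MQ client or a web server."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def start(self):
        self.log.append(f"start {self.name}")

    def stop(self):
        self.log.append(f"stop {self.name}")


def run(components, shutdown):
    """Start components in order, block until shutdown, stop in reverse order."""
    started = []
    try:
        for component in components:
            component.start()
            started.append(component)
        shutdown.wait()
    finally:
        # Only stop what actually started, newest first.
        for component in reversed(started):
            component.stop()


def install_signal_handlers(shutdown):
    """Set the shutdown event on SIGINT/SIGTERM (call from the main thread)."""
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, lambda *_: shutdown.set())
```

Because the stop calls sit in a `finally` block, a component that fails during startup still causes the ones started before it to be stopped.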

Web Interface

The Flask web application listens on port 8080 inside the container (published on host port 8095 by the docker-compose.yml port mapping) and provides:

  • Dashboard: Real-time display of registered devices, their status, and manual control buttons
  • Device Management: Configuration of device properties, input/output types, and capabilities
  • Automation Rules: Create input-to-output relationships with scheduling (cron-like patterns)
  • Event Log: Audit trail of all device interactions, state changes, and automated actions
  • Metrics: Historical visualization of meter readings and system performance
  • Administration: User management, system settings, database maintenance
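
Schedule-based enable/disable boils down to asking whether the current time falls inside a configured window. The sketch below shows one way to do that, including windows that cross midnight. The function name `in_window` and the start/end representation are assumptions; the project's actual schedule format is not documented here.

```python
from datetime import time


def in_window(now, start, end):
    """Return True if `now` lies in the half-open window [start, end).

    Windows may wrap past midnight (for example 22:00 to 06:00).
    A window with start == end is treated as empty.
    """
    if start <= end:
        return start <= now < end
    # Wrapping window: active late in the evening or early in the morning.
    return now >= start or now < end
```

For example, a device enabled from 22:00 to 06:00 is active at 23:00 and at 05:59, but not at noon.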

Build System

Task CLI (Taskfile.yml)

Primary build and deployment orchestration:

  • task build - Build Docker image with Java compilation, Python dependencies, and asset bundling
  • task run - Run container in foreground with full log output
  • task rund - Run container detached (persists after terminal close)
  • task configure - Generate .env and docker-compose.yml from 1Password secrets
  • task datadir - Create data directory with proper permissions
  • task java - Compile Java artifacts with Maven (standalone Java build)
  • task python - Setup Python virtual environment with uv
  • task push - Push built image to Docker Hub/registry

Dockerfile

Multi-stage build process:

  1. Builder Stage: Compiles Java application

    • Uses tailucas/base-app:latest as builder base
    • Builds with Maven 3.9+
    • Produces app-0.1.0.jar
  2. Runtime Stage: Extends tailucas/base-app:latest with event-processor-specific components

    • System packages: sqlite3, html-xml-utils, wget
    • Python application code and dependencies (uv-managed)
    • Compiled Java JAR from builder stage
    • Flask web application and templates
    • FastAPI application
    • Telegram bot configuration
    • Database backup scripts
    • Cron job configuration
    • Runs as user app (UID 999)

Dependencies

Python (pyproject.toml, managed via uv):

  • flask>=3.1.2 - Web framework for dashboard
  • fastapi>=0.116.1 - Modern async API framework
  • flask-sqlalchemy>=3.1.1 - ORM integration with Flask
  • sqlalchemy[asyncio]>=2.0.43 - Async database ORM
  • aiosqlite>=0.21.0 - Async SQLite driver
  • pyzmq>=27.0.2 - ZeroMQ bindings
  • python-telegram-bot>=22.4 - Telegram bot integration
  • requests>=2.32.5 - HTTP client library
  • uvicorn[standard]>=0.35.0 - ASGI web server
  • schedule>=1.2.2 - Job scheduling library
  • pytz>=2025.2 - Timezone support
  • permit>=2.8.1 - Authorization/RBAC
  • pydantic>=2.11.9 - Data validation
  • sentry-sdk[flask]>=2.37.0 - Error tracking
  • tailucas-pylib>=0.5.2 - Shared utilities

Java (Maven, Spring Boot 3.4.13 parent):

  • Core: commons-lang3, ini4j, SLF4J
  • Messaging: RabbitMQ client, MQTT (Paho)
  • Data: Jackson (JSON/MessagePack)
  • Monitoring: Prometheus metrics, Sentry Spring Boot
  • Distributed: PagerDuty integration, Unleash feature flags
  • JMX: Remote management capabilities

Port Mappings

From docker-compose.yml (host:container):

  • 4041:4040 - ngrok API management interface
  • 8095:8080 - Flask web server
  • 8085:8085 - FastAPI/uvicorn endpoints
  • 9400:9400 - Prometheus metrics endpoint
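
Since Docker Compose short-form mappings are written as HOST:CONTAINER, the port you connect to from the host can differ from the one the service listens on. The small sketch below makes that lookup explicit; `parse_port_mapping` and `host_port_for` are hypothetical helper names, and only the plain two-part form is handled.

```python
def parse_port_mapping(mapping):
    """Split a "HOST:CONTAINER" string into two integers."""
    host, container = mapping.split(":")
    return int(host), int(container)


def host_port_for(mappings, container_port):
    """Return the host port published for a container port, or None."""
    for mapping in mappings:
        host, container = parse_port_mapping(mapping)
        if container == container_port:
            return host
    return None


MAPPINGS = ["4041:4040", "8095:8080", "8085:8085", "9400:9400"]
```

With the mappings listed above, `host_port_for(MAPPINGS, 8080)` gives 8095, so the Flask dashboard is reached on host port 8095 even though Flask itself listens on 8080.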

Java Configuration

This project includes Spring Boot integration. For local JMX profiling, use these VM arguments:

-Djava.net.preferIPv4Stack=true \
-Dcom.sun.management.jmxremote.host=127.0.0.1 \
-Dcom.sun.management.jmxremote.port=3333 \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.authenticate=false

The EventProcessor Java class is the main entry point, providing complementary functionality to the Python application.

License

Distributed under the MIT License. See LICENSE for more information.

(back to top)

Acknowledgments

(back to top)
