# EdgePulse

EdgePulse is an end-to-end, production-oriented micro-SaaS for ingesting time-series telemetry (IoT sensors, energy meters, industrial signals, and application/infra metrics) and detecting anomalies in near real time.
- What it does
- Architecture
- Tech stack
- Repository layout
- API overview
- Quick start (local)
- Configuration
- ML pipeline
- Alerts
- Infrastructure & deployment
- Roadmap
- Security
- Contributing
- Licence
## What it does

EdgePulse provides:
- Fast ingestion at the edge: a Cloudflare Worker validates payloads, authenticates tenants via API keys, archives raw payloads, and buffers events asynchronously.
- Reliable async processing: Cloudflare Queues decouple ingestion from downstream processing for resilience and predictable latency.
- Durable raw archive: raw request payloads are stored in Cloudflare R2 for auditability and replay.
- SQL-first system of record: tenants, devices, metrics, datapoints, features, models, scores, and alert metadata are stored in Postgres (AWS RDS).
- Baseline anomaly detection: rolling-window feature extraction + Isolation Forest scoring (scikit-learn).
- Alerting: configurable anomaly rules trigger signed webhook notifications and create an auditable alert history.
- Dashboard: a lightweight UI to browse devices/metrics, inspect ingestion health, and visualise anomalies over time.
- MLOps foundations: model artefact versioning, containerised jobs, and scheduled orchestration on AWS Fargate.
## Architecture

- Client → Worker (`POST /v1/ingest`): validates schema, checks the API key, enforces tenant scope.
- Worker → R2: stores the raw payload (immutable archive).
- Worker → Queue: publishes an async message for downstream normalisation.
- Consumer → Postgres: writes normalised datapoints and ingestion batch status.
- Scheduled ML jobs (Fargate): compute features, train/update models, score new windows, store anomaly scores.
- Alert engine: evaluates alert rules and delivers webhook notifications.
- Dashboard (Pages): displays devices, metrics, anomalies, and alert history via the API.
```
Devices/Apps
     |
     | HTTPS (X-API-Key)
     v
Cloudflare Worker (API Gateway)
     | \
     |  \--> R2 (raw archive)
     v
Cloudflare Queue ---> Consumer ---> Postgres (RDS)
      \
       \--> AWS Fargate jobs (features/train/score)
                 |
                 v
        anomaly_scores + models
                 |
                 v
        Alert engine ---> Webhooks
                 |
                 v
     Cloudflare Pages (Dashboard UI)
```
## Tech stack

**Cloudflare**

- Workers (API gateway)
- Queues (async buffering)
- R2 (raw payload archive)
- Pages (dashboard hosting)

**AWS**

- ECS Fargate (containerised ML jobs)
- EventBridge Scheduler (cron triggers)
- RDS Postgres (system of record)
- S3 (optional; model artefacts if not using R2)

**Data/ML**

- Python, SQL
- pandas, NumPy
- scikit-learn (baseline)
- PyTorch (planned upgrade path)
## Repository layout

```
services/
  ingest-worker/      # Cloudflare Worker: auth, validation, R2 archive, Queue publish
  queue-consumer/     # Consumer (Worker or service): normalise + write to Postgres
jobs/
  ml-pipeline/        # Python pipeline: features/train/score (Dockerised for Fargate)
web/
  dashboard/          # Dashboard UI (Cloudflare Pages)
infra/
  terraform/          # IaC: Cloudflare + AWS (RDS, ECS, Scheduler, IAM, etc.)
docs/
  architecture.md     # Design decisions, diagrams, threat model (recommended)
  api/openapi.yaml    # OpenAPI spec
  db/schema.sql       # Database schema (if you keep SQL snapshots)
examples/
  device-simulator/   # Generate realistic telemetry + injected anomalies
  sample-payloads/    # Example JSON payloads
```
## API overview

All API calls (except `/v1/health`) require:

```
X-API-Key: <tenant_api_key>
```

### `POST /v1/ingest`

Example payload:

```json
{
  "device_external_id": "meter-001",
  "metrics": [
    { "name": "voltage", "ts": "2025-12-25T08:45:00Z", "value": 229.5, "unit": "V" },
    { "name": "current", "ts": "2025-12-25T08:45:00Z", "value": 12.1, "unit": "A" }
  ]
}
```

Example request:

```bash
curl -X POST "http://localhost:8787/v1/ingest" \
  -H "Content-Type: application/json" \
  -H "X-API-Key: ep_dev_XXXXXXXXXXXXXXXX" \
  -d @examples/sample-payloads/ingest.json
```

### Other endpoints

- `GET /v1/anomalies?metric_id=<uuid>&from=<iso>&to=<iso>`
- `GET/POST /v1/devices`
- `GET/POST /v1/metrics`
- `GET/POST /v1/alert-rules`
- `GET/POST /v1/webhooks`

See `docs/api/openapi.yaml` for the canonical API contract.
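The same ingest call can be made from Python. The sketch below is illustrative, not shipped client code: it mirrors the curl example's local URL and placeholder key, and only builds the request so you can inspect it before sending.

```python
import json
import urllib.request

# Assumed local dev values, matching the curl example above.
API_BASE = "http://localhost:8787"
API_KEY = "ep_dev_XXXXXXXXXXXXXXXX"

def build_ingest_request(device_id: str, metrics: list[dict]) -> urllib.request.Request:
    """Build (but do not send) a POST /v1/ingest request."""
    body = json.dumps({"device_external_id": device_id, "metrics": metrics}).encode()
    return urllib.request.Request(
        f"{API_BASE}/v1/ingest",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "X-API-Key": API_KEY},
    )

req = build_ingest_request(
    "meter-001",
    [{"name": "voltage", "ts": "2025-12-25T08:45:00Z", "value": 229.5, "unit": "V"}],
)
# Once the Worker is running locally, urllib.request.urlopen(req) sends it.
```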
## Quick start (local)

Prerequisites:

- Node.js 18+ (or 20+)
- Python 3.10+ (3.11 recommended)
- Docker (recommended for local Postgres and for building ML job images)
- Cloudflare Wrangler CLI (for Workers/Pages local dev)
**Start Postgres**

```bash
docker run --name edgepulse-postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=edgepulse \
  -p 5432:5432 \
  -d postgres:16
```

If you keep a SQL snapshot:

```bash
psql "postgresql://postgres:postgres@localhost:5432/edgepulse" \
  -f docs/db/schema.sql
```

Or use migrations (recommended) from `jobs/ml-pipeline` (e.g., Alembic), depending on your setup.
**Run the ingest Worker**

```bash
cd services/ingest-worker
npm install
npm run dev
```

The Worker should serve locally at a Wrangler URL (commonly `http://localhost:8787`).
**Run the dashboard**

```bash
cd web/dashboard
npm install
npm run dev
```

**Run the ML pipeline**

```bash
cd jobs/ml-pipeline
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
python -m edgepulse_ml score --tenant <tenant_id> --metric <metric_id> --since "24h"
```

## Configuration

**Worker (`services/ingest-worker`)**

- `DATABASE_URL` — Postgres connection string
- `R2_BUCKET_NAME` — R2 bucket for raw payloads
- `QUEUE_NAME` — Cloudflare Queue name
- `API_KEY_HASH_SALT` — salt/pepper for key hashing
- `RATE_LIMIT_*` — optional (plan-based quotas)

**ML jobs (`jobs/ml-pipeline`)**

- `DATABASE_URL`
- `MODEL_ARTEFACT_STORE` — `r2://...` or `s3://...`
- `FEATURE_WINDOW_SEC` — e.g., `300`
- `SCORE_INTERVAL_SEC` — e.g., `600`

**Dashboard (`web/dashboard`)**

- `VITE_API_BASE_URL` (or equivalent if you choose Next.js)

Commit `.env.example` files; never commit real secrets.
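For reference, a minimal `.env.example` for the Worker might look like the fragment below; every value is a local-dev placeholder, not a real setting.

```
# services/ingest-worker/.env.example — placeholders only
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/edgepulse
R2_BUCKET_NAME=edgepulse-raw-dev
QUEUE_NAME=edgepulse-ingest-dev
API_KEY_HASH_SALT=change-me
```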
## ML pipeline

- Windowing: group datapoints into rolling windows per metric (e.g., 5 minutes).
- Feature extraction (robust + cheap):
- mean, std
- median, MAD (robust dispersion)
- min/max range
- slope (trend)
- missing rate
- stuck-at detection (low variance)
- Model: Isolation Forest per metric (or per metric family) trained on recent “mostly normal” history.
- Scoring: compute anomaly score per window; assign labels (spike/drop/drift/missingness/stuck).
Planned upgrades:

- PyTorch temporal autoencoder for reconstruction error
- Quantisation / sparse variants ("edge profile") for efficient inference
## Alerts

Alert rules evaluate anomaly scores and deliver signed webhooks.
A typical webhook payload includes:
- `tenant_id`, `metric_id`, `window_end`, `score`, `label`, `explanation`
- a signature header (HMAC) for authenticity

Webhook deliveries are retried with exponential backoff and recorded in `alert_events`.
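A receiver can verify the HMAC signature along these lines. The header name, secret format, and payload fields here are placeholders; check the actual webhook documentation for the real scheme.

```python
import hashlib
import hmac
import json

# Hypothetical header name; EdgePulse's actual header may differ.
SIGNATURE_HEADER = "X-EdgePulse-Signature"

def sign(secret: bytes, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()

def verify(secret: bytes, body: bytes, received_sig: str) -> bool:
    # compare_digest avoids leaking information via timing differences
    return hmac.compare_digest(sign(secret, body), received_sig)

secret = b"whsec_example_secret"
body = json.dumps({"metric_id": "m1", "score": 0.97, "label": "spike"}).encode()
sig = sign(secret, body)

ok = verify(secret, body, sig)            # True for an untampered body
bad = verify(secret, body + b"x", sig)    # False if the body was modified
```

Verify against the raw bytes of the request body, before any JSON parsing, so re-serialisation differences cannot break the comparison.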
## Infrastructure & deployment

Use `infra/terraform` to provision:
- Cloudflare: Worker, Queue, R2 bucket, Pages project
- AWS: RDS Postgres, ECS cluster/services/tasks, EventBridge schedules, IAM roles/policies
Suggested deployment sequence:

1. Provision cloud infrastructure (Terraform).
2. Deploy the Worker + consumer (Wrangler).
3. Deploy the dashboard (Pages).
4. Build and push the ML job image (ECR), then enable schedules.
This repo intentionally separates low-latency ingestion (Cloudflare) from compute-heavy training/scoring (AWS).
## Roadmap

- Worker auth + validation
- R2 raw archive + Queue buffering
- Consumer normalisation to Postgres
- Basic dashboard views (devices/metrics/batches)
- Feature extraction + Isolation Forest training
- Scheduled scoring job (Fargate)
- Metric detail page (values + scores)
- Alert rules + webhook delivery + audit trail
- Usage accounting (ingested points, scored windows)
- Plan enforcement (quotas + retention)
- Edge agent (Python) for batching + signing
- PyTorch model option + “edge profile” export
- Drift monitoring + model refresh policies
## Security

- Never store raw API keys; store only hashes and a short lookup prefix.
- Do not commit secrets: use `.env` locally and secret managers in CI/cloud.
- Sign webhooks (HMAC) and verify signatures on the receiver side.
- Apply least-privilege IAM for AWS tasks and storage access.
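The hash-plus-prefix scheme for API keys can be sketched as below. The `ep_` key format, prefix length, and use of `API_KEY_HASH_SALT` as an HMAC pepper are assumptions for illustration, not the Worker's actual implementation.

```python
import hashlib
import hmac
import os
import secrets

def mint_api_key(pepper: bytes) -> tuple[str, str, str]:
    """Return (plaintext key, lookup prefix, stored hash).

    Only the prefix and hash are persisted; the plaintext key is
    shown to the tenant once and never stored.
    """
    raw = "ep_" + secrets.token_urlsafe(24)   # assumed key format
    prefix = raw[:11]                         # short prefix for indexed lookup
    digest = hmac.new(pepper, raw.encode(), hashlib.sha256).hexdigest()
    return raw, prefix, digest

def check_api_key(presented: str, stored_hash: str, pepper: bytes) -> bool:
    candidate = hmac.new(pepper, presented.encode(), hashlib.sha256).hexdigest()
    return hmac.compare_digest(candidate, stored_hash)

pepper = os.environ.get("API_KEY_HASH_SALT", "dev-pepper").encode()
key, prefix, stored = mint_api_key(pepper)
```

On each request the Worker would look up candidate rows by prefix, then confirm with the constant-time hash comparison.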
## Contributing

Contributions are welcome:
- Open an issue describing the change (bug, feature, refactor).
- Keep PRs small and test-backed.
- Ensure no secrets are included in commits.
## Licence

See `LICENSE`.