
silvermpx/wse


WSE - WebSocket Engine


High-performance WebSocket server built in Rust with native clustering, E2E encryption, message recovery, presence tracking, and real-time fan-out. Exposed to Python via PyO3 with zero GIL overhead on the data path. Ships with Rust-accelerated utilities for application code: priority queues, rate limiters, event sequencing, compression, cryptography, and JWT.

Features

Server

Feature | Details
Rust core | tokio async runtime, tungstenite WebSocket transport, dedicated thread pool, zero GIL on the data path
JWT authentication | HS256, RS256, and ES256 via the jsonwebtoken crate. Validated during the handshake (0.01ms), token extracted from cookie or Authorization header, key rotation, kid validation
Protocol negotiation | client_hello/server_hello handshake with feature discovery, capability advertisement, version agreement
Topic subscriptions | Per-connection topic subscriptions with automatic cleanup on disconnect
Pre-framed broadcast | WebSocket frame built once and shared via Arc across all connections: a single allocation per broadcast
Vectored writes | write_vectored (writev syscall) batches multiple frames per connection into a single kernel call
Write coalescing | Write task drains up to 256 pending frames per iteration via recv_many
DashMap state | Sharded concurrent hash maps (per-shard locking, low contention) for topics, rates, formats, activity tracking
mimalloc allocator | Global allocator optimized for multi-threaded workloads with frequent small allocations
Deduplication | 50,000-entry AHashSet with FIFO eviction, checked on every send_event() call
Rate limiting | Per-connection token bucket: 100K capacity, 10K/s refill, client warning at 20% remaining
Zombie detection | Server pings every 25s and force-closes connections with no activity for 60s
Drain mode | Lock-free crossbeam bounded channel; Python acquires the GIL once per batch (not per event)
Compression | zlib for client-facing messages above a threshold (default 1024 bytes)
MessagePack | Opt-in binary transport via ?format=msgpack: roughly 2x faster serialization, 30% smaller payloads
Message signing | Selective HMAC-SHA256 signing for critical operations, nonce-based replay prevention
Queue groups | Round-robin dispatch within named groups for load-balanced worker pools
Topic ACL | Per-connection allow/deny glob patterns for topic access control
Graceful drain | drain() sends a Close frame to all clients, rejects new connections, notifies cluster peers

End-to-End Encryption

Feature | Details
Key exchange | ECDH P-256 (per-connection keypair, automatic during handshake)
Encryption | AES-GCM-256 with a unique 12-byte IV per message
Key derivation | HKDF-SHA256 (salt: wse-encryption, info: aes-gcm-key)
Wire format | E: prefix + 12-byte IV + AES-GCM ciphertext + 16-byte auth tag
Key rotation | Configurable rotation interval (default 1 hour), automatic renegotiation
Replay prevention | Nonce cache (10K entries, 5-minute TTL) on the client side

Cluster Protocol

Feature | Details
Topology | Full TCP mesh, direct peer-to-peer connections
Wire format | Custom binary frames: 8-byte header + topic + payload, 12 message types
Interest routing | SUB/UNSUB/RESYNC frames; messages are forwarded only to peers with matching subscribers
Gossip discovery | PeerAnnounce/PeerList frames; a new node needs only one seed address to join
mTLS | rustls + tokio-rustls, P-256 certificates, WebPkiClientVerifier for both sides
Compression | zstd level 1 for payloads above 256 bytes, capability-negotiated, decompressed output capped at 1 MB
Heartbeat | 5s ping interval, 15s timeout, dead peer detection
Circuit breaker | 10 failures to open, 60s reset, 3 half-open probe calls
Dead letter queue | 1000-entry ring buffer for failed cluster sends
Presence sync | PresenceUpdate/PresenceFull frames, CRDT last-write-wins conflict resolution
Topology API | cluster_info() returns the connected peer list with address, instance_id, status

Presence Tracking

Feature | Details
Per-topic tracking | Which users are active in each topic, with custom metadata (status, avatar, etc.)
User-level grouping | Multiple connections from the same JWT sub share one presence entry
Join/leave lifecycle | presence_join on first connection, presence_leave on last disconnect
O(1) stats | presence_stats() returns member/connection counts without iteration
Data updates | update_presence() broadcasts to all topics where the user is present
Cluster sync | Synchronized across all nodes, CRDT last-write-wins resolution
TTL sweep | Background task every 30s removes entries from dead connections

Message Recovery

Feature | Details
Ring buffers | Per-topic, power-of-2 capacity, bitmask indexing (single AND instruction)
Epoch+offset tracking | Precise recovery positioning; the epoch changes when a buffer is recreated
Memory management | Global budget (default 256 MB), TTL eviction, LRU eviction when over budget
Zero-copy storage | Recovery entries share Bytes (Arc) with the broadcast path
Recovery on reconnect | subscribe_with_recovery() replays missed messages automatically

Client SDKs (Python + TypeScript/React)

Feature | Details
Auto-reconnection | 4 strategies (exponential, linear, fibonacci, adaptive), backoff with jitter
Connection pool | Multi-endpoint with health scoring, 3 load balancing strategies, automatic failover
Circuit breaker | CLOSED/OPEN/HALF_OPEN state machine, prevents connection storms
Rate limiting | Client-side token bucket, coordinates with server feedback
E2E encryption | Wire-compatible AES-GCM-256 + ECDH P-256 (both clients speak the same protocol)
Event sequencing | Duplicate detection (sliding window) + out-of-order buffering
Network monitor | Real-time latency, jitter, packet loss measurement, quality scoring
Priority queues | 5 levels from CRITICAL to BACKGROUND
Offline queue | IndexedDB persistence (TypeScript), replayed on reconnect
Compression | Automatic zlib for messages above threshold
MessagePack | Binary encoding for smaller payloads and faster serialization
Message signing | HMAC-SHA256 integrity verification

Transport Security

Feature | Details
Origin validation | Configure in the reverse proxy (nginx/Caddy) to prevent Cross-Site WebSocket Hijacking (CSWSH)
Cookie auth | access_token HTTP-only cookie with Secure + SameSite=Lax (OWASP recommended for browsers)
Frame protection | 1 MB max frame size, serde_json parsing (no eval), escaped user IDs in server_ready
Cluster frame protection | zstd decompression output capped at 1 MB (MAX_FRAME_SIZE), protocol version validation

Quick Start

pip install wse-server
import threading
import time

from wse_server import RustWSEServer, rust_jwt_encode

server = RustWSEServer(
    "0.0.0.0", 5007,
    max_connections=10_000,
    jwt_secret=b"replace-with-a-strong-secret-key!",
    jwt_issuer="my-app",
    jwt_audience="my-api",
)
server.enable_drain_mode()
server.start()

def handle_events(srv):
    while True:
        for ev in srv.drain_inbound(256, 50):
            if ev[0] == "auth_connect":
                srv.subscribe_connection(ev[1], ["updates"])
            elif ev[0] == "msg":
                print(f"Message from {ev[1]}: {ev[2]}")
            elif ev[0] == "disconnect":
                print(f"Disconnected: {ev[1]}")

threading.Thread(target=handle_events, args=(server,), daemon=True).start()

while server.is_running():
    time.sleep(1)

Generate a test token:

token = rust_jwt_encode(
    {"sub": "user-1", "iss": "my-app", "aud": "my-api",
     "exp": int(time.time()) + 3600, "iat": int(time.time())},
    b"replace-with-a-strong-secret-key!",
)

Server Configuration

RustWSEServer constructor parameters:

Parameter | Default | Description
host | required | Bind address
port | required | Bind port
max_connections | 1000 | Maximum concurrent WebSocket connections
jwt_secret | None | JWT key for validation. HS256: shared secret (bytes, at least 32). RS256/ES256: PEM public key. None disables auth
jwt_issuer | None | Expected iss claim. Skipped if None
jwt_audience | None | Expected aud claim. Skipped if None
jwt_cookie_name | "access_token" | Cookie name for JWT token extraction
jwt_previous_secret | None | Previous key for zero-downtime rotation. HS256: previous secret. RS256/ES256: previous public key PEM
jwt_key_id | None | Expected kid header claim. Rejects tokens with a mismatched key ID
jwt_algorithm | None | JWT algorithm: "HS256" (default), "RS256", or "ES256"
jwt_private_key | None | PEM private key for RS256/ES256 token encoding. Not needed for HS256
max_inbound_queue_size | 131072 | Drain mode bounded queue capacity
recovery_enabled | False | Enable per-topic message recovery buffers
recovery_buffer_size | 128 | Ring buffer slots per topic (rounded to a power of 2)
recovery_ttl | 300 | Buffer TTL in seconds before eviction
recovery_max_messages | 500 | Max messages returned per recovery response
recovery_memory_budget | 268435456 | Global memory limit for all recovery buffers in bytes (default 256 MB)
presence_enabled | False | Enable per-topic presence tracking
presence_max_data_size | 4096 | Max bytes for a user's presence metadata
presence_max_members | 0 | Max tracked members per topic (0 = unlimited)
max_outbound_queue_bytes | 16777216 | Per-connection outbound buffer limit in bytes (default 16 MB). Messages are dropped when it is exceeded
max_subscriptions_per_connection | 0 | Max topics per connection (0 = unlimited). Prevents topic-explosion DoS
rate_limit_capacity | 100000.0 | Token bucket capacity per connection
rate_limit_refill | 10000.0 | Token bucket refill rate per second
max_message_size | 1048576 | Maximum WebSocket frame size in bytes (default 1 MB)
ping_interval | 25 | Server-initiated ping interval in seconds
idle_timeout | 60 | Force-close connections idle for this many seconds

API Reference

Lifecycle

server.start()                          # Start the server
server.stop()                           # Graceful shutdown
server.is_running()                     # Check server status (bool)

Event Handling

Drain mode (recommended) - events are queued in a lock-free crossbeam channel. Python polls in batches, acquiring the GIL once per batch.

server.enable_drain_mode()              # Switch to batch-polling mode (call before start)
events = server.drain_inbound(256, 50)  # Poll up to 256 events, wait up to 50ms

Each event is a tuple: (event_type, conn_id, payload)

Event type | Trigger | Payload
"auth_connect" | JWT-validated connection | user_id (string)
"connect" | Connection without JWT | cookies (string)
"msg" | Client sent WSE-prefixed JSON | parsed dict
"raw" | Client sent plain text | raw string
"bin" | Client sent binary frame | bytes
"disconnect" | Connection closed | None
"presence_join" | User's first connection joined a topic | dict with user_id, topic, data
"presence_leave" | User's last connection left a topic | dict with user_id, topic, data
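With drain mode, a dispatch table keyed on event_type keeps the polling loop small. The sketch below is plain Python with hypothetical handler names; the sample tuples are stand-ins shaped like the table above, so it runs without a server. In a real loop the list would come from server.drain_inbound(256, 50).

```python
from typing import Any, Callable, Dict, List, Tuple

Event = Tuple[str, str, Any]        # (event_type, conn_id, payload)
Handler = Callable[[str, Any], None]


def dispatch(events: List[Event], handlers: Dict[str, Handler]) -> int:
    """Route each event to the handler registered for its type.

    Returns how many events had no handler; unknown types are skipped
    rather than raising (a choice made for this sketch).
    """
    unhandled = 0
    for event_type, conn_id, payload in events:
        handler = handlers.get(event_type)
        if handler is None:
            unhandled += 1
            continue
        handler(conn_id, payload)
    return unhandled


# Stand-in events; a real loop would use server.drain_inbound(256, 50)
log: List[str] = []
handlers: Dict[str, Handler] = {
    "auth_connect": lambda cid, user_id: log.append(f"{cid} authenticated as {user_id}"),
    "msg": lambda cid, body: log.append(f"{cid} sent {body!r}"),
    "disconnect": lambda cid, _: log.append(f"{cid} disconnected"),
}
sample: List[Event] = [
    ("auth_connect", "c1", "user-1"),
    ("msg", "c1", {"t": "ping"}),
    ("bin", "c1", b"\x00\x01"),     # no handler registered
    ("disconnect", "c1", None),
]
missed = dispatch(sample, handlers)
```

Keeping handlers short matters here: the whole batch is processed on the polling thread, so slow handlers delay the next drain_inbound() call.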

Callback mode - alternative to drain mode. Callbacks are invoked via spawn_blocking per event.

server.set_callbacks(on_connect, on_message, on_disconnect)

Sending Messages

server.send(conn_id, text)                      # Send text to one connection
server.send_bytes(conn_id, data)                 # Send binary to one connection
server.send_event(conn_id, event_dict)           # Send structured event (auto-serialized, deduped, rate-checked)

server.broadcast_all(text)                       # Send to every connected client (text)
server.broadcast_all_bytes(data)                 # Send to every connected client (binary)
server.broadcast_local(topic, text)              # Fan-out to topic subscribers on this instance
server.broadcast(topic, text)                    # Fan-out to topic subscribers across all cluster nodes

Method | Scope | Notes
send | Single connection | Raw text frame
send_bytes | Single connection | Raw binary frame
send_event | Single connection | JSON-serialized, compressed if above threshold, deduplicated via a 50K-entry FIFO window
broadcast_all | All connections | Pre-framed: a single frame build shared via Arc
broadcast_local | Topic (local) | Pre-framed, DashMap subscriber lookup, stored in recovery buffer if enabled
broadcast | Topic (all nodes) | Local fan-out + forwarded to cluster peers with matching interest

Topic Subscriptions

server.subscribe_connection(conn_id, ["prices", "news"])              # Subscribe to topics
server.subscribe_connection(conn_id, ["chat"], {"status": "online"})  # Subscribe with presence data
server.subscribe_connection(conn_id, ["tasks"], queue_group="workers") # Subscribe with queue group (round-robin)
server.unsubscribe_connection(conn_id, ["news"])                      # Unsubscribe from specific topics
server.unsubscribe_connection(conn_id, None)                          # Unsubscribe from all topics
server.get_topic_subscriber_count("prices")                           # Subscriber count for a topic

Subscriptions are cleaned up automatically on disconnect. In cluster mode, interest changes are propagated to peers via SUB/UNSUB frames.

Queue groups: connections in the same queue_group receive messages round-robin (each message goes to exactly one member of the group) instead of fan-out. Normal subscribers (no queue group) still receive all messages. This is useful for distributing work across a pool of consumers.
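The queue-group rule can be modeled in a few lines. This is a minimal in-memory sketch, not the server's Rust code: it assumes each (topic, group) pair keeps its own round-robin cursor, and the TopicRouter class and its methods are hypothetical.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional, Tuple


class TopicRouter:
    """Hypothetical model: plain subscribers get every message,
    each queue group gets exactly one member per message (round-robin)."""

    def __init__(self) -> None:
        self.plain: DefaultDict[str, List[str]] = defaultdict(list)
        # topic -> group name -> member connection IDs
        self.groups: DefaultDict[str, Dict[str, List[str]]] = defaultdict(dict)
        self.cursors: Dict[Tuple[str, str], int] = {}

    def subscribe(self, conn_id: str, topic: str, queue_group: Optional[str] = None) -> None:
        if queue_group is None:
            self.plain[topic].append(conn_id)
        else:
            self.groups[topic].setdefault(queue_group, []).append(conn_id)

    def targets(self, topic: str) -> List[str]:
        """Connection IDs that receive one message published to `topic`."""
        result = list(self.plain.get(topic, []))
        for group, members in self.groups.get(topic, {}).items():
            cursor = self.cursors.get((topic, group), 0)
            result.append(members[cursor % len(members)])  # next member in rotation
            self.cursors[(topic, group)] = cursor + 1
        return result


router = TopicRouter()
router.subscribe("dashboard", "tasks")                   # normal subscriber
router.subscribe("w1", "tasks", queue_group="workers")
router.subscribe("w2", "tasks", queue_group="workers")
deliveries = [router.targets("tasks") for _ in range(3)]
# [['dashboard', 'w1'], ['dashboard', 'w2'], ['dashboard', 'w1']]
```

An integer cursor (rather than itertools.cycle) keeps the rotation valid when members join or leave the group between messages.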

Topic ACL

Per-connection topic access control with glob pattern matching.

# Allow only "user:*" topics, deny everything else
server.set_topic_acl(conn_id, allow=["user:*"])

# Allow "data:*" but deny "data:internal:*"
server.set_topic_acl(conn_id, allow=["data:*"], deny=["data:internal:*"])

# set_topic_acl() must be called before subscribe_connection(); the ACL is checked at subscribe time
server.subscribe_connection(conn_id, ["data:prices"])   # allowed
server.subscribe_connection(conn_id, ["data:internal:audit"])  # denied

Deny patterns take precedence over allow patterns. Supports * (any characters) and ? (single character) wildcards. Applied at subscribe time.
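The matching rules above (deny wins, `*` matches any run of characters including `:`, `?` matches one character) can be expressed as a small predicate. This is a hedged sketch: topic_allowed() is a hypothetical helper, and treating an empty allow list as "allow everything not denied" is an assumption the README does not state.

```python
import re
from typing import Iterable, Optional


def _compile(pattern: str) -> "re.Pattern[str]":
    """Translate a glob with only * and ? wildcards into a regex."""
    out = []
    for ch in pattern:
        if ch == "*":
            out.append(".*")        # any characters, including ':'
        elif ch == "?":
            out.append(".")         # exactly one character
        else:
            out.append(re.escape(ch))
    return re.compile("".join(out), re.DOTALL)


def topic_allowed(topic: str,
                  allow: Optional[Iterable[str]] = None,
                  deny: Optional[Iterable[str]] = None) -> bool:
    """Deny patterns win; with an allow list, the topic must match one entry."""
    if any(_compile(p).fullmatch(topic) for p in (deny or ())):
        return False
    allow_list = list(allow or ())
    if not allow_list:
        return True  # assumption: no allow list means everything not denied is allowed
    return any(_compile(p).fullmatch(topic) for p in allow_list)


topic_allowed("data:prices", allow=["data:*"], deny=["data:internal:*"])          # True
topic_allowed("data:internal:audit", allow=["data:*"], deny=["data:internal:*"])  # False
```

A hand-written translation is used instead of fnmatch because fnmatch also accepts `[seq]` character classes, which the README does not list.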

Presence Tracking

Requires presence_enabled=True in the constructor.

# Query members in a topic
members = server.presence("chat-room")
# {"alice": {"data": {"status": "online"}, "connections": 2},
#  "bob":   {"data": {"status": "away"},   "connections": 1}}

# Lightweight counts (O(1), no iteration)
stats = server.presence_stats("chat-room")
# {"num_users": 2, "num_connections": 3}

# Update a user's presence data across all their subscribed topics
server.update_presence(conn_id, {"status": "away"})

Presence is tracked at the user level (JWT sub claim). Multiple connections from the same user share a single presence entry. presence_join fires on first connection, presence_leave on last disconnect. In cluster mode, presence state is synchronized across all nodes using CRDT last-write-wins resolution.
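User-level presence is essentially reference counting per (topic, user). The sketch below is an illustration of that idea, not the server's implementation; PresenceTracker is hypothetical. It keeps a running connection total per topic so stats() answers without iterating, matching the O(1) presence_stats() behavior described above.

```python
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Tuple

PresenceEvent = Tuple[str, str, str]  # (event_type, topic, user_id)


class PresenceTracker:
    def __init__(self) -> None:
        # topic -> user_id -> {"data": ..., "connections": n}
        self.members: DefaultDict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)
        # topic -> total connections, maintained incrementally so stats() never iterates
        self.connection_totals: DefaultDict[str, int] = defaultdict(int)

    def join(self, topic: str, user_id: str, data: Dict[str, Any]) -> List[PresenceEvent]:
        self.connection_totals[topic] += 1
        entry = self.members[topic].get(user_id)
        if entry is not None:
            entry["connections"] += 1       # another tab/device: no new event
            return []
        self.members[topic][user_id] = {"data": data, "connections": 1}
        return [("presence_join", topic, user_id)]

    def leave(self, topic: str, user_id: str) -> List[PresenceEvent]:
        entry = self.members[topic].get(user_id)
        if entry is None:
            return []
        self.connection_totals[topic] -= 1
        entry["connections"] -= 1
        if entry["connections"] > 0:
            return []                       # user still has other connections
        del self.members[topic][user_id]
        return [("presence_leave", topic, user_id)]

    def stats(self, topic: str) -> Dict[str, int]:
        return {"num_users": len(self.members[topic]),
                "num_connections": self.connection_totals[topic]}


tracker = PresenceTracker()
events: List[PresenceEvent] = []
events += tracker.join("chat-room", "alice", {"status": "online"})
events += tracker.join("chat-room", "alice", {"status": "online"})  # second tab
events += tracker.join("chat-room", "bob", {"status": "away"})
stats = tracker.stats("chat-room")  # {'num_users': 2, 'num_connections': 3}
events += tracker.leave("chat-room", "alice")  # one connection remains
events += tracker.leave("chat-room", "alice")  # last connection gone
```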

Message Recovery

Requires recovery_enabled=True in the constructor.

result = server.subscribe_with_recovery(
    conn_id, ["prices"],
    recover=True,
    epoch=client_epoch,       # From previous session
    offset=client_offset,     # From previous session
)
# {"topics": {"prices": {"epoch": 123, "offset": 456, "recovered": True, "count": 12}}}

The server maintains per-topic ring buffers (power-of-2 capacity, bitmask indexing). Clients store the epoch and offset from their last received message. On reconnect, the server replays missed messages from the ring buffer. If the gap is too large or the epoch has changed, the client receives a NotRecovered status and should re-subscribe from scratch.

Memory is managed with a global budget (default 256 MB), TTL eviction for idle buffers, and LRU eviction when over budget.
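The ring-buffer recovery logic can be sketched in Python to show how power-of-2 sizing, bitmask indexing, and epoch+offset checks fit together. This is a hedged illustration under stated assumptions: RecoveryBuffer is hypothetical, the client is assumed to send the offset of the last message it received, and epochs come from a simple counter (the real server may derive them differently). TTL and memory-budget eviction are omitted.

```python
import itertools
from typing import List, Tuple

_epochs = itertools.count(1)  # assumption: any value that changes when a buffer is recreated


class RecoveryBuffer:
    """Hypothetical per-topic ring buffer with epoch+offset positioning."""

    def __init__(self, capacity: int) -> None:
        size = 1
        while size < capacity:
            size <<= 1                      # round up to a power of two
        self.mask = size - 1
        self.slots: List[bytes] = [b""] * size
        self.epoch = next(_epochs)
        self.next_offset = 0                # offset assigned to the next message

    def publish(self, payload: bytes) -> int:
        offset = self.next_offset
        self.slots[offset & self.mask] = payload  # AND with mask replaces modulo
        self.next_offset += 1
        return offset

    def recover(self, epoch: int, last_offset: int) -> Tuple[bool, List[bytes]]:
        """Return (recovered, messages published after last_offset)."""
        if epoch != self.epoch:
            return False, []                # buffer was recreated: resubscribe from scratch
        oldest = max(0, self.next_offset - len(self.slots))
        start = last_offset + 1
        if start < oldest:
            return False, []                # gap too large: entries already overwritten
        return True, [self.slots[o & self.mask] for o in range(start, self.next_offset)]


buf = RecoveryBuffer(100)                   # rounded up to 128 slots
for i in range(200):
    buf.publish(f"m{i}".encode())
ok, missed = buf.recover(buf.epoch, 190)    # client last saw offset 190
```

Here ok is True and missed holds m191 through m199; asking for offset 10 fails because only the newest 128 messages survive.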

Cluster

# Join a cluster mesh with mTLS
server.connect_cluster(
    peers=["10.0.0.2:9999", "10.0.0.3:9999"],
    tls_ca="/etc/wse/ca.pem",
    tls_cert="/etc/wse/node.pem",
    tls_key="/etc/wse/node.key",
    cluster_port=9999,
)

# With gossip discovery (only seed addresses needed)
server.connect_cluster(
    peers=[],
    seeds=["10.0.0.2:9999"],
    cluster_addr="10.0.0.1:9999",
    cluster_port=9999,
)

server.cluster_connected()       # True if connected to at least one peer
server.cluster_peers_count()     # Number of active peer connections
server.cluster_info()            # List of connected peers (address, instance_id, connected)

Nodes form a full TCP mesh automatically. The cluster protocol uses a custom binary frame format with an 8-byte header, 12 message types, and capability negotiation during handshake. Features:

  • Interest-based routing - SUB/UNSUB/RESYNC frames. Messages are only forwarded to peers with matching subscribers.
  • Gossip discovery - PeerAnnounce/PeerList frames. New nodes need one seed address to join.
  • mTLS - mutual TLS via rustls with P-256 certificates and WebPkiClientVerifier.
  • zstd compression - payloads above 256 bytes compressed at level 1, capability-negotiated.
  • Circuit breaker - 10 failures to open, 60s reset, 3 half-open probe calls.
  • Heartbeat - 5s interval, 15s timeout, dead peer detection.
  • Dead letter queue - 1000-entry ring buffer for failed cluster sends.
  • Presence sync - PresenceUpdate/PresenceFull frames with CRDT conflict resolution.
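The circuit breaker numbers can be read as a small state machine. This is a hedged sketch, not WSE's Rust implementation: it assumes the 10 failures are consecutive, that HALF_OPEN admits at most 3 probe sends, and that 3 successful probes close the circuit while any probe failure reopens it. The CircuitBreaker class and its injectable clock are illustrative.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Hypothetical CLOSED -> OPEN -> HALF_OPEN cycle for one peer link."""

    def __init__(self, failure_threshold: int = 10, reset_timeout: float = 60.0,
                 half_open_max_calls: int = 3,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_max_calls = half_open_max_calls
        self.clock = clock
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = 0.0
        self.probes = 0
        self.probe_successes = 0

    def allow(self) -> bool:
        """Ask before each send to the peer."""
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.reset_timeout:
                return False
            self.state = "HALF_OPEN"        # reset timeout elapsed: start probing
            self.probes = self.probe_successes = 0
        if self.state == "HALF_OPEN":
            if self.probes >= self.half_open_max_calls:
                return False
            self.probes += 1
        return True

    def record_success(self) -> None:
        if self.state == "HALF_OPEN":
            self.probe_successes += 1
            if self.probe_successes >= self.half_open_max_calls:
                self.state, self.failures = "CLOSED", 0
        else:
            self.failures = 0               # assumption: only consecutive failures count

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state, self.opened_at, self.failures = "OPEN", self.clock(), 0


now = [0.0]                                 # fake clock for the example
cb = CircuitBreaker(clock=lambda: now[0])
for _ in range(10):
    cb.allow()
    cb.record_failure()                     # tenth failure opens the circuit
now[0] = 61.0                               # past the 60 s reset
probes = [cb.allow() for _ in range(4)]     # three probes admitted, fourth rejected
```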

Health Monitoring

health = server.health_snapshot()
# {
#     "connections": 150,
#     "inbound_queue_depth": 0,
#     "inbound_dropped": 0,
#     "uptime_secs": 3600.5,
#     "recovery_enabled": True,
#     "recovery_topic_count": 5,
#     "recovery_total_bytes": 1048576,
#     "cluster_connected": True,
#     "cluster_peer_count": 2,
#     "cluster_messages_sent": 50000,
#     "cluster_messages_delivered": 49950,
#     "cluster_messages_dropped": 0,
#     "cluster_bytes_sent": 1048576,
#     "cluster_bytes_received": 1024000,
#     "cluster_reconnect_count": 0,
#     "cluster_unknown_message_types": 0,
#     "cluster_dlq_size": 0,
#     "presence_enabled": True,
#     "presence_topics": 3,
#     "presence_total_users": 25,
# }

Connection Management

server.get_connection_count()        # Lock-free AtomicUsize read
server.get_connections()             # List all connection IDs (snapshot)
server.disconnect(conn_id)           # Force-disconnect a connection
server.drain(close_code=4300, close_reason="shutting down", timeout=10)  # Graceful drain
server.inbound_queue_depth()         # Events waiting to be drained
server.inbound_dropped_count()       # Events dropped due to full queue
server.get_cluster_dlq_entries()     # Retrieve failed cluster messages from dead letter queue

drain() sends a WebSocket Close frame to all connected clients and rejects new connections. The drain wait runs as a separate task, so the command processor stays responsive. Use for zero-downtime deployments and rolling restarts.


Security

JWT Authentication

Rust-native JWT validation (HS256, RS256, or ES256) during the WebSocket handshake. No GIL acquisition, 0.01ms per decode.

Token delivery:

  • Browser clients: access_token HTTP-only cookie (set by your login endpoint, attached automatically by the browser)
  • Backend clients: Authorization: Bearer <token> header and/or access_token cookie
  • API clients: Authorization: Bearer <token> header

Required claims: sub (user ID), exp (expiration). Recommended: iat (issued at). Optional: iss, aud (validated if configured).

End-to-End Encryption

Per-connection session keys via ECDH P-256 key exchange, AES-GCM-256 encryption, HKDF-SHA256 key derivation.

Wire format: E: prefix + 12-byte IV + AES-GCM ciphertext + 16-byte auth tag.

Enable on the client side - the server handles key exchange automatically during the handshake.
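To make the wire format and key derivation concrete, here is a stdlib-only sketch: an RFC 5869 HKDF-SHA256 and a splitter for E: frames. It assumes the salt and info strings listed above are used as raw ASCII bytes and that the ECDH shared secret is the HKDF input key material; hkdf_sha256() and split_encrypted_frame() are hypothetical helpers, not part of wse_server or wse_client. Decryption itself is left out. With the `cryptography` package, and assuming no associated data, it would be `AESGCM(aes_key).decrypt(iv, ciphertext + tag, None)`, since that API expects the tag appended to the ciphertext.

```python
import hashlib
import hmac
from typing import Tuple

PREFIX = b"E:"
IV_LEN = 12   # AES-GCM nonce size from the wire format
TAG_LEN = 16  # AES-GCM authentication tag size


def hkdf_sha256(ikm: bytes, salt: bytes, info: bytes, length: int = 32) -> bytes:
    """RFC 5869 HKDF with SHA-256: extract a PRK, then expand it."""
    prk = hmac.new(salt, ikm, hashlib.sha256).digest()
    okm, block, counter = b"", b"", 1
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


def split_encrypted_frame(frame: bytes) -> Tuple[bytes, bytes, bytes]:
    """Split an E: frame into (iv, ciphertext, tag) without decrypting it."""
    if not frame.startswith(PREFIX):
        raise ValueError("not an E: frame")
    body = frame[len(PREFIX):]
    if len(body) < IV_LEN + TAG_LEN:
        raise ValueError("frame shorter than IV + tag")
    return body[:IV_LEN], body[IV_LEN:-TAG_LEN], body[-TAG_LEN:]


# 32-byte AES-256 key from an ECDH shared secret (placeholder bytes here)
shared_secret = bytes(32)
aes_key = hkdf_sha256(shared_secret, salt=b"wse-encryption", info=b"aes-gcm-key")
```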

Rate Limiting

Per-connection token bucket: 100,000-token capacity, 10,000 tokens/second refill (rate_limit_capacity, rate_limit_refill). Clients receive a rate_limit_warning at 20% remaining capacity and a RATE_LIMITED error when the limit is exceeded.
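A token bucket with these defaults fits in a few lines. This is a minimal sketch of the general technique, not the server's implementation: TokenBucket is hypothetical, and returning "warning" on every message at or below 20% (rather than once) is an assumption.

```python
import time
from typing import Callable


class TokenBucket:
    """Hypothetical per-connection limiter using the documented defaults."""

    def __init__(self, capacity: float = 100_000.0, refill_per_sec: float = 10_000.0,
                 warn_fraction: float = 0.2,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.warn_level = capacity * warn_fraction
        self.clock = clock
        self.tokens = capacity
        self.last = clock()

    def try_consume(self, cost: float = 1.0) -> str:
        """Return "ok", "warning" (20% or less left) or "rate_limited"."""
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.refill_per_sec)
        self.last = now
        if self.tokens < cost:
            return "rate_limited"
        self.tokens -= cost
        return "warning" if self.tokens <= self.warn_level else "ok"


now = [0.0]                                  # fake clock for a deterministic example
bucket = TokenBucket(capacity=10, refill_per_sec=1, clock=lambda: now[0])
results = [bucket.try_consume() for _ in range(11)]
```

With a capacity of 10, the first seven calls return "ok", the next three "warning", and the eleventh "rate_limited"; after five simulated seconds, five tokens are back.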

Deduplication

send_event() maintains a 50,000-entry AHashSet with FIFO eviction. Duplicate message IDs are dropped before serialization.
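A fixed-size set with FIFO eviction pairs a hash set (fast membership) with a queue (insertion order). The sketch below shows that pattern in Python; FifoDeduper is a hypothetical name, and the server's version is an AHashSet in Rust.

```python
from collections import deque
from typing import Deque, Hashable, Set


class FifoDeduper:
    """Remember the last `max_entries` message IDs; evict the oldest first."""

    def __init__(self, max_entries: int = 50_000) -> None:
        self.max_entries = max_entries
        self.seen: Set[Hashable] = set()
        self.order: Deque[Hashable] = deque()

    def is_duplicate(self, message_id: Hashable) -> bool:
        if message_id in self.seen:
            return True
        self.seen.add(message_id)
        self.order.append(message_id)
        if len(self.order) > self.max_entries:
            self.seen.discard(self.order.popleft())  # forget the oldest ID
        return False


dedup = FifoDeduper(max_entries=3)
flags = [dedup.is_duplicate(x) for x in ["a", "b", "a", "c", "d", "a"]]
# [False, False, True, False, False, False]  ("a" was evicted when "d" arrived)
```

The trade-off is a bounded window: an ID repeated after more than max_entries newer IDs is no longer recognized.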

Zombie Detection

The server pings each connected client every 25 seconds (ping_interval). Connections with no activity for 60 seconds (idle_timeout) are force-closed.

Full security documentation: docs/SECURITY.md


Wire Protocol

WSE uses a custom wire protocol with categorized JSON messages:

Text frames: JSON with c field for category: "WSE" (system), "S" (snapshot), "U" (update)

Binary frames: C: (zlib compressed), M: (MessagePack), E: (AES-GCM encrypted), raw zlib (0x78 magic byte)
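A client receiving binary frames has to branch on these prefixes. This sketch assumes the bytes after C: form a standard zlib stream (same as the raw 0x78 case) and leaves MessagePack and encrypted payloads undecoded; classify_frame() is a hypothetical helper, not the SDK's API.

```python
import zlib
from typing import Tuple


def classify_frame(frame: bytes) -> Tuple[str, bytes]:
    """Return (kind, payload) for a binary frame, decompressing zlib variants."""
    if frame.startswith(b"C:"):
        return "zlib", zlib.decompress(frame[2:])
    if frame.startswith(b"M:"):
        return "msgpack", frame[2:]        # decode with a msgpack library
    if frame.startswith(b"E:"):
        return "encrypted", frame[2:]      # IV + ciphertext + tag
    if frame[:1] == b"\x78":               # zlib header byte for the default window size
        return "zlib", zlib.decompress(frame)
    return "unknown", frame


kind, payload = classify_frame(b"C:" + zlib.compress(b'{"c":"U"}'))
# ('zlib', b'{"c":"U"}')
```

Checking the explicit prefixes first avoids confusing them with raw zlib: 'C', 'M' and 'E' are 0x43, 0x4D and 0x45, not 0x78.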

MessagePack transport: opt-in per connection via ?format=msgpack query parameter. Roughly 2x faster serialization and 30% smaller payloads.

Protocol negotiation: client_hello/server_hello handshake with feature discovery, capability advertisement, and version agreement.

Full protocol specification: docs/PROTOCOL.md


Compression

Two compression layers:

  • Client-facing: zlib for messages above the configurable threshold (default 1024 bytes). Applied automatically by send_event().
  • Inter-peer (cluster): zstd level 1 for payloads above 256 bytes. Capability-negotiated during handshake. Decompression output capped at 1 MB (MAX_FRAME_SIZE).
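On the client-facing side, the threshold rule amounts to "serialize, then compress only if large". A minimal sketch, assuming compressed payloads are sent with the C: prefix from the wire protocol and everything else is sent as plain JSON; encode_for_client() is hypothetical.

```python
import json
import zlib


def encode_for_client(event: dict, threshold: int = 1024) -> bytes:
    """Serialize an event; zlib-compress it only when it exceeds `threshold` bytes."""
    raw = json.dumps(event, separators=(",", ":")).encode("utf-8")
    if len(raw) <= threshold:
        return raw                          # small: compression overhead not worth it
    return b"C:" + zlib.compress(raw)


small = encode_for_client({"c": "U", "p": 1})
large = encode_for_client({"c": "U", "p": "x" * 5000})
```

The threshold exists because zlib adds header and checksum bytes and CPU time, which outweigh the savings on short messages.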

Client SDKs

Python

pip install wse-client

Full-featured async and sync client with connection pool, circuit breaker, auto-reconnect, E2E encryption, and msgpack binary transport.

from wse_client import connect

async with connect("ws://localhost:5007/wse", token="<jwt>") as client:
    await client.subscribe(["updates"])
    async for event in client:
        print(event.type, event.payload)

Sync interface:

from wse_client import SyncWSEClient

client = SyncWSEClient("ws://localhost:5007/wse", token="<jwt>")
client.connect()
client.subscribe(["updates"])

@client.on("updates")
def handle(event):
    print(event.payload)

client.run_forever()

Key features: 4 reconnect strategies (exponential, linear, fibonacci, adaptive), connection pool with health scoring and 3 load balancing strategies, circuit breaker, token bucket rate limiter, event sequencer with dedup and reorder buffering, network quality monitoring (latency/jitter/packet loss).
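Three of the four reconnect strategies follow well-known formulas. The sketch below is a generic illustration, not wse_client's code: reconnect_delay(), its base, cap, and jitter defaults are hypothetical, and the adaptive strategy is omitted because its rules are not described here.

```python
import random
from typing import Optional


def reconnect_delay(strategy: str, attempt: int, base: float = 1.0, cap: float = 30.0,
                    jitter: float = 0.1, rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before reconnect attempt `attempt` (0-based)."""
    if strategy == "exponential":
        delay = base * 2 ** min(attempt, 62)      # clamp exponent to avoid float overflow
    elif strategy == "linear":
        delay = base * (attempt + 1)
    elif strategy == "fibonacci":
        a, b = 1, 1
        for _ in range(min(attempt, 60)):
            a, b = b, a + b
        delay = base * a
    else:
        raise ValueError(f"unsupported strategy: {strategy!r}")
    delay = min(delay, cap)
    # Multiplicative jitter in [1 - jitter, 1 + jitter] spreads out reconnect storms
    rng = rng or random.Random()
    return delay * (1 + rng.uniform(-jitter, jitter))


exp_delays = [reconnect_delay("exponential", n, jitter=0) for n in range(5)]
fib_delays = [reconnect_delay("fibonacci", n, jitter=0) for n in range(6)]
```

Without jitter, exponential backoff yields 1, 2, 4, 8, 16 seconds and Fibonacci yields 1, 1, 2, 3, 5, 8; the cap keeps long outages from producing unbounded waits.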

See python-client/ for full source and examples.
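Event sequencing combines two checks: drop anything already seen, and hold early arrivals until the gap before them fills. A hedged sketch of that idea follows; Sequencer is hypothetical, sequence numbers are assumed to start at 1, and giving up on a gap once more than `window` events are buffered is an assumption about how the real client bounds memory.

```python
from typing import Any, Dict, List


class Sequencer:
    """Deliver events in sequence order, dropping duplicates."""

    def __init__(self, window: int = 1000) -> None:
        self.window = window
        self.next_seq = 1
        self.pending: Dict[int, Any] = {}

    def push(self, seq: int, event: Any) -> List[Any]:
        """Accept one event; return the events that are now ready, in order."""
        if seq < self.next_seq or seq in self.pending:
            return []                              # duplicate: delivered or already buffered
        self.pending[seq] = event
        if len(self.pending) > self.window:
            self.next_seq = min(self.pending)      # assumption: skip a gap that never fills
        ready = []
        while self.next_seq in self.pending:
            ready.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return ready


seq = Sequencer(window=3)
out = [seq.push(1, "a"), seq.push(3, "c"), seq.push(2, "b"), seq.push(2, "b")]
# [['a'], [], ['b', 'c'], []]
```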

TypeScript / React

npm install wse-client

Single React hook (useWSE) for connection lifecycle, subscriptions, and message dispatch.

import { useWSE } from 'wse-client';

function App() {
  const { sendMessage, connectionHealth } = useWSE(
    '<jwt-token>',
    ['updates'],
    { endpoints: ['ws://localhost:5007/wse'] },
  );

  return <div>Status: {connectionHealth}</div>;
}

Key features: offline queue with IndexedDB persistence, adaptive quality management, connection pool with health scoring, E2E encryption (Web Crypto API), message batching, 5 priority levels, Zustand store for external state access.

See client/ for full source and examples.


Performance

Benchmarked on AMD EPYC 7502P (32 cores / 64 threads, 128 GB RAM), Ubuntu 24.04.

Mode | Peak throughput | Connections | Message loss
Standalone (fan-out) | 4.7M deliveries/s | 100K | 0%
Standalone (inbound JSON) | 14.7M msg/s | 500K | 0%
Standalone (inbound msgpack) | 30M msg/s | 500K | 0%
Cluster (2 nodes, 50/50) | 6.6M deliveries/s | 500 per node | 0%

Sub-millisecond latency. Median 0.38ms with JWT authentication. Connection handshake: 0.53ms median (Rust JWT path).

Detailed results: Benchmarks | Fan-out | Rust Client | Python Client | TS Client


Examples

Working examples in the examples/ directory:

Example | Description
standalone_basic.py | Basic server with JWT auth and echo
standalone_broadcast.py | Topic-based pub/sub with broadcasting
standalone_presence.py | Per-topic presence tracking with join/leave events
standalone_recovery.py | Message recovery on reconnect with epoch+offset


License

MIT

About

WSE - Rust-powered WebSocket engine for Python. Up to 5M del/s fan-out, native cluster binary protocol, zero-GIL JWT, E2E encryption
