Capability Protocol for Autonomous Maritime Vessels
cocapn is a Python framework that defines how autonomous vessel agents discover, compile, share, and execute capabilities — discrete units of shipboard intelligence such as bilge monitoring, auto-docking, route optimization, and species identification. It is the nervous system of the SuperInstance Fleet, operating on Plane 3 (Domain): connecting physical sensors to fleet-level AI.
- `cam/` — Capability Agent Manager: vessel-level agent orchestration & deck snapshots
- `capdb/` — Capability Database v3: vector store, semantic lookup, sync protocol, bootstrap bomb
- `cocapn/` — Core protocol: Signal K telemetry, digital twin, anomaly detection, priority queues, camera
- `docs/` — White papers (6), spec documents, founding philosophy, roadmap
- `tests/` — pytest suite: 90+ tests across all modules
Maritime vessels produce a constant stream of sensor data — depth sounders, engine temperatures, GPS positions, camera feeds, weather sensors. cocapn turns this raw data into actionable intelligence through a protocol that answers three questions:
- What can this vessel do? — A capability registry (CapDB) catalogs every compiled function available on board, from engine-overheat guards to fish species screeners.
- What is happening right now? — A digital twin maps vessel rooms (Bridge, Engine Room, Galley) with live gauges fed by Signal K telemetry.
- What needs attention? — An anomaly detector watches gauge histories for rate-of-change spikes, and a priority queue schedules responses across HOT/WARM/COLD compute tiers.
Unlike traditional SCADA or vessel-monitoring software, cocapn is designed for autonomous agents — AI systems that operate vessels with minimal human intervention. Capabilities are compiled once, shared between vessels via a git-style sync protocol, and can be autonomously generated by the Bootstrap Bomb when gaps are detected.
Part of the Cocapn Fleet.
```
┌─────────────────────────────────────────────────────────────────────────┐
│ SUPERINSTANCE FLEET │
│ │
│ ┌──────────────┐ ┌──────────────────┐ ┌───────────────────────┐ │
│ │ CAM │───>│ CAPDB │───>│ COCAPN PROTOCOL │ │
│ │ Agent Mgr │ │ Capability DB │ │ Core Runtime │ │
│ │ │ │ │ │ │ │
│ │ • Deck state │ │ • 15+ compiled │ │ • SignalK Client │ │
│ │ • Snapshots │ │ capabilities │ │ • Digital Twin │ │
│ │ • Orchestration│ │ • Semantic search│ │ • Anomaly Detector │ │
│ └──────────────┘ │ • Vector embeds │ │ • Priority Queue │ │
│ │ │ • Sync protocol │ │ • Camera Pulse │ │
│ │ │ • Bootstrap bomb │ │ │ │
│ │ └──────────────────┘ └───────────┬───────────┘ │
│ │ │ │ │
│ │ ┌────────┴────────┐ ┌────────┴────────┐ │
│ │ │ │ │ │ │
│ │ ┌─────▼─────┐ ┌─────▼─────┐ ┌──▼───┐ ┌──────▼─────┐ │
│ └─────>│ Vessel A │ │ Vessel B │ │ GPS │ │ Sensors │ │
│ │ (Jetson) │<─>│ (Pi) │ │Depth │ │ Temp/Hum │ │
│ └─────────────┘ └────────────┘ │Cam │ │ Pressure │ │
│ └──────┘ └────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
```
Data Flow:

```
Sensors ──> SignalK ──> Digital Twin (rooms + gauges)
                             │
                             └──> Anomaly Detector ──> Priority Queue (HOT/WARM/COLD)
                                                             │
                                                             └──> Capability Execution
                                                                        │
                                                            ┌───────────┴───────────┐
                                                            │        CAPDB          │
                                                            │ Compile · Lookup · Sync│
                                                            └───────────────────────┘
```
| Layer | Module | Role |
|---|---|---|
| CAM | `cam/` | Capability Agent Manager — orchestrates vessel-level agents, captures deck state snapshots for playback and audit |
| CAPDB | `capdb/` | Capability Database v3 — stores compiled capabilities with vector embeddings, provides semantic search, handles vessel-to-vessel sync |
| COCAPN | `cocapn/` | Core protocol runtime — ingests sensor data, maintains the digital twin, detects anomalies, schedules tasks, manages camera snapshots |
A capability is a compiled, tested, and versioned unit of autonomous vessel intelligence. Each capability includes:
- Code compiled for multiple targets (Python, C, Go, Rust)
- Vector embedding (32-dim) for semantic similarity search
- Hardware requirements (e.g., `["Pi", "Jetson"]`)
- Test results from automated validation
- Vessel origin — which vessel first contributed the capability
- Semantic versioning with deprecation paths
Example: a capability from the registry:

```json
{
  "id": "monitor-bilge",
  "description": "Monitor bilge water level using depth sensor and alert when threshold exceeded",
  "hardware": {"min_ram": "1MB", "gpu": false},
  "version": 1,
  "compiled_from": "groq-llama-3.3-70b"
}
```

The v0.2.0 CapDB ships with 15 pre-compiled capabilities:
| ID | Description |
|---|---|
| `monitor-bilge` | Bilge water depth monitoring with threshold alerts |
| `engine-guard` | Engine temperature guard against overheating |
| `fuel-calculator` | Remaining fuel hours from flow rate and tank capacity |
| `anchor-watch` | GPS anchor drift detection |
| `battery-sentinel` | Battery voltage prediction to critical drop |
| `species-screener` | Camera-based fish species identification during sorting |
| `weather-correlator` | Weather pattern vs. historical catch success correlation |
| `route-optimizer` | Fuel-efficient waypoint routing with sea conditions |
| `deck-logger` | Timestamped deck activity logging |
| `arrival-estimator` | ETA from speed, heading, and destination |
| `channel-guard` | Speed limit enforcement in narrow channels |
| `fish-counter` | Camera-based bin fish counting |
| `radio-relay` | Inter-vessel message relay |
| `depth-tracker` | Depth reading pattern analysis for bottom structure |
| `tide-predictor` | Tide timing from historical data and barometric pressure |
A grant is the permission for a vessel agent to execute a capability. In the current prototype, grants are implicit — all capabilities stored locally are available to the vessel's agent. The planned v0.2 security model introduces:
- Explicit grant tokens with scope and expiry
- Hardware tier gating — capabilities tagged `["Jetson"]` won't execute on Tier 3 MCUs
- Trust-but-monitor — external capabilities from other vessels are accepted but logged and rate-limited
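The planned grant model can be sketched in a few lines. Everything here is hypothetical: `GrantToken`, its fields, and `permits` illustrate the scope/expiry/tier-gating rules above and are not part of the current codebase.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class GrantToken:
    """Hypothetical v0.2 grant: scope, hardware tiers, and expiry."""
    capability_id: str
    scope: list[str]            # actions granted, e.g. ["execute"]
    hardware_tiers: list[str]   # hardware allowed to run the capability
    expires_at: datetime

    def permits(self, action: str, tier: str) -> bool:
        """True only if the action is in scope, the tier is allowed, and the token is unexpired."""
        now = datetime.now(timezone.utc)
        return action in self.scope and tier in self.hardware_tiers and now < self.expires_at

token = GrantToken(
    capability_id="route-optimizer",
    scope=["execute"],
    hardware_tiers=["Pi", "Jetson"],
    expires_at=datetime.now(timezone.utc) + timedelta(hours=12),
)
print(token.permits("execute", "Jetson"))  # True
print(token.permits("execute", "MCU"))     # False — Tier 3 MCUs are gated out
```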
Capabilities follow a lifecycle: create → test → deploy → deprecate. When a capability is superseded:
- The `deprecated` flag is set to `true`
- A `deprecation_path` points to the replacement capability ID
- Downstream vessels receive the deprecation via the next sync digest
- After 30 days, deprecated capabilities are garbage-collected
```python
from capdb.capdb_v3 import CompilationPipeline

pipeline = CompilationPipeline(llm, embedder, tester)
deprecated_cap = pipeline.deprecate(old_cap, replacement=new_cap, level="minor")
# deprecated_cap.version == "2.0.0", deprecated_cap.deprecated == True
```

Delegation is the vessel-to-vessel sharing of capabilities via the Sync Protocol. The protocol uses a git-like model:
- HELLO — negotiate protocol version between peers
- DIGEST — exchange SHA-256 hashes of all capability records
- REQ/RESP — transfer missing or changed capabilities
- ACK — confirm receipt
Conflict resolution follows deterministic rules: higher semantic version wins; if versions match, the capability with the better test pass-rate is kept.
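The resolution rule can be sketched in a few lines; `resolve_conflict` and the record shape below are illustrative, not the shipped `SyncProtocol` internals.

```python
def resolve_conflict(local: dict, remote: dict) -> dict:
    """Keep the record with the higher semantic version;
    on a version tie, keep the one with the better test pass-rate."""
    def semver(cap):
        # "1.2.0" -> (1, 2, 0) so versions compare numerically, not lexically
        return tuple(int(part) for part in cap["version"].split("."))
    if semver(local) != semver(remote):
        return local if semver(local) > semver(remote) else remote
    return local if local["pass_rate"] >= remote["pass_rate"] else remote

local  = {"id": "anchor-watch", "version": "1.2.0", "pass_rate": 0.92}
remote = {"id": "anchor-watch", "version": "1.2.0", "pass_rate": 0.97}
print(resolve_conflict(local, remote)["pass_rate"])  # 0.97 — versions tie, better tests win
```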
```python
from capdb.capdb_v3 import SyncProtocol

sync = SyncProtocol(local_db, repo_path="/mnt/shared/capdb_remote.json")
sync.pull()           # Fetch and merge remote capabilities
sync.push()           # Publish local capabilities
sync.quality_score()  # Returns 0.0–1.0 based on test pass rate
```

An agent is the autonomous software entity running on each vessel. Agents:
- Ingest real-time data via Signal K (navigation, propulsion, environment)
- Maintain a digital twin of the vessel with live-updating gauges
- Monitor all gauges for anomalies using rate-of-change detection
- Schedule responses through a three-tier priority queue (HOT/WARM/COLD)
- Execute capabilities from CapDB based on detected conditions
- Sync with other vessels to share and receive new capabilities
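The sense-detect-schedule part of that loop can be sketched with stand-ins; `StubDetector` and the plain-list queue below are illustrative substitutes for `AnomalyDetector` and `AsyncQueue`, not the shipped agent.

```python
from collections import deque

class StubDetector:
    """Rolling-window rate-of-change check, in the spirit of AnomalyDetector."""
    def __init__(self, window=5):
        self.window = window
        self.history = {}

    def feed(self, gauge, value):
        self.history.setdefault(gauge, deque(maxlen=self.window)).append(value)

    def check(self, gauge, threshold_pct=0.1):
        vals = list(self.history.get(gauge, []))
        if len(vals) < 2:
            return None
        avg = sum(vals) / len(vals)
        rate = vals[-1] - vals[-2]
        if avg and abs(rate / avg) > threshold_pct:
            return {"gauge": gauge, "direction": "rising" if rate > 0 else "falling"}
        return None

def agent_cycle(readings, det, hot_queue):
    """One pass: feed each gauge reading, queue anything anomalous."""
    for gauge, value in readings.items():
        det.feed(gauge, value)
        anomaly = det.check(gauge)
        if anomaly:
            hot_queue.append(anomaly)

det, hot = StubDetector(), []
for temp in (178, 180, 181, 230):  # steady readings, then a spike
    agent_cycle({"engine_temp": temp}, det, hot)
print(hot)  # one "rising" anomaly queued for engine_temp
```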
- Python 3.10+
- No external dependencies required for core `cocapn` modules (stdlib only)
- `numpy` and `scikit-learn` required for `capdb/capdb_v3.py` semantic search

```bash
cd cocapn
pip install numpy scikit-learn  # Only needed for capdb_v3
```

Connect to a vessel's Signal K server to pull live navigation, propulsion, and environment data:
```python
from cocapn.signalk import SignalKClient

# Connect to the vessel's Signal K server
sk = SignalKClient(host='192.168.1.100', port=3000)

# Fetch individual data groups
nav = sk.get_navigation()   # {"speedOverGround": {"value": 8.2, "units": "knots"}, ...}
prop = sk.get_propulsion()  # {"engine0": {"temperature": {"value": 178, "units": "F"}}, ...}
env = sk.get_environment()  # {"wind": {"speedOverGround": {"value": 12, "units": "knots"}}, ...}

# Or fetch everything at once
all_data = sk.get_all()
```

Build a room-based digital twin that auto-populates from Signal K data:
```python
from cocapn.signalk import SignalKClient
from cocapn.digital_twin import DigitalTwin

sk = SignalKClient()
twin = DigitalTwin("F/V Northwest Wind")

# Auto-create rooms and gauges from live data
twin.update_from_signalk(sk.get_all())

# Inspect rooms
print(twin.look("Bridge"))
# -- Bridge --
# Command center
#
# [knots] SOG: 8.2
# [  deg] HDG: 45
# Exits: Flying Bridge, Engine Room

print(twin.map())
# === F/V Northwest Wind Map ===
#
# [Bridge] SOG, HDG
# [Engine Room] engine0_temperature, engine0_oilPressure

# Manually add rooms and gauges
engine_room = twin.add_room("Engine Room", "Propulsion systems")
engine_room.add_gauge("RPM", 2500, "rpm")
engine_room.add_exit("up", "Bridge")
```

Feed gauge readings into the anomaly detector to catch rate-of-change spikes:
```python
from cocapn.anomaly import AnomalyDetector

det = AnomalyDetector(window=20)

# Feed time-series readings
for reading in sensor_stream:
    det.feed("engine_temp", reading)

# Check for anomalies (returns None if stable)
result = det.check("engine_temp", threshold_pct=0.1)
if result:
    print(f"WARNING: {result['gauge']} is {result['direction']} "
          f"({result['pct_change']}% change rate)")
# Output: WARNING: engine_temp is rising (47.83% change rate)
```

Schedule tasks across three compute tiers with automatic escalation:
```python
from cocapn.priority_queue import Priority, AsyncQueue

q = AsyncQueue(data_dir="/var/cocapn/queue")

# Capture tasks at different priorities
q.capture(Priority.HOT, "engine_overheat", {"temp": 210, "limit": 190})
q.capture(Priority.WARM, "route_recalc", {"waypoints": [...]})
q.capture(Priority.COLD, "deck_log_sync", {"since": "2026-04-14"})

# Process all urgent tasks immediately
hot_tasks = q.process_hot()

# Process up to 5 warm tasks
warm_tasks = q.process_warm(max_tasks=5)

# Escalate a category if it's been waiting too long
q.escalate("route_recalc")  # WARM -> HOT

# Check queue health
stats = q.stats()
# {"queued": 2, "processed": 6, "by_priority": {"HOT": 0, "COLD": 2}, "by_category": {...}}
```

Persist camera frames with metadata for later analysis:
```python
from cocapn.camera import CameraPulse

cam = CameraPulse(data_dir="/mnt/storage/cocapn/camera")

# Save a snapshot (dict or JSON string)
path = cam.save_snapshot("deck", frame_data, metadata={"exposure": "auto", "iso": 800})

# Retrieve recent snapshots
recent = cam.get_recent("deck", count=10)
# ["deck_20260414_030811", "deck_20260414_030812", ...]
```

Compile, store, search, and sync capabilities:
```python
from capdb.capdb_v3 import (
    CapDB, Capability, CapabilityGroup, CapabilityQuery,
    CompilationPipeline, SemanticLookup, SyncProtocol, BootstrapBomb
)

# Initialize the store
db = CapDB()

# Load from existing JSON
db.load_from_json("capdb/capdb.json")
print(f"Loaded {len(db.list_capabilities())} capabilities")

# Semantic search
lookup = SemanticLookup(db)
query = CapabilityQuery(
    embedding=[0.1, 0.2, ...],  # From your embedding model
    hardware_filter=["Pi"],
    domain_filter="navigation",
    max_results=5
)
results = lookup.search(query)
for cap, score in results:
    print(f"{cap.name} (v{cap.version}) — confidence={score:.3f}")

# Vessel-to-vessel sync
sync = SyncProtocol(db, repo_path="/mnt/shared/capdb_remote.json")
sync.pull()  # Merge remote capabilities into local DB
sync.push()  # Publish local capabilities
print(f"Quality score: {sync.quality_score():.0%}")

# Bootstrap bomb — autonomously fill capability gaps
bomb = BootstrapBomb(db, pipeline, lookup)
missing = bomb.detect_gaps(["Auto-dock at pier", "Detect oil spill", "Predict weather 24h"])
for desc in missing:
    bomb.fill_gap(desc, name=desc.replace(" ", "_"), hardware=["Pi"], vessel_origin="Vessel-A")
print(f"LLM call reduction rate: {bomb.llm_reduction_rate:.1%}")
```

| Class/Method | Description |
|---|---|
| `SignalKClient(host, port)` | Connect to a Signal K v1 API server (default: `localhost:3000`) |
| `.get(path)` | Raw GET request to any Signal K API path |
| `.get_navigation()` | Shortcut: `vessels/self/navigation` |
| `.get_propulsion()` | Shortcut: `vessels/self/propulsion` |
| `.get_environment()` | Shortcut: `vessels/self/environment` |
| `.get_all()` | Returns dict with `navigation`, `propulsion`, `environment` keys |
| Class/Method | Description |
|---|---|
| `DigitalTwin(vessel_name)` | Create a twin for a named vessel |
| `.add_room(name, description, gauges)` | Add a room; returns the Room object |
| `.update_from_signalk(data)` | Auto-create Bridge + Engine Room from Signal K data |
| `.look(name)` | Human-readable room view with gauges and exits |
| `.map()` | Overview of all rooms and their live gauge names |
| `Room(name, description, gauges)` | Individual vessel space with gauges and exits |
| `Room.add_gauge(name, value, unit)` | Add/update a live gauge reading |
| `Room.add_exit(direction, room_name)` | Connect rooms directionally |
| Class/Method | Description |
|---|---|
| `AnomalyDetector(window=20)` | Rolling-window anomaly detector |
| `.feed(gauge, value, timestamp)` | Record a gauge reading |
| `.check(gauge, threshold_pct=0.1)` | Check for rate-of-change anomaly; returns dict or None |

Return value when anomaly detected:

```python
{
    "gauge": "engine_temp",
    "rate": 15.67,             # Change per sample
    "pct_change": 47.83,       # Percentage relative to rolling average
    "direction": "rising",     # "rising" or "falling"
    "values": [178, 183, 208]  # Last 3 numeric readings
}
```

| Class/Method | Description |
|---|---|
| `Priority` | IntEnum: HOT=0, WARM=1, COLD=2 |
| `Task(priority, category, payload)` | Unit of work with enqueue time and escalation tracking |
| `AsyncQueue(data_dir)` | Persistent priority queue with JSON state file |
| `.capture(priority, category, payload)` | Enqueue a task; returns the Task |
| `.pop()` | Dequeue highest-priority task (FIFO within priority) |
| `.process_hot()` | Drain all HOT tasks |
| `.process_warm(max_tasks=5)` | Drain up to N WARM tasks |
| `.escalate(category)` | Promote all tasks in a category by one tier |
| `.stats()` | Queue statistics: counts by priority and category |
| Class/Method | Description |
|---|---|
| `CameraPulse(data_dir)` | Snapshot manager creating `full/<camera>/` directory tree |
| `.save_snapshot(camera, data, metadata)` | Save JSON frame; returns file path |
| `.get_recent(camera, count=10)` | List recent snapshot filenames (newest first) |
| Class | Description |
|---|---|
| `Capability` | Dataclass: id, name, description, code, embedding, hardware_reqs, version, test_results, deprecation |
| `CapabilityGroup` | Logical grouping with domain tag (navigation, engine, bilge, weather) |
| `CapabilityQuery` | Search query with embedding, hardware/domain filters, max_results |
| `CapDB` | In-memory store with CRUD, hardware/domain filtering, JSON persistence |
| `CompilationPipeline` | End-to-end: LLM code gen → multi-target compile → test → embed → store |
| `SemanticLookup` | Cosine-similarity search with hardware/domain filters and confidence scores |
| `SyncProtocol` | Git-style vessel-to-vessel capability sharing with conflict resolution |
| `BootstrapBomb` | Autonomous gap detection and capability generation with LLM-call reduction tracking |
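The scoring behind `SemanticLookup` is plain cosine similarity over capability embeddings. A hand-rolled stand-in (not the `capdb_v3` implementation, and using toy 3-dim vectors rather than the real 32-dim embeddings) looks like:

```python
import math

def cosine(a, b):
    """Cosine similarity: dot product over the product of magnitudes."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

# Toy embeddings standing in for stored capability vectors
caps = {
    "monitor-bilge": [0.9, 0.1, 0.0],
    "route-optimizer": [0.1, 0.8, 0.3],
}
query = [0.85, 0.15, 0.05]

# Rank capabilities by similarity to the query embedding
ranked = sorted(caps, key=lambda c: cosine(query, caps[c]), reverse=True)
print(ranked[0])  # monitor-bilge
```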
The project uses pytest with 90+ tests across all modules.
```bash
# Via pytest (recommended)
cd cocapn
python -m pytest tests/ -v

# Via the bundled test runner (spins up a mock Signal K server)
python test_all.py
```

Run individual module suites:

```bash
python -m pytest tests/test_signalk.py -v         # Signal K client
python -m pytest tests/test_digital_twin.py -v    # Digital twin
python -m pytest tests/test_anomaly.py -v         # Anomaly detector
python -m pytest tests/test_priority_queue.py -v  # Priority queue
python -m pytest tests/test_camera.py -v          # Camera pulse
python -m pytest tests/test_version.py -v         # Package metadata
```

| Module | Tests | Coverage Focus |
|---|---|---|
| `signalk.py` | 10 | Connection, JSON parsing, error handling, convenience methods |
| `digital_twin.py` | 27 | Room CRUD, gauge management, Signal K auto-update, map rendering |
| `anomaly.py` | 17 | Feed/window management, rising/falling detection, thresholds, edge cases |
| `priority_queue.py` | 31 | Priority ordering, FIFO within tier, escalation, persistence, state reload |
| `camera.py` | 14 | Snapshot save/load, metadata, timestamp naming, recent retrieval |
| `version` | 4 | Package version format and existence |
Tests use `unittest.mock` for network isolation (Signal K) and pytest's `tmp_path` fixture for filesystem isolation (queue, camera). The `test_all.py` integration runner spins up a mock HTTP server on `localhost:3099` serving realistic vessel telemetry data.
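The isolation pattern looks roughly like this; `fetch_sog` is a hypothetical function under test, and the `Mock` stands in for `SignalKClient` so no network is touched (not a verbatim case from the suite):

```python
from unittest import mock

# Canned Signal K payload instead of a live server
fake_nav = {"speedOverGround": {"value": 8.2, "units": "knots"}}
sk = mock.Mock()
sk.get_navigation.return_value = fake_nav

def fetch_sog(client):
    """Hypothetical code under test: read speed-over-ground via the client."""
    return client.get_navigation()["speedOverGround"]["value"]

assert fetch_sog(sk) == 8.2
sk.get_navigation.assert_called_once()  # the mock also verifies call counts
```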
Status: Active | Phase: v0.1 prototype | Fleet Score: 90/100 | Updated: 2026-04-14
- Core Protocol (`cocapn/`) — Signal K ingestion, digital twin with room graph, anomaly detection, three-tier priority queue, camera snapshot persistence. All stdlib, zero external dependencies.
- Capability Database v3 (`capdb/capdb_v3.py`) — Full data model, compilation pipeline (multi-target LLM code gen), semantic lookup with cosine similarity, vessel-to-vessel sync with conflict resolution, bootstrap bomb for autonomous gap filling.
- 15 Pre-Compiled Capabilities (`capdb/capdb.json`) — Maritime operational capabilities from bilge monitoring to route optimization, compiled via Groq LLaMA 3.3 70B.
- Comprehensive Tests — 90+ pytest cases with mock isolation and integration coverage.
- White Papers — 6 research papers covering forcing function architecture, crew-as-a-service, lazy evaluation, compiled agency, bootstrap bomb, and semantic compiler.
| Milestone | Scope |
|---|---|
| v0.2 | Explicit capability grants with scope tokens; hardware tier gating; REST API per CAPDB-V3-SPEC |
| v0.3 | Encrypted WebSocket sync (TLS 1.3 / QUIC); mutual TLS with vessel X.509 certificates |
| v0.4 | Persistent vector DB backend (FAISS/Milvus); real LLM integration; automated test harness |
| v0.5 | Full bootstrap bomb with CRL revocation; Merkle-tree audit log; production deployment |
- CHARTER ✅
- DOCKSIDE-EXAM ✅
- README ✅
```
cocapn/
├── README.md              # This file
├── ABSTRACTION.md         # Plane 3 (Domain) abstraction mapping
├── CHARTER.md             # Mission, type (vessel), captain, maintainer
├── STATE.md               # Protocol status, phase, fleet score
├── test_all.py            # Integration test runner with mock Signal K server
│
├── cocapn/                # Core protocol runtime
│   ├── __init__.py        # Version 0.1.0
│   ├── signalk.py         # Signal K v1 API client
│   ├── digital_twin.py    # Room-based vessel digital twin
│   ├── anomaly.py         # Rate-of-change anomaly detector
│   ├── priority_queue.py  # HOT/WARM/COLD async priority queue
│   └── camera.py          # Camera snapshot persistence
│
├── cam/                   # Capability Agent Manager
│   └── full/deck/         # Deck state snapshots
│
├── capdb/                 # Capability Database
│   ├── capdb_v3.py        # Full v3 spec: data model, pipeline, lookup, sync, bomb
│   └── capdb.json         # 15 pre-compiled capabilities with embeddings
│
├── docs/                  # Documentation & research
│   ├── CAPDB-V3-SPEC.md           # CapDB v3 specification (REST API, data model, security)
│   ├── FOUNDING-PHILOSOPHY.md     # Nine cultural currents synthesis
│   ├── FLEET-ROADMAP-90DAY.md     # 90-day fleet roadmap
│   ├── PRODUCT-ROADMAP.md         # Product roadmap
│   ├── cocapn-wp-001.json         # WP-001: Forcing Function Architecture
│   ├── cocapn-wp-002.json         # WP-002: Crew as a Service
│   ├── cocapn-wp-003.json         # WP-003: Lazy Evaluation
│   ├── cocapn-wp-004.json         # WP-004: Compiled Agency
│   ├── cocapn-wp-005.json         # WP-005: Bootstrap Bomb
│   └── cocapn-wp-006.json         # WP-006: Semantic Compiler
│
├── tests/                 # pytest suite
│   ├── __init__.py
│   ├── test_version.py
│   ├── test_signalk.py
│   ├── test_digital_twin.py
│   ├── test_anomaly.py
│   ├── test_priority_queue.py
│   └── test_camera.py
│
└── callsign1.jpg          # Vessel callsign image
```
Part of the Cocapn Fleet. Captain: Casey Digennaro. Maintainer: Oracle1 (Lighthouse Keeper).
