From d9ddb931bf623b593b48d1ac46fcd7ab95945f92 Mon Sep 17 00:00:00 2001 From: devAsmodeus Date: Sat, 2 May 2026 13:26:13 +0300 Subject: [PATCH 1/7] feat(webui): stdlib http.server dashboard + SSE event stream Single-page HTML (vanilla JS, inline-SVG sparkline, no CDN), JSON snapshot at /api/stats (no-store), Server-Sent Events at /api/events covering share_found / share_accepted / share_rejected / job / pool, and /healthz proxied through the same provider as MetricsServer. StatsProvider extended with subscribe(callback) -> unsubscribe and publish_event hook; update_job/record_share/update_pool now publish. sha_backend tracked alongside the snapshot so the UI can show the active backend next to the hashrate. Tests: 17 new (StatsProvider pub/sub, render_html, HTML/JSON/SSE/healthz endpoints, idempotent start/stop, lifecycle). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/hope_hash/tui.py | 89 +++++++- src/hope_hash/webui.py | 489 +++++++++++++++++++++++++++++++++++++++++ tests/test_webui.py | 272 +++++++++++++++++++++++ 3 files changed, 848 insertions(+), 2 deletions(-) create mode 100644 src/hope_hash/webui.py create mode 100644 tests/test_webui.py diff --git a/src/hope_hash/tui.py b/src/hope_hash/tui.py index 8b6c3a9..69a9038 100644 --- a/src/hope_hash/tui.py +++ b/src/hope_hash/tui.py @@ -21,10 +21,11 @@ from __future__ import annotations +import logging import threading import time from dataclasses import dataclass, field -from typing import Optional +from typing import Any, Callable, Optional # ─────────────────── Stats provider (без curses) ─────────────────── @@ -60,9 +61,18 @@ class StatsProvider: Намеренно plain-data, никакой бизнес-логики — она в ``mine()``. """ - def __init__(self, pool_url: str = "") -> None: + def __init__(self, pool_url: str = "", sha_backend: str = "hashlib") -> None: self._lock = threading.Lock() self._snap = StatsSnapshot(pool_url=pool_url) + # SHA-backend имя; не часть StatsSnapshot, потому что не меняется в runtime, + # но web-дашборду удобно читать из одного места. + self._sha_backend = sha_backend + # Подписчики на события (share found/accepted/rejected, job change, pool rotation). + # Каждый callback — sync-функция (event_type: str, payload: dict). Вызывается + # из публикующей нити (mine/supervisor) под локом списка, поэтому + # обработчики ДОЛЖНЫ быть быстрыми и не блокировать (например, кладут в Queue). + self._subscribers: list[Callable[[str, dict[str, Any]], None]] = [] + self._sub_lock = threading.Lock() def snapshot(self) -> StatsSnapshot: """Атомарный снимок текущего состояния.""" @@ -90,8 +100,15 @@ def update_hashrate(self, ema: float, last_sample: float, workers: int) -> None: def update_job(self, job_id: Optional[str], pool_difficulty: float) -> None: with self._lock: + prev = self._snap.current_job_id self._snap.current_job_id = job_id self._snap.pool_difficulty = float(pool_difficulty) + # Событие job-change только при реальной смене (чтобы не флудить SSE + # на каждый пересчёт хешрейта). + if prev != job_id: + self._publish( + "job", {"job_id": job_id, "pool_difficulty": float(pool_difficulty)} + ) def record_share(self, accepted: Optional[bool] = None) -> None: """Учёт шар. accepted=None → submitted (ещё ждём ответа пула). @@ -100,10 +117,14 @@ def record_share(self, accepted: Optional[bool] = None) -> None: if accepted is None: self._snap.shares_total += 1 self._snap.last_share_ts = time.time() + event = "share_found" elif accepted: self._snap.shares_accepted += 1 + event = "share_accepted" else: self._snap.shares_rejected += 1 + event = "share_rejected" + self._publish(event, {"accepted": accepted}) def update_pool(self, pool_url: str) -> None: """Меняет текущий pool URL (для multi-pool failover). @@ -113,6 +134,70 @@ def update_pool(self, pool_url: str) -> None: """ with self._lock: self._snap.pool_url = str(pool_url) + self._publish("pool", {"pool_url": pool_url}) + + @property + def sha_backend(self) -> str: + """Имя текущего SHA-backend (для /api/stats и UI).""" + return self._sha_backend + + def set_sha_backend(self, name: str) -> None: + """Меняет SHA-backend имя (вызывается один раз из cli.main).""" + self._sha_backend = str(name) + + # ─── publish/subscribe для SSE (web дашборд) ─── + + def subscribe( + self, callback: Callable[[str, dict[str, Any]], None] + ) -> Callable[[], None]: + """Подписка на события майнера. + + Возвращает функцию-отписку. Callback дёргается синхронно из + публикующей нити, поэтому должен быть мгновенным (в идеале — + ``queue.put_nowait``). Любые исключения внутри callback ловятся + и логируются как warning, чтобы один сломанный подписчик не + ронял publish для остальных. + """ + with self._sub_lock: + self._subscribers.append(callback) + + def _unsubscribe() -> None: + with self._sub_lock: + try: + self._subscribers.remove(callback) + except ValueError: + # Уже удалён — идемпотентно + pass + + return _unsubscribe + + def _publish(self, event_type: str, payload: dict[str, Any]) -> None: + """Внутренний publish — рассылает событие всем подписчикам. + + Снимок списка под локом, потом вызовы без удержания лока, чтобы + подписчик мог при желании отписаться внутри своего callback. + """ + with self._sub_lock: + subs = list(self._subscribers) + if not subs: + return + for cb in subs: + try: + cb(event_type, payload) + except Exception as exc: # noqa: BLE001 — пользовательский callback + logging.getLogger("hope_hash").warning( + "[stats] подписчик упал на событии %s: %s", event_type, exc + ) + + def publish_event(self, event_type: str, payload: dict[str, Any]) -> None: + """Публикует произвольное событие (используется mine() для + share-found / share-accepted / share-rejected / job-change). + + Публичный alias для ``_publish``, чтобы внешний код не лез в + приватный API. Имя события — строка, payload — JSON-сериализуемый + dict. + """ + self._publish(event_type, payload) def format_rate(rate: float) -> str: diff --git a/src/hope_hash/webui.py b/src/hope_hash/webui.py new file mode 100644 index 0000000..2902e82 --- /dev/null +++ b/src/hope_hash/webui.py @@ -0,0 +1,489 @@ +"""Web-дашборд на stdlib ``http.server``. Без зависимостей и без CDN. + +Структура зеркалит ``MetricsServer``: ``ThreadingHTTPServer`` в фоновой +нити, mutable container для health-провайдера, идемпотентные ``start()`` / +``stop()``. Источник данных — общий ``StatsProvider`` (тот же, что у TUI), +поэтому web-страница и curses-дашборд показывают одинаковые числа. + +Эндпоинты: + +- ``GET /`` — single-page HTML (vanilla JS, без CDN), сам опрашивает + ``/api/stats`` каждые 2 секунды и подключается к ``/api/events`` для + стрима. +- ``GET /api/stats`` — JSON-снапшот: hashrate, uptime, шары, pool URL, + sha_backend, текущий job_id и т. д. ``Cache-Control: no-store``. +- ``GET /api/events`` — Server-Sent Events: ``share_found``, + ``share_accepted``, ``share_rejected``, ``job``, ``pool``. Keep-alive + comment каждые 15 секунд. +- ``GET /healthz`` — то же, что у ``MetricsServer`` (ставится через + ``set_health_provider``). Удобно, когда web и metrics на разных портах. + +Web по умолчанию слушает только loopback (``127.0.0.1``) — то же +правило, что и у ``MetricsServer``. Если нужен внешний доступ, оператор +ставит обратный прокси (nginx/caddy) с auth. +""" + +from __future__ import annotations + +import html +import json +import logging +import queue +import threading +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any, Callable, Optional + +from .tui import StatsProvider, format_rate, format_uptime + +logger = logging.getLogger("hope_hash") + +HealthProvider = Callable[[], dict] + + +# Размер кольцевого буфера событий на одного SSE-клиента. Больше — больше +# памяти на медленного клиента; меньше — рискуем уронить событие при флуде. +_SSE_QUEUE_MAX = 256 + +# Период keepalive-комментариев SSE. Спецификация SSE не требует, но без +# них некоторые прокси (включая nginx default) рвут idle-коннект. +_SSE_KEEPALIVE_S = 15.0 + + +def _stats_payload(provider: StatsProvider) -> dict[str, Any]: + """Серилизуемый JSON-снапшот для ``/api/stats``. + + Не возвращает datetime/Path/etc — только строки/числа/None, чтобы + ``json.dumps`` не падал на нестандартных типах. + """ + snap = provider.snapshot() + return { + "hashrate_ema": snap.hashrate_ema, + "hashrate_last": snap.hashrate_last, + "hashrate_human": format_rate(snap.hashrate_ema), + "workers": snap.workers, + "pool_url": snap.pool_url, + "current_pool": snap.pool_url, + "pool_difficulty": snap.pool_difficulty, + "current_job_id": snap.current_job_id, + "shares_total": snap.shares_total, + "shares_accepted": snap.shares_accepted, + "shares_rejected": snap.shares_rejected, + "last_share_ts": snap.last_share_ts, + "uptime_s": snap.uptime_s, + "uptime_human": format_uptime(snap.uptime_s), + "sha_backend": provider.sha_backend, + "started_at": snap.started_at, + "now": time.time(), + } + + +# ─────────────────── HTML ─────────────────── + +_HTML_PAGE = """ + + + +hope-hash dashboard + + + + +
+

hope-hash

+ solo BTC miner dashboard +
+ +
+
+
hashrate (EMA)
+
+ +
+
pool
+
sha backend
+
uptime
+
workers
+
job id
+
pool diff
+
shares (sent)
+
shares accepted
+
shares rejected
+
last share
+
+ +
+

recent events

+
+
+ +
+ polls /api/stats every 2s · streams /api/events via SSE · + /healthz +
+ + + + +""" + + +# ─────────────────── HTTP handler ─────────────────── + + +def _make_handler( + provider: StatsProvider, + health_provider_ref: list[Optional[HealthProvider]], +) -> type[BaseHTTPRequestHandler]: + """Фабрика handler-класса с замыканиями на provider/health. + + Тот же приём, что в ``metrics._make_handler``: всё через замыкания, + health-провайдер — однослотовый mutable container, чтобы можно было + подменить после ``start()``. + """ + + class _Handler(BaseHTTPRequestHandler): + # Server-Sent Events требует HTTP/1.1 для chunked transfer. + protocol_version = "HTTP/1.1" + + def do_GET(self) -> None: # noqa: N802 — имя задано базовым классом + if self.path == "/" or self.path == "/index.html": + self._serve_html() + return + if self.path == "/api/stats": + self._serve_stats() + return + if self.path == "/api/events": + self._serve_events() + return + if self.path == "/healthz": + self._serve_healthz(health_provider_ref[0]) + return + self.send_error(404) + + # ─── individual endpoints ─── + + def _serve_html(self) -> None: + body = _HTML_PAGE.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.send_header("Cache-Control", "no-store") + self.end_headers() + self.wfile.write(body) + + def _serve_stats(self) -> None: + payload = _stats_payload(provider) + body = json.dumps(payload).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.send_header("Cache-Control", "no-store") + self.end_headers() + self.wfile.write(body) + + def _serve_healthz(self, hp: Optional[HealthProvider]) -> None: + if hp is None: + payload: dict[str, Any] = { + "status": "down", + "reason": "no health provider registered", + } + http_status = 503 + else: + try: + payload = hp() or {} + except Exception as exc: # noqa: BLE001 — пользовательский callable + payload = {"status": "down", "reason": f"provider error: {exc}"} + http_status = 503 + else: + status = payload.get("status", "down") + http_status = 503 if status == "down" else 200 + body = json.dumps(payload).encode("utf-8") + self.send_response(http_status) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def _serve_events(self) -> None: + """SSE-стрим: подписываемся на provider, гоняем события в сокет. + + Завершается, когда: + - клиент закрыл коннект (BrokenPipeError/ConnectionResetError); + - сервер останавливается (provider.stop_event эквивалент: + мы держим короткие таймауты и проверяем connection alive). + """ + self.send_response(200) + self.send_header("Content-Type", "text/event-stream; charset=utf-8") + self.send_header("Cache-Control", "no-store") + # Полностью отключаем nginx/прокси-буферизацию, иначе события + # копятся пока не наберётся 4кб и UI «зависает». + self.send_header("X-Accel-Buffering", "no") + self.send_header("Connection", "keep-alive") + self.end_headers() + + ev_queue: queue.Queue[tuple[str, dict[str, Any]]] = queue.Queue( + maxsize=_SSE_QUEUE_MAX + ) + + def _on_event(event_type: str, payload: dict[str, Any]) -> None: + # Никогда не блокируем producer'а: если очередь полная, + # дропаем событие и логируем (лучше потерять одно + # событие, чем повесить mine-thread). + try: + ev_queue.put_nowait((event_type, payload)) + except queue.Full: + logger.warning("[webui] SSE queue full, dropping event %s", event_type) + + unsubscribe = provider.subscribe(_on_event) + try: + last_keepalive = time.time() + while True: + try: + event_type, payload = ev_queue.get(timeout=1.0) + except queue.Empty: + # Нет событий — возможно, время для keepalive. + if time.time() - last_keepalive >= _SSE_KEEPALIVE_S: + try: + self.wfile.write(b": keepalive\n\n") + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError, OSError): + break + last_keepalive = time.time() + continue + try: + data = json.dumps(payload, default=str) + msg = f"event: {event_type}\ndata: {data}\n\n".encode("utf-8") + self.wfile.write(msg) + self.wfile.flush() + last_keepalive = time.time() + except (BrokenPipeError, ConnectionResetError, OSError): + # Клиент ушёл — выходим без шумного traceback. + break + finally: + unsubscribe() + + # ─── housekeeping ─── + + def log_message(self, format: str, *args: object) -> None: # noqa: A002 + # Тот же приём, что в metrics: единый канал — logger "hope_hash", + # засорять stderr каждым GET / не нужно. + return + + return _Handler + + +# ─────────────────── server lifecycle ─────────────────── + + +class WebUIServer: + """HTTP-дашборд на отдельной нити. Старт/стоп идемпотентны. + + Использование:: + + provider = StatsProvider(...) + server = WebUIServer(provider, port=8080) + server.start() + # ... майнер работает ... + server.stop() + """ + + def __init__( + self, + provider: StatsProvider, + host: str = "127.0.0.1", + port: int = 8080, + ) -> None: + self.provider = provider + self.host = host + self.port = int(port) + self._server: ThreadingHTTPServer | None = None + self._thread: threading.Thread | None = None + self._lifecycle_lock = threading.Lock() + self._health_ref: list[Optional[HealthProvider]] = [None] + + def set_health_provider(self, hp: Optional[HealthProvider]) -> None: + """Регистрирует health-провайдера (тот же контракт, что в ``MetricsServer``).""" + self._health_ref[0] = hp + + def start(self) -> None: + with self._lifecycle_lock: + if self._server is not None: + return + handler_cls = _make_handler(self.provider, self._health_ref) + self._server = ThreadingHTTPServer((self.host, self.port), handler_cls) + self._thread = threading.Thread( + target=self._server.serve_forever, + name=f"hope_hash-webui-{self.port}", + daemon=True, + ) + self._thread.start() + logger.info("[webui] дашборд на %s", self.url) + + def stop(self, timeout: float = 2.0) -> None: + with self._lifecycle_lock: + server = self._server + thread = self._thread + self._server = None + self._thread = None + if server is not None: + server.shutdown() + server.server_close() + if thread is not None: + thread.join(timeout=timeout) + if server is not None: + logger.info("[webui] дашборд остановлен") + + @property + def url(self) -> str: + return f"http://{self.host}:{self.port}/" + + +# Безопасный alias на случай, если кто-то ожидает функцию-фабрику HTML +# (например, для предзаполнения страницы в шаблоне). HTML — статика, но +# делаем доступным как функцию для тестов и переиспользования. +def render_html() -> str: + """Возвращает HTML-страницу дашборда (статика, без подстановок).""" + return _HTML_PAGE + + +# Экранируем на всякий случай, если когда-нибудь добавим динамическую +# подстановку. Сейчас не используется, но импортируется в тестах. +def _escape(value: Any) -> str: + return html.escape(str(value), quote=True) diff --git a/tests/test_webui.py b/tests/test_webui.py new file mode 100644 index 0000000..f447b52 --- /dev/null +++ b/tests/test_webui.py @@ -0,0 +1,272 @@ +"""Тесты web-дашборда: HTML, /api/stats, /api/events, /healthz, lifecycle.""" + +from __future__ import annotations + +import http.client +import json +import socket +import threading +import time +import unittest +from typing import Any + +from hope_hash.tui import StatsProvider +from hope_hash.webui import WebUIServer, render_html + + +def _free_port() -> int: + """Биндим эфемерный порт, освобождаем — отдаём номер. + + Между release и bind есть гонка, но для unittest на одной машине это + приемлемо. Если CI станет флаки — добавим retry-обёртку. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +def _http_get(host: str, port: int, path: str, timeout: float = 3.0): + conn = http.client.HTTPConnection(host, port, timeout=timeout) + conn.request("GET", path) + resp = conn.getresponse() + body = resp.read() + headers = dict(resp.getheaders()) + status = resp.status + conn.close() + return status, headers, body + + +class TestStatsProviderEvents(unittest.TestCase): + """publish/subscribe — отдельно от HTTP, без сети.""" + + def test_subscribe_and_publish(self): + provider = StatsProvider() + captured: list[tuple[str, dict[str, Any]]] = [] + unsub = provider.subscribe(lambda t, p: captured.append((t, p))) + try: + provider.record_share(accepted=None) # share_found + provider.record_share(accepted=True) # share_accepted + provider.record_share(accepted=False) # share_rejected + provider.update_job("abc123", 4.0) # job + provider.update_pool("pool.example:3333") # pool + finally: + unsub() + types = [t for t, _ in captured] + self.assertIn("share_found", types) + self.assertIn("share_accepted", types) + self.assertIn("share_rejected", types) + self.assertIn("job", types) + self.assertIn("pool", types) + + def test_unsubscribe_idempotent(self): + provider = StatsProvider() + unsub = provider.subscribe(lambda t, p: None) + unsub() + unsub() # второй вызов не должен падать + + def test_subscriber_exception_does_not_break_publish(self): + provider = StatsProvider() + ok_calls: list[str] = [] + + def bad(t: str, p: dict) -> None: + raise RuntimeError("boom") + + def good(t: str, p: dict) -> None: + ok_calls.append(t) + + provider.subscribe(bad) + provider.subscribe(good) + provider.publish_event("x", {"k": 1}) + self.assertEqual(ok_calls, ["x"]) + + def test_job_not_published_when_unchanged(self): + provider = StatsProvider() + events: list[str] = [] + provider.subscribe(lambda t, p: events.append(t)) + provider.update_job("same", 1.0) + provider.update_job("same", 1.0) + provider.update_job("same", 1.0) + # Только один job-event на реальную смену. + self.assertEqual(events.count("job"), 1) + + def test_sha_backend_default_and_setter(self): + provider = StatsProvider() + self.assertEqual(provider.sha_backend, "hashlib") + provider.set_sha_backend("ctypes") + self.assertEqual(provider.sha_backend, "ctypes") + + +class TestRenderHtml(unittest.TestCase): + def test_html_contains_expected_strings(self): + html_text = render_html() + self.assertIn("hope-hash", html_text) + self.assertIn("hashrate", html_text) + self.assertIn("/api/stats", html_text) + self.assertIn("/api/events", html_text) + # Ни одной CDN-ссылки и ни одного