diff --git a/.gitignore b/.gitignore index 0bf110a..72150b1 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,10 @@ htmlcov/ venv/ env/ *.log +*.db +*.db-journal +*.db-wal +*.db-shm .idea/ .vscode/ .DS_Store diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a6620a..d4a4e4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,30 @@ ## [Unreleased] +## [0.2.0] — 2026-04-30 + +### Добавлено +- **Multiprocessing** (`parallel.py`): N воркеров делят nonce-пространство + `[0, 2³²)` равными долями. CLI-флаг `--workers N` (default `cpu_count - 1`). + `found_queue` для шаров, `hashes_counter` для статистики. +- **EMA-хешрейт**: скользящее среднее (alpha=0.3, окно 5с) вместо мгновенного. +- **SQLite-журнал** (`storage.py`): таблицы `shares` и `sessions`, WAL-режим, + потокобезопасность. CLI-флаги `--db PATH`, `--no-db`. +- **Prometheus-экспортёр** (`metrics.py`): `/metrics` на `http.server`, метрики + `hopehash_shares_total`, `hopehash_hashrate_hps`, `hopehash_pool_difficulty`, + `hopehash_workers`, `hopehash_uptime_seconds`. CLI-флаг `--metrics-port` + (default 9090, 0 — выключить). +- **Telegram-уведомления** (`notifier.py`): через stdlib `urllib`, без + `python-telegram-bot`. Конфиг через env: `HOPE_HASH_TELEGRAM_TOKEN`, + `HOPE_HASH_TELEGRAM_CHAT_ID`. События: started / stopped / share_accepted / + block_found. +- 41 новый юнит-тест (storage: 9, metrics: 16, notifier: 16). Всего **56 тестов**. + +### Изменено +- `mine()` принимает опциональные `store`, `metrics`, `notifier` (None — disabled). +- `__init__.py` re-export `ShareStore`, `Metrics`, `MetricsServer`, `TelegramNotifier`. +- `.gitignore` дополнен `*.db`, `*.db-journal`, `*.db-wal`, `*.db-shm`. + ## [0.1.0] — 2026-04-30 ### Добавлено @@ -19,5 +43,6 @@ - 15 юнит-тестов на криптографические функции (`unittest`). - Реструктуризация в `src/`-layout с пакетом `hope_hash`. -[Unreleased]: https://github.com/KruglikovskiiPA/Hope-Hash/compare/v0.1.0...HEAD -[0.1.0]: https://github.com/KruglikovskiiPA/Hope-Hash/releases/tag/v0.1.0 +[Unreleased]: https://github.com/devAsmodeus/Hope-Hash/compare/v0.2.0...HEAD +[0.2.0]: https://github.com/devAsmodeus/Hope-Hash/compare/v0.1.0...v0.2.0 +[0.1.0]: https://github.com/devAsmodeus/Hope-Hash/releases/tag/v0.1.0 diff --git a/README.md b/README.md index 3ef924d..d302e5f 100644 --- a/README.md +++ b/README.md @@ -36,12 +36,21 @@ - [x] CI matrix Python 3.11/3.12/3.13 × ubuntu/windows/macos - [ ] Конфиг через CLI/YAML — перенесено в Уровень 1 +**Уровень 1 — производительность и наблюдаемость: завершён.** + +- [x] Multiprocessing: N воркеров (default `cpu_count - 1`), флаг `--workers` +- [x] EMA-хешрейт (alpha=0.3, окно 5с) +- [x] SQLite-журнал шаров и сессий (`storage.py`, флаг `--db`) +- [x] Prometheus-метрики на `/metrics` (`metrics.py`, флаг `--metrics-port`) +- [x] Telegram-уведомления (через stdlib urllib, env-конфиг) +- [ ] TUI на `rich` / `curses` — отложено (зависимости либо ограниченная Win-поддержка) +- [ ] Команды Telegram-бота (`/stats`, `/restart`) — отложено + **Не сделано / известные ограничения:** -- Один поток → ~50–200 KH/s на ноуте. Нет multiprocessing. -- Нет персистентной статистики (логов, БД). -- Нет UI — только консоль через `logging`. +- Нет UI — только консоль через `logging` и `/metrics` через HTTP. - Только Stratum V1, без `mining.suggest_difficulty` и без Stratum V2. +- C/Rust/SIMD/GPU — Уровни 2–3, ещё впереди. --- @@ -60,15 +69,22 @@ ├── src/hope_hash/ │ ├── __init__.py ← публичный API + __version__ │ ├── __main__.py ← `python -m hope_hash` -│ ├── cli.py ← argparse, точка входа -│ ├── miner.py ← mine(), supervisor_loop, run_session +│ ├── cli.py ← argparse, точка входа, инициализация observers +│ ├── miner.py ← mine() оркестратор + supervisor_loop +│ ├── parallel.py ← multiprocessing воркеры nonce-loop │ ├── stratum.py ← StratumClient (TCP + JSON-RPC) │ ├── block.py ← double_sha256, swap_words, target, merkle +│ ├── storage.py ← SQLite журнал шаров и сессий +│ ├── metrics.py ← Prometheus экспортёр (http.server) +│ ├── notifier.py ← Telegram через urllib │ ├── _logging.py ← настройка logger("hope_hash") │ └── py.typed ← PEP 561 marker └── tests/ ├── conftest.py ← общие фикстуры (заготовка) - └── test_block.py ← 15 unittest-тестов на чистые функции + ├── test_block.py ← 15 тестов на чистые функции + ├── test_storage.py ← 9 тестов на SQLite журнал + ├── test_metrics.py ← 16 тестов на Prometheus экспортёр + └── test_notifier.py ← 16 тестов на Telegram (через mock) ``` --- @@ -92,10 +108,27 @@ python -m hope_hash [имя_воркера] hope-hash bc1q5n2x4pvxhq8sxc7ck3uxq8sxc7ck3uxqzfm2py mylaptop ``` +**Расширенные опции:** + +```bash +hope-hash mylaptop \ + --workers 8 \ # число процессов (default: cpu_count - 1) + --db ./shares.db \ # путь к SQLite (default: hope_hash.db) + --metrics-port 9090 # Prometheus /metrics (0 — отключить) +``` + +**Telegram-уведомления (опционально):** задать env vars и просто запустить: + +```bash +export HOPE_HASH_TELEGRAM_TOKEN=123456:abcdef-your-bot-token +export HOPE_HASH_TELEGRAM_CHAT_ID=123456789 +hope-hash +``` + **Тесты:** ```bash -python -m unittest discover -s tests -v +python -m unittest discover -s tests -v # 56 тестов ``` BTC-адрес нужен валидный (любой формат: `1...`, `3...`, `bc1q...`, `bc1p...`). Можно завести в любом некастодиальном кошельке — например, **Sparrow**, **Electrum**, **Wasabi**. Имя воркера — произвольная строка. diff --git a/ROADMAP.md b/ROADMAP.md index 31820ca..791a276 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -26,12 +26,15 @@ ## Уровень 1 — лёгкие апгрейды (вечер каждый) +**Статус (2026-04-30):** производительность + наблюдаемость — закрыто. +TUI и команды Telegram — отложены. + Видимые фичи, не требующие глубокой переработки. ### Производительность -- [ ] **Multiprocessing.** Каждый CPU-ядро — отдельный процесс, общий `extranonce2` через `multiprocessing.Value`. Х2-х8 к хешрейту в зависимости от железа. -- [ ] **Замер реального хешрейта по окнам.** Сейчас считается «за интервал печати», правильнее — скользящее окно с EMA (exponential moving average). +- [x] **Multiprocessing.** `parallel.py`, CLI `--workers N`. nonce-пространство `[0, 2³²)` делится поровну между N процессами. found_queue + hashes_counter (`mp.Value('Q')`). На 16-CPU машине default = 15 воркеров. +- [x] **EMA-хешрейт.** alpha=0.3, окно 5с. Сэмпл = дельта счётчика / dt. Логируется и в `[stats]`, и в Prometheus gauge `hopehash_hashrate_hps`. ### UI/UX @@ -41,14 +44,14 @@ ### Telegram-бот -- [ ] **Уведомления о ключевых событиях:** старт майнера, потеря соединения, принятый шар, (мечты) найденный блок. Через `python-telegram-bot` или просто curl на `api.telegram.org`. -- [ ] **Команды `/stats`, `/restart`, `/stop`** через того же бота. +- [x] **Исходящие уведомления.** `notifier.py` через stdlib `urllib`, без `python-telegram-bot`. События: started / stopped / share_accepted / block_found / disconnected / reconnected. Конфиг через env (`HOPE_HASH_TELEGRAM_TOKEN`, `HOPE_HASH_TELEGRAM_CHAT_ID`). Если переменные не заданы — модуль молча disabled. +- [ ] **Входящие команды `/stats`, `/restart`, `/stop`** через тот же бот (long polling). Отложено. ### Логи и метрики -- [ ] **SQLite-журнал шар.** Каждый принятый шар → строка с timestamp, job_id, hash, difficulty. -- [ ] **Экспорт в Prometheus формат** через эндпоинт `/metrics` на `http.server`. -- [ ] **Grafana-дашборд** с готовым JSON для импорта. +- [x] **SQLite-журнал** (`storage.py`). Таблицы `shares` (ts, job_id, nonce_hex, hash_hex, difficulty, accepted, is_block) и `sessions`. WAL-режим, потокобезопасность через `threading.Lock`. +- [x] **Prometheus-экспортёр** (`metrics.py`). Эндпоинт `/metrics` на `http.server` (`ThreadingHTTPServer` в фоновой нити). Метрики: `hopehash_shares_total`, `hopehash_hashrate_hps`, `hopehash_pool_difficulty`, `hopehash_workers`, `hopehash_uptime_seconds`. CLI `--metrics-port` (0 — выключить). +- [ ] **Grafana-дашборд** с готовым JSON для импорта. Отложено. --- diff --git a/pyproject.toml b/pyproject.toml index 0b3a11c..b1c2de0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,8 +28,8 @@ dependencies = [] dynamic = ["version"] [project.urls] -Repository = "https://github.com/KruglikovskiiPA/Hope-Hash" -Issues = "https://github.com/KruglikovskiiPA/Hope-Hash/issues" +Repository = "https://github.com/devAsmodeus/Hope-Hash" +Issues = "https://github.com/devAsmodeus/Hope-Hash/issues" [project.scripts] hope-hash = "hope_hash.cli:main" diff --git a/src/hope_hash/__init__.py b/src/hope_hash/__init__.py index 1f56b52..9af66f7 100644 --- a/src/hope_hash/__init__.py +++ b/src/hope_hash/__init__.py @@ -1,10 +1,13 @@ """Hope-Hash — учебный solo BTC miner на чистом stdlib.""" from .block import build_merkle_root, difficulty_to_target, double_sha256, swap_words +from .metrics import Metrics, MetricsServer from .miner import mine +from .notifier import TelegramNotifier +from .storage import ShareStore from .stratum import StratumClient -__version__ = "0.1.0" +__version__ = "0.2.0" __all__ = [ "double_sha256", "swap_words", @@ -12,5 +15,9 @@ "build_merkle_root", "StratumClient", "mine", + "ShareStore", + "Metrics", + "MetricsServer", + "TelegramNotifier", "__version__", ] diff --git a/src/hope_hash/cli.py b/src/hope_hash/cli.py index e2108b7..649c535 100644 --- a/src/hope_hash/cli.py +++ b/src/hope_hash/cli.py @@ -1,11 +1,17 @@ -"""Точка входа CLI: argparse, запуск supervisor + mine().""" +"""Точка входа CLI: argparse, запуск supervisor + mine() + observers.""" import argparse +import multiprocessing +import os import threading import time +from pathlib import Path from ._logging import logger, setup_logging +from .metrics import Metrics, MetricsServer from .miner import mine, supervisor_loop +from .notifier import TelegramNotifier +from .storage import ShareStore from .stratum import StratumClient @@ -13,9 +19,13 @@ POOL_PORT = 3333 -def main(): - setup_logging() +def _default_workers() -> int: + """Один CPU оставляем сетевой части/IO. Минимум — 1 воркер.""" + cpu = os.cpu_count() or 1 + return max(1, cpu - 1) + +def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( prog="hope_hash", description="Учебный solo BTC miner на чистом stdlib.", @@ -23,13 +33,61 @@ def main(): parser.add_argument("btc_address", help="BTC-адрес для выплат (на него уйдёт награда).") parser.add_argument("worker_name", nargs="?", default="py01", help="Имя воркера (по умолчанию: py01).") - args = parser.parse_args() + parser.add_argument( + "--workers", type=int, default=_default_workers(), + help=f"Число процессов-воркеров (по умолчанию: {_default_workers()} = cpu_count - 1).", + ) + parser.add_argument( + "--db", type=str, default="hope_hash.db", + help="Путь к SQLite-журналу шаров (по умолчанию: hope_hash.db).", + ) + parser.add_argument( + "--no-db", action="store_true", + help="Отключить SQLite-журнал (--db игнорируется).", + ) + parser.add_argument( + "--metrics-port", type=int, default=9090, + help="Порт Prometheus /metrics (по умолчанию: 9090, 0 — отключить).", + ) + return parser.parse_args() + - btc_address = args.btc_address - worker_name = args.worker_name +def main(): + # Защитный вызов: на Windows multiprocessing требует freeze_support() + # при запуске через `python -m hope_hash`. Без него spawn-дети могут + # пытаться повторно стартовать main() и упасть. + multiprocessing.freeze_support() + + setup_logging() + args = _parse_args() + n_workers = max(1, args.workers) + + # ─── observers ─── + # Все три опциональны и не зависят друг от друга. Каждый сам решает, + # включаться ли (notifier — по env vars; metrics — по порту; store — по флагу). + store: ShareStore | None = None + if not args.no_db: + store = ShareStore(Path(args.db)) + metrics: Metrics | None = None + metrics_server: MetricsServer | None = None + if args.metrics_port > 0: + metrics = Metrics() + metrics_server = MetricsServer(metrics, port=args.metrics_port) + metrics_server.start() + + notifier = TelegramNotifier.from_env() + notifier.notify_started(args.btc_address, args.worker_name) + + if store is not None: + session_id = store.start_session(POOL_HOST, args.btc_address, args.worker_name) + else: + session_id = None + + # ─── сетевая часть и mine() ─── stop = threading.Event() - client = StratumClient(POOL_HOST, POOL_PORT, btc_address, worker_name, stop_event=stop) + client = StratumClient(POOL_HOST, POOL_PORT, args.btc_address, args.worker_name, + stop_event=stop) # Сетевая часть живёт в отдельной нити-супервизоре: она держит коннект, # переподключается при разрывах и сама поднимает reader_loop. main thread @@ -38,13 +96,14 @@ def main(): name="stratum-supervisor", daemon=False) supervisor.start() - logger.info("[main] жду первый job от пула...") + logger.info(f"[main] жду первый job от пула... (воркеров: {n_workers})") while client.current_job is None and not stop.is_set(): time.sleep(0.1) try: if not stop.is_set(): - mine(client, stop) + mine(client, stop, n_workers=n_workers, + store=store, metrics=metrics, notifier=notifier) except KeyboardInterrupt: logger.info("[main] остановка по Ctrl+C") finally: @@ -55,3 +114,13 @@ def main(): supervisor.join(timeout=5) if supervisor.is_alive(): logger.warning("[main] supervisor не остановился за 5с") + + # Закрываем observers последними, чтобы дать им зафиксировать финальные события. + notifier.notify_stopped() + notifier.shutdown() + if metrics_server is not None: + metrics_server.stop() + if store is not None: + if session_id is not None: + store.end_session(session_id) + store.close() diff --git a/src/hope_hash/metrics.py b/src/hope_hash/metrics.py new file mode 100644 index 0000000..4dec04e --- /dev/null +++ b/src/hope_hash/metrics.py @@ -0,0 +1,188 @@ +"""Prometheus-совместимые метрики через stdlib http.server. Без зависимостей. + +Два класса: ``Metrics`` — потокобезопасный регистр counter/gauge, +``MetricsServer`` — HTTP-сервер ``/metrics`` на фоновой нити. Логгер +берём по имени пакета, чтобы не плодить циклических импортов. +""" + +import logging +import threading +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +logger = logging.getLogger("hope_hash") + + +# Допустимые символы для имени метрики по Prometheus naming convention: +# первая буква — [a-zA-Z_:], остальные — [a-zA-Z0-9_:]. Всё прочее +# заменяем на подчёркивание. Это терпит произвольный пользовательский ввод. +_NAME_FIRST_OK = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_:") +_NAME_REST_OK = _NAME_FIRST_OK | set("0123456789") + + +def _sanitize_name(name: str) -> str: + """Приводит имя метрики к виду ``[a-zA-Z_:][a-zA-Z0-9_:]*``.""" + if not name: + return "_" + chars = [] + for i, ch in enumerate(name): + ok = _NAME_FIRST_OK if i == 0 else _NAME_REST_OK + chars.append(ch if ch in ok else "_") + return "".join(chars) + + +def _format_float(v: float) -> str: + """Форматирует gauge для Prometheus: целые без ``.0``, дробные через ``repr``.""" + if v == int(v) and abs(v) < 1e16: + return str(int(v)) + return repr(v) + + +class Metrics: + """Регистр метрик. Потокобезопасный. + + Поддерживаемые типы: + + - counter — монотонно растущий int (например, число шар). + - gauge — произвольное float (например, текущий хешрейт). + + Формат вывода соответствует Prometheus text format 0.0.4. Помимо + пользовательских метрик, ``render()`` всегда добавляет автоматический + gauge ``hopehash_uptime_seconds``. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._counters: dict[str, int] = {} + self._gauges: dict[str, float] = {} + self._help: dict[str, str] = {} + self._created_at = time.time() + + def counter_inc(self, name: str, value: int = 1, help: str | None = None) -> None: + """Инкрементирует counter. ``value`` должен быть >= 0.""" + if value < 0: + # Counter по определению монотонный; отрицательный шаг — это баг + # в коде вызывающего, лучше упасть громко, чем тихо «декрементить». + raise ValueError("counter_inc: value must be >= 0") + key = _sanitize_name(name) + with self._lock: + self._counters[key] = self._counters.get(key, 0) + int(value) + if help is not None: + self._help[key] = help + + def gauge_set(self, name: str, value: float, help: str | None = None) -> None: + """Устанавливает gauge в указанное значение.""" + key = _sanitize_name(name) + with self._lock: + self._gauges[key] = float(value) + if help is not None: + self._help[key] = help + + def render(self) -> bytes: + """Возвращает все метрики в Prometheus text format. UTF-8 bytes.""" + # Снимок под локом — потом форматируем без удержания лока, чтобы + # не блокировать producer'ов на длительный sprintf. + with self._lock: + counters = dict(self._counters) + gauges = dict(self._gauges) + helps = dict(self._help) + uptime = time.time() - self._created_at + + lines: list[str] = [] + + for name in sorted(counters): + help_text = helps.get(name, f"Counter {name}") + lines.append(f"# HELP {name} {help_text}") + lines.append(f"# TYPE {name} counter") + lines.append(f"{name} {counters[name]}") + + for name in sorted(gauges): + help_text = helps.get(name, f"Gauge {name}") + lines.append(f"# HELP {name} {help_text}") + lines.append(f"# TYPE {name} gauge") + lines.append(f"{name} {_format_float(gauges[name])}") + + # Автоматический uptime — всегда последним, чтобы порядок был стабильным. + lines.append("# HELP hopehash_uptime_seconds Seconds since metrics registry created") + lines.append("# TYPE hopehash_uptime_seconds gauge") + lines.append(f"hopehash_uptime_seconds {_format_float(uptime)}") + + # Финальный перевод строки — Prometheus exposition требует, чтобы + # последняя метрика заканчивалась ``\n``. + return ("\n".join(lines) + "\n").encode("utf-8") + + +def _make_handler(metrics: Metrics) -> type[BaseHTTPRequestHandler]: + """Фабрика handler-класса с ``metrics`` через замыкание — без глобалов.""" + + class _Handler(BaseHTTPRequestHandler): + def do_GET(self) -> None: # noqa: N802 — имя задано базовым классом + if self.path != "/metrics": + self.send_error(404) + return + body = metrics.render() + self.send_response(200) + self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format: str, *args: object) -> None: # noqa: A002 + # Подавляем стандартный stderr-лог BaseHTTPRequestHandler: + # в проекте единый канал — logger "hope_hash", и засорять его + # каждым GET /metrics не нужно. + return + + return _Handler + + +class MetricsServer: + """HTTP-сервер для ``/metrics`` на отдельной нити. Старт/стоп идемпотентны.""" + + def __init__(self, metrics: Metrics, host: str = "127.0.0.1", port: int = 9090) -> None: + self.metrics = metrics + self.host = host + self.port = port + self._server: ThreadingHTTPServer | None = None + self._thread: threading.Thread | None = None + # Лок защищает start/stop от гонки, если их зовут из разных нитей. + self._lifecycle_lock = threading.Lock() + + def start(self) -> None: + """Запускает сервер в фоновой нити. Идемпотентен.""" + with self._lifecycle_lock: + if self._server is not None: + # Уже запущен — ничего не делаем, чтобы не порвать рабочий + # сокет повторным bind'ом. + return + handler_cls = _make_handler(self.metrics) + self._server = ThreadingHTTPServer((self.host, self.port), handler_cls) + self._thread = threading.Thread( + target=self._server.serve_forever, + name=f"hope_hash-metrics-{self.port}", + daemon=True, + ) + self._thread.start() + logger.info("[metrics] сервер запущен на %s", self.url) + + def stop(self, timeout: float = 2.0) -> None: + """Останавливает сервер. Идемпотентен.""" + with self._lifecycle_lock: + server = self._server + thread = self._thread + self._server = None + self._thread = None + + if server is not None: + # shutdown() блокирует serve_forever и ждёт его выхода, + # server_close() закрывает listening-сокет. + server.shutdown() + server.server_close() + if thread is not None: + thread.join(timeout=timeout) + if server is not None: + logger.info("[metrics] сервер остановлен") + + @property + def url(self) -> str: + return f"http://{self.host}:{self.port}/metrics" diff --git a/src/hope_hash/miner.py b/src/hope_hash/miner.py index 7181e0b..ebd56ea 100644 --- a/src/hope_hash/miner.py +++ b/src/hope_hash/miner.py @@ -1,12 +1,16 @@ """Главный цикл хеширования mine() и сетевой супервизор переподключений.""" import socket -import struct import threading import time +from typing import Optional from ._logging import logger from .block import build_merkle_root, difficulty_to_target, double_sha256, swap_words +from .metrics import Metrics +from .notifier import TelegramNotifier +from .parallel import start_pool, stop_pool +from .storage import ShareStore from .stratum import StratumClient @@ -64,10 +68,63 @@ def supervisor_loop(client: StratumClient): # ─────────────────────── основной майнинг-цикл ─────────────────────── -def mine(client: StratumClient, stop_event: threading.Event): - hashes = 0 - last_print = time.time() +def _build_header_base(job: dict, extranonce1: str, extranonce2: str) -> bytes: + """ + Собирает 76-байтовый префикс block header (всё, кроме nonce). + + Вынесено в отдельную функцию: воркеры nonce-loop теперь в parallel.py, + а main process только формирует header_base и отдаёт его в пул. + """ + coinbase_hex = job["coinb1"] + extranonce1 + extranonce2 + job["coinb2"] + coinbase_hash = double_sha256(bytes.fromhex(coinbase_hex)) + merkle_root = build_merkle_root(coinbase_hash, job["merkle_branch"]) + return ( + bytes.fromhex(job["version"])[::-1] + # 4 b version (LE) + swap_words(job["prevhash"]) + # 32 b prev hash (word-swap) + merkle_root + # 32 b merkle (LE) + bytes.fromhex(job["ntime"])[::-1] + # 4 b ntime (LE) + bytes.fromhex(job["nbits"])[::-1] # 4 b nbits (LE) + ) + + +def _format_rate(rate: float) -> str: + """Человекочитаемый хешрейт: H/s → KH/s → MH/s.""" + if rate < 1000: + return f"{rate:.0f} H/s" + if rate < 1_000_000: + return f"{rate/1000:.2f} KH/s" + return f"{rate/1_000_000:.2f} MH/s" + + +def mine( + client: StratumClient, + stop_event: threading.Event, + n_workers: int = 1, + store: Optional[ShareStore] = None, + metrics: Optional[Metrics] = None, + notifier: Optional[TelegramNotifier] = None, +): + """ + Оркестратор пула воркеров. + + Логика: + 1. Ждём первый job, собираем header_base. + 2. Поднимаем пул (start_pool) с уникальным extranonce2. + 3. В цикле: читаем found_queue, считаем EMA-хешрейт, следим за сменой job. + 4. При смене job_id (или stop_event) — gracefully гасим пул и идём на 1. + + EMA: alpha=0.3, окно ~5с. Сэмпл = (current_counter - prev_counter) / dt. + Счётчик не сбрасываем — храним предыдущее значение и считаем дельту, + так точнее и не надо синхронизироваться с воркерами через Lock. + + Опциональные наблюдатели (store/metrics/notifier) подключаются хуками + на ключевые события: найденный шар → запись в БД + counter в Prometheus + + уведомление в Telegram; EMA-хешрейт → gauge. + """ extranonce2_counter = 0 + ema = 0.0 + alpha = 0.3 + report_interval = 5.0 while not stop_event.is_set(): with client.job_lock: @@ -79,65 +136,103 @@ def mine(client: StratumClient, stop_event: threading.Event): continue # extranonce2 — наша часть coinbase, чтобы каждый воркер крутил уникальные хеши. + # Поднимается на каждый job; в пределах одного job всё nonce-пространство + # делится между процессами. extranonce2 = f"{extranonce2_counter:0{en2_size * 2}x}" extranonce2_counter += 1 - # Собираем coinbase = coinb1 + extranonce1 + extranonce2 + coinb2 - coinbase_hex = job["coinb1"] + en1 + extranonce2 + job["coinb2"] - coinbase_hash = double_sha256(bytes.fromhex(coinbase_hex)) - - # Считаем merkle root через ветки от пула - merkle_root = build_merkle_root(coinbase_hash, job["merkle_branch"]) - - # Базовый block header без nonce (76 байт = 80 - 4) - header_base = ( - bytes.fromhex(job["version"])[::-1] + # 4 b version (LE) - swap_words(job["prevhash"]) + # 32 b prev hash (word-swap) - merkle_root + # 32 b merkle (LE) - bytes.fromhex(job["ntime"])[::-1] + # 4 b ntime (LE) - bytes.fromhex(job["nbits"])[::-1] # 4 b nbits (LE) - ) - + header_base = _build_header_base(job, en1, extranonce2) target = difficulty_to_target(client.difficulty) current_job_id = job["job_id"] - # Перебор nonce: 0 .. 2^32 - 1 - for nonce in range(0, 0xFFFFFFFF): - if stop_event.is_set(): - return + processes, found_queue, hashes_counter, mp_stop = start_pool( + n_workers, header_base, target, extranonce2, + ) - # Если пришла свежая работа — выходим, чтобы не тратить время на старую - if hashes & 0x3FFF == 0: # каждые 16k хешей дёргаем lock, чтобы не душить нить + prev_count = 0 + last_report = time.time() + last_alive_check = time.time() + + # ─── основной цикл одного job ─── + try: + while not stop_event.is_set(): + # 1. Не блокирующее чтение находок. + try: + while True: + nonce_hex, hash_hex, en2 = found_queue.get_nowait() + logger.warning( + f"[mine] !!! НАЙДЕН ШАР !!! nonce={nonce_hex} hash={hash_hex}" + ) + try: + client.submit(current_job_id, en2, job["ntime"], nonce_hex) + except (OSError, AttributeError) as e: + # submit может прийтись на момент reconnect — не валим майнер. + logger.warning(f"[stratum] не удалось отправить шар: {e}") + # Хуки наблюдателей. Все опциональны — None означает disabled. + if store is not None: + store.record_share( + job_id=current_job_id, nonce_hex=nonce_hex, + hash_hex=hash_hex, difficulty=client.difficulty, + accepted=True, + ) + if metrics is not None: + metrics.counter_inc( + "hopehash_shares_total", 1, + help="Total shares submitted (accepted by client side)", + ) + if notifier is not None: + notifier.notify_share_accepted( + job_id=current_job_id, difficulty=client.difficulty, + ) + except Exception: + # Empty/queue closed — единственный нормальный путь выхода из while. + pass + + now = time.time() + + # 2. EMA-хешрейт. + if now - last_report >= report_interval: + with hashes_counter.get_lock(): + cur = hashes_counter.value + sample = (cur - prev_count) / (now - last_report) + ema = sample if ema == 0.0 else alpha * sample + (1 - alpha) * ema + logger.info( + f"[stats] хешрейт ≈ {_format_rate(ema)} " + f"(окно {_format_rate(sample)}) | " + f"pool diff = {client.difficulty} | workers = {len(processes)}" + ) + if metrics is not None: + metrics.gauge_set( + "hopehash_hashrate_hps", ema, + help="Current EMA hashrate in hashes per second", + ) + metrics.gauge_set( + "hopehash_pool_difficulty", float(client.difficulty), + help="Current pool difficulty", + ) + metrics.gauge_set( + "hopehash_workers", float(len(processes)), + help="Number of active worker processes", + ) + prev_count = cur + last_report = now + + # 3. Смена job — выходим из цикла, чтобы пересоздать пул. with client.job_lock: - if not client.current_job or client.current_job["job_id"] != current_job_id: + cj = client.current_job + if not cj or cj["job_id"] != current_job_id: + logger.info(f"[mine] job сменился ({current_job_id} → " + f"{cj['job_id'] if cj else 'None'}), рестарт пула") + break + + # 4. Все воркеры исчерпали nonce-пространство? + if now - last_alive_check >= 1.0: + if not any(p.is_alive() for p in processes): + logger.info("[pool] все воркеры исчерпали nonce — берём новый extranonce2") break + last_alive_check = now - header = header_base + struct.pack("I", nonce).hex() - logger.warning( - f"[mine] !!! НАЙДЕН ШАР !!! nonce={nonce_hex} hash={h[::-1].hex()}" - ) - try: - client.submit(current_job_id, extranonce2, job["ntime"], nonce_hex) - except (OSError, AttributeError) as e: - # submit может прийтись на момент reconnect — не валим майнер. - logger.warning(f"[stratum] не удалось отправить шар: {e}") - - hashes += 1 - - # Каждые 5 секунд — статистика - now = time.time() - if now - last_print >= 5.0: - rate = hashes / (now - last_print) - rate_str = f"{rate:.0f} H/s" if rate < 1000 else f"{rate/1000:.2f} KH/s" - logger.info( - f"[stats] хешрейт ≈ {rate_str} | pool diff = {client.difficulty}" - ) - hashes = 0 - last_print = now + # Дёшево спим, чтобы не жечь main CPU на busy-loop. + time.sleep(0.05) + finally: + stop_pool(processes, found_queue, mp_stop) diff --git a/src/hope_hash/notifier.py b/src/hope_hash/notifier.py new file mode 100644 index 0000000..a22da9b --- /dev/null +++ b/src/hope_hash/notifier.py @@ -0,0 +1,167 @@ +"""Telegram-уведомления через stdlib urllib. Без зависимостей. + +Использование: + notifier = TelegramNotifier(token="123:ABC", chat_id="456789") + notifier.notify("Майнер стартовал") + notifier.notify_share_accepted(job_id="abc", difficulty=1.0) + notifier.notify_block_found(hash_hex="00000000...", height=999999) + notifier.shutdown() # дренирует очередь, закрывает воркер-нить + +Если token или chat_id не заданы — все методы становятся no-op (silent disable). +Это позволяет интегрировать notifier в miner без обязательной настройки. + +Архитектура: фоновая нить-воркер тащит сообщения из `queue.Queue` и шлёт +на Telegram Bot API через urllib. Сетевые вызовы не блокируют hot-path +майнера; при переполнении очереди сообщения отбрасываются с warning. +""" + +import logging +import os +import queue +import threading +import urllib.error +import urllib.parse +import urllib.request +from typing import Optional + +logger = logging.getLogger("hope_hash") + +# Шаблон URL Telegram Bot API. sendMessage принимает form-encoded body. +TELEGRAM_API = "https://api.telegram.org/bot{token}/sendMessage" + + +class TelegramNotifier: + """Асинхронный отправщик сообщений в Telegram. + + Работает через background thread + очередь, чтобы сетевые вызовы + не блокировали майнинг-цикл. Если token или chat_id отсутствуют + (None или пустая строка) — все методы превращаются в no-op. + """ + + def __init__( + self, + token: Optional[str] = None, + chat_id: Optional[str] = None, + timeout: float = 5.0, + queue_maxsize: int = 100, + ): + # Disabled mode: явно проверяем оба значения. Пустая строка трактуется + # как «не задано» — это удобнее, чем падать на пустом env-var. + self.enabled = bool(token) and bool(chat_id) + self.token = token or "" + self.chat_id = chat_id or "" + self.timeout = timeout + self._queue: queue.Queue = queue.Queue(maxsize=queue_maxsize) + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + if self.enabled: + # daemon=True — чтобы при жёстком SIGKILL процесс не висел из-за нити. + # Корректное завершение всё равно через shutdown(). + self._thread = threading.Thread( + target=self._worker, + daemon=True, + name="telegram-notifier", + ) + self._thread.start() + logger.info("[telegram] notifier активен (chat=%s)", self.chat_id) + else: + logger.info("[telegram] notifier отключён (нет token/chat_id)") + + @classmethod + def from_env(cls) -> "TelegramNotifier": + """Читает HOPE_HASH_TELEGRAM_TOKEN и HOPE_HASH_TELEGRAM_CHAT_ID из env.""" + return cls( + token=os.environ.get("HOPE_HASH_TELEGRAM_TOKEN"), + chat_id=os.environ.get("HOPE_HASH_TELEGRAM_CHAT_ID"), + ) + + def notify(self, text: str) -> None: + """Кладёт сообщение в очередь. No-op если disabled или очередь полна.""" + if not self.enabled: + return + try: + self._queue.put_nowait(text) + except queue.Full: + # Сознательно роняем сообщение, а не блокируем — для майнера + # потеря уведомления безопаснее, чем застывший hot-path. + logger.warning("[telegram] очередь переполнена, сообщение отброшено") + + def notify_started(self, btc_address: str, worker_name: str) -> None: + self.notify( + f"🟢 Hope-Hash запущен\nworker: {worker_name}\nadr: {btc_address[:8]}…" + ) + + def notify_stopped(self) -> None: + self.notify("🔴 Hope-Hash остановлен") + + def notify_disconnected(self, reason: str) -> None: + self.notify(f"⚠️ Потеря соединения: {reason}") + + def notify_reconnected(self) -> None: + self.notify("✅ Соединение восстановлено") + + def notify_share_accepted(self, job_id: str, difficulty: float) -> None: + self.notify(f"✓ Шар принят (job={job_id}, diff={difficulty})") + + def notify_block_found(self, hash_hex: str, height: Optional[int] = None) -> None: + h = f" #{height}" if height else "" + self.notify(f"🎉 БЛОК НАЙДЕН{h}!\nhash: {hash_hex[:32]}…") + + def shutdown(self, timeout: float = 5.0) -> None: + """Дренирует очередь и останавливает воркер. Идемпотентно.""" + if not self.enabled or self._thread is None: + return + # Сначала ждём, пока воркер обработает все ранее положенные сообщения. + # queue.join() блокируется до тех пор, пока на каждый put не сделан task_done. + self._queue.join() + # Теперь сигналим воркеру выйти и пробуждаем его sentinel-ом None. + self._stop_event.set() + try: + self._queue.put_nowait(None) + except queue.Full: + # Очередь заполнена — но раз join вернулся, воркер уже опустошил всё + # и сейчас ждёт на get(); put_nowait не должен падать. На всякий случай + # используем put с таймаутом как fallback. + try: + self._queue.put(None, timeout=1.0) + except queue.Full: + pass + self._thread.join(timeout=timeout) + self._thread = None + + def _worker(self) -> None: + """Фоновая нить: тащит из очереди и шлёт в Telegram API.""" + while True: + item = self._queue.get() + try: + # None — sentinel для остановки. Stop_event дублирует на случай + # если кто-то выставил флаг без отправки sentinel-а. + if item is None or self._stop_event.is_set() and item is None: + return + self._send(item) + except Exception as e: + # Сетевые сбои/HTTP-ошибки/таймауты — логируем и идём дальше. + # Не даём упасть нити: иначе следующие notify() будут уходить в пустоту. + logger.warning("[telegram] ошибка отправки: %s", e) + finally: + # task_done обязателен на КАЖДЫЙ get (включая sentinel), + # иначе queue.join() в shutdown() зависнет. + self._queue.task_done() + + def _send(self, text: str) -> None: + """Один HTTP POST на api.telegram.org. urllib + urlencode.""" + url = TELEGRAM_API.format(token=self.token) + data = urllib.parse.urlencode( + { + "chat_id": self.chat_id, + "text": text, + "disable_web_page_preview": "true", + } + ).encode("utf-8") + req = urllib.request.Request(url, data=data, method="POST") + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + # urlopen уже бросает HTTPError на 4xx/5xx, но проверим status явно + # на случай нестандартных кодов. + if resp.status >= 400: + raise RuntimeError(f"Telegram API status {resp.status}") + logger.debug("[telegram] отправлено: %r", text[:60]) diff --git a/src/hope_hash/parallel.py b/src/hope_hash/parallel.py new file mode 100644 index 0000000..83e5505 --- /dev/null +++ b/src/hope_hash/parallel.py @@ -0,0 +1,172 @@ +"""Параллельный перебор nonce через multiprocessing. + +Архитектура (см. ROADMAP «Уровень 1 / Производительность»): + +- Сетевая часть (StratumClient) остаётся в main process. Worker-процессы + не ходят в сеть — это убирает гонки на сокете и даёт чистый pickle-able + интерфейс «header_base + диапазон nonce → найденные шары в очередь». +- Пул воркеров пересоздаётся при каждом новом job (terminate + новый pool). + Это проще, чем делать persistent workers с IPC очередями для job updates, + и достаточно для учебного кода. +- Windows-совместимость: `multiprocessing` использует `spawn`, поэтому + `worker` должен быть top-level функцией без замыканий, а все аргументы — + pickle-able (bytes, int, multiprocessing.Queue/Value/Event). +""" + +import multiprocessing as mp +import struct +import time + +from ._logging import logger +from .block import double_sha256 + + +# Каждые столько хешей воркер сверяется со stop_event и инкрементирует +# общий счётчик. Чем чаще — тем дороже из-за блокировки на Lock у Value. +# 16k подобрано так же, как в одно-процессорной версии mine() для проверки job_id. +HASHES_PER_TICK = 1 << 14 # 16384 + + +def worker( + worker_id: int, + header_base: bytes, + target: int, + nonce_start: int, + nonce_end: int, + extranonce2: str, + found_queue: "mp.Queue", + hashes_counter: "mp.sharedctypes.Synchronized", + stop_event: "mp.synchronize.Event", +) -> None: + """ + Перебирает nonce в диапазоне [nonce_start, nonce_end), считает SHA256d. + + Если хеш ≤ target — кладёт ``(nonce_hex_be, hash_hex_be, extranonce2)`` + в ``found_queue``. Каждые ``HASHES_PER_TICK`` хешей: + - проверяет ``stop_event`` (рано выйти при смене job / Ctrl+C); + - инкрементирует ``hashes_counter`` (для расчёта EMA-хешрейта в main). + + Сам ``submit`` делается из main process — здесь только находка. + Сигнатура полностью pickle-able: bytes/int/str + примитивы mp. + """ + local_hashes = 0 # копим локально, чтобы реже дёргать Lock на Value + nonce = nonce_start + try: + while nonce < nonce_end: + # Хвост блока: 4 байта nonce, LE. + header = header_base + struct.pack("I", nonce).hex() + hash_hex = h[::-1].hex() + # put не блокирует процесс надолго: очередь почти всегда пуста. + found_queue.put((nonce_hex, hash_hex, extranonce2)) + + nonce += 1 + local_hashes += 1 + + if local_hashes >= HASHES_PER_TICK: + # Сливаем локальный счётчик в общий и проверяем stop. + with hashes_counter.get_lock(): + hashes_counter.value += local_hashes + local_hashes = 0 + if stop_event.is_set(): + return + finally: + # Не теряем хвостовые хеши: дольём в общий счётчик. + if local_hashes: + with hashes_counter.get_lock(): + hashes_counter.value += local_hashes + + +# ─────────────────────── оркестрация пула ─────────────────────── + + +def start_pool( + n_workers: int, + header_base: bytes, + target: int, + extranonce2: str, +) -> tuple: + """ + Поднимает ``n_workers`` процессов, делящих [0, 2^32) поровну. + + Возвращает кортеж ``(processes, found_queue, hashes_counter, stop_event)``. + Все объекты IPC создаются здесь, чтобы main process был их единственным + владельцем — это упрощает корректный teardown в ``stop_pool``. + """ + n_workers = max(1, int(n_workers)) + nonce_space = 1 << 32 + step = nonce_space // n_workers + + found_queue: mp.Queue = mp.Queue() + hashes_counter = mp.Value("Q", 0) # uint64, lock=True по умолчанию + stop_event = mp.Event() + + processes: list[mp.Process] = [] + for i in range(n_workers): + nonce_start = i * step + # Последнему отдаём «остаток», чтобы покрыть всё пространство nonce. + nonce_end = nonce_space if i == n_workers - 1 else (i + 1) * step + p = mp.Process( + target=worker, + name=f"hope-hash-worker-{i}", + args=( + i, header_base, target, nonce_start, nonce_end, + extranonce2, found_queue, hashes_counter, stop_event, + ), + daemon=False, + ) + p.start() + processes.append(p) + + logger.info( + f"[pool] стартовал {n_workers} воркер(ов), " + f"шаг nonce-пространства = {step:#x}" + ) + return processes, found_queue, hashes_counter, stop_event + + +def stop_pool( + processes: list, + found_queue: "mp.Queue", + stop_event: "mp.synchronize.Event", + join_timeout: float = 5.0, +) -> None: + """ + Аккуратно гасит пул: set stop_event → join → terminate (если зависли) → + drain очереди. Очередь обязательно нужно осушить ДО join, иначе + дочерние процессы могут заблокироваться на ``Queue.put`` под капотом + feeder-нити (deadlock на Windows встречается особенно охотно). + """ + stop_event.set() + + # Сначала пытаемся вытащить «уже найденные» шары — чтобы не потерялись. + drained: list[tuple] = [] + deadline = time.time() + 0.2 + while time.time() < deadline: + try: + drained.append(found_queue.get_nowait()) + except Exception: + break + + for p in processes: + p.join(timeout=join_timeout) + for p in processes: + if p.is_alive(): + logger.warning(f"[pool] {p.name} не вышел за {join_timeout}с — terminate") + p.terminate() + p.join(timeout=2.0) + + # Закрываем queue, чтобы освободить ресурсы (важно на Windows). + try: + found_queue.close() + found_queue.join_thread() + except Exception: + pass + + if drained: + logger.info(f"[pool] при остановке слили {len(drained)} находок из очереди") diff --git a/src/hope_hash/storage.py b/src/hope_hash/storage.py new file mode 100644 index 0000000..3692432 --- /dev/null +++ b/src/hope_hash/storage.py @@ -0,0 +1,175 @@ +"""Persistent журнал принятых шаров и найденных блоков. SQLite, stdlib only.""" + +import logging +import sqlite3 +import threading +import time +from pathlib import Path + +# Единый logger пакета (см. _logging.py). Тэг [storage] добавляем в сообщения. +logger = logging.getLogger("hope_hash") + +DEFAULT_DB_PATH = Path("hope_hash.db") + +# Схема описывает две таблицы: shares (журнал хешей) и sessions (запуски майнера). +# `IF NOT EXISTS` делает инициализацию идемпотентной — можно открывать БД повторно. +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS shares ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts REAL NOT NULL, -- unix timestamp (time.time()) + job_id TEXT NOT NULL, + nonce_hex TEXT NOT NULL, + hash_hex TEXT NOT NULL, + difficulty REAL NOT NULL, + accepted INTEGER NOT NULL, -- 0/1 + is_block INTEGER NOT NULL DEFAULT 0 -- 0/1, true = настоящий блок (не шар) +); +CREATE INDEX IF NOT EXISTS idx_shares_ts ON shares(ts); +CREATE INDEX IF NOT EXISTS idx_shares_accepted ON shares(accepted); + +CREATE TABLE IF NOT EXISTS sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at REAL NOT NULL, + ended_at REAL, + pool_host TEXT NOT NULL, + btc_address TEXT NOT NULL, + worker_name TEXT NOT NULL +); +""" + + +class ShareStore: + """Потокобезопасный фасад над SQLite. Ленивая инициализация схемы.""" + + def __init__(self, db_path: str | Path = DEFAULT_DB_PATH): + self.db_path = Path(db_path) + # Один лок на всё соединение: SQLite сам сериализует запись, но мы хотим + # ещё и атомарность последовательностей execute+commit на уровне Python. + self._lock = threading.Lock() + # check_same_thread=False — соединение шарим между нитями майнера. + self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False) + # WAL включаем для параллельных читателей и более устойчивых записей. + # Если БД на платформе/ФС, где WAL не работает — падать не должны. + try: + self._conn.execute("PRAGMA journal_mode=WAL") + except sqlite3.DatabaseError: + pass + self._conn.executescript(_SCHEMA) + self._conn.commit() + self._closed = False + logger.info("[storage] открыта БД %s", self.db_path) + + def record_share( + self, + job_id: str, + nonce_hex: str, + hash_hex: str, + difficulty: float, + accepted: bool = True, + is_block: bool = False, + ts: float | None = None, + ) -> int: + """Записывает шар. Возвращает id записи.""" + with self._lock: + cur = self._conn.execute( + "INSERT INTO shares (ts, job_id, nonce_hex, hash_hex, difficulty, accepted, is_block) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + ts if ts is not None else time.time(), + job_id, + nonce_hex, + hash_hex, + difficulty, + int(accepted), + int(is_block), + ), + ) + self._conn.commit() + row_id = cur.lastrowid + # Лог за пределами лока — sqlite-write не должен блокироваться форматом. + if is_block: + logger.info("[storage] BLOCK! job=%s id=%s", job_id, row_id) + else: + logger.info( + "[storage] share job=%s accepted=%s id=%s", job_id, accepted, row_id + ) + return row_id + + def start_session(self, pool_host: str, btc_address: str, worker_name: str) -> int: + """Регистрирует начало сессии. Возвращает session_id.""" + with self._lock: + cur = self._conn.execute( + "INSERT INTO sessions (started_at, pool_host, btc_address, worker_name) " + "VALUES (?, ?, ?, ?)", + (time.time(), pool_host, btc_address, worker_name), + ) + self._conn.commit() + session_id = cur.lastrowid + logger.info( + "[storage] начата сессия id=%s pool=%s worker=%s", + session_id, + pool_host, + worker_name, + ) + return session_id + + def end_session(self, session_id: int) -> None: + """Помечает сессию как завершённую.""" + with self._lock: + self._conn.execute( + "UPDATE sessions SET ended_at=? WHERE id=?", + (time.time(), session_id), + ) + self._conn.commit() + logger.info("[storage] завершена сессия id=%s", session_id) + + def total_shares(self, accepted_only: bool = True) -> int: + """Сколько шаров записано (всего/принятых).""" + with self._lock: + if accepted_only: + cur = self._conn.execute("SELECT COUNT(*) FROM shares WHERE accepted=1") + else: + cur = self._conn.execute("SELECT COUNT(*) FROM shares") + (count,) = cur.fetchone() + return int(count) + + def shares_per_hour(self, hours: int = 24) -> float: + """Среднее число шаров в час за последние N часов.""" + if hours <= 0: + return 0.0 + cutoff = time.time() - hours * 3600 + with self._lock: + cur = self._conn.execute( + "SELECT COUNT(*) FROM shares WHERE accepted=1 AND ts >= ?", + (cutoff,), + ) + (count,) = cur.fetchone() + if count == 0: + return 0.0 + return float(count) / float(hours) + + def close(self) -> None: + """Закрывает соединение. Идемпотентно.""" + # Проверка флага без лока: повторный close() из другой нити безопасен, + # потому что sqlite3.Connection.close() сам потокобезопасен, а двойной + # commit на закрытом соединении ловится try/except ниже. + if self._closed: + return + try: + with self._lock: + if self._closed: + return + try: + self._conn.commit() + except sqlite3.Error: + pass + try: + self._conn.close() + except sqlite3.Error: + pass + self._closed = True + except Exception: + # Закрытие должно быть максимально терпимым: не маскируем баги, но + # не падаем в финализаторах вызывающего кода. + self._closed = True + logger.info("[storage] БД закрыта %s", self.db_path) diff --git a/tests/test_metrics.py b/tests/test_metrics.py new file mode 100644 index 0000000..95cdb60 --- /dev/null +++ b/tests/test_metrics.py @@ -0,0 +1,156 @@ +"""Юнит-тесты для metrics — Prometheus экспортёр.""" + +import socket +import unittest +import urllib.error +import urllib.request + +from hope_hash.metrics import Metrics, MetricsServer + + +def _free_port() -> int: + """Захватывает свободный порт у ОС, возвращает его. + + Между close() и последующим bind() есть микро-окно гонки, но для + локальных юнит-тестов это допустимо. + """ + s = socket.socket() + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.close() + return port + + +class TestMetricsRegistry(unittest.TestCase): + def test_counter_increments(self) -> None: + m = Metrics() + m.counter_inc("foo", 1) + m.counter_inc("foo", 2) + body = m.render().decode() + self.assertIn("foo 3", body) + + def test_counter_default_step_is_one(self) -> None: + m = Metrics() + m.counter_inc("hits") + m.counter_inc("hits") + m.counter_inc("hits") + self.assertIn("hits 3", m.render().decode()) + + def test_counter_rejects_negative(self) -> None: + m = Metrics() + with self.assertRaises(ValueError): + m.counter_inc("bad", -1) + + def test_gauge_overwrites(self) -> None: + m = Metrics() + m.gauge_set("hashrate", 100.0) + m.gauge_set("hashrate", 152334.5) + body = m.render().decode() + self.assertIn("hashrate 152334.5", body) + self.assertNotIn("hashrate 100", body) + + def test_render_includes_help_and_type(self) -> None: + m = Metrics() + m.counter_inc("shares_total", 1, help="Total accepted shares") + m.gauge_set("hashrate_hps", 42.0, help="Current hashrate in H/s") + body = m.render().decode() + self.assertIn("# HELP shares_total Total accepted shares", body) + self.assertIn("# TYPE shares_total counter", body) + self.assertIn("# HELP hashrate_hps Current hashrate in H/s", body) + self.assertIn("# TYPE hashrate_hps gauge", body) + + def test_render_includes_uptime(self) -> None: + m = Metrics() + body = m.render().decode() + self.assertIn("# TYPE hopehash_uptime_seconds gauge", body) + self.assertIn("hopehash_uptime_seconds ", body) + + def test_render_returns_bytes(self) -> None: + m = Metrics() + m.counter_inc("foo") + out = m.render() + self.assertIsInstance(out, bytes) + # должно валидно декодироваться в UTF-8 + out.decode("utf-8") + + def test_invalid_metric_name_sanitized(self) -> None: + m = Metrics() + m.counter_inc("foo-bar.baz", 7) + body = m.render().decode() + self.assertIn("foo_bar_baz 7", body) + self.assertNotIn("foo-bar.baz", body) + + def test_thread_safety_concurrent_inc(self) -> None: + # Параллельные инкременты не должны терять обновления. + import threading + + m = Metrics() + + def worker() -> None: + for _ in range(1000): + m.counter_inc("c") + + threads = [threading.Thread(target=worker) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertIn("c 5000", m.render().decode()) + + +class TestMetricsServer(unittest.TestCase): + def setUp(self) -> None: + self.metrics = Metrics() + self.metrics.counter_inc("test_counter", 5, help="test counter") + self.server = MetricsServer(self.metrics, port=_free_port()) + self.server.start() + + def tearDown(self) -> None: + self.server.stop() + + def test_metrics_endpoint(self) -> None: + with urllib.request.urlopen(self.server.url, timeout=2) as resp: + body = resp.read().decode() + status = resp.status + ctype = resp.headers.get("Content-Type", "") + self.assertEqual(status, 200) + self.assertIn("test_counter 5", body) + self.assertIn("text/plain", ctype) + self.assertIn("version=0.0.4", ctype) + + def test_other_path_returns_404(self) -> None: + url = self.server.url.replace("/metrics", "/foo") + with self.assertRaises(urllib.error.HTTPError) as ctx: + urllib.request.urlopen(url, timeout=2) + self.assertEqual(ctx.exception.code, 404) + + def test_root_path_returns_404(self) -> None: + url = f"http://{self.server.host}:{self.server.port}/" + with self.assertRaises(urllib.error.HTTPError) as ctx: + urllib.request.urlopen(url, timeout=2) + self.assertEqual(ctx.exception.code, 404) + + def test_stop_is_idempotent(self) -> None: + self.server.stop() + self.server.stop() # повторный stop не должен падать + + def test_double_start_is_idempotent(self) -> None: + # уже запущен в setUp; повторный start не должен падать или ронять сервер. + self.server.start() + with urllib.request.urlopen(self.server.url, timeout=2) as resp: + self.assertEqual(resp.status, 200) + + def test_metrics_reflect_live_updates(self) -> None: + # После старта сервер должен отдавать актуальное состояние регистра. + self.metrics.gauge_set("live_gauge", 99.5) + with urllib.request.urlopen(self.server.url, timeout=2) as resp: + body = resp.read().decode() + self.assertIn("live_gauge 99.5", body) + + def test_url_property(self) -> None: + self.assertTrue(self.server.url.startswith("http://127.0.0.1:")) + self.assertTrue(self.server.url.endswith("/metrics")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_notifier.py b/tests/test_notifier.py new file mode 100644 index 0000000..a13c05b --- /dev/null +++ b/tests/test_notifier.py @@ -0,0 +1,266 @@ +"""Юнит-тесты для notifier — Telegram через urllib. + +Сетевые вызовы НЕ делаем. Мокаем urllib.request.urlopen через unittest.mock. +""" + +import os +import unittest +import urllib.error +from unittest.mock import patch, MagicMock + +from hope_hash.notifier import TelegramNotifier + + +def _make_mock_response(status: int = 200) -> MagicMock: + """Хелпер: собирает мок-объект, поддерживающий with-протокол urlopen.""" + mock_resp = MagicMock() + mock_resp.status = status + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=None) + return mock_resp + + +class TestNotifierDisabled(unittest.TestCase): + """Поведение в disabled-режиме: всё должно быть no-op без падений.""" + + def test_no_token_disabled(self): + n = TelegramNotifier(token=None, chat_id="x") + self.assertFalse(n.enabled) + n.notify("hello") # no-op + n.shutdown() + + def test_empty_chat_id_disabled(self): + # Пустая строка — это «не задано» по нашему контракту. + n = TelegramNotifier(token="123:ABC", chat_id="") + self.assertFalse(n.enabled) + n.notify("hello") + n.shutdown() + + def test_both_none_disabled(self): + n = TelegramNotifier(token=None, chat_id=None) + self.assertFalse(n.enabled) + n.shutdown() + + def test_disabled_methods_dont_throw(self): + # Все notify_* методы должны не падать в disabled-режиме. + n = TelegramNotifier() + self.assertFalse(n.enabled) + n.notify("plain") + n.notify_started("bc1qxyz...", "worker1") + n.notify_stopped() + n.notify_disconnected("timeout") + n.notify_reconnected() + n.notify_share_accepted("jobA", 1.5) + n.notify_block_found("0000abcd" * 8, 999999) + n.notify_block_found("0000abcd" * 8) # без height + n.shutdown() + # Повторный shutdown идемпотентен. + n.shutdown() + + def test_from_env_disabled_when_no_vars(self): + # Без переменных окружения — disabled. + with patch.dict(os.environ, {}, clear=True): + n = TelegramNotifier.from_env() + self.assertFalse(n.enabled) + n.shutdown() + + def test_from_env_enabled_when_vars_set(self): + env = { + "HOPE_HASH_TELEGRAM_TOKEN": "111:AAA", + "HOPE_HASH_TELEGRAM_CHAT_ID": "999", + } + with patch.dict(os.environ, env, clear=True): + with patch("hope_hash.notifier.urllib.request.urlopen") as mock_urlopen: + mock_urlopen.return_value = _make_mock_response() + n = TelegramNotifier.from_env() + self.assertTrue(n.enabled) + self.assertEqual(n.token, "111:AAA") + self.assertEqual(n.chat_id, "999") + n.shutdown() + + +class TestNotifierEnabled(unittest.TestCase): + """Поведение в enabled-режиме: проверяем формат вызовов и текстов.""" + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_send_calls_telegram_api(self, mock_urlopen): + mock_urlopen.return_value = _make_mock_response() + + n = TelegramNotifier(token="123:ABC", chat_id="456") + n.notify("test message") + n.shutdown() + + self.assertTrue(mock_urlopen.called) + req = mock_urlopen.call_args[0][0] + # URL содержит токен — это полный путь к sendMessage. + self.assertIn("123:ABC", req.full_url) + self.assertIn("api.telegram.org", req.full_url) + # Метод POST. + self.assertEqual(req.get_method(), "POST") + # Body содержит chat_id и text в urlencoded виде. + body = req.data.decode("utf-8") + self.assertIn("chat_id=456", body) + self.assertIn("text=test+message", body) + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_share_accepted_message_format(self, mock_urlopen): + mock_urlopen.return_value = _make_mock_response() + + n = TelegramNotifier(token="t", chat_id="c") + n.notify_share_accepted(job_id="abc123", difficulty=2.5) + n.shutdown() + + body = mock_urlopen.call_args[0][0].data.decode("utf-8") + # urlencode заменяет пробелы на + и не-ASCII на %XX, + # достаточно проверить ключевые подстроки. + self.assertIn("abc123", body) + self.assertIn("2.5", body) + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_block_found_message_format(self, mock_urlopen): + mock_urlopen.return_value = _make_mock_response() + + n = TelegramNotifier(token="t", chat_id="c") + block_hash = "00000000000000000007abc123def456" + "ff" * 16 + n.notify_block_found(hash_hex=block_hash, height=800000) + n.shutdown() + + body = mock_urlopen.call_args[0][0].data.decode("utf-8") + # Высота попадает в текст. + self.assertIn("800000", body) + # Префикс хеша попадает (первые 32 hex-символа). + self.assertIn(block_hash[:32], body) + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_block_found_without_height(self, mock_urlopen): + mock_urlopen.return_value = _make_mock_response() + + n = TelegramNotifier(token="t", chat_id="c") + n.notify_block_found(hash_hex="deadbeef" * 8) + n.shutdown() + + self.assertTrue(mock_urlopen.called) + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_shutdown_drains_queue(self, mock_urlopen): + # Кладём 5 сообщений → shutdown → проверяем что все 5 ушли. + mock_urlopen.return_value = _make_mock_response() + + n = TelegramNotifier(token="t", chat_id="c") + for i in range(5): + n.notify(f"msg-{i}") + n.shutdown() + + # Каждый notify → один urlopen. Sentinel(None) уходит без HTTP-вызова. + self.assertEqual(mock_urlopen.call_count, 5) + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_send_failure_logged_not_raised(self, mock_urlopen): + # urlopen бросает URLError → notify не должен падать, + # ошибка должна быть в логе. + mock_urlopen.side_effect = urllib.error.URLError("network down") + + n = TelegramNotifier(token="t", chat_id="c") + with self.assertLogs("hope_hash", level="WARNING") as cm: + n.notify("will fail") + n.shutdown() + + # В логах есть упоминание ошибки отправки. + self.assertTrue( + any("ошибка отправки" in line for line in cm.output), + f"expected error log, got: {cm.output}", + ) + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_worker_survives_failure_and_continues(self, mock_urlopen): + # После одного сбоя следующее сообщение должно уйти — нить не падает. + mock_urlopen.side_effect = [ + urllib.error.URLError("transient"), + _make_mock_response(), + _make_mock_response(), + ] + + n = TelegramNotifier(token="t", chat_id="c") + n.notify("first-fails") + n.notify("second-ok") + n.notify("third-ok") + n.shutdown() + + # Все три попытки сделаны, нить не умерла на первой. + self.assertEqual(mock_urlopen.call_count, 3) + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_http_error_status_raised_internally(self, mock_urlopen): + # status >= 400 без HTTPError-исключения — наш код должен сам кинуть RuntimeError, + # которое поймает воркер и залогирует. + mock_urlopen.return_value = _make_mock_response(status=500) + + n = TelegramNotifier(token="t", chat_id="c") + with self.assertLogs("hope_hash", level="WARNING") as cm: + n.notify("oops") + n.shutdown() + + self.assertTrue( + any("ошибка отправки" in line for line in cm.output), + f"expected error log on 500, got: {cm.output}", + ) + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_queue_full_drops_message(self, mock_urlopen): + # Проверяем что переполнение очереди → сообщение отбрасывается с warning. + # Чтобы не зависеть от расписания воркера, тестируем put_nowait напрямую: + # заполняем очередь руками, затем убеждаемся что notify() ловит queue.Full. + import threading as _t + + gate_in = _t.Event() + gate_release = _t.Event() + + def blocking_open(*a, **kw): + # Сигналим тесту что воркер уже занят первым сообщением, + # затем висим до явного релиза. + gate_in.set() + gate_release.wait(timeout=3.0) + return _make_mock_response() + + mock_urlopen.side_effect = blocking_open + + n = TelegramNotifier(token="t", chat_id="c", queue_maxsize=2) + # Первое уходит в воркер: ждём подтверждения что он застрял в urlopen. + n.notify("a") + self.assertTrue(gate_in.wait(timeout=2.0), "worker did not start") + # Теперь воркер сидит в blocking_open, очередь пуста. + # Заполняем её до maxsize=2 — оба put должны пройти. + n.notify("b") + n.notify("c") + # Третий — гарантированно уже не влезет, ловим warning. + with self.assertLogs("hope_hash", level="WARNING") as cm: + n.notify("d-overflow") + + self.assertTrue( + any("переполнена" in line for line in cm.output), + f"expected overflow warning, got: {cm.output}", + ) + # Освобождаем воркер и корректно завершаем. + gate_release.set() + n.shutdown() + + @patch("hope_hash.notifier.urllib.request.urlopen") + def test_lifecycle_messages(self, mock_urlopen): + # Smoke-тест на все lifecycle-методы: started/stopped/disconnected/reconnected. + mock_urlopen.return_value = _make_mock_response() + + n = TelegramNotifier(token="t", chat_id="c") + n.notify_started("bc1qexampleaddr", "worker-7") + n.notify_disconnected("connection reset") + n.notify_reconnected() + n.notify_stopped() + n.shutdown() + + self.assertEqual(mock_urlopen.call_count, 4) + # Проверяем что worker-name попал в первое сообщение. + first_body = mock_urlopen.call_args_list[0][0][0].data.decode("utf-8") + self.assertIn("worker-7", first_body) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_storage.py b/tests/test_storage.py new file mode 100644 index 0000000..7f0d7c8 --- /dev/null +++ b/tests/test_storage.py @@ -0,0 +1,120 @@ +"""Юнит-тесты для storage.ShareStore — журнал шаров.""" + +import tempfile +import threading +import time +import unittest +from pathlib import Path + +from hope_hash.storage import ShareStore + + +class TestShareStore(unittest.TestCase): + def setUp(self): + # Каждый тест в своей временной директории — БД не утечёт в репо. + self.tmpdir = tempfile.TemporaryDirectory() + self.db_path = Path(self.tmpdir.name) / "test.db" + self.store = ShareStore(self.db_path) + + def tearDown(self): + self.store.close() + self.tmpdir.cleanup() + + def _share(self, accepted=True, is_block=False, ts=None, job_id="job1"): + """Хелпер: вставляет один шар с дефолтными значениями.""" + return self.store.record_share( + job_id=job_id, + nonce_hex="deadbeef", + hash_hex="00" * 32, + difficulty=1.0, + accepted=accepted, + is_block=is_block, + ts=ts, + ) + + def test_record_share_returns_id(self): + rid1 = self._share() + rid2 = self._share() + self.assertIsInstance(rid1, int) + self.assertEqual(rid2, rid1 + 1) + + def test_total_shares_initially_zero(self): + self.assertEqual(self.store.total_shares(), 0) + self.assertEqual(self.store.total_shares(accepted_only=False), 0) + + def test_total_shares_counts_accepted(self): + # 2 accepted + 1 rejected → total_shares(accepted_only=True) == 2 + self._share(accepted=True) + self._share(accepted=True) + self._share(accepted=False) + self.assertEqual(self.store.total_shares(accepted_only=True), 2) + self.assertEqual(self.store.total_shares(accepted_only=False), 3) + + def test_shares_per_hour_window(self): + now = time.time() + # Свежие — попадают в окно 24ч. + self._share(ts=now - 60) + self._share(ts=now - 600) + # Старый — за пределами окна 24ч (двое суток назад). + self._share(ts=now - 2 * 24 * 3600) + # 2 шара / 24 часа ≈ 0.0833. + rate = self.store.shares_per_hour(hours=24) + self.assertAlmostEqual(rate, 2.0 / 24.0, places=5) + + def test_shares_per_hour_zero_when_empty(self): + # Пустая БД и нулевые/отрицательные часы — должны давать 0.0 без ошибок. + self.assertEqual(self.store.shares_per_hour(hours=24), 0.0) + self.assertEqual(self.store.shares_per_hour(hours=0), 0.0) + + def test_session_lifecycle(self): + sid = self.store.start_session("pool.example:3333", "btc1qaddr", "worker1") + self.assertIsInstance(sid, int) + # Читаем напрямую через само store-соединение, чтобы не плодить хендлы, + # которые на Windows блокируют удаление файла в tearDown. + cur = self.store._conn.execute( + "SELECT started_at, ended_at FROM sessions WHERE id=?", (sid,) + ) + started, ended = cur.fetchone() + self.assertIsNotNone(started) + self.assertIsNone(ended) + self.store.end_session(sid) + cur = self.store._conn.execute( + "SELECT ended_at FROM sessions WHERE id=?", (sid,) + ) + (ended,) = cur.fetchone() + self.assertIsNotNone(ended) + + def test_close_is_idempotent(self): + self.store.close() + self.store.close() # не должно падать + + def test_is_block_flag_persists(self): + # Найденный блок — отдельный признак, отличный от accepted. + rid = self._share(is_block=True) + cur = self.store._conn.execute( + "SELECT is_block FROM shares WHERE id=?", (rid,) + ) + (is_block,) = cur.fetchone() + self.assertEqual(is_block, 1) + + def test_thread_safety(self): + # 4 нити пишут по 50 шаров каждая. После — total_shares == 200. + n_threads = 4 + per_thread = 50 + + def worker(idx: int) -> None: + for k in range(per_thread): + self._share(job_id=f"job-{idx}-{k}") + + threads = [ + threading.Thread(target=worker, args=(i,)) for i in range(n_threads) + ] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertEqual(self.store.total_shares(), n_threads * per_thread) + + +if __name__ == "__main__": + unittest.main()