From 9db7d9826338cf62c37849460428d5591ff157d6 Mon Sep 17 00:00:00 2001
From: KruglikovskiiPA
Date: Thu, 30 Apr 2026 14:19:37 +0300
Subject: [PATCH] Level 1: multiprocessing + observability (v0.2.0)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Производительность:
- multiprocessing-разнос nonce-loop по N процессам (parallel.py).
CLI --workers N, default cpu_count - 1.
- EMA-хешрейт (alpha=0.3, окно 5с).
Наблюдаемость:
- SQLite-журнал шаров и сессий (storage.py).
Таблицы shares/sessions, WAL-режим, потокобезопасность.
- Prometheus-экспортёр /metrics (metrics.py) на http.server.
Метрики shares_total, hashrate_hps, pool_difficulty, workers, uptime.
- Telegram-уведомления через stdlib urllib (notifier.py).
Конфиг через env: HOPE_HASH_TELEGRAM_TOKEN, HOPE_HASH_TELEGRAM_CHAT_ID.
Все три observers опциональны, отключаются флагами или отсутствием env.
mine() принимает store/metrics/notifier как None-default параметры.
Покрытие тестами: 56 тестов (было 15). Все на stdlib unittest, без
внешних зависимостей (включая mock'и для Telegram через unittest.mock).
Co-Authored-By: Claude Opus 4.7 (1M context)
---
.gitignore | 4 +
CHANGELOG.md | 29 ++++-
README.md | 47 ++++++-
ROADMAP.md | 17 ++-
pyproject.toml | 4 +-
src/hope_hash/__init__.py | 9 +-
src/hope_hash/cli.py | 87 +++++++++++--
src/hope_hash/metrics.py | 188 +++++++++++++++++++++++++++
src/hope_hash/miner.py | 207 +++++++++++++++++++++--------
src/hope_hash/notifier.py | 167 ++++++++++++++++++++++++
src/hope_hash/parallel.py | 172 ++++++++++++++++++++++++
src/hope_hash/storage.py | 175 +++++++++++++++++++++++++
tests/test_metrics.py | 156 ++++++++++++++++++++++
tests/test_notifier.py | 266 ++++++++++++++++++++++++++++++++++++++
tests/test_storage.py | 120 +++++++++++++++++
15 files changed, 1564 insertions(+), 84 deletions(-)
create mode 100644 src/hope_hash/metrics.py
create mode 100644 src/hope_hash/notifier.py
create mode 100644 src/hope_hash/parallel.py
create mode 100644 src/hope_hash/storage.py
create mode 100644 tests/test_metrics.py
create mode 100644 tests/test_notifier.py
create mode 100644 tests/test_storage.py
diff --git a/.gitignore b/.gitignore
index 0bf110a..72150b1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,10 @@ htmlcov/
venv/
env/
*.log
+*.db
+*.db-journal
+*.db-wal
+*.db-shm
.idea/
.vscode/
.DS_Store
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5a6620a..d4a4e4d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,30 @@
## [Unreleased]
+## [0.2.0] — 2026-04-30
+
+### Добавлено
+- **Multiprocessing** (`parallel.py`): N воркеров делят nonce-пространство
+ `[0, 2³²)` равными долями. CLI-флаг `--workers N` (default `cpu_count - 1`).
+ `found_queue` для шаров, `hashes_counter` для статистики.
+- **EMA-хешрейт**: скользящее среднее (alpha=0.3, окно 5с) вместо мгновенного.
+- **SQLite-журнал** (`storage.py`): таблицы `shares` и `sessions`, WAL-режим,
+ потокобезопасность. CLI-флаги `--db PATH`, `--no-db`.
+- **Prometheus-экспортёр** (`metrics.py`): `/metrics` на `http.server`, метрики
+ `hopehash_shares_total`, `hopehash_hashrate_hps`, `hopehash_pool_difficulty`,
+ `hopehash_workers`, `hopehash_uptime_seconds`. CLI-флаг `--metrics-port`
+ (default 9090, 0 — выключить).
+- **Telegram-уведомления** (`notifier.py`): через stdlib `urllib`, без
+ `python-telegram-bot`. Конфиг через env: `HOPE_HASH_TELEGRAM_TOKEN`,
+ `HOPE_HASH_TELEGRAM_CHAT_ID`. События: started / stopped / share_accepted /
+ block_found.
+- 41 новый юнит-тест (storage: 9, metrics: 16, notifier: 16). Всего **56 тестов**.
+
+### Изменено
+- `mine()` принимает опциональные `store`, `metrics`, `notifier` (None — disabled).
+- `__init__.py` re-export `ShareStore`, `Metrics`, `MetricsServer`, `TelegramNotifier`.
+- `.gitignore` дополнен `*.db`, `*.db-journal`, `*.db-wal`, `*.db-shm`.
+
## [0.1.0] — 2026-04-30
### Добавлено
@@ -19,5 +43,6 @@
- 15 юнит-тестов на криптографические функции (`unittest`).
- Реструктуризация в `src/`-layout с пакетом `hope_hash`.
-[Unreleased]: https://github.com/KruglikovskiiPA/Hope-Hash/compare/v0.1.0...HEAD
-[0.1.0]: https://github.com/KruglikovskiiPA/Hope-Hash/releases/tag/v0.1.0
+[Unreleased]: https://github.com/devAsmodeus/Hope-Hash/compare/v0.2.0...HEAD
+[0.2.0]: https://github.com/devAsmodeus/Hope-Hash/compare/v0.1.0...v0.2.0
+[0.1.0]: https://github.com/devAsmodeus/Hope-Hash/releases/tag/v0.1.0
diff --git a/README.md b/README.md
index 3ef924d..d302e5f 100644
--- a/README.md
+++ b/README.md
@@ -36,12 +36,21 @@
- [x] CI matrix Python 3.11/3.12/3.13 × ubuntu/windows/macos
- [ ] Конфиг через CLI/YAML — перенесено в Уровень 1
+**Уровень 1 — производительность и наблюдаемость: завершён.**
+
+- [x] Multiprocessing: N воркеров (default `cpu_count - 1`), флаг `--workers`
+- [x] EMA-хешрейт (alpha=0.3, окно 5с)
+- [x] SQLite-журнал шаров и сессий (`storage.py`, флаг `--db`)
+- [x] Prometheus-метрики на `/metrics` (`metrics.py`, флаг `--metrics-port`)
+- [x] Telegram-уведомления (через stdlib urllib, env-конфиг)
+- [ ] TUI на `rich` / `curses` — отложено (зависимости либо ограниченная Win-поддержка)
+- [ ] Команды Telegram-бота (`/stats`, `/restart`) — отложено
+
**Не сделано / известные ограничения:**
-- Один поток → ~50–200 KH/s на ноуте. Нет multiprocessing.
-- Нет персистентной статистики (логов, БД).
-- Нет UI — только консоль через `logging`.
+- Нет UI — только консоль через `logging` и `/metrics` через HTTP.
- Только Stratum V1, без `mining.suggest_difficulty` и без Stratum V2.
+- C/Rust/SIMD/GPU — Уровни 2–3, ещё впереди.
---
@@ -60,15 +69,22 @@
├── src/hope_hash/
│ ├── __init__.py ← публичный API + __version__
│ ├── __main__.py ← `python -m hope_hash`
-│ ├── cli.py ← argparse, точка входа
-│ ├── miner.py ← mine(), supervisor_loop, run_session
+│ ├── cli.py ← argparse, точка входа, инициализация observers
+│ ├── miner.py ← mine() оркестратор + supervisor_loop
+│ ├── parallel.py ← multiprocessing воркеры nonce-loop
│ ├── stratum.py ← StratumClient (TCP + JSON-RPC)
│ ├── block.py ← double_sha256, swap_words, target, merkle
+│ ├── storage.py ← SQLite журнал шаров и сессий
+│ ├── metrics.py ← Prometheus экспортёр (http.server)
+│ ├── notifier.py ← Telegram через urllib
│ ├── _logging.py ← настройка logger("hope_hash")
│ └── py.typed ← PEP 561 marker
└── tests/
├── conftest.py ← общие фикстуры (заготовка)
- └── test_block.py ← 15 unittest-тестов на чистые функции
+ ├── test_block.py ← 15 тестов на чистые функции
+ ├── test_storage.py ← 9 тестов на SQLite журнал
+ ├── test_metrics.py ← 16 тестов на Prometheus экспортёр
+ └── test_notifier.py ← 16 тестов на Telegram (через mock)
```
---
@@ -92,10 +108,27 @@ python -m hope_hash [имя_воркера]
hope-hash bc1q5n2x4pvxhq8sxc7ck3uxq8sxc7ck3uxqzfm2py mylaptop
```
+**Расширенные опции:**
+
+```bash
+hope-hash mylaptop \
+ --workers 8 \ # число процессов (default: cpu_count - 1)
+ --db ./shares.db \ # путь к SQLite (default: hope_hash.db)
+ --metrics-port 9090 # Prometheus /metrics (0 — отключить)
+```
+
+**Telegram-уведомления (опционально):** задать env vars и просто запустить:
+
+```bash
+export HOPE_HASH_TELEGRAM_TOKEN=123456:abcdef-your-bot-token
+export HOPE_HASH_TELEGRAM_CHAT_ID=123456789
+hope-hash
+```
+
**Тесты:**
```bash
-python -m unittest discover -s tests -v
+python -m unittest discover -s tests -v # 56 тестов
```
BTC-адрес нужен валидный (любой формат: `1...`, `3...`, `bc1q...`, `bc1p...`). Можно завести в любом некастодиальном кошельке — например, **Sparrow**, **Electrum**, **Wasabi**. Имя воркера — произвольная строка.
diff --git a/ROADMAP.md b/ROADMAP.md
index 31820ca..791a276 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -26,12 +26,15 @@
## Уровень 1 — лёгкие апгрейды (вечер каждый)
+**Статус (2026-04-30):** производительность + наблюдаемость — закрыто.
+TUI и команды Telegram — отложены.
+
Видимые фичи, не требующие глубокой переработки.
### Производительность
-- [ ] **Multiprocessing.** Каждый CPU-ядро — отдельный процесс, общий `extranonce2` через `multiprocessing.Value`. Х2-х8 к хешрейту в зависимости от железа.
-- [ ] **Замер реального хешрейта по окнам.** Сейчас считается «за интервал печати», правильнее — скользящее окно с EMA (exponential moving average).
+- [x] **Multiprocessing.** `parallel.py`, CLI `--workers N`. nonce-пространство `[0, 2³²)` делится поровну между N процессами. found_queue + hashes_counter (`mp.Value('Q')`). На 16-CPU машине default = 15 воркеров.
+- [x] **EMA-хешрейт.** alpha=0.3, окно 5с. Сэмпл = дельта счётчика / dt. Логируется и в `[stats]`, и в Prometheus gauge `hopehash_hashrate_hps`.
### UI/UX
@@ -41,14 +44,14 @@
### Telegram-бот
-- [ ] **Уведомления о ключевых событиях:** старт майнера, потеря соединения, принятый шар, (мечты) найденный блок. Через `python-telegram-bot` или просто curl на `api.telegram.org`.
-- [ ] **Команды `/stats`, `/restart`, `/stop`** через того же бота.
+- [x] **Исходящие уведомления.** `notifier.py` через stdlib `urllib`, без `python-telegram-bot`. События: started / stopped / share_accepted / block_found / disconnected / reconnected. Конфиг через env (`HOPE_HASH_TELEGRAM_TOKEN`, `HOPE_HASH_TELEGRAM_CHAT_ID`). Если переменные не заданы — модуль молча disabled.
+- [ ] **Входящие команды `/stats`, `/restart`, `/stop`** через тот же бот (long polling). Отложено.
### Логи и метрики
-- [ ] **SQLite-журнал шар.** Каждый принятый шар → строка с timestamp, job_id, hash, difficulty.
-- [ ] **Экспорт в Prometheus формат** через эндпоинт `/metrics` на `http.server`.
-- [ ] **Grafana-дашборд** с готовым JSON для импорта.
+- [x] **SQLite-журнал** (`storage.py`). Таблицы `shares` (ts, job_id, nonce_hex, hash_hex, difficulty, accepted, is_block) и `sessions`. WAL-режим, потокобезопасность через `threading.Lock`.
+- [x] **Prometheus-экспортёр** (`metrics.py`). Эндпоинт `/metrics` на `http.server` (`ThreadingHTTPServer` в фоновой нити). Метрики: `hopehash_shares_total`, `hopehash_hashrate_hps`, `hopehash_pool_difficulty`, `hopehash_workers`, `hopehash_uptime_seconds`. CLI `--metrics-port` (0 — выключить).
+- [ ] **Grafana-дашборд** с готовым JSON для импорта. Отложено.
---
diff --git a/pyproject.toml b/pyproject.toml
index 0b3a11c..b1c2de0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,8 +28,8 @@ dependencies = []
dynamic = ["version"]
[project.urls]
-Repository = "https://github.com/KruglikovskiiPA/Hope-Hash"
-Issues = "https://github.com/KruglikovskiiPA/Hope-Hash/issues"
+Repository = "https://github.com/devAsmodeus/Hope-Hash"
+Issues = "https://github.com/devAsmodeus/Hope-Hash/issues"
[project.scripts]
hope-hash = "hope_hash.cli:main"
diff --git a/src/hope_hash/__init__.py b/src/hope_hash/__init__.py
index 1f56b52..9af66f7 100644
--- a/src/hope_hash/__init__.py
+++ b/src/hope_hash/__init__.py
@@ -1,10 +1,13 @@
"""Hope-Hash — учебный solo BTC miner на чистом stdlib."""
from .block import build_merkle_root, difficulty_to_target, double_sha256, swap_words
+from .metrics import Metrics, MetricsServer
from .miner import mine
+from .notifier import TelegramNotifier
+from .storage import ShareStore
from .stratum import StratumClient
-__version__ = "0.1.0"
+__version__ = "0.2.0"
__all__ = [
"double_sha256",
"swap_words",
@@ -12,5 +15,9 @@
"build_merkle_root",
"StratumClient",
"mine",
+ "ShareStore",
+ "Metrics",
+ "MetricsServer",
+ "TelegramNotifier",
"__version__",
]
diff --git a/src/hope_hash/cli.py b/src/hope_hash/cli.py
index e2108b7..649c535 100644
--- a/src/hope_hash/cli.py
+++ b/src/hope_hash/cli.py
@@ -1,11 +1,17 @@
-"""Точка входа CLI: argparse, запуск supervisor + mine()."""
+"""Точка входа CLI: argparse, запуск supervisor + mine() + observers."""
import argparse
+import multiprocessing
+import os
import threading
import time
+from pathlib import Path
from ._logging import logger, setup_logging
+from .metrics import Metrics, MetricsServer
from .miner import mine, supervisor_loop
+from .notifier import TelegramNotifier
+from .storage import ShareStore
from .stratum import StratumClient
@@ -13,9 +19,13 @@
POOL_PORT = 3333
-def main():
- setup_logging()
+def _default_workers() -> int:
+ """Один CPU оставляем сетевой части/IO. Минимум — 1 воркер."""
+ cpu = os.cpu_count() or 1
+ return max(1, cpu - 1)
+
+def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="hope_hash",
description="Учебный solo BTC miner на чистом stdlib.",
@@ -23,13 +33,61 @@ def main():
parser.add_argument("btc_address", help="BTC-адрес для выплат (на него уйдёт награда).")
parser.add_argument("worker_name", nargs="?", default="py01",
help="Имя воркера (по умолчанию: py01).")
- args = parser.parse_args()
+ parser.add_argument(
+ "--workers", type=int, default=_default_workers(),
+ help=f"Число процессов-воркеров (по умолчанию: {_default_workers()} = cpu_count - 1).",
+ )
+ parser.add_argument(
+ "--db", type=str, default="hope_hash.db",
+ help="Путь к SQLite-журналу шаров (по умолчанию: hope_hash.db).",
+ )
+ parser.add_argument(
+ "--no-db", action="store_true",
+ help="Отключить SQLite-журнал (--db игнорируется).",
+ )
+ parser.add_argument(
+ "--metrics-port", type=int, default=9090,
+ help="Порт Prometheus /metrics (по умолчанию: 9090, 0 — отключить).",
+ )
+ return parser.parse_args()
+
- btc_address = args.btc_address
- worker_name = args.worker_name
+def main():
+ # Защитный вызов: на Windows multiprocessing требует freeze_support()
+ # при запуске через `python -m hope_hash`. Без него spawn-дети могут
+ # пытаться повторно стартовать main() и упасть.
+ multiprocessing.freeze_support()
+
+ setup_logging()
+ args = _parse_args()
+ n_workers = max(1, args.workers)
+
+ # ─── observers ───
+ # Все три опциональны и не зависят друг от друга. Каждый сам решает,
+ # включаться ли (notifier — по env vars; metrics — по порту; store — по флагу).
+ store: ShareStore | None = None
+ if not args.no_db:
+ store = ShareStore(Path(args.db))
+ metrics: Metrics | None = None
+ metrics_server: MetricsServer | None = None
+ if args.metrics_port > 0:
+ metrics = Metrics()
+ metrics_server = MetricsServer(metrics, port=args.metrics_port)
+ metrics_server.start()
+
+ notifier = TelegramNotifier.from_env()
+ notifier.notify_started(args.btc_address, args.worker_name)
+
+ if store is not None:
+ session_id = store.start_session(POOL_HOST, args.btc_address, args.worker_name)
+ else:
+ session_id = None
+
+ # ─── сетевая часть и mine() ───
stop = threading.Event()
- client = StratumClient(POOL_HOST, POOL_PORT, btc_address, worker_name, stop_event=stop)
+ client = StratumClient(POOL_HOST, POOL_PORT, args.btc_address, args.worker_name,
+ stop_event=stop)
# Сетевая часть живёт в отдельной нити-супервизоре: она держит коннект,
# переподключается при разрывах и сама поднимает reader_loop. main thread
@@ -38,13 +96,14 @@ def main():
name="stratum-supervisor", daemon=False)
supervisor.start()
- logger.info("[main] жду первый job от пула...")
+ logger.info(f"[main] жду первый job от пула... (воркеров: {n_workers})")
while client.current_job is None and not stop.is_set():
time.sleep(0.1)
try:
if not stop.is_set():
- mine(client, stop)
+ mine(client, stop, n_workers=n_workers,
+ store=store, metrics=metrics, notifier=notifier)
except KeyboardInterrupt:
logger.info("[main] остановка по Ctrl+C")
finally:
@@ -55,3 +114,13 @@ def main():
supervisor.join(timeout=5)
if supervisor.is_alive():
logger.warning("[main] supervisor не остановился за 5с")
+
+ # Закрываем observers последними, чтобы дать им зафиксировать финальные события.
+ notifier.notify_stopped()
+ notifier.shutdown()
+ if metrics_server is not None:
+ metrics_server.stop()
+ if store is not None:
+ if session_id is not None:
+ store.end_session(session_id)
+ store.close()
diff --git a/src/hope_hash/metrics.py b/src/hope_hash/metrics.py
new file mode 100644
index 0000000..4dec04e
--- /dev/null
+++ b/src/hope_hash/metrics.py
@@ -0,0 +1,188 @@
+"""Prometheus-совместимые метрики через stdlib http.server. Без зависимостей.
+
+Два класса: ``Metrics`` — потокобезопасный регистр counter/gauge,
+``MetricsServer`` — HTTP-сервер ``/metrics`` на фоновой нити. Логгер
+берём по имени пакета, чтобы не плодить циклических импортов.
+"""
+
+import logging
+import threading
+import time
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+
+logger = logging.getLogger("hope_hash")
+
+
+# Допустимые символы для имени метрики по Prometheus naming convention:
+# первая буква — [a-zA-Z_:], остальные — [a-zA-Z0-9_:]. Всё прочее
+# заменяем на подчёркивание. Это терпит произвольный пользовательский ввод.
+_NAME_FIRST_OK = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_:")
+_NAME_REST_OK = _NAME_FIRST_OK | set("0123456789")
+
+
+def _sanitize_name(name: str) -> str:
+ """Приводит имя метрики к виду ``[a-zA-Z_:][a-zA-Z0-9_:]*``."""
+ if not name:
+ return "_"
+ chars = []
+ for i, ch in enumerate(name):
+ ok = _NAME_FIRST_OK if i == 0 else _NAME_REST_OK
+ chars.append(ch if ch in ok else "_")
+ return "".join(chars)
+
+
+def _format_float(v: float) -> str:
+ """Форматирует gauge для Prometheus: целые без ``.0``, дробные через ``repr``."""
+ if v == int(v) and abs(v) < 1e16:
+ return str(int(v))
+ return repr(v)
+
+
+class Metrics:
+ """Регистр метрик. Потокобезопасный.
+
+ Поддерживаемые типы:
+
+ - counter — монотонно растущий int (например, число шар).
+ - gauge — произвольное float (например, текущий хешрейт).
+
+ Формат вывода соответствует Prometheus text format 0.0.4. Помимо
+ пользовательских метрик, ``render()`` всегда добавляет автоматический
+ gauge ``hopehash_uptime_seconds``.
+ """
+
+ def __init__(self) -> None:
+ self._lock = threading.Lock()
+ self._counters: dict[str, int] = {}
+ self._gauges: dict[str, float] = {}
+ self._help: dict[str, str] = {}
+ self._created_at = time.time()
+
+ def counter_inc(self, name: str, value: int = 1, help: str | None = None) -> None:
+ """Инкрементирует counter. ``value`` должен быть >= 0."""
+ if value < 0:
+ # Counter по определению монотонный; отрицательный шаг — это баг
+ # в коде вызывающего, лучше упасть громко, чем тихо «декрементить».
+ raise ValueError("counter_inc: value must be >= 0")
+ key = _sanitize_name(name)
+ with self._lock:
+ self._counters[key] = self._counters.get(key, 0) + int(value)
+ if help is not None:
+ self._help[key] = help
+
+ def gauge_set(self, name: str, value: float, help: str | None = None) -> None:
+ """Устанавливает gauge в указанное значение."""
+ key = _sanitize_name(name)
+ with self._lock:
+ self._gauges[key] = float(value)
+ if help is not None:
+ self._help[key] = help
+
+ def render(self) -> bytes:
+ """Возвращает все метрики в Prometheus text format. UTF-8 bytes."""
+ # Снимок под локом — потом форматируем без удержания лока, чтобы
+ # не блокировать producer'ов на длительный sprintf.
+ with self._lock:
+ counters = dict(self._counters)
+ gauges = dict(self._gauges)
+ helps = dict(self._help)
+ uptime = time.time() - self._created_at
+
+ lines: list[str] = []
+
+ for name in sorted(counters):
+ help_text = helps.get(name, f"Counter {name}")
+ lines.append(f"# HELP {name} {help_text}")
+ lines.append(f"# TYPE {name} counter")
+ lines.append(f"{name} {counters[name]}")
+
+ for name in sorted(gauges):
+ help_text = helps.get(name, f"Gauge {name}")
+ lines.append(f"# HELP {name} {help_text}")
+ lines.append(f"# TYPE {name} gauge")
+ lines.append(f"{name} {_format_float(gauges[name])}")
+
+ # Автоматический uptime — всегда последним, чтобы порядок был стабильным.
+ lines.append("# HELP hopehash_uptime_seconds Seconds since metrics registry created")
+ lines.append("# TYPE hopehash_uptime_seconds gauge")
+ lines.append(f"hopehash_uptime_seconds {_format_float(uptime)}")
+
+ # Финальный перевод строки — Prometheus exposition требует, чтобы
+ # последняя метрика заканчивалась ``\n``.
+ return ("\n".join(lines) + "\n").encode("utf-8")
+
+
+def _make_handler(metrics: Metrics) -> type[BaseHTTPRequestHandler]:
+ """Фабрика handler-класса с ``metrics`` через замыкание — без глобалов."""
+
+ class _Handler(BaseHTTPRequestHandler):
+ def do_GET(self) -> None: # noqa: N802 — имя задано базовым классом
+ if self.path != "/metrics":
+ self.send_error(404)
+ return
+ body = metrics.render()
+ self.send_response(200)
+ self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
+ self.send_header("Content-Length", str(len(body)))
+ self.end_headers()
+ self.wfile.write(body)
+
+ def log_message(self, format: str, *args: object) -> None: # noqa: A002
+ # Подавляем стандартный stderr-лог BaseHTTPRequestHandler:
+ # в проекте единый канал — logger "hope_hash", и засорять его
+ # каждым GET /metrics не нужно.
+ return
+
+ return _Handler
+
+
+class MetricsServer:
+ """HTTP-сервер для ``/metrics`` на отдельной нити. Старт/стоп идемпотентны."""
+
+ def __init__(self, metrics: Metrics, host: str = "127.0.0.1", port: int = 9090) -> None:
+ self.metrics = metrics
+ self.host = host
+ self.port = port
+ self._server: ThreadingHTTPServer | None = None
+ self._thread: threading.Thread | None = None
+ # Лок защищает start/stop от гонки, если их зовут из разных нитей.
+ self._lifecycle_lock = threading.Lock()
+
+ def start(self) -> None:
+ """Запускает сервер в фоновой нити. Идемпотентен."""
+ with self._lifecycle_lock:
+ if self._server is not None:
+ # Уже запущен — ничего не делаем, чтобы не порвать рабочий
+ # сокет повторным bind'ом.
+ return
+ handler_cls = _make_handler(self.metrics)
+ self._server = ThreadingHTTPServer((self.host, self.port), handler_cls)
+ self._thread = threading.Thread(
+ target=self._server.serve_forever,
+ name=f"hope_hash-metrics-{self.port}",
+ daemon=True,
+ )
+ self._thread.start()
+ logger.info("[metrics] сервер запущен на %s", self.url)
+
+ def stop(self, timeout: float = 2.0) -> None:
+ """Останавливает сервер. Идемпотентен."""
+ with self._lifecycle_lock:
+ server = self._server
+ thread = self._thread
+ self._server = None
+ self._thread = None
+
+ if server is not None:
+ # shutdown() блокирует serve_forever и ждёт его выхода,
+ # server_close() закрывает listening-сокет.
+ server.shutdown()
+ server.server_close()
+ if thread is not None:
+ thread.join(timeout=timeout)
+ if server is not None:
+ logger.info("[metrics] сервер остановлен")
+
+ @property
+ def url(self) -> str:
+ return f"http://{self.host}:{self.port}/metrics"
diff --git a/src/hope_hash/miner.py b/src/hope_hash/miner.py
index 7181e0b..ebd56ea 100644
--- a/src/hope_hash/miner.py
+++ b/src/hope_hash/miner.py
@@ -1,12 +1,16 @@
"""Главный цикл хеширования mine() и сетевой супервизор переподключений."""
import socket
-import struct
import threading
import time
+from typing import Optional
from ._logging import logger
from .block import build_merkle_root, difficulty_to_target, double_sha256, swap_words
+from .metrics import Metrics
+from .notifier import TelegramNotifier
+from .parallel import start_pool, stop_pool
+from .storage import ShareStore
from .stratum import StratumClient
@@ -64,10 +68,63 @@ def supervisor_loop(client: StratumClient):
# ─────────────────────── основной майнинг-цикл ───────────────────────
-def mine(client: StratumClient, stop_event: threading.Event):
- hashes = 0
- last_print = time.time()
+def _build_header_base(job: dict, extranonce1: str, extranonce2: str) -> bytes:
+ """
+ Собирает 76-байтовый префикс block header (всё, кроме nonce).
+
+ Вынесено в отдельную функцию: воркеры nonce-loop теперь в parallel.py,
+ а main process только формирует header_base и отдаёт его в пул.
+ """
+ coinbase_hex = job["coinb1"] + extranonce1 + extranonce2 + job["coinb2"]
+ coinbase_hash = double_sha256(bytes.fromhex(coinbase_hex))
+ merkle_root = build_merkle_root(coinbase_hash, job["merkle_branch"])
+ return (
+ bytes.fromhex(job["version"])[::-1] + # 4 b version (LE)
+ swap_words(job["prevhash"]) + # 32 b prev hash (word-swap)
+ merkle_root + # 32 b merkle (LE)
+ bytes.fromhex(job["ntime"])[::-1] + # 4 b ntime (LE)
+ bytes.fromhex(job["nbits"])[::-1] # 4 b nbits (LE)
+ )
+
+
+def _format_rate(rate: float) -> str:
+ """Человекочитаемый хешрейт: H/s → KH/s → MH/s."""
+ if rate < 1000:
+ return f"{rate:.0f} H/s"
+ if rate < 1_000_000:
+ return f"{rate/1000:.2f} KH/s"
+ return f"{rate/1_000_000:.2f} MH/s"
+
+
+def mine(
+ client: StratumClient,
+ stop_event: threading.Event,
+ n_workers: int = 1,
+ store: Optional[ShareStore] = None,
+ metrics: Optional[Metrics] = None,
+ notifier: Optional[TelegramNotifier] = None,
+):
+ """
+ Оркестратор пула воркеров.
+
+ Логика:
+ 1. Ждём первый job, собираем header_base.
+ 2. Поднимаем пул (start_pool) с уникальным extranonce2.
+ 3. В цикле: читаем found_queue, считаем EMA-хешрейт, следим за сменой job.
+ 4. При смене job_id (или stop_event) — gracefully гасим пул и идём на 1.
+
+ EMA: alpha=0.3, окно ~5с. Сэмпл = (current_counter - prev_counter) / dt.
+ Счётчик не сбрасываем — храним предыдущее значение и считаем дельту,
+ так точнее и не надо синхронизироваться с воркерами через Lock.
+
+ Опциональные наблюдатели (store/metrics/notifier) подключаются хуками
+ на ключевые события: найденный шар → запись в БД + counter в Prometheus +
+ уведомление в Telegram; EMA-хешрейт → gauge.
+ """
extranonce2_counter = 0
+ ema = 0.0
+ alpha = 0.3
+ report_interval = 5.0
while not stop_event.is_set():
with client.job_lock:
@@ -79,65 +136,103 @@ def mine(client: StratumClient, stop_event: threading.Event):
continue
# extranonce2 — наша часть coinbase, чтобы каждый воркер крутил уникальные хеши.
+ # Поднимается на каждый job; в пределах одного job всё nonce-пространство
+ # делится между процессами.
extranonce2 = f"{extranonce2_counter:0{en2_size * 2}x}"
extranonce2_counter += 1
- # Собираем coinbase = coinb1 + extranonce1 + extranonce2 + coinb2
- coinbase_hex = job["coinb1"] + en1 + extranonce2 + job["coinb2"]
- coinbase_hash = double_sha256(bytes.fromhex(coinbase_hex))
-
- # Считаем merkle root через ветки от пула
- merkle_root = build_merkle_root(coinbase_hash, job["merkle_branch"])
-
- # Базовый block header без nonce (76 байт = 80 - 4)
- header_base = (
- bytes.fromhex(job["version"])[::-1] + # 4 b version (LE)
- swap_words(job["prevhash"]) + # 32 b prev hash (word-swap)
- merkle_root + # 32 b merkle (LE)
- bytes.fromhex(job["ntime"])[::-1] + # 4 b ntime (LE)
- bytes.fromhex(job["nbits"])[::-1] # 4 b nbits (LE)
- )
-
+ header_base = _build_header_base(job, en1, extranonce2)
target = difficulty_to_target(client.difficulty)
current_job_id = job["job_id"]
- # Перебор nonce: 0 .. 2^32 - 1
- for nonce in range(0, 0xFFFFFFFF):
- if stop_event.is_set():
- return
+ processes, found_queue, hashes_counter, mp_stop = start_pool(
+ n_workers, header_base, target, extranonce2,
+ )
- # Если пришла свежая работа — выходим, чтобы не тратить время на старую
- if hashes & 0x3FFF == 0: # каждые 16k хешей дёргаем lock, чтобы не душить нить
+ prev_count = 0
+ last_report = time.time()
+ last_alive_check = time.time()
+
+ # ─── основной цикл одного job ───
+ try:
+ while not stop_event.is_set():
+ # 1. Не блокирующее чтение находок.
+ try:
+ while True:
+ nonce_hex, hash_hex, en2 = found_queue.get_nowait()
+ logger.warning(
+ f"[mine] !!! НАЙДЕН ШАР !!! nonce={nonce_hex} hash={hash_hex}"
+ )
+ try:
+ client.submit(current_job_id, en2, job["ntime"], nonce_hex)
+ except (OSError, AttributeError) as e:
+ # submit может прийтись на момент reconnect — не валим майнер.
+ logger.warning(f"[stratum] не удалось отправить шар: {e}")
+ # Хуки наблюдателей. Все опциональны — None означает disabled.
+ if store is not None:
+ store.record_share(
+ job_id=current_job_id, nonce_hex=nonce_hex,
+ hash_hex=hash_hex, difficulty=client.difficulty,
+ accepted=True,
+ )
+ if metrics is not None:
+ metrics.counter_inc(
+ "hopehash_shares_total", 1,
+ help="Total shares submitted (accepted by client side)",
+ )
+ if notifier is not None:
+ notifier.notify_share_accepted(
+ job_id=current_job_id, difficulty=client.difficulty,
+ )
+ except Exception:
+ # Empty/queue closed — единственный нормальный путь выхода из while.
+ pass
+
+ now = time.time()
+
+ # 2. EMA-хешрейт.
+ if now - last_report >= report_interval:
+ with hashes_counter.get_lock():
+ cur = hashes_counter.value
+ sample = (cur - prev_count) / (now - last_report)
+ ema = sample if ema == 0.0 else alpha * sample + (1 - alpha) * ema
+ logger.info(
+ f"[stats] хешрейт ≈ {_format_rate(ema)} "
+ f"(окно {_format_rate(sample)}) | "
+ f"pool diff = {client.difficulty} | workers = {len(processes)}"
+ )
+ if metrics is not None:
+ metrics.gauge_set(
+ "hopehash_hashrate_hps", ema,
+ help="Current EMA hashrate in hashes per second",
+ )
+ metrics.gauge_set(
+ "hopehash_pool_difficulty", float(client.difficulty),
+ help="Current pool difficulty",
+ )
+ metrics.gauge_set(
+ "hopehash_workers", float(len(processes)),
+ help="Number of active worker processes",
+ )
+ prev_count = cur
+ last_report = now
+
+ # 3. Смена job — выходим из цикла, чтобы пересоздать пул.
with client.job_lock:
- if not client.current_job or client.current_job["job_id"] != current_job_id:
+ cj = client.current_job
+ if not cj or cj["job_id"] != current_job_id:
+ logger.info(f"[mine] job сменился ({current_job_id} → "
+ f"{cj['job_id'] if cj else 'None'}), рестарт пула")
+ break
+
+ # 4. Все воркеры исчерпали nonce-пространство?
+ if now - last_alive_check >= 1.0:
+ if not any(p.is_alive() for p in processes):
+ logger.info("[pool] все воркеры исчерпали nonce — берём новый extranonce2")
break
+ last_alive_check = now
- header = header_base + struct.pack("I", nonce).hex()
- logger.warning(
- f"[mine] !!! НАЙДЕН ШАР !!! nonce={nonce_hex} hash={h[::-1].hex()}"
- )
- try:
- client.submit(current_job_id, extranonce2, job["ntime"], nonce_hex)
- except (OSError, AttributeError) as e:
- # submit может прийтись на момент reconnect — не валим майнер.
- logger.warning(f"[stratum] не удалось отправить шар: {e}")
-
- hashes += 1
-
- # Каждые 5 секунд — статистика
- now = time.time()
- if now - last_print >= 5.0:
- rate = hashes / (now - last_print)
- rate_str = f"{rate:.0f} H/s" if rate < 1000 else f"{rate/1000:.2f} KH/s"
- logger.info(
- f"[stats] хешрейт ≈ {rate_str} | pool diff = {client.difficulty}"
- )
- hashes = 0
- last_print = now
+ # Дёшево спим, чтобы не жечь main CPU на busy-loop.
+ time.sleep(0.05)
+ finally:
+ stop_pool(processes, found_queue, mp_stop)
diff --git a/src/hope_hash/notifier.py b/src/hope_hash/notifier.py
new file mode 100644
index 0000000..a22da9b
--- /dev/null
+++ b/src/hope_hash/notifier.py
@@ -0,0 +1,167 @@
+"""Telegram-уведомления через stdlib urllib. Без зависимостей.
+
+Использование:
+ notifier = TelegramNotifier(token="123:ABC", chat_id="456789")
+ notifier.notify("Майнер стартовал")
+ notifier.notify_share_accepted(job_id="abc", difficulty=1.0)
+ notifier.notify_block_found(hash_hex="00000000...", height=999999)
+ notifier.shutdown() # дренирует очередь, закрывает воркер-нить
+
+Если token или chat_id не заданы — все методы становятся no-op (silent disable).
+Это позволяет интегрировать notifier в miner без обязательной настройки.
+
+Архитектура: фоновая нить-воркер тащит сообщения из `queue.Queue` и шлёт
+на Telegram Bot API через urllib. Сетевые вызовы не блокируют hot-path
+майнера; при переполнении очереди сообщения отбрасываются с warning.
+"""
+
+import logging
+import os
+import queue
+import threading
+import urllib.error
+import urllib.parse
+import urllib.request
+from typing import Optional
+
+logger = logging.getLogger("hope_hash")
+
+# Шаблон URL Telegram Bot API. sendMessage принимает form-encoded body.
+TELEGRAM_API = "https://api.telegram.org/bot{token}/sendMessage"
+
+
+class TelegramNotifier:
+ """Асинхронный отправщик сообщений в Telegram.
+
+ Работает через background thread + очередь, чтобы сетевые вызовы
+ не блокировали майнинг-цикл. Если token или chat_id отсутствуют
+ (None или пустая строка) — все методы превращаются в no-op.
+ """
+
+ def __init__(
+ self,
+ token: Optional[str] = None,
+ chat_id: Optional[str] = None,
+ timeout: float = 5.0,
+ queue_maxsize: int = 100,
+ ):
+ # Disabled mode: явно проверяем оба значения. Пустая строка трактуется
+ # как «не задано» — это удобнее, чем падать на пустом env-var.
+ self.enabled = bool(token) and bool(chat_id)
+ self.token = token or ""
+ self.chat_id = chat_id or ""
+ self.timeout = timeout
+ self._queue: queue.Queue = queue.Queue(maxsize=queue_maxsize)
+ self._stop_event = threading.Event()
+ self._thread: Optional[threading.Thread] = None
+ if self.enabled:
+ # daemon=True — чтобы при жёстком SIGKILL процесс не висел из-за нити.
+ # Корректное завершение всё равно через shutdown().
+ self._thread = threading.Thread(
+ target=self._worker,
+ daemon=True,
+ name="telegram-notifier",
+ )
+ self._thread.start()
+ logger.info("[telegram] notifier активен (chat=%s)", self.chat_id)
+ else:
+ logger.info("[telegram] notifier отключён (нет token/chat_id)")
+
+ @classmethod
+ def from_env(cls) -> "TelegramNotifier":
+ """Читает HOPE_HASH_TELEGRAM_TOKEN и HOPE_HASH_TELEGRAM_CHAT_ID из env."""
+ return cls(
+ token=os.environ.get("HOPE_HASH_TELEGRAM_TOKEN"),
+ chat_id=os.environ.get("HOPE_HASH_TELEGRAM_CHAT_ID"),
+ )
+
+ def notify(self, text: str) -> None:
+ """Кладёт сообщение в очередь. No-op если disabled или очередь полна."""
+ if not self.enabled:
+ return
+ try:
+ self._queue.put_nowait(text)
+ except queue.Full:
+ # Сознательно роняем сообщение, а не блокируем — для майнера
+ # потеря уведомления безопаснее, чем застывший hot-path.
+ logger.warning("[telegram] очередь переполнена, сообщение отброшено")
+
+ def notify_started(self, btc_address: str, worker_name: str) -> None:
+ self.notify(
+ f"🟢 Hope-Hash запущен\nworker: {worker_name}\nadr: {btc_address[:8]}…"
+ )
+
+ def notify_stopped(self) -> None:
+ self.notify("🔴 Hope-Hash остановлен")
+
+ def notify_disconnected(self, reason: str) -> None:
+ self.notify(f"⚠️ Потеря соединения: {reason}")
+
+ def notify_reconnected(self) -> None:
+ self.notify("✅ Соединение восстановлено")
+
+ def notify_share_accepted(self, job_id: str, difficulty: float) -> None:
+ self.notify(f"✓ Шар принят (job={job_id}, diff={difficulty})")
+
+ def notify_block_found(self, hash_hex: str, height: Optional[int] = None) -> None:
+ h = f" #{height}" if height else ""
+ self.notify(f"🎉 БЛОК НАЙДЕН{h}!\nhash: {hash_hex[:32]}…")
+
+ def shutdown(self, timeout: float = 5.0) -> None:
+ """Дренирует очередь и останавливает воркер. Идемпотентно."""
+ if not self.enabled or self._thread is None:
+ return
+ # Сначала ждём, пока воркер обработает все ранее положенные сообщения.
+ # queue.join() блокируется до тех пор, пока на каждый put не сделан task_done.
+ self._queue.join()
+ # Теперь сигналим воркеру выйти и пробуждаем его sentinel-ом None.
+ self._stop_event.set()
+ try:
+ self._queue.put_nowait(None)
+ except queue.Full:
+ # Очередь заполнена — но раз join вернулся, воркер уже опустошил всё
+ # и сейчас ждёт на get(); put_nowait не должен падать. На всякий случай
+ # используем put с таймаутом как fallback.
+ try:
+ self._queue.put(None, timeout=1.0)
+ except queue.Full:
+ pass
+ self._thread.join(timeout=timeout)
+ self._thread = None
+
+ def _worker(self) -> None:
+ """Фоновая нить: тащит из очереди и шлёт в Telegram API."""
+ while True:
+ item = self._queue.get()
+ try:
+ # None — sentinel для остановки. Stop_event дублирует на случай
+ # если кто-то выставил флаг без отправки sentinel-а.
+ if item is None or self._stop_event.is_set() and item is None:
+ return
+ self._send(item)
+ except Exception as e:
+ # Сетевые сбои/HTTP-ошибки/таймауты — логируем и идём дальше.
+ # Не даём упасть нити: иначе следующие notify() будут уходить в пустоту.
+ logger.warning("[telegram] ошибка отправки: %s", e)
+ finally:
+ # task_done обязателен на КАЖДЫЙ get (включая sentinel),
+ # иначе queue.join() в shutdown() зависнет.
+ self._queue.task_done()
+
+ def _send(self, text: str) -> None:
+ """Один HTTP POST на api.telegram.org. urllib + urlencode."""
+ url = TELEGRAM_API.format(token=self.token)
+ data = urllib.parse.urlencode(
+ {
+ "chat_id": self.chat_id,
+ "text": text,
+ "disable_web_page_preview": "true",
+ }
+ ).encode("utf-8")
+ req = urllib.request.Request(url, data=data, method="POST")
+ with urllib.request.urlopen(req, timeout=self.timeout) as resp:
+ # urlopen уже бросает HTTPError на 4xx/5xx, но проверим status явно
+ # на случай нестандартных кодов.
+ if resp.status >= 400:
+ raise RuntimeError(f"Telegram API status {resp.status}")
+ logger.debug("[telegram] отправлено: %r", text[:60])
diff --git a/src/hope_hash/parallel.py b/src/hope_hash/parallel.py
new file mode 100644
index 0000000..83e5505
--- /dev/null
+++ b/src/hope_hash/parallel.py
@@ -0,0 +1,172 @@
+"""Параллельный перебор nonce через multiprocessing.
+
+Архитектура (см. ROADMAP «Уровень 1 / Производительность»):
+
+- Сетевая часть (StratumClient) остаётся в main process. Worker-процессы
+ не ходят в сеть — это убирает гонки на сокете и даёт чистый pickle-able
+ интерфейс «header_base + диапазон nonce → найденные шары в очередь».
+- Пул воркеров пересоздаётся при каждом новом job (terminate + новый pool).
+ Это проще, чем делать persistent workers с IPC очередями для job updates,
+ и достаточно для учебного кода.
+- Windows-совместимость: `multiprocessing` использует `spawn`, поэтому
+ `worker` должен быть top-level функцией без замыканий, а все аргументы —
+ pickle-able (bytes, int, multiprocessing.Queue/Value/Event).
+"""
+
+import multiprocessing as mp
+import struct
+import time
+
+from ._logging import logger
+from .block import double_sha256
+
+
+# Каждые столько хешей воркер сверяется со stop_event и инкрементирует
+# общий счётчик. Чем чаще — тем дороже из-за блокировки на Lock у Value.
+# 16k подобрано так же, как в одно-процессорной версии mine() для проверки job_id.
+HASHES_PER_TICK = 1 << 14 # 16384
+
+
+def worker(
+ worker_id: int,
+ header_base: bytes,
+ target: int,
+ nonce_start: int,
+ nonce_end: int,
+ extranonce2: str,
+ found_queue: "mp.Queue",
+ hashes_counter: "mp.sharedctypes.Synchronized",
+ stop_event: "mp.synchronize.Event",
+) -> None:
+ """
+ Перебирает nonce в диапазоне [nonce_start, nonce_end), считает SHA256d.
+
+ Если хеш ≤ target — кладёт ``(nonce_hex_be, hash_hex_be, extranonce2)``
+ в ``found_queue``. Каждые ``HASHES_PER_TICK`` хешей:
+ - проверяет ``stop_event`` (рано выйти при смене job / Ctrl+C);
+ - инкрементирует ``hashes_counter`` (для расчёта EMA-хешрейта в main).
+
+ Сам ``submit`` делается из main process — здесь только находка.
+ Сигнатура полностью pickle-able: bytes/int/str + примитивы mp.
+ """
+ local_hashes = 0 # копим локально, чтобы реже дёргать Lock на Value
+ nonce = nonce_start
+ try:
+ while nonce < nonce_end:
+ # Хвост блока: 4 байта nonce, LE.
+ header = header_base + struct.pack("I", nonce).hex()
+ hash_hex = h[::-1].hex()
+ # put не блокирует процесс надолго: очередь почти всегда пуста.
+ found_queue.put((nonce_hex, hash_hex, extranonce2))
+
+ nonce += 1
+ local_hashes += 1
+
+ if local_hashes >= HASHES_PER_TICK:
+ # Сливаем локальный счётчик в общий и проверяем stop.
+ with hashes_counter.get_lock():
+ hashes_counter.value += local_hashes
+ local_hashes = 0
+ if stop_event.is_set():
+ return
+ finally:
+ # Не теряем хвостовые хеши: дольём в общий счётчик.
+ if local_hashes:
+ with hashes_counter.get_lock():
+ hashes_counter.value += local_hashes
+
+
+# ─────────────────────── оркестрация пула ───────────────────────
+
+
+def start_pool(
+ n_workers: int,
+ header_base: bytes,
+ target: int,
+ extranonce2: str,
+) -> tuple:
+ """
+ Поднимает ``n_workers`` процессов, делящих [0, 2^32) поровну.
+
+ Возвращает кортеж ``(processes, found_queue, hashes_counter, stop_event)``.
+ Все объекты IPC создаются здесь, чтобы main process был их единственным
+ владельцем — это упрощает корректный teardown в ``stop_pool``.
+ """
+ n_workers = max(1, int(n_workers))
+ nonce_space = 1 << 32
+ step = nonce_space // n_workers
+
+ found_queue: mp.Queue = mp.Queue()
+ hashes_counter = mp.Value("Q", 0) # uint64, lock=True по умолчанию
+ stop_event = mp.Event()
+
+ processes: list[mp.Process] = []
+ for i in range(n_workers):
+ nonce_start = i * step
+ # Последнему отдаём «остаток», чтобы покрыть всё пространство nonce.
+ nonce_end = nonce_space if i == n_workers - 1 else (i + 1) * step
+ p = mp.Process(
+ target=worker,
+ name=f"hope-hash-worker-{i}",
+ args=(
+ i, header_base, target, nonce_start, nonce_end,
+ extranonce2, found_queue, hashes_counter, stop_event,
+ ),
+ daemon=False,
+ )
+ p.start()
+ processes.append(p)
+
+ logger.info(
+ f"[pool] стартовал {n_workers} воркер(ов), "
+ f"шаг nonce-пространства = {step:#x}"
+ )
+ return processes, found_queue, hashes_counter, stop_event
+
+
+def stop_pool(
+ processes: list,
+ found_queue: "mp.Queue",
+ stop_event: "mp.synchronize.Event",
+ join_timeout: float = 5.0,
+) -> None:
+ """
+ Аккуратно гасит пул: set stop_event → join → terminate (если зависли) →
+ drain очереди. Очередь обязательно нужно осушить ДО join, иначе
+ дочерние процессы могут заблокироваться на ``Queue.put`` под капотом
+ feeder-нити (deadlock на Windows встречается особенно охотно).
+ """
+ stop_event.set()
+
+ # Сначала пытаемся вытащить «уже найденные» шары — чтобы не потерялись.
+ drained: list[tuple] = []
+ deadline = time.time() + 0.2
+ while time.time() < deadline:
+ try:
+ drained.append(found_queue.get_nowait())
+ except Exception:
+ break
+
+ for p in processes:
+ p.join(timeout=join_timeout)
+ for p in processes:
+ if p.is_alive():
+ logger.warning(f"[pool] {p.name} не вышел за {join_timeout}с — terminate")
+ p.terminate()
+ p.join(timeout=2.0)
+
+ # Закрываем queue, чтобы освободить ресурсы (важно на Windows).
+ try:
+ found_queue.close()
+ found_queue.join_thread()
+ except Exception:
+ pass
+
+ if drained:
+ logger.info(f"[pool] при остановке слили {len(drained)} находок из очереди")
diff --git a/src/hope_hash/storage.py b/src/hope_hash/storage.py
new file mode 100644
index 0000000..3692432
--- /dev/null
+++ b/src/hope_hash/storage.py
@@ -0,0 +1,175 @@
+"""Persistent журнал принятых шаров и найденных блоков. SQLite, stdlib only."""
+
+import logging
+import sqlite3
+import threading
+import time
+from pathlib import Path
+
+# Единый logger пакета (см. _logging.py). Тэг [storage] добавляем в сообщения.
+logger = logging.getLogger("hope_hash")
+
+DEFAULT_DB_PATH = Path("hope_hash.db")
+
+# Схема описывает две таблицы: shares (журнал хешей) и sessions (запуски майнера).
+# `IF NOT EXISTS` делает инициализацию идемпотентной — можно открывать БД повторно.
+_SCHEMA = """
+CREATE TABLE IF NOT EXISTS shares (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ts REAL NOT NULL, -- unix timestamp (time.time())
+ job_id TEXT NOT NULL,
+ nonce_hex TEXT NOT NULL,
+ hash_hex TEXT NOT NULL,
+ difficulty REAL NOT NULL,
+ accepted INTEGER NOT NULL, -- 0/1
+ is_block INTEGER NOT NULL DEFAULT 0 -- 0/1, true = настоящий блок (не шар)
+);
+CREATE INDEX IF NOT EXISTS idx_shares_ts ON shares(ts);
+CREATE INDEX IF NOT EXISTS idx_shares_accepted ON shares(accepted);
+
+CREATE TABLE IF NOT EXISTS sessions (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ started_at REAL NOT NULL,
+ ended_at REAL,
+ pool_host TEXT NOT NULL,
+ btc_address TEXT NOT NULL,
+ worker_name TEXT NOT NULL
+);
+"""
+
+
+class ShareStore:
+ """Потокобезопасный фасад над SQLite. Ленивая инициализация схемы."""
+
+ def __init__(self, db_path: str | Path = DEFAULT_DB_PATH):
+ self.db_path = Path(db_path)
+ # Один лок на всё соединение: SQLite сам сериализует запись, но мы хотим
+ # ещё и атомарность последовательностей execute+commit на уровне Python.
+ self._lock = threading.Lock()
+ # check_same_thread=False — соединение шарим между нитями майнера.
+ self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
+ # WAL включаем для параллельных читателей и более устойчивых записей.
+ # Если БД на платформе/ФС, где WAL не работает — падать не должны.
+ try:
+ self._conn.execute("PRAGMA journal_mode=WAL")
+ except sqlite3.DatabaseError:
+ pass
+ self._conn.executescript(_SCHEMA)
+ self._conn.commit()
+ self._closed = False
+ logger.info("[storage] открыта БД %s", self.db_path)
+
+ def record_share(
+ self,
+ job_id: str,
+ nonce_hex: str,
+ hash_hex: str,
+ difficulty: float,
+ accepted: bool = True,
+ is_block: bool = False,
+ ts: float | None = None,
+ ) -> int:
+ """Записывает шар. Возвращает id записи."""
+ with self._lock:
+ cur = self._conn.execute(
+ "INSERT INTO shares (ts, job_id, nonce_hex, hash_hex, difficulty, accepted, is_block) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?)",
+ (
+ ts if ts is not None else time.time(),
+ job_id,
+ nonce_hex,
+ hash_hex,
+ difficulty,
+ int(accepted),
+ int(is_block),
+ ),
+ )
+ self._conn.commit()
+ row_id = cur.lastrowid
+ # Лог за пределами лока — sqlite-write не должен блокироваться форматом.
+ if is_block:
+ logger.info("[storage] BLOCK! job=%s id=%s", job_id, row_id)
+ else:
+ logger.info(
+ "[storage] share job=%s accepted=%s id=%s", job_id, accepted, row_id
+ )
+ return row_id
+
+ def start_session(self, pool_host: str, btc_address: str, worker_name: str) -> int:
+ """Регистрирует начало сессии. Возвращает session_id."""
+ with self._lock:
+ cur = self._conn.execute(
+ "INSERT INTO sessions (started_at, pool_host, btc_address, worker_name) "
+ "VALUES (?, ?, ?, ?)",
+ (time.time(), pool_host, btc_address, worker_name),
+ )
+ self._conn.commit()
+ session_id = cur.lastrowid
+ logger.info(
+ "[storage] начата сессия id=%s pool=%s worker=%s",
+ session_id,
+ pool_host,
+ worker_name,
+ )
+ return session_id
+
+ def end_session(self, session_id: int) -> None:
+ """Помечает сессию как завершённую."""
+ with self._lock:
+ self._conn.execute(
+ "UPDATE sessions SET ended_at=? WHERE id=?",
+ (time.time(), session_id),
+ )
+ self._conn.commit()
+ logger.info("[storage] завершена сессия id=%s", session_id)
+
+ def total_shares(self, accepted_only: bool = True) -> int:
+ """Сколько шаров записано (всего/принятых)."""
+ with self._lock:
+ if accepted_only:
+ cur = self._conn.execute("SELECT COUNT(*) FROM shares WHERE accepted=1")
+ else:
+ cur = self._conn.execute("SELECT COUNT(*) FROM shares")
+ (count,) = cur.fetchone()
+ return int(count)
+
+ def shares_per_hour(self, hours: int = 24) -> float:
+ """Среднее число шаров в час за последние N часов."""
+ if hours <= 0:
+ return 0.0
+ cutoff = time.time() - hours * 3600
+ with self._lock:
+ cur = self._conn.execute(
+ "SELECT COUNT(*) FROM shares WHERE accepted=1 AND ts >= ?",
+ (cutoff,),
+ )
+ (count,) = cur.fetchone()
+ if count == 0:
+ return 0.0
+ return float(count) / float(hours)
+
+ def close(self) -> None:
+ """Закрывает соединение. Идемпотентно."""
+ # Проверка флага без лока: повторный close() из другой нити безопасен,
+ # потому что sqlite3.Connection.close() сам потокобезопасен, а двойной
+ # commit на закрытом соединении ловится try/except ниже.
+ if self._closed:
+ return
+ try:
+ with self._lock:
+ if self._closed:
+ return
+ try:
+ self._conn.commit()
+ except sqlite3.Error:
+ pass
+ try:
+ self._conn.close()
+ except sqlite3.Error:
+ pass
+ self._closed = True
+ except Exception:
+ # Закрытие должно быть максимально терпимым: не маскируем баги, но
+ # не падаем в финализаторах вызывающего кода.
+ self._closed = True
+ logger.info("[storage] БД закрыта %s", self.db_path)
diff --git a/tests/test_metrics.py b/tests/test_metrics.py
new file mode 100644
index 0000000..95cdb60
--- /dev/null
+++ b/tests/test_metrics.py
@@ -0,0 +1,156 @@
+"""Юнит-тесты для metrics — Prometheus экспортёр."""
+
+import socket
+import unittest
+import urllib.error
+import urllib.request
+
+from hope_hash.metrics import Metrics, MetricsServer
+
+
+def _free_port() -> int:
+ """Захватывает свободный порт у ОС, возвращает его.
+
+ Между close() и последующим bind() есть микро-окно гонки, но для
+ локальных юнит-тестов это допустимо.
+ """
+ s = socket.socket()
+ s.bind(("127.0.0.1", 0))
+ port = s.getsockname()[1]
+ s.close()
+ return port
+
+
+class TestMetricsRegistry(unittest.TestCase):
+ def test_counter_increments(self) -> None:
+ m = Metrics()
+ m.counter_inc("foo", 1)
+ m.counter_inc("foo", 2)
+ body = m.render().decode()
+ self.assertIn("foo 3", body)
+
+ def test_counter_default_step_is_one(self) -> None:
+ m = Metrics()
+ m.counter_inc("hits")
+ m.counter_inc("hits")
+ m.counter_inc("hits")
+ self.assertIn("hits 3", m.render().decode())
+
+ def test_counter_rejects_negative(self) -> None:
+ m = Metrics()
+ with self.assertRaises(ValueError):
+ m.counter_inc("bad", -1)
+
+ def test_gauge_overwrites(self) -> None:
+ m = Metrics()
+ m.gauge_set("hashrate", 100.0)
+ m.gauge_set("hashrate", 152334.5)
+ body = m.render().decode()
+ self.assertIn("hashrate 152334.5", body)
+ self.assertNotIn("hashrate 100", body)
+
+ def test_render_includes_help_and_type(self) -> None:
+ m = Metrics()
+ m.counter_inc("shares_total", 1, help="Total accepted shares")
+ m.gauge_set("hashrate_hps", 42.0, help="Current hashrate in H/s")
+ body = m.render().decode()
+ self.assertIn("# HELP shares_total Total accepted shares", body)
+ self.assertIn("# TYPE shares_total counter", body)
+ self.assertIn("# HELP hashrate_hps Current hashrate in H/s", body)
+ self.assertIn("# TYPE hashrate_hps gauge", body)
+
+ def test_render_includes_uptime(self) -> None:
+ m = Metrics()
+ body = m.render().decode()
+ self.assertIn("# TYPE hopehash_uptime_seconds gauge", body)
+ self.assertIn("hopehash_uptime_seconds ", body)
+
+ def test_render_returns_bytes(self) -> None:
+ m = Metrics()
+ m.counter_inc("foo")
+ out = m.render()
+ self.assertIsInstance(out, bytes)
+ # должно валидно декодироваться в UTF-8
+ out.decode("utf-8")
+
+ def test_invalid_metric_name_sanitized(self) -> None:
+ m = Metrics()
+ m.counter_inc("foo-bar.baz", 7)
+ body = m.render().decode()
+ self.assertIn("foo_bar_baz 7", body)
+ self.assertNotIn("foo-bar.baz", body)
+
+ def test_thread_safety_concurrent_inc(self) -> None:
+ # Параллельные инкременты не должны терять обновления.
+ import threading
+
+ m = Metrics()
+
+ def worker() -> None:
+ for _ in range(1000):
+ m.counter_inc("c")
+
+ threads = [threading.Thread(target=worker) for _ in range(5)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+ self.assertIn("c 5000", m.render().decode())
+
+
+class TestMetricsServer(unittest.TestCase):
+ def setUp(self) -> None:
+ self.metrics = Metrics()
+ self.metrics.counter_inc("test_counter", 5, help="test counter")
+ self.server = MetricsServer(self.metrics, port=_free_port())
+ self.server.start()
+
+ def tearDown(self) -> None:
+ self.server.stop()
+
+ def test_metrics_endpoint(self) -> None:
+ with urllib.request.urlopen(self.server.url, timeout=2) as resp:
+ body = resp.read().decode()
+ status = resp.status
+ ctype = resp.headers.get("Content-Type", "")
+ self.assertEqual(status, 200)
+ self.assertIn("test_counter 5", body)
+ self.assertIn("text/plain", ctype)
+ self.assertIn("version=0.0.4", ctype)
+
+ def test_other_path_returns_404(self) -> None:
+ url = self.server.url.replace("/metrics", "/foo")
+ with self.assertRaises(urllib.error.HTTPError) as ctx:
+ urllib.request.urlopen(url, timeout=2)
+ self.assertEqual(ctx.exception.code, 404)
+
+ def test_root_path_returns_404(self) -> None:
+ url = f"http://{self.server.host}:{self.server.port}/"
+ with self.assertRaises(urllib.error.HTTPError) as ctx:
+ urllib.request.urlopen(url, timeout=2)
+ self.assertEqual(ctx.exception.code, 404)
+
+ def test_stop_is_idempotent(self) -> None:
+ self.server.stop()
+ self.server.stop() # повторный stop не должен падать
+
+ def test_double_start_is_idempotent(self) -> None:
+ # уже запущен в setUp; повторный start не должен падать или ронять сервер.
+ self.server.start()
+ with urllib.request.urlopen(self.server.url, timeout=2) as resp:
+ self.assertEqual(resp.status, 200)
+
+ def test_metrics_reflect_live_updates(self) -> None:
+ # После старта сервер должен отдавать актуальное состояние регистра.
+ self.metrics.gauge_set("live_gauge", 99.5)
+ with urllib.request.urlopen(self.server.url, timeout=2) as resp:
+ body = resp.read().decode()
+ self.assertIn("live_gauge 99.5", body)
+
+ def test_url_property(self) -> None:
+ self.assertTrue(self.server.url.startswith("http://127.0.0.1:"))
+ self.assertTrue(self.server.url.endswith("/metrics"))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_notifier.py b/tests/test_notifier.py
new file mode 100644
index 0000000..a13c05b
--- /dev/null
+++ b/tests/test_notifier.py
@@ -0,0 +1,266 @@
+"""Юнит-тесты для notifier — Telegram через urllib.
+
+Сетевые вызовы НЕ делаем. Мокаем urllib.request.urlopen через unittest.mock.
+"""
+
+import os
+import unittest
+import urllib.error
+from unittest.mock import patch, MagicMock
+
+from hope_hash.notifier import TelegramNotifier
+
+
+def _make_mock_response(status: int = 200) -> MagicMock:
+ """Хелпер: собирает мок-объект, поддерживающий with-протокол urlopen."""
+ mock_resp = MagicMock()
+ mock_resp.status = status
+ mock_resp.__enter__ = MagicMock(return_value=mock_resp)
+ mock_resp.__exit__ = MagicMock(return_value=None)
+ return mock_resp
+
+
+class TestNotifierDisabled(unittest.TestCase):
+ """Поведение в disabled-режиме: всё должно быть no-op без падений."""
+
+ def test_no_token_disabled(self):
+ n = TelegramNotifier(token=None, chat_id="x")
+ self.assertFalse(n.enabled)
+ n.notify("hello") # no-op
+ n.shutdown()
+
+ def test_empty_chat_id_disabled(self):
+ # Пустая строка — это «не задано» по нашему контракту.
+ n = TelegramNotifier(token="123:ABC", chat_id="")
+ self.assertFalse(n.enabled)
+ n.notify("hello")
+ n.shutdown()
+
+ def test_both_none_disabled(self):
+ n = TelegramNotifier(token=None, chat_id=None)
+ self.assertFalse(n.enabled)
+ n.shutdown()
+
+ def test_disabled_methods_dont_throw(self):
+ # Все notify_* методы должны не падать в disabled-режиме.
+ n = TelegramNotifier()
+ self.assertFalse(n.enabled)
+ n.notify("plain")
+ n.notify_started("bc1qxyz...", "worker1")
+ n.notify_stopped()
+ n.notify_disconnected("timeout")
+ n.notify_reconnected()
+ n.notify_share_accepted("jobA", 1.5)
+ n.notify_block_found("0000abcd" * 8, 999999)
+ n.notify_block_found("0000abcd" * 8) # без height
+ n.shutdown()
+ # Повторный shutdown идемпотентен.
+ n.shutdown()
+
+ def test_from_env_disabled_when_no_vars(self):
+ # Без переменных окружения — disabled.
+ with patch.dict(os.environ, {}, clear=True):
+ n = TelegramNotifier.from_env()
+ self.assertFalse(n.enabled)
+ n.shutdown()
+
+ def test_from_env_enabled_when_vars_set(self):
+ env = {
+ "HOPE_HASH_TELEGRAM_TOKEN": "111:AAA",
+ "HOPE_HASH_TELEGRAM_CHAT_ID": "999",
+ }
+ with patch.dict(os.environ, env, clear=True):
+ with patch("hope_hash.notifier.urllib.request.urlopen") as mock_urlopen:
+ mock_urlopen.return_value = _make_mock_response()
+ n = TelegramNotifier.from_env()
+ self.assertTrue(n.enabled)
+ self.assertEqual(n.token, "111:AAA")
+ self.assertEqual(n.chat_id, "999")
+ n.shutdown()
+
+
+class TestNotifierEnabled(unittest.TestCase):
+ """Поведение в enabled-режиме: проверяем формат вызовов и текстов."""
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_send_calls_telegram_api(self, mock_urlopen):
+ mock_urlopen.return_value = _make_mock_response()
+
+ n = TelegramNotifier(token="123:ABC", chat_id="456")
+ n.notify("test message")
+ n.shutdown()
+
+ self.assertTrue(mock_urlopen.called)
+ req = mock_urlopen.call_args[0][0]
+ # URL содержит токен — это полный путь к sendMessage.
+ self.assertIn("123:ABC", req.full_url)
+ self.assertIn("api.telegram.org", req.full_url)
+ # Метод POST.
+ self.assertEqual(req.get_method(), "POST")
+ # Body содержит chat_id и text в urlencoded виде.
+ body = req.data.decode("utf-8")
+ self.assertIn("chat_id=456", body)
+ self.assertIn("text=test+message", body)
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_share_accepted_message_format(self, mock_urlopen):
+ mock_urlopen.return_value = _make_mock_response()
+
+ n = TelegramNotifier(token="t", chat_id="c")
+ n.notify_share_accepted(job_id="abc123", difficulty=2.5)
+ n.shutdown()
+
+ body = mock_urlopen.call_args[0][0].data.decode("utf-8")
+ # urlencode заменяет пробелы на + и не-ASCII на %XX,
+ # достаточно проверить ключевые подстроки.
+ self.assertIn("abc123", body)
+ self.assertIn("2.5", body)
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_block_found_message_format(self, mock_urlopen):
+ mock_urlopen.return_value = _make_mock_response()
+
+ n = TelegramNotifier(token="t", chat_id="c")
+ block_hash = "00000000000000000007abc123def456" + "ff" * 16
+ n.notify_block_found(hash_hex=block_hash, height=800000)
+ n.shutdown()
+
+ body = mock_urlopen.call_args[0][0].data.decode("utf-8")
+ # Высота попадает в текст.
+ self.assertIn("800000", body)
+ # Префикс хеша попадает (первые 32 hex-символа).
+ self.assertIn(block_hash[:32], body)
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_block_found_without_height(self, mock_urlopen):
+ mock_urlopen.return_value = _make_mock_response()
+
+ n = TelegramNotifier(token="t", chat_id="c")
+ n.notify_block_found(hash_hex="deadbeef" * 8)
+ n.shutdown()
+
+ self.assertTrue(mock_urlopen.called)
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_shutdown_drains_queue(self, mock_urlopen):
+ # Кладём 5 сообщений → shutdown → проверяем что все 5 ушли.
+ mock_urlopen.return_value = _make_mock_response()
+
+ n = TelegramNotifier(token="t", chat_id="c")
+ for i in range(5):
+ n.notify(f"msg-{i}")
+ n.shutdown()
+
+ # Каждый notify → один urlopen. Sentinel(None) уходит без HTTP-вызова.
+ self.assertEqual(mock_urlopen.call_count, 5)
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_send_failure_logged_not_raised(self, mock_urlopen):
+ # urlopen бросает URLError → notify не должен падать,
+ # ошибка должна быть в логе.
+ mock_urlopen.side_effect = urllib.error.URLError("network down")
+
+ n = TelegramNotifier(token="t", chat_id="c")
+ with self.assertLogs("hope_hash", level="WARNING") as cm:
+ n.notify("will fail")
+ n.shutdown()
+
+ # В логах есть упоминание ошибки отправки.
+ self.assertTrue(
+ any("ошибка отправки" in line for line in cm.output),
+ f"expected error log, got: {cm.output}",
+ )
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_worker_survives_failure_and_continues(self, mock_urlopen):
+ # После одного сбоя следующее сообщение должно уйти — нить не падает.
+ mock_urlopen.side_effect = [
+ urllib.error.URLError("transient"),
+ _make_mock_response(),
+ _make_mock_response(),
+ ]
+
+ n = TelegramNotifier(token="t", chat_id="c")
+ n.notify("first-fails")
+ n.notify("second-ok")
+ n.notify("third-ok")
+ n.shutdown()
+
+ # Все три попытки сделаны, нить не умерла на первой.
+ self.assertEqual(mock_urlopen.call_count, 3)
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_http_error_status_raised_internally(self, mock_urlopen):
+ # status >= 400 без HTTPError-исключения — наш код должен сам кинуть RuntimeError,
+ # которое поймает воркер и залогирует.
+ mock_urlopen.return_value = _make_mock_response(status=500)
+
+ n = TelegramNotifier(token="t", chat_id="c")
+ with self.assertLogs("hope_hash", level="WARNING") as cm:
+ n.notify("oops")
+ n.shutdown()
+
+ self.assertTrue(
+ any("ошибка отправки" in line for line in cm.output),
+ f"expected error log on 500, got: {cm.output}",
+ )
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_queue_full_drops_message(self, mock_urlopen):
+ # Проверяем что переполнение очереди → сообщение отбрасывается с warning.
+ # Чтобы не зависеть от расписания воркера, тестируем put_nowait напрямую:
+ # заполняем очередь руками, затем убеждаемся что notify() ловит queue.Full.
+ import threading as _t
+
+ gate_in = _t.Event()
+ gate_release = _t.Event()
+
+ def blocking_open(*a, **kw):
+ # Сигналим тесту что воркер уже занят первым сообщением,
+ # затем висим до явного релиза.
+ gate_in.set()
+ gate_release.wait(timeout=3.0)
+ return _make_mock_response()
+
+ mock_urlopen.side_effect = blocking_open
+
+ n = TelegramNotifier(token="t", chat_id="c", queue_maxsize=2)
+ # Первое уходит в воркер: ждём подтверждения что он застрял в urlopen.
+ n.notify("a")
+ self.assertTrue(gate_in.wait(timeout=2.0), "worker did not start")
+ # Теперь воркер сидит в blocking_open, очередь пуста.
+ # Заполняем её до maxsize=2 — оба put должны пройти.
+ n.notify("b")
+ n.notify("c")
+ # Третий — гарантированно уже не влезет, ловим warning.
+ with self.assertLogs("hope_hash", level="WARNING") as cm:
+ n.notify("d-overflow")
+
+ self.assertTrue(
+ any("переполнена" in line for line in cm.output),
+ f"expected overflow warning, got: {cm.output}",
+ )
+ # Освобождаем воркер и корректно завершаем.
+ gate_release.set()
+ n.shutdown()
+
+ @patch("hope_hash.notifier.urllib.request.urlopen")
+ def test_lifecycle_messages(self, mock_urlopen):
+ # Smoke-тест на все lifecycle-методы: started/stopped/disconnected/reconnected.
+ mock_urlopen.return_value = _make_mock_response()
+
+ n = TelegramNotifier(token="t", chat_id="c")
+ n.notify_started("bc1qexampleaddr", "worker-7")
+ n.notify_disconnected("connection reset")
+ n.notify_reconnected()
+ n.notify_stopped()
+ n.shutdown()
+
+ self.assertEqual(mock_urlopen.call_count, 4)
+ # Проверяем что worker-name попал в первое сообщение.
+ first_body = mock_urlopen.call_args_list[0][0][0].data.decode("utf-8")
+ self.assertIn("worker-7", first_body)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_storage.py b/tests/test_storage.py
new file mode 100644
index 0000000..7f0d7c8
--- /dev/null
+++ b/tests/test_storage.py
@@ -0,0 +1,120 @@
+"""Юнит-тесты для storage.ShareStore — журнал шаров."""
+
+import tempfile
+import threading
+import time
+import unittest
+from pathlib import Path
+
+from hope_hash.storage import ShareStore
+
+
+class TestShareStore(unittest.TestCase):
+ def setUp(self):
+ # Каждый тест в своей временной директории — БД не утечёт в репо.
+ self.tmpdir = tempfile.TemporaryDirectory()
+ self.db_path = Path(self.tmpdir.name) / "test.db"
+ self.store = ShareStore(self.db_path)
+
+ def tearDown(self):
+ self.store.close()
+ self.tmpdir.cleanup()
+
+ def _share(self, accepted=True, is_block=False, ts=None, job_id="job1"):
+ """Хелпер: вставляет один шар с дефолтными значениями."""
+ return self.store.record_share(
+ job_id=job_id,
+ nonce_hex="deadbeef",
+ hash_hex="00" * 32,
+ difficulty=1.0,
+ accepted=accepted,
+ is_block=is_block,
+ ts=ts,
+ )
+
+ def test_record_share_returns_id(self):
+ rid1 = self._share()
+ rid2 = self._share()
+ self.assertIsInstance(rid1, int)
+ self.assertEqual(rid2, rid1 + 1)
+
+ def test_total_shares_initially_zero(self):
+ self.assertEqual(self.store.total_shares(), 0)
+ self.assertEqual(self.store.total_shares(accepted_only=False), 0)
+
+ def test_total_shares_counts_accepted(self):
+ # 2 accepted + 1 rejected → total_shares(accepted_only=True) == 2
+ self._share(accepted=True)
+ self._share(accepted=True)
+ self._share(accepted=False)
+ self.assertEqual(self.store.total_shares(accepted_only=True), 2)
+ self.assertEqual(self.store.total_shares(accepted_only=False), 3)
+
+ def test_shares_per_hour_window(self):
+ now = time.time()
+ # Свежие — попадают в окно 24ч.
+ self._share(ts=now - 60)
+ self._share(ts=now - 600)
+ # Старый — за пределами окна 24ч (двое суток назад).
+ self._share(ts=now - 2 * 24 * 3600)
+ # 2 шара / 24 часа ≈ 0.0833.
+ rate = self.store.shares_per_hour(hours=24)
+ self.assertAlmostEqual(rate, 2.0 / 24.0, places=5)
+
+ def test_shares_per_hour_zero_when_empty(self):
+ # Пустая БД и нулевые/отрицательные часы — должны давать 0.0 без ошибок.
+ self.assertEqual(self.store.shares_per_hour(hours=24), 0.0)
+ self.assertEqual(self.store.shares_per_hour(hours=0), 0.0)
+
+ def test_session_lifecycle(self):
+ sid = self.store.start_session("pool.example:3333", "btc1qaddr", "worker1")
+ self.assertIsInstance(sid, int)
+ # Читаем напрямую через само store-соединение, чтобы не плодить хендлы,
+ # которые на Windows блокируют удаление файла в tearDown.
+ cur = self.store._conn.execute(
+ "SELECT started_at, ended_at FROM sessions WHERE id=?", (sid,)
+ )
+ started, ended = cur.fetchone()
+ self.assertIsNotNone(started)
+ self.assertIsNone(ended)
+ self.store.end_session(sid)
+ cur = self.store._conn.execute(
+ "SELECT ended_at FROM sessions WHERE id=?", (sid,)
+ )
+ (ended,) = cur.fetchone()
+ self.assertIsNotNone(ended)
+
+ def test_close_is_idempotent(self):
+ self.store.close()
+ self.store.close() # не должно падать
+
+ def test_is_block_flag_persists(self):
+ # Найденный блок — отдельный признак, отличный от accepted.
+ rid = self._share(is_block=True)
+ cur = self.store._conn.execute(
+ "SELECT is_block FROM shares WHERE id=?", (rid,)
+ )
+ (is_block,) = cur.fetchone()
+ self.assertEqual(is_block, 1)
+
+ def test_thread_safety(self):
+ # 4 нити пишут по 50 шаров каждая. После — total_shares == 200.
+ n_threads = 4
+ per_thread = 50
+
+ def worker(idx: int) -> None:
+ for k in range(per_thread):
+ self._share(job_id=f"job-{idx}-{k}")
+
+ threads = [
+ threading.Thread(target=worker, args=(i,)) for i in range(n_threads)
+ ]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+ self.assertEqual(self.store.total_shares(), n_threads * per_thread)
+
+
+if __name__ == "__main__":
+ unittest.main()