diff --git a/docs/en/guides/01-configuration.md b/docs/en/guides/01-configuration.md index f088bd90f..8f9120fe2 100644 --- a/docs/en/guides/01-configuration.md +++ b/docs/en/guides/01-configuration.md @@ -130,6 +130,28 @@ Embedding model configuration for vector search, supporting dense, sparse, and h `embedding.max_retries` only applies to transient errors such as `429`, `5xx`, timeouts, and connection failures. Permanent errors such as `400`, `401`, `403`, and `AccountOverdue` are not retried automatically. The backoff strategy is exponential backoff with jitter, starting at `0.5s` and capped at `8s`. +#### Embedding Circuit Breaker + +When the embedding provider experiences consecutive transient failures (e.g. `429`, `5xx`), OpenViking opens a circuit breaker to temporarily stop calling the provider and re-enqueue embedding tasks. After the base `reset_timeout`, it allows a probe request (HALF_OPEN). If the probe fails, the next `reset_timeout` is doubled (capped by `max_reset_timeout`). + +```json +{ + "embedding": { + "circuit_breaker": { + "failure_threshold": 5, + "reset_timeout": 60, + "max_reset_timeout": 600 + } + } +} +``` + +| Parameter | Type | Description | +|-----------|------|-------------| +| `circuit_breaker.failure_threshold` | int | Consecutive failures required to open the breaker (default: `5`) | +| `circuit_breaker.reset_timeout` | float | Base reset timeout in seconds (default: `60`) | +| `circuit_breaker.max_reset_timeout` | float | Maximum reset timeout in seconds when backing off (default: `600`) | + **Available Models** | Model | Dimension | Input Type | Notes | diff --git a/docs/zh/guides/01-configuration.md b/docs/zh/guides/01-configuration.md index ac75a2528..e7a8adb4b 100644 --- a/docs/zh/guides/01-configuration.md +++ b/docs/zh/guides/01-configuration.md @@ -132,6 +132,28 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支 `embedding.max_retries` 仅对瞬时错误生效,例如 `429`、`5xx`、超时和连接错误;`400`、`401`、`403`、`AccountOverdue` 这类永久错误不会自动重试。退避策略为指数退避,初始延迟 `0.5s`,上限 `8s`,并带随机抖动。 +#### Embedding 熔断(Circuit Breaker) + +当 embedding provider 出现连续瞬时错误(如 `429`、`5xx`)时,OpenViking 会触发熔断,在一段时间内暂停调用 provider,并将 embedding 任务重新入队。超过基础 `reset_timeout` 后进入 HALF_OPEN,允许一次探测请求;如果探测失败,则下一次 `reset_timeout` 翻倍(上限为 `max_reset_timeout`)。 + +```json +{ + "embedding": { + "circuit_breaker": { + "failure_threshold": 5, + "reset_timeout": 60, + "max_reset_timeout": 600 + } + } +} +``` + +| 参数 | 类型 | 说明 | +|------|------|------| +| `circuit_breaker.failure_threshold` | int | 连续失败多少次后熔断(默认:`5`) | +| `circuit_breaker.reset_timeout` | float | 基础恢复等待时间(秒,默认:`60`) | +| `circuit_breaker.max_reset_timeout` | float | 指数退避后的最大恢复等待时间(秒,默认:`600`) | + **可用模型** | 模型 | 维度 | 输入类型 | 说明 | diff --git a/examples/ov.conf.example b/examples/ov.conf.example index 2ef493ada..1940bf274 100644 --- a/examples/ov.conf.example +++ b/examples/ov.conf.example @@ -43,6 +43,11 @@ "dimension": 1024, "provider": "volcengine", "input": "multimodal" + }, + "circuit_breaker": { + "failure_threshold": 5, + "reset_timeout": 60, + "max_reset_timeout": 600 } }, "embedding_ollama_example": { diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index a37276f6f..d79f9092a 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -11,6 +11,7 @@ import hashlib import json import threading +import time from contextlib import nullcontext from dataclasses import dataclass from typing import Any, Dict, List, Optional @@ -175,12 +176,35 @@ def __init__(self, vikingdb: VikingVectorIndexBackend): self._collection_name = config.storage.vectordb.name self._vector_dim = config.embedding.dimension self._initialize_embedder(config) - self._circuit_breaker = CircuitBreaker() + breaker_cfg = config.embedding.circuit_breaker + self._circuit_breaker = CircuitBreaker( + failure_threshold=breaker_cfg.failure_threshold, + reset_timeout=breaker_cfg.reset_timeout, + max_reset_timeout=breaker_cfg.max_reset_timeout, + ) + self._breaker_open_last_log_at = 0.0 + self._breaker_open_suppressed_count = 0 + self._breaker_open_log_interval = 30.0 def _initialize_embedder(self, config: "OpenVikingConfig"): """Initialize the embedder instance from config.""" self._embedder = config.embedding.get_embedder() + def _log_breaker_open_reenqueue_summary(self) -> None: + """Log a throttled warning when embeddings are re-enqueued due to an open circuit breaker.""" + now = time.monotonic() + if self._breaker_open_last_log_at == 0.0: + logger.warning("Embedding circuit breaker is open; re-enqueueing messages") + self._breaker_open_last_log_at = now + self._breaker_open_suppressed_count = 0 + return + + self._breaker_open_suppressed_count += 1 + if now - self._breaker_open_last_log_at >= self._breaker_open_log_interval: + logger.warning("Embedding circuit breaker is open; re-enqueueing messages") + self._breaker_open_last_log_at = now + self._breaker_open_suppressed_count = 0 + @classmethod def _merge_request_stats( cls, telemetry_id: str, processed: int = 0, error_count: int = 0 @@ -258,10 +282,10 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, # Circuit breaker: if API is known-broken, re-enqueue and wait try: self._circuit_breaker.check() + self._breaker_open_last_log_at = 0.0 + self._breaker_open_suppressed_count = 0 except CircuitBreakerOpen: - logger.warning( - f"Circuit breaker is open, re-enqueueing embedding: {embedding_msg.id}" - ) + self._log_breaker_open_reenqueue_summary() if self._vikingdb.has_queue_manager: wait = self._circuit_breaker.retry_after if wait > 0: diff --git a/openviking/utils/circuit_breaker.py b/openviking/utils/circuit_breaker.py index cddd2a7fe..a479e780f 100644 --- a/openviking/utils/circuit_breaker.py +++ b/openviking/utils/circuit_breaker.py @@ -33,9 +33,17 @@ class CircuitBreaker: fails, the breaker reopens. """ - def __init__(self, failure_threshold: int = 5, reset_timeout: float = 300): + def __init__( + self, + failure_threshold: int = 5, + reset_timeout: float = 300, + max_reset_timeout: float | None = None, + ): self._failure_threshold = failure_threshold self._reset_timeout = reset_timeout + self._base_reset_timeout = reset_timeout + self._max_reset_timeout = reset_timeout if max_reset_timeout is None else max_reset_timeout + self._current_reset_timeout = reset_timeout self._lock = threading.Lock() self._state = _STATE_CLOSED self._failure_count = 0 @@ -50,12 +58,12 @@ def check(self) -> None: return # allow probe request # OPEN — check if timeout elapsed elapsed = time.monotonic() - self._last_failure_time - if elapsed >= self._reset_timeout: + if elapsed >= self._current_reset_timeout: self._state = _STATE_HALF_OPEN logger.info("Circuit breaker transitioning OPEN -> HALF_OPEN (timeout elapsed)") return raise CircuitBreakerOpen( - f"Circuit breaker is OPEN, retry after {self._reset_timeout - elapsed:.0f}s" + f"Circuit breaker is OPEN, retry after {self._current_reset_timeout - elapsed:.0f}s" ) @property @@ -67,7 +75,7 @@ def retry_after(self) -> float: with self._lock: if self._state != _STATE_OPEN: return 0 - remaining = self._reset_timeout - (time.monotonic() - self._last_failure_time) + remaining = self._current_reset_timeout - (time.monotonic() - self._last_failure_time) return min(max(remaining, 0), 30) def record_success(self) -> None: @@ -77,6 +85,7 @@ def record_success(self) -> None: logger.info("Circuit breaker transitioning HALF_OPEN -> CLOSED (probe succeeded)") self._failure_count = 0 self._state = _STATE_CLOSED + self._current_reset_timeout = self._base_reset_timeout def record_failure(self, error: Exception) -> None: """Record a failed API call. May trip the breaker.""" @@ -87,6 +96,10 @@ def record_failure(self, error: Exception) -> None: if self._state == _STATE_HALF_OPEN: self._state = _STATE_OPEN + self._current_reset_timeout = min( + self._current_reset_timeout * 2, + self._max_reset_timeout, + ) logger.info( f"Circuit breaker transitioning HALF_OPEN -> OPEN (probe failed: {error})" ) @@ -94,11 +107,13 @@ def record_failure(self, error: Exception) -> None: if error_class == "permanent": self._state = _STATE_OPEN + self._current_reset_timeout = self._base_reset_timeout logger.info(f"Circuit breaker tripped immediately on permanent error: {error}") return if self._failure_count >= self._failure_threshold: self._state = _STATE_OPEN + self._current_reset_timeout = self._base_reset_timeout logger.info( f"Circuit breaker tripped after {self._failure_count} consecutive " f"failures: {error}" diff --git a/openviking_cli/utils/config/embedding_config.py b/openviking_cli/utils/config/embedding_config.py index 2392198c9..ca9fdf29b 100644 --- a/openviking_cli/utils/config/embedding_config.py +++ b/openviking_cli/utils/config/embedding_config.py @@ -246,6 +246,30 @@ def get_effective_dimension(self) -> int: return 2048 +class EmbeddingCircuitBreakerConfig(BaseModel): + failure_threshold: int = Field( + default=5, + ge=1, + description="Consecutive failures required to open the embedding circuit breaker", + ) + reset_timeout: float = Field( + default=60.0, + gt=0, + description="Base circuit breaker reset timeout in seconds", + ) + max_reset_timeout: float = Field( + default=600.0, + gt=0, + description="Maximum circuit breaker reset timeout in seconds", + ) + + @model_validator(mode="after") + def validate_bounds(self): + if self.max_reset_timeout < self.reset_timeout: + raise ValueError("embedding.circuit_breaker.max_reset_timeout must be >= reset_timeout") + return self + + class EmbeddingConfig(BaseModel): """ Embedding configuration, supports OpenAI, VolcEngine, VikingDB, Jina, Gemini, Voyage, or LiteLLM APIs. @@ -261,6 +285,9 @@ class EmbeddingConfig(BaseModel): dense: Optional[EmbeddingModelConfig] = Field(default=None) sparse: Optional[EmbeddingModelConfig] = Field(default=None) hybrid: Optional[EmbeddingModelConfig] = Field(default=None) + circuit_breaker: EmbeddingCircuitBreakerConfig = Field( + default_factory=EmbeddingCircuitBreakerConfig + ) max_concurrent: int = Field( default=10, description="Maximum number of concurrent embedding requests" diff --git a/tests/storage/test_collection_schemas.py b/tests/storage/test_collection_schemas.py index 59618d22c..bc636e55d 100644 --- a/tests/storage/test_collection_schemas.py +++ b/tests/storage/test_collection_schemas.py @@ -4,6 +4,7 @@ import asyncio import inspect import json +import logging from types import SimpleNamespace import pytest @@ -34,6 +35,11 @@ def __init__(self, embedder: _DummyEmbedder, backend: str = "volcengine"): self.embedding = SimpleNamespace( dimension=2, get_embedder=lambda: embedder, + circuit_breaker=SimpleNamespace( + failure_threshold=5, + reset_timeout=60.0, + max_reset_timeout=600.0, + ), ) @@ -50,6 +56,29 @@ def _build_queue_payload() -> dict: return {"data": json.dumps(msg.to_dict())} +def test_embedding_handler_builds_circuit_breaker_from_config(monkeypatch): + class _DummyVikingDB: + is_closing = False + + embedder = _DummyEmbedder() + config = _DummyConfig(embedder) + config.embedding.circuit_breaker = SimpleNamespace( + failure_threshold=7, + reset_timeout=60.0, + max_reset_timeout=600.0, + ) + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: config, + ) + + handler = TextEmbeddingHandler(_DummyVikingDB()) + + assert handler._circuit_breaker._failure_threshold == 7 + assert handler._circuit_breaker._base_reset_timeout == 60.0 + assert handler._circuit_breaker._max_reset_timeout == 600.0 + + @pytest.mark.asyncio async def test_embedding_handler_skip_all_work_when_manager_is_closing(monkeypatch): class _ClosingVikingDB: @@ -79,6 +108,51 @@ async def upsert(self, _data, *, ctx): # pragma: no cover - should never run assert status["error"] == 0 +@pytest.mark.asyncio +async def test_embedding_handler_open_breaker_logs_summary_instead_of_per_item_warning( + monkeypatch, caplog +): + from openviking.utils.circuit_breaker import CircuitBreakerOpen + + class _QueueingVikingDB: + is_closing = False + has_queue_manager = True + + def __init__(self): + self.enqueued = [] + + async def enqueue_embedding_msg(self, msg): + self.enqueued.append(msg.id) + return None + + embedder = _DummyEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder), + ) + + handler = TextEmbeddingHandler(_QueueingVikingDB()) + monkeypatch.setattr( + handler._circuit_breaker, + "check", + lambda: (_ for _ in ()).throw(CircuitBreakerOpen("open")), + ) + + import openviking.storage.collection_schemas as collection_schemas + + collection_schemas.logger.addHandler(caplog.handler) + collection_schemas.logger.setLevel(logging.WARNING) + try: + with caplog.at_level(logging.WARNING): + await handler.on_dequeue(_build_queue_payload()) + await handler.on_dequeue(_build_queue_payload()) + finally: + collection_schemas.logger.removeHandler(caplog.handler) + + warnings = [record.message for record in caplog.records if record.levelno == logging.WARNING] + assert warnings.count("Embedding circuit breaker is open; re-enqueueing messages") == 1 + + @pytest.mark.asyncio async def test_embedding_handler_treats_shutdown_write_lock_as_success(monkeypatch): class _ClosingDuringUpsertVikingDB: diff --git a/tests/utils/test_circuit_breaker.py b/tests/utils/test_circuit_breaker.py index db3bb51eb..d5d54f0b6 100644 --- a/tests/utils/test_circuit_breaker.py +++ b/tests/utils/test_circuit_breaker.py @@ -74,6 +74,43 @@ def test_circuit_breaker_half_open_failure_reopens(monkeypatch): cb.check() +def test_half_open_failure_doubles_reset_timeout(monkeypatch): + from openviking.utils.circuit_breaker import CircuitBreaker, CircuitBreakerOpen + + base = time.monotonic() + cb = CircuitBreaker(failure_threshold=1, reset_timeout=60, max_reset_timeout=240) + cb.record_failure(RuntimeError("429 TooManyRequests")) + + monkeypatch.setattr(time, "monotonic", lambda: base + 61) + cb.check() + cb.record_failure(RuntimeError("429 TooManyRequests")) + + assert cb._current_reset_timeout == 120 + + monkeypatch.setattr(time, "monotonic", lambda: base + 61 + 119) + with pytest.raises(CircuitBreakerOpen): + cb.check() + + +def test_half_open_success_resets_backoff(monkeypatch): + from openviking.utils.circuit_breaker import CircuitBreaker + + base = time.monotonic() + cb = CircuitBreaker(failure_threshold=1, reset_timeout=60, max_reset_timeout=240) + cb.record_failure(RuntimeError("500")) + + monkeypatch.setattr(time, "monotonic", lambda: base + 61) + cb.check() + cb.record_failure(RuntimeError("500 again")) + assert cb._current_reset_timeout == 120 + + monkeypatch.setattr(time, "monotonic", lambda: base + 61 + 121) + cb.check() + cb.record_success() + + assert cb._current_reset_timeout == 60 + + def test_permanent_error_trips_immediately(): from openviking.utils.circuit_breaker import CircuitBreaker, CircuitBreakerOpen