Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/en/guides/01-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,28 @@ Embedding model configuration for vector search, supporting dense, sparse, and h

`embedding.max_retries` only applies to transient errors such as `429`, `5xx`, timeouts, and connection failures. Permanent errors such as `400`, `401`, `403`, and `AccountOverdue` are not retried automatically. The backoff strategy is exponential backoff with jitter, starting at `0.5s` and capped at `8s`.

#### Embedding Circuit Breaker

When the embedding provider experiences consecutive transient failures (e.g. `429`, `5xx`), OpenViking opens a circuit breaker: it temporarily stops calling the provider and re-enqueues embedding tasks instead. Once `reset_timeout` has elapsed, the breaker enters HALF_OPEN and allows a probe request. If the probe fails, the breaker reopens and the wait before the next probe is doubled (capped at `max_reset_timeout`); if the probe succeeds, the breaker closes and the wait returns to the base `reset_timeout`.

```json
{
"embedding": {
"circuit_breaker": {
"failure_threshold": 5,
"reset_timeout": 60,
"max_reset_timeout": 600
}
}
}
```

| Parameter | Type | Description |
|-----------|------|-------------|
| `circuit_breaker.failure_threshold` | int | Consecutive failures required to open the breaker (default: `5`) |
| `circuit_breaker.reset_timeout` | float | Base reset timeout in seconds (default: `60`) |
| `circuit_breaker.max_reset_timeout` | float | Upper bound, in seconds, for the reset timeout as it doubles after failed probes; must be >= `reset_timeout` (default: `600`) |

**Available Models**

| Model | Dimension | Input Type | Notes |
Expand Down
22 changes: 22 additions & 0 deletions docs/zh/guides/01-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,28 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支

`embedding.max_retries` 仅对瞬时错误生效,例如 `429`、`5xx`、超时和连接错误;`400`、`401`、`403`、`AccountOverdue` 这类永久错误不会自动重试。退避策略为指数退避,初始延迟 `0.5s`,上限 `8s`,并带随机抖动。

#### Embedding 熔断(Circuit Breaker)

当 embedding provider 出现连续瞬时错误(如 `429`、`5xx`)时,OpenViking 会触发熔断,在一段时间内暂停调用 provider,并将 embedding 任务重新入队。超过基础 `reset_timeout` 后进入 HALF_OPEN,允许一次探测请求;如果探测失败,则重新熔断,且下一次 `reset_timeout` 翻倍(上限为 `max_reset_timeout`);如果探测成功,则熔断关闭,等待时间恢复为基础 `reset_timeout`。

```json
{
"embedding": {
"circuit_breaker": {
"failure_threshold": 5,
"reset_timeout": 60,
"max_reset_timeout": 600
}
}
}
```

| 参数 | 类型 | 说明 |
|------|------|------|
| `circuit_breaker.failure_threshold` | int | 连续失败多少次后熔断(默认:`5`) |
| `circuit_breaker.reset_timeout` | float | 基础恢复等待时间(秒,默认:`60`) |
| `circuit_breaker.max_reset_timeout` | float | 指数退避后的最大恢复等待时间,必须不小于 `reset_timeout`(秒,默认:`600`) |

**可用模型**

| 模型 | 维度 | 输入类型 | 说明 |
Expand Down
5 changes: 5 additions & 0 deletions examples/ov.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
"dimension": 1024,
"provider": "volcengine",
"input": "multimodal"
},
"circuit_breaker": {
"failure_threshold": 5,
"reset_timeout": 60,
"max_reset_timeout": 600
}
},
"embedding_ollama_example": {
Expand Down
32 changes: 28 additions & 4 deletions openviking/storage/collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import hashlib
import json
import threading
import time
from contextlib import nullcontext
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
Expand Down Expand Up @@ -175,12 +176,35 @@ def __init__(self, vikingdb: VikingVectorIndexBackend):
self._collection_name = config.storage.vectordb.name
self._vector_dim = config.embedding.dimension
self._initialize_embedder(config)
self._circuit_breaker = CircuitBreaker()
breaker_cfg = config.embedding.circuit_breaker
self._circuit_breaker = CircuitBreaker(
failure_threshold=breaker_cfg.failure_threshold,
reset_timeout=breaker_cfg.reset_timeout,
max_reset_timeout=breaker_cfg.max_reset_timeout,
)
self._breaker_open_last_log_at = 0.0
self._breaker_open_suppressed_count = 0
self._breaker_open_log_interval = 30.0

def _initialize_embedder(self, config: "OpenVikingConfig"):
"""Initialize the embedder instance from config."""
self._embedder = config.embedding.get_embedder()

def _log_breaker_open_reenqueue_summary(self) -> None:
    """Emit a rate-limited warning while embeddings are re-enqueued behind an open breaker.

    The first call of an open episode logs immediately. Later calls are only
    counted; once ``_breaker_open_log_interval`` seconds have passed since the
    last emitted warning, one more warning is logged that also reports how many
    re-enqueues happened since that previous warning, and the counter restarts.
    """
    now = time.monotonic()
    message = "Embedding circuit breaker is open; re-enqueueing messages"

    # 0.0 is the "nothing logged yet in this episode" sentinel.
    if self._breaker_open_last_log_at == 0.0:
        logger.warning(message)
        self._breaker_open_last_log_at = now
        self._breaker_open_suppressed_count = 0
        return

    self._breaker_open_suppressed_count += 1
    if now - self._breaker_open_last_log_at < self._breaker_open_log_interval:
        return  # still inside the quiet window; keep counting silently

    # The counter used to be maintained but never reported. Surface it so the
    # periodic line is an actual summary of what was throttled.
    logger.warning(
        "%s (%d re-enqueues since last warning)",
        message,
        self._breaker_open_suppressed_count,
    )
    self._breaker_open_last_log_at = now
    self._breaker_open_suppressed_count = 0

@classmethod
def _merge_request_stats(
cls, telemetry_id: str, processed: int = 0, error_count: int = 0
Expand Down Expand Up @@ -258,10 +282,10 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
# Circuit breaker: if API is known-broken, re-enqueue and wait
try:
self._circuit_breaker.check()
self._breaker_open_last_log_at = 0.0
self._breaker_open_suppressed_count = 0
except CircuitBreakerOpen:
logger.warning(
f"Circuit breaker is open, re-enqueueing embedding: {embedding_msg.id}"
)
self._log_breaker_open_reenqueue_summary()
if self._vikingdb.has_queue_manager:
wait = self._circuit_breaker.retry_after
if wait > 0:
Expand Down
23 changes: 19 additions & 4 deletions openviking/utils/circuit_breaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,17 @@ class CircuitBreaker:
fails, the breaker reopens.
"""

def __init__(self, failure_threshold: int = 5, reset_timeout: float = 300):
def __init__(
self,
failure_threshold: int = 5,
reset_timeout: float = 300,
max_reset_timeout: float | None = None,
):
self._failure_threshold = failure_threshold
self._reset_timeout = reset_timeout
self._base_reset_timeout = reset_timeout
self._max_reset_timeout = reset_timeout if max_reset_timeout is None else max_reset_timeout
self._current_reset_timeout = reset_timeout
self._lock = threading.Lock()
self._state = _STATE_CLOSED
self._failure_count = 0
Expand All @@ -50,12 +58,12 @@ def check(self) -> None:
return # allow probe request
# OPEN — check if timeout elapsed
elapsed = time.monotonic() - self._last_failure_time
if elapsed >= self._reset_timeout:
if elapsed >= self._current_reset_timeout:
self._state = _STATE_HALF_OPEN
logger.info("Circuit breaker transitioning OPEN -> HALF_OPEN (timeout elapsed)")
return
raise CircuitBreakerOpen(
f"Circuit breaker is OPEN, retry after {self._reset_timeout - elapsed:.0f}s"
f"Circuit breaker is OPEN, retry after {self._current_reset_timeout - elapsed:.0f}s"
)

@property
Expand All @@ -67,7 +75,7 @@ def retry_after(self) -> float:
with self._lock:
if self._state != _STATE_OPEN:
return 0
remaining = self._reset_timeout - (time.monotonic() - self._last_failure_time)
remaining = self._current_reset_timeout - (time.monotonic() - self._last_failure_time)
return min(max(remaining, 0), 30)

def record_success(self) -> None:
Expand All @@ -77,6 +85,7 @@ def record_success(self) -> None:
logger.info("Circuit breaker transitioning HALF_OPEN -> CLOSED (probe succeeded)")
self._failure_count = 0
self._state = _STATE_CLOSED
self._current_reset_timeout = self._base_reset_timeout

def record_failure(self, error: Exception) -> None:
"""Record a failed API call. May trip the breaker."""
Expand All @@ -87,18 +96,24 @@ def record_failure(self, error: Exception) -> None:

if self._state == _STATE_HALF_OPEN:
self._state = _STATE_OPEN
self._current_reset_timeout = min(
self._current_reset_timeout * 2,
self._max_reset_timeout,
)
logger.info(
f"Circuit breaker transitioning HALF_OPEN -> OPEN (probe failed: {error})"
)
return

if error_class == "permanent":
self._state = _STATE_OPEN
self._current_reset_timeout = self._base_reset_timeout
logger.info(f"Circuit breaker tripped immediately on permanent error: {error}")
return

if self._failure_count >= self._failure_threshold:
self._state = _STATE_OPEN
self._current_reset_timeout = self._base_reset_timeout
logger.info(
f"Circuit breaker tripped after {self._failure_count} consecutive "
f"failures: {error}"
Expand Down
27 changes: 27 additions & 0 deletions openviking_cli/utils/config/embedding_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,30 @@ def get_effective_dimension(self) -> int:
return 2048


class EmbeddingCircuitBreakerConfig(BaseModel):
    """Settings for the circuit breaker that guards embedding provider calls.

    Constraints enforced on load: ``failure_threshold`` >= 1, both timeouts > 0,
    and ``max_reset_timeout`` may not be smaller than ``reset_timeout``.
    """

    failure_threshold: int = Field(
        5,
        description="Consecutive failures required to open the embedding circuit breaker",
        ge=1,
    )
    reset_timeout: float = Field(
        60.0,
        description="Base circuit breaker reset timeout in seconds",
        gt=0,
    )
    max_reset_timeout: float = Field(
        600.0,
        description="Maximum circuit breaker reset timeout in seconds",
        gt=0,
    )

    @model_validator(mode="after")
    def validate_bounds(self):
        """Reject a backoff ceiling that sits below the base timeout."""
        base, ceiling = self.reset_timeout, self.max_reset_timeout
        if base > ceiling:
            raise ValueError("embedding.circuit_breaker.max_reset_timeout must be >= reset_timeout")
        return self


class EmbeddingConfig(BaseModel):
"""
Embedding configuration, supports OpenAI, VolcEngine, VikingDB, Jina, Gemini, Voyage, or LiteLLM APIs.
Expand All @@ -261,6 +285,9 @@ class EmbeddingConfig(BaseModel):
dense: Optional[EmbeddingModelConfig] = Field(default=None)
sparse: Optional[EmbeddingModelConfig] = Field(default=None)
hybrid: Optional[EmbeddingModelConfig] = Field(default=None)
circuit_breaker: EmbeddingCircuitBreakerConfig = Field(
default_factory=EmbeddingCircuitBreakerConfig
)

max_concurrent: int = Field(
default=10, description="Maximum number of concurrent embedding requests"
Expand Down
74 changes: 74 additions & 0 deletions tests/storage/test_collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import inspect
import json
import logging
from types import SimpleNamespace

import pytest
Expand Down Expand Up @@ -34,6 +35,11 @@ def __init__(self, embedder: _DummyEmbedder, backend: str = "volcengine"):
self.embedding = SimpleNamespace(
dimension=2,
get_embedder=lambda: embedder,
circuit_breaker=SimpleNamespace(
failure_threshold=5,
reset_timeout=60.0,
max_reset_timeout=600.0,
),
)


Expand All @@ -50,6 +56,29 @@ def _build_queue_payload() -> dict:
return {"data": json.dumps(msg.to_dict())}


def test_embedding_handler_builds_circuit_breaker_from_config(monkeypatch):
    """The handler must pass the configured breaker settings to its CircuitBreaker."""

    class _DummyVikingDB:
        is_closing = False

    cfg = _DummyConfig(_DummyEmbedder())
    # A non-default threshold shows the value really comes from config.
    cfg.embedding.circuit_breaker = SimpleNamespace(
        failure_threshold=7,
        reset_timeout=60.0,
        max_reset_timeout=600.0,
    )
    monkeypatch.setattr("openviking_cli.utils.config.get_openviking_config", lambda: cfg)

    breaker = TextEmbeddingHandler(_DummyVikingDB())._circuit_breaker

    observed = (
        breaker._failure_threshold,
        breaker._base_reset_timeout,
        breaker._max_reset_timeout,
    )
    assert observed == (7, 60.0, 600.0)


@pytest.mark.asyncio
async def test_embedding_handler_skip_all_work_when_manager_is_closing(monkeypatch):
class _ClosingVikingDB:
Expand Down Expand Up @@ -79,6 +108,51 @@ async def upsert(self, _data, *, ctx): # pragma: no cover - should never run
assert status["error"] == 0


@pytest.mark.asyncio
async def test_embedding_handler_open_breaker_logs_summary_instead_of_per_item_warning(
    monkeypatch, caplog
):
    """Two dequeues against an open breaker must produce one throttled warning, not two."""
    from openviking.utils.circuit_breaker import CircuitBreakerOpen

    class _QueueingVikingDB:
        is_closing = False
        has_queue_manager = True

        def __init__(self):
            self.enqueued = []

        async def enqueue_embedding_msg(self, msg):
            self.enqueued.append(msg.id)
            return None

    embedder = _DummyEmbedder()
    monkeypatch.setattr(
        "openviking_cli.utils.config.get_openviking_config",
        lambda: _DummyConfig(embedder),
    )

    handler = TextEmbeddingHandler(_QueueingVikingDB())

    def _always_open():
        raise CircuitBreakerOpen("open")

    # Force the "breaker is open" branch on every dequeue.
    monkeypatch.setattr(handler._circuit_breaker, "check", _always_open)

    import openviking.storage.collection_schemas as collection_schemas

    module_logger = collection_schemas.logger
    # Remember the level so this test does not leak a changed logger level
    # into tests that run after it.
    previous_level = module_logger.level
    module_logger.addHandler(caplog.handler)
    module_logger.setLevel(logging.WARNING)
    try:
        with caplog.at_level(logging.WARNING):
            await handler.on_dequeue(_build_queue_payload())
            await handler.on_dequeue(_build_queue_payload())
    finally:
        module_logger.removeHandler(caplog.handler)
        module_logger.setLevel(previous_level)

    warnings = [record.message for record in caplog.records if record.levelno == logging.WARNING]
    assert warnings.count("Embedding circuit breaker is open; re-enqueueing messages") == 1


@pytest.mark.asyncio
async def test_embedding_handler_treats_shutdown_write_lock_as_success(monkeypatch):
class _ClosingDuringUpsertVikingDB:
Expand Down
37 changes: 37 additions & 0 deletions tests/utils/test_circuit_breaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,43 @@ def test_circuit_breaker_half_open_failure_reopens(monkeypatch):
cb.check()


def test_half_open_failure_doubles_reset_timeout(monkeypatch):
    """A failed HALF_OPEN probe doubles the wait before the next probe is allowed."""
    from openviking.utils.circuit_breaker import CircuitBreaker, CircuitBreakerOpen

    def _freeze_clock(at):
        monkeypatch.setattr(time, "monotonic", lambda: at)

    start = time.monotonic()
    breaker = CircuitBreaker(failure_threshold=1, reset_timeout=60, max_reset_timeout=240)
    breaker.record_failure(RuntimeError("429 TooManyRequests"))

    # Past the base timeout: check() admits the probe, which then fails.
    _freeze_clock(start + 61)
    breaker.check()
    breaker.record_failure(RuntimeError("429 TooManyRequests"))

    assert breaker._current_reset_timeout == 120

    # 119s after the failed probe is still inside the doubled 120s window.
    _freeze_clock(start + 61 + 119)
    with pytest.raises(CircuitBreakerOpen):
        breaker.check()


def test_half_open_success_resets_backoff(monkeypatch):
    """A successful probe returns the doubled timeout to its base value."""
    from openviking.utils.circuit_breaker import CircuitBreaker

    def _freeze_clock(at):
        monkeypatch.setattr(time, "monotonic", lambda: at)

    start = time.monotonic()
    breaker = CircuitBreaker(failure_threshold=1, reset_timeout=60, max_reset_timeout=240)
    breaker.record_failure(RuntimeError("500"))

    # First probe fails, so the timeout backs off from 60s to 120s.
    _freeze_clock(start + 61)
    breaker.check()
    breaker.record_failure(RuntimeError("500 again"))
    assert breaker._current_reset_timeout == 120

    # Second probe, after the doubled window, succeeds.
    _freeze_clock(start + 61 + 121)
    breaker.check()
    breaker.record_success()

    assert breaker._current_reset_timeout == 60


def test_permanent_error_trips_immediately():
from openviking.utils.circuit_breaker import CircuitBreaker, CircuitBreakerOpen

Expand Down
Loading