Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/governs_ai/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from .client import GovernsAIClient, GovernsAIError, PrecheckError
from .types import PrecheckResult
from .types import BudgetResult, PrecheckResult

__all__ = ["GovernsAIClient", "GovernsAIError", "PrecheckError", "PrecheckResult"]
__all__ = [
"GovernsAIClient",
"GovernsAIError",
"PrecheckError",
"PrecheckResult",
"BudgetResult",
]
157 changes: 156 additions & 1 deletion src/governs_ai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import httpx

from .types import PrecheckResult
from .types import BudgetResult, PrecheckResult


class GovernsAIError(Exception):
Expand Down Expand Up @@ -180,3 +180,158 @@ async def async_precheck(
continue

raise PrecheckError(f"Max retries exceeded: {last_error_msg}")

# ------------------------------------------------------------------
# 1.4c — record_usage()
# ------------------------------------------------------------------

def record_usage(
self,
org_id: str,
user_id: str,
tokens: int,
model: str,
*,
provider: str = "openai",
) -> None:
"""Record token usage for a model request.

Example::

client.record_usage(
org_id="org-1", user_id="user-123",
tokens=180, model="gpt-4o-mini",
)
"""
payload: Dict[str, Any] = {
"orgId": org_id or self.org_id,
"userId": user_id,
"inputTokens": tokens,
"outputTokens": 0,
"model": model,
"provider": provider,
}
with httpx.Client(timeout=self.timeout) as http:
resp = http.post(
f"{self.base_url}/api/v1/usage",
json=payload,
headers=self.headers,
)
if resp.status_code >= 400:
raise GovernsAIError(
f"record_usage failed with HTTP {resp.status_code}: {resp.text}",
status_code=resp.status_code,
)

async def async_record_usage(
self,
org_id: str,
user_id: str,
tokens: int,
model: str,
*,
provider: str = "openai",
) -> None:
"""Async variant of :meth:`record_usage`."""
payload: Dict[str, Any] = {
"orgId": org_id or self.org_id,
"userId": user_id,
"inputTokens": tokens,
"outputTokens": 0,
"model": model,
"provider": provider,
}
async with httpx.AsyncClient(timeout=self.timeout) as http:
resp = await http.post(
f"{self.base_url}/api/v1/usage",
json=payload,
headers=self.headers,
)
if resp.status_code >= 400:
raise GovernsAIError(
f"record_usage failed with HTTP {resp.status_code}: {resp.text}",
status_code=resp.status_code,
)

# ------------------------------------------------------------------
# 1.4c — budget_check()
# ------------------------------------------------------------------

def budget_check(
self,
org_id: str,
user_id: str,
estimated_tokens: int = 0,
) -> BudgetResult:
"""Check whether the user/org is within budget.

Example::

budget = client.budget_check(org_id="org-1", user_id="u1", estimated_tokens=500)
if not budget.allowed:
raise RuntimeError("Budget exceeded")
"""
params: Dict[str, Any] = {
"orgId": org_id or self.org_id,
"userId": user_id,
"estimatedTokens": estimated_tokens,
}
with httpx.Client(timeout=self.timeout) as http:
resp = http.get(
f"{self.base_url}/api/v1/budget/context",
params=params,
headers=self.headers,
)
if resp.status_code >= 400:
raise GovernsAIError(
f"budget_check failed with HTTP {resp.status_code}: {resp.text}",
status_code=resp.status_code,
)
data = resp.json()
limit = data.get("limit", data.get("monthly_limit", 0))
remaining = data.get("remaining_tokens", data.get("remaining", limit))
allowed = data.get("allowed", remaining > 0)
warning_threshold_hit = limit > 0 and (remaining / limit) < 0.10
return BudgetResult(
allowed=allowed,
remaining_tokens=int(remaining),
limit=int(limit),
warning_threshold_hit=warning_threshold_hit,
reason=data.get("reason", ""),
)

async def async_budget_check(
self,
org_id: str,
user_id: str,
estimated_tokens: int = 0,
) -> BudgetResult:
"""Async variant of :meth:`budget_check`."""
params: Dict[str, Any] = {
"orgId": org_id or self.org_id,
"userId": user_id,
"estimatedTokens": estimated_tokens,
}
async with httpx.AsyncClient(timeout=self.timeout) as http:
resp = await http.get(
f"{self.base_url}/api/v1/budget/context",
params=params,
headers=self.headers,
)
if resp.status_code >= 400:
raise GovernsAIError(
f"budget_check failed with HTTP {resp.status_code}: {resp.text}",
status_code=resp.status_code,
)
data = resp.json()
limit = data.get("limit", data.get("monthly_limit", 0))
remaining = data.get("remaining_tokens", data.get("remaining", limit))
allowed = data.get("allowed", remaining > 0)
warning_threshold_hit = limit > 0 and (remaining / limit) < 0.10
return BudgetResult(
allowed=allowed,
remaining_tokens=int(remaining),
limit=int(limit),
warning_threshold_hit=warning_threshold_hit,
reason=data.get("reason", ""),
)
20 changes: 20 additions & 0 deletions src/governs_ai/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,23 @@ class PrecheckResult:
redacted_content: Optional[str] = None
reasons: List[str] = field(default_factory=list)
latency_ms: float = 0.0


@dataclass
class BudgetResult:
"""Result of a budget_check call.

Example::

budget = client.budget_check(org_id="org-1", user_id="user-1", estimated_tokens=500)
if not budget.allowed:
raise RuntimeError("Budget exceeded")
if budget.warning_threshold_hit:
logger.warning("Less than 10% budget remaining")
"""

allowed: bool
remaining_tokens: int
limit: int
warning_threshold_hit: bool
reason: str = ""
82 changes: 82 additions & 0 deletions tests/test_record_usage_budget.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Unit tests for record_usage() and budget_check()."""

import json
import pytest
import respx
import httpx

from governs_ai import GovernsAIClient, BudgetResult

BASE = "https://api.governs.ai"


@pytest.fixture
def client():
return GovernsAIClient(api_key="test-key", org_id="org-test")


@respx.mock
def test_record_usage_sends_correct_payload(client):
route = respx.post(f"{BASE}/api/v1/usage").mock(
return_value=httpx.Response(200, json={"accepted": True})
)
client.record_usage(org_id="org-1", user_id="user-123", tokens=100, model="gpt-4o")
body = json.loads(route.calls[0].request.content)
assert body["orgId"] == "org-1"
assert body["userId"] == "user-123"
assert body["inputTokens"] == 100
assert body["model"] == "gpt-4o"


@respx.mock
async def test_async_record_usage_sends_correct_payload(client):
route = respx.post(f"{BASE}/api/v1/usage").mock(
return_value=httpx.Response(200, json={"accepted": True})
)
await client.async_record_usage(
org_id="org-1", user_id="user-123", tokens=50, model="gpt-4o-mini"
)
body = json.loads(route.calls[0].request.content)
assert body["userId"] == "user-123"
assert body["inputTokens"] == 50


@respx.mock
def test_budget_check_allowed(client):
respx.get(f"{BASE}/api/v1/budget/context").mock(
return_value=httpx.Response(
200,
json={"allowed": True, "remaining_tokens": 9000, "limit": 10000, "reason": ""},
)
)
result = client.budget_check(org_id="org-1", user_id="user-1")
assert isinstance(result, BudgetResult)
assert result.allowed is True
assert result.warning_threshold_hit is False


@respx.mock
def test_budget_check_denied_when_over_budget(client):
respx.get(f"{BASE}/api/v1/budget/context").mock(
return_value=httpx.Response(
200,
json={"allowed": False, "remaining_tokens": 0, "limit": 10000, "reason": "over_budget"},
)
)
result = client.budget_check(org_id="org-1", user_id="user-1")
assert result.allowed is False
assert result.reason == "over_budget"


@respx.mock
def test_budget_check_warning_threshold_hit(client):
"""warning_threshold_hit=True when remaining < 10% of limit."""
respx.get(f"{BASE}/api/v1/budget/context").mock(
return_value=httpx.Response(
200,
json={"allowed": True, "remaining_tokens": 500, "limit": 10000, "reason": ""},
)
)
result = client.budget_check(org_id="org-1", user_id="user-1")
assert result.allowed is True
assert result.warning_threshold_hit is True
Loading