From 872af3cb044170cbccc6df2f7626bd28ce310ded Mon Sep 17 00:00:00 2001 From: mikemolinet Date: Mon, 11 May 2026 08:56:42 -0700 Subject: [PATCH] feat(events): port Phase 4a per-tier policy from cueapi/cueapi#779 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Verbatim port of cueapi/cueapi#779 (commit a319134) — per-tier subscription dispatcher policy with p=4 debounce. Backward-compat default (`redis=None` kwarg) preserves v1 behavior so this PR is purely additive in the OSS substrate as well. **Surface (verbatim):** - New module `worker/subscription_dispatcher_policy.py` (243 lines): apply_tier_policy + stamp_dispatch_markers + 4 pure helpers. - `dispatch_subscription_events(db_engine, redis=None)` — new optional kwarg; bypassed when None. - Watermark math caveat for deferred events documented inline. - Redis-down fails OPEN (fires events anyway + logs warning). - Settings: `PRIORITY_4_DEBOUNCE_SECONDS: float = 2.0`. - `run_poller` passes `heartbeat_redis` so production gets new policy. **Behavior:** - p=5 / p=3 / p=2 / p=1: unchanged (pass through) - p=4: NEW debounce — max 1 webhook fire per recipient per 2s window **Tests** (22 verbatim port; uses FakeRedis stub): - 5 `_event_priority` branches - 1 `_debounce_key` - 5 `_is_p4_debounced` branches - 2 `_stamp_p4_fire` - 7 `apply_tier_policy` branches - 2 `stamp_dispatch_markers` **Regression**: 40 dispatcher tests pass locally (22 new policy + 18 existing dispatcher). **parity-manifest delta**: - `worker/subscription_dispatcher.py` — last_synced + ported_in bumped (now lists event-emit-primitive-port + phase-4a-policy-port) + deviation note documenting the Phase 4a additions. - NEW: `worker/subscription_dispatcher_policy.py` — `ported_in: phase-4a-policy-port`. **OSS-shape**: verbatim — no hosted-only deviations. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- app/config.py | 6 + parity-manifest.json | 3 +- tests/test_subscription_dispatcher_policy.py | 351 +++++++++++++++++++ worker/poller.py | 7 +- worker/subscription_dispatcher.py | 59 +++- worker/subscription_dispatcher_policy.py | 243 +++++++++++++ 6 files changed, 665 insertions(+), 4 deletions(-) create mode 100644 tests/test_subscription_dispatcher_policy.py create mode 100644 worker/subscription_dispatcher_policy.py diff --git a/app/config.py b/app/config.py index 6b81644..ea5439d 100644 --- a/app/config.py +++ b/app/config.py @@ -21,6 +21,12 @@ class Settings(BaseSettings): # Messaging push delivery: how long a "delivering" message can sit # before stale-recovery transitions it back to retry_ready. MESSAGE_DELIVERY_STALE_AFTER_SECONDS: int = 300 + # Phase 4a: priority-tier subscription dispatcher. p=4 (high) events + # debounce to at most 1 webhook fire per recipient per window. p=3 + # (default) + p=5 (urgent) pass through unchanged at v1; p=2 + p=1 + # batch into digests in Phase 4b. Tunable; default 2s per CTO concur + # 2026-05-11 (matches "rapid burst" debounce of Q2 design lock). + PRIORITY_4_DEBOUNCE_SECONDS: float = 2.0 # Messaging TTL: how many days a queued message stays visible before # the cleanup task moves it to ``expired``. Self-hosters can tune # this for archival vs retention semantics — short TTL (e.g. 
7d) diff --git a/parity-manifest.json b/parity-manifest.json index dec5a5b..ad0254d 100644 --- a/parity-manifest.json +++ b/parity-manifest.json @@ -165,7 +165,8 @@ "worker": [ {"path": "worker/tasks.py", "private_counterpart": "worker/tasks.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "ports messaging-specific functions only (deliver_message_task, retry_message_task, _check_concurrent_cap_or_recycle, _release_concurrent, _load_message_context, _claim_message, _route_attempt_outcome) — does not port other private deltas (catch_up_policy, heartbeat trigger, etc., scheduled for a follow-up phase)"}, {"path": "worker/poller.py", "private_counterpart": "worker/poller.py", "last_synced": "2026-05-01", "ported_in": "messaging-primitive-port", "deviation": "ports messaging-specific changes only (recover_stale_message_deliveries function + dispatch_outbox scheduled_at filter + deliver_message/retry_message routing)"}, - {"path": "worker/subscription_dispatcher.py", "private_counterpart": "worker/subscription_dispatcher.py", "last_synced": "2026-05-11", "ported_in": "event-emit-primitive-port (PR-1b)"} + {"path": "worker/subscription_dispatcher.py", "private_counterpart": "worker/subscription_dispatcher.py", "last_synced": "2026-05-11", "ported_in": "event-emit-primitive-port (PR-1b) + phase-4a-policy-port", "deviation": "Phase 4a additions verbatim: redis=None kwarg + apply_tier_policy pre-deliver filter + stamp_dispatch_markers post-deliver side effect + watermark math caveat for deferred events. 
Backward-compat default (redis=None) preserves v1 behavior."}, + {"path": "worker/subscription_dispatcher_policy.py", "private_counterpart": "worker/subscription_dispatcher_policy.py", "last_synced": "2026-05-11", "ported_in": "phase-4a-policy-port"} ] } } diff --git a/tests/test_subscription_dispatcher_policy.py b/tests/test_subscription_dispatcher_policy.py new file mode 100644 index 0000000..2bd4329 --- /dev/null +++ b/tests/test_subscription_dispatcher_policy.py @@ -0,0 +1,351 @@ +"""Tests for the per-tier subscription dispatcher policy (Phase 4a). + +Two layers: + +* Pure-helper unit tests on ``_event_priority`` + ``_debounce_key`` + — no DB, no Redis, no HTTP. +* ``apply_tier_policy`` + ``stamp_dispatch_markers`` integration with + a fake-redis stand-in — exercises every branch (p=3/5 pass-through, + p=4 debounce hit, p=4 debounce miss, Redis-down fallback, malformed + marker fallback, mixed-tier batch). + +Per CLAUDE.md pure-helper extraction discipline: every branch in the +policy module gets a dedicated direct-call test so pytest-cov sees +it without going through the ASGI dispatch layer. +""" +from __future__ import annotations + +from typing import Any, Dict, Optional +from unittest.mock import patch + +import pytest + +from worker.subscription_dispatcher_policy import ( + DEBOUNCE_KEY_PREFIX, + PRIORITY_ARCHIVE, + PRIORITY_DEFAULT, + PRIORITY_HIGH, + PRIORITY_LOW, + PRIORITY_URGENT, + _debounce_key, + _event_priority, + _is_p4_debounced, + _stamp_p4_fire, + apply_tier_policy, + stamp_dispatch_markers, +) + + +# ─────────────────────────────────────────────────────────────────────── +# FakeRedis stub — minimal in-memory + async get/set with optional +# side-effects for Redis-down simulation. +# ─────────────────────────────────────────────────────────────────────── + + +class FakeRedis: + """Minimal stand-in for the Redis async client. + + Supports: ``get(key)``, ``set(key, value, ex=ttl)``. 
Real Redis + behavior on TTL is not simulated (TTL is recorded but doesn't + auto-expire in the test); tests that need expiry use the + monkeypatched ``_now_seconds`` clock. + """ + + def __init__(self): + self.store: Dict[str, str] = {} + self.ttls: Dict[str, int] = {} + self.fail_on_get = False + self.fail_on_set = False + + async def get(self, key: str) -> Optional[str]: + if self.fail_on_get: + raise RuntimeError("simulated Redis down on get") + return self.store.get(key) + + async def set(self, key: str, value: str, ex: Optional[int] = None) -> None: + if self.fail_on_set: + raise RuntimeError("simulated Redis down on set") + self.store[key] = value + if ex is not None: + self.ttls[key] = ex + + +def _evt(*, priority: int = 3, eid: int = 1, payload_extra: Optional[Dict] = None) -> Any: + """Construct a dict-shaped event suitable for the helper. + + The helper accepts either ORM Event objects or plain dicts (per + its docstring) — using dicts keeps the test free of DB setup. + """ + payload = {"priority": priority} + if payload_extra: + payload.update(payload_extra) + return {"id": eid, "payload": payload, "event_type": "message.delivered"} + + +# ─────────────────────────────────────────────────────────────────────── +# _event_priority — branches +# ─────────────────────────────────────────────────────────────────────── + + +def test_event_priority_extracts_from_dict(): + assert _event_priority(_evt(priority=5)) == 5 + assert _event_priority(_evt(priority=1)) == 1 + + +def test_event_priority_defaults_when_missing(): + """No `priority` field in payload → default tier.""" + assert _event_priority({"id": 1, "payload": {}}) == PRIORITY_DEFAULT + + +def test_event_priority_defaults_on_non_int(): + """priority="high" (string) → default tier (defensive).""" + assert _event_priority({"payload": {"priority": "high"}}) == PRIORITY_DEFAULT + + +def test_event_priority_defaults_on_out_of_range(): + """priority=99 → default tier; priority=0 → default tier.""" + assert 
_event_priority({"payload": {"priority": 99}}) == PRIORITY_DEFAULT + assert _event_priority({"payload": {"priority": 0}}) == PRIORITY_DEFAULT + + +def test_event_priority_handles_missing_payload(): + """No payload attr at all → default tier.""" + assert _event_priority({}) == PRIORITY_DEFAULT + + +def test_debounce_key_format(): + assert _debounce_key("agt_abc") == f"{DEBOUNCE_KEY_PREFIX}:agt_abc" + + +# ─────────────────────────────────────────────────────────────────────── +# _is_p4_debounced — branches +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_p4_debounced_false_when_no_marker(): + """No prior fire recorded → not debounced.""" + redis = FakeRedis() + assert await _is_p4_debounced(redis, "agt_x") is False + + +@pytest.mark.asyncio +async def test_p4_debounced_true_within_window(): + """Fire within the window → debounced.""" + redis = FakeRedis() + # Pretend a fire just happened. + with patch( + "worker.subscription_dispatcher_policy._now_seconds", + return_value=1000.0, + ): + await _stamp_p4_fire(redis, "agt_x") + # Now check — clock advanced 1s (well within 2s default window). 
+ with patch( + "worker.subscription_dispatcher_policy._now_seconds", + return_value=1001.0, + ): + assert await _is_p4_debounced(redis, "agt_x") is True + + +@pytest.mark.asyncio +async def test_p4_debounced_false_outside_window(): + """Fire 3s ago → window elapsed → not debounced.""" + redis = FakeRedis() + with patch( + "worker.subscription_dispatcher_policy._now_seconds", + return_value=1000.0, + ): + await _stamp_p4_fire(redis, "agt_x") + with patch( + "worker.subscription_dispatcher_policy._now_seconds", + return_value=1003.5, + ): + assert await _is_p4_debounced(redis, "agt_x") is False + + +@pytest.mark.asyncio +async def test_p4_debounced_false_on_redis_down(): + """Redis errors out → not debounced (fail-open allows fire).""" + redis = FakeRedis() + redis.fail_on_get = True + assert await _is_p4_debounced(redis, "agt_x") is False + + +@pytest.mark.asyncio +async def test_p4_debounced_false_on_malformed_marker(): + """Marker isn't a valid float → not debounced.""" + redis = FakeRedis() + redis.store[_debounce_key("agt_x")] = "not-a-number" + assert await _is_p4_debounced(redis, "agt_x") is False + + +# ─────────────────────────────────────────────────────────────────────── +# _stamp_p4_fire — Redis-down doesn't raise +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_stamp_p4_fire_handles_redis_down(): + """Redis errors on SET → logged but doesn't raise.""" + redis = FakeRedis() + redis.fail_on_set = True + # Should not raise. + await _stamp_p4_fire(redis, "agt_x") + + +@pytest.mark.asyncio +async def test_stamp_p4_fire_sets_ttl(): + """The marker key is set with a TTL > debounce window.""" + redis = FakeRedis() + await _stamp_p4_fire(redis, "agt_x") + key = _debounce_key("agt_x") + assert key in redis.store + # TTL is debounce_seconds + 1, so >= 3 for the default 2s window. 
+ assert redis.ttls[key] >= 3 + + +# ─────────────────────────────────────────────────────────────────────── +# apply_tier_policy — branches +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_apply_tier_policy_empty_batch(): + """Empty events list → empty results, no Redis call needed.""" + redis = FakeRedis() + to_fire, deferred = await apply_tier_policy( + redis, subscriber_agent_id="agt_x", events=[] + ) + assert to_fire == [] + assert deferred == [] + + +@pytest.mark.asyncio +async def test_apply_tier_policy_no_p4_events_skips_redis(): + """No p=4 events in batch → no Redis check, all pass through.""" + redis = FakeRedis() + events = [_evt(priority=5, eid=1), _evt(priority=3, eid=2)] + to_fire, deferred = await apply_tier_policy( + redis, subscriber_agent_id="agt_x", events=events + ) + assert to_fire == events + assert deferred == [] + # Redis store still empty — no `get` call should have been needed. + assert _debounce_key("agt_x") not in redis.store + + +@pytest.mark.asyncio +async def test_apply_tier_policy_p4_passes_when_not_debounced(): + """p=4 event + no prior fire → passes through.""" + redis = FakeRedis() + events = [_evt(priority=PRIORITY_HIGH, eid=1)] + to_fire, deferred = await apply_tier_policy( + redis, subscriber_agent_id="agt_x", events=events + ) + assert to_fire == events + assert deferred == [] + + +@pytest.mark.asyncio +async def test_apply_tier_policy_p4_defers_when_debounced(): + """p=4 event + prior fire within window → deferred.""" + redis = FakeRedis() + # Pre-stamp a recent fire. 
+ with patch( + "worker.subscription_dispatcher_policy._now_seconds", + return_value=1000.0, + ): + await _stamp_p4_fire(redis, "agt_x") + + events = [_evt(priority=PRIORITY_HIGH, eid=1)] + with patch( + "worker.subscription_dispatcher_policy._now_seconds", + return_value=1001.0, + ): + to_fire, deferred = await apply_tier_policy( + redis, subscriber_agent_id="agt_x", events=events + ) + assert to_fire == [] + assert deferred == events + + +@pytest.mark.asyncio +async def test_apply_tier_policy_mixed_batch_p5_passes_p4_defers(): + """Mixed-priority batch + recipient is debounced: p=5 + p=3 fire, + only the p=4 gets deferred.""" + redis = FakeRedis() + with patch( + "worker.subscription_dispatcher_policy._now_seconds", + return_value=1000.0, + ): + await _stamp_p4_fire(redis, "agt_x") + + events = [ + _evt(priority=PRIORITY_URGENT, eid=1), + _evt(priority=PRIORITY_HIGH, eid=2), # deferred + _evt(priority=PRIORITY_DEFAULT, eid=3), + ] + with patch( + "worker.subscription_dispatcher_policy._now_seconds", + return_value=1001.0, + ): + to_fire, deferred = await apply_tier_policy( + redis, subscriber_agent_id="agt_x", events=events + ) + assert [e["id"] for e in to_fire] == [1, 3] + assert [e["id"] for e in deferred] == [2] + + +@pytest.mark.asyncio +async def test_apply_tier_policy_p2_p1_pass_through_at_phase4a(): + """Phase 4a preserves v1 behavior for p=1/p=2 — they pass through. 
+ Phase 4b will swap them to digest emission.""" + redis = FakeRedis() + events = [ + _evt(priority=PRIORITY_LOW, eid=1), + _evt(priority=PRIORITY_ARCHIVE, eid=2), + ] + to_fire, deferred = await apply_tier_policy( + redis, subscriber_agent_id="agt_x", events=events + ) + assert to_fire == events + assert deferred == [] + + +@pytest.mark.asyncio +async def test_apply_tier_policy_redis_down_fires_everything(): + """Redis errors on GET → all events fire (no silent suppression).""" + redis = FakeRedis() + redis.fail_on_get = True + events = [_evt(priority=PRIORITY_HIGH, eid=1)] + to_fire, deferred = await apply_tier_policy( + redis, subscriber_agent_id="agt_x", events=events + ) + assert to_fire == events + assert deferred == [] + + +# ─────────────────────────────────────────────────────────────────────── +# stamp_dispatch_markers — branches +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_stamp_markers_p4_in_batch_stamps(): + redis = FakeRedis() + events = [_evt(priority=PRIORITY_HIGH, eid=1)] + await stamp_dispatch_markers( + redis, subscriber_agent_id="agt_x", events=events + ) + assert _debounce_key("agt_x") in redis.store + + +@pytest.mark.asyncio +async def test_stamp_markers_no_p4_skips_redis(): + """If no p=4 events fired, no need to stamp.""" + redis = FakeRedis() + events = [_evt(priority=PRIORITY_DEFAULT, eid=1)] + await stamp_dispatch_markers( + redis, subscriber_agent_id="agt_x", events=events + ) + assert _debounce_key("agt_x") not in redis.store diff --git a/worker/poller.py b/worker/poller.py index dd82e43..30a882b 100644 --- a/worker/poller.py +++ b/worker/poller.py @@ -988,7 +988,12 @@ async def run_poller(): # wires `emit_event`, the events table stays empty and # this call returns 0. 
from worker.subscription_dispatcher import dispatch_subscription_events - subscription_dispatch_count = await dispatch_subscription_events(db_engine) + # Phase 4a — pass Redis client so per-tier policy + # (p=4 debounce) can consult + stamp markers. Falls + # through to fire-everything when redis=None. + subscription_dispatch_count = await dispatch_subscription_events( + db_engine, redis=heartbeat_redis, + ) worker_alerts = await check_worker_health(db_engine, heartbeat_redis) cycle_duration_ms = int((time.monotonic() - cycle_start) * 1000) diff --git a/worker/subscription_dispatcher.py b/worker/subscription_dispatcher.py index 027a638..a02f530 100644 --- a/worker/subscription_dispatcher.py +++ b/worker/subscription_dispatcher.py @@ -35,6 +35,10 @@ from app.models.event import Event from app.models.subscription import Subscription from app.utils.signing import sign_payload +from worker.subscription_dispatcher_policy import ( + apply_tier_policy, + stamp_dispatch_markers, +) logger = logging.getLogger(__name__) @@ -225,6 +229,7 @@ async def _deliver_webhook( async def dispatch_subscription_events( db_engine: AsyncEngine, batch_size: int = DISPATCH_BATCH_SIZE, + redis=None, ) -> int: """Drain one cycle of pending webhook deliveries. @@ -256,7 +261,27 @@ async def dispatch_subscription_events( # idle case. Skip silently. continue - body = _build_webhook_body(str(sub.id), events) + # Phase 4a — per-tier policy. Filters p=4 events when + # the recipient is in the debounce window; passes the + # rest through. Redis-down falls through to "fire + # everything" per the helper's defensive behavior. + if redis is not None: + events_to_fire, events_deferred = await apply_tier_policy( + redis, + subscriber_agent_id=sub.subscriber_agent_id, + events=events, + ) + else: + events_to_fire, events_deferred = list(events), [] + + if not events_to_fire: + # All events deferred (only p=4 in the batch + + # recipient is debounced). 
Skip the webhook fire; + # leave watermark unadvanced; re-evaluate next + # cycle. NOT counted as an attempt. + continue + + body = _build_webhook_body(str(sub.id), events_to_fire) ok, status_code = await _deliver_webhook( url=sub.webhook_url, # type: ignore[arg-type] # delivery_target='webhook' guarantees non-None secret=sub.webhook_secret, # type: ignore[arg-type] @@ -265,15 +290,45 @@ async def dispatch_subscription_events( outcome = _classify_response(ok=ok, status_code=status_code) if outcome == "success": + # Phase 4a watermark math: when some events are + # deferred, advance only up to the highest-id + # event in events_to_fire that's BEFORE the lowest + # deferred event. Preserves ordering: deferred + # events stay in the queue at their original id + # for the next cycle. With no deferred events, + # advances to events_to_fire[-1].id (v1 behavior). + if events_deferred: + lowest_deferred_id = min(e.id for e in events_deferred) + contiguous_fired = [ + e for e in events_to_fire if e.id < lowest_deferred_id + ] + new_watermark = ( + contiguous_fired[-1].id + if contiguous_fired + else (sub.last_dispatched_event_id or 0) + ) + else: + new_watermark = events_to_fire[-1].id + await session.execute( update(Subscription) .where(Subscription.id == sub.id) .values( - last_dispatched_event_id=events[-1].id, + last_dispatched_event_id=new_watermark, last_dispatched_at=datetime.now(timezone.utc), consecutive_failures=0, ) ) + + # Phase 4a — record p=4 fire so subsequent cycles + # within the window suppress further p=4 fires + # to the same recipient. + if redis is not None: + await stamp_dispatch_markers( + redis, + subscriber_agent_id=sub.subscriber_agent_id, + events=events_to_fire, + ) else: # retry + skip both bump the failure counter without # advancing the watermark. 
Distinction matters for diff --git a/worker/subscription_dispatcher_policy.py b/worker/subscription_dispatcher_policy.py new file mode 100644 index 0000000..5a9c273 --- /dev/null +++ b/worker/subscription_dispatcher_policy.py @@ -0,0 +1,243 @@ +"""Per-tier policy gate for the subscription webhook dispatcher (Phase 4a). + +Sits between ``_fetch_pending_events_for_sub`` and ``_deliver_webhook`` +in ``worker/subscription_dispatcher.py``. Inspects each candidate event's +``payload.priority`` and applies tier-specific rules: + +* **p=5 (urgent)** — passes through immediately. Unchanged v1 behavior. +* **p=4 (high)** — DEBOUNCED. At most one webhook fire per recipient + per ``PRIORITY_4_DEBOUNCE_SECONDS`` window (default 2s). When a + previous fire happened within the window, all p=4 events stay in + the queue and re-evaluate next dispatch cycle. +* **p=3 (default)** — passes through. Unchanged v1 behavior. +* **p=2 + p=1 (low / archive)** — passes through at Phase 4a; Phase 4b + swaps these to digest-batched emission. v1 behavior preserved + until 4b ships so 4a alone is purely additive (debounce-only + semantics). + +Pure-helper design for ASGI-coverage-tracing reliability + direct +testability (per CLAUDE.md discipline). The dispatcher calls +:func:`apply_tier_policy` with the candidate events list + a Redis +client; the helper returns the events that should actually fire + +stamps the dispatched-at marker for p=4. The dispatcher's own +side-effects (HTTP POST, watermark advance) remain unchanged. + +Why a separate module: keeps the dispatcher's existing surface small +(fetch → deliver → update watermark). Tier policy is a substantive +chunk of behavior that benefits from being unit-testable without +spinning up the whole dispatch loop. Pattern mirrors the +``_run_long_poll_wait`` extraction in PR #776. + +Closes Phase 4a (Backlog ``cmp0qzg6l000004jr272gbirx``); spec at +https://trydock.ai/workspaces/handoff?surface=phase-4-priority-tier-dispatcher. 
+""" +from __future__ import annotations + +import logging +import time +from typing import Any, List, Sequence, Tuple + +from app.config import settings + + +logger = logging.getLogger(__name__) + + +# Priority constants — match the messaging primitive's 1-5 enum. +PRIORITY_URGENT = 5 +PRIORITY_HIGH = 4 +PRIORITY_DEFAULT = 3 +PRIORITY_LOW = 2 +PRIORITY_ARCHIVE = 1 + +# Redis key prefix for the p=4 debounce marker. One key per +# subscriber_agent_id; stores the last fire's Unix timestamp as a +# string. Expires after the debounce window so stale markers don't +# leak. +DEBOUNCE_KEY_PREFIX = "priority_4_debounce" + + +def _debounce_key(subscriber_agent_id: str) -> str: + """Build the Redis key for a recipient's p=4 debounce marker.""" + return f"{DEBOUNCE_KEY_PREFIX}:{subscriber_agent_id}" + + +def _event_priority(event: Any) -> int: + """Extract the priority field from an event's payload. + + Defaults to :data:`PRIORITY_DEFAULT` if the payload is missing + the field or has a non-int value. Defensive — older event shapes + pre-PR-2a may not have populated this column. Returning the + default tier means they pass through unchanged (correct v1 + behavior for missing-metadata events). + + Accepts either an ORM ``Event`` instance with a ``.payload`` dict + attribute or a plain dict (for direct-call unit tests). + """ + payload = getattr(event, "payload", None) + if payload is None and isinstance(event, dict): + payload = event.get("payload") or event + if not isinstance(payload, dict): + return PRIORITY_DEFAULT + value = payload.get("priority", PRIORITY_DEFAULT) + if not isinstance(value, int): + return PRIORITY_DEFAULT + if value < PRIORITY_ARCHIVE or value > PRIORITY_URGENT: + # Out-of-range priorities (shouldn't happen post-validation + # at /v1/messages, but defensive) treated as default tier. 
+ return PRIORITY_DEFAULT + return value + + +def _now_seconds() -> float: + """Indirection for tests — monkeypatched to fake the clock.""" + return time.time() + + +async def _is_p4_debounced(redis, subscriber_agent_id: str) -> bool: + """Check whether a recipient is currently inside the p=4 debounce + window. + + Returns True if a fire happened within + ``PRIORITY_4_DEBOUNCE_SECONDS`` of now. False otherwise (no + recorded fire, or the recorded fire is older than the window). + + Redis-down is treated as "not debounced" (allow fire). The + rationale matches the existing rate-limit-middleware fallback: + don't let an infra outage on Redis silently suppress + notifications. Logs a warning if Redis errors out so the + operator notices. + """ + try: + raw = await redis.get(_debounce_key(subscriber_agent_id)) + except Exception as exc: # noqa: BLE001 — Redis-down fallback + logger.warning( + "p=4 debounce Redis check failed; allowing fire", + extra={ + "event_type": "p4_debounce_redis_error", + "subscriber_agent_id": subscriber_agent_id, + "error": str(exc)[:200], + }, + ) + return False + + if raw is None: + return False + + try: + last_fire = float(raw) + except (TypeError, ValueError): + # Marker was malformed; treat as "no recent fire." + return False + + elapsed = _now_seconds() - last_fire + return elapsed < settings.PRIORITY_4_DEBOUNCE_SECONDS + + +async def _stamp_p4_fire(redis, subscriber_agent_id: str) -> None: + """Record that a p=4 fire just happened for this recipient. + + Stores the current Unix timestamp + sets a TTL slightly longer + than the debounce window so stale markers auto-expire. Redis-down + is logged but not raised — the next fire will re-stamp. + """ + key = _debounce_key(subscriber_agent_id) + now_str = str(_now_seconds()) + # TTL = debounce window + 1s safety buffer. Slightly over so the + # key is guaranteed to still be readable at the boundary. 
+ ttl_seconds = max(1, int(settings.PRIORITY_4_DEBOUNCE_SECONDS) + 1) + try: + await redis.set(key, now_str, ex=ttl_seconds) + except Exception as exc: # noqa: BLE001 — non-blocking + logger.warning( + "p=4 debounce stamp failed; next fire may re-fire within window", + extra={ + "event_type": "p4_debounce_stamp_error", + "subscriber_agent_id": subscriber_agent_id, + "error": str(exc)[:200], + }, + ) + + +async def apply_tier_policy( + redis, + *, + subscriber_agent_id: str, + events: Sequence[Any], +) -> Tuple[List[Any], List[Any]]: + """Apply per-tier dispatch policy to a candidate event batch. + + Returns ``(events_to_fire, events_deferred)``: + + * ``events_to_fire`` — pass through to the webhook POST. The + dispatcher's existing flow handles them normally (watermark + advances to ``events_to_fire[-1].id`` after successful fire). + * ``events_deferred`` — held back this cycle. The dispatcher + should NOT advance the watermark past these; they'll be + re-evaluated next cycle. + + Phase 4a behavior (only p=4 has non-trivial policy): + + * p=5 / p=3 / p=2 / p=1 → events_to_fire (v1 behavior preserved + for everything but p=4). + * p=4 → events_to_fire ONLY if the recipient isn't currently in + the debounce window. If they are, the p=4 events go to + events_deferred. + + **Watermark math caveat** (callers MUST observe): when + ``events_deferred`` is non-empty, the watermark must advance to + the highest-id event in ``events_to_fire`` IF those events form + a contiguous prefix of the original batch. If p=4 events are + interleaved with higher-priority events, we'd lose ordering + semantics by advancing past a deferred p=4. Phase 4a's solution: + when ANY events are deferred, the dispatcher fires + ``events_to_fire`` BUT advances watermark only to the lowest + deferred event's ``id - 1``. The dispatcher implements this; the + helper just returns the two lists. 
+ + Note: the watermark-advance constraint is a sequencing decision + documented at the call site, not enforced inside this helper. + The helper's contract is purely "which events should fire now." + """ + if not events: + return [], [] + + # Optimization: if no p=4 events in the batch, no debounce check + # needed at all. Skip the Redis round trip in the common case. + has_priority_4 = any(_event_priority(e) == PRIORITY_HIGH for e in events) + if not has_priority_4: + return list(events), [] + + debounced = await _is_p4_debounced(redis, subscriber_agent_id) + + to_fire: List[Any] = [] + deferred: List[Any] = [] + + for event in events: + if _event_priority(event) == PRIORITY_HIGH and debounced: + deferred.append(event) + else: + to_fire.append(event) + + return to_fire, deferred + + +async def stamp_dispatch_markers( + redis, + *, + subscriber_agent_id: str, + events: Sequence[Any], +) -> None: + """Record dispatch markers for tiers that need post-fire state. + + Called AFTER ``_deliver_webhook`` succeeds. Currently only stamps + the p=4 debounce marker (one per recipient regardless of how + many p=4 events were in the batch — the marker is "did a fire + happen recently?", not "how many events fired"). + + Phase 4b will extend this with digest emission markers for + p=1 + p=2. The function exists at v1 to keep the dispatcher's + side-effect surface localized to one helper call. + """ + if any(_event_priority(e) == PRIORITY_HIGH for e in events): + await _stamp_p4_fire(redis, subscriber_agent_id)