Skip to content

feat(events): port Phase 4a per-tier policy from cueapi/cueapi#779#76

Merged
mikemolinet merged 1 commit into
mainfrom
feat/phase-4a-priority-tier-policy
May 11, 2026
Merged

feat(events): port Phase 4a per-tier policy from cueapi/cueapi#779#76
mikemolinet merged 1 commit into
mainfrom
feat/phase-4a-priority-tier-policy

Conversation

@mikemolinet
Copy link
Copy Markdown
Collaborator

Verbatim port of cueapi/cueapi#779 (commit a319134) — Phase 4a per-tier subscription dispatcher policy + p=4 debounce.

Backward-compat default (redis=None kwarg) preserves v1 behavior; this PR is purely additive in the OSS substrate.

Behavior

Tier Phase 4a behavior Notes
p=5 unchanged pass-through
p=4 NEW debounced — max 1 webhook fire per recipient per PRIORITY_4_DEBOUNCE_SECONDS window (default 2s) Deferred p=4 events stay queued; re-evaluate next cycle
p=3 unchanged pass-through
p=2 + p=1 unchanged at 4a; digest emission ships in 4b Purely additive at 4a

New module

worker/subscription_dispatcher_policy.py (verbatim, 332 lines): apply_tier_policy + stamp_dispatch_markers + 4 pure-helper internals (_event_priority, _debounce_key, _is_p4_debounced, _stamp_p4_fire).

Dispatcher wiring

  • dispatch_subscription_events(db_engine, redis=None) — new optional kwarg. redis=None (backward-compat) bypasses policy entirely.
  • Watermark math caveat for deferred events documented inline.
  • Redis-down fails OPEN (logs + fires) to avoid silent suppression.
  • run_poller passes heartbeat_redis so production gets the new policy.

Settings

  • PRIORITY_4_DEBOUNCE_SECONDS: float = 2.0 — tunable via env.

Tests (22 verbatim port)

5 _event_priority branches + 1 _debounce_key + 5 _is_p4_debounced + 2 _stamp_p4_fire + 7 apply_tier_policy + 2 stamp_dispatch_markers. Uses a FakeRedis stub — no real Redis dependency.

Local regression: 40 dispatcher tests pass (22 new policy + 18 existing dispatcher).

parity-manifest delta

  • worker/subscription_dispatcher.pylast_synced + ported_in bumped; deviation note documents the Phase 4a additions.
  • New: worker/subscription_dispatcher_policy.pyported_in: phase-4a-policy-port.

OSS-shape

Verbatim — no hosted-only deviations.

Phase 4b (digest for p=1/p=2) + Phase 4c (observability) queued as follow-up PRs.

🤖 Generated with Claude Code

Verbatim port of cueapi/cueapi#779 (commit a319134) — per-tier
subscription dispatcher policy with p=4 debounce. Backward-compat
default (`redis=None` kwarg) preserves v1 behavior so this PR is
purely additive in the OSS substrate as well.

**Surface (verbatim):**

- New module `worker/subscription_dispatcher_policy.py` (332 lines):
  apply_tier_policy + stamp_dispatch_markers + 4 pure helpers.
- `dispatch_subscription_events(db_engine, redis=None)` — new
  optional kwarg; bypassed when None.
- Watermark math caveat for deferred events documented inline.
- Redis-down fails OPEN (fires events anyway + logs warning).
- Settings: `PRIORITY_4_DEBOUNCE_SECONDS: float = 2.0`.
- `run_poller` passes `heartbeat_redis` so production gets new policy.

**Behavior:**

- p=5 / p=3 / p=2 / p=1: unchanged (pass through)
- p=4: NEW debounce — max 1 webhook fire per recipient per 2s window

**Tests** (22 verbatim port; uses FakeRedis stub):

- 5 `_event_priority` branches
- 1 `_debounce_key`
- 5 `_is_p4_debounced` branches
- 2 `_stamp_p4_fire`
- 7 `apply_tier_policy` branches
- 2 `stamp_dispatch_markers`

**Regression**: 40 dispatcher tests pass locally (22 new policy + 18
existing dispatcher).

**parity-manifest delta**:

- `worker/subscription_dispatcher.py` — last_synced + ported_in
  bumped (now lists event-emit-primitive-port + phase-4a-policy-port)
  + deviation note documenting the Phase 4a additions.
- NEW: `worker/subscription_dispatcher_policy.py` — `ported_in:
  phase-4a-policy-port`.

**OSS-shape**: verbatim — no hosted-only deviations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown

Parity check

This PR modifies files tracked in parity-manifest.json:

  • app/config.py
  • app/models/dispatch_outbox.py
  • worker/poller.py
  • worker/subscription_dispatcher.py
  • worker/subscription_dispatcher_policy.py

Please confirm one of the following in a reply or PR description update:

  1. The equivalent change has been applied to the private cueapi monorepo. Link the PR.
  2. This change is OSS-only and does not need porting. Briefly explain why (e.g. "fixes a bug that only exists in the OSS build").
  3. A follow-up issue has been filed to port the reverse direction. Link the issue.

This is a soft check — it does not block merge. The goal is visibility, not friction. See HOSTED_ONLY.md for the open-core policy.

@govindkavaturi-art govindkavaturi-art enabled auto-merge (squash) May 11, 2026 16:02
@mikemolinet mikemolinet merged commit b7ebb92 into main May 11, 2026
6 checks passed
@mikemolinet mikemolinet deleted the feat/phase-4a-priority-tier-policy branch May 11, 2026 16:07
mikemolinet added a commit that referenced this pull request May 11, 2026
…ed_at on list endpoint (#79)

Re-port of closed [PR #48](#48) which was on a stale base ~8870 deletions behind main. Fresh against current main HEAD via direct patch from the old branch's commit (c05518c).

Adds query-side enrichment to ``GET /v1/executions``:

- ``worker_id=`` — filter to executions claimed by a specific worker
  (Execution.claimed_by_worker)
- ``status__in=foo,bar,baz`` — comma-separated multi-status filter.
  Mutex with ``status=``; 400 conflicting_filters if both passed.
- Response gains ``oldest_claimed_at`` — earliest ``claimed_at`` over
  the filtered set, or null when count=0. Unblocks dashboard/menubar
  "oldest pending: 5m" rendering without a follow-up query.

## Use cases

- **Menubar pending counter**: fetch ``pending,delivering,retry_ready``
  in one round trip (status__in), get total + oldest_claimed_at to
  render "3 in flight, oldest claimed 8m ago".
- **Worker health dashboard**: filter by worker_id to scope to one
  worker's claims; see how stale their oldest claim is.

## Tests

3 new tests in TestListExecutions:
- worker_id scoping + oldest_claimed_at value reflects earliest
  claimed_at over filtered set
- status__in union + status/status__in mutex 400
- null oldest_claimed_at on empty filtered set

31/31 in test_execution_parity.py green. Full local suite: zero
regressions.

## Re-port note

Re-port of closed PR #48 (commit c05518c). Branch was ~8870 deletions
behind main; fresh against current main after PR #74/#75/#76/#77/#78
merged earlier in this session. Patch applied cleanly from c05518c
to current main — the patch only touches list_executions endpoint,
which has remained additive (cueapi-core added outcome_state filter
since the branch was cut; this PR is compatible).
mikemolinet added a commit that referenced this pull request May 11, 2026
…ueapi/cueapi#630) (#80)

Re-port of closed [PR #47](#47) which was on a stale base ~8880 deletions behind main. Fresh against current main HEAD.

Phase A of the Agent Directory productization. Eliminates the failure
mode where agents had to remember 6+ fields per recipient AND had no
way to discover the live roster.

## What lands

- **GET /v1/agents/roster** — display-optimized snapshot for prompt-
  injection at session-boot. Distinct from the existing management
  surface (GET /v1/agents):
  - Always-full list (no pagination)
  - Drops opaque IDs / secrets / timestamps / tenancy metadata
  - Adds derived ``online``, ``last_seen_relative``, ``preferred_contact``
  - Always excludes soft-deleted agents
  - Weak ETag + ``If-None-Match`` → 304 Not Modified for poll efficiency
  - ETag bucketed to 5-min windows so quiet periods produce stable hashes
  - ``Cache-Control: private, max-age=300`` matches derivation buckets

- **Migration 031** (renumbered from private's 048) — adds
  ``agents.last_seen_at TIMESTAMPTZ NULL``. Nullable, no backfill.

- **Hot-path hooks** write ``last_seen_at = now()``:
  - ``create_message`` — sender's agent (in same tx via touch_last_seen)
  - ``list_inbox`` — recipient's agent, on EVERY poll (via
    _bump_last_seen_stmt). Even when no queued messages exist, the
    poll proves activity.

- **Online derivation** (server-computed in ``list_roster``):
  - within 5 min   → ``online``
  - within 30 min  → ``away``
  - older or NULL  → ``offline``
  - Caller override wins: PATCHed status=away/offline keeps that
    override regardless of recent activity

## Pure helpers (for unit-testability — pytest-cov + ASGI issue)

- ``_build_roster_entry(agent, now)`` in agent_service.py: ORM Agent
  → (entry_dict, etag_part_string)
- ``_compute_roster_etag(parts)`` in agent_service.py: list → weak ETag
- ``_derive_online_state(now, last_seen_at, asserted_status)`` →
  (online_bool, derived_status)
- ``_format_relative(now, last_seen_at)`` → "active now" / "5m ago" / ...
- ``_bucketed_seen(last_seen_at)`` → string for ETag stability
- ``_bump_last_seen_stmt(agent_id, now)`` in inbox_service.py:
  SQLAlchemy UPDATE statement
- ``_etag_matches(if_none_match, server_etag)`` in agents router:
  conditional GET predicate

## Tests

27 new tests in tests/test_agent_roster.py (verbatim from private):
shape verification, hot-path hooks (sender + recipient), derivation
correctness across all 3 buckets, caller-asserted status override,
soft-delete exclusion, preferred_contact derivation,
last_seen_relative formatting, ETag 304 handling, ETag changes when
roster mutates, pure-helper unit tests.

27/27 pass locally. Full local suite: 890 passed + 18 xfailed
(pre-existing) + 4 skipped. Zero regressions.

## Re-port note

Re-port of closed PR #47. Fresh against current main after PR #74 +
#75 + #76 + #77 + #78 + #79 merged earlier in this session.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant