diff --git a/web/src/app/api/v1/freebuff/session/__tests__/session.test.ts b/web/src/app/api/v1/freebuff/session/__tests__/session.test.ts index 3b9db7a49..cb34a0ad0 100644 --- a/web/src/app/api/v1/freebuff/session/__tests__/session.test.ts +++ b/web/src/app/api/v1/freebuff/session/__tests__/session.test.ts @@ -37,6 +37,13 @@ function makeSessionDeps(overrides: Partial = {}): SessionDeps & { rows, isWaitingRoomEnabled: () => true, graceMs: 30 * 60 * 1000, + sessionLengthMs: 60 * 60 * 1000, + // Keep instant-admit disabled in handler tests — they verify queue/state + // transitions, not admission policy. With capacity 0 the deps below + // aren't reached, so they're trivial stubs. + getInstantAdmitCapacity: () => 0, + activeCountForModel: async () => 0, + promoteQueuedUser: async () => null, now: () => now, getSessionRow: async (userId) => rows.get(userId) ?? null, queueDepthsByModel: async () => { diff --git a/web/src/server/free-session/__tests__/public-api.test.ts b/web/src/server/free-session/__tests__/public-api.test.ts index ca1dee539..5c5c51282 100644 --- a/web/src/server/free-session/__tests__/public-api.test.ts +++ b/web/src/server/free-session/__tests__/public-api.test.ts @@ -38,6 +38,27 @@ function makeDeps(overrides: Partial = {}): SessionDeps & { _now: () => currentNow, isWaitingRoomEnabled: () => true, graceMs: GRACE_MS, + sessionLengthMs: SESSION_LEN, + // Test default: instant-admit disabled (capacity 0) so existing FIFO + // queue tests stay green. Tests that exercise instant admission opt in + // via `getInstantAdmitCapacity: () => N`. + getInstantAdmitCapacity: () => 0, + activeCountForModel: async (model) => { + let n = 0 + for (const r of rows.values()) { + if (r.status === 'active' && r.model === model) n++ + } + return n + }, + promoteQueuedUser: async ({ userId, model, sessionLengthMs, now }) => { + const row = rows.get(userId) + if (!row || row.status !== 'queued' || row.model !== model) return null + row.status = 'active' + row.admitted_at = now + row.expires_at = new Date(now.getTime() + sessionLengthMs) + row.updated_at = now + return row + }, now: () => currentNow, getSessionRow: async (userId) => rows.get(userId) ?? null, endSession: async (userId) => { @@ -192,6 +213,63 @@ describe('requestSession', () => { if (second.status !== 'active') throw new Error('unreachable') expect(second.instanceId).not.toBe('inst-1') // rotated }) + + test('instant-admit: below capacity admits the user in the same request', async () => { + const admitDeps = makeDeps({ getInstantAdmitCapacity: () => 3 }) + const state = await requestSession({ + userId: 'u1', + model: DEFAULT_MODEL, + deps: admitDeps, + }) + expect(state.status).toBe('active') + if (state.status !== 'active') throw new Error('unreachable') + expect(state.remainingMs).toBe(SESSION_LEN) + // The row in storage is flipped too, so the next GET /session also sees active. + expect(admitDeps.rows.get('u1')?.status).toBe('active') + }) + + test('instant-admit: queues once active-count reaches capacity', async () => { + const admitDeps = makeDeps({ getInstantAdmitCapacity: () => 2 }) + const s1 = await requestSession({ + userId: 'u1', + model: DEFAULT_MODEL, + deps: admitDeps, + }) + const s2 = await requestSession({ + userId: 'u2', + model: DEFAULT_MODEL, + deps: admitDeps, + }) + const s3 = await requestSession({ + userId: 'u3', + model: DEFAULT_MODEL, + deps: admitDeps, + }) + expect(s1.status).toBe('active') + expect(s2.status).toBe('active') + expect(s3.status).toBe('queued') + }) + + test('instant-admit: per-model capacities are independent', async () => { + // GLM saturated at 1 active, MiniMax still has room. + const admitDeps = makeDeps({ + getInstantAdmitCapacity: (model) => + model === DEFAULT_MODEL ? 1 : 10, + }) + await requestSession({ userId: 'u1', model: DEFAULT_MODEL, deps: admitDeps }) + const s2 = await requestSession({ + userId: 'u2', + model: DEFAULT_MODEL, + deps: admitDeps, + }) + const s3 = await requestSession({ + userId: 'u3', + model: 'minimax/minimax-m2.7', + deps: admitDeps, + }) + expect(s2.status).toBe('queued') + expect(s3.status).toBe('active') + }) }) describe('getSessionState', () => { diff --git a/web/src/server/free-session/config.ts b/web/src/server/free-session/config.ts index e70e1b5c6..85bba7fa6 100644 --- a/web/src/server/free-session/config.ts +++ b/web/src/server/free-session/config.ts @@ -39,3 +39,19 @@ export function getSessionLengthMs(): number { export function getSessionGraceMs(): number { return env.FREEBUFF_SESSION_GRACE_MS } + +/** + * Per-model instant-admit capacity: how many concurrent active sessions a + * deployment can hold before new joiners fall back to the FIFO queue + tick. + * Deployment-sizing knob — kept server-side so we can tune without bumping + * the shared `common` package that the CLI consumes. Unknown ids → 0 (always + * queue). + */ +const INSTANT_ADMIT_CAPACITY: Record = { + 'z-ai/glm-5.1': 50, + 'minimax/minimax-m2.7': 200, +} + +export function getInstantAdmitCapacity(id: string): number { + return INSTANT_ADMIT_CAPACITY[id] ?? 0 +} diff --git a/web/src/server/free-session/public-api.ts b/web/src/server/free-session/public-api.ts index 10150d8f1..3357b7e05 100644 --- a/web/src/server/free-session/public-api.ts +++ b/web/src/server/free-session/public-api.ts @@ -4,15 +4,19 @@ import { } from '@codebuff/common/constants/freebuff-models' import { + getInstantAdmitCapacity, getSessionGraceMs, + getSessionLengthMs, isWaitingRoomBypassedForEmail, isWaitingRoomEnabled, } from './config' import { + activeCountForModel, endSession, FreeSessionModelLockedError, getSessionRow, joinOrTakeOver, + promoteQueuedUser, queueDepthsByModel, queuePositionFor, } from './store' @@ -35,11 +39,28 @@ export interface SessionDeps { model: string queuedAt: Date }) => Promise + /** Instant-admit check: returns the number of active sessions currently + * bound to a given model. Compared against the model's configured + * `instantAdmitCapacity` to decide whether a new joiner skips the queue. */ + activeCountForModel: (model: string) => Promise + /** Instant-admit promotion: flips a specific queued row to active. Returns + * the updated row or null if the row wasn't in a queued state. */ + promoteQueuedUser: (params: { + userId: string + model: string + sessionLengthMs: number + now: Date + }) => Promise + /** Per-model capacity lookup. Indirected through deps so tests can + * force-enable / force-disable instant admit without mutating the + * shared model registry. */ + getInstantAdmitCapacity: (model: string) => number isWaitingRoomEnabled: () => boolean /** Plain values, not getters: these never change at runtime. The deps * interface uses values rather than thunks so tests can pass numbers * inline without wrapping. */ graceMs: number + sessionLengthMs: number now?: () => Date } @@ -49,6 +70,9 @@ const defaultDeps: SessionDeps = { endSession, queueDepthsByModel, queuePositionFor, + activeCountForModel, + promoteQueuedUser, + getInstantAdmitCapacity, isWaitingRoomEnabled, get graceMs() { // Read-through getter so test overrides via env still work; the value @@ -56,6 +80,9 @@ const defaultDeps: SessionDeps = { // callers don't have to invoke a function. return getSessionGraceMs() }, + get sessionLengthMs() { + return getSessionLengthMs() + }, } const nowOf = (deps: SessionDeps): Date => (deps.now ?? (() => new Date()))() @@ -145,6 +172,33 @@ export async function requestSession(params: { } throw err } + + // Instant-admit: if the model has spare capacity (fewer active sessions + // than its configured `instantAdmitCapacity`), skip the waiting room + // entirely and flip the user to active in this same request. The tick + // + FIFO queue only engage once we hit the threshold, so backpressure + // kicks in exactly when the deployment needs it. + // + // Race note: two concurrent joiners may each see `active < capacity` + // and both get admitted, overshooting the cap by up to `concurrency - 1`. + // Capacities are chosen with headroom for this, and the configured + // value is a comfort threshold not a hard ceiling. + if (row.status === 'queued') { + const capacity = deps.getInstantAdmitCapacity(model) + if (capacity > 0) { + const activeCount = await deps.activeCountForModel(model) + if (activeCount < capacity) { + const promoted = await deps.promoteQueuedUser({ + userId: params.userId, + model, + sessionLengthMs: deps.sessionLengthMs, + now: nowOf(deps), + }) + if (promoted) row = promoted + } + } + } + const view = await viewForRow(params.userId, deps, row) if (!view) { throw new Error( diff --git a/web/src/server/free-session/store.ts b/web/src/server/free-session/store.ts index 3ef0229b0..13beb0739 100644 --- a/web/src/server/free-session/store.ts +++ b/web/src/server/free-session/store.ts @@ -176,6 +176,24 @@ export async function queueDepthsByModel(): Promise> { return out } +/** + * Count of rows currently in `active` status for one model — the threshold + * check that gates instant admission. Hot-path lookup; callers avoid the + * full `activeCountsByModel` scan when they only need one model's count. + */ +export async function activeCountForModel(model: string): Promise { + const rows = await db + .select({ n: count() }) + .from(schema.freeSession) + .where( + and( + eq(schema.freeSession.status, 'active'), + eq(schema.freeSession.model, model), + ), + ) + return Number(rows[0]?.n ?? 0) +} + /** * Single-query read of active-row counts bucketed by model. Mirrors * `queueDepthsByModel` so the admission tick can log per-model utilization @@ -333,6 +351,43 @@ export async function admitFromQueue(params: { }) } +/** + * Promote a specific queued user to active. Used by the instant-admit path + * in `requestSession` when the model's active-session count is below its + * configured capacity — skips the FIFO advisory-lock dance because each + * call targets a distinct (user_id, model) and the UPDATE is a no-op if + * the row isn't queued any more. + * + * Returns the updated row or null if the row was not in the expected + * (queued, same-model) state. + */ +export async function promoteQueuedUser(params: { + userId: string + model: string + sessionLengthMs: number + now: Date +}): Promise { + const { userId, model, sessionLengthMs, now } = params + const expiresAt = new Date(now.getTime() + sessionLengthMs) + const [row] = await db + .update(schema.freeSession) + .set({ + status: 'active', + admitted_at: now, + expires_at: expiresAt, + updated_at: now, + }) + .where( + and( + eq(schema.freeSession.user_id, userId), + eq(schema.freeSession.status, 'queued'), + eq(schema.freeSession.model, model), + ), + ) + .returning() + return (row as InternalSessionRow | undefined) ?? null +} + /** Stable 31-bit hash so model-keyed advisory lock ids don't overflow int4. */ function hashStringToInt32(s: string): number { let h = 0