Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ function makeSessionDeps(overrides: Partial<SessionDeps> = {}): SessionDeps & {
rows,
isWaitingRoomEnabled: () => true,
graceMs: 30 * 60 * 1000,
sessionLengthMs: 60 * 60 * 1000,
// Keep instant-admit disabled in handler tests — they verify queue/state
// transitions, not admission policy. With capacity 0 the deps below
// aren't reached, so they're trivial stubs.
getInstantAdmitCapacity: () => 0,
activeCountForModel: async () => 0,
promoteQueuedUser: async () => null,
now: () => now,
getSessionRow: async (userId) => rows.get(userId) ?? null,
queueDepthsByModel: async () => {
Expand Down
78 changes: 78 additions & 0 deletions web/src/server/free-session/__tests__/public-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,27 @@ function makeDeps(overrides: Partial<SessionDeps> = {}): SessionDeps & {
_now: () => currentNow,
isWaitingRoomEnabled: () => true,
graceMs: GRACE_MS,
sessionLengthMs: SESSION_LEN,
// Test default: instant-admit disabled (capacity 0) so existing FIFO
// queue tests stay green. Tests that exercise instant admission opt in
// via `getInstantAdmitCapacity: () => N`.
getInstantAdmitCapacity: () => 0,
activeCountForModel: async (model) => {
let n = 0
for (const r of rows.values()) {
if (r.status === 'active' && r.model === model) n++
}
return n
},
promoteQueuedUser: async ({ userId, model, sessionLengthMs, now }) => {
const row = rows.get(userId)
if (!row || row.status !== 'queued' || row.model !== model) return null
row.status = 'active'
row.admitted_at = now
row.expires_at = new Date(now.getTime() + sessionLengthMs)
row.updated_at = now
return row
},
now: () => currentNow,
getSessionRow: async (userId) => rows.get(userId) ?? null,
endSession: async (userId) => {
Expand Down Expand Up @@ -192,6 +213,63 @@ describe('requestSession', () => {
if (second.status !== 'active') throw new Error('unreachable')
expect(second.instanceId).not.toBe('inst-1') // rotated
})

test('instant-admit: below capacity admits the user in the same request', async () => {
const admitDeps = makeDeps({ getInstantAdmitCapacity: () => 3 })
const state = await requestSession({
userId: 'u1',
model: DEFAULT_MODEL,
deps: admitDeps,
})
expect(state.status).toBe('active')
if (state.status !== 'active') throw new Error('unreachable')
expect(state.remainingMs).toBe(SESSION_LEN)
// The row in storage is flipped too, so the next GET /session also sees active.
expect(admitDeps.rows.get('u1')?.status).toBe('active')
})

test('instant-admit: queues once active-count reaches capacity', async () => {
const admitDeps = makeDeps({ getInstantAdmitCapacity: () => 2 })
const s1 = await requestSession({
userId: 'u1',
model: DEFAULT_MODEL,
deps: admitDeps,
})
const s2 = await requestSession({
userId: 'u2',
model: DEFAULT_MODEL,
deps: admitDeps,
})
const s3 = await requestSession({
userId: 'u3',
model: DEFAULT_MODEL,
deps: admitDeps,
})
expect(s1.status).toBe('active')
expect(s2.status).toBe('active')
expect(s3.status).toBe('queued')
})

test('instant-admit: per-model capacities are independent', async () => {
// GLM saturated at 1 active, MiniMax still has room.
const admitDeps = makeDeps({
getInstantAdmitCapacity: (model) =>
model === DEFAULT_MODEL ? 1 : 10,
})
await requestSession({ userId: 'u1', model: DEFAULT_MODEL, deps: admitDeps })
const s2 = await requestSession({
userId: 'u2',
model: DEFAULT_MODEL,
deps: admitDeps,
})
const s3 = await requestSession({
userId: 'u3',
model: 'minimax/minimax-m2.7',
deps: admitDeps,
})
expect(s2.status).toBe('queued')
expect(s3.status).toBe('active')
})
})

describe('getSessionState', () => {
Expand Down
16 changes: 16 additions & 0 deletions web/src/server/free-session/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,19 @@ export function getSessionLengthMs(): number {
export function getSessionGraceMs(): number {
return env.FREEBUFF_SESSION_GRACE_MS
}

/**
* Per-model instant-admit capacity: how many concurrent active sessions a
* deployment can hold before new joiners fall back to the FIFO queue + tick.
* Deployment-sizing knob — kept server-side so we can tune without bumping
* the shared `common` package that the CLI consumes. Unknown ids → 0 (always
* queue).
*/
const INSTANT_ADMIT_CAPACITY: Record<string, number> = {
'z-ai/glm-5.1': 50,
'minimax/minimax-m2.7': 200,
}
Comment on lines +50 to +53
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Hardcoded model IDs could silently drift from the canonical registry

INSTANT_ADMIT_CAPACITY hardcodes model ID strings ('z-ai/glm-5.1', 'minimax/minimax-m2.7'). If a model is renamed or a new default is added to the @codebuff/common/constants/freebuff-models registry, this map will silently return 0 for the new ID and every user will fall back to the FIFO queue — without any error or warning.

Consider importing the canonical model ID constants from @codebuff/common so a rename triggers a compile-time error rather than a silent behaviour change:

import { GLM_MODEL_ID, MINIMAX_MODEL_ID } from '@codebuff/common/constants/freebuff-models'

const INSTANT_ADMIT_CAPACITY: Record<string, number> = {
  [GLM_MODEL_ID]: 50,
  [MINIMAX_MODEL_ID]: 200,
}


export function getInstantAdmitCapacity(id: string): number {
return INSTANT_ADMIT_CAPACITY[id] ?? 0
}
54 changes: 54 additions & 0 deletions web/src/server/free-session/public-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import {
} from '@codebuff/common/constants/freebuff-models'

import {
getInstantAdmitCapacity,
getSessionGraceMs,
getSessionLengthMs,
isWaitingRoomBypassedForEmail,
isWaitingRoomEnabled,
} from './config'
import {
activeCountForModel,
endSession,
FreeSessionModelLockedError,
getSessionRow,
joinOrTakeOver,
promoteQueuedUser,
queueDepthsByModel,
queuePositionFor,
} from './store'
Expand All @@ -35,11 +39,28 @@ export interface SessionDeps {
model: string
queuedAt: Date
}) => Promise<number>
/** Instant-admit check: returns the number of active sessions currently
* bound to a given model. Compared against the model's configured
* `instantAdmitCapacity` to decide whether a new joiner skips the queue. */
activeCountForModel: (model: string) => Promise<number>
/** Instant-admit promotion: flips a specific queued row to active. Returns
* the updated row or null if the row wasn't in a queued state. */
promoteQueuedUser: (params: {
userId: string
model: string
sessionLengthMs: number
now: Date
}) => Promise<InternalSessionRow | null>
/** Per-model capacity lookup. Indirected through deps so tests can
* force-enable / force-disable instant admit without mutating the
* shared model registry. */
getInstantAdmitCapacity: (model: string) => number
isWaitingRoomEnabled: () => boolean
/** Plain values, not getters: these never change at runtime. The deps
* interface uses values rather than thunks so tests can pass numbers
* inline without wrapping. */
graceMs: number
sessionLengthMs: number
now?: () => Date
}

Expand All @@ -49,13 +70,19 @@ const defaultDeps: SessionDeps = {
endSession,
queueDepthsByModel,
queuePositionFor,
activeCountForModel,
promoteQueuedUser,
getInstantAdmitCapacity,
isWaitingRoomEnabled,
get graceMs() {
// Read-through getter so test overrides via env still work; the value
// itself is materialized once per call. Cheaper than a thunk because
// callers don't have to invoke a function.
return getSessionGraceMs()
},
get sessionLengthMs() {
return getSessionLengthMs()
},
}

const nowOf = (deps: SessionDeps): Date => (deps.now ?? (() => new Date()))()
Expand Down Expand Up @@ -145,6 +172,33 @@ export async function requestSession(params: {
}
throw err
}

// Instant-admit: if the model has spare capacity (fewer active sessions
// than its configured `instantAdmitCapacity`), skip the waiting room
// entirely and flip the user to active in this same request. The tick
// + FIFO queue only engage once we hit the threshold, so backpressure
// kicks in exactly when the deployment needs it.
//
// Race note: two concurrent joiners may each see `active < capacity`
// and both get admitted, overshooting the cap by up to `concurrency - 1`.
// Capacities are chosen with headroom for this, and the configured
// value is a comfort threshold not a hard ceiling.
if (row.status === 'queued') {
const capacity = deps.getInstantAdmitCapacity(model)
if (capacity > 0) {
const activeCount = await deps.activeCountForModel(model)
if (activeCount < capacity) {
const promoted = await deps.promoteQueuedUser({
userId: params.userId,
model,
sessionLengthMs: deps.sessionLengthMs,
now: nowOf(deps),
})
if (promoted) row = promoted
}
}
}

const view = await viewForRow(params.userId, deps, row)
if (!view) {
throw new Error(
Expand Down
55 changes: 55 additions & 0 deletions web/src/server/free-session/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,24 @@ export async function queueDepthsByModel(): Promise<Record<string, number>> {
return out
}

/**
* Count of rows currently in `active` status for one model — the threshold
* check that gates instant admission. Hot-path lookup; callers avoid the
* full `activeCountsByModel` scan when they only need one model's count.
*/
export async function activeCountForModel(model: string): Promise<number> {
const rows = await db
.select({ n: count() })
.from(schema.freeSession)
.where(
and(
eq(schema.freeSession.status, 'active'),
eq(schema.freeSession.model, model),
),
)
return Number(rows[0]?.n ?? 0)
}
Comment on lines +184 to +195
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 activeCountForModel includes expired-but-still-active sessions

activeCountForModel counts every row where status = 'active', including sessions that have passed expires_at but are still within the grace window (not yet swept by sweepExpired). This means instant-admit can be blocked even when real capacity exists — e.g., if 10 of the 50 configured GLM slots have expired but haven't been swept yet, the capacity check still reads 50 and falls back to the queue unnecessarily.

Since the comment in requestSession already states capacities are chosen "with headroom for this," the conservative behaviour is intentional, but it's worth knowing. Adding an expires_at > now filter here would make the count more accurate at the cost of passing now through the call chain:

// Optional tighter version — only count truly live sessions:
export async function activeCountForModel(model: string, now?: Date): Promise<number> {
  const conditions = [
    eq(schema.freeSession.status, 'active'),
    eq(schema.freeSession.model, model),
    ...(now ? [gt(schema.freeSession.expires_at, now)] : []),
  ]
  ...
}

Not a bug given the acknowledged headroom, but worth documenting or addressing in a follow-up.


/**
* Single-query read of active-row counts bucketed by model. Mirrors
* `queueDepthsByModel` so the admission tick can log per-model utilization
Expand Down Expand Up @@ -333,6 +351,43 @@ export async function admitFromQueue(params: {
})
}

/**
* Promote a specific queued user to active. Used by the instant-admit path
* in `requestSession` when the model's active-session count is below its
* configured capacity — skips the FIFO advisory-lock dance because each
* call targets a distinct (user_id, model) and the UPDATE is a no-op if
* the row isn't queued any more.
*
* Returns the updated row or null if the row was not in the expected
* (queued, same-model) state.
*/
export async function promoteQueuedUser(params: {
userId: string
model: string
sessionLengthMs: number
now: Date
}): Promise<InternalSessionRow | null> {
const { userId, model, sessionLengthMs, now } = params
const expiresAt = new Date(now.getTime() + sessionLengthMs)
const [row] = await db
.update(schema.freeSession)
.set({
status: 'active',
admitted_at: now,
expires_at: expiresAt,
updated_at: now,
})
.where(
and(
eq(schema.freeSession.user_id, userId),
eq(schema.freeSession.status, 'queued'),
eq(schema.freeSession.model, model),
),
)
.returning()
return (row as InternalSessionRow | undefined) ?? null
}

/** Stable 31-bit hash so model-keyed advisory lock ids don't overflow int4. */
function hashStringToInt32(s: string): number {
let h = 0
Expand Down
Loading