Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions web/src/server/free-session/__tests__/admission.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ function makeAdmissionDeps(overrides: Partial<AdmissionDeps> = {}): AdmissionDep
const deps: AdmissionDeps & { calls: { admit: number } } = {
calls,
sweepExpired: async () => 0,
evictBanned: async () => 0,
queueDepth: async () => 0,
activeCountsByModel: async () => ({}),
getFleetHealth: async () => ({}),
Expand Down Expand Up @@ -126,4 +127,33 @@ describe('runAdmissionTick', () => {
await runAdmissionTick(deps)
expect(received).toEqual([12_345])
})

test('evicts banned users every tick and surfaces the count', async () => {
let evictCalls = 0
const deps = makeAdmissionDeps({
evictBanned: async () => {
evictCalls += 1
return 4
},
})
const result = await runAdmissionTick(deps)
expect(evictCalls).toBe(1)
expect(result.evictedBanned).toBe(4)
})

test('still evicts banned users when admission is paused by health', async () => {
let evictCalls = 0
const deps = makeAdmissionDeps({
getFleetHealth: async () => fleet('unhealthy'),
evictBanned: async () => {
evictCalls += 1
return 2
},
})
const result = await runAdmissionTick(deps)
expect(evictCalls).toBe(1)
expect(result.evictedBanned).toBe(2)
expect(result.admitted).toBe(0)
expect(result.skipped).toBe('unhealthy')
})
})
14 changes: 13 additions & 1 deletion web/src/server/free-session/admission.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { getFleetHealth } from './fireworks-health'
import {
activeCountsByModel,
admitFromQueue,
evictBanned,
queueDepth,
sweepExpired,
} from './store'
Expand All @@ -20,6 +21,7 @@ import { logger } from '@/util/logger'

export interface AdmissionDeps {
sweepExpired: (now: Date, graceMs: number) => Promise<number>
evictBanned: () => Promise<number>
queueDepth: (params: { model: string }) => Promise<number>
activeCountsByModel: () => Promise<Record<string, number>>
admitFromQueue: (params: {
Expand All @@ -39,6 +41,7 @@ export interface AdmissionDeps {

const defaultDeps: AdmissionDeps = {
sweepExpired,
evictBanned,
queueDepth,
activeCountsByModel,
admitFromQueue,
Expand All @@ -60,6 +63,8 @@ const defaultDeps: AdmissionDeps = {

export interface AdmissionTickResult {
expired: number
/** Free_session rows removed because the user is banned. */
evictedBanned: number
admitted: number
/** Per-model queue depth at the end of the tick. */
queueDepthByModel: Record<string, number>
Expand All @@ -86,7 +91,12 @@ export async function runAdmissionTick(
deps: AdmissionDeps = defaultDeps,
): Promise<AdmissionTickResult> {
const now = (deps.now ?? (() => new Date()))()
const expired = await deps.sweepExpired(now, deps.graceMs)
// Run eviction before admission so a banned user freed from a slot in this
// tick frees room for a queued user to be admitted in the same tick.
const [expired, evictedBanned] = await Promise.all([
deps.sweepExpired(now, deps.graceMs),
deps.evictBanned(),
])

const models = deps.models ?? FREEBUFF_MODELS.map((m) => m.id)

Expand Down Expand Up @@ -122,6 +132,7 @@ export async function runAdmissionTick(

return {
expired,
evictedBanned,
admitted: totalAdmitted,
queueDepthByModel,
activeCountByModel,
Expand All @@ -145,6 +156,7 @@ function runTick() {
metric: 'freebuff_waiting_room',
admitted: result.admitted,
expired: result.expired,
evictedBanned: result.evictedBanned,
queueDepthByModel: result.queueDepthByModel,
activeCountByModel: result.activeCountByModel,
skipped: result.skipped,
Expand Down
20 changes: 20 additions & 0 deletions web/src/server/free-session/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,26 @@ export async function sweepExpired(now: Date, graceMs: number): Promise<number>
return deleted.length
}

/**
* Drop any free_session row whose user has been banned. Bans flipped via the
* admin UI / direct SQL / Stripe webhook don't cascade into free_session, so
* without this sweep a banned user keeps holding their admitted slot until
* expires_at. Cheap to call every tick (EXISTS subquery, indexed PK lookup).
*/
export async function evictBanned(): Promise<number> {
const deleted = await db
.delete(schema.freeSession)
.where(
sql`EXISTS (
SELECT 1 FROM ${schema.user}
WHERE ${schema.user.id} = ${schema.freeSession.user_id}
AND ${schema.user.banned} = true
)`,
)
.returning({ user_id: schema.freeSession.user_id })
return deleted.length
}

/**
* Atomically admit one queued user for a specific model, gated by the
* upstream health for that model's deployment and guarded by an advisory
Expand Down
Loading