39 changes: 25 additions & 14 deletions src/app/api/openrouter/[...path]/route.ts
@@ -205,26 +205,15 @@ export async function POST(request: NextRequest): Promise<NextResponseType<unkno
);
console.debug(`Routing request to ${provider.id}`);

// Fire-and-forget abuse classification as early as possible
void classifyAbuse(request, requestBodyParsed, {
// Start abuse classification early (non-blocking) - we'll await it before creating usage context
Contributor:

SUGGESTION: Misleading comment about when abuse classification is awaited

classifyPromise is awaited after the upstream response (via Promise.race([...])), not before creating usageContext. Updating this comment avoids confusion for future maintainers.

Suggested change
- // Start abuse classification early (non-blocking) - we'll await it before creating usage context
+ // Start abuse classification early (non-blocking) - we'll await it later to attach request_id to usageContext

const classifyPromise = classifyAbuse(request, requestBodyParsed, {
kiloUserId: user.id,
organizationId,
projectId,
provider: provider.id,
isByok: !!userByok,
}).then(result => {
if (result) {
console.log('Abuse classification result:', {
verdict: result.verdict,
risk_score: result.risk_score,
signals: result.signals,
identity_key: result.context.identity_key,
kilo_user_id: user.id,
requested_model: originalModelIdLowerCased,
rps: result.context.requests_per_second,
});
}
});

// large responses may run longer than the 800s serverless function timeout, usually this value is set to 8192 tokens
if (requestBodyParsed.max_tokens && requestBodyParsed.max_tokens > MAX_TOKENS_LIMIT) {
console.warn(`SECURITY: Max tokens limit exceeded: ${user.id}`, {
@@ -392,6 +381,28 @@ export async function POST(request: NextRequest): Promise<NextResponseType<unkno

const clonedReponse = response.clone(); // reading from body is side-effectful

// Await abuse classification (with timeout) to get request_id for cost tracking correlation
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const classifyResult = await Promise.race([
Contributor:

WARNING: Awaiting abuse classification adds up to 2s latency on the critical path

await Promise.race([...]) runs before accountForMicrodollarUsage(...) and before returning the proxied response, so each request can incur up to ~2s extra tail latency. Consider moving the timeout-await into the fire-and-forget usage accounting path (e.g., have accountForMicrodollarUsage await classifyPromise internally), so client response timing isn't affected while still capturing request_id when available.

Contributor:

@jrf0110 I guess this is true? Could we fire this afterwards, so that we get request_id eventually (if ever), and then call countAndStoreUsage?

Contributor Author:

The p99 latency for requests to the abuse service is well under 2s, so in practice the classify response will already have resolved by this point. I didn't want to pass the promise into accountForMicrodollarUsage and resolve it there, because that would mix concerns badly.

Image

Contributor Author:

Oh, and to clarify: the abuse classify promise is only awaited after the await on the OpenRouter request. If we look at requests to the completions endpoint:

Image

It's pretty clear that 99.999% of the time, the abuse promise will resolve before the OpenRouter response comes back.
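For readers following this thread, the bounded wait under discussion can be sketched as a small standalone helper. This is only an illustration of the pattern, not code from the PR: `withTimeout` is a hypothetical name, and the sketch assumes the wrapped promise does not reject (the thread does not say how `classifyAbuse` reports failures).

```typescript
// Hypothetical helper, not part of the PR: waits for `work`, but gives up
// and resolves to null once `ms` milliseconds have elapsed.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T | null> {
  let timeoutId: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<null>(resolve => {
    timeoutId = setTimeout(() => resolve(null), ms);
  });
  try {
    // Whichever promise settles first decides the result.
    return await Promise.race([work, timeout]);
  } finally {
    // Clear the timer so it does not keep the event loop alive after `work` wins.
    clearTimeout(timeoutId);
  }
}
```

If `work` has already resolved when the helper is called, which is the case the author argues is overwhelmingly common, the race settles on the next microtask and the timeout adds no measurable latency. The 2s bound only matters in the tail case the reviewer raised.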

classifyPromise.finally(() => timeoutId && clearTimeout(timeoutId)),
Comment thread
jrf0110 marked this conversation as resolved.
new Promise<null>(resolve => {
timeoutId = setTimeout(() => resolve(null), 2000);
}),
]);
if (classifyResult) {
console.log('Abuse classification result:', {
verdict: classifyResult.verdict,
risk_score: classifyResult.risk_score,
signals: classifyResult.signals,
identity_key: classifyResult.context.identity_key,
kilo_user_id: user.id,
requested_model: originalModelIdLowerCased,
rps: classifyResult.context.requests_per_second,
request_id: classifyResult.request_id,
});
usageContext.abuse_request_id = classifyResult.request_id;
}

accountForMicrodollarUsage(clonedReponse, usageContext, openrouterRequestSpan);

{
144 changes: 136 additions & 8 deletions src/lib/abuse-service.ts
@@ -4,7 +4,6 @@

import { type NextRequest } from 'next/server';
import {
ABUSE_SERVICE_SECRET,
ABUSE_SERVICE_CF_ACCESS_CLIENT_ID,
ABUSE_SERVICE_CF_ACCESS_CLIENT_SECRET,
ABUSE_SERVICE_URL,
@@ -117,6 +116,8 @@ export type AbuseClassificationResponse = {
action_metadata: ActionMetadata;
/** State context for debugging headers */
context: ClassificationContext;
/** Request ID for correlating with cost updates. 0 indicates an error during classification. */
request_id: number;
};

/**
@@ -202,18 +203,12 @@ export async function classifyRequest(
return null;
}

if (!ABUSE_SERVICE_SECRET) {
console.warn('ABUSE_SERVICE_SECRET not configured, skipping abuse classification');
return null;
}

try {
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'X-Service-Secret': ABUSE_SERVICE_SECRET,
};

// Add Cloudflare Access headers in production (validated at startup in config.server.ts)
// Add Cloudflare Access headers for authentication
if (ABUSE_SERVICE_CF_ACCESS_CLIENT_ID && ABUSE_SERVICE_CF_ACCESS_CLIENT_SECRET) {
headers['CF-Access-Client-Id'] = ABUSE_SERVICE_CF_ACCESS_CLIENT_ID;
headers['CF-Access-Client-Secret'] = ABUSE_SERVICE_CF_ACCESS_CLIENT_SECRET;
@@ -238,6 +233,87 @@ export async function classifyRequest(
}
}

/**
* Request payload for reporting cost to the abuse service after request completion.
* Enables spend-based heuristics like free_tier_exhausted.
*/
type CostUpdatePayload = {
// Identity fields (must match what was sent to /classify)
kilo_user_id?: string | null;
ip_address?: string | null;
ja4_digest?: string | null;
user_agent?: string | null;

// Request identification (REQUIRED)
request_id: number; // From classify response, for correlation
message_id: string; // From LLM response, for analytics

// Cost data (REQUIRED, in microdollars)
cost: number;
requested_model?: string | null;

// Token counts (optional but recommended)
input_tokens?: number | null;
output_tokens?: number | null;
cache_write_tokens?: number | null;
cache_hit_tokens?: number | null;
};

/**
* Response from the cost update endpoint
*/
export type CostUpdateResponse = {
success: boolean;
identity_key?: string;
message_id?: string;
do_updated?: boolean;
error?: string;
};

/**
* Report cost to the abuse service after a request completes.
* This enables spend-based heuristics like free_tier_exhausted.
*
* This is fire-and-forget - failures are logged but don't affect the user.
*
* @param payload - Cost and identity data to report
* @returns Response or null if service unavailable/failed
*/
export async function reportCost(payload: CostUpdatePayload): Promise<CostUpdateResponse | null> {
if (!ABUSE_SERVICE_URL) {
return null;
}

try {
const headers: Record<string, string> = {
Comment thread
jrf0110 marked this conversation as resolved.
'Content-Type': 'application/json',
};

// Add Cloudflare Access headers in production
if (ABUSE_SERVICE_CF_ACCESS_CLIENT_ID && ABUSE_SERVICE_CF_ACCESS_CLIENT_SECRET) {
headers['CF-Access-Client-Id'] = ABUSE_SERVICE_CF_ACCESS_CLIENT_ID;
headers['CF-Access-Client-Secret'] = ABUSE_SERVICE_CF_ACCESS_CLIENT_SECRET;
}

const response = await fetch(`${ABUSE_SERVICE_URL}/api/usage/cost`, {
method: 'POST',
headers,
body: JSON.stringify(payload),
});

if (!response.ok) {
console.error(`[Abuse] Cost update failed (${response.status}): ${await response.text()}`);
return null;
}

return (await response.json()) as CostUpdateResponse;
} catch (error) {
// Log but don't throw - this shouldn't affect user experience
console.error('[Abuse] Failed to report cost:', error);
return null;
}
}
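Both `classifyRequest` and `reportCost` build the same header map, so the logic could be shared. A minimal sketch of what such a helper might look like follows. `buildAbuseServiceHeaders` is a hypothetical name that does not appear in this PR, and the credentials are passed in as parameters here only to keep the example self-contained.

```typescript
// Hypothetical helper, not part of the PR: builds the request headers for the
// abuse service, adding Cloudflare Access credentials only when both are set.
function buildAbuseServiceHeaders(
  clientId: string | undefined,
  clientSecret: string | undefined
): Record<string, string> {
  const headers: Record<string, string> = {
    'Content-Type': 'application/json',
  };
  // A lone ID or a lone secret is treated as "not configured".
  if (clientId && clientSecret) {
    headers['CF-Access-Client-Id'] = clientId;
    headers['CF-Access-Client-Secret'] = clientSecret;
  }
  return headers;
}
```

With the `X-Service-Secret` header removed in this PR, the Cloudflare Access pair appears to be the only credential sent, so a request made with neither value configured would go out unauthenticated.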

/**
* Context needed to classify abuse for a request.
* All fields are optional to allow classification early in the request lifecycle.
@@ -292,3 +368,55 @@ export async function classifyAbuse(

return classifyRequest(payload);
}

/**
* Report cost to the abuse service after a request completes.
* Call this after the LLM response is processed and usage stats are available.
*
* Requires usageContext.abuse_request_id (from classify response) and
* usageStats.messageId (from LLM response). Skips if either is missing
* or if abuse_request_id is 0 (indicates classification error).
*
* Use fire-and-forget pattern since this shouldn't block:
* reportAbuseCost(usageContext, usageStats).catch(console.error)
*/
export async function reportAbuseCost(
usageContext: {
kiloUserId: string;
fraudHeaders: {
http_x_forwarded_for: string | null;
http_x_vercel_ja4_digest: string | null;
http_user_agent: string | null;
};
requested_model: string;
abuse_request_id?: number;
},
usageStats: {
messageId: string | null;
cost_mUsd: number;
inputTokens: number;
outputTokens: number;
cacheWriteTokens: number;
cacheHitTokens: number;
}
): Promise<CostUpdateResponse | null> {
// Skip if missing required fields or request_id is 0 (classification error)
if (!usageContext.abuse_request_id || !usageStats.messageId) {
return null;
}

return reportCost({
kilo_user_id: usageContext.kiloUserId,
ip_address: usageContext.fraudHeaders.http_x_forwarded_for,
ja4_digest: usageContext.fraudHeaders.http_x_vercel_ja4_digest,
user_agent: usageContext.fraudHeaders.http_user_agent,
request_id: usageContext.abuse_request_id,
message_id: usageStats.messageId,
cost: usageStats.cost_mUsd,
requested_model: usageContext.requested_model,
input_tokens: usageStats.inputTokens,
output_tokens: usageStats.outputTokens,
cache_write_tokens: usageStats.cacheWriteTokens,
cache_hit_tokens: usageStats.cacheHitTokens,
});
}
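The skip condition in `reportAbuseCost` relies on JavaScript truthiness, which is easy to misread. The sketch below isolates that guard as a predicate to show which inputs pass. `shouldReportCost` is a hypothetical name used only for illustration.

```typescript
// Hypothetical predicate, not part of the PR: mirrors the guard
// `!usageContext.abuse_request_id || !usageStats.messageId`.
function shouldReportCost(
  abuseRequestId: number | undefined,
  messageId: string | null
): boolean {
  // undefined (no classify result) and 0 (classification error) are both falsy,
  // as are a null and an empty-string messageId, so all four cases skip the report.
  return Boolean(abuseRequestId) && Boolean(messageId);
}
```

One consequence of the truthiness check is that an empty-string `messageId` is skipped along with `null`, which seems to be the intended behavior given that the field is required by the cost endpoint.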
1 change: 0 additions & 1 deletion src/lib/config.server.ts
@@ -109,7 +109,6 @@ export const ENABLE_MILVUS_DUAL_WRITE = true;
export const AI_ATTRIBUTION_ADMIN_SECRET = getEnvVariable('AI_ATTRIBUTION_ADMIN_SECRET');

// Abuse Detection Service
export const ABUSE_SERVICE_SECRET = getEnvVariable('ABUSE_SERVICE_SECRET');
export const ABUSE_SERVICE_CF_ACCESS_CLIENT_ID = getEnvVariable(
'ABUSE_SERVICE_CF_ACCESS_CLIENT_ID'
);
9 changes: 9 additions & 0 deletions src/lib/processUsage.ts
@@ -27,6 +27,7 @@ import { maybeIssueKiloPassBonusFromUsageThreshold } from '@/lib/kilo-pass/usage
import { getEffectiveKiloPassThreshold } from '@/lib/kilo-pass/threshold';
import { appendKiloPassAuditLog } from '@/lib/kilo-pass/issuance';
import { KiloPassAuditLogAction, KiloPassAuditLogResult } from '@/lib/kilo-pass/enums';
import { reportAbuseCost } from '@/lib/abuse-service';

const posthogClient = PostHogClient();

@@ -171,6 +172,8 @@ export type MicrodollarUsageContext = {
/** True if user/org is using their own API key - cost should be zeroed out */
user_byok: boolean;
has_tools: boolean;
/** Request ID from abuse service classify response, for cost tracking correlation. 0 means skip. */
abuse_request_id?: number;
};

export type UsageContextInfo = ReturnType<typeof extractUsageContextInfo>;
@@ -916,6 +919,12 @@ async function processTokenData(
usageStats.model = usageContext.requested_model;
}

// Report upstream cost to abuse service BEFORE zeroing for free/BYOK
// (abuse service needs actual spend for heuristics like free_tier_exhausted)
reportAbuseCost(usageContext, usageStats).catch(error => {
console.error('[Abuse] Failed to report cost:', error);
});

if (isFreeModel(usageContext.requested_model) || usageContext.user_byok) {
usageStats.cost_mUsd = 0;
usageStats.cacheDiscount_mUsd = 0;
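The ordering in the `processTokenData` hunk matters because the report is not awaited while `usageStats` is mutated immediately afterwards. It works because an async function runs synchronously up to its first `await`, so the cost is read before the zeroing happens. The sketch below demonstrates that property with simplified stand-ins. `report`, `settleUsage`, `Stats`, and `reported` are hypothetical names, not the PR's code.

```typescript
type Stats = { cost_mUsd: number };

// Collects what the stand-in reporter saw, in place of a real network call.
const reported: number[] = [];

// Stand-in for reportAbuseCost: reads the cost synchronously, then yields.
async function report(stats: Stats): Promise<void> {
  const snapshot = stats.cost_mUsd; // read before the first await
  await Promise.resolve(); // stands in for the network round trip
  reported.push(snapshot);
}

// Stand-in for the relevant part of processTokenData.
function settleUsage(stats: Stats, isFreeOrByok: boolean): void {
  // Fire-and-forget: the call is not awaited, but it has already read the cost
  // by the time it returns its promise.
  void report(stats);
  if (isFreeOrByok) {
    stats.cost_mUsd = 0; // zeroing happens after the snapshot was taken
  }
}
```

If the reporter ever awaited something before reading `cost_mUsd`, it would observe the zeroed value instead, so any future refactor of the reporting path needs to keep that read ahead of the first `await`.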