diff --git a/src/auth.js b/src/auth.js index 90a57db..d0b4c01 100644 --- a/src/auth.js +++ b/src/auth.js @@ -428,6 +428,41 @@ export function acquireAccountByKey(apiKey, modelKey = null) { }; } +/** + * Explain why a pinned account cannot be used right now. Used by strict + * Cascade reuse mode, where switching accounts would lose server-side + * conversation context. + */ +export function getAccountAvailability(apiKey, modelKey = null) { + const now = Date.now(); + const a = accounts.find(x => x.apiKey === apiKey); + if (!a) return { available: false, reason: 'missing', retryAfterMs: 60_000 }; + if (a.status !== 'active') return { available: false, reason: `status:${a.status}`, retryAfterMs: 60_000 }; + + if (a.rateLimitedUntil && a.rateLimitedUntil > now) { + return { available: false, reason: 'rate_limited', retryAfterMs: Math.max(1000, a.rateLimitedUntil - now) }; + } + if (modelKey && a._modelRateLimits) { + const until = a._modelRateLimits[modelKey]; + if (until && until > now) { + return { available: false, reason: 'model_rate_limited', retryAfterMs: Math.max(1000, until - now) }; + } + if (until && until <= now) delete a._modelRateLimits[modelKey]; + } + + const limit = rpmLimitFor(a); + if (limit <= 0) return { available: false, reason: 'tier_expired', retryAfterMs: 60_000 }; + const used = pruneRpmHistory(a, now); + if (used >= limit) { + const oldest = a._rpmHistory?.[0] || now; + return { available: false, reason: 'rpm_full', retryAfterMs: Math.max(1000, oldest + RPM_WINDOW_MS - now) }; + } + if (modelKey && !isModelAllowedForAccount(a, modelKey)) { + return { available: false, reason: 'model_not_available', retryAfterMs: 60_000 }; + } + return { available: true, reason: 'available', retryAfterMs: 0 }; +} + /** * Snapshot of per-account RPM usage, for dashboard display. */ diff --git a/src/client.js b/src/client.js index 1fd4583..91f964a 100644 --- a/src/client.js +++ b/src/client.js @@ -35,6 +35,19 @@ function contentToString(content) { return content == null ? '' : JSON.stringify(content); } +function positiveIntEnv(name, fallback) { + const n = parseInt(process.env[name] || '', 10); + return Number.isFinite(n) && n > 0 ? n : fallback; +} + +function cascadeHistoryBudget(modelUid) { + const normal = positiveIntEnv('CASCADE_MAX_HISTORY_BYTES', 200_000); + if (/\b1m\b|[-_]1m$/i.test(String(modelUid || ''))) { + return positiveIntEnv('CASCADE_1M_HISTORY_BYTES', 900_000); + } + return normal; +} + // ─── WindsurfClient ──────────────────────────────────────── export class WindsurfClient { @@ -233,14 +246,14 @@ export class WindsurfClient { images = extracted.images; if (!isResume && sysText) text = sysText + '\n\n' + text; } else { - const MAX_HISTORY_BYTES = 200_000; + const maxHistoryBytes = cascadeHistoryBudget(modelUid); const lines = []; let historyBytes = 0; for (let i = convo.length - 2; i >= 0; i--) { const m = convo[i]; const tag = m.role === 'user' ? 'human' : 'assistant'; const line = `<${tag}>\n${contentToString(m.content)}\n`; - if (historyBytes + line.length > MAX_HISTORY_BYTES && lines.length > 0) { + if (historyBytes + line.length > maxHistoryBytes && lines.length > 0) { log.info(`Cascade: trimmed history at turn ${i}/${convo.length} (${Math.round(historyBytes/1024)}KB kept, ${convo.length - 2 - i} turns dropped)`); break; } @@ -270,14 +283,14 @@ export class WindsurfClient { // Cascade expired — fall back to fresh with FULL history. // text was built as resume-only (last message). Rebuild it. if (isResume && convo.length > 1) { - const MAX_HISTORY_BYTES = 200_000; + const maxHistoryBytes = cascadeHistoryBudget(modelUid); const lines = []; let historyBytes = 0; for (let i = convo.length - 2; i >= 0; i--) { const m = convo[i]; const tag = m.role === 'user' ? 'human' : 'assistant'; const line = `<${tag}>\n${contentToString(m.content)}\n`; - if (historyBytes + line.length > MAX_HISTORY_BYTES && lines.length > 0) break; + if (historyBytes + line.length > maxHistoryBytes && lines.length > 0) break; lines.unshift(line); historyBytes += line.length; } diff --git a/src/conversation-pool.js b/src/conversation-pool.js index e51e5fb..4497bfa 100644 --- a/src/conversation-pool.js +++ b/src/conversation-pool.js @@ -6,23 +6,30 @@ * Windsurf backend keep its own per-cascade context cached — we avoid * resending the full history on each turn and the server responds faster. * - * The key is a "fingerprint" of the conversation up to (but not including) - * the newest user message. A client sending [u1, a1, u2] looks up fp([u1, a1]); - * a hit means we already drove the cascade to exactly that state. We then - * `SendUserCascadeMessage(u2)` on the stored cascade_id and, on success, - * re-store the entry under fp([u1, a1, u2, a2]) for the next turn. + * The key is a "fingerprint" of the stable caller-visible trajectory up to + * (but not including) the newest user/tool result turn. A client sending + * [u1, a1, u2] looks up fp([u1]); a hit means we already drove the cascade to + * exactly that state. We then `SendUserCascadeMessage(u2)` on the stored + * cascade_id and, on success, re-store the entry under fp([u1, u2]) for the + * next turn. * * Safety rails: * - Entries are pinned to a specific (apiKey, lsPort) pair. We must reuse * the same LS and the same account or the cascade_id is meaningless. * - A checked-out entry is removed from the pool. Concurrent second request * with the same fingerprint falls back to a fresh cascade. - * - TTL 30 min; LRU eviction at 500 entries. + * - TTL defaults to 30 min (override with CASCADE_POOL_TTL_MS); LRU eviction + * at 500 entries. */ import { createHash } from 'crypto'; -const POOL_TTL_MS = 30 * 60 * 1000; +function positiveIntEnv(name, fallback) { + const n = parseInt(process.env[name] || '', 10); + return Number.isFinite(n) && n > 0 ? n : fallback; +} + +const POOL_TTL_MS = positiveIntEnv('CASCADE_POOL_TTL_MS', 30 * 60 * 1000); const POOL_MAX = 500; // fingerprint -> { cascadeId, sessionId, lsPort, apiKey, createdAt, lastAccess } @@ -51,30 +58,42 @@ function canonicalise(messages) { } /** - * Fingerprint for "resume this conversation". Hash only USER messages - * (excluding the latest one we're about to send). User messages have stable - * format across client round-trips; assistant messages don't — the client - * may restructure content arrays, add tool_use blocks, or modify text, - * causing hash mismatches and 0% hit rate. (#24) + * Fingerprint for "resume this conversation". Hash only stable caller-visible + * turns: normal user messages and tool results. Assistant messages are + * excluded because clients may restructure content arrays, add tool_use + * blocks, or modify text between turns, causing hash mismatches and 0% hit + * rate. Claude Code's system prompt also changes frequently as local project + * state changes, so it is excluded by default; set + * CASCADE_REUSE_HASH_SYSTEM=1 if strict system-prompt isolation matters more + * than reuse hit rate for a deployment. */ function systemPrefix(messages) { + if (process.env.CASCADE_REUSE_HASH_SYSTEM !== '1') return ''; return messages .filter(m => m.role === 'system') .map(m => typeof m.content === 'string' ? m.content : JSON.stringify(m.content ?? '')) .join('\0'); } +function stableTurns(messages) { + return messages + .filter(m => m.role === 'user' || m.role === 'tool') + .map(m => m.role === 'tool' + ? { ...m, role: 'tool_result' } + : m); +} + export function fingerprintBefore(messages, modelKey = '') { if (!Array.isArray(messages) || messages.length < 2) return null; - const users = messages.filter(m => m.role === 'user'); - if (users.length < 2) return null; - return sha256(modelKey + '\0' + systemPrefix(messages) + '\0' + JSON.stringify(canonicalise(users.slice(0, -1)))); + const turns = stableTurns(messages); + if (turns.length < 2) return null; + return sha256(modelKey + '\0' + systemPrefix(messages) + '\0' + JSON.stringify(canonicalise(turns.slice(0, -1)))); } export function fingerprintAfter(messages, modelKey = '') { - const users = messages.filter(m => m.role === 'user'); - if (!users.length) return null; - return sha256(modelKey + '\0' + systemPrefix(messages) + '\0' + JSON.stringify(canonicalise(users))); + const turns = stableTurns(messages); + if (!turns.length) return null; + return sha256(modelKey + '\0' + systemPrefix(messages) + '\0' + JSON.stringify(canonicalise(turns))); } function prune(now) { diff --git a/src/handlers/chat.js b/src/handlers/chat.js index 86dd378..f50d720 100644 --- a/src/handlers/chat.js +++ b/src/handlers/chat.js @@ -5,7 +5,7 @@ import { randomUUID } from 'crypto'; import { WindsurfClient } from '../client.js'; -import { getApiKey, acquireAccountByKey, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited } from '../auth.js'; +import { getApiKey, acquireAccountByKey, getAccountAvailability, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited } from '../auth.js'; import { resolveModel, getModelInfo } from '../models.js'; import { getLsFor, ensureLs } from '../langserver.js'; import { config, log } from '../config.js'; @@ -27,6 +27,11 @@ import { sanitizeText, PathSanitizeStream } from '../sanitize.js'; const HEARTBEAT_MS = 15_000; const QUEUE_RETRY_MS = 1_000; const QUEUE_MAX_WAIT_MS = 30_000; +const CASCADE_REUSE_STRICT = process.env.CASCADE_REUSE_STRICT === '1'; +const CASCADE_REUSE_STRICT_RETRY_MS = (() => { + const n = parseInt(process.env.CASCADE_REUSE_STRICT_RETRY_MS || '', 10); + return Number.isFinite(n) && n > 0 ? n : 60_000; +})(); // ── Model identity prompt ────────────────────────────────── // Templates live in runtime-config (editable from the dashboard). Use {model} @@ -38,6 +43,19 @@ function buildIdentitySystemMessage(displayModel, provider) { return template.replace(/\{model\}/g, displayModel); } +function strictReuseRetryMs(availability) { + return Math.max(1000, availability?.retryAfterMs || CASCADE_REUSE_STRICT_RETRY_MS); +} + +function strictReuseMessage(model, retryMs, reason = 'temporarily unavailable') { + return `${model} 上下文复用绑定账号暂不可用(${reason})。为避免切换账号导致上下文丢失,请 ${Math.ceil(retryMs / 1000)} 秒后重试`; +} + +function rateLimitCooldownMs(message = '') { + if (/about an hour|in an hour|try again in.*hour/i.test(message)) return 60 * 60 * 1000; + return 5 * 60 * 1000; +} + function genId() { return 'chatcmpl-' + randomUUID().replace(/-/g, '').slice(0, 29); } @@ -261,6 +279,7 @@ export async function handleChatCompletions(body) { const reuseEnabled = useCascade && isExperimentalEnabled('cascadeConversationReuse'); const fpBefore = reuseEnabled ? fingerprintBefore(messages, modelKey) : null; let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null; + let checkedOutReuseEntry = reuseEntry; if (reuseEntry) log.info(`Chat[${reqId}]: reuse HIT cascade=${reuseEntry.cascadeId.slice(0, 8)} model=${displayModel}`); // Non-stream: retry with a different account on model-not-available errors @@ -286,6 +305,22 @@ export async function handleChatCompletions(body) { } if (!acct) { log.info(`Chat[${reqId}]: reuse MISS — owning account not available after 5s wait`); + if (CASCADE_REUSE_STRICT && checkedOutReuseEntry && fpBefore) { + const availability = getAccountAvailability(checkedOutReuseEntry.apiKey, modelKey); + const retryAfterMs = strictReuseRetryMs(availability); + poolCheckin(fpBefore, checkedOutReuseEntry); + log.info(`Chat[${reqId}]: strict reuse preserved cascade; owner unavailable reason=${availability.reason}`); + return { + status: 429, + body: { + error: { + message: strictReuseMessage(displayModel, retryAfterMs, availability.reason), + type: 'rate_limit_exceeded', + retry_after_ms: retryAfterMs, + }, + }, + }; + } reuseEntry = null; } } @@ -305,6 +340,22 @@ export async function handleChatCompletions(body) { if (!rl.hasCapacity) { log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`); markRateLimited(acct.apiKey, 5 * 60 * 1000, modelKey); + if (CASCADE_REUSE_STRICT && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) { + const availability = getAccountAvailability(acct.apiKey, modelKey); + const retryAfterMs = strictReuseRetryMs(availability); + poolCheckin(fpBefore, checkedOutReuseEntry); + log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`); + return { + status: 429, + body: { + error: { + message: strictReuseMessage(displayModel, retryAfterMs, availability.reason), + type: 'rate_limit_exceeded', + retry_after_ms: retryAfterMs, + }, + }, + }; + } continue; } } catch (e) { @@ -320,6 +371,7 @@ export async function handleChatCompletions(body) { // born on has been replaced, the cascade_id is dead. if (reuseEntry && reuseEntry.lsPort !== ls.port) { log.info(`Chat[${reqId}]: reuse MISS — LS port changed`); + checkedOutReuseEntry = null; reuseEntry = null; } const _msgChars = (messages || []).reduce((n, m) => { @@ -340,6 +392,22 @@ export async function handleChatCompletions(body) { const errType = result.body?.error?.type; // Rate limit: this account is done for this model, try the next one if (errType === 'rate_limit_exceeded') { + if (CASCADE_REUSE_STRICT && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) { + const availability = getAccountAvailability(acct.apiKey, modelKey); + const retryAfterMs = strictReuseRetryMs(availability); + poolCheckin(fpBefore, checkedOutReuseEntry); + log.info(`Chat[${reqId}]: strict reuse preserved cascade after rate limit`); + return { + status: 429, + body: { + error: { + message: strictReuseMessage(displayModel, retryAfterMs, availability.reason), + type: 'rate_limit_exceeded', + retry_after_ms: retryAfterMs, + }, + }, + }; + } log.warn(`Account ${acct.email} rate-limited on ${displayModel}, trying next account`); continue; } @@ -354,9 +422,17 @@ export async function handleChatCompletions(body) { if (!lastErr || lastErr.status === 429) { const rl = isAllRateLimited(modelKey); if (rl.allLimited) { + if (checkedOutReuseEntry && fpBefore) { + poolCheckin(fpBefore, checkedOutReuseEntry); + log.info(`Chat[${reqId}]: restored checked-out cascade after rate limit`); + } return { status: 429, body: { error: { message: `${displayModel} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`, type: 'rate_limit_exceeded', retry_after_ms: rl.retryAfterMs } } }; } } + if (checkedOutReuseEntry && fpBefore) { + poolCheckin(fpBefore, checkedOutReuseEntry); + log.info(`Chat[${reqId}]: restored checked-out cascade after failed request`); + } return lastErr || { status: 503, body: { error: { message: 'No active accounts available', type: 'pool_exhausted' } } }; } @@ -476,7 +552,7 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message); const isInternal = /internal error occurred.*error id/i.test(err.message); if (isAuthFail) reportError(apiKey); - if (isRateLimit) { markRateLimited(apiKey, 5 * 60 * 1000, modelKey); err.isRateLimit = true; err.isModelError = true; } + if (isRateLimit) { markRateLimited(apiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; } if (isInternal) { reportInternalError(apiKey); err.isModelError = true; } if (err.isModelError && !isRateLimit && !isInternal) { updateCapability(apiKey, modelKey, false, 'model_error'); @@ -598,6 +674,7 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, const reuseEnabled = useCascade && isExperimentalEnabled('cascadeConversationReuse'); const fpBefore = reuseEnabled ? fingerprintBefore(messages, modelKey) : null; let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null; + let checkedOutReuseEntry = reuseEntry; if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… stream model=${model}`); // Always strip / blocks in Cascade mode. @@ -702,6 +779,13 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, } if (!acct) { log.info(`Chat[${reqId}]: reuse MISS — owning account not available after 5s wait`); + if (CASCADE_REUSE_STRICT && checkedOutReuseEntry && fpBefore) { + const availability = getAccountAvailability(checkedOutReuseEntry.apiKey, modelKey); + const retryAfterMs = strictReuseRetryMs(availability); + lastErr = new Error(strictReuseMessage(model, retryAfterMs, availability.reason)); + log.info(`Chat[${reqId}]: strict reuse preserved cascade; owner unavailable reason=${availability.reason}`); + break; + } reuseEntry = null; } } @@ -721,6 +805,13 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, if (!rl.hasCapacity) { log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`); markRateLimited(acct.apiKey, 5 * 60 * 1000, modelKey); + if (CASCADE_REUSE_STRICT && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) { + const availability = getAccountAvailability(acct.apiKey, modelKey); + const retryAfterMs = strictReuseRetryMs(availability); + lastErr = new Error(strictReuseMessage(model, retryAfterMs, availability.reason)); + log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`); + break; + } continue; } } catch (e) { @@ -733,6 +824,7 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, if (!ls) { lastErr = new Error('No LS instance available'); break; } if (reuseEntry && reuseEntry.lsPort !== ls.port) { log.info(`Chat[${reqId}]: reuse MISS — LS port changed`); + checkedOutReuseEntry = null; reuseEntry = null; } const _msgCharsStream = (messages || []).reduce((n, m) => { @@ -813,11 +905,15 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message); const isInternal = /internal error occurred.*error id/i.test(err.message); if (isAuthFail) reportError(currentApiKey); - if (isRateLimit) { markRateLimited(currentApiKey, 5 * 60 * 1000, modelKey); err.isRateLimit = true; err.isModelError = true; } + if (isRateLimit) { markRateLimited(currentApiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; } if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; } if (err.isModelError && !isRateLimit && !isInternal) { updateCapability(currentApiKey, modelKey, false, 'model_error'); } + if (isRateLimit && CASCADE_REUSE_STRICT && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === currentApiKey) { + log.info(`Chat[${reqId}]: strict reuse preserved cascade after rate limit`); + break; + } // Retry only if nothing has been streamed yet AND it's a retryable error if (!hadSuccess && (err.isModelError || isRateLimit)) { const tag = isRateLimit ? 'rate_limit' : isInternal ? 'internal_error' : 'model_error'; @@ -836,6 +932,10 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, const errMsg = rl.allLimited ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试` : sanitizeText(lastErr?.message || 'no accounts'); + if (!hadSuccess && checkedOutReuseEntry && fpBefore) { + poolCheckin(fpBefore, checkedOutReuseEntry); + log.info(`Chat[${reqId}]: restored checked-out cascade after failed stream`); + } if (hadSuccess) { // We already streamed real assistant content. Injecting diff --git a/src/langserver.js b/src/langserver.js index da6473b..1bc0c5c 100644 --- a/src/langserver.js +++ b/src/langserver.js @@ -13,6 +13,7 @@ import { mkdirSync } from 'fs'; import { existsSync } from 'fs'; import http2 from 'http2'; import net from 'net'; +import { resolve } from 'path'; import { log } from './config.js'; import { closeSessionForPort } from './grpc.js'; @@ -20,6 +21,7 @@ const DEFAULT_BINARY = '/opt/windsurf/language_server_linux_x64'; const DEFAULT_PORT = 42100; const DEFAULT_CSRF = 'windsurf-api-csrf-fixed-token'; const DEFAULT_API_URL = 'https://server.self-serve.windsurf.com'; +const DEFAULT_DATA_ROOT = '/opt/windsurf/data'; // Pool: key -> { process, port, csrfToken, proxy, startedAt, ready } const _pool = new Map(); @@ -34,13 +36,20 @@ let _apiServerUrl = DEFAULT_API_URL; function proxyKey(proxy) { if (!proxy || !proxy.host) return 'default'; // Sanitize to [A-Za-z0-9_] — the key flows into a filesystem path - // (`/opt/windsurf/data/${key}`) and a shell-quoted mkdir, so strip any + // (`${LS_DATA_DIR}/${key}`) and a shell-quoted mkdir, so strip any // special character that could slip past execSync's naive quoting. const safeHost = proxy.host.replace(/[^a-zA-Z0-9]/g, '_'); const safePort = String(proxy.port || 8080).replace(/[^0-9]/g, ''); return `px_${safeHost}_${safePort}`; } +function dataDirForKey(key) { + const root = process.env.LS_DATA_DIR + ? resolve(process.cwd(), process.env.LS_DATA_DIR) + : DEFAULT_DATA_ROOT; + return `${root}/${key}`; +} + function proxyUrl(proxy) { if (!proxy || !proxy.host) return null; const auth = proxy.username @@ -124,7 +133,7 @@ export async function ensureLs(proxy = null) { } } - const dataDir = `/opt/windsurf/data/${key}`; + const dataDir = dataDirForKey(key); try { mkdirSync(`${dataDir}/db`, { recursive: true }); } catch (e) { log.warn(`mkdirSync ${dataDir}/db: ${e.message}`); } const args = [