Skip to content
17 changes: 17 additions & 0 deletions cloudflare-gastown/container/src/control-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ function syncTownConfigToProcessEnv(): void {
} else {
delete process.env.GASTOWN_DISABLE_AI_COAUTHOR;
}

// Keep the standalone env var in sync with the town config so org
// billing context is never lost across model changes.
const orgId = cfg.organization_id;
if (typeof orgId === 'string' && orgId) {
process.env.GASTOWN_ORGANIZATION_ID = orgId;
}
}

export const app = new Hono();
Expand Down Expand Up @@ -216,6 +223,11 @@ app.post('/agents/start', async c => {
return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400);
}

// Persist the organization ID as a standalone env var so it survives
// config rebuilds (e.g. model hot-swap). The env var is the primary
// source of truth; KILO_CONFIG_CONTENT extraction is the fallback.
process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId ?? '';

console.log(
`[control-server] /agents/start: role=${parsed.data.role} name=${parsed.data.name} rigId=${parsed.data.rigId} agentId=${parsed.data.agentId}`
);
Expand Down Expand Up @@ -285,6 +297,11 @@ app.patch('/agents/:agentId/model', async c => {
return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400);
}

// Update org billing context from the request body if provided.
if (parsed.data.organizationId) {
process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId;
}

// Sync config-derived env vars from X-Town-Config into process.env so
// the SDK server restart picks up fresh tokens and git identity.
// The middleware already parsed the header into lastKnownTownConfig.
Expand Down
89 changes: 89 additions & 0 deletions cloudflare-gastown/container/src/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,14 @@ export async function startAgent(
console.log(
`${MANAGER_LOG} startAgent: stopping existing session for ${request.agentId} (status=${existing.status})`
);

// If the agent is still starting, abort the in-flight startup to prevent
// an orphaned session from being created after stopAgent returns.
if (existing.status === 'starting' && existing.startupAbortController) {
console.log(`${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}`);
existing.startupAbortController.abort();
}

await stopAgent(request.agentId).catch(err => {
console.warn(
`${MANAGER_LOG} startAgent: failed to stop existing session for ${request.agentId}`,
Expand All @@ -629,6 +637,7 @@ export async function startAgent(
}

const now = new Date().toISOString();
const startupAbortController = new AbortController();
const agent: ManagedAgent = {
agentId: request.agentId,
rigId: request.rigId,
Expand All @@ -653,15 +662,22 @@ export async function startAgent(
completionCallbackUrl: request.envVars?.GASTOWN_COMPLETION_CALLBACK_URL ?? null,
model: request.model ?? null,
startupEnv: env,
startupAbortController,
};
agents.set(request.agentId, agent);

const { signal } = startupAbortController;
let sessionCounted = false;
try {
// 1. Ensure SDK server is running for this workdir
const { client, port } = await ensureSDKServer(workdir, env);
agent.serverPort = port;

// Check if startup was cancelled while waiting for the SDK server
if (signal.aborted) {
throw new StartupAbortedError(request.agentId);
}

// Track session count on the SDK instance
const instance = sdkInstances.get(workdir);
if (instance) {
Expand All @@ -671,6 +687,10 @@ export async function startAgent(

// 2. Create a session
const sessionResult = await client.session.create({ body: {} });

// Parse and store the session ID immediately so the catch block can
// abort an orphaned session if startupAbortController fires during
// the await above.
const rawSession: unknown = sessionResult.data ?? sessionResult;
const parsed = SessionResponse.safeParse(rawSession);
if (!parsed.success) {
Expand All @@ -684,6 +704,12 @@ export async function startAgent(
const sessionId = parsed.data.id;
agent.sessionId = sessionId;

// Now check if startup was cancelled while creating the session.
// agent.sessionId is already set, so the catch block will abort it.
if (signal.aborted) {
throw new StartupAbortedError(request.agentId);
}

// 3. Subscribe to events (async, runs in background)
void subscribeToEvents(client, agent, request);

Expand All @@ -705,6 +731,11 @@ export async function startAgent(
modelParam = { providerID: 'kilo', modelID: request.model };
}

// Final abort check before sending the prompt
if (signal.aborted) {
throw new StartupAbortedError(request.agentId);
}

await client.session.prompt({
path: { id: sessionId },
body: {
Expand All @@ -722,6 +753,7 @@ export async function startAgent(
sessionCounted = false;
throw new Error('Event stream failed during initial prompt');
}
agent.startupAbortController = null;

agent.messageCount = 1;

Expand All @@ -735,7 +767,39 @@ export async function startAgent(

return agent;
} catch (err) {
// On abort, clean up silently — the new startAgent invocation will
// proceed with a fresh entry.
if (err instanceof StartupAbortedError) {
console.log(`${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up`);
if (sessionCounted) {
const instance = sdkInstances.get(workdir);
if (instance) {
// Abort the orphaned session if one was created before the abort
if (agent.sessionId) {
try {
await instance.client.session.abort({ path: { id: agent.sessionId } });
} catch (abortErr) {
console.error(
`${MANAGER_LOG} startAgent: failed to abort orphaned session ${agent.sessionId}:`,
abortErr
);
}
}
instance.sessionCount--;
Comment thread
jrf0110 marked this conversation as resolved.
if (instance.sessionCount <= 0) {
instance.server.close();
sdkInstances.delete(workdir);
}
}
}
if (agents.get(request.agentId) === agent) {
agents.delete(request.agentId);
}
throw err;
}

agent.status = 'failed';
agent.startupAbortController = null;
agent.exitReason = err instanceof Error ? err.message : String(err);
if (sessionCounted) {
const instance = sdkInstances.get(workdir);
Expand All @@ -745,6 +809,18 @@ export async function startAgent(
}
}

/**
* Thrown when a startup sequence is cancelled via AbortController.
* Distinct from other errors so the catch block can clean up without
* marking the agent as failed (a new startup is taking over).
*/
class StartupAbortedError extends Error {
constructor(agentId: string) {
super(`Startup aborted for agent ${agentId}`);
this.name = 'StartupAbortedError';
}
}

/**
* Stop an agent by aborting its session.
*/
Expand All @@ -753,6 +829,13 @@ export async function stopAgent(agentId: string): Promise<void> {
if (!agent) throw new Error(`Agent ${agentId} not found`);
if (agent.status !== 'running' && agent.status !== 'starting') return;

// If still starting, abort the in-flight startup so session.create()
// doesn't produce an orphaned session after we return.
if (agent.startupAbortController) {
agent.startupAbortController.abort();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: Restart cleanup can release the same session twice

After aborting the startup controller here, stopAgent() still falls through to the normal session.abort()/sessionCount-- cleanup. If the old startup already reached session.create() and populated agent.sessionId, its startAgent() catch path now performs the same cleanup again, which can drive sessionCount negative and close a shared SDK server while other sessions are still alive. Only one path should own teardown for a starting agent.

agent.startupAbortController = null;
}

agent.status = 'stopping';

// Cancel any pending idle timer
Expand Down Expand Up @@ -839,6 +922,12 @@ export async function sendMessage(agentId: string, prompt: string): Promise<void
* by `buildKiloConfigContent` at agent startup.
*/
function extractOrganizationId(): string | undefined {
// Primary source: standalone env var set by control-server on /agents/start
// and updated on every PATCH /model via X-Town-Config.
const envOrgId = process.env.GASTOWN_ORGANIZATION_ID;
if (envOrgId) return envOrgId;

// Fallback: extract from KILO_CONFIG_CONTENT (legacy path)
const raw = process.env.KILO_CONFIG_CONTENT;
if (!raw) return undefined;
try {
Expand Down
6 changes: 6 additions & 0 deletions cloudflare-gastown/container/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ export const UpdateAgentModelRequest = z.object({
smallModel: z.string().optional(),
/** Pre-formatted conversation history to inject into the new session prompt. */
conversationHistory: z.string().optional(),
/** Organization ID — ensures org billing context is preserved across model changes. */
organizationId: z.string().optional(),
});
export type UpdateAgentModelRequest = z.infer<typeof UpdateAgentModelRequest>;

Expand Down Expand Up @@ -133,6 +135,10 @@ export type ManagedAgent = {
model: string | null;
/** Full env dict from buildAgentEnv, stored so model hot-swap can replay it. */
startupEnv: Record<string, string>;
/** AbortController for the in-flight startup sequence. Aborted when a
* restart is requested while the agent is still in 'starting' status,
* preventing orphaned sessions from leaking. */
startupAbortController: AbortController | null;
};

export type AgentStatusResponse = {
Expand Down
27 changes: 24 additions & 3 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2425,14 +2425,20 @@ export class TownDO extends DurableObject<Env> {
// before restarting the SDK server (tokens, git identity, etc.).
const containerConfig = await config.buildContainerConfig(this.ctx.storage, this.env);

// Resolve townConfig to thread the organization_id into the request body
// (belt-and-suspenders: ensures org billing survives even if X-Town-Config
// header parsing fails on the container side).
const townConfig = await config.getTownConfig(this.ctx.storage);

const updated = await dispatch.updateAgentModelInContainer(
this.env,
townId,
mayor.id,
model,
smallModel,
conversationHistory || undefined,
containerConfig
containerConfig,
townConfig.organization_id
);
if (updated) {
console.log(
Expand Down Expand Up @@ -4007,9 +4013,24 @@ export class TownDO extends DurableObject<Env> {
const ghMatch = prUrl.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/pull\/(\d+)/);
if (ghMatch) {
const [, owner, repo, numberStr] = ghMatch;
const token = townConfig.git_auth.github_token;
// Fix 1 & 2: Token fallback chain — github_token → github_cli_pat → platform integration
let token = townConfig.git_auth.github_token ?? townConfig.github_cli_pat;
if (!token) {
// Try resolving from GitHub App installation as final fallback
const integrationId = townConfig.git_auth.platform_integration_id;
if (integrationId && this.env.GIT_TOKEN_SERVICE) {
try {
token = await this.env.GIT_TOKEN_SERVICE.getToken(integrationId);
} catch (err) {
console.warn(
`${TOWN_LOG} checkPRStatus: platform integration token lookup failed for ${integrationId}`,
err
);
}
}
}
if (!token) {
console.warn(`${TOWN_LOG} checkPRStatus: no github_token configured, cannot poll ${prUrl}`);
console.warn(`${TOWN_LOG} checkPRStatus: no github token available, cannot poll ${prUrl}`);
return null;
}

Expand Down
Loading