diff --git a/src/api/providers/__tests__/openai-native.spec.ts b/src/api/providers/__tests__/openai-native.spec.ts index c68c138bc40..73c79aebd9a 100644 --- a/src/api/providers/__tests__/openai-native.spec.ts +++ b/src/api/providers/__tests__/openai-native.spec.ts @@ -159,6 +159,9 @@ describe("OpenAiNativeHandler", () => { }, ], }), + expect.objectContaining({ + signal: expect.any(Object), + }), ) }) @@ -1136,6 +1139,9 @@ describe("GPT-5 streaming event coverage (additional)", () => { stream: false, store: false, }), + expect.objectContaining({ + signal: expect.any(Object), + }), ) }) diff --git a/src/api/providers/openai-native.ts b/src/api/providers/openai-native.ts index accae66f271..743c1f42d12 100644 --- a/src/api/providers/openai-native.ts +++ b/src/api/providers/openai-native.ts @@ -34,6 +34,8 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio private lastResponseOutput: any[] | undefined // Last top-level response id from Responses API (for troubleshooting) private lastResponseId: string | undefined + // Abort controller for cancelling ongoing requests + private abortController?: AbortController // Event types handled by the shared event processor to avoid duplication private readonly coreHandledEventTypes = new Set([ @@ -255,9 +257,14 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio systemPrompt?: string, messages?: Anthropic.Messages.MessageParam[], ): ApiStream { + // Create AbortController for cancellation + this.abortController = new AbortController() + try { // Use the official SDK - const stream = (await (this.client as any).responses.create(requestBody)) as AsyncIterable + const stream = (await (this.client as any).responses.create(requestBody, { + signal: this.abortController.signal, + })) as AsyncIterable if (typeof (stream as any)[Symbol.asyncIterator] !== "function") { throw new Error( @@ -266,6 +273,11 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio } for await (const event of stream) { + // Check if request was aborted + if (this.abortController.signal.aborted) { + break + } + for await (const outChunk of this.processEvent(event, model)) { yield outChunk } @@ -273,6 +285,8 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio } catch (sdkErr: any) { // For errors, fallback to manual SSE via fetch yield* this.makeResponsesApiRequest(requestBody, model, metadata, systemPrompt, messages) + } finally { + this.abortController = undefined } } @@ -342,6 +356,9 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio const baseUrl = this.options.openAiNativeBaseUrl || "https://api.openai.com" const url = `${baseUrl}/v1/responses` + // Create AbortController for cancellation + this.abortController = new AbortController() + try { const response = await fetch(url, { method: "POST", @@ -351,6 +368,7 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio Accept: "text/event-stream", }, body: JSON.stringify(requestBody), + signal: this.abortController.signal, }) if (!response.ok) { @@ -426,6 +444,8 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio } // Handle non-Error objects throw new Error(`Unexpected error connecting to Responses API`) + } finally { + this.abortController = undefined } } @@ -446,6 +466,11 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio try { while (true) { + // Check if request was aborted + if (this.abortController?.signal.aborted) { + break + } + const { done, value } = await reader.read() if (done) break @@ -1076,6 +1101,9 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio } async completePrompt(prompt: string): Promise { + // Create AbortController for cancellation + this.abortController = new AbortController() + try { const model = this.getModel() const { verbosity, reasoning } = model @@ -1135,7 +1163,9 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio } // Make the non-streaming request - const response = await (this.client as any).responses.create(requestBody) + const response = await (this.client as any).responses.create(requestBody, { + signal: this.abortController.signal, + }) // Extract text from the response if (response?.output && Array.isArray(response.output)) { @@ -1161,6 +1191,8 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio throw new Error(`OpenAI Native completion error: ${error.message}`) } throw error + } finally { + this.abortController = undefined } } }