Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/api/providers/__tests__/openai-native.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ describe("OpenAiNativeHandler", () => {
},
],
}),
expect.objectContaining({
signal: expect.any(Object),
}),
)
})

Expand Down Expand Up @@ -1136,6 +1139,9 @@ describe("GPT-5 streaming event coverage (additional)", () => {
stream: false,
store: false,
}),
expect.objectContaining({
signal: expect.any(Object),
}),
)
})

Expand Down
36 changes: 34 additions & 2 deletions src/api/providers/openai-native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
private lastResponseOutput: any[] | undefined
// Last top-level response id from Responses API (for troubleshooting)
private lastResponseId: string | undefined
// Abort controller for cancelling ongoing requests
private abortController?: AbortController
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an instance-level abort controller creates a concurrency hazard. If multiple requests are made concurrently (which can happen in tests or when the handler is reused), they would all share the same abortController instance. This means:

  1. Starting a new request would overwrite the abort controller from a previous in-flight request
  2. Aborting one request could unintentionally abort another concurrent request
  3. The finally block of one request could clear the abort controller while another request is still using it

The Bedrock provider uses method-scoped abort controllers (see bedrock.ts:403-413), which avoid this issue. Consider either:

  • Making the abort controller method-scoped like Bedrock
  • Or if instance-level cancellation is truly needed, implement proper concurrency control (e.g., tracking multiple in-flight requests)

Fix it with Roo Code or mention @roomote and request a fix.


// Event types handled by the shared event processor to avoid duplication
private readonly coreHandledEventTypes = new Set<string>([
Expand Down Expand Up @@ -255,9 +257,14 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
systemPrompt?: string,
messages?: Anthropic.Messages.MessageParam[],
): ApiStream {
// Create AbortController for cancellation
this.abortController = new AbortController()

try {
// Use the official SDK
const stream = (await (this.client as any).responses.create(requestBody)) as AsyncIterable<any>
const stream = (await (this.client as any).responses.create(requestBody, {
signal: this.abortController.signal,
})) as AsyncIterable<any>

if (typeof (stream as any)[Symbol.asyncIterator] !== "function") {
throw new Error(
Expand All @@ -266,13 +273,20 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
}

for await (const event of stream) {
// Check if request was aborted
if (this.abortController.signal.aborted) {
break
}

for await (const outChunk of this.processEvent(event, model)) {
yield outChunk
}
}
} catch (sdkErr: any) {
// For errors, fallback to manual SSE via fetch
yield* this.makeResponsesApiRequest(requestBody, model, metadata, systemPrompt, messages)
} finally {
this.abortController = undefined
}
}

Expand Down Expand Up @@ -342,6 +356,9 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
const baseUrl = this.options.openAiNativeBaseUrl || "https://api.openai.com"
const url = `${baseUrl}/v1/responses`

// Create AbortController for cancellation
this.abortController = new AbortController()

try {
const response = await fetch(url, {
method: "POST",
Expand All @@ -351,6 +368,7 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
Accept: "text/event-stream",
},
body: JSON.stringify(requestBody),
signal: this.abortController.signal,
})

if (!response.ok) {
Expand Down Expand Up @@ -426,6 +444,8 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
}
// Handle non-Error objects
throw new Error(`Unexpected error connecting to Responses API`)
} finally {
this.abortController = undefined
}
}

Expand All @@ -446,6 +466,11 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio

try {
while (true) {
// Check if request was aborted
if (this.abortController?.signal.aborted) {
break
}

const { done, value } = await reader.read()
if (done) break

Expand Down Expand Up @@ -1076,6 +1101,9 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
}

async completePrompt(prompt: string): Promise<string> {
// Create AbortController for cancellation
this.abortController = new AbortController()

try {
const model = this.getModel()
const { verbosity, reasoning } = model
Expand Down Expand Up @@ -1135,7 +1163,9 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
}

// Make the non-streaming request
const response = await (this.client as any).responses.create(requestBody)
const response = await (this.client as any).responses.create(requestBody, {
signal: this.abortController.signal,
})

// Extract text from the response
if (response?.output && Array.isArray(response.output)) {
Expand All @@ -1161,6 +1191,8 @@ export class OpenAiNativeHandler extends BaseProvider implements SingleCompletio
throw new Error(`OpenAI Native completion error: ${error.message}`)
}
throw error
} finally {
this.abortController = undefined
}
}
}
Loading