From bf7f19ad67db434d0da035f696a664caa4626792 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 16:36:40 +0000 Subject: [PATCH 1/2] Initial plan From f00ef1d7648b46466def3009b1e997deb25a27c8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 16:45:23 +0000 Subject: [PATCH 2/2] Add async iterable support to ChatClient.completeStreamingChat() Co-authored-by: baijumeswani <12852605+baijumeswani@users.noreply.github.com> --- samples/js/native-chat-completions/app.js | 18 ++- sdk_v2/js/docs/classes/ChatClient.md | 93 +++++++++++++- sdk_v2/js/src/openai/chatClient.ts | 148 +++++++++++++++++++++- sdk_v2/js/test/openai/chatClient.test.ts | 75 ++++++++++- 4 files changed, 320 insertions(+), 14 deletions(-) diff --git a/samples/js/native-chat-completions/app.js b/samples/js/native-chat-completions/app.js index af566ef7..79526eec 100644 --- a/samples/js/native-chat-completions/app.js +++ b/samples/js/native-chat-completions/app.js @@ -39,10 +39,22 @@ const completion = await chatClient.completeChat([ console.log('\nChat completion result:'); console.log(completion.choices[0]?.message?.content); -// Example streaming completion -console.log('\nTesting streaming completion...'); +// Example streaming completion (async iterable pattern - recommended) +console.log('\nTesting streaming completion (async iterable)...'); +for await (const chunk of chatClient.completeStreamingChat( + [{ role: 'user', content: 'Write a short poem about programming.' }] +)) { + const content = chunk.choices?.[0]?.delta?.content; + if (content) { + process.stdout.write(content); + } +} +console.log('\n'); + +// Example streaming completion (callback pattern - backward-compatible) +console.log('\nTesting streaming completion (callback)...'); await chatClient.completeStreamingChat( - [{ role: 'user', content: 'Write a short poem about programming.' }], + [{ role: 'user', content: 'Write a short poem about nature.' }], (chunk) => { const content = chunk.choices?.[0]?.message?.content; if (content) { diff --git a/sdk_v2/js/docs/classes/ChatClient.md b/sdk_v2/js/docs/classes/ChatClient.md index 70ce80c4..d897e9d8 100644 --- a/sdk_v2/js/docs/classes/ChatClient.md +++ b/sdk_v2/js/docs/classes/ChatClient.md @@ -45,11 +45,76 @@ Error - If messages are invalid or completion fails. ### completeStreamingChat() +#### Overload 1: Async iterable pattern (recommended) + +```ts +completeStreamingChat(messages): AsyncIterable; +``` + +Performs a streaming chat completion, returning an async iterable. This follows the same pattern as the OpenAI SDK: + +```ts +for await (const chunk of chatClient.completeStreamingChat(messages)) { + process.stdout.write(chunk.choices?.[0]?.delta?.content ?? ''); +} +``` + +#### Parameters + +| Parameter | Type | Description | +| ------ | ------ | ------ | +| `messages` | `any`[] | An array of message objects. | + +#### Returns + +`AsyncIterable`\<`any`\> + +An async iterable that yields each chunk of the streaming response. + +#### Throws + +Error - If messages are invalid, or streaming fails. + +*** + +#### Overload 2: Async iterable pattern with tools + +```ts +completeStreamingChat(messages, tools): AsyncIterable; +``` + +#### Parameters + +| Parameter | Type | Description | +| ------ | ------ | ------ | +| `messages` | `any`[] | An array of message objects. | +| `tools` | `any`[] | An array of tool objects. | + +#### Returns + +`AsyncIterable`\<`any`\> + +An async iterable that yields each chunk of the streaming response. + +#### Throws + +Error - If messages or tools are invalid, or streaming fails. + +*** + +#### Overload 3: Callback pattern (backward-compatible) + ```ts completeStreamingChat(messages, callback): Promise; ``` -Performs a streaming chat completion. +Performs a streaming chat completion using a callback for each chunk. This pattern is kept for backward compatibility. + +```ts +await chatClient.completeStreamingChat(messages, (chunk) => { + process.stdout.write(chunk.choices?.[0]?.delta?.content ?? ''); +}); +``` #### Parameters @@ -67,3 +132,29 @@ A promise that resolves when the stream is complete. #### Throws Error - If messages or callback are invalid, or streaming fails. + +*** + +#### Overload 4: Callback pattern with tools (backward-compatible) + +```ts +completeStreamingChat(messages, tools, callback): Promise; +``` + +#### Parameters + +| Parameter | Type | Description | +| ------ | ------ | ------ | +| `messages` | `any`[] | An array of message objects. | +| `tools` | `any`[] | An array of tool objects. | +| `callback` | (`chunk`) => `void` | A callback function that receives each chunk of the streaming response. | + +#### Returns + +`Promise`\<`void`\> + +A promise that resolves when the stream is complete. + +#### Throws + +Error - If messages, tools, or callback are invalid, or streaming fails. diff --git a/sdk_v2/js/src/openai/chatClient.ts b/sdk_v2/js/src/openai/chatClient.ts index 7aa77170..763cbdfc 100644 --- a/sdk_v2/js/src/openai/chatClient.ts +++ b/sdk_v2/js/src/openai/chatClient.ts @@ -212,25 +212,161 @@ export class ChatClient { /** * Performs a streaming chat completion. + * + * Can be used with the async iterable pattern (no callback): + * ```ts + * for await (const chunk of chatClient.completeStreamingChat(messages)) { + * process.stdout.write(chunk.choices?.[0]?.delta?.content ?? ''); + * } + * ``` + * + * Or with the callback pattern (for backward compatibility): + * ```ts + * await chatClient.completeStreamingChat(messages, (chunk) => { + * process.stdout.write(chunk.choices?.[0]?.delta?.content ?? ''); + * }); + * ``` + * + * @param messages - An array of message objects. + * @returns An async iterable that yields each chunk of the streaming response. + * @throws Error - If messages or tools are invalid, or streaming fails. + */ + public completeStreamingChat(messages: any[]): AsyncIterable; + /** + * Performs a streaming chat completion with tools. + * @param messages - An array of message objects. + * @param tools - An array of tool objects. + * @returns An async iterable that yields each chunk of the streaming response. + * @throws Error - If messages or tools are invalid, or streaming fails. + */ + public completeStreamingChat(messages: any[], tools: any[]): AsyncIterable; + /** + * Performs a streaming chat completion with a callback (backward-compatible). + * @param messages - An array of message objects. + * @param callback - A callback function that receives each chunk of the streaming response. + * @returns A promise that resolves when the stream is complete. + * @throws Error - If messages or callback are invalid, or streaming fails. + */ + public completeStreamingChat(messages: any[], callback: (chunk: any) => void): Promise; + /** + * Performs a streaming chat completion with tools and a callback (backward-compatible). * @param messages - An array of message objects. * @param tools - An array of tool objects. * @param callback - A callback function that receives each chunk of the streaming response. * @returns A promise that resolves when the stream is complete. * @throws Error - If messages, tools, or callback are invalid, or streaming fails. */ - public async completeStreamingChat(messages: any[], callback: (chunk: any) => void): Promise; - public async completeStreamingChat(messages: any[], tools: any[], callback: (chunk: any) => void): Promise; - public async completeStreamingChat(messages: any[], toolsOrCallback: any[] | ((chunk: any) => void), maybeCallback?: (chunk: any) => void): Promise { + public completeStreamingChat(messages: any[], tools: any[], callback: (chunk: any) => void): Promise; + public completeStreamingChat( + messages: any[], + toolsOrCallback?: any[] | ((chunk: any) => void), + maybeCallback?: (chunk: any) => void + ): AsyncIterable | Promise { const tools = Array.isArray(toolsOrCallback) ? toolsOrCallback : undefined; - const callback = (Array.isArray(toolsOrCallback) ? maybeCallback : toolsOrCallback) as ((chunk: any) => void) | undefined; + const callback = typeof toolsOrCallback === 'function' ? toolsOrCallback : maybeCallback; this.validateMessages(messages); this.validateTools(tools); - if (!callback || typeof callback !== 'function') { - throw new Error('Callback must be a valid function.'); + if (callback !== undefined) { + if (typeof callback !== 'function') { + throw new Error('Callback must be a valid function.'); + } + return this._completeStreamingChatWithCallback(messages, tools, callback); } + return this._streamChat(messages, tools); + } + + /** + * Internal async generator that bridges the native callback-based streaming API + * to an async iterable interface. + * @internal + */ + private async *_streamChat(messages: any[], tools?: any[]): AsyncIterableIterator { + const request = { + model: this.modelId, + messages, + ...(tools ? { tools } : {}), + stream: true, + ...this.settings._serialize() + }; + + const chunks: any[] = []; + let streamDone = false; + let streamError: Error | null = null; + let notify: (() => void) | null = null; + + const wakeConsumer = () => { + if (notify) { const n = notify; notify = null; n(); } + }; + + const streamPromise = this.coreInterop.executeCommandStreaming( + 'chat_completions', + { Params: { OpenAICreateRequest: JSON.stringify(request) } }, + (chunkStr: string) => { + // Skip processing if we already encountered an error + if (streamError) return; + + if (chunkStr) { + let chunk: any; + try { + chunk = JSON.parse(chunkStr); + } catch (e) { + // Don't throw from callback - store first error and stop processing + streamError = new Error( + `Failed to parse streaming chunk: ${e instanceof Error ? e.message : String(e)}`, + { cause: e } + ); + wakeConsumer(); + return; + } + chunks.push(chunk); + wakeConsumer(); + } + } + ).then(() => { + streamDone = true; + wakeConsumer(); + }).catch((err: unknown) => { + streamError = err instanceof Error ? err : new Error(String(err)); + streamDone = true; + wakeConsumer(); + }); + + try { + while (!streamDone && !streamError) { + while (chunks.length > 0) { + yield chunks.shift()!; + } + if (!streamDone && !streamError) { + await new Promise(resolve => { notify = resolve; }); + } + } + // Drain any remaining chunks that arrived before streamDone was observed + while (chunks.length > 0) { + yield chunks.shift()!; + } + } finally { + await streamPromise; + } + + // TypeScript's control-flow analysis doesn't track mutations through closures, + // so cast through unknown to widen the narrowed type before checking. + const maybeError = streamError as unknown; + if (maybeError instanceof Error) { + throw new Error( + `Streaming chat completion failed for model '${this.modelId}': ${maybeError.message}`, + { cause: maybeError } + ); + } + } + + /** + * Internal callback-based streaming implementation (backward-compatible). + * @internal + */ + private async _completeStreamingChatWithCallback(messages: any[], tools: any[] | undefined, callback: (chunk: any) => void): Promise { const request = { model: this.modelId, messages, diff --git a/sdk_v2/js/test/openai/chatClient.test.ts b/sdk_v2/js/test/openai/chatClient.test.ts index 6c0b1c4b..8aaa2e49 100644 --- a/sdk_v2/js/test/openai/chatClient.test.ts +++ b/sdk_v2/js/test/openai/chatClient.test.ts @@ -46,7 +46,7 @@ describe('Chat Client Tests', () => { } }); - it('should perform streaming chat completion', async function() { + it('should perform streaming chat completion (callback pattern)', async function() { this.timeout(10000); const manager = getTestManager(); const catalog = manager.catalog; @@ -119,6 +119,57 @@ describe('Chat Client Tests', () => { } }); + it('should perform streaming chat completion (async iterable pattern)', async function() { + this.timeout(10000); + const manager = getTestManager(); + const catalog = manager.catalog; + + // Ensure cache is populated first + const cachedModels = await catalog.getCachedModels(); + expect(cachedModels.length).to.be.greaterThan(0); + + const cachedVariant = cachedModels.find(m => m.alias === TEST_MODEL_ALIAS); + expect(cachedVariant).to.not.be.undefined; + + const model = await catalog.getModel(TEST_MODEL_ALIAS); + + expect(model).to.not.be.undefined; + if (!cachedVariant) return; + + model.selectVariant(cachedVariant.id); + + await model.load(); + + try { + const client = model.createChatClient(); + + client.settings.maxTokens = 500; + client.settings.temperature = 0.0; // for deterministic results + + const messages = [ + { role: 'user', content: 'You are a calculator. Be precise. What is the answer to 7 multiplied by 6?' } + ]; + + let fullContent = ''; + let chunkCount = 0; + + for await (const chunk of client.completeStreamingChat(messages)) { + chunkCount++; + const content = chunk.choices?.[0]?.delta?.content; + if (content) { + fullContent += content; + } + } + + expect(chunkCount).to.be.greaterThan(0); + expect(fullContent).to.be.a('string'); + console.log(`Async iterable response: ${fullContent}`); + expect(fullContent).to.include('42'); + } finally { + await model.unload(); + } + }); + it('should throw when completing chat with empty, null, or undefined messages', async function() { const manager = getTestManager(); const catalog = manager.catalog; @@ -171,6 +222,7 @@ describe('Chat Client Tests', () => { const invalidMessages: any[] = [[], null, undefined]; for (const invalidMessage of invalidMessages) { + // Test with callback pattern try { await client.completeStreamingChat(invalidMessage, () => {}); expect.fail(`Should have thrown an error for ${Array.isArray(invalidMessage) ? 'empty' : invalidMessage} messages`); @@ -178,19 +230,34 @@ describe('Chat Client Tests', () => { expect(error).to.be.instanceOf(Error); expect((error as Error).message).to.include('Messages array cannot be null, undefined, or empty.'); } + // Test with async iterable pattern + try { + // Trigger iteration to materialize the error + const iter = client.completeStreamingChat(invalidMessage); + await iter[Symbol.asyncIterator]().next(); + expect.fail(`Should have thrown an error for ${Array.isArray(invalidMessage) ? 'empty' : invalidMessage} messages`); + } catch (error) { + expect(error).to.be.instanceOf(Error); + expect((error as Error).message).to.include('Messages array cannot be null, undefined, or empty.'); + } } }); - it('should throw when completing streaming chat with invalid callback', async function() { + it('should throw when completing streaming chat with invalid callback (tools + bad callback)', async function() { const manager = getTestManager(); const catalog = manager.catalog; const model = await catalog.getModel(TEST_MODEL_ALIAS); const client = model.createChatClient(); const messages = [{ role: 'user', content: 'Hello' }]; - const invalidCallbacks: any[] = [null, undefined, {} as any, 'not a function' as any]; + const tools: any[] = []; + // When tools array is provided as second arg, the third arg must be a function if present. + // We use `as any` here intentionally to bypass TypeScript type-checking and verify + // that invalid runtime values are rejected with a descriptive error. + const invalidCallbacks: any[] = [{} as any, 'not a function' as any, 42 as any]; for (const invalidCallback of invalidCallbacks) { try { - await client.completeStreamingChat(messages as any, invalidCallback as any); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (client.completeStreamingChat as any)(messages, tools, invalidCallback); expect.fail('Should have thrown an error for invalid callback'); } catch (error) { expect(error).to.be.instanceOf(Error);