Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions samples/js/native-chat-completions/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,22 @@ const completion = await chatClient.completeChat([
console.log('\nChat completion result:');
console.log(completion.choices[0]?.message?.content);

// Example streaming completion
console.log('\nTesting streaming completion...');
// Example streaming completion (async iterable pattern - recommended)
console.log('\nTesting streaming completion (async iterable)...');
for await (const chunk of chatClient.completeStreamingChat(
[{ role: 'user', content: 'Write a short poem about programming.' }]
)) {
const content = chunk.choices?.[0]?.delta?.content;
if (content) {
process.stdout.write(content);
}
}
console.log('\n');

// Example streaming completion (callback pattern - backward-compatible)
console.log('\nTesting streaming completion (callback)...');
await chatClient.completeStreamingChat(
[{ role: 'user', content: 'Write a short poem about programming.' }],
[{ role: 'user', content: 'Write a short poem about nature.' }],
(chunk) => {
const content = chunk.choices?.[0]?.message?.content;
if (content) {
Expand Down
93 changes: 92 additions & 1 deletion sdk_v2/js/docs/classes/ChatClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,76 @@ Error - If messages are invalid or completion fails.

### completeStreamingChat()

#### Overload 1: Async iterable pattern (recommended)

```ts
completeStreamingChat(messages): AsyncIterable<any>;
```

Performs a streaming chat completion, returning an async iterable. This follows the same pattern as the OpenAI SDK:

```ts
for await (const chunk of chatClient.completeStreamingChat(messages)) {
process.stdout.write(chunk.choices?.[0]?.delta?.content ?? '');
}
```

#### Parameters

| Parameter | Type | Description |
| ------ | ------ | ------ |
| `messages` | `any`[] | An array of message objects. |

#### Returns

`AsyncIterable`\<`any`\>

An async iterable that yields each chunk of the streaming response.

#### Throws

Error - If messages are invalid, or streaming fails.

***

#### Overload 2: Async iterable pattern with tools

```ts
completeStreamingChat(messages, tools): AsyncIterable<any>;
```

#### Parameters

| Parameter | Type | Description |
| ------ | ------ | ------ |
| `messages` | `any`[] | An array of message objects. |
| `tools` | `any`[] | An array of tool objects. |

#### Returns

`AsyncIterable`\<`any`\>

An async iterable that yields each chunk of the streaming response.

#### Throws

Error - If messages or tools are invalid, or streaming fails.

***

#### Overload 3: Callback pattern (backward-compatible)

```ts
completeStreamingChat(messages, callback): Promise<void>;
```

Performs a streaming chat completion.
Performs a streaming chat completion using a callback for each chunk. This pattern is kept for backward compatibility.

```ts
await chatClient.completeStreamingChat(messages, (chunk) => {
process.stdout.write(chunk.choices?.[0]?.delta?.content ?? '');
});
```

#### Parameters

Expand All @@ -67,3 +132,29 @@ A promise that resolves when the stream is complete.
#### Throws

Error - If messages or callback are invalid, or streaming fails.

***

#### Overload 4: Callback pattern with tools (backward-compatible)

```ts
completeStreamingChat(messages, tools, callback): Promise<void>;
```

#### Parameters

| Parameter | Type | Description |
| ------ | ------ | ------ |
| `messages` | `any`[] | An array of message objects. |
| `tools` | `any`[] | An array of tool objects. |
| `callback` | (`chunk`) => `void` | A callback function that receives each chunk of the streaming response. |

#### Returns

`Promise`\<`void`\>

A promise that resolves when the stream is complete.

#### Throws

Error - If messages, tools, or callback are invalid, or streaming fails.
148 changes: 142 additions & 6 deletions sdk_v2/js/src/openai/chatClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,25 +212,161 @@ export class ChatClient {

/**
* Performs a streaming chat completion.
*
* Can be used with the async iterable pattern (no callback):
* ```ts
* for await (const chunk of chatClient.completeStreamingChat(messages)) {
* process.stdout.write(chunk.choices?.[0]?.delta?.content ?? '');
* }
* ```
*
* Or with the callback pattern (for backward compatibility):
* ```ts
* await chatClient.completeStreamingChat(messages, (chunk) => {
* process.stdout.write(chunk.choices?.[0]?.delta?.content ?? '');
* });
* ```
*
* @param messages - An array of message objects.
* @returns An async iterable that yields each chunk of the streaming response.
* @throws Error - If messages or tools are invalid, or streaming fails.
*/
public completeStreamingChat(messages: any[]): AsyncIterable<any>;
/**
* Performs a streaming chat completion with tools.
* @param messages - An array of message objects.
* @param tools - An array of tool objects.
* @returns An async iterable that yields each chunk of the streaming response.
* @throws Error - If messages or tools are invalid, or streaming fails.
*/
public completeStreamingChat(messages: any[], tools: any[]): AsyncIterable<any>;
/**
* Performs a streaming chat completion with a callback (backward-compatible).
* @param messages - An array of message objects.
* @param callback - A callback function that receives each chunk of the streaming response.
* @returns A promise that resolves when the stream is complete.
* @throws Error - If messages or callback are invalid, or streaming fails.
*/
public completeStreamingChat(messages: any[], callback: (chunk: any) => void): Promise<void>;
/**
* Performs a streaming chat completion with tools and a callback (backward-compatible).
* @param messages - An array of message objects.
* @param tools - An array of tool objects.
* @param callback - A callback function that receives each chunk of the streaming response.
* @returns A promise that resolves when the stream is complete.
* @throws Error - If messages, tools, or callback are invalid, or streaming fails.
*/
public async completeStreamingChat(messages: any[], callback: (chunk: any) => void): Promise<void>;
public async completeStreamingChat(messages: any[], tools: any[], callback: (chunk: any) => void): Promise<void>;
public async completeStreamingChat(messages: any[], toolsOrCallback: any[] | ((chunk: any) => void), maybeCallback?: (chunk: any) => void): Promise<void> {
public completeStreamingChat(messages: any[], tools: any[], callback: (chunk: any) => void): Promise<void>;
public completeStreamingChat(
messages: any[],
toolsOrCallback?: any[] | ((chunk: any) => void),
maybeCallback?: (chunk: any) => void
): AsyncIterable<any> | Promise<void> {
const tools = Array.isArray(toolsOrCallback) ? toolsOrCallback : undefined;
const callback = (Array.isArray(toolsOrCallback) ? maybeCallback : toolsOrCallback) as ((chunk: any) => void) | undefined;
const callback = typeof toolsOrCallback === 'function' ? toolsOrCallback : maybeCallback;

this.validateMessages(messages);
this.validateTools(tools);

if (!callback || typeof callback !== 'function') {
throw new Error('Callback must be a valid function.');
if (callback !== undefined) {
if (typeof callback !== 'function') {
throw new Error('Callback must be a valid function.');
}
return this._completeStreamingChatWithCallback(messages, tools, callback);
}

return this._streamChat(messages, tools);
}

/**
* Internal async generator that bridges the native callback-based streaming API
* to an async iterable interface.
* @internal
*/
private async *_streamChat(messages: any[], tools?: any[]): AsyncIterableIterator<any> {
const request = {
model: this.modelId,
messages,
...(tools ? { tools } : {}),
stream: true,
...this.settings._serialize()
};

const chunks: any[] = [];
let streamDone = false;
let streamError: Error | null = null;
let notify: (() => void) | null = null;

const wakeConsumer = () => {
if (notify) { const n = notify; notify = null; n(); }
};

const streamPromise = this.coreInterop.executeCommandStreaming(
'chat_completions',
{ Params: { OpenAICreateRequest: JSON.stringify(request) } },
(chunkStr: string) => {
// Skip processing if we already encountered an error
if (streamError) return;

if (chunkStr) {
let chunk: any;
try {
chunk = JSON.parse(chunkStr);
} catch (e) {
// Don't throw from callback - store first error and stop processing
streamError = new Error(
`Failed to parse streaming chunk: ${e instanceof Error ? e.message : String(e)}`,
{ cause: e }
);
wakeConsumer();
return;
}
chunks.push(chunk);
wakeConsumer();
}
}
).then(() => {
streamDone = true;
wakeConsumer();
}).catch((err: unknown) => {
streamError = err instanceof Error ? err : new Error(String(err));
streamDone = true;
wakeConsumer();
});

try {
while (!streamDone && !streamError) {
while (chunks.length > 0) {
yield chunks.shift()!;
}
if (!streamDone && !streamError) {
await new Promise<void>(resolve => { notify = resolve; });
}
}
// Drain any remaining chunks that arrived before streamDone was observed
while (chunks.length > 0) {
yield chunks.shift()!;
}
} finally {
await streamPromise;
}

// TypeScript's control-flow analysis doesn't track mutations through closures,
// so cast through unknown to widen the narrowed type before checking.
const maybeError = streamError as unknown;
if (maybeError instanceof Error) {
throw new Error(
`Streaming chat completion failed for model '${this.modelId}': ${maybeError.message}`,
{ cause: maybeError }
);
}
}

/**
* Internal callback-based streaming implementation (backward-compatible).
* @internal
*/
private async _completeStreamingChatWithCallback(messages: any[], tools: any[] | undefined, callback: (chunk: any) => void): Promise<void> {
const request = {
model: this.modelId,
messages,
Expand Down
Loading
Loading