Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/teams-native-streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@chat-adapter/teams": minor
---

Use native Teams SDK streaming for DMs via `stream.emit()`, with accumulate-and-post fallback for group chats
145 changes: 113 additions & 32 deletions packages/adapter-teams/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import type {
IMessageReactionActivity,
ITaskFetchInvokeActivity,
ITaskSubmitInvokeActivity,
SentActivity,
TaskModuleResponse,
} from "@microsoft/teams.api";
import { MessageActivity, TypingActivity } from "@microsoft/teams.api";
import type { IActivityContext } from "@microsoft/teams.apps";
import { App } from "@microsoft/teams.apps";
import type { IActivityContext, IStreamer } from "@microsoft/teams.apps";
import { App, StreamCancelledError } from "@microsoft/teams.apps";
import { users } from "@microsoft/teams.graph-endpoints";
import type {
ActionEvent,
Expand Down Expand Up @@ -93,6 +94,7 @@ export class TeamsAdapter implements Adapter<TeamsThreadId, unknown> {
private readonly formatConverter = new TeamsFormatConverter();
private readonly config: TeamsAdapterConfig;
private readonly graphReader: TeamsGraphReader;
private readonly activeStreams = new Map<string, IStreamer>();

constructor(config: TeamsAdapterConfig = {}) {
this.config = config;
Expand Down Expand Up @@ -303,6 +305,10 @@ export class TeamsAdapter implements Adapter<TeamsThreadId, unknown> {

/**
* Handle message activities (normal messages + Action.Submit button clicks).
*
* For DMs we block the handler until chat processing completes so that
* ctx.stream (the Teams SDK's native IStreamer) stays alive for streaming.
* The Teams SDK auto-closes the stream after the handler returns.
*/
private async handleMessageActivity(
ctx: IActivityContext<IMessageActivity>
Expand Down Expand Up @@ -342,12 +348,41 @@ export class TeamsAdapter implements Adapter<TeamsThreadId, unknown> {
message.isMention = true;
}

this.chat.processMessage(
this,
threadId,
message,
this.bridgeAdapter.getWebhookOptions(activity.id)
);
// For DMs, capture ctx.stream and await processing so the stream stays
// alive for native streaming via emit(). Group chats use fire-and-forget.
if (this.isDM(threadId)) {
this.activeStreams.set(threadId, ctx.stream);

let resolveProcessing: () => void;
const processingDone = new Promise<void>((resolve) => {
resolveProcessing = resolve;
});

const baseOptions = this.bridgeAdapter.getWebhookOptions(activity.id);
this.chat.processMessage(this, threadId, message, {
...baseOptions,
waitUntil: (task: Promise<unknown>) => {
baseOptions?.waitUntil?.(task);
task.then(
() => resolveProcessing(),
() => resolveProcessing()
);
},
});

try {
await processingDone;
} finally {
this.activeStreams.delete(threadId);
}
} else {
this.chat.processMessage(
this,
threadId,
message,
this.bridgeAdapter.getWebhookOptions(activity.id)
);
}
}

/**
Expand Down Expand Up @@ -1132,46 +1167,92 @@ export class TeamsAdapter implements Adapter<TeamsThreadId, unknown> {
}

/**
* Stream responses via post+edit.
* TODO: Use native HttpStream for DMs once @microsoft/teams.apps exports it.
* Stream responses using the Teams SDK's native streaming protocol when
* an active IStreamer exists (DMs), falling back to post+edit otherwise.
*/
async stream(
threadId: string,
textStream: AsyncIterable<string | StreamChunk>,
_options?: StreamOptions
): Promise<RawMessage<unknown>> {
const { conversationId } = this.decodeThreadId(threadId);
let accumulated = "";
let messageId: string | undefined;
const activeStream = this.activeStreams.get(threadId);

if (activeStream && !activeStream.canceled) {
return this.streamViaEmit(threadId, textStream, activeStream);
}

// No native streamer available (group chats, proactive messages) —
// accumulate and post as a single message instead of post+edit.
let accumulated = "";
for await (const chunk of textStream) {
let text = "";
if (typeof chunk === "string") {
text = chunk;
accumulated += chunk;
} else if (chunk.type === "markdown_text") {
text = chunk.text;
}
if (!text) {
continue;
accumulated += chunk.text;
}
}
if (!accumulated) {
return { id: "", threadId, raw: { text: "" } };
}
return this.postMessage(threadId, { markdown: accumulated });
}

accumulated += text;
/**
* Native streaming using the Teams SDK's IStreamer.emit().
* Sends typing activities with streamType: 'streaming', then a final
* message with streamType: 'final' on close (handled by the framework).
*
* We do NOT call stream.close() — the Teams SDK calls it automatically
* after the handler returns.
*/
private async streamViaEmit(
threadId: string,
textStream: AsyncIterable<string | StreamChunk>,
stream: IStreamer
): Promise<RawMessage<unknown>> {
let accumulated = "";
let messageId = "";

if (messageId) {
const activity = new MessageActivity(accumulated);
activity.textFormat = "markdown";
await this.app.api.conversations
.activities(conversationId)
.update(messageId, activity);
} else {
const activity = new MessageActivity(accumulated);
activity.textFormat = "markdown";
const res = await this.app.send(conversationId, activity);
messageId = res.id ?? "";
const idCaptured = new Promise<string>((resolve) => {
stream.events.once("chunk", (activity: SentActivity) => {
resolve(activity.id || "");
});
});

try {
for await (const chunk of textStream) {
if (stream.canceled) {
this.logger.debug("Teams stream canceled by user", { threadId });
break;
}

let text = "";
if (typeof chunk === "string") {
text = chunk;
} else if (chunk.type === "markdown_text") {
text = chunk.text;
}
if (!text) {
continue;
}

stream.emit(text);
accumulated += text;
}
} catch (error) {
if (!(error instanceof StreamCancelledError)) {
throw error;
}
this.logger.debug("Teams stream canceled during iteration", { threadId });
}

// Only await the chunk ID if we emitted text and the stream wasn't
// canceled before any chunk was delivered (which would hang forever).
if (accumulated && !stream.canceled) {
messageId = await idCaptured;
}

return { id: messageId ?? "", threadId, raw: { text: accumulated } };
return { id: messageId, threadId, raw: { text: accumulated } };
}

async openDM(userId: string): Promise<string> {
Expand Down
11 changes: 4 additions & 7 deletions packages/integration-tests/src/replay-streaming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ describe("Streaming Replay Tests", () => {
// Verify initial message was sent
expectSentMessage(ctx.mockTeamsApp, "AI Mode Enabled!");

// Verify streaming completed with final message
expectUpdatedMessage(ctx.mockTeamsApp, "Love is a complex emotion.");
// Group chats accumulate and post as single message (no post+edit)
expectSentMessage(ctx.mockTeamsApp, "Love is a complex emotion.");
});

it("should stream response to follow-up message in AI mode", async () => {
Expand All @@ -211,11 +211,8 @@ describe("Streaming Replay Tests", () => {
adapterName: "teams",
});

// Verify streaming response
expectUpdatedMessage(
ctx.mockTeamsApp,
"I am an AI assistant here to help."
);
// Group chats accumulate and post as single message (no post+edit)
expectSentMessage(ctx.mockTeamsApp, "I am an AI assistant here to help.");
});
});

Expand Down