From 0546d1d60f74c6c30687013d4ed806a357c92aea Mon Sep 17 00:00:00 2001 From: Frank <97429702+tsubasakong@users.noreply.github.com> Date: Sun, 8 Mar 2026 17:26:24 -0700 Subject: [PATCH] fix: preserve text output without text-start events --- packages/opencode/src/session/processor.ts | 56 ++++++----- .../opencode/test/session/processor.test.ts | 95 +++++++++++++++++++ 2 files changed, 127 insertions(+), 24 deletions(-) create mode 100644 packages/opencode/test/session/processor.test.ts diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 67edc0ecfe35..8eb08efb3935 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -50,6 +50,25 @@ export namespace SessionProcessor { try { let currentText: MessageV2.TextPart | undefined let reasoningMap: Record = {} + const ensureCurrentText = async (providerMetadata?: MessageV2.TextPart["metadata"]) => { + if (currentText) { + if (providerMetadata) currentText.metadata = providerMetadata + return currentText + } + currentText = { + id: Identifier.ascending("part"), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "text", + text: "", + time: { + start: Date.now(), + }, + metadata: providerMetadata, + } + await Session.updatePart(currentText) + return currentText + } const stream = await LLM.stream(streamInput) for await (const value of stream.fullStream) { @@ -288,33 +307,22 @@ export namespace SessionProcessor { break case "text-start": - currentText = { - id: Identifier.ascending("part"), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "text", - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - await Session.updatePart(currentText) + await ensureCurrentText(value.providerMetadata) break - case "text-delta": - if (currentText) { - currentText.text += value.text - if (value.providerMetadata) currentText.metadata = value.providerMetadata - await Session.updatePartDelta({ - sessionID: currentText.sessionID, - messageID: currentText.messageID, - partID: currentText.id, - field: "text", - delta: value.text, - }) - } + case "text-delta": { + const textPart = await ensureCurrentText(value.providerMetadata) + textPart.text += value.text + if (value.providerMetadata) textPart.metadata = value.providerMetadata + await Session.updatePartDelta({ + sessionID: textPart.sessionID, + messageID: textPart.messageID, + partID: textPart.id, + field: "text", + delta: value.text, + }) break + } case "text-end": if (currentText) { diff --git a/packages/opencode/test/session/processor.test.ts b/packages/opencode/test/session/processor.test.ts new file mode 100644 index 000000000000..c7be77848a34 --- /dev/null +++ b/packages/opencode/test/session/processor.test.ts @@ -0,0 +1,95 @@ +import { describe, expect, test, spyOn } from "bun:test" +import { Session } from "../../src/session" +import { SessionProcessor } from "../../src/session/processor" +import { MessageV2 } from "../../src/session/message-v2" +import { LLM } from "../../src/session/llm" +import { Log } from "../../src/util/log" +import { Instance } from "../../src/project/instance" +import { Identifier } from "../../src/id/id" +import { tmpdir } from "../fixture/fixture" + +Log.init({ print: false }) + +describe("session.processor text streaming", () => { + test("creates a text part when text-delta arrives before text-start", async () => { + const streamSpy = spyOn(LLM, "stream").mockResolvedValue({ + fullStream: (async function* () { + yield { type: "start" } + yield { type: "text-delta", text: "Hello" } + yield { type: "text-delta", text: " world" } + yield { type: "text-end" } + })(), + } as any) + + try { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const userMsg = await Session.updateMessage({ + id: Identifier.ascending("message"), + role: "user", + sessionID: session.id, + agent: "default", + model: { + providerID: "test", + modelID: "test-model", + }, + time: { + created: Date.now(), + }, + }) + + const assistantMsg: MessageV2.Assistant = { + id: Identifier.ascending("message"), + role: "assistant", + sessionID: session.id, + mode: "default", + agent: "default", + path: { + cwd: tmp.path, + root: tmp.path, + }, + cost: 0, + tokens: { + output: 0, + input: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: "test-model", + providerID: "test", + parentID: userMsg.id, + time: { + created: Date.now(), + }, + } + await Session.updateMessage(assistantMsg) + + const processor = SessionProcessor.create({ + assistantMessage: assistantMsg, + sessionID: session.id, + model: { providerID: "test", id: "test-model", api: { id: "test-model" } } as any, + abort: new AbortController().signal, + }) + + const result = await processor.process({} as any) + const parts = await MessageV2.parts(assistantMsg.id) + const textParts = parts.filter((part) => part.type === "text") + + expect(result).toBe("continue") + expect(textParts).toHaveLength(1) + expect(textParts[0]).toMatchObject({ + type: "text", + text: "Hello world", + }) + + await Session.remove(session.id) + }, + }) + } finally { + streamSpy.mockRestore() + } + }) +})