Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0546d1d
fix: preserve text output without text-start events
tsubasakong Mar 9, 2026
2043233
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 9, 2026
ceaf688
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 9, 2026
e3ce901
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 9, 2026
50c025a
Merge remote-tracking branch 'upstream/dev' into sync/pr-16654
tsubasakong Mar 10, 2026
639b63a
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
72eba61
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
e900189
Merge remote-tracking branch 'upstream/dev' into lucas/opener-a/anoma…
tsubasakong Mar 10, 2026
9daad1c
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
e21bd09
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
8114621
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
4edd49e
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
d5f269a
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
ab80953
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
636b659
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
287bf72
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 10, 2026
a51bc73
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 11, 2026
a1eaf31
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 11, 2026
ac46a51
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 11, 2026
cbb8cd8
Merge remote-tracking branch 'upstream/dev' into lucas/opener-a/anoma…
tsubasakong Mar 11, 2026
33c86ee
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 11, 2026
c28b87a
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 11, 2026
ca694ef
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 11, 2026
a279640
Merge remote-tracking branch 'upstream/dev' into lucas/opener-a/anoma…
tsubasakong Mar 11, 2026
066fe97
Merge remote-tracking branch 'upstream/dev' into lucas/opener-a/anoma…
tsubasakong Mar 11, 2026
4ec7de3
Merge remote-tracking branch 'upstream/dev' into lucas/opener-a/anoma…
tsubasakong Mar 11, 2026
882dcbe
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 11, 2026
ae6a767
Merge remote-tracking branch 'upstream/dev' into HEAD
tsubasakong Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,25 @@ export namespace SessionProcessor {
try {
let currentText: MessageV2.TextPart | undefined
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
const ensureCurrentText = async (providerMetadata?: MessageV2.TextPart["metadata"]) => {
if (currentText) {
if (providerMetadata) currentText.metadata = providerMetadata
return currentText
}
currentText = {
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "text",
text: "",
time: {
start: Date.now(),
},
metadata: providerMetadata,
}
await Session.updatePart(currentText)
return currentText
}
const stream = await LLM.stream(streamInput)

for await (const value of stream.fullStream) {
Expand Down Expand Up @@ -289,33 +308,22 @@ export namespace SessionProcessor {
break

case "text-start":
currentText = {
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "text",
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
await Session.updatePart(currentText)
await ensureCurrentText(value.providerMetadata)
break

case "text-delta":
if (currentText) {
currentText.text += value.text
if (value.providerMetadata) currentText.metadata = value.providerMetadata
await Session.updatePartDelta({
sessionID: currentText.sessionID,
messageID: currentText.messageID,
partID: currentText.id,
field: "text",
delta: value.text,
})
}
case "text-delta": {
const textPart = await ensureCurrentText(value.providerMetadata)
textPart.text += value.text
if (value.providerMetadata) textPart.metadata = value.providerMetadata
await Session.updatePartDelta({
sessionID: textPart.sessionID,
messageID: textPart.messageID,
partID: textPart.id,
field: "text",
delta: value.text,
})
break
}

case "text-end":
if (currentText) {
Expand Down
95 changes: 95 additions & 0 deletions packages/opencode/test/session/processor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { describe, expect, test, spyOn } from "bun:test"
import { Session } from "../../src/session"
import { SessionProcessor } from "../../src/session/processor"
import { MessageV2 } from "../../src/session/message-v2"
import { LLM } from "../../src/session/llm"
import { Log } from "../../src/util/log"
import { Instance } from "../../src/project/instance"
import { Identifier } from "../../src/id/id"
import { tmpdir } from "../fixture/fixture"

Log.init({ print: false })

describe("session.processor text streaming", () => {
test("creates a text part when text-delta arrives before text-start", async () => {
const streamSpy = spyOn(LLM, "stream").mockResolvedValue({
fullStream: (async function* () {
yield { type: "start" }
yield { type: "text-delta", text: "Hello" }
yield { type: "text-delta", text: " world" }
yield { type: "text-end" }
})(),
} as any)

try {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const userMsg = await Session.updateMessage({
id: Identifier.ascending("message"),
role: "user",
sessionID: session.id,
agent: "default",
model: {
providerID: "test",
modelID: "test-model",
},
time: {
created: Date.now(),
},
})

const assistantMsg: MessageV2.Assistant = {
id: Identifier.ascending("message"),
role: "assistant",
sessionID: session.id,
mode: "default",
agent: "default",
path: {
cwd: tmp.path,
root: tmp.path,
},
cost: 0,
tokens: {
output: 0,
input: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: "test-model",
providerID: "test",
parentID: userMsg.id,
time: {
created: Date.now(),
},
}
await Session.updateMessage(assistantMsg)

const processor = SessionProcessor.create({
assistantMessage: assistantMsg,
sessionID: session.id,
model: { providerID: "test", id: "test-model", api: { id: "test-model" } } as any,
abort: new AbortController().signal,
})

const result = await processor.process({} as any)
const parts = await MessageV2.parts(assistantMsg.id)
const textParts = parts.filter((part) => part.type === "text")

expect(result).toBe("continue")
expect(textParts).toHaveLength(1)
expect(textParts[0]).toMatchObject({
type: "text",
text: "Hello world",
})

await Session.remove(session.id)
},
})
} finally {
streamSpy.mockRestore()
}
})
})
Loading