[FEATURE]: Plugin hooks for streaming token observation (stream.delta) and abort handling (stream.aborted) #14691

@marcusquinn

Feature hasn't been suggested before.

  • I have verified this feature I'm about to request hasn't been suggested before.

Describe the enhancement you want to request

Summary

Two new plugin hooks for the streaming loop in processor.ts:

  1. stream.delta — observe individual streaming tokens/chunks, optionally signal abort
  2. stream.aborted — handle stream abort with optional retry and message injection

These hooks enable a new class of harness optimizations that are currently impossible with OpenCode's plugin system: real-time token observation, pattern-based early abort, and corrective steering on retry.

Motivation

Can Boluk's "The Harness Problem" demonstrated that harness engineering is the highest-leverage optimization available today:

  • 15 LLMs improved by changing only the edit tool format (hashline)
  • 5–68% success rate gains across models (Grok Code Fast 1: 6.7% → 68.3%)
  • 20–61% token reduction (Grok 4 Fast output tokens dropped 61%)
  • Zero training compute required

The key insight: the harness — the tool layer between model output and workspace — is where most failures happen in practice. Streaming hooks unlock the next frontier of harness optimization: intervening during generation rather than only after.

Use Cases

Time-To-Stream Rules (TTSR): Pattern-match streaming text against rules as tokens arrive. When a known-bad pattern is detected (model about to repeat a mistake, wrong language, hallucinated import), abort early and retry with a corrective steering message. This saves tokens and wall-clock time compared to waiting for the full response and then discarding it.

Early abort on waste: Detect obviously wrong output mid-stream (infinite retry loops, off-topic generation, budget exceeded) and abort before burning thousands of tokens.

Real-time observability: Token-level metrics — TTFT measurement, throughput tracking, per-model streaming latency — without requiring consumers to parse SSE events externally.

Progressive tool input rendering: Accumulating tool-input-delta into state.raw (which this proposal naturally does) also addresses #9737 — plugins and UIs can show partial tool arguments during long tool calls.
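As a rough illustration of the real-time observability use case, a plugin could keep per-message counters that a stream.delta handler updates. This is only a sketch: the proposal defines no "stream started" event, so `startStream` is a hypothetical helper that a plugin would need to call from some earlier hook, and all names below are illustrative rather than part of OpenCode.

```typescript
// Hypothetical per-message streaming stats; not part of OpenCode's API.
interface StreamStats {
  startedAt: number
  firstDeltaAt?: number
  lastDeltaAt?: number
  chunks: number
  chars: number
}

const stats = new Map<string, StreamStats>()

// Assumption: something earlier than the first delta marks the request start.
function startStream(messageID: string, now: number = Date.now()): void {
  stats.set(messageID, { startedAt: now, chunks: 0, chars: 0 })
}

// Would be called from a stream.delta handler with output.delta.
function recordDelta(messageID: string, delta: string, now: number = Date.now()): void {
  const s = stats.get(messageID)
  if (!s) return // no start recorded, nothing to measure against
  if (s.firstDeltaAt === undefined) s.firstDeltaAt = now
  s.lastDeltaAt = now
  s.chunks += 1
  s.chars += delta.length
}

// TTFT in ms, and characters per second measured from the recorded start.
function summarize(messageID: string): { ttftMs: number; charsPerSecond: number } | undefined {
  const s = stats.get(messageID)
  if (!s || s.firstDeltaAt === undefined || s.lastDeltaAt === undefined) return undefined
  const elapsedMs = s.lastDeltaAt - s.startedAt
  return {
    ttftMs: s.firstDeltaAt - s.startedAt,
    charsPerSecond: elapsedMs > 0 ? s.chars / (elapsedMs / 1000) : 0,
  }
}
```

Throughput here is counted in characters because the hook only sees string deltas; converting to tokens would need a tokenizer, which is out of scope for this sketch.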

Existing Issues (no direct overlap)

I reviewed existing issues to confirm this hasn't been proposed:

| Issue | Title | Relationship |
|---|---|---|
| #9737 | Expose partial tool arguments during streaming via state.raw | Identifies the tool-input-delta: break no-op. Focuses on UI state accumulation, not plugin hooks. This proposal subsumes it. |
| #13524 | Refactor: centralize tool plugin hooks + add agent to hook input | Centralizes existing hooks. These new hooks would naturally integrate with that refactor. |
| #12472 | Native Claude Code hooks compatibility | Maps Claude Code's PreToolUse/PostToolUse/Stop to OpenCode events. No streaming-level hooks. These would make OpenCode's plugin system strictly more capable than Claude Code's. |
| #14451 | Ability to intercept or emulate agent messages in plugins | Message-level interception, not token-level streaming. |
| #10374 | Allow "aborted" agents to be continued | Subagent abort recovery, not streaming abort. |
| #8197 | Add retry/re-run capability when operation is aborted | UI retry button, not programmatic abort handling. |

Proposed Hook Signatures

stream.delta

"stream.delta"?: (
  input: {
    sessionID: string
    messageID: string
    partID: string
    type: "text" | "reasoning" | "tool-input"
    /** For tool-input deltas, the tool name and call ID */
    tool?: { name: string; callID: string }
  },
  output: {
    delta: string
    /** Set to true to abort the current stream */
    abort?: boolean
  },
) => Promise<void>
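
To make the signature above concrete, here is a minimal sketch of how a TTSR-style plugin might use it. The types simply restate the proposed signature; the `Rule` shape, the example rule, and the `buffers`/`fired` maps are assumptions made for illustration and are not part of OpenCode.

```typescript
// Hypothetical types mirroring the stream.delta signature proposed above.
type DeltaInput = {
  sessionID: string
  messageID: string
  partID: string
  type: "text" | "reasoning" | "tool-input"
  tool?: { name: string; callID: string }
}
type DeltaOutput = { delta: string; abort?: boolean }

// Illustrative rule shape: a pattern plus the steering text to inject on retry.
interface Rule {
  name: string
  pattern: RegExp
  steer: string
}

const rules: Rule[] = [
  { name: "no-require", pattern: /\brequire\(/, steer: "Use ES module imports, not require()." },
]

// Accumulated text per part, so patterns that span chunk boundaries still match.
const buffers = new Map<string, string>()
// Which rule fired for a message, so a stream.aborted handler could look it up later.
const fired = new Map<string, Rule>()

function checkDelta(input: DeltaInput, output: DeltaOutput): void {
  if (input.type !== "text") return
  const text = (buffers.get(input.partID) ?? "") + output.delta
  buffers.set(input.partID, text)
  const hit = rules.find((rule) => rule.pattern.test(text))
  if (hit) {
    fired.set(input.messageID, hit)
    output.abort = true // ask the processor to stop the stream
  }
}

// Shape a plugin would return if the hook existed as proposed.
const deltaPlugin = {
  "stream.delta": async (input: DeltaInput, output: DeltaOutput): Promise<void> =>
    checkDelta(input, output),
}
```

Re-testing the whole buffer on every chunk is quadratic in the worst case; a real rule engine would probably only re-scan a bounded tail of the buffer.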

stream.aborted

"stream.aborted"?: (
  input: {
    sessionID: string
    messageID: string
    reason: "user" | "plugin" | "error" | "timeout"
    /** Accumulated text so far */
    partial: string
    /** If plugin-initiated, which plugin triggered the abort */
    source?: string
  },
  output: {
    /** Set to true to retry the stream from scratch */
    retry?: boolean
    /** Inject a user message before retry (steering) */
    injectMessage?: string
  },
) => Promise<void>
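
A handler for this hook would likely want a retry cap so that a rule which keeps firing cannot loop forever. The sketch below assumes the signature above is adopted as written; `MAX_RETRIES`, `retries`, and `pendingSteer` are hypothetical names, and counting retries per session (rather than per message) is a design guess, since it is not specified whether a retried stream keeps the same messageID.

```typescript
// Hypothetical types mirroring the stream.aborted signature proposed above.
type AbortedInput = {
  sessionID: string
  messageID: string
  reason: "user" | "plugin" | "error" | "timeout"
  partial: string
  source?: string
}
type AbortedOutput = { retry?: boolean; injectMessage?: string }

const MAX_RETRIES = 2
// Retries already spent, keyed by session.
const retries = new Map<string, number>()
// Steering text chosen by a stream.delta handler when it requested the abort.
const pendingSteer = new Map<string, string>()

function onAborted(input: AbortedInput, output: AbortedOutput): void {
  // Only react to plugin-initiated aborts; a user abort should stay aborted.
  if (input.reason !== "plugin") return
  const used = retries.get(input.sessionID) ?? 0
  if (used >= MAX_RETRIES) return // give up and let the abort stand
  retries.set(input.sessionID, used + 1)
  output.retry = true
  output.injectMessage = pendingSteer.get(input.messageID)
}

// Shape a plugin would return if the hook existed as proposed.
const abortedPlugin = {
  "stream.aborted": async (input: AbortedInput, output: AbortedOutput): Promise<void> =>
    onAborted(input, output),
}
```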

Code Sketch

The change to processor.ts is modest (~30 lines). In the existing for await (const value of stream.fullStream) loop:

case "text-delta":
  if (currentText) {
    // NEW: trigger stream.delta hook
    const deltaOutput = await Plugin.trigger(
      "stream.delta",
      {
        sessionID: input.sessionID,
        messageID: input.assistantMessage.id,
        partID: currentText.id,
        type: "text",
      },
      { delta: value.text },
    )
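    if (deltaOutput.abort) {
      abortReason = "plugin"
      // NOTE: this `break` exits only the switch; the enclosing for-await loop
      // must also check abortReason (or use a labeled break) to stop consuming the stream
      break
    }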
    currentText.text += deltaOutput.delta
    // ... existing updatePartDelta logic unchanged
  }
  break

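case "tool-input-delta": {
  // Instead of a bare `break`, accumulate the raw input and trigger the hook.
  // The braces give the `const` declarations their own block scope inside the switch.
  const toolMatch = toolcalls[value.id]
  if (toolMatch && toolMatch.state.status === "pending") {
    const deltaOutput = await Plugin.trigger(
      "stream.delta",
      {
        sessionID: input.sessionID,
        messageID: input.assistantMessage.id,
        partID: toolMatch.id,
        type: "tool-input",
        tool: { name: toolMatch.tool, callID: value.id },
      },
      { delta: value.delta },
    )
    if (deltaOutput.abort) {
      abortReason = "plugin"
      // Exits only the switch; the enclosing loop must also stop once abortReason is set
      break
    }
    // Accumulate raw (also addresses #9737)
    const updated = {
      ...toolMatch,
      state: {
        ...toolMatch.state,
        raw: (toolMatch.state.raw || "") + deltaOutput.delta,
      },
    }
    await Session.updatePart(updated)
    // Store the updated part so the next delta appends to the accumulated raw
    toolcalls[value.id] = updated
  }
  break
}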

After the stream loop, before error handling:

if (abortReason) {
  const abortOutput = await Plugin.trigger(
    "stream.aborted",
    {
      sessionID: input.sessionID,
      messageID: input.assistantMessage.id,
      reason: abortReason,
      partial: currentText?.text ?? "",
    },
    { retry: false, injectMessage: undefined },
  )
  if (abortOutput.retry) {
    if (abortOutput.injectMessage) {
      await Session.addUserMessage(input.sessionID, abortOutput.injectMessage)
    }
    continue // Re-enter the while(true) loop
  }
}
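
One detail the sketch glosses over is how a plugin abort leaves the stream loop: a bare `break` inside a `switch` exits only the switch. The self-contained simulation below shows one way to wire abort and retry together with a labeled break. It uses a synchronous generator as a stand-in for stream.fullStream so it runs without any provider (the control flow is the same for `for await`), and `fakeStream`, `run`, and the "BAD" trigger are all hypothetical.

```typescript
type Chunk = { type: "text-delta"; text: string } | { type: "finish" }

// Stand-in for the provider stream; the second attempt simulates steering having worked.
function* fakeStream(attempt: number): Generator<Chunk> {
  const words = attempt === 0 ? ["ok ", "BAD ", "never-seen"] : ["ok ", "good"]
  for (const text of words) yield { type: "text-delta", text }
  yield { type: "finish" }
}

function run(maxRetries: number): { text: string; attempts: number; seen: number } {
  let attempt = 0
  let seen = 0 // number of deltas actually consumed across attempts
  while (true) {
    let text = ""
    let abortReason: "plugin" | undefined = undefined
    consume: for (const value of fakeStream(attempt)) {
      switch (value.type) {
        case "text-delta": {
          seen++
          if (value.text.includes("BAD")) {
            abortReason = "plugin"
            break consume // leaves the loop; a bare `break` would only leave the switch
          }
          text += value.text
          break
        }
        case "finish":
          break
      }
    }
    attempt++
    // Mirrors `if (abortOutput.retry) continue` from the sketch, with a retry cap.
    if (abortReason && attempt <= maxRetries) continue
    return { text, attempts: attempt, seen }
  }
}
```

With one retry allowed, the first attempt stops at the bad chunk without consuming the rest, and the second attempt completes normally.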

Design Considerations

Performance: Plugin.trigger is already called inside the stream loop (e.g., experimental.text.complete on text-end), but only once per text part. Adding it to text-delta and tool-input-delta adds an awaited call per chunk. Mitigation: record at plugin load time whether any loaded plugin registers stream.delta, and skip the trigger and its await entirely when none does. For users without streaming plugins the added cost is then a single boolean check per chunk.
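
The mitigation could look roughly like the following. This is a sketch under stated assumptions: `HookRegistry` is a hypothetical stand-in and does not describe how Plugin.trigger is implemented today; it only illustrates indexing hooks once at load time so the hot path can test for a subscriber before awaiting anything.

```typescript
// Hypothetical hook and plugin shapes, for illustration only.
type Hook = (input: unknown, output: object) => Promise<void>
type PluginHooks = Partial<Record<string, Hook>>

class HookRegistry {
  private byName = new Map<string, Hook[]>()

  // Index hooks once, when plugins are loaded.
  constructor(plugins: PluginHooks[]) {
    for (const plugin of plugins) {
      for (const [name, fn] of Object.entries(plugin)) {
        if (!fn) continue
        const list = this.byName.get(name) ?? []
        list.push(fn)
        this.byName.set(name, list)
      }
    }
  }

  // Cheap synchronous check for the hot path.
  has(name: string): boolean {
    return this.byName.has(name)
  }

  // Run every registered hook in order, letting each mutate the output.
  async trigger<O extends object>(name: string, input: unknown, output: O): Promise<O> {
    for (const fn of this.byName.get(name) ?? []) await fn(input, output)
    return output
  }
}

// Hot-path usage: no hook call happens unless a plugin subscribed.
async function observeDelta(
  registry: HookRegistry,
  delta: string,
): Promise<{ delta: string; abort?: boolean }> {
  const output: { delta: string; abort?: boolean } = { delta }
  if (!registry.has("stream.delta")) return output
  return registry.trigger("stream.delta", {}, output)
}
```

In the real loop the `has` check would sit directly in the `case` branch, so the no-plugin path avoids creating a promise at all.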

Backward compatibility: Plugins that don't register these hooks see zero change. output.abort defaults to undefined (falsy), preserving existing behavior.

Subsumes #9737: The tool-input-delta handling naturally accumulates state.raw, which is exactly what #9737 requests.

Complements #12472: Claude Code's hooks operate at the tool execution level. These streaming hooks cover a phase that Claude Code doesn't expose at all, making OpenCode's plugin system strictly more capable.

Complements #13524: The centralized hook dispatch from PR #13521 would naturally include these new hooks.

Additional context

I'm building an open-source agent harness (aidevops) that uses OpenCode as its primary coding agent. We've implemented a "soft TTSR" rule engine that pattern-matches completed responses, but the real value comes from intervening during streaming — which requires these hooks. Happy to contribute a PR if the approach looks reasonable.

Labels: core (anything pertaining to core functionality of the application; opencode server stuff)
