Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/app/src/context/global-sync.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { bootstrapDirectory, bootstrapGlobal } from "./global-sync/bootstrap"
import { createChildStoreManager } from "./global-sync/child-store"
import { applyDirectoryEvent, applyGlobalEvent, cleanupDroppedSessionCaches } from "./global-sync/event-reducer"
import { createRefreshQueue } from "./global-sync/queue"
import { clearSessionPrefetchDirectory } from "./global-sync/session-prefetch"
import { estimateRootSessionTotal, loadRootSessionsWithFallback } from "./global-sync/session-load"
import { trimSessions } from "./global-sync/session-trim"
import type { ProjectMeta } from "./global-sync/types"
Expand Down Expand Up @@ -161,6 +162,7 @@ function createGlobalSync() {
queue.clear(directory)
sessionMeta.delete(directory)
sdkCache.delete(directory)
clearSessionPrefetchDirectory(directory)
},
})

Expand Down
63 changes: 63 additions & 0 deletions packages/app/src/context/global-sync/session-prefetch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { describe, expect, test } from "bun:test"
import {
clearSessionPrefetch,
clearSessionPrefetchDirectory,
getSessionPrefetch,
runSessionPrefetch,
setSessionPrefetch,
} from "./session-prefetch"

describe("session prefetch", () => {
test("stores and clears message metadata by directory", () => {
clearSessionPrefetch("/tmp/a", ["ses_1"])
clearSessionPrefetch("/tmp/b", ["ses_1"])

setSessionPrefetch({
directory: "/tmp/a",
sessionID: "ses_1",
limit: 200,
complete: false,
at: 123,
})

expect(getSessionPrefetch("/tmp/a", "ses_1")).toEqual({ limit: 200, complete: false, at: 123 })
expect(getSessionPrefetch("/tmp/b", "ses_1")).toBeUndefined()

clearSessionPrefetch("/tmp/a", ["ses_1"])

expect(getSessionPrefetch("/tmp/a", "ses_1")).toBeUndefined()
})

test("dedupes inflight work", async () => {
clearSessionPrefetch("/tmp/c", ["ses_2"])

let calls = 0
const run = () =>
runSessionPrefetch({
directory: "/tmp/c",
sessionID: "ses_2",
task: async () => {
calls += 1
return { limit: 100, complete: true, at: 456 }
},
})

const [a, b] = await Promise.all([run(), run()])

expect(calls).toBe(1)
expect(a).toEqual({ limit: 100, complete: true, at: 456 })
expect(b).toEqual({ limit: 100, complete: true, at: 456 })
})

test("clears a whole directory", () => {
setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_1", limit: 10, complete: true, at: 1 })
setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_2", limit: 20, complete: false, at: 2 })
setSessionPrefetch({ directory: "/tmp/e", sessionID: "ses_1", limit: 30, complete: true, at: 3 })

clearSessionPrefetchDirectory("/tmp/d")

expect(getSessionPrefetch("/tmp/d", "ses_1")).toBeUndefined()
expect(getSessionPrefetch("/tmp/d", "ses_2")).toBeUndefined()
expect(getSessionPrefetch("/tmp/e", "ses_1")).toEqual({ limit: 30, complete: true, at: 3 })
})
})
85 changes: 85 additions & 0 deletions packages/app/src/context/global-sync/session-prefetch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
const key = (directory: string, sessionID: string) => `${directory}\n${sessionID}`

export const SESSION_PREFETCH_TTL = 15_000

type Meta = {
limit: number
complete: boolean
at: number
}

const cache = new Map<string, Meta>()
const inflight = new Map<string, Promise<Meta | undefined>>()
const rev = new Map<string, number>()

const version = (id: string) => rev.get(id) ?? 0

export function getSessionPrefetch(directory: string, sessionID: string) {
return cache.get(key(directory, sessionID))
}

export function getSessionPrefetchPromise(directory: string, sessionID: string) {
return inflight.get(key(directory, sessionID))
}

export function clearSessionPrefetchInflight() {
inflight.clear()
}

export function isSessionPrefetchCurrent(directory: string, sessionID: string, value: number) {
return version(key(directory, sessionID)) === value
}

export function runSessionPrefetch(input: {
directory: string
sessionID: string
task: (value: number) => Promise<Meta | undefined>
}) {
const id = key(input.directory, input.sessionID)
const pending = inflight.get(id)
if (pending) return pending

const value = version(id)

const promise = input.task(value).finally(() => {
if (inflight.get(id) === promise) inflight.delete(id)
})

inflight.set(id, promise)
return promise
}

export function setSessionPrefetch(input: {
directory: string
sessionID: string
limit: number
complete: boolean
at?: number
}) {
cache.set(key(input.directory, input.sessionID), {
limit: input.limit,
complete: input.complete,
at: input.at ?? Date.now(),
})
}

export function clearSessionPrefetch(directory: string, sessionIDs: Iterable<string>) {
for (const sessionID of sessionIDs) {
if (!sessionID) continue
const id = key(directory, sessionID)
rev.set(id, version(id) + 1)
cache.delete(id)
inflight.delete(id)
}
}

export function clearSessionPrefetchDirectory(directory: string) {
const prefix = `${directory}\n`
const keys = new Set([...cache.keys(), ...inflight.keys()])
for (const id of keys) {
if (!id.startsWith(prefix)) continue
rev.set(id, version(id) + 1)
cache.delete(id)
inflight.delete(id)
}
}
115 changes: 78 additions & 37 deletions packages/app/src/context/sync.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import { createStore, produce, reconcile } from "solid-js/store"
import { Binary } from "@opencode-ai/util/binary"
import { retry } from "@opencode-ai/util/retry"
import { createSimpleContext } from "@opencode-ai/ui/context"
import {
clearSessionPrefetch,
getSessionPrefetch,
getSessionPrefetchPromise,
setSessionPrefetch,
} from "./global-sync/session-prefetch"
import { useGlobalSync } from "./global-sync"
import { useSDK } from "./sdk"
import type { Message, Part } from "@opencode-ai/sdk/v2/client"
Expand Down Expand Up @@ -160,6 +166,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({

const evict = (directory: string, setStore: Setter, sessionIDs: string[]) => {
if (sessionIDs.length === 0) return
clearSessionPrefetch(directory, sessionIDs)
for (const sessionID of sessionIDs) {
globalSync.todo.set(sessionID, undefined)
}
Expand Down Expand Up @@ -217,6 +224,12 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
}
setMeta("limit", key, input.limit)
setMeta("complete", key, next.complete)
setSessionPrefetch({
directory: input.directory,
sessionID: input.sessionID,
limit: input.limit,
complete: next.complete,
})
})
})
.finally(() => {
Expand Down Expand Up @@ -280,54 +293,82 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
parts: input.parts,
})
},
async sync(sessionID: string) {
async sync(sessionID: string, opts?: { force?: boolean }) {
const directory = sdk.directory
const client = sdk.client
const [store, setStore] = globalSync.child(directory)
const key = keyFor(directory, sessionID)
const hasSession = Binary.search(store.session, sessionID, (s) => s.id).found

touch(directory, setStore, sessionID)

if (store.message[sessionID] !== undefined && hasSession && meta.limit[key] !== undefined) return

const limit = meta.limit[key] ?? messagePageSize

const sessionReq = hasSession
? Promise.resolve()
: retry(() => client.session.get({ sessionID })).then((session) => {
if (!tracked(directory, sessionID)) return
const data = session.data
if (!data) return
setStore(
"session",
produce((draft) => {
const match = Binary.search(draft, sessionID, (s) => s.id)
if (match.found) {
draft[match.index] = data
return
}
draft.splice(match.index, 0, data)
}),
)
})

const messagesReq = loadMessages({
directory,
client,
setStore,
sessionID,
limit,
})
const seeded = getSessionPrefetch(directory, sessionID)
if (seeded && store.message[sessionID] !== undefined && meta.limit[key] === undefined) {
batch(() => {
setMeta("limit", key, seeded.limit)
setMeta("complete", key, seeded.complete)
setMeta("loading", key, false)
})
}

return runInflight(inflight, key, () => Promise.all([sessionReq, messagesReq]).then(() => {}))
return runInflight(inflight, key, async () => {
const pending = getSessionPrefetchPromise(directory, sessionID)
if (pending) {
await pending
const seeded = getSessionPrefetch(directory, sessionID)
if (seeded && store.message[sessionID] !== undefined && meta.limit[key] === undefined) {
batch(() => {
setMeta("limit", key, seeded.limit)
setMeta("complete", key, seeded.complete)
setMeta("loading", key, false)
})
}
}

const hasSession = Binary.search(store.session, sessionID, (s) => s.id).found
const cached = store.message[sessionID] !== undefined && meta.limit[key] !== undefined
if (cached && hasSession && !opts?.force) return

const limit = meta.limit[key] ?? messagePageSize
const sessionReq =
hasSession && !opts?.force
? Promise.resolve()
: retry(() => client.session.get({ sessionID })).then((session) => {
if (!tracked(directory, sessionID)) return
const data = session.data
if (!data) return
setStore(
"session",
produce((draft) => {
const match = Binary.search(draft, sessionID, (s) => s.id)
if (match.found) {
draft[match.index] = data
return
}
draft.splice(match.index, 0, data)
}),
)
})

const messagesReq =
cached && !opts?.force
? Promise.resolve()
: loadMessages({
directory,
client,
setStore,
sessionID,
limit,
})

await Promise.all([sessionReq, messagesReq])
})
},
async diff(sessionID: string) {
async diff(sessionID: string, opts?: { force?: boolean }) {
const directory = sdk.directory
const client = sdk.client
const [store, setStore] = globalSync.child(directory)
touch(directory, setStore, sessionID)
if (store.session_diff[sessionID] !== undefined) return
if (store.session_diff[sessionID] !== undefined && !opts?.force) return

const key = keyFor(directory, sessionID)
return runInflight(inflightDiff, key, () =>
Expand All @@ -337,7 +378,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
}),
)
},
async todo(sessionID: string) {
async todo(sessionID: string, opts?: { force?: boolean }) {
const directory = sdk.directory
const client = sdk.client
const [store, setStore] = globalSync.child(directory)
Expand All @@ -348,7 +389,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
if (cached === undefined) {
globalSync.todo.set(sessionID, existing)
}
return
if (!opts?.force) return
}

if (cached !== undefined) {
Expand Down
Loading
Loading