From 17be327498c1b90a443f905e8c87065bb5a0a8a2 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 1 Apr 2026 12:47:45 -0400 Subject: [PATCH 1/3] fix(account): coalesce concurrent console token refreshes --- packages/opencode/src/account/index.ts | 44 ++++++++++++- .../opencode/test/account/service.test.ts | 64 +++++++++++++++++++ 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/packages/opencode/src/account/index.ts b/packages/opencode/src/account/index.ts index 82b166ef2af2..55abde1d2200 100644 --- a/packages/opencode/src/account/index.ts +++ b/packages/opencode/src/account/index.ts @@ -1,4 +1,4 @@ -import { Clock, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap } from "effect" +import { Clock, Deferred, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap, SynchronizedRef } from "effect" import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" import { makeRuntime } from "@/effect/run-service" @@ -175,9 +175,10 @@ export namespace Account { mapAccountServiceError("HTTP request failed"), ) - const resolveToken = Effect.fnUntraced(function* (row: AccountRow) { + const refreshInFlight = yield* SynchronizedRef.make(new Map>()) + + const refreshToken = Effect.fnUntraced(function* (row: AccountRow) { const now = yield* Clock.currentTimeMillis - if (row.token_expiry && row.token_expiry > now) return row.access_token const response = yield* executeEffectOk( HttpClientRequest.post(`${row.url}/auth/device/token`).pipe( @@ -208,6 +209,43 @@ export namespace Account { return parsed.access_token }) + const clearRefreshInFlight = (accountID: AccountID, deferred: Deferred.Deferred) => + SynchronizedRef.update(refreshInFlight, (inFlight) => { + if (inFlight.get(accountID) !== deferred) return inFlight + const next = new Map(inFlight) + next.delete(accountID) + return next + }) + + const resolveToken = Effect.fnUntraced(function* (row: AccountRow) { + const now = yield* Clock.currentTimeMillis + if (row.token_expiry && row.token_expiry > now) return row.access_token + + return yield* Effect.uninterruptibleMask((restore) => + Effect.gen(function* () { + const [deferred, shouldStart] = yield* SynchronizedRef.modify(refreshInFlight, (inFlight) => { + const existing = inFlight.get(row.id) + if (existing) return [[existing, false] as const, inFlight] as const + + const deferred = Deferred.makeUnsafe() + const next = new Map(inFlight) + next.set(row.id, deferred) + return [[deferred, true] as const, next] as const + }) + + if (shouldStart) { + yield* refreshToken(row).pipe( + Effect.onExit((exit) => Deferred.done(deferred, exit).pipe(Effect.asVoid)), + Effect.ensuring(clearRefreshInFlight(row.id, deferred)), + Effect.forkDetach, + ) + } + + return yield* restore(Deferred.await(deferred)) + }), + ) + }) + const resolveAccess = Effect.fnUntraced(function* (accountID: AccountID) { const maybeAccount = yield* repo.getRow(accountID) if (Option.isNone(maybeAccount)) return Option.none() diff --git a/packages/opencode/test/account/service.test.ts b/packages/opencode/test/account/service.test.ts index cfe55e23e4f6..a08fce03ca97 100644 --- a/packages/opencode/test/account/service.test.ts +++ b/packages/opencode/test/account/service.test.ts @@ -148,6 +148,70 @@ it.live("token refresh persists the new token", () => }), ) +it.live("concurrent config and token requests coalesce token refresh", () => + Effect.gen(function* () { + const id = AccountID.make("user-1") + + yield* AccountRepo.use((r) => + r.persistAccount({ + id, + email: "user@example.com", + url: "https://one.example.com", + accessToken: AccessToken.make("at_old"), + refreshToken: RefreshToken.make("rt_old"), + expiry: Date.now() - 1_000, + orgID: Option.some(OrgID.make("org-9")), + }), + ) + + let refreshCalls = 0 + const client = HttpClient.make((req) => + Effect.promise(async () => { + if (req.url === "https://one.example.com/auth/device/token") { + refreshCalls += 1 + + if (refreshCalls === 1) { + await new Promise((resolve) => setTimeout(resolve, 25)) + return json(req, { + access_token: "at_new", + refresh_token: "rt_new", + expires_in: 60, + }) + } + + return json( + req, + { + error: "invalid_grant", + error_description: "refresh token already used", + }, + 400, + ) + } + + if (req.url === "https://one.example.com/api/config") { + return json(req, { config: { theme: "light", seats: 5 } }) + } + + return json(req, {}, 404) + }), + ) + + const [cfg, token] = yield* Account.Service.use((s) => + Effect.all([s.config(id, OrgID.make("org-9")), s.token(id)], { concurrency: 2 }), + ).pipe(Effect.provide(live(client))) + + expect(Option.getOrThrow(cfg)).toEqual({ theme: "light", seats: 5 }) + expect(String(Option.getOrThrow(token))).toBe("at_new") + expect(refreshCalls).toBe(1) + + const row = yield* AccountRepo.use((r) => r.getRow(id)) + const value = Option.getOrThrow(row) + expect(value.access_token).toBe(AccessToken.make("at_new")) + expect(value.refresh_token).toBe(RefreshToken.make("rt_new")) + }), +) + it.live("config sends the selected org header", () => Effect.gen(function* () { const id = AccountID.make("user-1") From d39c7519ea758363f7e24710abd3459a020ffc2f Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 1 Apr 2026 12:50:11 -0400 Subject: [PATCH 2/3] refactor(account): use Deferred for coalesced refresh --- packages/opencode/src/account/index.ts | 31 +++++++++++++++++--------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/packages/opencode/src/account/index.ts b/packages/opencode/src/account/index.ts index 55abde1d2200..1505b6629111 100644 --- a/packages/opencode/src/account/index.ts +++ b/packages/opencode/src/account/index.ts @@ -120,6 +120,9 @@ class TokenRefreshRequest extends Schema.Class("TokenRefres const clientId = "opencode-cli" +type InFlightRefresh = Deferred.Deferred +type InFlightRefreshMap = Map + const mapAccountServiceError = (message = "Account service operation failed") => (effect: Effect.Effect): Effect.Effect => @@ -175,7 +178,7 @@ export namespace Account { mapAccountServiceError("HTTP request failed"), ) - const refreshInFlight = yield* SynchronizedRef.make(new Map>()) + const refreshInFlight = yield* SynchronizedRef.make(new Map()) const refreshToken = Effect.fnUntraced(function* (row: AccountRow) { const now = yield* Clock.currentTimeMillis @@ -209,7 +212,7 @@ export namespace Account { return parsed.access_token }) - const clearRefreshInFlight = (accountID: AccountID, deferred: Deferred.Deferred) => + const clearRefreshInFlight = (accountID: AccountID, deferred: InFlightRefresh) => SynchronizedRef.update(refreshInFlight, (inFlight) => { if (inFlight.get(accountID) !== deferred) return inFlight const next = new Map(inFlight) @@ -217,21 +220,27 @@ export namespace Account { return next }) + const getOrStartRefresh = (accountID: AccountID): Effect.Effect => + SynchronizedRef.modify( + refreshInFlight, + (inFlight): readonly [readonly [InFlightRefresh, boolean], InFlightRefreshMap] => { + const existing = inFlight.get(accountID) + if (existing) return [[existing, false] as const, inFlight] as const + + const deferred = Deferred.makeUnsafe() + const next = new Map(inFlight) + next.set(accountID, deferred) + return [[deferred, true] as const, next] as const + }, + ) + const resolveToken = Effect.fnUntraced(function* (row: AccountRow) { const now = yield* Clock.currentTimeMillis if (row.token_expiry && row.token_expiry > now) return row.access_token return yield* Effect.uninterruptibleMask((restore) => Effect.gen(function* () { - const [deferred, shouldStart] = yield* SynchronizedRef.modify(refreshInFlight, (inFlight) => { - const existing = inFlight.get(row.id) - if (existing) return [[existing, false] as const, inFlight] as const - - const deferred = Deferred.makeUnsafe() - const next = new Map(inFlight) - next.set(row.id, deferred) - return [[deferred, true] as const, next] as const - }) + const [deferred, shouldStart] = yield* getOrStartRefresh(row.id) if (shouldStart) { yield* refreshToken(row).pipe( From 2c8e0ae8313f8f8c1592a01f9719af265c6bdc1e Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 1 Apr 2026 13:08:53 -0400 Subject: [PATCH 3/3] refactor(account): use Cache for refresh single-flight --- packages/opencode/src/account/index.ts | 60 ++++++++------------------ 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/packages/opencode/src/account/index.ts b/packages/opencode/src/account/index.ts index 1505b6629111..e063eaab564f 100644 --- a/packages/opencode/src/account/index.ts +++ b/packages/opencode/src/account/index.ts @@ -1,4 +1,4 @@ -import { Clock, Deferred, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap, SynchronizedRef } from "effect" +import { Cache, Clock, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap } from "effect" import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" import { makeRuntime } from "@/effect/run-service" @@ -120,9 +120,6 @@ class TokenRefreshRequest extends Schema.Class("TokenRefres const clientId = "opencode-cli" -type InFlightRefresh = Deferred.Deferred -type InFlightRefreshMap = Map - const mapAccountServiceError = (message = "Account service operation failed") => (effect: Effect.Effect): Effect.Effect => @@ -178,8 +175,6 @@ export namespace Account { mapAccountServiceError("HTTP request failed"), ) - const refreshInFlight = yield* SynchronizedRef.make(new Map()) - const refreshToken = Effect.fnUntraced(function* (row: AccountRow) { const now = yield* Clock.currentTimeMillis @@ -212,47 +207,28 @@ export namespace Account { return parsed.access_token }) - const clearRefreshInFlight = (accountID: AccountID, deferred: InFlightRefresh) => - SynchronizedRef.update(refreshInFlight, (inFlight) => { - if (inFlight.get(accountID) !== deferred) return inFlight - const next = new Map(inFlight) - next.delete(accountID) - return next - }) - - const getOrStartRefresh = (accountID: AccountID): Effect.Effect => - SynchronizedRef.modify( - refreshInFlight, - (inFlight): readonly [readonly [InFlightRefresh, boolean], InFlightRefreshMap] => { - const existing = inFlight.get(accountID) - if (existing) return [[existing, false] as const, inFlight] as const - - const deferred = Deferred.makeUnsafe() - const next = new Map(inFlight) - next.set(accountID, deferred) - return [[deferred, true] as const, next] as const - }, - ) + const refreshTokenCache = yield* Cache.make({ + capacity: Number.POSITIVE_INFINITY, + timeToLive: Duration.zero, + lookup: Effect.fnUntraced(function* (accountID) { + const maybeAccount = yield* repo.getRow(accountID) + if (Option.isNone(maybeAccount)) { + return yield* Effect.fail(new AccountServiceError({ message: "Account not found during token refresh" })) + } + + const account = maybeAccount.value + const now = yield* Clock.currentTimeMillis + if (account.token_expiry && account.token_expiry > now) return account.access_token + + return yield* refreshToken(account) + }), + }) const resolveToken = Effect.fnUntraced(function* (row: AccountRow) { const now = yield* Clock.currentTimeMillis if (row.token_expiry && row.token_expiry > now) return row.access_token - return yield* Effect.uninterruptibleMask((restore) => - Effect.gen(function* () { - const [deferred, shouldStart] = yield* getOrStartRefresh(row.id) - - if (shouldStart) { - yield* refreshToken(row).pipe( - Effect.onExit((exit) => Deferred.done(deferred, exit).pipe(Effect.asVoid)), - Effect.ensuring(clearRefreshInFlight(row.id, deferred)), - Effect.forkDetach, - ) - } - - return yield* restore(Deferred.await(deferred)) - }), - ) + return yield* Cache.get(refreshTokenCache, row.id) }) const resolveAccess = Effect.fnUntraced(function* (accountID: AccountID) {