From 4947fda517d2bebe772538a189413e20c03d7bdc Mon Sep 17 00:00:00 2001 From: unnoq Date: Wed, 2 Jul 2025 15:08:05 +0700 Subject: [PATCH 1/4] lastEventId --- packages/trpc/src/to-orpc-router.test.ts | 44 +++++++++++++++++++++++- packages/trpc/src/to-orpc-router.ts | 12 ++++--- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/packages/trpc/src/to-orpc-router.test.ts b/packages/trpc/src/to-orpc-router.test.ts index 2816a01c4..a1c2550ab 100644 --- a/packages/trpc/src/to-orpc-router.test.ts +++ b/packages/trpc/src/to-orpc-router.test.ts @@ -1,9 +1,15 @@ import { call, createRouterClient, isProcedure, ORPCError, unlazy } from '@orpc/server' import { isAsyncIteratorObject } from '@orpc/shared' +import { tracked } from '@trpc/server' +import { z } from 'zod' import { inputSchema, outputSchema } from '../../contract/tests/shared' -import { trpcRouter } from '../tests/shared' +import { t, trpcRouter } from '../tests/shared' import { experimental_toORPCRouter as toORPCRouter } from './to-orpc-router' +beforeEach(() => { + vi.clearAllMocks() +}) + describe('toORPCRouter', async () => { const orpcRouter = toORPCRouter(trpcRouter) @@ -73,4 +79,40 @@ describe('toORPCRouter', async () => { }) }) }) + + describe('event iterators', () => { + const trackedSubscription = vi.fn(async function* () { + yield { order: 1 } + yield tracked('id-2', { order: 2 }) + }) + + const trpcRouter = t.router({ + tracked: t.procedure + .input(z.any()) + .subscription(trackedSubscription), + unsupportedTracked: t.procedure + .subscription(async function* () { + yield tracked('id-1', 1) + }), + }) + + const orpcRouter = toORPCRouter(trpcRouter) + + it('lastEventId', async () => { + await call(orpcRouter.tracked, { u: 'u' }, { lastEventId: 'id-1', context: { a: 'test' } }) + expect(trackedSubscription).toHaveBeenNthCalledWith(1, expect.objectContaining({ + input: { u: 'u', lastEventId: 'id-1' }, + })) + + await call(orpcRouter.tracked, undefined, { lastEventId: 'id-2', context: { a: 'test' } }) + expect(trackedSubscription).toHaveBeenNthCalledWith(2, expect.objectContaining({ + input: { lastEventId: 'id-2' }, + })) + + await call(orpcRouter.tracked, 1234, { lastEventId: 'id-3', context: { a: 'test' } }) + expect(trackedSubscription).toHaveBeenNthCalledWith(3, expect.objectContaining({ + input: 1234, + })) + }) + }) }) diff --git a/packages/trpc/src/to-orpc-router.ts b/packages/trpc/src/to-orpc-router.ts index 2eb8c81d9..5ee8a90ab 100644 --- a/packages/trpc/src/to-orpc-router.ts +++ b/packages/trpc/src/to-orpc-router.ts @@ -1,7 +1,7 @@ import type { AnyProcedure, AnyRouter, inferRouterContext } from '@trpc/server' import type { inferRouterMeta, Parser } from '@trpc/server/unstable-core-do-not-import' import * as ORPC from '@orpc/server' -import { isTypescriptObject } from '@orpc/shared' +import { isObject, isTypescriptObject } from '@orpc/shared' import { TRPCError } from '@trpc/server' import { getHTTPStatusCodeFromError } from '@trpc/server/unstable-core-do-not-import' @@ -88,15 +88,19 @@ function toORPCProcedure(procedure: AnyProcedure) { middlewares: [], inputSchema: toDisabledStandardSchema(procedure._def.inputs.at(-1)), outputSchema: toDisabledStandardSchema((procedure as any)._def.output), - handler: async ({ context, signal, path, input }) => { + handler: async ({ context, signal, path, input, lastEventId }) => { try { + const trpcInput = lastEventId !== undefined && (input === undefined || isObject(input)) + ? { ...input, lastEventId } + : input + return await procedure({ ctx: context, signal, path: path.join('.'), type: procedure._def.type, - input, - getRawInput: () => input, + input: trpcInput, + getRawInput: () => trpcInput, }) } catch (cause) { From fa61e9c62618bb0b1ffc6e24d59d63c7b7cd7e69 Mon Sep 17 00:00:00 2001 From: unnoq Date: Wed, 2 Jul 2025 16:18:01 +0700 Subject: [PATCH 2/4] feat(trpc): support event iterator for toORPCRouter --- packages/trpc/package.json | 1 + packages/trpc/src/to-orpc-router.test-d.ts | 6 +- packages/trpc/src/to-orpc-router.test.ts | 79 ++++++++++++++++++---- packages/trpc/src/to-orpc-router.ts | 38 +++++++++-- packages/trpc/tests/shared.ts | 4 +- packages/trpc/tsconfig.json | 1 + pnpm-lock.yaml | 3 + 7 files changed, 108 insertions(+), 24 deletions(-) diff --git a/packages/trpc/package.json b/packages/trpc/package.json index 6a9f97af9..ca5e4eadb 100644 --- a/packages/trpc/package.json +++ b/packages/trpc/package.json @@ -38,6 +38,7 @@ "@trpc/server": ">=11.4.2" }, "dependencies": { + "@orpc/client": "workspace:*", "@orpc/server": "workspace:*", "@orpc/shared": "workspace:*" }, diff --git a/packages/trpc/src/to-orpc-router.test-d.ts b/packages/trpc/src/to-orpc-router.test-d.ts index e237af9de..ceec852b7 100644 --- a/packages/trpc/src/to-orpc-router.test-d.ts +++ b/packages/trpc/src/to-orpc-router.test-d.ts @@ -1,6 +1,6 @@ import type { ContractRouter, InferRouterInitialContext, Procedure, Router, Schema } from '@orpc/server' import type { inferRouterContext } from '@trpc/server' -import type { inferRouterMeta } from '@trpc/server/unstable-core-do-not-import' +import type { inferRouterMeta, TrackedData } from '@trpc/server/unstable-core-do-not-import' import type { TRPCContext, TRPCMeta, trpcRouter } from '../tests/shared' import type { experimental_ToORPCRouterResult as ToORPCRouterResult } from './to-orpc-router' @@ -24,7 +24,7 @@ it('ToORPCRouterResult', () => { >() expectTypeOf(orpcRouter.subscribe).toEqualTypeOf< - Procedure, Schema>, object, TRPCMeta> + Procedure, Schema, void, any>>, object, TRPCMeta> >() expectTypeOf(orpcRouter.nested).toEqualTypeOf< @@ -35,7 +35,7 @@ it('ToORPCRouterResult', () => { expectTypeOf(orpcRouter.lazy).toEqualTypeOf< { - subscribe: Procedure, Schema>, object, TRPCMeta> + subscribe: Procedure, Schema>, object, TRPCMeta> lazy: { throw: Procedure, Schema, object, TRPCMeta> } diff --git a/packages/trpc/src/to-orpc-router.test.ts b/packages/trpc/src/to-orpc-router.test.ts index a1c2550ab..6677a6bd2 100644 --- a/packages/trpc/src/to-orpc-router.test.ts +++ b/packages/trpc/src/to-orpc-router.test.ts @@ -1,4 +1,4 @@ -import { call, createRouterClient, isProcedure, ORPCError, unlazy } from '@orpc/server' +import { call, createRouterClient, getEventMeta, isProcedure, ORPCError, unlazy } from '@orpc/server' import { isAsyncIteratorObject } from '@orpc/shared' import { tracked } from '@trpc/server' import { z } from 'zod' @@ -81,24 +81,41 @@ describe('toORPCRouter', async () => { }) describe('event iterators', () => { - const trackedSubscription = vi.fn(async function* () { - yield { order: 1 } - yield tracked('id-2', { order: 2 }) - }) + it('subscribe & tracked', async () => { + const output = await call(orpcRouter.subscribe, { u: '2' }, { lastEventId: 'id-1', context: { a: 'test' } }) as any + expect(output).toSatisfy(isAsyncIteratorObject) + await expect(output.next()).resolves.toEqual({ done: false, value: 'pong' }) + await expect(output.next()).resolves.toSatisfy((result) => { + expect(result.done).toEqual(false) + expect(result.value).toEqual({ id: 'id-1', data: { order: 1 } }) + expect(getEventMeta(result.value)).toEqual({ id: 'id-1' }) + + return true + }) + await expect(output.next()).resolves.toSatisfy((result) => { + expect(result.done).toEqual(false) + expect(result.value).toEqual({ id: 'id-2', data: { order: 2 } }) + expect(getEventMeta(result.value)).toEqual({ id: 'id-2' }) - const trpcRouter = t.router({ - tracked: t.procedure - .input(z.any()) - .subscription(trackedSubscription), - unsupportedTracked: t.procedure - .subscription(async function* () { - yield tracked('id-1', 1) - }), + return true + }) + await expect(output.next()).resolves.toEqual({ done: true, value: undefined }) }) - const orpcRouter = toORPCRouter(trpcRouter) - it('lastEventId', async () => { + const trackedSubscription = vi.fn(async function* () { + yield { order: 1 } + yield tracked('id-2', { order: 2 }) + }) + + const trpcRouter = t.router({ + tracked: t.procedure + .input(z.any()) + .subscription(trackedSubscription), + }) + + const orpcRouter = toORPCRouter(trpcRouter) + await call(orpcRouter.tracked, { u: 'u' }, { lastEventId: 'id-1', context: { a: 'test' } }) expect(trackedSubscription).toHaveBeenNthCalledWith(1, expect.objectContaining({ input: { u: 'u', lastEventId: 'id-1' }, @@ -114,5 +131,37 @@ describe('toORPCRouter', async () => { input: 1234, })) }) + + it('works with AsyncIterable & cleanup', async () => { + let cleanupCalled = false + + const trackedSubscription = vi.fn(async () => { + return { + async* [Symbol.asyncIterator]() { + try { + yield { order: 1 } + yield tracked('id-2', { order: 2 }) + } + finally { + cleanupCalled = true + } + }, + } + }) + + const trpcRouter = t.router({ + tracked: t.procedure + .input(z.any()) + .subscription(trackedSubscription), + }) + + const orpcRouter = toORPCRouter(trpcRouter) + + const output = await call(orpcRouter.tracked, { u: 'u' }, { lastEventId: 'id-1', context: { a: 'test' } }) + + await expect(output.next()).resolves.toEqual({ done: false, value: { order: 1 } }) + await expect(output.return?.()).resolves.toEqual({ done: true, value: undefined }) + expect(cleanupCalled).toBe(true) + }) }) }) diff --git a/packages/trpc/src/to-orpc-router.ts b/packages/trpc/src/to-orpc-router.ts index 5ee8a90ab..0515f6636 100644 --- a/packages/trpc/src/to-orpc-router.ts +++ b/packages/trpc/src/to-orpc-router.ts @@ -1,14 +1,20 @@ import type { AnyProcedure, AnyRouter, inferRouterContext } from '@trpc/server' -import type { inferRouterMeta, Parser } from '@trpc/server/unstable-core-do-not-import' +import type { inferRouterMeta, Parser, TrackedData } from '@trpc/server/unstable-core-do-not-import' +import { mapEventIterator } from '@orpc/client' import * as ORPC from '@orpc/server' import { isObject, isTypescriptObject } from '@orpc/shared' -import { TRPCError } from '@trpc/server' -import { getHTTPStatusCodeFromError } from '@trpc/server/unstable-core-do-not-import' +import { isTrackedEnvelope, TRPCError } from '@trpc/server' +import { getHTTPStatusCodeFromError, isAsyncIterable } from '@trpc/server/unstable-core-do-not-import' export interface experimental_ORPCMeta extends ORPC.Route { } +export type experimental_ToORPCOutput + = T extends AsyncIterable + ? AsyncIteratorObject + : T + export type experimental_ToORPCRouterResult> = { [K in keyof TRecord]: @@ -17,7 +23,7 @@ export type experimental_ToORPCRouterResult, - ORPC.Schema, + ORPC.Schema>, object, TMeta > @@ -94,7 +100,7 @@ function toORPCProcedure(procedure: AnyProcedure) { ? { ...input, lastEventId } : input - return await procedure({ + const output = await procedure({ ctx: context, signal, path: path.join('.'), @@ -102,6 +108,28 @@ function toORPCProcedure(procedure: AnyProcedure) { input: trpcInput, getRawInput: () => trpcInput, }) + + if (isAsyncIterable(output)) { + return mapEventIterator(output[Symbol.asyncIterator](), { + error: async error => error, + value: (value) => { + if (isTrackedEnvelope(value)) { + const [id, data] = value + + return ORPC.withEventMeta({ + id, + data, + } as TrackedData, { + id, + }) + } + + return value + }, + }) + } + + return output } catch (cause) { if (cause instanceof TRPCError) { diff --git a/packages/trpc/tests/shared.ts b/packages/trpc/tests/shared.ts index 13555cef2..4025de247 100644 --- a/packages/trpc/tests/shared.ts +++ b/packages/trpc/tests/shared.ts @@ -1,5 +1,5 @@ import type { experimental_ORPCMeta as ORPCMeta } from '../src/to-orpc-router' -import { initTRPC, lazy, TRPCError } from '@trpc/server' +import { initTRPC, lazy, tracked, TRPCError } from '@trpc/server' import { z } from 'zod/v4' import { inputSchema, outputSchema } from '../../contract/tests/shared' @@ -34,6 +34,8 @@ export const trpcRouter = t.router({ .input(z.object({ u: z.string() })) .subscription(async function* () { yield 'pong' + yield tracked('id-1', { order: 1 }) + yield tracked('id-2', { order: 2 }) }), nested: { diff --git a/packages/trpc/tsconfig.json b/packages/trpc/tsconfig.json index 0d99cefed..a2ac5e7f6 100644 --- a/packages/trpc/tsconfig.json +++ b/packages/trpc/tsconfig.json @@ -2,6 +2,7 @@ "extends": "../../tsconfig.lib.json", "references": [ { "path": "../server" }, + { "path": "../client" }, { "path": "../shared" } ], "include": ["src"], diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3f5fa06ea..2b75053c3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -664,6 +664,9 @@ importers: packages/trpc: dependencies: + '@orpc/client': + specifier: workspace:* + version: link:../client '@orpc/server': specifier: workspace:* version: link:../server From 07fab905db98da70d1d7dffff809d9ce2083be3b Mon Sep 17 00:00:00 2001 From: unnoq Date: Wed, 2 Jul 2025 16:23:26 +0700 Subject: [PATCH 3/4] AsyncIteratorClass --- packages/trpc/src/to-orpc-router.test-d.ts | 5 +++-- packages/trpc/src/to-orpc-router.ts | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/trpc/src/to-orpc-router.test-d.ts b/packages/trpc/src/to-orpc-router.test-d.ts index ceec852b7..ea21219fe 100644 --- a/packages/trpc/src/to-orpc-router.test-d.ts +++ b/packages/trpc/src/to-orpc-router.test-d.ts @@ -1,4 +1,5 @@ import type { ContractRouter, InferRouterInitialContext, Procedure, Router, Schema } from '@orpc/server' +import type { AsyncIteratorClass } from '@orpc/shared' import type { inferRouterContext } from '@trpc/server' import type { inferRouterMeta, TrackedData } from '@trpc/server/unstable-core-do-not-import' import type { TRPCContext, TRPCMeta, trpcRouter } from '../tests/shared' @@ -24,7 +25,7 @@ it('ToORPCRouterResult', () => { >() expectTypeOf(orpcRouter.subscribe).toEqualTypeOf< - Procedure, Schema, void, any>>, object, TRPCMeta> + Procedure, Schema, void, any>>, object, TRPCMeta> >() expectTypeOf(orpcRouter.nested).toEqualTypeOf< @@ -35,7 +36,7 @@ it('ToORPCRouterResult', () => { expectTypeOf(orpcRouter.lazy).toEqualTypeOf< { - subscribe: Procedure, Schema>, object, TRPCMeta> + subscribe: Procedure, Schema>, object, TRPCMeta> lazy: { throw: Procedure, Schema, object, TRPCMeta> } diff --git a/packages/trpc/src/to-orpc-router.ts b/packages/trpc/src/to-orpc-router.ts index 0515f6636..6639b5b5c 100644 --- a/packages/trpc/src/to-orpc-router.ts +++ b/packages/trpc/src/to-orpc-router.ts @@ -1,3 +1,4 @@ +import type { AsyncIteratorClass } from '@orpc/shared' import type { AnyProcedure, AnyRouter, inferRouterContext } from '@trpc/server' import type { inferRouterMeta, Parser, TrackedData } from '@trpc/server/unstable-core-do-not-import' import { mapEventIterator } from '@orpc/client' @@ -12,7 +13,7 @@ export interface experimental_ORPCMeta extends ORPC.Route { export type experimental_ToORPCOutput = T extends AsyncIterable - ? AsyncIteratorObject + ? AsyncIteratorClass : T export type experimental_ToORPCRouterResult> From 93521fa9234c7c5efcf5d21cf8f1bd379de1a482 Mon Sep 17 00:00:00 2001 From: unnoq Date: Wed, 2 Jul 2025 16:54:15 +0700 Subject: [PATCH 4/4] fix --- packages/trpc/src/to-orpc-router.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/trpc/src/to-orpc-router.ts b/packages/trpc/src/to-orpc-router.ts index 6639b5b5c..f93323493 100644 --- a/packages/trpc/src/to-orpc-router.ts +++ b/packages/trpc/src/to-orpc-router.ts @@ -120,7 +120,7 @@ function toORPCProcedure(procedure: AnyProcedure) { return ORPC.withEventMeta({ id, data, - } as TrackedData, { + } satisfies TrackedData, { id, }) }