diff --git a/packages/trpc/package.json b/packages/trpc/package.json index 6a9f97af9..ca5e4eadb 100644 --- a/packages/trpc/package.json +++ b/packages/trpc/package.json @@ -38,6 +38,7 @@ "@trpc/server": ">=11.4.2" }, "dependencies": { + "@orpc/client": "workspace:*", "@orpc/server": "workspace:*", "@orpc/shared": "workspace:*" }, diff --git a/packages/trpc/src/to-orpc-router.test-d.ts b/packages/trpc/src/to-orpc-router.test-d.ts index e237af9de..ea21219fe 100644 --- a/packages/trpc/src/to-orpc-router.test-d.ts +++ b/packages/trpc/src/to-orpc-router.test-d.ts @@ -1,6 +1,7 @@ import type { ContractRouter, InferRouterInitialContext, Procedure, Router, Schema } from '@orpc/server' +import type { AsyncIteratorClass } from '@orpc/shared' import type { inferRouterContext } from '@trpc/server' -import type { inferRouterMeta } from '@trpc/server/unstable-core-do-not-import' +import type { inferRouterMeta, TrackedData } from '@trpc/server/unstable-core-do-not-import' import type { TRPCContext, TRPCMeta, trpcRouter } from '../tests/shared' import type { experimental_ToORPCRouterResult as ToORPCRouterResult } from './to-orpc-router' @@ -24,7 +25,7 @@ it('ToORPCRouterResult', () => { >() expectTypeOf(orpcRouter.subscribe).toEqualTypeOf< - Procedure, Schema>, object, TRPCMeta> + Procedure, Schema, void, any>>, object, TRPCMeta> >() expectTypeOf(orpcRouter.nested).toEqualTypeOf< @@ -35,7 +36,7 @@ it('ToORPCRouterResult', () => { expectTypeOf(orpcRouter.lazy).toEqualTypeOf< { - subscribe: Procedure, Schema>, object, TRPCMeta> + subscribe: Procedure, Schema>, object, TRPCMeta> lazy: { throw: Procedure, Schema, object, TRPCMeta> } diff --git a/packages/trpc/src/to-orpc-router.test.ts b/packages/trpc/src/to-orpc-router.test.ts index 2816a01c4..6677a6bd2 100644 --- a/packages/trpc/src/to-orpc-router.test.ts +++ b/packages/trpc/src/to-orpc-router.test.ts @@ -1,9 +1,15 @@ -import { call, createRouterClient, isProcedure, ORPCError, unlazy } from '@orpc/server' +import { call, createRouterClient, getEventMeta, isProcedure, ORPCError, unlazy } from '@orpc/server' import { isAsyncIteratorObject } from '@orpc/shared' +import { tracked } from '@trpc/server' +import { z } from 'zod' import { inputSchema, outputSchema } from '../../contract/tests/shared' -import { trpcRouter } from '../tests/shared' +import { t, trpcRouter } from '../tests/shared' import { experimental_toORPCRouter as toORPCRouter } from './to-orpc-router' +beforeEach(() => { + vi.clearAllMocks() +}) + describe('toORPCRouter', async () => { const orpcRouter = toORPCRouter(trpcRouter) @@ -73,4 +79,89 @@ describe('toORPCRouter', async () => { }) }) }) + + describe('event iterators', () => { + it('subscribe & tracked', async () => { + const output = await call(orpcRouter.subscribe, { u: '2' }, { lastEventId: 'id-1', context: { a: 'test' } }) as any + expect(output).toSatisfy(isAsyncIteratorObject) + await expect(output.next()).resolves.toEqual({ done: false, value: 'pong' }) + await expect(output.next()).resolves.toSatisfy((result) => { + expect(result.done).toEqual(false) + expect(result.value).toEqual({ id: 'id-1', data: { order: 1 } }) + expect(getEventMeta(result.value)).toEqual({ id: 'id-1' }) + + return true + }) + await expect(output.next()).resolves.toSatisfy((result) => { + expect(result.done).toEqual(false) + expect(result.value).toEqual({ id: 'id-2', data: { order: 2 } }) + expect(getEventMeta(result.value)).toEqual({ id: 'id-2' }) + + return true + }) + await expect(output.next()).resolves.toEqual({ done: true, value: undefined }) + }) + + it('lastEventId', async () => { + const trackedSubscription = vi.fn(async function* () { + yield { order: 1 } + yield tracked('id-2', { order: 2 }) + }) + + const trpcRouter = t.router({ + tracked: t.procedure + .input(z.any()) + .subscription(trackedSubscription), + }) + + const orpcRouter = toORPCRouter(trpcRouter) + + await call(orpcRouter.tracked, { u: 'u' }, { lastEventId: 'id-1', context: { a: 'test' } }) + expect(trackedSubscription).toHaveBeenNthCalledWith(1, expect.objectContaining({ + input: { u: 'u', lastEventId: 'id-1' }, + })) + + await call(orpcRouter.tracked, undefined, { lastEventId: 'id-2', context: { a: 'test' } }) + expect(trackedSubscription).toHaveBeenNthCalledWith(2, expect.objectContaining({ + input: { lastEventId: 'id-2' }, + })) + + await call(orpcRouter.tracked, 1234, { lastEventId: 'id-3', context: { a: 'test' } }) + expect(trackedSubscription).toHaveBeenNthCalledWith(3, expect.objectContaining({ + input: 1234, + })) + }) + + it('works with AsyncIterable & cleanup', async () => { + let cleanupCalled = false + + const trackedSubscription = vi.fn(async () => { + return { + async* [Symbol.asyncIterator]() { + try { + yield { order: 1 } + yield tracked('id-2', { order: 2 }) + } + finally { + cleanupCalled = true + } + }, + } + }) + + const trpcRouter = t.router({ + tracked: t.procedure + .input(z.any()) + .subscription(trackedSubscription), + }) + + const orpcRouter = toORPCRouter(trpcRouter) + + const output = await call(orpcRouter.tracked, { u: 'u' }, { lastEventId: 'id-1', context: { a: 'test' } }) + + await expect(output.next()).resolves.toEqual({ done: false, value: { order: 1 } }) + await expect(output.return?.()).resolves.toEqual({ done: true, value: undefined }) + expect(cleanupCalled).toBe(true) + }) + }) }) diff --git a/packages/trpc/src/to-orpc-router.ts b/packages/trpc/src/to-orpc-router.ts index 2eb8c81d9..f93323493 100644 --- a/packages/trpc/src/to-orpc-router.ts +++ b/packages/trpc/src/to-orpc-router.ts @@ -1,14 +1,21 @@ +import type { AsyncIteratorClass } from '@orpc/shared' import type { AnyProcedure, AnyRouter, inferRouterContext } from '@trpc/server' -import type { inferRouterMeta, Parser } from '@trpc/server/unstable-core-do-not-import' +import type { inferRouterMeta, Parser, TrackedData } from '@trpc/server/unstable-core-do-not-import' +import { mapEventIterator } from '@orpc/client' import * as ORPC from '@orpc/server' -import { isTypescriptObject } from '@orpc/shared' -import { TRPCError } from '@trpc/server' -import { getHTTPStatusCodeFromError } from '@trpc/server/unstable-core-do-not-import' +import { isObject, isTypescriptObject } from '@orpc/shared' +import { isTrackedEnvelope, TRPCError } from '@trpc/server' +import { getHTTPStatusCodeFromError, isAsyncIterable } from '@trpc/server/unstable-core-do-not-import' export interface experimental_ORPCMeta extends ORPC.Route { } +export type experimental_ToORPCOutput + = T extends AsyncIterable + ? AsyncIteratorClass + : T + export type experimental_ToORPCRouterResult> = { [K in keyof TRecord]: @@ -17,7 +24,7 @@ export type experimental_ToORPCRouterResult, - ORPC.Schema, + ORPC.Schema>, object, TMeta > @@ -88,16 +95,42 @@ function toORPCProcedure(procedure: AnyProcedure) { middlewares: [], inputSchema: toDisabledStandardSchema(procedure._def.inputs.at(-1)), outputSchema: toDisabledStandardSchema((procedure as any)._def.output), - handler: async ({ context, signal, path, input }) => { + handler: async ({ context, signal, path, input, lastEventId }) => { try { - return await procedure({ + const trpcInput = lastEventId !== undefined && (input === undefined || isObject(input)) + ? { ...input, lastEventId } + : input + + const output = await procedure({ ctx: context, signal, path: path.join('.'), type: procedure._def.type, - input, - getRawInput: () => input, + input: trpcInput, + getRawInput: () => trpcInput, }) + + if (isAsyncIterable(output)) { + return mapEventIterator(output[Symbol.asyncIterator](), { + error: async error => error, + value: (value) => { + if (isTrackedEnvelope(value)) { + const [id, data] = value + + return ORPC.withEventMeta({ + id, + data, + } satisfies TrackedData, { + id, + }) + } + + return value + }, + }) + } + + return output } catch (cause) { if (cause instanceof TRPCError) { diff --git a/packages/trpc/tests/shared.ts b/packages/trpc/tests/shared.ts index 13555cef2..4025de247 100644 --- a/packages/trpc/tests/shared.ts +++ b/packages/trpc/tests/shared.ts @@ -1,5 +1,5 @@ import type { experimental_ORPCMeta as ORPCMeta } from '../src/to-orpc-router' -import { initTRPC, lazy, TRPCError } from '@trpc/server' +import { initTRPC, lazy, tracked, TRPCError } from '@trpc/server' import { z } from 'zod/v4' import { inputSchema, outputSchema } from '../../contract/tests/shared' @@ -34,6 +34,8 @@ export const trpcRouter = t.router({ .input(z.object({ u: z.string() })) .subscription(async function* () { yield 'pong' + yield tracked('id-1', { order: 1 }) + yield tracked('id-2', { order: 2 }) }), nested: { diff --git a/packages/trpc/tsconfig.json b/packages/trpc/tsconfig.json index 0d99cefed..a2ac5e7f6 100644 --- a/packages/trpc/tsconfig.json +++ b/packages/trpc/tsconfig.json @@ -2,6 +2,7 @@ "extends": "../../tsconfig.lib.json", "references": [ { "path": "../server" }, + { "path": "../client" }, { "path": "../shared" } ], "include": ["src"], diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3f5fa06ea..2b75053c3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -664,6 +664,9 @@ importers: packages/trpc: dependencies: + '@orpc/client': + specifier: workspace:* + version: link:../client '@orpc/server': specifier: workspace:* version: link:../server