Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6b6167b
init
dinwwwh Oct 9, 2025
766ab34
publisher & memory publisher
dinwwwh Oct 10, 2025
688bff7
improve
dinwwwh Oct 13, 2025
346f80b
redis - wip
dinwwwh Oct 13, 2025
ee2fb18
improve
dinwwwh Oct 13, 2025
a70f9b4
wip
dinwwwh Oct 14, 2025
46731a2
wip
dinwwwh Oct 15, 2025
e2ac5d2
ioredis
dinwwwh Oct 16, 2025
171bb6d
ci: tests with real redis
dinwwwh Oct 16, 2025
0cc63c2
upstash redis
dinwwwh Oct 17, 2025
6eae7c0
docs
dinwwwh Oct 17, 2025
d8a69f0
fix & improve
dinwwwh Oct 17, 2025
e85a5f5
fix & improve
dinwwwh Oct 17, 2025
b855377
Merge branch 'main' into feat/publisher/renew
dinwwwh Oct 17, 2025
48002b5
fix and improve race condition
dinwwwh Oct 18, 2025
7ca7912
version
dinwwwh Oct 18, 2025
a971e9e
fix and improve test speed
dinwwwh Oct 18, 2025
0881c35
comment
dinwwwh Oct 18, 2025
0e5fbfe
improve tests coverage
dinwwwh Oct 18, 2025
8c3a72f
fix a test not cover as expected
dinwwwh Oct 18, 2025
4d2db9c
fix
dinwwwh Oct 18, 2025
eb574ba
fix concurrent tests
dinwwwh Oct 18, 2025
dd63481
Merge branch 'main' into feat/publisher/renew
dinwwwh Oct 18, 2025
5b281c1
handle error
dinwwwh Oct 18, 2025
26c5f19
comment
dinwwwh Oct 18, 2025
e76d1e2
fix ioredis cleanup
dinwwwh Oct 18, 2025
bc506f7
use array instead of set - because listener can duplicate + fix onError
dinwwwh Oct 18, 2025
7e4012b
cover case unsub use multiple time + same lister
dinwwwh Oct 18, 2025
c89872d
improve unsub
dinwwwh Oct 18, 2025
5d9b55c
handles multiple subscribers on same event with race condition
dinwwwh Oct 19, 2025
9cfe343
reorganize
dinwwwh Oct 19, 2025
70f2209
bump version
dinwwwh Oct 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# copy this file and rename it to .env
# then fill in the appropriate values

# Some tests in the project depend on Redis or Upstash Redis
# You can create a free redis instance at upstash and copy the connection details here
REDIS_URL=
UPSTASH_REDIS_REST_URL=
UPSTASH_REDIS_REST_TOKEN=
14 changes: 14 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ jobs:

test:
runs-on: ubuntu-latest
services:
redis:
image: redis:7-alpine
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
- uses: actions/checkout@v4

Expand All @@ -40,6 +50,10 @@ jobs:
- run: pnpm i

- run: pnpm run test:coverage
env:
REDIS_URL: redis://localhost:6379
UPSTASH_REDIS_REST_URL: ${{ secrets.UPSTASH_REDIS_REST_URL }}
UPSTASH_REDIS_REST_TOKEN: ${{ secrets.UPSTASH_REDIS_REST_TOKEN }}

- uses: codecov/codecov-action@v5
with:
Expand Down
1 change: 1 addition & 0 deletions apps/content/.vitepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ export default withMermaid(defineConfig({
{ text: 'Cookie', link: '/docs/helpers/cookie' },
{ text: 'Encryption', link: '/docs/helpers/encryption' },
{ text: 'Form Data', link: '/docs/helpers/form-data' },
{ text: 'Publisher', link: '/docs/helpers/publisher' },
{ text: 'Signing', link: '/docs/helpers/signing' },
],
},
Expand Down
29 changes: 28 additions & 1 deletion apps/content/docs/event-iterator.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,36 @@ const example = os
})
```

## Publisher Helper

You can combine the event iterator with the [Publisher Helper](/docs/helpers/publisher) to build real-time features like chat, notifications, or live updates with resume support.

```ts
const publisher = new MemoryPublisher<{
'something-updated': {
id: string
}
}>()

const live = os
.handler(async function* ({ input, signal }) {
const iterator = publisher.subscribe('something-updated', { signal })
for await (const payload of iterator) {
// Handle payload here or yield directly to client
yield payload
}
})

const publish = os
.input(z.object({ id: z.string() }))
.handler(async ({ input }) => {
await publisher.publish('something-updated', { id: input.id })
})
```

## Event Publisher

oRPC includes a built-in `EventPublisher` for real-time features like chat, notifications, or live updates. It supports broadcasting and subscribing to named events.
Unlike the [Publisher Helper](/docs/helpers/publisher), the `EventPublisher` is more lightweight with synchronous publishing and no resume support.

::: code-group

Expand Down
223 changes: 223 additions & 0 deletions apps/content/docs/helpers/publisher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
---
title: Publisher
description: Listen and publish events with resuming support in oRPC
---

# Publisher

The Publisher is a helper that enables you to listen to and publish events to subscribers. Combined with the [Event Iterator](/docs/client/event-iterator), it allows you to build streaming responses, real-time updates, and server-sent events with minimal requirements.

## Installation

::: code-group

```sh [npm]
npm install @orpc/experimental-publisher@latest
```

```sh [yarn]
yarn add @orpc/experimental-publisher@latest
```

```sh [pnpm]
pnpm add @orpc/experimental-publisher@latest
```

```sh [bun]
bun add @orpc/experimental-publisher@latest
```

```sh [deno]
deno add npm:@orpc/experimental-publisher@latest
```

:::

## Basic Usage

```ts twoslash
import { MemoryPublisher } from '@orpc/experimental-publisher/memory'
import { os } from '@orpc/server'
import * as z from 'zod'
// ---cut---
const publisher = new MemoryPublisher<{
'something-updated': {
id: string
}
}>()

const live = os
.handler(async function* ({ input, signal }) {
const iterator = publisher.subscribe('something-updated', { signal })
for await (const payload of iterator) {
// Handle payload here or yield directly to client
yield payload
}
})

const publish = os
.input(z.object({ id: z.string() }))
.handler(async ({ input }) => {
await publisher.publish('something-updated', { id: input.id })
})
```

::: tip
The publisher supports both static and dynamic event names.

```ts
const publisher = new MemoryPublisher<Record<string, { message: string }>>()
```

:::

## Resume Feature

The resume feature uses `lastEventId` to determine where to resume from after a disconnection.

::: warning
By default, most adapters have this feature disabled.
:::

### Server Implementation

When subscribing, you must forward the `lastEventId` to the publisher to enable resuming:

```ts
const live = os
.handler(async function* ({ input, signal, lastEventId }) {
const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
for await (const payload of iterator) {
yield payload
}
})
```

::: warning Event ID Management
The publisher automatically manages event ids when resume is enabled. This means:

- Event ids you provide when publishing will be ignored
- When subscribing, you must forward the event id when yielding custom payloads

```ts
import { getEventMeta, withEventMeta } from '@orpc/server'

const live = os
.handler(async function* ({ input, signal, lastEventId }) {
const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
for await (const payload of iterator) {
// Preserve event id when yielding custom data
yield withEventMeta({ custom: 'value' }, { ...getEventMeta(payload) })
}
})

const publish = os
.input(z.object({ id: z.string() }))
.handler(async ({ input }) => {
// The event id 'this-will-be-ignored' will be replaced by the publisher
await publisher.publish('something-updated', withEventMeta({ id: input.id }, { id: 'this-will-be-ignored' }))
})
```

:::

### Client Implementation

On the client, you can use the [Client Retry Plugin](/docs/plugins/client-retry), which automatically controls and passes `lastEventId` to the server when reconnecting. Alternatively, you can manage `lastEventId` manually:

```ts
import { getEventMeta } from '@orpc/client'

let lastEventId: string | undefined

while (true) {
try {
const iterator = await client.live('input', { lastEventId })

for await (const payload of iterator) {
lastEventId = getEventMeta(payload)?.id // Update lastEventId

console.log(payload)
}
}
catch {
await new Promise(resolve => setTimeout(resolve, 1000)) // Wait 1 second before retrying
}
}
```

## Available Adapters

| Name | Resume Support | Description |
| ----------------------- | -------------- | ---------------------------------------------------------------- |
| `MemoryPublisher` | ✅ | A simple in-memory publisher |
| `IORedisPublisher` | ✅ | Adapter for [ioredis](https://github.com/redis/ioredis) |
| `UpstashRedisPublisher` | ✅ | Adapter for [Upstash Redis](https://github.com/upstash/redis-js) |

::: info
If you'd like to add a new publisher adapter, please open an issue.
:::

### Memory Publisher

```ts
import { MemoryPublisher } from '@orpc/experimental-publisher/memory'

const publisher = new MemoryPublisher<{
'something-updated': {
id: string
}
}>({
resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
})
```

::: info
Resume support is disabled by default in `MemoryPublisher`. Enable it by setting `resumeRetentionSeconds` to an appropriate value.
:::

### IORedis Publisher

```ts
import { Redis } from 'ioredis'
import { IORedisPublisher } from '@orpc/experimental-publisher/ioredis'

const publisher = new IORedisPublisher<{
'something-updated': {
id: string
}
}>({
commander: new Redis(), // For executing short-lived commands
subscriber: new Redis(), // For subscribing to events
resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
prefix: 'orpc:publisher:', // avoid conflict with other keys
})
```

This adapter requires two Redis instances: one for executing short-lived commands and another for subscribing to events.

::: info
Resume support is disabled by default in `IORedisPublisher`. Enable it by setting `resumeRetentionSeconds` to an appropriate value.
:::

### Upstash Redis Publisher

```ts
import { Redis } from '@upstash/redis'
import { UpstashRedisPublisher } from '@orpc/experimental-publisher/upstash-redis'

const redis = Redis.fromEnv()

const publisher = new UpstashRedisPublisher<{
'something-updated': {
id: string
}
}>(redis, {
resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
prefix: 'orpc:publisher:', // avoid conflict with other keys
})
```

::: info
Resume support is disabled by default in `UpstashRedisPublisher`. Enable it by setting `resumeRetentionSeconds` to an appropriate value.
:::
1 change: 1 addition & 0 deletions apps/content/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"@orpc/arktype": "workspace:*",
"@orpc/client": "workspace:*",
"@orpc/contract": "workspace:*",
"@orpc/experimental-publisher": "workspace:*",
"@orpc/experimental-react-swr": "workspace:*",
"@orpc/openapi": "workspace:*",
"@orpc/openapi-client": "workspace:*",
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"esbuild"
],
"overrides": {
"@wxt-dev/storage": "1.2.0",
"typescript": "~5.8.3"
}
Comment thread
dinwwwh marked this conversation as resolved.
},
Expand Down
3 changes: 3 additions & 0 deletions packages/arktype/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@
},
"dependencies": {
"@orpc/openapi": "workspace:*"
},
"devDependencies": {
"zod": "^4.1.11"
}
}
26 changes: 26 additions & 0 deletions packages/publisher/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Hidden folders and files
.*
!.gitignore
!.*.example

# Common generated folders
logs/
node_modules/
out/
dist/
dist-ssr/
build/
coverage/
temp/

# Common generated files
*.log
*.log.*
*.tsbuildinfo
*.vitest-temp.json
vite.config.ts.timestamp-*
vitest.config.ts.timestamp-*

# Common manual ignore files
*.local
*.pem
Loading
Loading