diff --git a/packages/server/src/helpers/index.ts b/packages/server/src/helpers/index.ts index 79fbe74a2..a9abe7f9b 100644 --- a/packages/server/src/helpers/index.ts +++ b/packages/server/src/helpers/index.ts @@ -1,4 +1,5 @@ export * from './base64url' export * from './cookie' export * from './encryption' +export * from './redis-event-publisher' export * from './signing' diff --git a/packages/server/src/helpers/redis-event-publisher.test.ts b/packages/server/src/helpers/redis-event-publisher.test.ts new file mode 100644 index 000000000..8def974ca --- /dev/null +++ b/packages/server/src/helpers/redis-event-publisher.test.ts @@ -0,0 +1,224 @@ +import type { RedisClient } from './redis-event-publisher' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { RedisEventPublisher } from './redis-event-publisher' + +// Mock Redis client for testing +class MockRedisClient implements RedisClient { + private subscriptions = new Map void>() + private publishedMessages: Array<{ channel: string, message: string }> = [] + + async publish(channel: string, message: string): Promise { + this.publishedMessages.push({ channel, message }) + + const callback = this.subscriptions.get(channel) + if (callback) { + setImmediate(() => callback(message)) + } + + return 1 + } + + async subscribe(channel: string, callback: (message: string) => void): Promise { + this.subscriptions.set(channel, callback) + } + + async unsubscribe(channel: string): Promise { + this.subscriptions.delete(channel) + } + + async quit(): Promise { + this.subscriptions.clear() + this.publishedMessages = [] + } + + getPublishedMessages() { + return [...this.publishedMessages] + } +} + +describe('redisEventPublisher', () => { + let mockRedis: MockRedisClient + let publisher: RedisEventPublisher<{ + 'test-event': { message: string } + 'user-updated': { id: string, name: string } + }> + + beforeEach(() => { + mockRedis = new MockRedisClient() + publisher = new RedisEventPublisher({ + redis: mockRedis, + keyPrefix: 'test:', + }) + }) + + afterEach(async () => { + await publisher.close() + }) + + describe('publish', () => { + it('should publish events to Redis', async () => { + const payload = { message: 'Hello World' } + + await publisher.publish('test-event', payload) + + const publishedMessages = mockRedis.getPublishedMessages() + expect(publishedMessages).toHaveLength(1) + expect(publishedMessages[0]!.channel).toBe('test:test-event') + + const { json, meta } = JSON.parse(publishedMessages[0]!.message) + expect(json).toEqual(payload) + expect(meta).toEqual([]) + }) + + it('should handle complex types', async () => { + const payload = { + message: 'Complex data', + id: BigInt(123), + createdAt: new Date('2023-01-01T00:00:00Z'), + tags: new Set(['urgent', 'important']), + metadata: new Map([['priority', 'high']]), + } + + await publisher.publish('test-event', payload) + + const publishedMessages = mockRedis.getPublishedMessages() + expect(publishedMessages).toHaveLength(1) + + const { json, meta } = JSON.parse(publishedMessages[0]!.message) + expect(json.id).toBe('123') + expect(json.createdAt).toBe('2023-01-01T00:00:00.000Z') + expect(json.tags).toEqual(['urgent', 'important']) + expect(json.metadata).toEqual([['priority', 'high']]) + }) + }) + + describe('subscribe with callback', () => { + it('should subscribe to events and receive messages', async () => { + const receivedMessages: any[] = [] + const callback = (payload: any) => { + receivedMessages.push(payload) + } + + const unsubscribe = await publisher.subscribe('test-event', callback) + + await publisher.publish('test-event', { message: 'Hello' }) + + await new Promise(resolve => setImmediate(resolve)) + + expect(receivedMessages).toHaveLength(1) + expect(receivedMessages[0]).toEqual({ message: 'Hello' }) + + await unsubscribe() + }) + + it('should handle multiple subscribers', async () => { + const receivedMessages1: any[] = [] + const receivedMessages2: any[] = [] + + const callback1 = (payload: any) => receivedMessages1.push(payload) + const callback2 = (payload: any) => receivedMessages2.push(payload) + + const unsubscribe1 = await publisher.subscribe('test-event', callback1) + const unsubscribe2 = await publisher.subscribe('test-event', callback2) + + await publisher.publish('test-event', { message: 'Hello' }) + + await new Promise(resolve => setImmediate(resolve)) + + expect(receivedMessages1).toHaveLength(1) + expect(receivedMessages1[0]).toEqual({ message: 'Hello' }) + expect(receivedMessages2).toHaveLength(1) + expect(receivedMessages2[0]).toEqual({ message: 'Hello' }) + + await unsubscribe1() + await unsubscribe2() + }) + + it('should unsubscribe when no more listeners', async () => { + const callback = vi.fn() + const unsubscribe = await publisher.subscribe('test-event', callback) + + expect(publisher.size).toBe(1) + + await unsubscribe() + + expect(publisher.size).toBe(0) + }) + }) + + describe('subscribe with async iterator', () => { + it('should work with for await...of', async () => { + const receivedMessages: any[] = [] + + const subscription = publisher.subscribe('test-event', {}) + + await publisher.publish('test-event', { message: 'First' }) + await publisher.publish('test-event', { message: 'Second' }) + + const iterator = subscription[Symbol.asyncIterator]() + const result1 = await iterator.next() + const result2 = await iterator.next() + + expect(result1.done).toBe(false) + expect(result1.value).toEqual({ message: 'First' }) + expect(result2.done).toBe(false) + expect(result2.value).toEqual({ message: 'Second' }) + + await iterator.return?.(undefined) + }) + + it('should handle abort signal', async () => { + const abortController = new AbortController() + + const subscription = publisher.subscribe('test-event', { + signal: abortController.signal, + }) + + setTimeout(() => { + abortController.abort() + }, 10) + + const iterator = subscription[Symbol.asyncIterator]() + + try { + await iterator.next() + expect.fail('Should have thrown AbortError') + } + catch (error: any) { + expect(error.name).toBe('AbortError') + } + }) + + it('should buffer events when no consumer', async () => { + const subscription = publisher.subscribe('test-event', { + maxBufferedEvents: 2, + }) + + await publisher.publish('test-event', { message: 'First' }) + await publisher.publish('test-event', { message: 'Second' }) + await publisher.publish('test-event', { message: 'Third' }) + + const iterator = subscription[Symbol.asyncIterator]() + const result1 = await iterator.next() + const result2 = await iterator.next() + + expect(result1.value).toEqual({ message: 'First' }) + expect(result2.value).toEqual({ message: 'Second' }) + + await iterator.return?.(undefined) + }) + }) + + describe('close', () => { + it('should clean up resources', async () => { + const callback = vi.fn() + await publisher.subscribe('test-event', callback) + + expect(publisher.size).toBe(1) + + await publisher.close() + + expect(publisher.size).toBe(0) + }) + }) +}) diff --git a/packages/server/src/helpers/redis-event-publisher.ts b/packages/server/src/helpers/redis-event-publisher.ts new file mode 100644 index 000000000..b998a4d61 --- /dev/null +++ b/packages/server/src/helpers/redis-event-publisher.ts @@ -0,0 +1,281 @@ +import type { StandardRPCJsonSerializerOptions } from '@orpc/client/standard' +import { StandardRPCJsonSerializer } from '@orpc/client/standard' +import { AsyncIteratorClass, stringifyJSON } from '@orpc/shared' + +/** + * Redis client interface that supports the required operations for event publishing. + * Most Redis clients (ioredis, node-redis, upstash) follow this pattern. + */ +export interface RedisClient { + publish(channel: string, message: string): Promise + subscribe(channel: string, callback: (message: string) => void): Promise + unsubscribe(channel: string): Promise + quit(): Promise +} + +/** + * Options for Redis Event Publisher + */ +export interface RedisEventPublisherOptions extends StandardRPCJsonSerializerOptions { + /** + * Redis client instance. Must implement the RedisClient interface. + */ + redis: RedisClient + + /** + * Key prefix for Redis channels. Defaults to 'orpc:event:' + */ + keyPrefix?: string + + /** + * Maximum number of events to buffer for async iterator subscribers. + * @default 100 + */ + maxBufferedEvents?: number +} + +/** + * Default key prefix for Redis channels + */ +const DEFAULT_KEY_PREFIX = 'orpc:event:' + +/** + * Default maximum number of events to buffer for async iterator subscribers + */ +const DEFAULT_MAX_BUFFERED_EVENTS = 100 + +/** + * Redis-based Event Publisher for cross-server and serverless use cases. + * + * This implementation uses Redis Pub/Sub to enable event publishing and subscription + * across multiple server instances, making it suitable for distributed applications + * and serverless environments. + * + * @example + * ```ts + * import { RedisEventPublisher } from '@orpc/server/helpers' + * import { createClient } from 'redis' + * + * const redis = createClient({ url: 'redis://localhost:6379' }) + * await redis.connect() + * + * const publisher = new RedisEventPublisher<{ + * 'user-updated': { id: string; name: string } + * }>({ redis }) + * + * // Publish an event + * await publisher.publish('user-updated', { id: '1', name: 'John' }) + * + * // Subscribe to events + * for await (const payload of publisher.subscribe('user-updated', { signal })) { + * console.log('User updated:', payload) + * } + * ``` + */ +export class RedisEventPublisher> { + #redis: RedisClient + #keyPrefix: string + #maxBufferedEvents: number + #serializer: StandardRPCJsonSerializer + #subscribedChannels = new Set() + #callbacks = new Map void>>() + + constructor(options: RedisEventPublisherOptions) { + this.#redis = options.redis + this.#keyPrefix = options.keyPrefix ?? DEFAULT_KEY_PREFIX + this.#maxBufferedEvents = options.maxBufferedEvents ?? DEFAULT_MAX_BUFFERED_EVENTS + this.#serializer = new StandardRPCJsonSerializer(options) + } + + get size(): number { + return Array.from(this.#callbacks.values()).reduce((total, callbacks) => total + callbacks.size, 0) + } + + /** + * Emits an event and delivers the payload to all subscribed listeners. + * The event is published to Redis and will be received by all instances. + */ + async publish(event: K, payload: T[K]): Promise { + const channel = this.#getChannelName(event) + const [json, meta] = this.#serializer.serialize(payload) + const message = stringifyJSON({ json, meta }) ?? '' + + await this.#redis.publish(channel, message) + } + + /** + * Subscribes to a specific event using a callback function. + * Returns an unsubscribe function to remove the listener. + * + * @example + * ```ts + * const unsubscribe = await publisher.subscribe('event', (payload) => { + * console.log(payload) + * }) + * + * // Later + * await unsubscribe() + * ``` + */ + subscribe(event: K, listener: (payload: T[K]) => void): Promise<() => Promise> + /** + * Subscribes to a specific event using an async iterator. + * Useful for `for await...of` loops with optional buffering and abort support. + * + * @example + * ```ts + * for await (const payload of publisher.subscribe('event', { signal })) { + * console.log(payload) + * } + * ``` + */ + subscribe(event: K, options?: { signal?: AbortSignal, maxBufferedEvents?: number }): AsyncGenerator & AsyncIteratorObject + subscribe( + event: K, + listenerOrOptions?: ((payload: T[K]) => void) | { signal?: AbortSignal, maxBufferedEvents?: number }, + ): Promise<() => Promise> | (AsyncGenerator & AsyncIteratorObject) { + if (typeof listenerOrOptions === 'function') { + return this.#subscribeWithCallback(event, listenerOrOptions) + } + + return this.#subscribeWithIterator(event, listenerOrOptions) + } + + async #subscribeWithCallback( + event: K, + listener: (payload: T[K]) => void, + ): Promise<() => Promise> { + // Add callback to the set for this event + if (!this.#callbacks.has(event)) { + this.#callbacks.set(event, new Set()) + } + this.#callbacks.get(event)!.add(listener) + + // Subscribe to Redis channel if not already subscribed + if (!this.#subscribedChannels.has(event)) { + const redisCallback = async (message: string) => { + const { json, meta } = JSON.parse(message) + const payload = this.#serializer.deserialize(json, meta) + + // Call all registered callbacks for this event + const callbacks = this.#callbacks.get(event) + if (callbacks) { + for (const callback of callbacks) { + callback(payload) + } + } + } + + await this.#redis.subscribe(this.#getChannelName(event), redisCallback) + this.#subscribedChannels.add(event) + } + + return async () => { + // Remove this specific callback + const callbacks = this.#callbacks.get(event) + if (callbacks) { + callbacks.delete(listener) + + // If no more callbacks, unsubscribe from Redis channel + if (callbacks.size === 0) { + await this.#redis.unsubscribe(this.#getChannelName(event)) + this.#subscribedChannels.delete(event) + this.#callbacks.delete(event) + } + } + } + } + + #subscribeWithIterator( + event: K, + options?: { signal?: AbortSignal, maxBufferedEvents?: number }, + ): AsyncGenerator & AsyncIteratorObject { + const signal = options?.signal + const maxBufferedEvents = options?.maxBufferedEvents ?? this.#maxBufferedEvents + + signal?.throwIfAborted() + + const bufferedEvents: T[K][] = [] + const pullResolvers: [(result: IteratorResult) => void, (error: Error) => void][] = [] + + let unsubscribe: (() => Promise) | null = null + + const listener = (payload: T[K]) => { + const resolver = pullResolvers.shift() + + if (resolver) { + resolver[0]({ done: false, value: payload }) + } + else { + bufferedEvents.push(payload) + + if (bufferedEvents.length > maxBufferedEvents) { + bufferedEvents.shift() + } + } + } + + // Subscribe asynchronously + const subscribePromise = this.#subscribeWithCallback(event, listener).then((unsub) => { + unsubscribe = unsub + }) + + const abortListener = async (event: any) => { + // Wait for unsubscribe to be set if it's not set yet + if (!unsubscribe) { + await subscribePromise + } + + if (unsubscribe) { + await unsubscribe() + } + + pullResolvers.forEach(resolver => resolver[1](event.target.reason)) + pullResolvers.length = 0 + bufferedEvents.length = 0 + } + + signal?.addEventListener('abort', abortListener, { once: true }) + + return new AsyncIteratorClass(async () => { + if (signal?.aborted) { + throw signal.reason + } + + if (bufferedEvents.length > 0) { + return { done: false, value: bufferedEvents.shift()! } + } + + return new Promise((resolve, reject) => { + pullResolvers.push([resolve, reject]) + }) + }, async () => { + if (unsubscribe) { + await unsubscribe() + } + signal?.removeEventListener('abort', abortListener) + pullResolvers.forEach(resolver => resolver[0]({ done: true, value: undefined })) + pullResolvers.length = 0 + bufferedEvents.length = 0 + }) + } + + #getChannelName(event: keyof T): string { + return `${this.#keyPrefix}${String(event)}` + } + + /** + * Close the Redis connection and clean up resources. + */ + async close(): Promise { + // Unsubscribe from all Redis channels + for (const event of this.#subscribedChannels) { + await this.#redis.unsubscribe(this.#getChannelName(event)) + } + this.#subscribedChannels.clear() + this.#callbacks.clear() + + // Close Redis connection + await this.#redis.quit() + } +}