feat(server): add Redis event publisher#1087
feat(server): add Redis event publisher#1087joonseolee wants to merge 1 commit intomiddleapi:mainfrom
Conversation
|
@joonseolee is attempting to deploy a commit to the unnoq-team Team on Vercel. A member of the Team first needs to authorize it. |
Summary of ChangesHello @joonseolee, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This PR introduces a Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds a new RedisEventPublisher helper with publish/subscribe APIs (callback and async iterator), associated types and defaults, plus tests. Updates helpers index to re-export the new module. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Producer
participant Pub as RedisEventPublisher
participant Redis
participant Sub as Subscriber (callback)
rect rgb(245,248,255)
note right of Pub: Channel = keyPrefix + event
Producer->>Pub: publish(event, payload)
Pub->>Pub: serialize(payload)
Pub->>Redis: PUBLISH(channel, json)
end
par Subscribe (callback)
Sub->>Pub: subscribe(event, listener)
Pub->>Redis: SUBSCRIBE(channel)
Redis-->>Pub: message(json)
Pub->>Pub: deserialize(json)
Pub-->>Sub: listener(payload)
and Unsubscribe
Sub->>Pub: unsubscribe()
Pub->>Redis: UNSUBSCRIBE(channel)
end
sequenceDiagram
autonumber
participant Consumer as Consumer (async iterator)
participant Pub as RedisEventPublisher
participant Redis
Consumer->>Pub: subscribe(event, {signal?, maxBufferedEvents?})
Pub->>Redis: SUBSCRIBE(channel)
Redis-->>Pub: message(json)
Pub->>Pub: deserialize -> enqueue buffer
loop for-await-of
Consumer->>Pub: next()
Pub-->>Consumer: dequeued payload
end
alt Abort or return()
Consumer->>Pub: return()/abort
Pub->>Redis: UNSUBSCRIBE(channel)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related issues
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (1)
🧰 Additional context used🧬 Code graph analysis (2)packages/server/src/helpers/redis-event-publisher.test.ts (1)
packages/server/src/helpers/redis-event-publisher.ts (3)
🔇 Additional comments (14)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a well-structured RedisEventPublisher for handling events across servers. The implementation is clean and makes good use of modern TypeScript features. The addition of both callback and async iterator-based subscriptions provides a flexible API for consumers.
I've identified a few areas for improvement, primarily concerning asynchronous error handling and robustness, which are critical for a library component like this. I've also found an issue in one of the test cases.
My comments are focused on ensuring the implementation is as robust and type-safe as possible. Overall, this is a great addition to the project.
| expect(result1.value).toEqual({ message: 'First' }) | ||
| expect(result2.value).toEqual({ message: 'Second' }) |
There was a problem hiding this comment.
There seems to be a logic error in this test case. The RedisEventPublisher is configured with maxBufferedEvents: 2. When three events are published ('First', 'Second', 'Third'), the buffer should contain the last two events due to the bufferedEvents.shift() logic when the buffer exceeds its max size. Therefore, the buffer should hold ['Second', 'Third'].
The test currently expects 'First' and 'Second', but it should expect 'Second' and 'Third' to be dequeued.
| expect(result1.value).toEqual({ message: 'First' }) | |
| expect(result2.value).toEqual({ message: 'Second' }) | |
| expect(result1.value).toEqual({ message: 'Second' }) | |
| expect(result2.value).toEqual({ message: 'Third' }) |
| this.#subscribeWithCallback(event, listener).then((unsub) => { | ||
| unsubscribe = unsub | ||
| }) |
There was a problem hiding this comment.
The promise returned by this.#subscribeWithCallback is not handled. If the subscription fails (e.g., due to a Redis error), this will result in an unhandledRejection, which could crash the Node.js process. You should handle this potential error, for instance by using .catch().
When an error occurs, it should be propagated to the consumer of the async iterator by rejecting the pending promises in pullResolvers.
this.#subscribeWithCallback(event, listener).then((unsub) => {
unsubscribe = unsub
}).catch((error) => {
// Propagate subscription error to any waiting consumers
pullResolvers.forEach(([, reject]) => reject(error))
pullResolvers.length = 0
})| const abortListener = (event: any) => { | ||
| if (unsubscribe) { | ||
| unsubscribe() | ||
| } | ||
| pullResolvers.forEach(resolver => resolver[1](event.target.reason)) | ||
| pullResolvers.length = 0 | ||
| bufferedEvents.length = 0 | ||
| } |
There was a problem hiding this comment.
There are a few issues in this abortListener that could affect robustness:
- The
unsubscribe()call returns a promise that is not handled. If it rejects, it will cause anunhandledRejection. - The
eventparameter is typed asany. It's better to have a parameterless listener and use thesignalobject from the outer scope, which is more reliable. event.target.reasonis used to get the abort reason. Usingsignal.reasonis cleaner and safer. It's also good practice to provide a fallback error in case the reason isundefined.
const abortListener = () => {
if (unsubscribe) {
unsubscribe().catch(err => console.error('Failed to unsubscribe on abort:', err))
}
const reason = signal?.reason ?? new Error('Aborted')
pullResolvers.forEach(([, reject]) => reject(reason))
pullResolvers.length = 0
bufferedEvents.length = 0
}| } | ||
| } | ||
| catch (error) { | ||
| console.error('Error processing Redis message:', error) |
There was a problem hiding this comment.
Using console.error for error handling might not be ideal for a library, as consumers may want to integrate with their own logging and monitoring systems. Consider providing a way for users to handle these errors, for example by emitting an 'error' event on the publisher or allowing an error handler to be passed in the constructor options. This would make the component more robust and configurable for production environments.
| for (const event of this.#subscribedChannels) { | ||
| await this.#redis.unsubscribe(this.#getChannelName(event)) | ||
| } |
There was a problem hiding this comment.
The current for...of loop with await will stop if any of the unsubscribe calls fail. This could leave some channels subscribed and prevent the quit command from being called. To make the close method more robust, consider using Promise.allSettled to attempt unsubscribing from all channels, regardless of individual failures.
const unsubscribePromises = Array.from(this.#subscribedChannels).map(event =>
this.#redis.unsubscribe(this.#getChannelName(event)),
)
await Promise.allSettled(unsubscribePromises)| serializerOptions?: { | ||
| customJsonSerializers?: readonly any[] | ||
| } |
There was a problem hiding this comment.
For better type safety and maintainability, it's recommended to use a more specific type for serializerOptions instead of an inline object type with any[].
You can import StandardRPCJsonSerializerOptions from @orpc/client/standard and use it here. This will ensure that the options passed to RedisEventPublisher match what StandardRPCJsonSerializer expects.
You'll need to add the following import at the top of the file:
import type { StandardRPCJsonSerializerOptions } from '@orpc/client/standard' serializerOptions?: StandardRPCJsonSerializerOptionsThere was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (2)
packages/server/src/helpers/redis-event-publisher.test.ts (1)
194-211: Consider testing buffer overflow behavior.The test verifies buffering up to
maxBufferedEvents: 2but doesn't explicitly validate what happens when a third event is published. Based on the implementation, the buffer should drop the oldest event when exceeding the limit.Consider adding an assertion that explicitly tests the buffer overflow scenario:
await publisher.publish('test-event', { message: 'First' }) await publisher.publish('test-event', { message: 'Second' }) await publisher.publish('test-event', { message: 'Third' }) const iterator = subscription[Symbol.asyncIterator]() const result1 = await iterator.next() const result2 = await iterator.next() expect(result1.value).toEqual({ message: 'First' }) expect(result2.value).toEqual({ message: 'Second' }) + +// Publish a 4th event to test that 'First' was already consumed +// and verify the buffer correctly held 'Second' and 'Third' +await publisher.publish('test-event', { message: 'Fourth' }) +const result3 = await iterator.next() +expect(result3.value).toEqual({ message: 'Third' })Or alternatively, create a dedicated test that validates the buffer drops the oldest event when exceeding the limit by publishing 3 events before consuming any.
packages/server/src/helpers/redis-options.ts (1)
1-10: Document RedisClient adapter requirements
No built-in adapter exists for real clients—users must wrap libraries like ioredis (subscribe returnsPromise<number>; uses events), node-redis v4 (callback signature), and upstash (REST API). Add a section (README or docs) outlining how to implementRedisClientfor each, with example code.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
packages/server/src/helpers/index.ts(1 hunks)packages/server/src/helpers/redis-event-publisher.test.ts(1 hunks)packages/server/src/helpers/redis-event-publisher.ts(1 hunks)packages/server/src/helpers/redis-options.ts(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/server/src/helpers/redis-event-publisher.test.ts (2)
packages/server/src/helpers/redis-options.ts (1)
RedisClient(5-10)packages/server/src/helpers/redis-event-publisher.ts (1)
RedisEventPublisher(43-248)
packages/server/src/helpers/redis-event-publisher.ts (3)
packages/server/src/helpers/redis-options.ts (2)
RedisClient(5-10)RedisEventPublisherOptions(15-38)packages/client/src/adapters/standard/rpc-json-serializer.ts (1)
StandardRPCJsonSerializer(29-214)packages/shared/src/json.ts (1)
stringifyJSON(9-12)
🔇 Additional comments (11)
packages/server/src/helpers/redis-event-publisher.test.ts (10)
6-37: LGTM! Well-designed mock for testing.The
MockRedisClientimplementation correctly simulates Redis Pub/Sub behavior for testing purposes. The use ofsetImmediateto asynchronously deliver published messages to subscribers accurately mimics real Redis behavior and ensures proper testing of async event handling.
46-52: LGTM! Proper test setup with custom prefix.The test setup correctly initializes a fresh
MockRedisClientandRedisEventPublisherinstance for each test with a customkeyPrefix. This ensures test isolation and validates the channel naming functionality.
58-73: LGTM! Validates publish and serialization format.The test correctly verifies that:
- Events are published to the expected Redis channel (
test:test-event)- The serialization format follows the
[json, meta, maps, blobs]structure- Simple payloads serialize correctly with empty metadata arrays
75-94: LGTM! Comprehensive complex type serialization test.The test validates that the serializer correctly handles:
BigInt→ string representationDate→ ISO 8601 stringSet→ arrayMap→ array of tuplesThis ensures cross-server compatibility when publishing complex JavaScript types.
98-114: LGTM! Correct handling of async message delivery.The test properly accounts for async message delivery by:
- Awaiting subscription to ensure setup is complete
- Publishing the event
- Using
setImmediatecallback to wait for async delivery- Cleaning up with unsubscribe
116-137: LGTM! Validates multiple subscriber fan-out.The test correctly verifies that multiple subscribers on the same event channel all receive the published message, which is essential for distributed event handling across server instances.
139-148: LGTM! Validates cleanup and listener count tracking.The test confirms that:
- The
sizeproperty accurately tracks active listeners- Unsubscribing properly cleans up resources
- The listener count returns to zero after unsubscribe
152-170: LGTM! Validates async iterator consumption.The test correctly demonstrates manual async iterator usage with
Symbol.asyncIteratorand validates that:
- Buffered events are delivered in FIFO order
- The iterator protocol (
next(),return()) works correctly
172-192: LGTM! Validates abort signal integration.The test correctly verifies that
AbortSignalintegration works by:
- Creating a subscription with an
AbortController- Aborting after a delay
- Confirming that pending
iterator.next()calls reject withAbortErrorThis ensures proper cleanup when operations are cancelled.
214-224: LGTM! Validates resource cleanup on close.The test confirms that calling
close()properly:
- Cleans up all subscriptions
- Resets the listener count to zero
- Closes the Redis connection
packages/server/src/helpers/redis-options.ts (1)
12-38: LGTM! Well-documented configuration options.The
RedisEventPublisherOptionsinterface is well-structured with:
- Clear documentation for each option
- Sensible defaults mentioned in JSDoc
- Flexible serialization configuration
The
serializerOptions.customJsonSerializersusesreadonly any[]for flexibility, which is acceptable given the serialization framework's requirements.
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
More templates
@orpc/arktype
@orpc/client
@orpc/contract
@orpc/experimental-durable-iterator
@orpc/hey-api
@orpc/interop
@orpc/json-schema
@orpc/nest
@orpc/openapi
@orpc/openapi-client
@orpc/otel
@orpc/react
@orpc/react-query
@orpc/experimental-react-swr
@orpc/server
@orpc/shared
@orpc/solid-query
@orpc/standard-server
@orpc/standard-server-aws-lambda
@orpc/standard-server-fetch
@orpc/standard-server-node
@orpc/standard-server-peer
@orpc/svelte-query
@orpc/tanstack-query
@orpc/trpc
@orpc/valibot
@orpc/vue-colada
@orpc/vue-query
@orpc/zod
commit: |
| @@ -0,0 +1,38 @@ | |||
| /** | |||
There was a problem hiding this comment.
I don't think we need another file for there stuffs (almost related to publisher class), so I think think should merge back into redis-event-publisher.ts
| /** | ||
| * Options for Redis Event Publisher | ||
| */ | ||
| export interface RedisEventPublisherOptions { |
There was a problem hiding this comment.
This interface should extends StandardRPCJsonSerializerOptions instead of require user provide it through serializerOptions
| } | ||
| } | ||
| catch (error) { | ||
| console.error('Error processing Redis message:', error) |
There was a problem hiding this comment.
We should rethrow error here avoid console as much as possible
63ab1b0 to
19003b1
Compare
|
Thanks @joonseolee! It's looking pretty good now (just a few small bugs left). I'm planning to introduce |
- [x] memory - [x] ioredis - [x] upstash redis - [x] docs - [x] random prefix all keys when tests to avoid conflict - [x] handle error after subscribe success Closes: #1011 #1087 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * New Publisher system: buffered publish/subscribe with async-iterator support and adapters for in-memory, IORedis and Upstash Redis including optional resume/replay. * **Documentation** * Publisher guide, helper docs, and event-iterator examples added; package README updated. * **Tests** * Extensive unit and integration suites (including real Redis/Upstash scenarios); CI configured to run Redis during tests. * **Chores** * .env.example, .gitignore, package configs, TS project config and test runner updates; ID sequencing/compare behavior adjusted. <!-- end of auto-generated comment: release notes by coderabbit.ai --> Co-authored-by: Joonseo Lee <niceweather94@gmail.com>
|
|
Summary
Adds RedisEventPublisher class to enable cross-server event handling using Redis Pub/Sub.
Changes
RedisEventPublisherclass with publish/subscribe functionalityTesting
Fixes
#1011
Summary by CodeRabbit
New Features
Tests