
feat(publisher): memory, ioredis, upstash redis publishers#1094

Merged
dinwwwh merged 32 commits into main from feat/publisher/renew
Oct 19, 2025

Conversation


@dinwwwh dinwwwh commented Oct 13, 2025

  • memory
  • ioredis
  • upstash redis
  • docs
  • randomly prefix all keys during tests to avoid conflicts
  • handle error after subscribe success

Closes: #1011 #1087

Summary by CodeRabbit

  • New Features

    • New Publisher system: buffered publish/subscribe with async-iterator support and adapters for in-memory, IORedis and Upstash Redis including optional resume/replay.
  • Documentation

    • Publisher guide, helper docs, and event-iterator examples added; package README updated.
  • Tests

    • Extensive unit and integration suites (including real Redis/Upstash scenarios); CI configured to run Redis during tests.
  • Chores

    • .env.example, .gitignore, package configs, TS project config and test runner updates; ID sequencing/compare behavior adjusted.
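The "buffered publish/subscribe with async-iterator support" mentioned above can be sketched roughly as follows. This is an illustrative stand-in, not the actual @orpc/experimental-publisher API; `TinyMemoryPublisher` and its method signatures are assumptions for the sake of the example.

```typescript
// Hypothetical sketch of buffered publish/subscribe with async-iterator
// support; TinyMemoryPublisher is illustrative, not the real package API.
type Listener<T> = (payload: T) => void

class TinyMemoryPublisher<T extends Record<string, object>> {
  private listeners = new Map<keyof T, Set<Listener<any>>>()

  async publish<K extends keyof T & string>(event: K, payload: T[K]): Promise<void> {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(payload)
    }
  }

  subscribe<K extends keyof T & string>(
    event: K,
    options?: { signal?: AbortSignal },
  ): AsyncIterableIterator<T[K]> {
    const buffer: T[K][] = [] // events buffer here until the consumer pulls them
    let notify: (() => void) | undefined
    const listener: Listener<T[K]> = (payload) => {
      buffer.push(payload)
      notify?.()
    }
    let set = this.listeners.get(event)
    if (!set) {
      this.listeners.set(event, set = new Set())
    }
    set.add(listener)
    // wake any pending next() so it can observe the abort
    options?.signal?.addEventListener('abort', () => notify?.(), { once: true })

    return {
      [Symbol.asyncIterator]() { return this },
      async next() {
        while (buffer.length === 0) {
          if (options?.signal?.aborted) {
            set!.delete(listener)
            return { done: true, value: undefined }
          }
          await new Promise<void>((resolve) => { notify = resolve })
        }
        return { done: false, value: buffer.shift()! }
      },
      async return() {
        set!.delete(listener) // unsubscribe when the for-await loop exits
        return { done: true, value: undefined }
      },
    }
  }
}
```

The key semantic: events published after `subscribe` but before the consumer pulls are retained in the buffer, so a slow consumer does not drop deliveries.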

Co-authored-by: Joonseo Lee niceweather94@gmail.com


vercel Bot commented Oct 13, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project: orpc — Deployment: Ready — Updated (UTC): Oct 19, 2025 7:54am


@dinwwwh dinwwwh marked this pull request as draft October 13, 2025 02:45
@dosubot dosubot Bot added the size:XL label (This PR changes 500-999 lines, ignoring generated files) on Oct 13, 2025

coderabbitai Bot commented Oct 13, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a new Publisher abstraction with Memory, IORedis, and Upstash adapters, extensive tests and docs, package scaffolding and CI Redis support, changes to sequential ID generation and comparison, and small tooling/type updates across the monorepo.

Changes

Cohort / File(s) Summary
Core Publisher Abstraction
packages/publisher/src/publisher.ts
New abstract Publisher<T> with buffered subscriptions, async-iterator & callback overloads, AbortSignal support, and related option interfaces.
Publisher Package & Exports
packages/publisher/*
packages/publisher/package.json, packages/publisher/tsconfig.json, packages/publisher/.gitignore, packages/publisher/README.md, packages/publisher/src/index.ts, packages/publisher/src/index.test.ts
New @orpc/experimental-publisher package manifest, exports for memory/ioredis/upstash adapters, TS config, .gitignore, README, index re-export and index test.
Memory Adapter & Tests
packages/publisher/src/adapters/memory.ts, packages/publisher/src/adapters/memory.test.ts
New MemoryPublisher with optional resumeRetentionSeconds, in-memory retention/replay, cleanup throttling, size metric and comprehensive unit tests.
IORedis Adapter & Tests
packages/publisher/src/adapters/ioredis.ts, packages/publisher/src/adapters/ioredis.test.ts
New IORedisPublisher using Redis streams/channels (XADD/XREAD/XTRIM), serializer/meta handling, prefixing, deduplication, lifecycle management and extensive Redis-backed tests.
Upstash Adapter & Tests
packages/publisher/src/adapters/upstash-redis.ts, packages/publisher/src/adapters/upstash-redis.test.ts
New UpstashRedisPublisher using Upstash REST for stream resume/publish, with dedupe, prefixing, serializer hooks and integration tests.
Publisher Tests & Types
packages/publisher/src/publisher.test.ts, packages/publisher/src/publisher.test-d.ts
Behavioral and type tests covering publish/subscribe semantics, async iterator behavior, buffering, abort handling and type assertions.
Sequential ID & ID Utilities
packages/shared/src/id.ts, packages/shared/src/id.test.ts
Sequential ID generator now starts at 1, uses base-36; added compareSequentialIds(a,b) export and updated tests.
EventPublisher internals
packages/shared/src/event-publisher.ts, packages/shared/src/event-publisher.test.ts
Changed per-event listener storage from Set to Array; adjusted subscribe/unsubscribe semantics and added tests for duplicate subscriptions and idempotent unsubscribe.
Docs & Site
apps/content/docs/helpers/publisher.md, apps/content/docs/event-iterator.md, apps/content/.vitepress/config.ts, apps/content/package.json
Added Publisher docs, replaced EventPublisher example with MemoryPublisher usage, added sidebar entry and added @orpc/experimental-publisher devDependency to the docs app.
CI & Env
.env.example, .github/workflows/ci.yaml
New .env.example with Redis/Upstash placeholders; CI job now launches a Redis container with healthchecks and injects REDIS_URL / UPSTASH_* env vars/secrets for tests.
Tooling / Test Config
vitest.config.ts, (removed) vitest.workspace.ts
Replaced workspace-based Vitest configuration with multi-project vitest.config.ts; removed vitest.workspace.ts.
DevDependency & Overrides
packages/arktype/package.json, packages/server/package.json, packages/valibot/package.json, package.json
Added zod devDependency entries in several packages and a pnpm override for @wxt-dev/storage.
Playground Typing
playgrounds/tanstack-start/src/routeTree.gen.ts
Augmented Register interface with ssr: true property for typings.
Adjusted Tests for ID Change
packages/standard-server-peer/src/client.test.ts
Updated test expectations/message identifiers to match revised sequential ID scheme (IDs start at 1, base-36).
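The revised sequential ID behavior summarized above (IDs start at 1, base-36, plus a comparator) can be sketched as below. The class and function names mirror packages/shared/src/id.ts, but the bodies here are assumptions, not the actual implementation.

```typescript
// Illustrative sketch of base-36 sequential IDs starting at 1; names mirror
// packages/shared/src/id.ts but the bodies are not the actual implementation.
class SequentialIdGenerator {
  private next = 1n

  generate(): string {
    return (this.next++).toString(36)
  }
}

// Base-36 strings of equal length compare in numeric order, and a longer
// string always encodes a larger number, so length-then-lexicographic works.
function compareSequentialIds(a: string, b: string): number {
  if (a.length !== b.length) {
    return a.length - b.length
  }
  return a < b ? -1 : a > b ? 1 : 0
}
```

Plain lexicographic comparison would sort '10' before 'z' even though '10' encodes the larger number; comparing by length first keeps ordering correct once IDs roll past one digit.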

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant Pub as Publisher (core)
    participant Adapter as Adapter\n(Memory / IORedis / Upstash)
    participant Redis as Redis (optional)
    participant Sub as Subscriber

    rect rgba(240,248,255,0.9)
    note over App,Sub: Callback subscription (live delivery)
    App->>Pub: subscribe(event, listener)
    Pub->>Adapter: subscribeListener(event, listener, options?)
    Adapter-->>Pub: Promise<unsubscribe>
    App->>Pub: publish(event, payload)
    Pub->>Adapter: publish(event, payload)
    Adapter->>Sub: deliver payload (serialized + meta)
    end

    rect rgba(240,255,240,0.9)
    note over App,Sub: Async-iterator subscription (resume possible)
    App->>Pub: subscribe(event, { lastEventId?, signal? })
    Pub->>Adapter: subscribeListener(event, wrapper, { lastEventId })
    alt resume enabled
        Adapter->>Redis: XREAD / XADD or in-memory replay
        Redis-->>Adapter: replayed events (ids + payloads)
        Adapter->>Pub: enqueue replayed events
    end
    App->>Pub: for await (payload)
    Pub-->>App: yield payload from queue
    end

    rect rgba(255,245,240,0.9)
    note over Adapter: Redis-specific flows (IORedis / Upstash)
    Adapter->>Redis: XADD (persist) & PUBLISH channel
    Adapter->>Redis: XTRIM / EXPIRE for retention
    Adapter->>Adapter: deduplicate replay vs live deliveries
    end
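The retention flow in the diagram (XADD, then XTRIM/EXPIRE) boils down to a small threshold computation, sketched here. `retentionCommands` is a hypothetical helper for illustration; the actual adapters issue these commands through a MULTI pipeline.

```typescript
// Sketch of the retention math behind XTRIM MINID + EXPIRE: stream entries
// older than the retention window are trimmed, and the key TTL is set to
// twice the window. retentionCommands is hypothetical, not adapter code.
function retentionCommands(
  key: string,
  retentionSeconds: number,
  nowMs: number,
): string[][] {
  // Redis stream IDs are "<unix-ms>-<seq>", so a MINID threshold is a timestamp
  const minId = `${nowMs - retentionSeconds * 1000}-0`
  return [
    ['XTRIM', key, 'MINID', minId],
    ['EXPIRE', key, String(retentionSeconds * 2)],
  ]
}
```

The doubled TTL gives the key headroom beyond the trim window, so a stream is only expired well after its last retained entry has aged out.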

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • unnoq/orpc#603 — Prior EventPublisher work; closely related to introducing the new Publisher abstraction and adapters.
  • unnoq/orpc#928 — Touches EventPublisher/subscribe/iterator semantics and overlaps on iterator/subscribe behavior.
  • unnoq/orpc#925 — DevDependency zod updates; aligns with the zod additions in multiple package.json files here.

Poem

🐰 I hopped through queues both near and far,

Memory, Redis — now each event's a star.
IDs march onward, base-36 and bright,
Replay and publish keep the streams alight.
The rabbit nudges bytes and bounds into the night.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Out of Scope Changes Check ⚠️ Warning Consider removing or justifying the out-of-scope additions: the zod devDependencies added to arktype, server, and valibot packages; the @wxt-dev/storage pnpm override in the root package.json; and the tanstack-start SSR interface change in playgrounds/tanstack-start/src/routeTree.gen.ts. If these changes are required for a separate issue (such as #1087 mentioned in the PR description), they should be documented or separated into a distinct pull request to maintain clear scope boundaries.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The pull request title "feat(publisher): memory, ioredis, upstash redis publishers" directly and accurately reflects the main changes in the changeset. It uses conventional commit formatting with a clear scope (publisher) and concisely identifies the three primary implementations being added: memory, ioredis, and upstash redis publishers. The title is specific enough for teammates to understand the core feature without being overly verbose.
Linked Issues Check ✅ Passed The pull request successfully implements all primary coding objectives from issue #1011. The changeset includes three Redis-backed publisher implementations (MemoryPublisher, IORedisPublisher, UpstashRedisPublisher) that enable cross-server and serverless event publication as required. Comprehensive test coverage for all adapters, base Publisher abstraction with buffering support, documentation updates, and CI configuration for Redis testing are all provided. The feature fulfills the stated requirement to provide Redis-based event publishers for cross-server and serverless use cases.
Docstring Coverage ✅ Passed No functions found in the changes. Docstring coverage check skipped.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9cfe343 and 70f2209.

📒 Files selected for processing (1)
  • packages/publisher/package.json (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/publisher/package.json
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: publish-commit
  • GitHub Check: lint
  • GitHub Check: test



Summary of Changes

Hello @unnoq, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new @orpc/experimental-publisher package, designed to provide a unified and flexible event publishing system within the oRPC ecosystem. It includes a MemoryPublisher adapter that supports event resume capabilities, allowing new subscribers to catch up on past events. A significant enhancement is the addition of async iterator support for event subscriptions, which offers advanced features like configurable event buffering and integration with AbortSignal for controlled stream termination. This new package lays the groundwork for future event-driven features and integrations.

Highlights

  • New Publisher Package: Introduction of @orpc/experimental-publisher for unified event publishing within the oRPC ecosystem.
  • Memory Adapter with Resume: Implementation of MemoryPublisher which provides an in-memory event publishing mechanism with optional event retention for resume functionality.
  • Async Iterator Subscriptions: Enhanced subscription model to support for await...of loops, including configurable event buffering and AbortSignal integration for controlled stream termination.
  • Sequential ID Management: Introduction of SequentialIdGenerator and compareSequentialIds for robust event ID handling, particularly for resume functionality and ordering.
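The AbortSignal integration highlighted above typically looks like the pattern below on the consumer side. The async generator is a stand-in for a real Publisher subscription, not the package's API.

```typescript
// Consumer-side sketch of AbortSignal-controlled stream termination; the
// generator stands in for a Publisher async-iterator subscription.
async function* fakeSubscription(signal: AbortSignal): AsyncGenerator<number> {
  let n = 0
  while (!signal.aborted) {
    yield n++ // a real subscription would await delivered events here
  }
}

async function consumeThree(): Promise<number[]> {
  const controller = new AbortController()
  const seen: number[] = []
  for await (const n of fakeSubscription(controller.signal)) {
    seen.push(n)
    if (seen.length === 3) {
      controller.abort() // ends the stream; the loop exits on the next pull
    }
  }
  return seen
}
```

Because the generator checks the signal each time the loop pulls, abort takes effect at the next iteration rather than mid-delivery.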


@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a new @orpc/experimental-publisher package, providing a unified event publisher with an in-memory adapter. The overall structure is well-designed, with a clear separation of concerns and comprehensive test coverage for the new functionality. I have identified a few issues, including a potential unhandled promise rejection, an incorrect HTML tag in the documentation, and a change that causes tests to fail. Addressing these points will improve the robustness and correctness of the new package.

Comment thread packages/publisher/src/publisher.ts
Comment thread packages/shared/src/id.ts
Comment thread packages/publisher/README.md

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (7)
packages/publisher/src/adapters/ioredis.ts (1)

255-266: Spreading undefined eventMeta will throw a runtime error.

The past review comment on these lines is still unresolved. If getEventMeta(payload) returns undefined, the spread operation on line 264 ({ ...eventMeta }) will throw a TypeError.

Apply defensive guards:

 protected serializePayload(payload: object): SerializedPayload {
-  const eventMeta = getEventMeta(payload)
+  const eventMeta = getEventMeta(payload) ?? {}
   const [json, meta] = this.serializer.serialize(payload)
   return { json: json as object, meta, eventMeta }
 }

 protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
+  const base = eventMeta ? { ...eventMeta } : {}
+  if (id !== undefined) (base as any).id = id
   return withEventMeta(
     this.serializer.deserialize(json, meta) as object,
-    id === undefined ? { ...eventMeta } : { ...eventMeta, id },
+    base,
   )
 }
packages/publisher/src/adapters/upstash-redis.ts (5)

190-193: Avoid clobbering listener Set under concurrency.

Compare-and-set to prevent overwriting a Set created by a racing caller.

Apply:

-    let listeners = this.listenersMap.get(key)
-    if (!listeners) {
-      this.listenersMap.set(key, listeners = new Set())
-    }
+    let listeners = this.listenersMap.get(key)
+    if (!listeners) {
+      const existingSet = this.listenersMap.get(key)
+      if (existingSet) {
+        listeners = existingSet
+      } else {
+        listeners = new Set()
+        this.listenersMap.set(key, listeners)
+      }
+    }

98-104: Store JSON strings in streams and parse on resume.

Avoid driver-dependent object encoding in XADD/XREAD; persist strings and JSON.parse on read.

Apply:

-        const results = await this.redis.multi()
-          .xadd(key, '*', { data: serialized })
+        const results = await this.redis.multi()
+          .xadd(key, '*', { data: JSON.stringify(serialized) })
           .xtrim(key, { strategy: 'MINID', exactness: this.xtrimExactness, threshold: `${now - this.retentionSeconds * 1000}-0` })
           .expire(key, this.retentionSeconds * 2)
           .exec()
@@
-        const result = await this.redis.xadd(key, '*', { data: serialized })
+        const result = await this.redis.xadd(key, '*', { data: JSON.stringify(serialized) })
         id = result
       }
@@
-            for (const [id, fields] of items) {
-              const serialized = fields[1]! // field value is at index 1 (index 0 is field name 'data')
-              const payload = this.deserializePayload(id, serialized)
+            for (const [id, fields] of items) {
+              const raw = Array.isArray(fields)
+                ? fields[1]
+                : (fields as any).data
+              const parsed = typeof raw === 'string' ? JSON.parse(raw) : raw
+              const payload = this.deserializePayload(id, parsed)
               resumePayloadIds.add(id)
               originalListener(payload)
             }

Based on learnings.

Also applies to: 107-109, 205-208


112-113: Publish explicit JSON and parse on subscribe.

Don’t rely on SDK to auto-(de)serialize pub/sub payloads.

Apply:

-    await this.redis.publish(key, { ...serialized, id })
+    await this.redis.publish(key, JSON.stringify({ ...serialized, id }))

And in the subscriber handler:

-      subscription.on('message', (event) => {
+      subscription.on('message', (event) => {
         try {
           const listeners = this.listenersMap.get(event.channel)

           if (listeners) {
-            const { id, ...rest } = event.message as any
-            const payload = this.deserializePayload(id, rest)
+            const parsed = typeof event.message === 'string'
+              ? JSON.parse(event.message as any)
+              : (event.message as any)
+            const payload = this.deserializePayload(parsed.id, parsed)
 
             for (const listener of listeners) {
               listener(payload)
             }
           }
         }
         catch {

Based on learnings.

Also applies to: 146-157


253-257: Fix potential TypeError when eventMeta is undefined.

Spreading undefined throws; guard before spreading.

Apply:

-  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
-    return withEventMeta(
-      this.serializer.deserialize(json, meta) as object,
-      id === undefined ? { ...eventMeta } : { ...eventMeta, id },
-    )
-  }
+  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
+    const base = eventMeta ? { ...eventMeta } as Record<string, unknown> : {}
+    if (id !== undefined) (base as any).id = id
+    return withEventMeta(this.serializer.deserialize(json, meta) as object, base)
+  }

180-184: Eliminate duplicate concurrent subscriptions.

Re-check after readiness and prefer existing subscription; close the extra one.

Apply:

       try {
         this.subscriptionPromiseMap.set(key, promise)
         await promise
-        this.subscriptionsMap.set(key, subscription) // set after subscription is ready
+        const existing = this.subscriptionsMap.get(key)
+        if (existing) {
+          try { await subscription.unsubscribe() } catch {}
+          subscription = existing
+        } else {
+          this.subscriptionsMap.set(key, subscription) // set after subscription is ready
+        }
       }
       finally {
         this.subscriptionPromiseMap.delete(key)
       }
packages/publisher/src/adapters/upstash-redis.test.ts (1)

13-14: Gate the suite via SKIP env too (don’t rely only on creds).

Allow CI/users to skip with SKIP_UPSTASH_TESTS; keeps forks green without secrets.

Apply:

-describe.concurrent('upstash redis publisher', { skip: !UPSTASH_REDIS_REST_URL || !UPSTASH_REDIS_REST_TOKEN, timeout: 20000 }, () => {
+const suite = (!UPSTASH_REDIS_REST_URL || !UPSTASH_REDIS_REST_TOKEN || process.env.SKIP_UPSTASH_TESTS)
+  ? describe.skip
+  : describe
+
+suite.concurrent('upstash redis publisher', { timeout: 20000 }, () => {
🧹 Nitpick comments (7)
packages/publisher/src/adapters/ioredis.ts (2)

115-134: Strengthen result validation before non-null assertions.

Lines 129 and 133 use multiple non-null assertions (result![0]![1] and result!) that assume Redis always returns the expected structure. While the error loop (lines 122-126) checks for command errors, it doesn't validate that result is non-null or that the array structure exists.

Consider defensive guards:

     const result = await this.commander.multi()
       .xadd(key, '*', 'data', stringifyJSON(serialized))
       .xtrim(key, 'MINID', this.xtrimExactness as '~', `${now - this.retentionSeconds * 1000}-0`)
       .expire(key, this.retentionSeconds * 2)
       .exec()

-    if (result) {
+    if (!result || result.length === 0) {
+      throw new Error('Redis transaction returned unexpected result')
+    }
+
       for (const [error] of result) {
         if (error) {
           throw error
         }
       }
-    }

-    id = (result![0]![1] as string)
+    const xaddResult = result[0]?.[1]
+    if (typeof xaddResult !== 'string') {
+      throw new Error('XADD did not return expected ID')
+    }
+    id = xaddResult
   }
   else {
     const result = await this.commander.xadd(key, '*', 'data', stringifyJSON(serialized))
-    id = result!
+    if (typeof result !== 'string') {
+      throw new Error('XADD did not return expected ID')
+    }
+    id = result
   }

178-181: TODO: Implement error logging for message deserialization failures.

Silent error swallowing (also on lines 222-225) makes debugging difficult. At minimum, log to console.error or integrate with a logging framework.

Would you like me to generate a logging implementation or open an issue to track this TODO?

packages/publisher/src/adapters/ioredis.test.ts (1)

158-213: Consider adding test coverage for payloads without event metadata.

While this test verifies metadata propagation when present, there's no test for payloads where getEventMeta() returns undefined. This edge case would help catch the spread issue flagged in the adapter implementation.

Example test:

it('handles payloads without event metadata', async () => {
  const publisher = createTestingPublisher({
    resumeRetentionSeconds: 10,
  })

  const listener1 = vi.fn()
  const unsub1 = await publisher.subscribe('event1', listener1)

  // Payload without any event metadata
  const payload = { order: 1 }
  await publisher.publish('event1', payload)

  await vi.waitFor(() => {
    expect(listener1).toHaveBeenCalledTimes(1)
  })

  const received = listener1.mock.calls[0]![0]
  expect(received).toEqual(expect.objectContaining(payload))
  expect(getEventMeta(received)?.id).toBeDefined() // Should have auto-generated ID

  await unsub1()
})
packages/publisher/src/adapters/upstash-redis.test.ts (2)

17-21: Ensure prefixes are namespaced with a delimiter.

Add a trailing colon to avoid accidental key collisions like abcevent1.

-    const publisher = new UpstashRedisPublisher(useRedis, {
-      prefix: crypto.randomUUID(), // isolated from other tests
+    const publisher = new UpstashRedisPublisher(useRedis, {
+      prefix: `${crypto.randomUUID()}:`, // isolated from other tests
       ...options,
     })

381-384: Deflake TTL assertion.

Network jitter can make TTL slightly above 2s right after EXPIRE. Loosen upper bound.

-        expect(ttl1).toBeLessThanOrEqual(2) // (2 * retentionSeconds)
+        expect(ttl1).toBeLessThanOrEqual(3) // allow small clock/HTTP jitter
packages/publisher/src/adapters/upstash-redis.ts (2)

235-238: Unsubscribe errors should not bubble.

Protect unsubscribe to avoid leaking rejections during teardown.

-          this.subscriptionsMap.delete(key) // should execute before async to avoid race condition
-          await subscription.unsubscribe()
+          this.subscriptionsMap.delete(key) // should execute before async to avoid race condition
+          try {
+            await subscription.unsubscribe()
+          } catch {
+            // ignore
+          }

247-251: Normalize eventMeta at serialization time.

Optional but safer: ensure eventMeta is always an object.

-  protected serializePayload(payload: object): SerializedPayload {
-    const eventMeta = getEventMeta(payload)
+  protected serializePayload(payload: object): SerializedPayload {
+    const eventMeta = getEventMeta(payload) ?? {}
     const [json, meta] = this.serializer.serialize(payload)
     return { json: json as object, meta, eventMeta }
   }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0881c35 and eb574ba.

📒 Files selected for processing (4)
  • packages/publisher/src/adapters/ioredis.test.ts (1 hunks)
  • packages/publisher/src/adapters/ioredis.ts (1 hunks)
  • packages/publisher/src/adapters/upstash-redis.test.ts (1 hunks)
  • packages/publisher/src/adapters/upstash-redis.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
packages/publisher/src/adapters/upstash-redis.ts (3)
packages/client/src/adapters/standard/rpc-json-serializer.ts (3)
  • StandardRPCJsonSerializedMetaItem (15-15)
  • StandardRPCJsonSerializerOptions (25-27)
  • StandardRPCJsonSerializer (29-214)
packages/publisher/src/publisher.ts (2)
  • PublisherOptions (3-18)
  • PublisherSubscribeListenerOptions (20-25)
packages/shared/src/value.ts (1)
  • fallback (17-19)
packages/publisher/src/adapters/upstash-redis.test.ts (1)
packages/publisher/src/adapters/upstash-redis.ts (2)
  • UpstashRedisPublisher (33-259)
  • UpstashRedisPublisherOptions (11-31)
packages/publisher/src/adapters/ioredis.test.ts (2)
packages/publisher/src/adapters/ioredis.ts (2)
  • IORedisPublisher (46-267)
  • IORedisPublisherOptions (11-44)
packages/shared/src/iterator.ts (1)
  • next (60-62)
packages/publisher/src/adapters/ioredis.ts (4)
packages/client/src/adapters/standard/rpc-json-serializer.ts (3)
  • StandardRPCJsonSerializedMetaItem (15-15)
  • StandardRPCJsonSerializerOptions (25-27)
  • StandardRPCJsonSerializer (29-214)
packages/publisher/src/publisher.ts (2)
  • PublisherOptions (3-18)
  • PublisherSubscribeListenerOptions (20-25)
packages/shared/src/value.ts (1)
  • fallback (17-19)
packages/shared/src/json.ts (1)
  • stringifyJSON (9-12)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: publish-commit
  • GitHub Check: lint
  • GitHub Check: test
🔇 Additional comments (8)
packages/publisher/src/adapters/ioredis.ts (4)

187-201: LGTM! Race condition handling is correct.

The use of listenerPromiseMap to serialize concurrent subscriptions to the same channel is well-designed. The try/finally ensures cleanup even if the subscribe fails.


236-248: LGTM! Unsubscribe cleanup logic is sound.

Synchronous map cleanup (line 239) before async Redis unsubscribe (line 246) correctly prevents race conditions. The global listener cleanup (lines 241-244) ensures proper resource management.


147-162: Excellent deduplication strategy for resume race conditions.

The pending queue (lines 148-151) combined with the resumePayloadIds Set (lines 153-159) elegantly handles the race where events published during resume could arrive via both XREAD and pub/sub. Clean implementation.
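The deduplication strategy praised here can be distilled to the sketch below: deliver replayed events first while remembering their IDs, then flush live deliveries that were buffered during resume, skipping any ID already replayed. The types and helper name are illustrative, not the adapter's actual internals.

```typescript
// Distilled sketch of replay-vs-live deduplication; hypothetical shapes,
// not the actual IORedisPublisher internals.
interface StreamEvent {
  id: string
  data: unknown
}

function flushAfterResume(
  replayed: StreamEvent[],
  bufferedLive: StreamEvent[],
  deliver: (e: StreamEvent) => void,
): void {
  const replayedIds = new Set<string>()
  for (const e of replayed) {
    replayedIds.add(e.id)
    deliver(e) // replayed events go out first, in stream order
  }
  for (const e of bufferedLive) {
    if (!replayedIds.has(e.id)) {
      deliver(e) // drop events that already arrived via XREAD replay
    }
  }
}
```

An event published during resume can arrive through both XREAD and pub/sub; keying the dedupe on the stream-assigned ID means each event is delivered exactly once regardless of which path wins.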


205-234: LGTM! Background resume with graceful fallback.

The async IIFE pattern allows non-blocking resume. The finally block (lines 226-233) ensures buffered events are delivered even if resume fails, and correctly routes them through the wrapped listener (line 231) for deduplication rather than directly to originalListener.

packages/publisher/src/adapters/ioredis.test.ts (4)

17-30: Excellent test setup with isolation and determinism.

The random UUID prefix (line 21) ensures test isolation, while xtrimExactness = '=' (line 25) makes stream trimming deterministic for testing. The helper function keeps tests DRY.


37-44: Excellent memory leak detection in afterAll hook.

Verifying publisher.size === 0 for all created publishers (lines 41-43) is a robust way to catch cleanup bugs. This pattern should be adopted in other adapter tests.


321-349: Clever use of { repeats: 3 } to catch race conditions.

Running this test multiple times (line 321) increases confidence in the deduplication logic when events are published during resume. The assertions verify both count and order, ensuring no duplicates or missed events.


1-550: Comprehensive test coverage with excellent edge case handling.

This test suite covers all critical scenarios: basic pub/sub, resume/replay, race conditions, error handling, cleanup, and serialization. The use of real Redis (gated by REDIS_URL) combined with isolated prefixes provides high confidence in the implementation.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (9)
package.json (1)

66-68: TypeScript version mismatch; align devDependency with overrides.

devDependencies declare ^5.9.3 while pnpm.overrides pins ~5.8.3. This desync breaks editor/CI parity. Pick one and align.

Option A (bump override to 5.9.x):

   "overrides": {
-      "@wxt-dev/storage": "1.2.0",
-      "typescript": "~5.8.3"
+      "@wxt-dev/storage": "1.2.0",
+      "typescript": "~5.9.3"
   }

Option B (stay on 5.8.x; also adjust devDep at line 54):

-    "typescript": "^5.9.3",
+    "typescript": "~5.8.3",

Also document why "@wxt-dev/storage" is pinned to 1.2.0 (changelog/regression note).

packages/publisher/src/adapters/ioredis.ts (2)

193-210: Fix race: concurrent subscribers can clobber Set and break unsubscribe.

Double-check the Set after awaiting subscribe, and resolve the Set at unsubscribe time instead of using the captured reference.

-    let listeners = this.listenersMap.get(key)
-    if (!listeners) {
-      try {
-        const promise = this.listener.subscribe(key)
-        this.listenerPromiseMap.set(key, promise)
-        await promise
-        this.listenersMap.set(key, listeners = new Set()) // only set after subscribe successfully
-      }
-      finally {
-        this.listenerPromiseMap.delete(key)
-      }
-    }
+    let listeners = this.listenersMap.get(key)
+    if (!listeners) {
+      try {
+        const promise = this.listener.subscribe(key)
+        this.listenerPromiseMap.set(key, promise)
+        await promise
+        listeners = this.listenersMap.get(key) ?? new Set()
+        this.listenersMap.set(key, listeners)
+      } finally {
+        this.listenerPromiseMap.delete(key)
+      }
+    }
@@
-    return async () => {
-      listeners.delete(listener)
+    return async () => {
+      const set = this.listenersMap.get(key) ?? listeners
+      set.delete(listener)
       if (onError) {
         this.listener.off('error', onError)
       }
-      if (listeners.size === 0) {
-        this.listenersMap.delete(key) // should execute before async to avoid race condition
+      if (set.size === 0) {
+        this.listenersMap.delete(key)
         if (this.redisListener && this.listenersMap.size === 0) {
           this.listener.off('message', this.redisListener)
           this.redisListener = undefined
         }
         await this.listener.unsubscribe(key)
       }
     }

Also applies to: 242-259


266-277: Guard against undefined eventMeta during (de)serialization.

Spreading undefined throws. Make eventMeta optional-safe.

-  protected serializePayload(payload: object): SerializedPayload {
-    const eventMeta = getEventMeta(payload)
+  protected serializePayload(payload: object): SerializedPayload {
+    const eventMeta = getEventMeta(payload) ?? {}
     const [json, meta] = this.serializer.serialize(payload)
     return { json: json as object, meta, eventMeta }
   }
@@
-  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
-    return withEventMeta(
-      this.serializer.deserialize(json, meta) as object,
-      id === undefined ? { ...eventMeta } : { ...eventMeta, id },
-    )
-  }
+  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
+    const base = eventMeta ? { ...eventMeta } : {}
+    if (id !== undefined) (base as any).id = id
+    return withEventMeta(this.serializer.deserialize(json, meta) as object, base)
+  }
packages/publisher/src/adapters/upstash-redis.ts (5)

113-114: Publish explicit JSON; parse on receive.

Avoid SDK-dependent payload coercion. Always stringify and parse. Based on learnings.

-    await this.redis.publish(key, { ...serialized, id })
+    await this.redis.publish(key, JSON.stringify({ ...serialized, id }))
@@
-      subscription.on('message', (event) => {
+      subscription.on('message', (event) => {
         try {
           const listeners = this.listenersMap.get(event.channel)
 
           if (listeners) {
-            const { id, ...rest } = event.message as any
-            const payload = this.deserializePayload(id, rest)
+            const parsed = JSON.parse(event.message as any) as { id?: string } & SerializedPayload
+            const payload = this.deserializePayload(parsed.id, parsed)
 
             for (const listener of listeners) {
               listener(payload)
             }
           }
         }
         catch (error) {
           // there error can happen when event.message is invalid
           options?.onError?.(error as ThrowableError)
         }
       })

Also applies to: 147-164


99-110: Store serialized string in XADD; parse on resume.

Upstash stream fields should be strings; align with ioredis and parse consistently.

-        const results = await this.redis.multi()
-          .xadd(key, '*', { data: serialized })
+        const results = await this.redis.multi()
+          .xadd(key, '*', { data: JSON.stringify(serialized) })
@@
-        const result = await this.redis.xadd(key, '*', { data: serialized })
+        const result = await this.redis.xadd(key, '*', { data: JSON.stringify(serialized) })
         id = result

And when resuming:

-            for (const [id, fields] of items) {
-              const serialized = fields[1]! // field value is at index 1 (index 0 is field name 'data')
-              const payload = this.deserializePayload(id, serialized)
+            for (const [id, fields] of items) {
+              const serialized =
+                typeof (fields as any).data === 'string'
+                  ? JSON.parse((fields as any).data)
+                  : Array.isArray(fields) ? JSON.parse(fields[1] as string) : (fields as any).data
+              const payload = this.deserializePayload(id, serialized)
               resumePayloadIds.add(id)
               originalListener(payload)
             }

Also applies to: 207-210


182-189: Prevent duplicate concurrent subscriptions.

After readiness, prefer the existing subscription if another caller won the race, and unsubscribe the extra one.

       try {
         this.subscriptionPromiseMap.set(key, promise)
         await promise
-        this.subscriptionsMap.set(key, subscription) // set after subscription is ready
+        const existing = this.subscriptionsMap.get(key)
+        if (existing) {
+          try { await subscription.unsubscribe() } catch {}
+          subscription = existing
+        } else {
+          this.subscriptionsMap.set(key, subscription)
+        }
       }
       finally {
         this.subscriptionPromiseMap.delete(key)
       }

192-196: Same Set clobber race as ioredis; double-check before creating.

-    let listeners = this.listenersMap.get(key)
-    if (!listeners) {
-      this.listenersMap.set(key, listeners = new Set())
-    }
+    let listeners = this.listenersMap.get(key)
+    if (!listeners) {
+      listeners = this.listenersMap.get(key) ?? new Set()
+      this.listenersMap.set(key, listeners)
+    }

249-260: Guard eventMeta during (de)serialization.

Avoid spreading undefined to prevent runtime TypeError.

-  protected serializePayload(payload: object): SerializedPayload {
-    const eventMeta = getEventMeta(payload)
+  protected serializePayload(payload: object): SerializedPayload {
+    const eventMeta = getEventMeta(payload) ?? {}
     const [json, meta] = this.serializer.serialize(payload)
     return { json: json as object, meta, eventMeta }
   }
@@
-  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
-    return withEventMeta(
-      this.serializer.deserialize(json, meta) as object,
-      id === undefined ? { ...eventMeta } : { ...eventMeta, id },
-    )
-  }
+  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
+    const base = eventMeta ? { ...eventMeta } : {}
+    if (id !== undefined) (base as any).id = id
+    return withEventMeta(this.serializer.deserialize(json, meta) as object, base)
+  }
packages/publisher/src/publisher.ts (1)

180-181: Catch unsubscribe rejection during iterator cleanup.

Prevents unhandled rejection if unsubscribe() throws/rejects. This mirrors the earlier catch paths.

-      await unsubscribePromise.then(unsubscribe => unsubscribe())
+      await unsubscribePromise.then(unsubscribe => unsubscribe()).catch(() => {
+        // TODO: log error
+      })
🧹 Nitpick comments (6)
packages/publisher/src/publisher.test.ts (2)

114-121: Avoid real timers to reduce flakiness.

Use vi.useFakeTimers() + runAllTimers() around setTimeout-driven iteration/return to make this deterministic.


330-334: Make “error happen after pull” deterministic.

AbortSignal.timeout(100) can fire before onError, causing a different rejection. Use a manual AbortController (abort after invoking onError) to assert the intended error reliably.
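A minimal, self-contained sketch of that pattern (the function and error here are illustrative, not the suite's actual names): abort only after the error handler has fired, so the awaited rejection reason is always the intended error rather than a timeout race.

```typescript
// Illustrative sketch: deterministic abort ordering. `onError` fires first,
// then aborts with the same error, so the rejection reason observed by the
// awaiter is always the intended one (no race against a fixed timeout).
async function interceptError(): Promise<string> {
  const controller = new AbortController()

  const pending = new Promise<never>((_, reject) => {
    controller.signal.addEventListener('abort', () => reject(controller.signal.reason))
  }).catch(reason => String(reason))

  const onError = (error: Error) => controller.abort(error) // abort only after onError runs
  onError(new Error('intended error'))

  return pending
}
```

In the test this would mean calling `controller.abort()` from inside the mocked `onError` callback instead of relying on `AbortSignal.timeout(100)`.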

packages/publisher/src/adapters/upstash-redis.test.ts (1)

381-391: TTL assertion may be flaky across regions.

Upstash TTL can drift slightly; consider widening the upper bound or polling until expired instead of asserting <= 2s strictly.
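One way to poll, sketched as a hypothetical helper (not part of the suite): wait until the key has actually expired instead of asserting a strict TTL upper bound.

```typescript
// Hypothetical polling helper: resolves once `condition` returns true,
// rejects after `timeoutMs`. Replaces a strict "TTL <= 2s" assertion with
// "eventually expired", which tolerates cross-region TTL drift.
async function waitUntil(
  condition: () => boolean | Promise<boolean>,
  { timeoutMs = 5000, intervalMs = 100 }: { timeoutMs?: number, intervalMs?: number } = {},
): Promise<void> {
  const deadline = Date.now() + timeoutMs
  while (Date.now() <= deadline) {
    if (await condition()) {
      return
    }
    await new Promise(resolve => setTimeout(resolve, intervalMs))
  }
  throw new Error(`condition not met within ${timeoutMs}ms`)
}
```

A test could then `await waitUntil(async () => (await redis.exists(key)) === 0)` rather than sleeping a fixed amount and asserting on the remaining TTL.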

packages/publisher/src/adapters/ioredis.test.ts (2)

541-553: Close invalid Redis client and disable reconnects to avoid leaked handles/flaky counts.

  • The invalid client keeps retry timers by default; not quitting it can leave open handles and cause multiple error emissions.
  • Disable reconnect/ready check, use lazyConnect, and always quit the client. Relax the exact error count to avoid flakiness.
-    it('subscribe should throw & on connection error', async () => {
-      const invalidListener = new Redis('redis://invalid', {
-        maxRetriesPerRequest: 0,
-      })
+    it('subscribe should throw & on connection error', async () => {
+      const invalidListener = new Redis('redis://invalid', {
+        maxRetriesPerRequest: 0,
+        retryStrategy: () => null,     // no reconnect timers
+        enableReadyCheck: false,
+        lazyConnect: true,
+      })
+      onTestFinished(async () => { try { await invalidListener.quit() } catch {} })
@@
-      await expect(publisher.subscribe('event1', listener1, { onError })).rejects.toThrow()
-      expect(listener1).toHaveBeenCalledTimes(0)
-      expect(onError).toHaveBeenCalledTimes(1)
+      await expect(publisher.subscribe('event1', listener1, { onError })).rejects.toThrow()
+      expect(listener1).toHaveBeenCalledTimes(0)
+      await vi.waitFor(() => expect(onError).toHaveBeenCalled())
     })

Additionally, in the adapter, if subscribe() fails, detach the error listener you attach before subscribe to prevent leaks (see suggestion below tied to multi‑subscriber errors).


21-23: Avoid relying on global crypto; import randomUUID for TS/node portability.

Prevents TS lib config issues and keeps intent clear.

+import { randomUUID } from 'node:crypto'
@@
-      prefix: crypto.randomUUID(), // isolated from other tests
+      prefix: randomUUID(), // isolated from other tests
@@
-        const prefix = `cleanup:${crypto.randomUUID()}:`
+        const prefix = `cleanup:${randomUUID()}:`
@@
-        const prefix = `expire:${crypto.randomUUID()}:`
+        const prefix = `expire:${randomUUID()}:`
@@
-    const prefix = `custom:${crypto.randomUUID()}:`
+    const prefix = `custom:${randomUUID()}:`
@@
-      const prefix = `invalid:${crypto.randomUUID()}:`
+      const prefix = `invalid:${randomUUID()}:`

Also applies to: 353-357, 382-386, 409-413, 556-559

packages/publisher/src/publisher.ts (1)

145-152: Use signal.reason instead of event.target.reason.

More robust across runtimes and typings.

-    function abortListener(event: any) {
-      pullResolvers.forEach(resolver => resolver[1](event.target.reason))
+    function abortListener() {
+      // AbortSignal is closed over; prefer signal.reason over event.target.reason
+      pullResolvers.forEach(resolver => resolver[1](signal!.reason as any))
       pullResolvers.length = 0
       bufferedEvents.length = 0
       unsubscribePromise.then(unsubscribe => unsubscribe()).catch(() => {
         // TODO: log error
       })
     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eb574ba and 26c5f19.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (7)
  • package.json (1 hunks)
  • packages/publisher/src/adapters/ioredis.test.ts (1 hunks)
  • packages/publisher/src/adapters/ioredis.ts (1 hunks)
  • packages/publisher/src/adapters/upstash-redis.test.ts (1 hunks)
  • packages/publisher/src/adapters/upstash-redis.ts (1 hunks)
  • packages/publisher/src/publisher.test.ts (1 hunks)
  • packages/publisher/src/publisher.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
packages/publisher/src/adapters/ioredis.test.ts (1)
packages/publisher/src/adapters/ioredis.ts (2)
  • IORedisPublisher (47-278)
  • IORedisPublisherOptions (12-45)
packages/publisher/src/adapters/upstash-redis.ts (3)
packages/client/src/adapters/standard/rpc-json-serializer.ts (3)
  • StandardRPCJsonSerializedMetaItem (15-15)
  • StandardRPCJsonSerializerOptions (25-27)
  • StandardRPCJsonSerializer (29-214)
packages/publisher/src/publisher.ts (2)
  • PublisherOptions (4-19)
  • PublisherSubscribeListenerOptions (21-31)
packages/shared/src/value.ts (1)
  • fallback (17-19)
packages/publisher/src/adapters/ioredis.ts (4)
packages/client/src/adapters/standard/rpc-json-serializer.ts (3)
  • StandardRPCJsonSerializedMetaItem (15-15)
  • StandardRPCJsonSerializerOptions (25-27)
  • StandardRPCJsonSerializer (29-214)
packages/publisher/src/publisher.ts (2)
  • PublisherOptions (4-19)
  • PublisherSubscribeListenerOptions (21-31)
packages/shared/src/value.ts (1)
  • fallback (17-19)
packages/shared/src/json.ts (1)
  • stringifyJSON (9-12)
packages/publisher/src/adapters/upstash-redis.test.ts (1)
packages/publisher/src/adapters/upstash-redis.ts (2)
  • UpstashRedisPublisher (34-261)
  • UpstashRedisPublisherOptions (12-32)
packages/publisher/src/publisher.test.ts (1)
packages/publisher/src/publisher.ts (1)
  • PublisherSubscribeListenerOptions (21-31)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: publish-commit
  • GitHub Check: test
  • GitHub Check: lint


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (3)
packages/publisher/src/adapters/ioredis.ts (3)

166-187: Parse errors still not broadcast to all subscribers.

The redisListener closure captures onError from the current subscriber (line 145). When a parse error occurs at line 182, only the captured handler is called—subsequent subscribers' onError callbacks won't receive parse errors.

This issue was flagged in previous reviews. The fix requires tracking per-channel error handlers and fanning out parse errors to all subscribers of that channel.

Refer to the detailed fix in past review comments (lines 510-543 in the past review), which suggests adding a channelErrorMap field and dispatching errors to all registered handlers.
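The shape of that fix, sketched with plain data structures (the class and method names here are illustrative, not the adapter's actual API): keep one set of error handlers per channel and fan each parse error out to all of them.

```typescript
type ErrorHandler = (error: unknown) => void

// Illustrative per-channel error registry: `register` adds a handler and
// returns its cleanup function; `dispatch` fans one error out to every
// handler registered for the channel, not just the first subscriber's.
class ChannelErrorMap {
  private readonly handlers = new Map<string, Set<ErrorHandler>>()

  register(channel: string, handler: ErrorHandler): () => void {
    const set = this.handlers.get(channel) ?? new Set<ErrorHandler>()
    this.handlers.set(channel, set)
    set.add(handler)
    return () => {
      set.delete(handler)
      if (set.size === 0) {
        this.handlers.delete(channel)
      }
    }
  }

  dispatch(channel: string, error: unknown): void {
    for (const handler of this.handlers.get(channel) ?? []) {
      handler(error)
    }
  }
}
```

The shared `redisListener` would then call `dispatch(channel, error)` in its catch block instead of the `onError` captured from the first subscriber.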


194-208: Error handler not cleaned up on subscribe failure.

The onError handler is attached at line 245 before this.listener.subscribe() completes at line 197. If the subscribe operation fails, there's no catch block to remove the handler, causing a leak.

Add a catch block in the try-finally (lines 194-207) to rollback the error handler on failure, as detailed in past review comments.
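A minimal sketch of that rollback (the emitter interface is assumed purely for illustration): detach the handler in a catch block before re-throwing.

```typescript
// Assumed minimal emitter shape, for illustration only.
interface ErrorEmitter {
  on: (event: 'error', handler: (error: unknown) => void) => void
  off: (event: 'error', handler: (error: unknown) => void) => void
  subscribe: (key: string) => Promise<void>
}

// Attach the error handler before subscribing, but roll it back if
// subscribe fails, so a failed subscription cannot leak handlers.
async function subscribeWithRollback(
  listener: ErrorEmitter,
  key: string,
  onError: (error: unknown) => void,
): Promise<void> {
  listener.on('error', onError)
  try {
    await listener.subscribe(key)
  }
  catch (error) {
    listener.off('error', onError) // rollback on failure
    throw error
  }
}
```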


272-283: Guard against undefined eventMeta during deserialization.

At line 273, getEventMeta(payload) can return undefined. When deserializing at line 281, spreading { ...eventMeta } will throw if eventMeta is undefined.

Apply the fix from past review comments:

 protected serializePayload(payload: object): SerializedPayload {
-  const eventMeta = getEventMeta(payload)
+  const eventMeta = getEventMeta(payload) ?? {}
   const [json, meta] = this.serializer.serialize(payload)
   return { json: json as object, meta, eventMeta }
 }

 protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
+  const base = eventMeta ? { ...eventMeta } : {}
+  if (id !== undefined) (base as any).id = id
-  return withEventMeta(
-    this.serializer.deserialize(json, meta) as object,
-    id === undefined ? { ...eventMeta } : { ...eventMeta, id },
-  )
+  return withEventMeta(this.serializer.deserialize(json, meta) as object, base)
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 26c5f19 and e76d1e2.

📒 Files selected for processing (2)
  • packages/publisher/src/adapters/ioredis.test.ts (1 hunks)
  • packages/publisher/src/adapters/ioredis.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/publisher/src/adapters/ioredis.test.ts (1)
packages/publisher/src/adapters/ioredis.ts (2)
  • IORedisPublisher (47-284)
  • IORedisPublisherOptions (12-45)
packages/publisher/src/adapters/ioredis.ts (4)
packages/client/src/adapters/standard/rpc-json-serializer.ts (3)
  • StandardRPCJsonSerializedMetaItem (15-15)
  • StandardRPCJsonSerializerOptions (25-27)
  • StandardRPCJsonSerializer (29-214)
packages/publisher/src/publisher.ts (2)
  • PublisherOptions (4-19)
  • PublisherSubscribeListenerOptions (21-31)
packages/shared/src/value.ts (1)
  • fallback (17-19)
packages/shared/src/json.ts (1)
  • stringifyJSON (9-12)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: test
  • GitHub Check: publish-commit
  • GitHub Check: lint
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (10)
packages/publisher/src/adapters/ioredis.ts (2)

12-45: LGTM! Well-documented options interface.

The interface is clear and properly extends the base types with good documentation for each option.


96-139: Publish implementation looks solid.

The cleanup logic for lastCleanupTimeMap and atomic operations using Redis transactions are well-implemented.

packages/publisher/src/adapters/ioredis.test.ts (8)

1-48: Excellent test setup and isolation.

The test infrastructure is well-designed:

  • Gracefully skips when Redis is unavailable
  • Uses random UUID prefixes for isolation between tests
  • Thoroughly validates cleanup in afterAll (listener counts and publisher size)

50-91: Good coverage of non-resume behavior.

The test properly validates that events are delivered and that resume doesn't occur when disabled.


93-410: Comprehensive resume test coverage.

The resume tests cover:

  • Basic resume flow
  • Event ordering and deduplication
  • Event metadata propagation
  • Race conditions during resume
  • Cleanup and retention with Redis TTL/XTRIM

The race condition test at lines 325-353 with { repeats: 5 } is particularly valuable for catching timing issues.


412-440: Prefix handling properly tested.

The test validates that Redis keys use the configured prefix. Good decision to avoid NUMSUB (noted as unreliable).


442-488: Serialization test covers complex scenarios.

The test validates custom serializers with nested objects, arrays, dates, and custom classes. Good coverage of the StandardRPCJsonSerializer integration.


491-508: Transaction error handling properly tested.

The mock-based test validates that transaction errors during publish are properly propagated.


545-557: Subscribe failure test looks good.

The test properly validates that subscribe errors are propagated and that the onError handler isn't called when subscription fails before it's registered.


510-543: Add test for parse error broadcasting to multiple subscribers.

The adapter has a scoping issue where parse errors captured in the redisListener closure only broadcast to the first subscriber's onError handler. The existing parse error test only validates a single subscriber scenario, which doesn't expose this bug.

Add a test case that:

  1. Creates 2+ subscribers on the same event with different onError handlers
  2. Publishes an invalid/unparseable message via Redis
  3. Verifies all subscribers' onError handlers receive the parse error


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (1)
packages/publisher/src/adapters/ioredis.ts (1)

314-319: Critical: Spreading undefined eventMeta throws TypeError.

If getEventMeta(payload) returns undefined (line 309), spreading it at line 317 throws a runtime error. This issue was flagged in a previous review but remains unresolved.

Apply this diff to safely handle optional eventMeta:

 protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
+  const baseMeta = eventMeta ? { ...eventMeta } : {}
+  if (id !== undefined) {
+    (baseMeta as any).id = id
+  }
   return withEventMeta(
     this.serializer.deserialize(json, meta) as object,
-    id === undefined ? { ...eventMeta } : { ...eventMeta, id },
+    baseMeta,
   )
 }
🧹 Nitpick comments (5)
packages/publisher/src/adapters/upstash-redis.test.ts (5)

39-39: Minor grammar fix in comment.

The comment reads "ensure cleanup correctly" but should be "ensure cleanup is done correctly" or "verify cleanup completed".

-      expect(publisher.size).toEqual(0) // ensure cleanup correctly
+      expect(publisher.size).toEqual(0) // verify cleanup completed

77-83: Replace arbitrary timeout with proper assertion.

The 1-second sleep followed by an exact call count check is brittle. If the test environment is slow, the assertion might pass even if resume incorrectly happened but hadn't completed yet.

Replace the sleep with a vi.waitFor that explicitly checks no new calls occurred:

-    // Wait a bit to ensure no resume happens
-    await new Promise(resolve => setTimeout(resolve, 1000))
-
-    expect(listener1).toHaveBeenCalledTimes(1) // resume not happens
+    // Publish a new event to trigger any potential resume
+    await publisher.publish('event1', { order: 3 })
+    await vi.waitFor(() => {
+      expect(listener1).toHaveBeenCalledTimes(2) // should receive only the new event
+    })
+    expect(listener1).toHaveBeenCalledTimes(2) // resume did not happen (would be 3+ if it did)

This also fixes the grammar in the comment.


308-336: Clarify intentional race condition setup.

Lines 317-318 intentionally don't await publish() to create a race with the subscribe on line 320. This is correct for testing deduplication, but the intent should be documented.

+      // Intentionally fire these publishes without awaiting to race with subscribe
       publisher.publish('event1', { order: 3 })
       publisher.publish('event1', { order: 4 })
       const listener1 = vi.fn()

Also consider whether repeats: 5 is sufficient to catch race conditions reliably — race bugs are often environment-dependent.


405-409: Remove or document commented-out code.

The NUMSUB check is commented with "not reliable" but provides no context. If it's unreliable due to Upstash's REST API limitations or eventual consistency, document why; otherwise remove it.

-    // verify channel use prefix (NUMSUB is not reliable)
-    // await vi.waitFor(async () => {
-    //   const numSub: any = await redis.exec(['PUBSUB', 'NUMSUB', `${prefix}event1`])
-    //   expect(numSub[1]).toBe(1)
-    // })
-

Or add a brief note:

-    // verify channel use prefix (NUMSUB is not reliable)
+    // Note: NUMSUB is skipped because Upstash REST API does not reliably reflect subscription counts in real-time

418-418: Fix typo in comment.

"veryfy" should be "verify".

-    // veryfy key use prefix
+    // verify key uses prefix
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e76d1e2 and bc506f7.

📒 Files selected for processing (4)
  • packages/publisher/src/adapters/ioredis.test.ts (1 hunks)
  • packages/publisher/src/adapters/ioredis.ts (1 hunks)
  • packages/publisher/src/adapters/upstash-redis.test.ts (1 hunks)
  • packages/publisher/src/adapters/upstash-redis.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/publisher/src/adapters/upstash-redis.ts
  • packages/publisher/src/adapters/ioredis.test.ts
🧰 Additional context used
🧬 Code graph analysis (2)
packages/publisher/src/adapters/ioredis.ts (4)
packages/client/src/adapters/standard/rpc-json-serializer.ts (3)
  • StandardRPCJsonSerializedMetaItem (15-15)
  • StandardRPCJsonSerializerOptions (25-27)
  • StandardRPCJsonSerializer (29-214)
packages/publisher/src/publisher.ts (2)
  • PublisherOptions (4-19)
  • PublisherSubscribeListenerOptions (21-31)
packages/shared/src/value.ts (1)
  • fallback (17-19)
packages/shared/src/json.ts (1)
  • stringifyJSON (9-12)
packages/publisher/src/adapters/upstash-redis.test.ts (1)
packages/publisher/src/adapters/upstash-redis.ts (2)
  • UpstashRedisPublisher (34-294)
  • UpstashRedisPublisherOptions (12-32)
🔇 Additional comments (4)
packages/publisher/src/adapters/upstash-redis.test.ts (3)

425-471: LGTM—thorough serialization coverage.

The test validates complex nested objects, built-in types (Date), and custom serializers. The Person class example is clear and demonstrates the serialization API well.


486-490: Consider simplifying redundant concurrent subscriptions.

Lines 487-488 subscribe to event1 twice with identical parameters (listener1, onError1). This tests the race condition for concurrent subscriptions to the same channel, but the duplication might be unintentional.

If you want to verify that all subscriptions fail independently, consider:

     await Promise.all([ // race condition
-      expect(publisher.subscribe('event1', listener1, { onError: onError1 })).rejects.toThrow(),
-      expect(publisher.subscribe('event1', listener1, { onError: onError1 })).rejects.toThrow(),
+      expect(publisher.subscribe('event1', listener1, { onError: onError1 })).rejects.toThrow(),
       expect(publisher.subscribe('event2', listener2, { onError: onError2 })).rejects.toThrow(),
     ])

Or add a comment explaining the intent:

+    // Test concurrent subscriptions to the same channel during connection failure
     await Promise.all([ // race condition
       expect(publisher.subscribe('event1', listener1, { onError: onError1 })).rejects.toThrow(),
       expect(publisher.subscribe('event1', listener1, { onError: onError1 })).rejects.toThrow(),
       expect(publisher.subscribe('event2', listener2, { onError: onError2 })).rejects.toThrow(),
     ])

497-555: LGTM—solid edge case coverage.

Both tests validate important edge cases:

  • The subscription lifecycle test correctly verifies that the underlying Redis subscription is reused and only unsubscribed when all listeners are removed.
  • The invalid message test confirms error handlers are invoked for all subscribers when deserialization fails.
packages/publisher/src/adapters/ioredis.ts (1)

214-234: LGTM: Race condition handling looks correct.

The listenerPromiseMap pattern at lines 215-226 correctly prevents concurrent subscribers from creating duplicate subscriptions to the same channel.

@dinwwwh dinwwwh changed the title feat(publisher): unified event publisher for memory, redis, ... feat(publisher): memory, ioredis, upstash redis publishers Oct 18, 2025

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (12)
packages/publisher/src/adapters/memory.test.ts (1)

47-49: Also restore mocks in afterEach.

Add vi.restoreAllMocks() to prevent leaked spies between cases.

   afterEach(() => {
-    vi.useRealTimers()
+    vi.useRealTimers()
+    vi.restoreAllMocks()
   })
packages/publisher/src/adapters/upstash-redis.test.ts (2)

358-366: Compute wait from retention instead of magic 1100ms.

Tie sleep to resumeRetentionSeconds to keep the test robust.

-        // Wait for retention to expire
-        await new Promise(resolve => setTimeout(resolve, 1100))
+        const retentionSeconds = 1
+        // Wait for retention to expire
+        await new Promise(resolve => setTimeout(resolve, retentionSeconds * 1000 + 100))

385-391: Compute expiry wait (2 * retention + buffer) instead of 2500ms.

Avoid brittle hardcoding.

-        // Wait for key to expire (2 * retentionSeconds = 2 seconds)
-        await new Promise(resolve => setTimeout(resolve, 2500))
+        // Wait for key to expire (2 * retentionSeconds + buffer)
+        const retentionSeconds = 1
+        await new Promise(resolve => setTimeout(resolve, 2 * retentionSeconds * 1000 + 500))
packages/publisher/src/adapters/ioredis.ts (5)

123-142: Handle null/errored Redis replies; remove non-null assertions.

multi().exec() and xadd() can return null; current result! uses are unsafe.

         const result = await this.commander.multi()
           .xadd(key, '*', 'data', stringifyJSON(serialized))
           .xtrim(key, 'MINID', this.xtrimExactness as '~', `${now - this.retentionSeconds * 1000}-0`)
           .expire(key, this.retentionSeconds * 2) // double to avoid expires new events
           .exec()

-        if (result) {
-          for (const [error] of result) {
-            if (error) {
-              throw error
-            }
-          }
-        }
-
-        id = (result![0]![1] as string)
+        if (!result) {
+          throw new Error('Redis multi/exec returned null')
+        }
+        for (const [error] of result) {
+          if (error) throw error
+        }
+        id = result[0]![1] as string
       }
       else {
         const result = await this.commander.xadd(key, '*', 'data', stringifyJSON(serialized))
-        id = result!
+        if (!result) {
+          throw new Error('Redis XADD returned null')
+        }
+        id = result
       }

213-234: Fix race: concurrent subscribe can clobber listeners array.

Double‑check after awaiting subscribe; re-use existing array if another caller set it.

-    let listeners = this.listenersMap.get(key)
-    if (!listeners) {
-      try {
-        const promise = this.listener.subscribe(key)
-        this.listenerPromiseMap.set(key, promise)
-        await promise
-        this.listenersMap.set(key, listeners = []) // only set after subscribe successfully
-      }
+    let listeners = this.listenersMap.get(key)
+    if (!listeners) {
+      try {
+        const promise = this.listener.subscribe(key)
+        this.listenerPromiseMap.set(key, promise)
+        await promise
+        // Re-check after await to avoid races
+        listeners = this.listenersMap.get(key)
+        if (!listeners) {
+          listeners = []
+          this.listenersMap.set(key, listeners)
+        }
+      }
       finally {
         this.listenerPromiseMap.delete(key)
@@
       }
     }

275-283: Avoid splice(-1, 1) when listener/onError not found.

Guard index before splicing to prevent removing the last element accidentally.

-      listeners.splice(listeners.indexOf(listener), 1)
+      {
+        const i = listeners.indexOf(listener)
+        if (i !== -1) listeners.splice(i, 1)
+      }
@@
-        if (onErrors) {
-          onErrors.splice(onErrors.indexOf(onError), 1)
-        }
+        if (onErrors) {
+          const j = onErrors.indexOf(onError)
+          if (j !== -1) onErrors.splice(j, 1)
+        }

252-257: Defensive XREAD parsing; don’t assert fields[1].

Malformed entries can crash; validate before JSON.parse and continue on error.

-            for (const [id, fields] of items) {
-              const serialized = fields[1]! // field value is at index 1 (index 0 is field name 'data')
-              const payload = this.deserializePayload(id, JSON.parse(serialized))
+            for (const [id, fields] of items) {
+              const serialized = Array.isArray(fields) ? fields[1] : undefined
+              if (typeof serialized !== 'string') {
+                onError?.(new Error(`Malformed Redis stream entry: ${id}`) as ThrowableError)
+                continue
+              }
+              const payload = this.deserializePayload(id, JSON.parse(serialized))
               resumePayloadIds.add(id)
               originalListener(payload)
             }

305-316: Guard eventMeta; avoid spreading undefined.

withEventMeta args should be objects; use a safe base and set id conditionally.

-  protected serializePayload(payload: object): SerializedPayload {
-    const eventMeta = getEventMeta(payload)
+  protected serializePayload(payload: object): SerializedPayload {
+    const eventMeta = getEventMeta(payload) ?? {}
     const [json, meta] = this.serializer.serialize(payload)
     return { json: json as object, meta, eventMeta }
   }
@@
-  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
-    return withEventMeta(
-      this.serializer.deserialize(json, meta) as object,
-      id === undefined ? { ...eventMeta } : { ...eventMeta, id },
-    )
-  }
+  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
+    const base = eventMeta ? { ...eventMeta } : {}
+    if (id !== undefined) (base as any).id = id
+    return withEventMeta(this.serializer.deserialize(json, meta) as object, base)
+  }
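Worth noting: spreading `undefined` inside an object literal is a runtime no-op rather than a crash, so the `?? {}` default is about explicit intent and cleaner typing, not avoiding a throw:

```typescript
// Object spread silently ignores undefined/null sources at runtime.
const eventMeta: { retry?: number } | undefined = undefined
const implicit = { ...eventMeta }         // {} -- no throw
const explicit = { ...(eventMeta ?? {}) } // same value, clearer intent
console.log(implicit, explicit)
```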
packages/publisher/src/adapters/upstash-redis.ts (4)

103-114: Stringify payloads for PUBLISH/XADD and parse on receive.

Upstash SDK doesn’t guarantee object auto-(de)serialization; persist JSON strings and parse consistently.

-        const results = await this.redis.multi()
-          .xadd(key, '*', { data: serialized })
+        const results = await this.redis.multi()
+          .xadd(key, '*', { data: JSON.stringify(serialized) })
@@
-        const result = await this.redis.xadd(key, '*', { data: serialized })
+        const result = await this.redis.xadd(key, '*', { data: JSON.stringify(serialized) })
@@
-    await this.redis.publish(key, { ...serialized, id })
+    await this.redis.publish(key, JSON.stringify({ ...serialized, id }))
@@
-            for (const [id, fields] of items) {
-              const serialized = fields[1]! // field value is at index 1 (index 0 is field name 'data')
-              const payload = this.deserializePayload(id, serialized)
+            for (const [id, fields] of items) {
+              const raw = Array.isArray(fields) ? fields[1] : (fields as any)?.data
+              if (typeof raw !== 'string') continue
+              const parsed = JSON.parse(raw) as SerializedPayload
+              const payload = this.deserializePayload(id, parsed)
               resumePayloadIds.add(id)
               originalListener(payload)
             }

Also applies to: 117-118, 230-236
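Persisting an explicit string makes the stored value driver-agnostic; a minimal roundtrip with a hypothetical payload shape (field names assumed from the discussion above, not the adapters' actual types):

```typescript
// Hypothetical SerializedPayload-like shape; the real types live in the adapters.
const serialized = { json: { order: 1 }, meta: [] as unknown[], eventMeta: { retry: 1000 } }

// Store a JSON string so no driver-specific object (de)serialization
// is involved, then parse it back on receive.
const stored = JSON.stringify(serialized)
const restored = JSON.parse(stored) as typeof serialized
console.log(restored.json.order, restored.eventMeta.retry) // 1 1000
```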


162-206: Prevent duplicate concurrent subscriptions; clean up on failure.

Re-check subscriptionsMap after readiness; detach handlers if subscribe fails.

-      subscription = this.redis.subscribe(key)
+      subscription = this.redis.subscribe(key)
       subscription.on('message', (event) => {
@@
       try {
         this.subscriptionPromiseMap.set(key, promise)
         await promise
-        this.subscriptionsMap.set(key, subscription) // set after subscription is ready
+        const existing = this.subscriptionsMap.get(key)
+        if (existing) {
+          try { await subscription.unsubscribe() } catch {}
+          subscription = existing
+        } else {
+          this.subscriptionsMap.set(key, subscription) // set after ready
+        }
       }
       finally {
         this.subscriptionPromiseMap.delete(key)
       }
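The fix relies on the in-flight-promise pattern: concurrent subscribers to the same key share one pending subscribe call instead of racing. A generic sketch (the real adapters also clear the map in a `finally` block once the subscribe settles, which this omits):

```typescript
// Concurrent callers for the same key get the same pending promise,
// so the underlying subscribe runs at most once per key.
const inflight = new Map<string, Promise<string>>()
let subscribeCalls = 0

function subscribeOnce(key: string): Promise<string> {
  let p = inflight.get(key)
  if (!p) {
    p = (async () => {
      subscribeCalls++ // stands in for the real redis.subscribe(key)
      return `subscription:${key}`
    })()
    inflight.set(key, p)
  }
  return p
}

const p1 = subscribeOnce('orders')
const p2 = subscribeOnce('orders') // reuses the pending promise
console.log(p1 === p2, subscribeCalls) // true 1
```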

208-213: Double-check listeners array after awaits.

Avoid clobbering an array created by another caller.

-    let listeners = this.listenersMap.get(key)
-    if (!listeners) {
-      this.listenersMap.set(key, listeners = [])
-    }
+    let listeners = this.listenersMap.get(key)
+    if (!listeners) {
+      listeners = this.listenersMap.get(key)
+      if (!listeners) {
+        listeners = []
+        this.listenersMap.set(key, listeners)
+      }
+    }

283-294: Guard eventMeta; avoid spreading undefined.

Mirror the ioredis fix for safe meta handling.

-  protected serializePayload(payload: object): SerializedPayload {
-    const eventMeta = getEventMeta(payload)
+  protected serializePayload(payload: object): SerializedPayload {
+    const eventMeta = getEventMeta(payload) ?? {}
     const [json, meta] = this.serializer.serialize(payload)
     return { json: json as object, meta, eventMeta }
   }
@@
-  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
-    return withEventMeta(
-      this.serializer.deserialize(json, meta) as object,
-      id === undefined ? { ...eventMeta } : { ...eventMeta, id },
-    )
-  }
+  protected deserializePayload(id: string | undefined, { json, meta, eventMeta }: SerializedPayload): any {
+    const base = eventMeta ? { ...eventMeta } : {}
+    if (id !== undefined) (base as any).id = id
+    return withEventMeta(this.serializer.deserialize(json, meta) as object, base)
+  }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bc506f7 and 7e4012b.

📒 Files selected for processing (8)
  • packages/publisher/src/adapters/ioredis.test.ts (1 hunks)
  • packages/publisher/src/adapters/ioredis.ts (1 hunks)
  • packages/publisher/src/adapters/memory.test.ts (1 hunks)
  • packages/publisher/src/adapters/memory.ts (1 hunks)
  • packages/publisher/src/adapters/upstash-redis.test.ts (1 hunks)
  • packages/publisher/src/adapters/upstash-redis.ts (1 hunks)
  • packages/shared/src/event-publisher.test.ts (1 hunks)
  • packages/shared/src/event-publisher.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/publisher/src/adapters/memory.ts
🧰 Additional context used
🧬 Code graph analysis (6)
packages/publisher/src/adapters/ioredis.ts (5)
packages/client/src/adapters/standard/rpc-json-serializer.ts (3)
  • StandardRPCJsonSerializedMetaItem (15-15)
  • StandardRPCJsonSerializerOptions (25-27)
  • StandardRPCJsonSerializer (29-214)
packages/publisher/src/publisher.ts (2)
  • PublisherOptions (4-19)
  • PublisherSubscribeListenerOptions (21-31)
packages/shared/src/value.ts (1)
  • fallback (17-19)
packages/shared/src/json.ts (1)
  • stringifyJSON (9-12)
packages/shared/src/function.ts (1)
  • once (3-16)
packages/publisher/src/adapters/ioredis.test.ts (1)
packages/publisher/src/adapters/ioredis.ts (2)
  • IORedisPublisher (47-317)
  • IORedisPublisherOptions (12-45)
packages/publisher/src/adapters/upstash-redis.ts (4)
packages/client/src/adapters/standard/rpc-json-serializer.ts (3)
  • StandardRPCJsonSerializedMetaItem (15-15)
  • StandardRPCJsonSerializerOptions (25-27)
  • StandardRPCJsonSerializer (29-214)
packages/publisher/src/publisher.ts (2)
  • PublisherOptions (4-19)
  • PublisherSubscribeListenerOptions (21-31)
packages/shared/src/value.ts (1)
  • fallback (17-19)
packages/shared/src/function.ts (1)
  • once (3-16)
packages/shared/src/event-publisher.ts (1)
packages/shared/src/function.ts (1)
  • once (3-16)
packages/publisher/src/adapters/upstash-redis.test.ts (1)
packages/publisher/src/adapters/upstash-redis.ts (2)
  • UpstashRedisPublisher (34-295)
  • UpstashRedisPublisherOptions (12-32)
packages/publisher/src/adapters/memory.test.ts (1)
packages/publisher/src/adapters/memory.ts (1)
  • MemoryPublisher (21-117)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: publish-commit
  • GitHub Check: test
  • GitHub Check: lint
🔇 Additional comments (3)
packages/shared/src/event-publisher.ts (2)

1-1: LGTM: Idempotent unsubscribe with once wrapper.

The use of once ensures that calling the unsubscribe function multiple times is safe—the listener removal happens only on the first call, and subsequent calls return the cached result. This makes the API more robust and prevents accidental double-removal.

Also applies to: 94-100
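A minimal sketch of the `once` pattern the review refers to (illustrative only; the shared helper in packages/shared/src/function.ts may differ in detail):

```typescript
// Minimal once(): fn runs on the first call; every later call
// returns the cached result without re-running fn.
function once<T>(fn: () => T): () => T {
  let called = false
  let result!: T
  return () => {
    if (!called) {
      called = true
      result = fn()
    }
    return result
  }
}

let removals = 0
const unsubscribe = once(() => ++removals)
unsubscribe()
unsubscribe() // safe: no double-removal
console.log(removals) // 1
```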


29-29: LGTM: Array-based storage enables multiple subscriptions.

The change from Set to Array allows the same listener function to be subscribed multiple times, which is a useful capability for certain use cases. While indexOf + splice is O(n) compared to Set.delete's O(1), this trade-off is acceptable for typical scenarios where the number of listeners per event remains modest.

Also applies to: 90-92, 95-99
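The behavioral difference is easy to demonstrate: a `Set` deduplicates, so subscribing the same listener twice registers it once, while an array keeps both registrations and `indexOf` + `splice` removes exactly one per unsubscribe:

```typescript
// A Set deduplicates listeners; an array keeps every registration.
const listener = () => {}
const asSet = new Set<() => void>()
asSet.add(listener)
asSet.add(listener)
const asArray: Array<() => void> = []
asArray.push(listener, listener)
console.log(asSet.size, asArray.length) // 1 2

// indexOf + splice removes a single registration per unsubscribe,
// matching "one unsubscribe per subscribe" semantics.
asArray.splice(asArray.indexOf(listener), 1)
console.log(asArray.length) // 1
```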

packages/publisher/src/adapters/ioredis.test.ts (1)

12-48: LGTM: suite gating and teardown are solid.

Conditional skip on REDIS_URL and listener cleanup checks look correct.

@elliotBraem

This is awesome, worked flawlessly for pub/sub broadcasting in plugins

https://github.com/near-everything/every-plugin/blob/bfebbb591b74bd9eee81255cd5d4eaccd8b9c66a/plugins/_template/src/index.ts#L55

I had to patch @orpc/shared so I'd need a release for that in order to demo, but otherwise really excited to keep building on this. Gotta upgrade an RSS plugin now

@dinwwwh dinwwwh merged commit 22ef10a into main Oct 19, 2025
11 checks passed

Labels

size:XXL This PR changes 1000+ lines, ignoring generated files.


Development

Successfully merging this pull request may close these issues.

Redis Event Publisher

2 participants