Skip to content
This repository was archived by the owner on May 19, 2026. It is now read-only.

feat: implement real-time seat status updates and automated seat release#100

Merged
DungxND merged 3 commits into
mainfrom
feat/89-emit-realtime-events
Apr 27, 2026
Merged

feat: implement real-time seat status updates and automated seat release#100
DungxND merged 3 commits into
mainfrom
feat/89-emit-realtime-events

Conversation

@HungND-flocus
Copy link
Copy Markdown
Contributor

@HungND-flocus HungND-flocus commented Apr 26, 2026

Mô tả

Triển khai hệ thống đồng bộ trạng thái ghế ngồi theo thời gian thực (Real-time) và cơ chế tự động nhả ghế thông qua Worker chạy ngầm. Mục tiêu giúp đảm bảo trạng thái ghế (Đang giữ, Đã bán, Trống) luôn được cập nhật tức thì tới tất cả người dùng và tự động dọn dẹp các đơn hàng quá hạn.

Closes #89

Loại thay đổi

  • ✨ Feature mới

Screenshots / Demo

Kịch bản Hold

image

Kịch bản Checkout

image

Kịch bản Timeout

image

Checklist

  • Code chạy không lỗi (bun run dev)
  • TypeScript check pass (bun run check)
  • Lint pass (bun run lint)
  • Đã format code (bun run format)
  • Đã test thủ công chức năng
  • Commit message đúng convention (feat:, fix:, chore:,...)

Summary by CodeRabbit

  • New Features

    • Real-time seat availability: customers receive instant SSE updates when seats are locked, sold, or released, grouped by show.
  • Chores

    • Messaging configuration updated to use the new AMQP environment variable name.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 26, 2026

📝 Walkthrough

Walkthrough

This PR adds SSE event emissions for seat state changes in three flows (purchase → locked, checkout → sold, worker release → available) and updates AMQP env parsing to use AMQP_URL. It also adjusts service return shapes to include seat id and showId where needed.

Changes

Cohort / File(s) Summary
Configuration Updates
src/lib/server/config.ts
Read AMQP connection from AMQP_URL (validation and mapping updated); exported config.cloudAMQPURL now sourced from parsed.AMQP_URL.
Purchase flow (lock)
src/lib/server/services/purchase.service.ts
Capture locked seat rows ({id, showId}) via returning, emit SSE_EVENTS.SEAT_UPDATE(showId) with { showId, seatIds, status: 'locked' } after commit; emission errors are caught and logged.
Order flow (checkout & release)
src/lib/server/services/order.service.ts
checkout now collects sold seats ({id, showId}), emits SEAT_UPDATE(..., status: 'sold') post-commit; releaseExpiredOrder return shape changed to return { releasedSeats: {id, showId}[] }. SSE emits are try/catch-wrapped.
Worker consumer (timeout release)
src/lib/server/mq/consumer.ts
After releaseExpiredOrder, groups released seats by showId and emits SEAT_UPDATE(..., status: 'available'); added logging and emission error handling while still acknowledging messages.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Service as Service\n(Purchase / Order)
    participant DB as Database
    participant EventBus
    participant SSE as SSE Endpoint
    participant Frontend as Frontend Client

    Client->>Service: API call (purchase / checkout)
    Service->>DB: Begin transaction / update seats (returning {id, showId})
    DB-->>Service: Returning locked/sold seat rows
    Service->>DB: Commit transaction
    DB-->>Service: Commit acknowledged
    Service->>Service: Group rows by showId
    Service->>EventBus: emit(SEAT_UPDATE(showId), {showId, seatIds, status})
    EventBus->>SSE: publish to channel
    SSE->>Frontend: stream event (locked/sold/available)
    Frontend-->>Frontend: update UI
    Service-->>Client: respond API result
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • DungxND

"🐰
I hopped through commits, ears all alert,
Seats now shout locked, sold, or back to flert;
EventBus whispers down the SSE stream,
Frontend nods along to the real-time dream,
Hop—another release, another bright gleam!"

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat: implement real-time seat status updates and automated seat release' accurately and specifically summarizes the main changes in the pull request.
Description check ✅ Passed The PR description includes all required sections from the template: summary, closes issue reference, change type checkbox, screenshots/demo, and completed checklist with manual testing confirmation.
Linked Issues check ✅ Passed All three event emission flows are implemented: locked status in purchase.service.ts [#89], sold status in order.service.ts [#89], and available status in consumer.ts [#89]. Payload format matches specification and safety rules are followed with try-catch blocks.
Out of Scope Changes check ✅ Passed All code changes are directly aligned with issue #89 requirements: environment variable migration, seat status emission implementation, and worker message handling remain in scope.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/89-emit-realtime-events

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/lib/server/services/purchase.service.ts (1)

277-297: ⚠️ Potential issue | 🟠 Major

Seats released from a prior pending order are not broadcast — other clients keep showing them as locked.

When pendingOrderId exists, this block flips the previously-locked seats back to available (and deletes their order items), but no SSE available event is emitted for them. Acceptance criterion #1 ("SSE shows immediate 'available' when ... released") only holds for the worker path; for the in-purchase release path, any seat that the user drops from their cart will remain visually locked to other clients until the worker eventually times out the (now-deleted) order — which won't happen because the order was just updated/replaced.

Consider capturing released seats via .returning({ id: seats.id, showId: seats.showId }) and emitting them with status: 'available' after the transaction commits, alongside the locked emit at lines 426-441.

🛠 Suggested approach
         if (itemsToRelease.length > 0) {
-          await tx
+          const releasedRows = await tx
             .update(seats)
             .set({ status: 'available', lockedBy: null, lockedAt: null })
             .where(
               inArray(
                 seats.id,
                 itemsToRelease.map((i) => i.seatId),
               ),
-            );
+            )
+            .returning({ id: seats.id, showId: seats.showId });
+          // include releasedRows in the tx return so the post-commit block can emit 'available' SSE
         }

…then group/emit status: 'available' for releasedRows in the same post-commit block as the locked emission.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib/server/services/purchase.service.ts` around lines 277 - 297, The
pendingOrderId branch flips seats back to available but doesn't broadcast that
change; modify the tx.update(seats)...where(inArray(...)) call inside the
pendingOrderId block to use .returning({ id: seats.id, showId: seats.showId })
and capture the returned rows (e.g., releasedRows), then after the transaction
commits emit SSE events for those releasedRows with status: 'available' (the
same post-commit place where the existing locked emit occurs around the locked
emission block) so other clients immediately see the seats as available; ensure
any subsequent tx.delete(orderItems) remains but the broadcast happens
post-commit.
🧹 Nitpick comments (3)
src/lib/server/services/purchase.service.ts (2)

403-422: finalResponseData still leaks the internal isNewOrder field.

responseData is { ...finalResponse, isNewOrder, lockedSeatRows }. The destructure on line 422 only strips lockedSeatRows, so finalResponseData returned to the controller still has isNewOrder even though purchaseTickets is typed as Promise<PurchaseResponse> (which doesn't declare it). If isNewOrder is genuinely internal, also strip it; if it's part of the contract, add it to PurchaseResponse.

-      const { lockedSeatRows, ...finalResponseData } = responseData;
+      const { lockedSeatRows, isNewOrder: _isNewOrder, ...finalResponseData } = responseData;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib/server/services/purchase.service.ts` around lines 403 - 422, The
returned object still includes the internal isNewOrder flag because you only
removed lockedSeatRows from responseData; update the cleanup before returning in
purchaseTickets to also strip isNewOrder (e.g., const { lockedSeatRows,
isNewOrder, ...finalResponseData } = responseData) so controllers receive only
the declared PurchaseResponse, or alternatively add isNewOrder to the
PurchaseResponse type if it should be part of the public contract; adjust any
references to finalResponseData/responseData accordingly.

421-444: Extract the group-by-show + per-show emit into a shared helper.

The exact same reduce + Object.entries + eventBus.emit(SSE_EVENTS.SEAT_UPDATE(...)) block now appears three times: here, in src/lib/server/services/order.service.ts (lines 282-302), and in src/lib/server/mq/consumer.ts (lines 29-42). Extracting a single helper (e.g. emitSeatStatusUpdates(seats, status)) under $lib/server/events/ would centralize the payload shape, the showId coercion, and the try/catch wrapper — and prevent drift if the SSE contract changes.

// $lib/server/events/seat-events.ts
export function emitSeatStatusUpdates(
  rows: { id: number; showId: number }[],
  status: 'available' | 'locked' | 'sold',
) {
  if (!rows?.length) return;
  try {
    const byShow = rows.reduce<Record<number, number[]>>((acc, { id, showId }) => {
      (acc[showId] ??= []).push(id);
      return acc;
    }, {});
    for (const [sId, seatIds] of Object.entries(byShow)) {
      const showId = Number(sId);
      eventBus.emit(SSE_EVENTS.SEAT_UPDATE(showId), { showId, seatIds, status });
    }
  } catch (err) {
    console.error('[sse] emitSeatStatusUpdates failed:', err);
  }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib/server/services/purchase.service.ts` around lines 421 - 444, Extract
the duplicated reduce+emit logic into a shared helper (e.g.
emitSeatStatusUpdates) under $lib/server/events/ that accepts rows: { id:
number; showId: number }[] and status: 'available' | 'locked' | 'sold'; the
helper should return early on empty input, group rows by showId, coerce keys to
Number, call eventBus.emit(SSE_EVENTS.SEAT_UPDATE(showId), { showId, seatIds,
status }) for each group, and wrap the whole body in a try/catch that logs
errors; then replace the duplicated blocks in purchase.service.ts (the
lockedSeatRows handling), order.service.ts, and mq/consumer.ts with calls to
this new emitSeatStatusUpdates helper.
src/lib/server/config.ts (1)

52-52: Property name cloudAMQPURL is now inconsistent with the generic AMQP_URL env var.

The env var was generalized away from a CloudAMQP-specific name, but the exported property still encodes the provider. Consider renaming config.cloudAMQPURLconfig.amqpUrl (also update src/lib/server/mq/connection.ts which reads it) so the public-facing config matches the schema. Non-blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib/server/config.ts` at line 52, Rename the exported config property
from cloudAMQPURL to amqpUrl to match the generic AMQP_URL env var: update the
assignment currently using parsed.AMQP_URL (symbol: cloudAMQPURL) to export
amqpUrl instead, and adjust all usages (notably the consumer in
src/lib/server/mq/connection.ts that reads config.cloudAMQPURL) to reference
config.amqpUrl; ensure any type definitions or tests referencing cloudAMQPURL
are updated to amqpUrl and run the build/tests to verify no remaining
references.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/lib/server/mq/consumer.ts`:
- Around line 27-45: The SSE emission loop in consumer.ts (processing
releasedSeats → seatsByShow and calling eventBus.emit with
SSE_EVENTS.SEAT_UPDATE) must be isolated in its own try/catch so listener errors
don't propagate and trigger a retry/DLQ after releaseExpiredOrder has committed;
wrap the for...of loop (or each emit) in try/catch, log/process the error (e.g.,
processLogger.error or console.error) and do NOT rethrow so the main flow
completes even if SSE subscribers fail.

---

Outside diff comments:
In `@src/lib/server/services/purchase.service.ts`:
- Around line 277-297: The pendingOrderId branch flips seats back to available
but doesn't broadcast that change; modify the
tx.update(seats)...where(inArray(...)) call inside the pendingOrderId block to
use .returning({ id: seats.id, showId: seats.showId }) and capture the returned
rows (e.g., releasedRows), then after the transaction commits emit SSE events
for those releasedRows with status: 'available' (the same post-commit place
where the existing locked emit occurs around the locked emission block) so other
clients immediately see the seats as available; ensure any subsequent
tx.delete(orderItems) remains but the broadcast happens post-commit.

---

Nitpick comments:
In `@src/lib/server/config.ts`:
- Line 52: Rename the exported config property from cloudAMQPURL to amqpUrl to
match the generic AMQP_URL env var: update the assignment currently using
parsed.AMQP_URL (symbol: cloudAMQPURL) to export amqpUrl instead, and adjust all
usages (notably the consumer in src/lib/server/mq/connection.ts that reads
config.cloudAMQPURL) to reference config.amqpUrl; ensure any type definitions or
tests referencing cloudAMQPURL are updated to amqpUrl and run the build/tests to
verify no remaining references.

In `@src/lib/server/services/purchase.service.ts`:
- Around line 403-422: The returned object still includes the internal
isNewOrder flag because you only removed lockedSeatRows from responseData;
update the cleanup before returning in purchaseTickets to also strip isNewOrder
(e.g., const { lockedSeatRows, isNewOrder, ...finalResponseData } =
responseData) so controllers receive only the declared PurchaseResponse, or
alternatively add isNewOrder to the PurchaseResponse type if it should be part
of the public contract; adjust any references to finalResponseData/responseData
accordingly.
- Around line 421-444: Extract the duplicated reduce+emit logic into a shared
helper (e.g. emitSeatStatusUpdates) under $lib/server/events/ that accepts rows:
{ id: number; showId: number }[] and status: 'available' | 'locked' | 'sold';
the helper should return early on empty input, group rows by showId, coerce keys
to Number, call eventBus.emit(SSE_EVENTS.SEAT_UPDATE(showId), { showId, seatIds,
status }) for each group, and wrap the whole body in a try/catch that logs
errors; then replace the duplicated blocks in purchase.service.ts (the
lockedSeatRows handling), order.service.ts, and mq/consumer.ts with calls to
this new emitSeatStatusUpdates helper.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d37b1bdc-3d4a-4e2a-b55c-1bb3e8587e2b

📥 Commits

Reviewing files that changed from the base of the PR and between 22e5b2d and accaaff.

📒 Files selected for processing (4)
  • src/lib/server/config.ts
  • src/lib/server/mq/consumer.ts
  • src/lib/server/services/order.service.ts
  • src/lib/server/services/purchase.service.ts

Comment thread src/lib/server/mq/consumer.ts
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/lib/server/services/purchase.service.ts (1)

277-297: ⚠️ Potential issue | 🟡 Minor

Missing SSE emit when releasing seats from an existing pending order.

When a user has a prior pending order, lines 283-292 transition those seats from lockedavailable in the DB, but no SSE_EVENTS.SEAT_UPDATE(..., status: 'available') event is emitted for them. Other clients watching the seat map will continue to see those seats as locked until a full refresh, which contradicts the PR's real-time consistency goal.

Concretely, if a user modifies their pending cart (e.g., drops seat 3, adds seat 4), the new seat 4 is broadcast as locked, but seat 3 — now available — is silently freed. This is not one of the three flows explicitly listed in issue #89, but it produces the same "ghost lock" UX it's trying to prevent.

🔧 Suggested fix — capture released seat IDs and emit after commit
         // If a pending order exists, release its seats
+        let releasedSeatRows: { id: number; showId: number }[] = [];
         if (pendingOrderId) {
           const itemsToRelease = await tx
             .select({ seatId: orderItems.seatId })
             .from(orderItems)
             .where(eq(orderItems.orderId, pendingOrderId));

           if (itemsToRelease.length > 0) {
-            await tx
+            releasedSeatRows = await tx
               .update(seats)
               .set({ status: 'available', lockedBy: null, lockedAt: null })
               .where(
                 inArray(
                   seats.id,
                   itemsToRelease.map((i) => i.seatId),
                 ),
-              );
+              )
+              .returning({ id: seats.id, showId: seats.showId });
           }

Then return releasedSeatRows from the transaction alongside lockedSeatRows, and emit a second SSE batch with status: 'available' (in its own try/catch) before emitting the locked batch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib/server/services/purchase.service.ts` around lines 277 - 297, When
releasing seats for a prior pendingOrderId, capture the released seat IDs (the
result of selecting itemsToRelease) and return them from the transaction (e.g.,
as releasedSeatRows alongside any lockedSeatRows), then after the transaction
commits emit a second SSE batch using SSE_EVENTS.SEAT_UPDATE for those released
seats with status: 'available' (in its own try/catch) before or alongside
emitting the existing locked-seat SSE batch; update the transaction block around
pendingOrderId (itemsToRelease, tx.update(seats), tx.delete(orderItems)) to
collect and return released ids and add a post-commit SSE emission for them.
🧹 Nitpick comments (1)
src/lib/server/services/purchase.service.ts (1)

421-449: SSE emission for locked seats LGTM.

Emission is placed after the DB transaction commits and after publishOrderTimeout succeeds, satisfying the "emit only after commit" rule from issue #89. The try/catch isolates listener failures so realtime issues won't break the purchase response. Payload shape { showId, seatIds, status: 'locked' } and the per-show channel match the contract used in order.service.ts and consumer.ts.

Minor stylistic note (optional): the reduce/emit block here, in order.service.ts, and in consumer.ts is now the same shape three times — a small helper like emitSeatUpdates(rows, status) would DRY this up if you want to consolidate later.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib/server/services/purchase.service.ts` around lines 421 - 449,
Duplicate reduce/emit logic for locked-seat SSEs appears in purchase.service.ts,
order.service.ts and consumer.ts — extract it into a small shared helper
emitSeatUpdates(rows, status) that accepts an array of seat rows and a status
string, groups rows by showId, and calls
eventBus.emit(SSE_EVENTS.SEAT_UPDATE(numShowId), { showId: numShowId, seatIds:
sIds, status }) while catching/logging errors; then replace the inline
reduce/emit block in PurchaseService (the code around lockedSeatRows,
SSE_EVENTS.SEAT_UPDATE and eventBus.emit) with a call to
emitSeatUpdates(lockedSeatRows, 'locked') so all three places reuse the same
implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/lib/server/services/purchase.service.ts`:
- Around line 277-297: When releasing seats for a prior pendingOrderId, capture
the released seat IDs (the result of selecting itemsToRelease) and return them
from the transaction (e.g., as releasedSeatRows alongside any lockedSeatRows),
then after the transaction commits emit a second SSE batch using
SSE_EVENTS.SEAT_UPDATE for those released seats with status: 'available' (in its
own try/catch) before or alongside emitting the existing locked-seat SSE batch;
update the transaction block around pendingOrderId (itemsToRelease,
tx.update(seats), tx.delete(orderItems)) to collect and return released ids and
add a post-commit SSE emission for them.

---

Nitpick comments:
In `@src/lib/server/services/purchase.service.ts`:
- Around line 421-449: Duplicate reduce/emit logic for locked-seat SSEs appears
in purchase.service.ts, order.service.ts and consumer.ts — extract it into a
small shared helper emitSeatUpdates(rows, status) that accepts an array of seat
rows and a status string, groups rows by showId, and calls
eventBus.emit(SSE_EVENTS.SEAT_UPDATE(numShowId), { showId: numShowId, seatIds:
sIds, status }) while catching/logging errors; then replace the inline
reduce/emit block in PurchaseService (the code around lockedSeatRows,
SSE_EVENTS.SEAT_UPDATE and eventBus.emit) with a call to
emitSeatUpdates(lockedSeatRows, 'locked') so all three places reuse the same
implementation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5d5ef240-e950-434e-bb53-79c57057ba6e

📥 Commits

Reviewing files that changed from the base of the PR and between accaaff and 7b0a8b2.

📒 Files selected for processing (3)
  • src/lib/server/mq/consumer.ts
  • src/lib/server/services/order.service.ts
  • src/lib/server/services/purchase.service.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/lib/server/services/order.service.ts

@DungxND DungxND merged commit 022b031 into main Apr 27, 2026
2 checks passed
@DungxND DungxND deleted the feat/89-emit-realtime-events branch April 27, 2026 09:52
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BE] Bắn sự kiện Realtime (Emit) khi trạng thái ghế thay đổi

2 participants