Skip to content

feat: consumer lease streaming & message delivery (story 1.7)#7

Merged
vieiralucas merged 5 commits intomainfrom
feat/1-7-consumer-lease-message-delivery
Feb 12, 2026
Merged

feat: consumer lease streaming & message delivery (story 1.7)#7
vieiralucas merged 5 commits intomainfrom
feat/1-7-consumer-lease-message-delivery

Conversation

@vieiralucas
Copy link
Copy Markdown
Member

@vieiralucas vieiralucas commented Feb 11, 2026

Summary

  • Add consumer registry to scheduler with per-consumer tokio::sync::mpsc channels
  • Implement try_deliver_pending: scans for unleased messages, creates lease + lease_expiry entries atomically via write_batch, delivers to consumers in round-robin
  • Implement Lease gRPC RPC with two-channel bridge pattern (scheduler sends ReadyMessage, converter task maps to LeaseResponse)
  • Automatic consumer unregistration on stream close via tokio::select! detecting both message arrival and client disconnection

Test plan

  • Consumer receives enqueued messages immediately
  • Consumer receives pending messages on registration (backlog drain)
  • Lease creates entries in leases CF after delivery
  • Multiple consumers receive different messages (no duplicates)
  • Unregister consumer stops message delivery
  • Enqueue 10 messages, lease receives all 10 in FIFO order
  • 41 tests pass, clippy clean, cargo fmt clean

Summary by cubic

Implements a server‑streaming Lease RPC and a consumer registry to push messages instantly without polling. Adds per‑queue round‑robin delivery with lease‑first, atomic lease+expiry writes, per‑queue visibility timeouts (30s default), and auto cleanup on disconnect (Story 1.7).

  • New Features

    • Lease RPC opens a server stream; maps ReadyMessage → LeaseResponse with metadata (fairness_key, weight, throttle_keys, attempt_count, queue_id); unregisters on disconnect via tokio::select.
    • Scheduler registers consumers via tokio mpsc; delivers on enqueue and on registration; scans for unleased messages; writes lease and expiry in one batch using the queue’s visibility_timeout_ms; persists a per‑queue round‑robin index across calls for fairness.
  • Bug Fixes

    • Lease‑before‑send to prevent delivery without a lease; on full/closed channels roll back the lease and try next; if none available, keep the message; log list/write/rollback failures with consumer/message IDs; tests cover closed/full channel cases and lease rollback.

Written for commit 6e63f64. Summary will update on new commits.

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 issues found across 6 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="crates/fila-server/src/service.rs">

<violation number="1" location="crates/fila-server/src/service.rs:43">
P2: `weight` is hardcoded to `0` and `throttle_keys` to `vec![]` instead of carrying over the original message values. The `ReadyMessage` struct is missing these fields, so the data is silently lost during delivery. Consumers will see incorrect metadata. Consider adding `weight` and `throttle_keys` to `ReadyMessage` and populating them from the source `Message`.</violation>
</file>

<file name="crates/fila-core/src/broker/scheduler.rs">

<violation number="1" location="crates/fila-core/src/broker/scheduler.rs:227">
P1: Round-robin `consumer_idx` is a local variable reset to 0 on every `try_deliver_pending` call. Since each enqueue triggers a separate call, messages arriving one-at-a-time will always be offered to the same consumer first, completely defeating the intended round-robin distribution. Promote `consumer_idx` (or a per-queue equivalent) to a `Scheduler` field so the position persists across invocations.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread crates/fila-core/src/broker/scheduler.rs Outdated
Comment thread crates/fila-server/src/service.rs Outdated
@vieiralucas vieiralucas force-pushed the feat/1-6-message-enqueue branch from 767fc0f to bb6dc55 Compare February 11, 2026 12:10
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch 2 times, most recently from 3f41912 to c56d405 Compare February 11, 2026 13:18
@vieiralucas vieiralucas force-pushed the feat/1-6-message-enqueue branch 2 times, most recently from 4970e1d to 8ad8619 Compare February 11, 2026 16:10
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch 2 times, most recently from 75dd9e7 to df92fa9 Compare February 11, 2026 16:11
@vieiralucas vieiralucas force-pushed the feat/1-6-message-enqueue branch from 8ad8619 to 71e67d3 Compare February 11, 2026 16:11
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch from df92fa9 to 4222fdc Compare February 11, 2026 16:18
@vieiralucas vieiralucas force-pushed the feat/1-6-message-enqueue branch from 71e67d3 to 460bef4 Compare February 11, 2026 16:18
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch from 4222fdc to 5841b6c Compare February 11, 2026 16:21
@vieiralucas vieiralucas force-pushed the feat/1-6-message-enqueue branch 2 times, most recently from ab6e29a to e6bfae9 Compare February 11, 2026 16:23
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch 2 times, most recently from 6d915b3 to b8e0d37 Compare February 11, 2026 21:00
@vieiralucas vieiralucas force-pushed the feat/1-6-message-enqueue branch 2 times, most recently from 67ebcd2 to e662cbf Compare February 11, 2026 21:03
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch from b8e0d37 to f66134b Compare February 11, 2026 21:03
@vieiralucas vieiralucas force-pushed the feat/1-6-message-enqueue branch from e662cbf to 89e7d8e Compare February 11, 2026 21:15
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch 2 times, most recently from 0b01ecd to 03f341c Compare February 11, 2026 21:33
Base automatically changed from feat/1-6-message-enqueue to main February 11, 2026 21:37
…1.7)

add consumer registry to scheduler with per-consumer tokio mpsc channels.
implement try_deliver_pending that scans for unleased messages, creates
lease and lease_expiry entries atomically, and delivers to consumers in
round-robin. implement Lease grpc rpc with two-channel bridge pattern
(ReadyMessage -> LeaseResponse conversion via spawned task) and automatic
consumer unregistration on stream close via tokio::select!.
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch from 03f341c to 942a6e7 Compare February 11, 2026 21:38
Comment thread crates/fila-core/src/broker/scheduler.rs Outdated
Comment thread crates/fila-core/src/broker/scheduler.rs Outdated
Comment thread crates/fila-core/src/broker/scheduler.rs Outdated
- reorder lease-then-send to prevent delivering without a lease
- roll back lease on try_send failure (channel full or closed)
- log try_send failures with consumer id and message id
- promote list_messages and lease write failures to error! level
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch from c17df24 to aa08438 Compare February 12, 2026 01:03
- closed consumer channel: message delivered to next consumer
- all consumers closed: lease rolled back, message preserved
- full consumer channel: message overflows to next consumer
- all consumers full: lease rolled back for undelivered message
Comment thread crates/fila-server/src/service.rs
@vieiralucas vieiralucas force-pushed the feat/1-7-consumer-lease-message-delivery branch 2 times, most recently from 87d7425 to 3d21a80 Compare February 12, 2026 01:24
@vieiralucas
Copy link
Copy Markdown
Member Author

@cubic-dev-ai full review

@cubic-dev-ai
Copy link
Copy Markdown

cubic-dev-ai Bot commented Feb 12, 2026

@cubic-dev-ai full review

@vieiralucas I have started the AI code review. It will take a few minutes to complete.

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 issues found across 6 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="crates/fila-core/src/broker/scheduler.rs">

<violation number="1" location="crates/fila-core/src/broker/scheduler.rs:28">
P2: Global `consumer_rr_idx` is shared across all queues, breaking per-queue round-robin fairness. Deliveries to queue A advance the index that queue B uses, causing uneven distribution. Additionally, `queue_consumers` is collected from a `HashMap` whose iteration order is non-deterministic — any consumer add/remove reshuffles the order, making the persisted index meaningless. Consider using a per-queue round-robin index (e.g., `HashMap<String, usize>`).</violation>

<violation number="2" location="crates/fila-core/src/broker/scheduler.rs:300">
P2: Silently ignoring lease rollback failures can orphan leases and block messages. If `write_batch` (delete) fails here, the lease persists and the message is stuck until natural lease expiry. At minimum, log an error so operators can detect this condition.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread crates/fila-core/src/broker/scheduler.rs Outdated
Comment thread crates/fila-core/src/broker/scheduler.rs Outdated
- replace global consumer_rr_idx with per-queue HashMap<String, usize>
  so deliveries to one queue don't affect another queue's distribution
- log error on lease rollback failure instead of silently ignoring
@vieiralucas vieiralucas merged commit 9086848 into main Feb 12, 2026
5 checks passed
@vieiralucas vieiralucas deleted the feat/1-7-consumer-lease-message-delivery branch February 12, 2026 01:42
vieiralucas added a commit that referenced this pull request Mar 18, 2026
- apply_to_broker_storage now returns Result and propagates StorageError
  instead of silently swallowing storage failures (cubic #1)
- add DeleteLeaseExpiry mutation in ack/nack replication paths to clean up
  orphaned lease expiry entries (cubic #3)
- fix no-op leased_msg_keys.retain in recovery — now properly clears
  entries for the recovering queue before rebuild (cubic #4)
- warn when create_group is called without broker_storage set (cubic #5)
- check send_command result in watch_leader_changes — only update leading
  state on success so next poll retries on failure (cubic #6, #7)
- trigger RecoverQueue on first-sight leader state to catch messages
  replicated between startup and first poll (cubic #8)
- replace catch-all _ => {} with explicit variant listing in
  apply_to_broker_storage for compiler-enforced exhaustiveness
vieiralucas added a commit that referenced this pull request Mar 18, 2026
- apply_to_broker_storage now returns Result and propagates StorageError
  instead of silently swallowing storage failures (cubic #1)
- add DeleteLeaseExpiry mutation in ack/nack replication paths to clean up
  orphaned lease expiry entries (cubic #3)
- fix no-op leased_msg_keys.retain in recovery — now properly clears
  entries for the recovering queue before rebuild (cubic #4)
- warn when create_group is called without broker_storage set (cubic #5)
- check send_command result in watch_leader_changes — only update leading
  state on success so next poll retries on failure (cubic #6, #7)
- trigger RecoverQueue on first-sight leader state to catch messages
  replicated between startup and first poll (cubic #8)
- replace catch-all _ => {} with explicit variant listing in
  apply_to_broker_storage for compiler-enforced exhaustiveness
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant