feat: 14.4 replication, failover & recovery #59
Conversation
8 issues found across 12 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="crates/fila-core/src/cluster/store.rs">
<violation number="1" location="crates/fila-core/src/cluster/store.rs:413">
P1: `apply_to_broker_storage` silently swallows storage errors (log-only). Since this runs in the Raft `apply_to_state_machine` path, a failed mutation means the local broker storage diverges from the committed log. On failover, the new leader could be missing data. Consider returning a `Result` and propagating the error as a `StorageError` from `apply_to_state_machine`, which would let Raft handle the failure appropriately.</violation>
<violation number="2" location="crates/fila-core/src/cluster/store.rs:607">
P1: Ack and Nack both do a linear scan of all messages in the queue (`list_messages`) to find a single message by ID. This runs in the Raft `apply_to_state_machine` path on every node for every committed entry. Consider adding a secondary index (msg_id → storage key) or including the full storage key in the `ClusterRequest::Ack`/`Nack` variants so followers can do a direct lookup.</violation>
<violation number="3" location="crates/fila-core/src/cluster/store.rs:614">
P1: Lease expiry entries are not cleaned up on Ack (or Nack). The comment says "Also clean up any lease/lease_expiry entries" but only `DeleteLease` is emitted — no `DeleteLeaseExpiry`. Parse the expiry timestamp from the lease value (via `parse_expiry_from_lease_value`) to construct the `lease_expiry_key` and add a `Mutation::DeleteLeaseExpiry` to the batch. Without this, orphaned expiry entries will trigger spurious expiration attempts.</violation>
</file>
<file name="crates/fila-core/src/broker/scheduler/recovery.rs">
<violation number="1" location="crates/fila-core/src/broker/scheduler/recovery.rs:337">
P1: No-op retain on `leased_msg_keys` leaves stale entries after per-queue recovery. Unlike `pending` and `pending_by_id` (which are properly filtered to remove this queue's entries), `retain(|_, _| true)` removes nothing. After the rebuild loop, messages that are no longer leased will still have ghost entries in `leased_msg_keys`, causing inconsistent scheduler state (e.g. wrong lease counts in metrics, stale lookups in `reclaim_expired_leases`).</violation>
</file>
<file name="crates/fila-core/src/cluster/multi_raft.rs">
<violation number="1" location="crates/fila-core/src/cluster/multi_raft.rs:62">
P1: `create_group()` should fail when broker storage is unset; currently it silently creates a queue Raft store that skips applying committed entries to broker storage.</violation>
</file>
<file name="crates/fila-core/src/cluster/mod.rs">
<violation number="1" location="crates/fila-core/src/cluster/mod.rs:347">
P1: `let _ =` silently discards the `send_command` result, then `leading` is unconditionally updated. If the command channel is full during failover load, recovery is lost and never retried because `was_leader` will be `true` on the next poll.
Only update `leading` on success; log and skip the update on failure so the next poll retries.</violation>
<violation number="2" location="crates/fila-core/src/cluster/mod.rs:354">
P2: Same silent-discard pattern: if `DropQueueConsumers` fails to send, `leading` is set to `false` and the drop is never retried. Consumers would remain connected to a non-leader, receiving stale state or errors.</violation>
<violation number="3" location="crates/fila-core/src/cluster/mod.rs:360">
P1: When the watcher first discovers a queue where this node is already leader, it records the state but does not trigger `RecoverQueue`. Any messages replicated to RocksDB between initial startup recovery and the first poll will be missing from the in-memory scheduler (DRR, pending index), so they won't be delivered.
Trigger recovery on first sight when `is_leader` is true, matching the `is_leader && !was_leader` branch.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
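To make the first store.rs finding concrete, here is a minimal, self-contained sketch of the pattern it asks for: the broker-storage apply helper returns a `Result`, and the Raft apply path propagates the failure instead of logging and continuing. All names (`Mutation`, `BrokerStorage`, `apply_batch`, `StorageError`) are illustrative stand-ins, not the crate's actual types.

```rust
use std::collections::BTreeMap;

// Illustrative stand-ins; the real crate defines its own error and mutation types.
#[derive(Debug)]
struct StorageError(String);

enum Mutation {
    Put { key: Vec<u8>, value: Vec<u8> },
    Delete { key: Vec<u8> },
}

#[derive(Default)]
struct BrokerStorage {
    kv: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl BrokerStorage {
    // Apply one committed entry's mutations; any failure surfaces to the caller.
    fn apply_batch(&mut self, batch: Vec<Mutation>) -> Result<(), StorageError> {
        for m in batch {
            match m {
                Mutation::Put { key, value } => {
                    self.kv.insert(key, value);
                }
                Mutation::Delete { key } => {
                    self.kv.remove(&key);
                }
            }
        }
        Ok(())
    }
}

// The Raft apply path: a storage failure is returned, not swallowed, so the
// Raft layer can treat the state machine as faulted instead of letting the
// local broker storage silently diverge from the committed log.
fn apply_to_state_machine(
    storage: &mut BrokerStorage,
    committed: Vec<Vec<Mutation>>,
) -> Result<(), StorageError> {
    for batch in committed {
        storage.apply_batch(batch)?; // propagate instead of log-and-continue
    }
    Ok(())
}

fn main() {
    let mut storage = BrokerStorage::default();
    let entry = vec![Mutation::Put {
        key: b"q1/msg1".to_vec(),
        value: b"hello".to_vec(),
    }];
    apply_to_state_machine(&mut storage, vec![entry]).expect("apply failed");
    assert!(storage.kv.contains_key(b"q1/msg1".as_slice()));
}
```

The key point is the `?` in the apply loop: a failed mutation surfaces to the Raft layer rather than leaving local storage silently diverged from the committed log.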
    }
    super::types::ClusterRequest::Ack { queue_id, msg_id } => {
        // Find the message and its lease, then delete them.
        let msg_prefix = crate::storage::keys::message_prefix(queue_id);
P1: Ack and Nack both do a linear scan of all messages in the queue (list_messages) to find a single message by ID. This runs in the Raft apply_to_state_machine path on every node for every committed entry. Consider adding a secondary index (msg_id → storage key) or including the full storage key in the ClusterRequest::Ack/Nack variants so followers can do a direct lookup.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At crates/fila-core/src/cluster/store.rs, line 607:
<comment>Ack and Nack both do a linear scan of all messages in the queue (`list_messages`) to find a single message by ID. This runs in the Raft `apply_to_state_machine` path on every node for every committed entry. Consider adding a secondary index (msg_id → storage key) or including the full storage key in the `ClusterRequest::Ack`/`Nack` variants so followers can do a direct lookup.</comment>
<file context>
@@ -541,6 +569,108 @@ impl RaftStorage<TypeConfig> for FilaRaftStore {
+ }
+ super::types::ClusterRequest::Ack { queue_id, msg_id } => {
+ // Find the message and its lease, then delete them.
+ let msg_prefix = crate::storage::keys::message_prefix(queue_id);
+ match storage.list_messages(&msg_prefix) {
+ Ok(messages) => {
</file context>
Dismissed — accepted for now.
The ClusterRequest::Ack only carries (queue_id, msg_id) but the storage key requires (queue_id, fairness_key, enqueued_at, msg_id). A proper fix requires either a secondary index or carrying the full key in ClusterRequest, which changes the Raft log format. For current workloads (single-digit to low-thousands messages per queue), this is acceptable. Deferring to a future optimization pass.
Understood—thanks for the context. I’ll treat this as acceptable for current workloads and leave it as a future optimization.
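For whenever the deferred optimization is revisited, a hypothetical sketch of the secondary-index approach discussed above: maintain a `(queue_id, msg_id) → full storage key` map alongside the message writes so Ack/Nack can resolve the key directly. The key layout and every name below are assumptions for illustration, not the current code.

```rust
use std::collections::HashMap;

/// Full storage key for a message, as described in the dismissal:
/// (queue_id, fairness_key, enqueued_at, msg_id).
#[derive(Clone, Debug, PartialEq)]
struct MsgKey {
    queue_id: u64,
    fairness_key: String,
    enqueued_at: u64,
    msg_id: String,
}

/// Hypothetical secondary index: (queue_id, msg_id) -> full key.
/// Maintained on enqueue, consulted on ack/nack, removed on delete.
#[derive(Default)]
struct MsgIdIndex {
    by_id: HashMap<(u64, String), MsgKey>,
}

impl MsgIdIndex {
    fn on_enqueue(&mut self, key: MsgKey) {
        self.by_id.insert((key.queue_id, key.msg_id.clone()), key);
    }

    /// O(1) lookup replacing the O(n) list_messages scan in the apply path.
    fn resolve(&self, queue_id: u64, msg_id: &str) -> Option<&MsgKey> {
        self.by_id.get(&(queue_id, msg_id.to_string()))
    }

    fn on_delete(&mut self, queue_id: u64, msg_id: &str) -> Option<MsgKey> {
        self.by_id.remove(&(queue_id, msg_id.to_string()))
    }
}

fn main() {
    let mut index = MsgIdIndex::default();
    index.on_enqueue(MsgKey {
        queue_id: 7,
        fairness_key: "tenant-a".into(),
        enqueued_at: 1_700_000_000,
        msg_id: "m-42".into(),
    });

    // Ack path: direct lookup, then delete message + lease by full key.
    let key = index.resolve(7, "m-42").cloned().expect("indexed");
    assert_eq!(key.fairness_key, "tenant-a");
    index.on_delete(7, "m-42");
    assert!(index.resolve(7, "m-42").is_none());
}
```

An index like this could live in memory (rebuilt on recovery) or as a second RocksDB column family; either way it avoids changing the serialized `ClusterRequest` format.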
vieiralucas left a comment
Cubic findings addressed
Fixed in commit cca7587:

- #1 (P1): `apply_to_broker_storage` now returns `Result<(), StorageError>` and propagates errors instead of silently logging.
- #3 (P1): Added `DeleteLeaseExpiry` mutations in both Ack and Nack paths.
- #4 (P1): Fixed the no-op `leased_msg_keys.retain(|_, _| true)`; recovery now properly clears entries for the recovering queue.
- #5 (P1): Added a warning log when `create_group()` is called without `broker_storage` set.
- #6 (P1): The `send_command` result is now checked; `leading` state is only updated on success so the next poll retries (see the sketch below).
- #7 (P2): Same fix as #6 for `DropQueueConsumers`.
- #8 (P1): First-sight leader now triggers `RecoverQueue` to catch messages replicated between startup and the first poll.

Dismissed

- #2 (P1): O(n) linear scan in the Ack/Nack `apply_to_broker_storage` path; accepted for now. `ClusterRequest::Ack` only carries `(queue_id, msg_id)`, but the storage key requires `(queue_id, fairness_key, enqueued_at, msg_id)`. Adding a secondary index or carrying the full key in `ClusterRequest` would be the proper fix, but it is a design change that affects the Raft log format (serialized `ClusterRequest`). Deferring to a future optimization pass. For current workloads (single-digit to low-thousands messages per queue), this is acceptable.
Force-pushed from 95de06f to 8c8bcc1 (Compare)
Force-pushed from b1a911e to 157b463 (Compare)
Force-pushed from 8c8bcc1 to d9a736b (Compare)
- Add broker storage replication: queue-level Raft state machines now apply committed entries (enqueue, ack, nack) to the broker's RocksDB on ALL nodes, not just the leader. Followers have full data for zero-loss failover.
- Add `LeaderChangeWatcher`: background task monitors queue Raft groups for leadership changes. On leader promotion, sends `RecoverQueue` to rebuild in-memory scheduler state. On leader loss, sends `DropQueueConsumers` to close consumer streams so clients reconnect to the new leader.
- Add per-queue scheduler recovery: the `RecoverQueue` command rebuilds DRR keys, the pending index, and `leased_msg_keys` for a single queue from RocksDB without disrupting other queues (sketched below).
- Add consumer stream leader-awareness: the `consume()` handler rejects requests on non-leader nodes with `UNAVAILABLE` status.
- 3 new integration tests: failover leader election, zero message loss after failover, node rejoin and catchup.
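A minimal sketch of what the per-queue recovery described above might look like in terms of in-memory indexes: entries for the recovering queue are cleared (the fix for the no-op `retain`) and rebuilt from a storage snapshot, while other queues' entries are left alone. Field and type names are simplified stand-ins for the scheduler's real state.

```rust
use std::collections::{HashMap, HashSet};

/// Minimal stand-ins for the scheduler's in-memory indexes.
#[derive(Default)]
struct SchedulerState {
    // (queue_id, msg_id) -> pending flag
    pending_by_id: HashMap<(u64, String), ()>,
    // fairness keys with pending work, per queue (DRR input)
    drr_keys: HashMap<u64, HashSet<String>>,
    // (queue_id, msg_id) -> storage key of a leased message
    leased_msg_keys: HashMap<(u64, String), Vec<u8>>,
}

struct StoredMessage {
    msg_id: String,
    fairness_key: String,
    storage_key: Vec<u8>,
    leased: bool,
}

impl SchedulerState {
    /// Rebuild state for one queue from a storage snapshot, leaving every
    /// other queue's entries untouched.
    fn recover_queue(&mut self, queue_id: u64, messages: &[StoredMessage]) {
        // Clear only this queue's entries (the bug was a no-op retain here).
        self.pending_by_id.retain(|(q, _), _| *q != queue_id);
        self.leased_msg_keys.retain(|(q, _), _| *q != queue_id);
        self.drr_keys.remove(&queue_id);

        for msg in messages {
            if msg.leased {
                self.leased_msg_keys
                    .insert((queue_id, msg.msg_id.clone()), msg.storage_key.clone());
            } else {
                self.pending_by_id.insert((queue_id, msg.msg_id.clone()), ());
                self.drr_keys
                    .entry(queue_id)
                    .or_default()
                    .insert(msg.fairness_key.clone());
            }
        }
    }
}

fn main() {
    let mut state = SchedulerState::default();
    state.pending_by_id.insert((2, "old".into()), ()); // another queue, must survive

    let msgs = vec![StoredMessage {
        msg_id: "m1".into(),
        fairness_key: "tenant-a".into(),
        storage_key: b"q1/tenant-a/1/m1".to_vec(),
        leased: false,
    }];
    state.recover_queue(1, &msgs);

    assert!(state.pending_by_id.contains_key(&(2, "old".into())));
    assert!(state.pending_by_id.contains_key(&(1, "m1".into())));
}
```

This mirrors the commit's claim that recovering one queue cannot clobber the pending or lease state of the others.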
- `apply_to_broker_storage` now returns `Result` and propagates `StorageError` instead of silently swallowing storage failures (cubic #1)
- add a `DeleteLeaseExpiry` mutation in the ack/nack replication paths to clean up orphaned lease expiry entries (cubic #3; see the sketch below)
- fix the no-op `leased_msg_keys.retain` in recovery: now properly clears entries for the recovering queue before rebuild (cubic #4)
- warn when `create_group` is called without `broker_storage` set (cubic #5)
- check the `send_command` result in `watch_leader_changes`: only update `leading` state on success so the next poll retries on failure (cubic #6, #7)
- trigger `RecoverQueue` on first-sight leader state to catch messages replicated between startup and the first poll (cubic #8)
- replace the catch-all `_ => {}` with explicit variant listing in `apply_to_broker_storage` for compiler-enforced exhaustiveness
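A sketch of the lease-expiry cleanup fix (cubic #3): on Ack, the expiry timestamp is parsed from the lease value and a matching expiry-key delete is batched together with the lease delete. Key layouts, the encoding of the lease value, and all names are assumptions for illustration only.

```rust
/// Illustrative key layouts for lease and lease-expiry entries.
fn lease_key(queue_id: u64, msg_id: &str) -> Vec<u8> {
    format!("lease/{queue_id}/{msg_id}").into_bytes()
}
fn lease_expiry_key(queue_id: u64, expires_at: u64, msg_id: &str) -> Vec<u8> {
    format!("lease_expiry/{queue_id}/{expires_at:020}/{msg_id}").into_bytes()
}

/// Stand-in for parse_expiry_from_lease_value: here the lease value is
/// just the expiry timestamp encoded as a big-endian u64.
fn parse_expiry_from_lease_value(value: &[u8]) -> Option<u64> {
    value.try_into().ok().map(u64::from_be_bytes)
}

enum Mutation {
    Delete { key: Vec<u8> },
}

/// On Ack, delete the lease AND the matching expiry entry, so the expiry
/// sweeper never sees an orphaned entry for an already-acked message.
fn ack_mutations(queue_id: u64, msg_id: &str, lease_value: Option<&[u8]>) -> Vec<Mutation> {
    let mut batch = vec![Mutation::Delete { key: lease_key(queue_id, msg_id) }];
    if let Some(expires_at) = lease_value.and_then(parse_expiry_from_lease_value) {
        batch.push(Mutation::Delete {
            key: lease_expiry_key(queue_id, expires_at, msg_id),
        });
    }
    batch
}

fn main() {
    let lease_value = 1_700_000_123u64.to_be_bytes();
    let batch = ack_mutations(7, "m-42", Some(lease_value.as_slice()));
    assert_eq!(batch.len(), 2); // lease + lease_expiry deletes
}
```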
Force-pushed from b09a368 to 3afdd6f (Compare)
…eLock RocksDB exists before both Broker and ClusterManager, so there's no chicken-and-egg problem. Pass `Arc<dyn StorageEngine>` directly to `MultiRaftManager::new` and make `FilaRaftStore::for_queue` take it non-optionally.
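A sketch of the ordering this commit describes: the storage engine is constructed first and the same `Arc` handle is handed to both the broker and the Raft manager, so per-queue Raft stores never need an optional, late-bound storage field. The trait and struct bodies below are simplified stand-ins.

```rust
use std::sync::Arc;

/// Minimal stand-in for the storage abstraction.
trait StorageEngine: Send + Sync {
    fn put(&self, key: &[u8], value: &[u8]);
}

struct RocksLike; // placeholder for the RocksDB-backed engine
impl StorageEngine for RocksLike {
    fn put(&self, _key: &[u8], _value: &[u8]) {}
}

struct Broker {
    storage: Arc<dyn StorageEngine>,
}

struct MultiRaftManager {
    // Non-optional: every queue Raft store can apply committed entries.
    storage: Arc<dyn StorageEngine>,
}

impl MultiRaftManager {
    fn new(storage: Arc<dyn StorageEngine>) -> Self {
        Self { storage }
    }
    fn for_queue(&self, queue_id: u64) -> QueueRaftStore {
        QueueRaftStore { queue_id, storage: Arc::clone(&self.storage) }
    }
}

struct QueueRaftStore {
    queue_id: u64,
    storage: Arc<dyn StorageEngine>,
}

fn main() {
    // Storage exists before both consumers of it: no chicken-and-egg.
    let storage: Arc<dyn StorageEngine> = Arc::new(RocksLike);
    let _broker = Broker { storage: Arc::clone(&storage) };
    let manager = MultiRaftManager::new(storage);
    let store = manager.for_queue(1);
    store.storage.put(b"q1/meta", b"{}");
    let _ = store.queue_id;
}
```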
Summary
- `watch_leader_changes()` background task polls queue Raft groups for leadership transitions. On leader gain → `RecoverQueue` (rebuild scheduler state). On leader loss → `DropQueueConsumers` (close consumer streams).
- `recover_queue()` rebuilds DRR keys, the pending index, and `leased_msg_keys` from RocksDB for a single queue without disrupting other queues.
- `consume()` handler rejects non-leader nodes with `UNAVAILABLE` status, directing clients to reconnect to the leader.

Acceptance Criteria Covered
Test Summary
- `test_cluster_failover_new_leader_elected`: kill leader, verify new leader <10s
- `test_cluster_failover_zero_message_loss`: enqueue 5, kill leader, consume all 5
- `test_cluster_node_rejoin_catchup`: kill node, enqueue, restart, verify catch-up

Files Changed
- crates/fila-core/src/broker/command.rs
- crates/fila-core/src/broker/scheduler/mod.rs
- crates/fila-core/src/broker/scheduler/recovery.rs
- crates/fila-core/src/cluster/mod.rs
- crates/fila-core/src/cluster/multi_raft.rs
- crates/fila-core/src/cluster/store.rs
- crates/fila-core/src/cluster/tests.rs
- crates/fila-server/src/service.rs
- crates/fila-server/src/main.rs

🤖 Generated with Claude Code
Summary by cubic
Adds queue data replication and fast failover with zero‑loss recovery. Queue Raft groups apply committed enqueue/ack/nack to local storage on all nodes; on leader change the new leader rebuilds per‑queue scheduler state and consumers reconnect to the leader. Implements Story 14.4.
New Features
- `MultiRaft` and passes it at construction to `MultiRaftManager` / `FilaRaftStore::for_queue`.
- `watch_leader_changes()` sends `RecoverQueue` on promotion and `DropQueueConsumers` on loss; triggers recovery on first‑sight leader and retries on send failure.
- `consume()` rejects non‑leaders with UNAVAILABLE and returns NOT_FOUND if the group is missing.
- `apply_to_broker_storage()`, delete orphaned lease‑expiry entries on ack/nack, warn if a group is created without broker storage, and use explicit match arms for exhaustiveness.

Migration

- `consume()` by reconnecting to the queue leader (a client-side sketch follows below).

Written for commit 149a325. Summary will update on new commits.
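To illustrate the migration note, a hypothetical client-side loop: on an UNAVAILABLE response from consume the client re-resolves the queue leader and retries, rather than failing hard. The error enum, discovery hook, and consume stub below are placeholders; a real client would map these onto its gRPC status handling.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Stand-ins for the gRPC status codes the server returns from consume().
#[derive(Debug, PartialEq)]
enum ConsumeError {
    Unavailable, // this node is not the queue leader
    Other(String),
}

/// Hypothetical hook: ask the cluster which node currently leads the queue.
fn discover_leader(queue: &str) -> String {
    format!("node-for-{queue}") // placeholder endpoint resolution
}

/// Hypothetical consume call against a specific endpoint.
fn consume_from(endpoint: &str, _queue: &str) -> Result<Vec<String>, ConsumeError> {
    let _ = endpoint;
    Ok(vec!["msg-1".to_string()])
}

/// Client-side loop: on UNAVAILABLE, re-resolve the leader and reconnect
/// instead of treating it as a fatal error.
fn consume_with_failover(queue: &str, max_attempts: u32) -> Result<Vec<String>, ConsumeError> {
    let mut endpoint = discover_leader(queue);
    for attempt in 0..max_attempts {
        match consume_from(&endpoint, queue) {
            Ok(msgs) => return Ok(msgs),
            Err(ConsumeError::Unavailable) => {
                sleep(Duration::from_millis(100 * u64::from(attempt + 1)));
                endpoint = discover_leader(queue); // leader may have moved
            }
            Err(other) => return Err(other),
        }
    }
    Err(ConsumeError::Other("leader not found in time".to_string()))
}

fn main() {
    let msgs = consume_with_failover("orders", 5).expect("consume");
    assert_eq!(msgs, vec!["msg-1".to_string()]);
}
```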