feat: broker core with single-threaded scheduler loop #4
2 issues found across 11 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="crates/fila-core/src/broker/mod.rs">
<violation number="1" location="crates/fila-core/src/broker/mod.rs:58">
P2: send_command blocks when the channel is full despite the method contract saying it returns an error on full. Use try_send and map Full vs Disconnected to avoid hanging callers.</violation>
</file>
<file name="_bmad-output/implementation-artifacts/1-4-broker-core-scheduler-loop.md">
<violation number="1" location="_bmad-output/implementation-artifacts/1-4-broker-core-scheduler-loop.md:15">
P3: Acceptance Criteria lists `Admin`, but the rest of the story uses `Shutdown`. This inconsistency makes the required command set unclear. Align AC #3 with the `Shutdown` variant used elsewhere.</violation>
</file>
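The first violation can be illustrated with a minimal sketch. It assumes the standard library's bounded `std::sync::mpsc::sync_channel` in place of the `crossbeam_channel` used in the PR (both expose a `try_send` that distinguishes a full channel from a disconnected one). `SendOutcome` and `try_send_command` are hypothetical names for this example only.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical outcome type for this sketch; it is not a type from the PR.
#[derive(Debug, PartialEq, Eq)]
pub enum SendOutcome {
    Sent,
    Full,
    Disconnected,
}

/// Attempt a non-blocking send and report why it failed, if it did.
/// A plain `send` on a bounded channel would block the caller while the buffer is full.
pub fn try_send_command<T>(tx: &SyncSender<T>, cmd: T) -> SendOutcome {
    match tx.try_send(cmd) {
        Ok(()) => SendOutcome::Sent,
        // The buffer is at capacity; the receiver is still alive.
        Err(TrySendError::Full(_)) => SendOutcome::Full,
        // The receiving side has been dropped; no send can ever succeed.
        Err(TrySendError::Disconnected(_)) => SendOutcome::Disconnected,
    }
}
```

The two failure cases call for different reactions: a full channel may be worth retrying, while a disconnected one means the scheduler is gone.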
        scheduler.run();
    })
    .map_err(|e| {
        FilaError::StorageError(format!("failed to spawn scheduler thread: {e}"))
Using a StorageError here feels like using it as a fallback for any kind of error. I’d like us to be very diligent about defining and using meaningful, specific errors.
    pub fn send_command(&self, cmd: SchedulerCommand) -> Result<()> {
        self.command_tx.try_send(cmd).map_err(|e| match e {
            crossbeam_channel::TrySendError::Full(_) => {
                FilaError::StorageError("scheduler command channel full".to_string())
Using a StorageError here feels like using it as a fallback for any kind of error. I’d like us to be very diligent about defining and using meaningful, specific errors.
                FilaError::StorageError("scheduler command channel full".to_string())
            }
            crossbeam_channel::TrySendError::Disconnected(_) => {
                FilaError::StorageError("scheduler channel disconnected".to_string())
Using a StorageError here feels like using it as a fallback for any kind of error. I’d like us to be very diligent about defining and using meaningful, specific errors.
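One way to act on this feedback is sketched below, as an illustration rather than the change that was actually merged. `BrokerError`, its variant names, and `spawn_scheduler` are hypothetical. The standard library's `SyncSender` stands in for the crossbeam sender so that the example compiles without external crates.

```rust
use std::fmt;
use std::sync::mpsc::{SyncSender, TrySendError};
use std::thread::{self, JoinHandle};

/// Hypothetical broker-level error type; the variant names are illustrative only.
#[derive(Debug, PartialEq, Eq)]
pub enum BrokerError {
    /// The OS refused to spawn the scheduler thread.
    SchedulerSpawn(String),
    /// The bounded command channel has no free capacity.
    ChannelFull,
    /// The scheduler has exited and dropped its receiver.
    ChannelDisconnected,
}

impl fmt::Display for BrokerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BrokerError::SchedulerSpawn(reason) => {
                write!(f, "failed to spawn scheduler thread: {reason}")
            }
            BrokerError::ChannelFull => write!(f, "scheduler command channel full"),
            BrokerError::ChannelDisconnected => write!(f, "scheduler channel disconnected"),
        }
    }
}

impl std::error::Error for BrokerError {}

/// Map each channel failure to its own variant instead of a catch-all.
impl<T> From<TrySendError<T>> for BrokerError {
    fn from(e: TrySendError<T>) -> Self {
        match e {
            TrySendError::Full(_) => BrokerError::ChannelFull,
            TrySendError::Disconnected(_) => BrokerError::ChannelDisconnected,
        }
    }
}

/// Non-blocking send that surfaces a specific error.
pub fn send_command<T>(tx: &SyncSender<T>, cmd: T) -> Result<(), BrokerError> {
    tx.try_send(cmd).map_err(BrokerError::from)
}

/// Spawn a named thread and report spawn failures with a dedicated variant.
pub fn spawn_scheduler<F>(run: F) -> Result<JoinHandle<()>, BrokerError>
where
    F: FnOnce() + Send + 'static,
{
    thread::Builder::new()
        .name("scheduler".to_string())
        .spawn(run)
        .map_err(|e| BrokerError::SchedulerSpawn(e.to_string()))
}
```

With distinct variants, callers can match on the failure (for example, retry on `ChannelFull` but abort on `ChannelDisconnected`) without parsing message strings.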
…ch-alls add ColumnFamilyNotFound variant to StorageError and document the explicit error mapping pattern in CLAUDE.md
implement the redis-inspired single-threaded scheduler architecture: broker struct spawns a dedicated os thread running a scheduler event loop that drains commands via crossbeam channel. includes scheduler command enum with oneshot reply channels, broker config with toml deserialization, tracing subscriber setup, and graceful shutdown. command handlers are stubs to be filled in by subsequent stories.
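The architecture described in this commit message can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the PR's code: it uses `std::sync::mpsc` for both the command channel and the reply channel (the PR uses a crossbeam channel and `tokio::sync::oneshot`), it models only two commands, and the `Enqueue` payload, the `queue` field, and `Broker::enqueue` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread::{self, JoinHandle};

/// Commands understood by the sketch scheduler. The payload shape is hypothetical.
pub enum SchedulerCommand {
    /// Enqueue a message and reply with the resulting queue depth.
    Enqueue { payload: String, reply: SyncSender<usize> },
    /// Stop the loop.
    Shutdown,
}

/// Owns all mutable state. Only the scheduler thread touches it, so no locks are needed.
struct Scheduler {
    rx: Receiver<SchedulerCommand>,
    queue: Vec<String>,
}

impl Scheduler {
    fn run(mut self) {
        // recv() blocks until a command arrives or every sender has been dropped.
        while let Ok(cmd) = self.rx.recv() {
            match cmd {
                SchedulerCommand::Enqueue { payload, reply } => {
                    self.queue.push(payload);
                    // The caller may have stopped waiting; ignore a closed reply channel.
                    let _ = reply.send(self.queue.len());
                }
                SchedulerCommand::Shutdown => break,
            }
        }
    }
}

pub struct Broker {
    command_tx: SyncSender<SchedulerCommand>,
    handle: Option<JoinHandle<()>>,
}

impl Broker {
    /// Spawn the dedicated scheduler thread with a bounded command channel.
    pub fn new(capacity: usize) -> std::io::Result<Self> {
        let (command_tx, rx) = sync_channel(capacity);
        let scheduler = Scheduler { rx, queue: Vec::new() };
        let handle = thread::Builder::new()
            .name("scheduler".to_string())
            .spawn(move || scheduler.run())?;
        Ok(Broker { command_tx, handle: Some(handle) })
    }

    /// Send a command without blocking, then wait for the single reply.
    /// Returns None if the channel is full or the scheduler has gone away.
    pub fn enqueue(&self, payload: &str) -> Option<usize> {
        let (reply, reply_rx) = sync_channel(1);
        let cmd = SchedulerCommand::Enqueue { payload: payload.to_string(), reply };
        self.command_tx.try_send(cmd).ok()?;
        reply_rx.recv().ok()
    }
}

impl Drop for Broker {
    fn drop(&mut self) {
        // A blocking send is used here so the shutdown request is not lost on a full channel.
        let _ = self.command_tx.send(SchedulerCommand::Shutdown);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}
```

Because every state change funnels through one thread, ordering is determined by channel order alone, which is the property the Redis-inspired design relies on.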
- apply_to_broker_storage now returns Result and propagates StorageError instead of silently swallowing storage failures (cubic #1)
- add DeleteLeaseExpiry mutation in ack/nack replication paths to clean up orphaned lease expiry entries (cubic #3)
- fix no-op leased_msg_keys.retain in recovery; it now properly clears entries for the recovering queue before rebuild (cubic #4)
- warn when create_group is called without broker_storage set (cubic #5)
- check send_command result in watch_leader_changes; only update leading state on success so next poll retries on failure (cubic #6, #7)
- trigger RecoverQueue on first-sight leader state to catch messages replicated between startup and first poll (cubic #8)
- replace catch-all _ => {} with explicit variant listing in apply_to_broker_storage for compiler-enforced exhaustiveness
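The first and last points of this commit (returning a `Result`, and listing variants explicitly) can be sketched as below. Only the `DeleteLeaseExpiry` name comes from the commit message. The other variants, the `Store` type, and the `apply` function are hypothetical stand-ins for the real `apply_to_broker_storage`.

```rust
use std::collections::BTreeSet;

/// Hypothetical in-memory stand-in for the broker storage.
#[derive(Default)]
pub struct Store {
    pub messages: BTreeSet<u64>,
    pub lease_expiries: BTreeSet<u64>,
}

/// Hypothetical mutation set; only `DeleteLeaseExpiry` is named in the commit message.
pub enum Mutation {
    PutMessage(u64),
    DeleteMessage(u64),
    DeleteLeaseExpiry(u64),
    Heartbeat,
}

/// Apply one mutation, propagating failures to the caller instead of swallowing them.
pub fn apply(store: &mut Store, mutation: &Mutation) -> Result<(), String> {
    match mutation {
        Mutation::PutMessage(id) => {
            store.messages.insert(*id);
            Ok(())
        }
        Mutation::DeleteMessage(id) => {
            if store.messages.remove(id) {
                Ok(())
            } else {
                Err(format!("message {id} not found"))
            }
        }
        Mutation::DeleteLeaseExpiry(id) => {
            // Removing an absent expiry entry is treated as success in this sketch.
            store.lease_expiries.remove(id);
            Ok(())
        }
        // Listed explicitly instead of `_ => {}`: adding a new variant to `Mutation`
        // makes this match fail to compile until the new case is handled.
        Mutation::Heartbeat => Ok(()),
    }
}
```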
Summary
- `std::thread` running a tight event loop
- `SchedulerCommand` enum with `Enqueue`, `Ack`, `Nack`, `RegisterConsumer`, `UnregisterConsumer`, `Shutdown` variants using `tokio::sync::oneshot` for request-response
- `Broker` struct that manages the scheduler thread lifecycle with graceful shutdown and `Drop` safety
- `BrokerConfig` with TOML deserialization and sensible defaults (server + scheduler sections)
- `tracing` subscriber setup with JSON/pretty-print mode selection
Test plan
- `cargo nextest run`
- `cargo clippy -- -D warnings` passes
- `cargo fmt --check` passes
Summary by cubic
Implements a single-threaded scheduler core with a Broker, structured logging, and graceful shutdown, plus a RocksDB-backed storage layer with ordered keys and atomic batches. Satisfies Story 1.4 (Broker Core & Scheduler Loop) and completes Story 1.3 (Core Domain Types & Storage), laying groundwork for enqueue/ack/nack in later stories.
New Features
Bug Fixes and Refactors
Written for commit acc9695. Summary will update on new commits.