feat: introduce centralized RPC executor queue #187
Replaces the per-timer async RPC logic with a single, bounded work queue. Timer functions (finalize, resubmit, consolidate, withdraw, poll) now only enqueue `WorkItem`s and schedule the executor; all RPC calls are made by `execute_rpc_queue`, which drains up to `MAX_CONCURRENT_RPC_CALLS` items per run, fetches slot/blockhash once when any item needs it, and reschedules itself while the queue is non-empty. User-facing endpoints (`process_deposit`) are rate-limited independently via `UserRpcQuotaGuard` so they cannot be starved by timer work.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
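For orientation, a minimal sketch of the executor flow described above. Only `execute_rpc_queue`, `WorkItem`, and `MAX_CONCURRENT_RPC_CALLS` come from the PR; `dequeue_batch`, `fetch_slot_and_blockhash`, `queue_is_empty`, `WorkItem::execute`, and the `Runtime` bound are illustrative assumptions, and the `futures` crate is assumed for the concurrent join.

```rust
// Sketch only: helper names other than execute_rpc_queue, WorkItem and
// MAX_CONCURRENT_RPC_CALLS are hypothetical stand-ins for the PR's code.
pub async fn execute_rpc_queue<R: Runtime>(runtime: R) {
    // Drain at most MAX_CONCURRENT_RPC_CALLS items for this run.
    let items: Vec<WorkItem> = dequeue_batch(MAX_CONCURRENT_RPC_CALLS);
    if items.is_empty() {
        return;
    }

    // Fetch slot/blockhash once if any item in the batch needs it,
    // and share the result across the whole batch.
    let context = if items.iter().any(WorkItem::needs_slot_and_blockhash) {
        Some(fetch_slot_and_blockhash(&runtime).await)
    } else {
        None
    };

    // Execute all items of this batch concurrently.
    futures::future::join_all(
        items
            .into_iter()
            .map(|item| item.execute(&runtime, context.as_ref())),
    )
    .await;

    // Reschedule ourselves while there is still queued work.
    if !queue_is_empty() {
        runtime.set_timer(std::time::Duration::ZERO, execute_rpc_queue);
    }
}
```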
Pull request overview
Introduces a centralized, timer-driven RPC execution mechanism (rpc_executor) so periodic tasks enqueue work instead of performing Solana RPC calls directly, and adds a per-user RPC concurrency quota for user-facing endpoints.
Changes:
- Added `rpc_executor` module with a work queue and `execute_rpc_queue` to batch/drain queued RPC work.
- Refactored timer tasks (monitor/finalize/resubmit, deposit polling, consolidation, withdrawals) to enqueue `WorkItem`s and trigger the executor.
- Added `UserRpcQuotaGuard` backed by a new `active_user_rpc_calls` counter in `State`, and applied it to `process_deposit`.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| minter/src/rpc_executor/mod.rs | New centralized work queue + executor and work item implementations. |
| minter/src/rpc_executor/tests.rs | New unit tests for executor behavior and state transitions. |
| minter/src/monitor/mod.rs | finalize_transactions / resubmit_transactions now enqueue executor work instead of doing RPC. |
| minter/src/monitor/tests.rs | Updated tests to run execute_rpc_queue and adjust timer expectations. |
| minter/src/deposit/automatic/mod.rs | Polling now enqueues PollMonitoredAddress work items and triggers executor. |
| minter/src/deposit/automatic/tests.rs | Updated polling tests to call execute_rpc_queue and adjust scheduling assertions. |
| minter/src/consolidate/mod.rs | Consolidation now enqueues batches for executor submission. |
| minter/src/consolidate/tests.rs | Updated consolidation tests to run execute_rpc_queue and adjust timer assertions. |
| minter/src/withdraw/mod.rs | Withdrawal processing now enqueues withdrawal batch submissions and triggers executor. |
| minter/src/withdraw/tests.rs | Updated withdrawal tests to run execute_rpc_queue and adjust timer/log assertions. |
| minter/src/guard/mod.rs | Added UserRpcQuotaGuard and UserRpcQuotaError for user-endpoint RPC limiting. |
| minter/src/deposit/manual/mod.rs | Applied UserRpcQuotaGuard to process_deposit. |
| minter/src/state/mod.rs | Added active_user_rpc_calls counter + ExecuteRpcQueue task type. |
| minter/src/state/tests.rs | Updated state initialization expectations for new counter field. |
| minter/src/constants.rs | Added user RPC concurrency cap + moved per-account tx polling limit here. |
| minter/src/lib.rs | Exported rpc_executor module. |
```rust
    signature: Signature,
) -> Result<DepositStatus, ProcessDepositError> {
    let _guard = process_deposit_guard(account)?;
    let _rpc_quota = UserRpcQuotaGuard::new().map_err(|e| ProcessDepositError::from(e))?;
```
`UserRpcQuotaGuard::new()` already returns `UserRpcQuotaError` as its error type and there is a `From<UserRpcQuotaError> for ProcessDepositError` impl, so the `map_err(|e| ProcessDepositError::from(e))` is redundant here. You can simplify this line by relying on `?`'s implicit `From` conversion.
Suggested change:

```diff
- let _rpc_quota = UserRpcQuotaGuard::new().map_err(|e| ProcessDepositError::from(e))?;
+ let _rpc_quota = UserRpcQuotaGuard::new()?;
```
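The implicit conversion relies on an error `From` impl along these lines (sketch only; the actual `ProcessDepositError` variant name is an assumption, not taken from the PR):

```rust
// Sketch: the variant name is assumed, not taken from the PR.
impl From<UserRpcQuotaError> for ProcessDepositError {
    fn from(err: UserRpcQuotaError) -> Self {
        ProcessDepositError::TooManyConcurrentRequests(err)
    }
}
```

With that impl in scope, `?` calls `From::from` on the error automatically, which is exactly what the suggestion above relies on.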
```rust
// Re-enqueue only items that needed slot/blockhash so they are
// retried on the next executor run.
for item in items.into_iter().filter(WorkItem::needs_slot_and_blockhash) {
    enqueue(item);
}
```
`execute_rpc_queue` returns early on slot/blockhash fetch failure without scheduling a retry, even though items were re-enqueued. This can leave the queue non-empty but idle until another timer happens to trigger the executor, delaying or stalling processing. Consider scheduling `execute_rpc_queue` again (optionally with backoff) on this error path, and avoid dropping already-dequeued items that don't require slot/blockhash: either re-enqueue them too, or partition/execute them without the prerequisite fetch (see the sketch after the suggestion below).
Suggested change:

```diff
- // Re-enqueue only items that needed slot/blockhash so they are
- // retried on the next executor run.
- for item in items.into_iter().filter(WorkItem::needs_slot_and_blockhash) {
-     enqueue(item);
- }
+ // Re-enqueue all dequeued items so none are dropped on this
+ // transient failure, including items that do not require
+ // slot/blockhash.
+ for item in items.into_iter() {
+     enqueue(item);
+ }
+ if !queue_is_empty() {
+     runtime.set_timer(Duration::ZERO, execute_rpc_queue);
+ }
```
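The partition alternative mentioned in the comment could look roughly like this (sketch only; `execute_without_context` is a hypothetical helper, and the surrounding executor variables such as `items` and `runtime` are assumed to be in scope):

```rust
// Sketch: split the dequeued batch so items that do not need
// slot/blockhash can still run when the prerequisite fetch fails.
let (needs_context, ready_now): (Vec<WorkItem>, Vec<WorkItem>) = items
    .into_iter()
    .partition(WorkItem::needs_slot_and_blockhash);

// Re-enqueue only the items blocked on slot/blockhash...
for item in needs_context {
    enqueue(item);
}
// ...and still execute the rest of the batch without the fetch
// (execute_without_context is a hypothetical helper).
for item in ready_now {
    execute_without_context(item).await;
}
// Make sure the executor runs again so re-enqueued items are retried.
runtime.set_timer(Duration::ZERO, execute_rpc_queue);
```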
```rust
/// Push a work item onto the back of the executor queue.
pub fn enqueue(item: WorkItem) {
    WORK_QUEUE.with(|q| q.borrow_mut().push_back(item));
}
```
The PR description calls the executor queue "bounded", but `WORK_QUEUE` is currently unbounded and `enqueue` always pushes. If the Solana RPC is degraded (or the prerequisite fetch fails) the periodic schedulers can keep enqueuing and the queue can grow without limit, increasing memory/cycle usage. Consider enforcing a maximum queue length (and/or deduplicating by key) and defining what happens at capacity (drop oldest/newest with a log, or return an error to the scheduler).
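One way to make the queue genuinely bounded, assuming a hypothetical `MAX_WORK_QUEUE_LEN` constant and a drop-oldest policy; the `log::warn!` call stands in for whatever logging the crate actually uses:

```rust
/// Hypothetical capacity; this constant does not exist in the PR.
const MAX_WORK_QUEUE_LEN: usize = 1_000;

/// Push a work item onto the back of the executor queue, dropping the
/// oldest item (with a log) once the queue has reached capacity.
pub fn enqueue(item: WorkItem) {
    WORK_QUEUE.with(|q| {
        let mut q = q.borrow_mut();
        if q.len() >= MAX_WORK_QUEUE_LEN {
            let dropped = q.pop_front();
            // Logging macro is illustrative; use the crate's logger here.
            log::warn!("work queue full, dropping oldest item: {:?}", dropped);
        }
        q.push_back(item);
    });
}
```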
```rust
poll_monitored_addresses(runtime.clone()).await;
execute_rpc_queue(runtime).await;
```
In this module, `poll_monitored_addresses` now only enqueues work; tests need to run `execute_rpc_queue` to actually exercise RPC behavior and to avoid leaving `WorkItem`s in the global queue. `should_not_queue_signatures_if_rpc_call_fails` (below in the same module) still only calls `poll_monitored_addresses` and then asserts on `pending_signatures_for`, so it no longer tests the failure path and can leak queued work into later tests; please update it to run the executor (and consider clearing the work queue in `setup()`).
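A rough shape for the updated test. Apart from `poll_monitored_addresses`, `execute_rpc_queue`, and `pending_signatures_for`, every helper name here is illustrative, and `#[tokio::test]` stands in for whatever async test harness the module actually uses:

```rust
// Sketch only; setup_with_failing_rpc and monitored_address are
// placeholders for the module's existing test utilities.
#[tokio::test]
async fn should_not_queue_signatures_if_rpc_call_fails() {
    // Assumed to build a runtime whose RPC calls fail, and to clear the
    // global work queue so earlier tests cannot leak WorkItems in.
    let runtime = setup_with_failing_rpc();

    // Enqueue the PollMonitoredAddress work items...
    poll_monitored_addresses(runtime.clone()).await;
    // ...then run the executor so the failing RPC path is exercised
    // and no WorkItems are left behind for later tests.
    execute_rpc_queue(runtime).await;

    // The failure should leave no pending signatures queued.
    assert!(pending_signatures_for(&monitored_address()).is_empty());
}
```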
Summary

- All timer-driven Solana RPC work now goes through a single centralized work queue (`rpc_executor`). Timer functions (`finalize_transactions`, `resubmit_transactions`, `consolidate_deposits`, `process_pending_withdrawals`, `poll_monitored_addresses`) now only enqueue `WorkItem`s and schedule the executor; all Solana RPC calls happen inside `execute_rpc_queue`.
- `execute_rpc_queue` drains up to `MAX_CONCURRENT_RPC_CALLS` items per run, makes one `getLatestBlockhash` call when any item requires it (shared across the batch), executes all items concurrently, and reschedules itself while the queue is non-empty.
- User-facing endpoints (`process_deposit`) are rate-limited independently via a new `UserRpcQuotaGuard` backed by an `active_user_rpc_calls` counter in state, so timer work cannot starve synchronous calls.

Test plan

- `cargo test --lib` passes (170 tests)
- `rpc_executor::tests` cover executor behavior: finalized/errored/expired/re-enqueue/batch
- `monitor`, `consolidate`, `withdraw`, `deposit::automatic` tests updated to call `execute_rpc_queue` after each scheduler function

🤖 Generated with Claude Code
cargo test --libpasses (170 tests)rpc_executor::testscover executor behavior: finalized/errored/expired/re-enqueue/batchmonitor,consolidate,withdraw,deposit::automatictests updated to callexecute_rpc_queueafter each scheduler function🤖 Generated with Claude Code