feat: disable zmq connectivity detection for windows build#53
Conversation
WalkthroughThe pull request introduces several changes primarily focused on enhancing the handling of ZeroMQ listeners in a cross-platform context. A new conditional dependency for the Changes
Possibly related PRs
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
src/app.rs (2)
192-205: Refactor listener initialization to reduce code duplicationThe initialization code for
mainnet_core_zmq_listenerandtestnet_core_zmq_listeneris nearly identical, differing mainly by network type and port number. Consider refactoring this into a helper function to eliminate duplication and enhance maintainability.Apply this diff to refactor the initialization:
+# Add a helper function outside the selected line ranges +fn create_core_zmq_listener( + network: Network, + address: &str, + sender: mpsc::Sender<(ZMQMessage, Network)>, + status_option: Option<tokio::sync::broadcast::Sender<ZMQConnectionStatus>>, +) -> Option<CoreZMQListener> { + #[cfg(not(target_os = "windows"))] + { + Some( + CoreZMQListener::spawn_listener( + network, + address, + sender, + status_option, + ) + .expect(&format!("Failed to create {} InstantSend listener", network)), + ) + } + #[cfg(target_os = "windows")] + { + None + } +} + -#[cfg(target_os = "windows")] -let mainnet_core_zmq_listener: Option<CoreZMQListener> = None; - -#[cfg(not(target_os = "windows"))] -let mainnet_core_zmq_listener: Option<CoreZMQListener> = { - let zmq_listener = CoreZMQListener::spawn_listener( - Network::Dash, - "tcp://127.0.0.1:23708", - core_message_sender.clone(), - Some(mainnet_app_context.sx_zmq_status.clone()), - ).expect("Failed to create mainnet InstantSend listener"); - Some(zmq_listener) -}; +let mainnet_core_zmq_listener = create_core_zmq_listener( + Network::Dash, + "tcp://127.0.0.1:23708", + core_message_sender.clone(), + Some(mainnet_app_context.sx_zmq_status.clone()), +); -#[cfg(target_os = "windows")] -let testnet_core_zmq_listener: Option<CoreZMQListener> = None; - -#[cfg(not(target_os = "windows"))] -let testnet_core_zmq_listener: Option<CoreZMQListener> = { - let zmq_listener = CoreZMQListener::spawn_listener( - Network::Testnet, - "tcp://127.0.0.1:23709", - core_message_sender, - tx_zmq_status_option, - ) - .expect("Failed to create testnet InstantSend listener"); - Some(zmq_listener) -}; +let testnet_core_zmq_listener = create_core_zmq_listener( + Network::Testnet, + "tcp://127.0.0.1:23709", + core_message_sender, + tx_zmq_status_option, +);Also applies to: 212-225
217-223: Ensure consistent cloning ofcore_message_senderwhen initializing listenersIn the initialization of
testnet_core_zmq_listener, thecore_message_senderis passed without cloning, whereas it is cloned formainnet_core_zmq_listener. To maintain consistency and avoid potential message handling issues, consider cloning the sender in both cases.Apply this diff to clone the sender:
let zmq_listener = CoreZMQListener::spawn_listener( Network::Testnet, "tcp://127.0.0.1:23709", - core_message_sender, // Use the original sender or create a new one if needed + core_message_sender.clone(), // Clone the sender for consistency tx_zmq_status_option, ) .expect("Failed to create testnet InstantSend listener");
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
Cargo.toml(1 hunks)src/app.rs(2 hunks)src/components/core_zmq_listener.rs(2 hunks)src/ui/components/top_panel.rs(1 hunks)
🔇 Additional comments (5)
Cargo.toml (2)
57-58: LGTM! Platform-specific dependency configuration is correct.
The conditional ZMQ dependency is properly configured to exclude Windows builds while remaining available for other platforms.
Let's verify the Windows build configuration:
#!/bin/bash
# Description: Verify that zmq is properly excluded from Windows builds
# Expected: Should not find any unconditional zmq imports or usage
# Check for any unconditional zmq imports that might break Windows builds
rg "use\s+zmq" --type rust
# Look for cfg attributes around zmq usage
rg "#\[cfg\(not\(target_os\s*=\s*\"windows\"\)\)\]" -A 355-56: LGTM! Verify crossbeam-channel usage.
The addition of crossbeam-channel as a platform-agnostic messaging solution is appropriate.
Let's verify how crossbeam-channel is being used as an alternative to ZMQ:
✅ Verification successful
Crossbeam-channel usage verified and properly implemented
The crossbeam-channel dependency is correctly used as a platform-agnostic messaging solution, particularly for ZMQ status communication:
- Properly integrated in
src/context.rsfor ZMQ connection events viarx_zmq_status/sx_zmq_status - Correctly implemented in
CoreZMQListenerfor Windows/non-Windows compatibility - Used alongside conditional compilation (
#[cfg(not(target_os = "windows"))]) to maintain platform-specific behavior
The implementation shows a clean separation of concerns where crossbeam channels handle internal communication while ZMQ functionality is properly gated behind platform-specific flags.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check the usage of crossbeam-channel in the codebase
# Expected: Should find usage patterns showing it's properly replacing ZMQ functionality
# Search for crossbeam usage
rg -A 5 "use crossbeam_channel"
# Search for channel creation patterns
ast-grep --pattern 'crossbeam_channel::$_'
Length of output: 1456
Script:
#!/bin/bash
# Let's search for crossbeam usage more thoroughly
rg -A 5 "use crossbeam" --type rust
# Search for Sender/Receiver types and channel creation
rg -A 5 "(Sender|Receiver|crossbeam_channel::unbounded)" --type rust
# Check AppState for channel usage
ast-grep --pattern 'struct AppState {
$$$
}'
Length of output: 9791
src/ui/components/top_panel.rs (1)
130-133: LGTM! Clean implementation of conditional ZMQ indicator.
The conditional compilation directive correctly implements the PR objective by excluding the ZMQ connection indicator on Windows builds, aligning with the broader changes to make ZMQ functionality optional on Windows.
Let's verify that the UI layout remains consistent when the connection indicator is excluded:
✅ Verification successful
The search results show that the UI layout in top_panel.rs is well-structured with consistent spacing. The connection indicator is part of a horizontal layout with proper spacing (8.0 units before the indicator), and its removal on Windows won't affect the overall layout since:
- The spacing is localized within the conditional block
- The horizontal layout continues to work correctly with or without the indicator
- No other UI elements depend on the indicator's presence
UI layout consistency verified for Windows builds
The conditional compilation cleanly removes the connection indicator without affecting the surrounding layout or spacing. The horizontal layout and other UI elements will maintain their positions and spacing regardless of the indicator's presence.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any UI layout adjustments that might be needed when the indicator is hidden
# Look for spacing/padding constants or layout code that might need conditional adjustment
# Search for explicit spacing values that might need adjustment
rg -U "add_space\([0-9.]+\)" src/ui/components/
# Search for other layout-related code that might need conditional compilation
ast-grep --pattern 'ui.horizontal($$$)' src/ui/components/
Length of output: 12365
src/components/core_zmq_listener.rs (1)
14-15: LGTM: Proper conditional import for ZMQ Context
The conditional compilation directive correctly scopes the ZMQ Context import to non-Windows platforms, aligning with the PR's objective.
src/app.rs (1)
52-53:
Ensure proper handling of Option values for listeners
By changing mainnet_core_zmq_listener and testnet_core_zmq_listener to Option<CoreZMQListener>, it's crucial to verify that all usages of these fields handle the None case appropriately to prevent potential None dereferencing errors.
Run the following script to identify usages where the None case might not be handled correctly:
✅ Verification successful
Based on the search results, I can now provide a final response:
Option<CoreZMQListener> fields are properly handled
The code shows that these Optional fields are initialized correctly with proper conditional logic:
- Windows builds explicitly set both listeners to
None - Non-Windows builds initialize the listeners with proper error handling through the Option type
- No direct access to these fields without Option handling was found in the codebase
The implementation appears to be safe and correctly handles the Optional nature of these fields.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find usages of listener fields that may not handle the `None` case properly.
# Search for direct accesses to the listener fields without proper handling.
rg -A 5 -B 2 'self\.(mainnet|testnet)_core_zmq_listener\b' src/ | rg -v 'is_some|is_none|if let|match'
Length of output: 104
Script:
#!/bin/bash
# Let's try a more comprehensive search to find all usages of these fields
rg -A 5 -B 2 '(mainnet|testnet)_core_zmq_listener' src/
Length of output: 2360
Script:
#!/bin/bash
# Also search for any struct initialization or pattern matching
ast-grep --pattern 'App { $$$ }'
Length of output: 34
Script:
#!/bin/bash
# Look for any methods that might be using these fields
ast-grep --pattern 'impl App {
$$$
}'
Length of output: 41
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (3)
Cargo.toml (1)
57-58: Consider pinning the exact version of zmq crateThe zmq crate version is specified as "0.10" without a patch version. This could lead to unexpected behavior if a breaking change is introduced in a patch release.
-zmq = "0.10" +zmq = "0.10.0"src/components/core_zmq_listener.rs (2)
14-17: Remove unused importimage::EncodableLayoutThe import of
image::EncodableLayoutat line 17 appears to be unused in the code. Removing unnecessary imports can improve code readability and reduce compile time.
43-51: Unify message constant types across platformsThe constants
IS_LOCK_SIG_MSGandCHAIN_LOCKED_BLOCK_MSGare defined with different types based on the target OS:
- Non-Windows (
#[cfg(not(target_os = "windows"))]):&[u8; 12]- Windows (
#[cfg(target_os = "windows")]):&strThis inconsistency may lead to code duplication and complicate cross-platform logic. Consider unifying these constants to use a single type across all platforms, such as converting the strings to bytes where necessary.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
Cargo.toml(1 hunks)src/app.rs(2 hunks)src/components/core_zmq_listener.rs(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/app.rs
🔇 Additional comments (2)
Cargo.toml (2)
55-56: Verify the necessity of crossbeam-channel dependency
Please confirm if crossbeam-channel is actually needed, as tokio (which is already a dependency) provides similar channel functionality through tokio::sync::mpsc.
✅ Verification successful
Based on the search results, I can generate the final response as we have clear evidence of both crossbeam-channel and tokio::sync::mpsc usage in the codebase.
Keep crossbeam-channel dependency - it's used for ZMQ communication
The crossbeam-channel dependency is necessary as it's specifically used in the ZMQ-related components:
src/context.rsuses it for ZMQ status communicationsrc/components/core_zmq_listener.rsuses it for ZMQ event handling
While tokio::sync::mpsc is indeed used in the codebase, it serves different purposes in backend tasks and is not a direct replacement for the ZMQ-related channels. The two channel implementations coexist for different use cases.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for crossbeam-channel usage in the codebase
echo "Checking crossbeam-channel usage:"
rg "crossbeam.*channel" -A 3
# Check if tokio channels are already in use
echo -e "\nChecking existing tokio channel usage:"
rg "tokio::sync::mpsc" -A 3
Length of output: 3812
60-61: Verify compatibility between different ZMQ implementations
The Windows-specific zeromq crate (0.4.1) is significantly older than the non-Windows zmq crate (0.10.x). This version disparity might lead to inconsistent behavior across platforms.
| #[cfg(target_os = "windows")] | ||
| pub fn spawn_listener( | ||
| network: Network, | ||
| endpoint: &str, | ||
| sender: mpsc::Sender<(ZMQMessage, Network)>, | ||
| tx_zmq_status: Option<Sender<ZMQConnectionEvent>>, | ||
| ) -> Result<Self, Box<dyn Error>> { | ||
| let should_stop = Arc::new(AtomicBool::new(false)); | ||
| let endpoint = endpoint.to_string(); | ||
| let should_stop_clone = Arc::clone(&should_stop); | ||
| let sender_clone = sender.clone(); | ||
|
|
||
| let handle = thread::spawn(move || { | ||
| // Create the runtime inside the thread. | ||
| let rt = Runtime::new().unwrap(); | ||
| rt.block_on(async move { | ||
| // Create the socket inside the async context. | ||
| let mut socket = SubSocket::new(); | ||
|
|
||
| // Connect to the endpoint | ||
| socket | ||
| .connect(&endpoint) | ||
| .await | ||
| .expect("Failed to connect"); | ||
|
|
||
| // Subscribe to the "rawtxlocksig" events. | ||
| socket | ||
| .subscribe(IS_LOCK_SIG_MSG) | ||
| .await | ||
| .expect("Failed to subscribe to rawtxlocksig"); | ||
|
|
||
| // Subscribe to the "rawchainlock" events. | ||
| socket | ||
| .subscribe(CHAIN_LOCKED_BLOCK_MSG) | ||
| .await | ||
| .expect("Failed to subscribe to rawchainlock"); | ||
|
|
||
| println!("Subscribed to ZMQ at {}", endpoint); | ||
|
|
||
| while !should_stop_clone.load(Ordering::SeqCst) { | ||
| // Receive messages | ||
| match socket.recv().await { | ||
| Ok(msg) => { | ||
| // Access frames using msg.get(n) | ||
| if let Some(topic_frame) = msg.get(0) { | ||
| let topic = String::from_utf8_lossy(topic_frame).to_string(); | ||
|
|
||
| if let Some(data_frame) = msg.get(1) { | ||
| let data_bytes = data_frame; | ||
|
|
||
| match topic.as_str() { | ||
| "rawchainlock" => { | ||
| // Deserialize the Block | ||
| let mut cursor = Cursor::new(data_bytes); | ||
| match Block::consensus_decode(&mut cursor) { | ||
| Ok(block) => { | ||
| if let Err(e) = sender_clone.send(( | ||
| ZMQMessage::ChainLockedBlock(block), | ||
| network, | ||
| )) { | ||
| eprintln!( | ||
| "Error sending data to main thread: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| Err(e) => { | ||
| eprintln!( | ||
| "Error deserializing chain locked block: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| "rawtxlocksig" => { | ||
| // Deserialize the Transaction and InstantLock | ||
| let mut cursor = Cursor::new(data_bytes); | ||
| match Transaction::consensus_decode(&mut cursor) { | ||
| Ok(tx) => { | ||
| match InstantLock::consensus_decode(&mut cursor) | ||
| { | ||
| Ok(islock) => { | ||
| if let Err(e) = sender_clone.send(( | ||
| ZMQMessage::ISLockedTransaction( | ||
| tx, islock, | ||
| ), | ||
| network, | ||
| )) { | ||
| eprintln!( | ||
| "Error sending data to main thread: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| Err(e) => { | ||
| eprintln!( | ||
| "Error deserializing InstantLock: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| Err(e) => { | ||
| eprintln!( | ||
| "Error deserializing transaction: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| _ => { | ||
| println!("Received unknown topic: {}", topic); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Err(e) => { | ||
| eprintln!("Error receiving message: {}", e); | ||
| // Sleep briefly before retrying | ||
| tokio::time::sleep(Duration::from_millis(100)).await; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| println!("Listener is stopping."); | ||
| // The socket will be dropped here | ||
| }); | ||
| }); | ||
|
|
||
| Ok(CoreZMQListener { | ||
| should_stop, | ||
| handle: Some(handle), | ||
| }) | ||
| } | ||
|
|
There was a problem hiding this comment.
🛠️ Refactor suggestion
Reduce code duplication in spawn_listener implementations
The spawn_listener method has separate implementations for Windows and non-Windows platforms, but much of the logic is similar. This duplication can make maintenance more challenging. Consider extracting common functionality into shared helper functions or using conditional compilation only around the specific platform-dependent sections.
Apply this refactor to reduce duplication:
-
Extract common code: Identify code blocks that are identical or very similar in both implementations and move them into shared helper functions.
-
Use conditional compilation within functions: Instead of duplicating entire functions, use
#[cfg(...)]within the function to handle platform-specific differences.
Example:
pub fn spawn_listener(
network: Network,
endpoint: &str,
sender: mpsc::Sender<(ZMQMessage, Network)>,
tx_zmq_status: Option<Sender<ZMQConnectionEvent>>,
) -> Result<Self, Box<dyn Error>> {
// Common setup code here...
#[cfg(not(target_os = "windows"))]
{
// Non-Windows specific code...
}
#[cfg(target_os = "windows")]
{
// Windows-specific code...
}
// Common code to finalize and return...
}This approach minimizes duplication and simplifies future updates.
| #[cfg(target_os = "windows")] | ||
| pub fn spawn_listener( | ||
| network: Network, | ||
| endpoint: &str, | ||
| sender: mpsc::Sender<(ZMQMessage, Network)>, | ||
| tx_zmq_status: Option<Sender<ZMQConnectionEvent>>, | ||
| ) -> Result<Self, Box<dyn Error>> { | ||
| let should_stop = Arc::new(AtomicBool::new(false)); | ||
| let endpoint = endpoint.to_string(); | ||
| let should_stop_clone = Arc::clone(&should_stop); | ||
| let sender_clone = sender.clone(); | ||
|
|
||
| let handle = thread::spawn(move || { | ||
| // Create the runtime inside the thread. | ||
| let rt = Runtime::new().unwrap(); | ||
| rt.block_on(async move { | ||
| // Create the socket inside the async context. | ||
| let mut socket = SubSocket::new(); | ||
|
|
||
| // Connect to the endpoint | ||
| socket | ||
| .connect(&endpoint) | ||
| .await | ||
| .expect("Failed to connect"); | ||
|
|
||
| // Subscribe to the "rawtxlocksig" events. | ||
| socket | ||
| .subscribe(IS_LOCK_SIG_MSG) | ||
| .await | ||
| .expect("Failed to subscribe to rawtxlocksig"); | ||
|
|
||
| // Subscribe to the "rawchainlock" events. | ||
| socket | ||
| .subscribe(CHAIN_LOCKED_BLOCK_MSG) | ||
| .await | ||
| .expect("Failed to subscribe to rawchainlock"); | ||
|
|
||
| println!("Subscribed to ZMQ at {}", endpoint); | ||
|
|
||
| while !should_stop_clone.load(Ordering::SeqCst) { | ||
| // Receive messages | ||
| match socket.recv().await { | ||
| Ok(msg) => { | ||
| // Access frames using msg.get(n) | ||
| if let Some(topic_frame) = msg.get(0) { | ||
| let topic = String::from_utf8_lossy(topic_frame).to_string(); | ||
|
|
||
| if let Some(data_frame) = msg.get(1) { | ||
| let data_bytes = data_frame; | ||
|
|
||
| match topic.as_str() { | ||
| "rawchainlock" => { | ||
| // Deserialize the Block | ||
| let mut cursor = Cursor::new(data_bytes); | ||
| match Block::consensus_decode(&mut cursor) { | ||
| Ok(block) => { | ||
| if let Err(e) = sender_clone.send(( | ||
| ZMQMessage::ChainLockedBlock(block), | ||
| network, | ||
| )) { | ||
| eprintln!( | ||
| "Error sending data to main thread: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| Err(e) => { | ||
| eprintln!( | ||
| "Error deserializing chain locked block: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| "rawtxlocksig" => { | ||
| // Deserialize the Transaction and InstantLock | ||
| let mut cursor = Cursor::new(data_bytes); | ||
| match Transaction::consensus_decode(&mut cursor) { | ||
| Ok(tx) => { | ||
| match InstantLock::consensus_decode(&mut cursor) | ||
| { | ||
| Ok(islock) => { | ||
| if let Err(e) = sender_clone.send(( | ||
| ZMQMessage::ISLockedTransaction( | ||
| tx, islock, | ||
| ), | ||
| network, | ||
| )) { | ||
| eprintln!( | ||
| "Error sending data to main thread: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| Err(e) => { | ||
| eprintln!( | ||
| "Error deserializing InstantLock: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| Err(e) => { | ||
| eprintln!( | ||
| "Error deserializing transaction: {}", | ||
| e | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| _ => { | ||
| println!("Received unknown topic: {}", topic); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Err(e) => { | ||
| eprintln!("Error receiving message: {}", e); | ||
| // Sleep briefly before retrying | ||
| tokio::time::sleep(Duration::from_millis(100)).await; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| println!("Listener is stopping."); | ||
| // The socket will be dropped here | ||
| }); | ||
| }); |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Avoid creating a new Tokio runtime inside a thread
In the Windows implementation, a new tokio::runtime::Runtime is created within a spawned thread. Creating multiple runtimes or embedding a runtime inside a thread can lead to unexpected behavior and resource contention. Consider refactoring to use asynchronous functions without spawning an additional thread, or use tokio::spawn to run tasks on the existing runtime.
Apply this refactor to optimize runtime usage:
-
Option 1: If the application already uses Tokio, integrate the listener directly into the existing async context.
-
Option 2: Use
tokio::spawnto run the async task without creating a new runtime.
Example using tokio::spawn:
#[cfg(target_os = "windows")]
pub fn spawn_listener(
network: Network,
endpoint: &str,
sender: mpsc::Sender<(ZMQMessage, Network)>,
tx_zmq_status: Option<Sender<ZMQConnectionEvent>>,
) -> Result<Self, Box<dyn Error>> {
let should_stop = Arc::new(AtomicBool::new(false));
let endpoint = endpoint.to_string();
let should_stop_clone = Arc::clone(&should_stop);
let sender_clone = sender.clone();
tokio::spawn(async move {
// Async listener code here...
while !should_stop_clone.load(Ordering::SeqCst) {
// Receive and handle messages...
}
});
Ok(CoreZMQListener {
should_stop,
handle: None, // No thread handle needed
})
}This refactor simplifies the code and aligns with best practices for asynchronous Rust applications.
Disables ZMQ connectivity detection functionality for Windows build
Under Windows the native
zeromqis used whilezmqis used for any other target os.For the moment, ZMQ connectivity detection feature is available only with
zmq.Tested with:
cargo build --target x86_64-pc-windows-gnusuccessfully.Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Refactor