Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions polkadot/node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ pub struct Overseer<SupportsParachains> {
ApprovalVotingMessage,
ApprovalDistributionMessage,
ApprovalVotingParallelMessage,
])]
], can_receive_priority_messages)]
approval_voting_parallel: ApprovalVotingParallel,
#[subsystem(GossipSupportMessage, sends: [
NetworkBridgeTxMessage,
Expand All @@ -643,7 +643,7 @@ pub struct Overseer<SupportsParachains> {
AvailabilityRecoveryMessage,
ChainSelectionMessage,
ApprovalVotingParallelMessage,
])]
], can_receive_priority_messages)]
dispute_coordinator: DisputeCoordinator,

#[subsystem(DisputeDistributionMessage, sends: [
Expand Down
278 changes: 276 additions & 2 deletions polkadot/node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use async_trait::async_trait;
use futures::{executor, pending, pin_mut, poll, select, stream, FutureExt};
use std::{collections::HashMap, sync::atomic, task::Poll};

use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange};
use polkadot_node_network_protocol::{
peer_set::ValidationVersion, ObservedRole, PeerId, UnifiedReputationChange,
};
use polkadot_node_primitives::{
BlockData, CollationGenerationConfig, CollationResult, DisputeMessage, InvalidDisputeVote, PoV,
UncheckedDisputeMessage, ValidDisputeVote,
Expand Down Expand Up @@ -853,10 +855,14 @@ fn test_network_bridge_event<M>() -> NetworkBridgeEvent<M> {
NetworkBridgeEvent::PeerDisconnected(PeerId::random())
}

fn test_statement_distribution_msg() -> StatementDistributionMessage {
fn test_statement_distribution_with_priority_msg() -> StatementDistributionMessage {
StatementDistributionMessage::NetworkBridgeUpdate(test_network_bridge_event())
}

/// Builds a `StatementDistributionMessage` that the test subsystem treats as
/// normal (non-priority) traffic — see `IsPrioMessage for StatementDistributionMessage`.
fn test_statement_distribution_msg() -> StatementDistributionMessage {
	let candidate_hash = Default::default();
	StatementDistributionMessage::Backed(candidate_hash)
}

fn test_availability_recovery_msg() -> AvailabilityRecoveryMessage {
let (sender, _) = oneshot::channel();
AvailabilityRecoveryMessage::RecoverAvailableData(
Expand All @@ -872,6 +878,15 @@ fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
BitfieldDistributionMessage::NetworkBridgeUpdate(test_network_bridge_event())
}

/// Builds a `BitfieldDistributionMessage` that the test subsystem counts as a
/// priority message (a `PeerConnected` network-bridge update — see
/// `IsPrioMessage for BitfieldDistributionMessage`).
fn test_bitfield_distribution_with_priority_msg() -> BitfieldDistributionMessage {
	let peer_connected = NetworkBridgeEvent::PeerConnected(
		PeerId::random(),
		ObservedRole::Authority,
		ValidationVersion::V3.into(),
		None,
	);
	BitfieldDistributionMessage::NetworkBridgeUpdate(peer_connected)
}

fn test_provisioner_msg() -> ProvisionerMessage {
let (sender, _) = oneshot::channel();
ProvisionerMessage::RequestInherentData(Default::default(), sender)
Expand Down Expand Up @@ -912,11 +927,25 @@ fn test_approval_voting_msg() -> ApprovalVotingMessage {
ApprovalVotingMessage::ApprovedAncestor(Default::default(), 0, sender)
}

/// Builds an `ApprovalVotingParallelMessage` that the test subsystem counts as
/// a priority message (`ApprovedAncestor`).
fn test_approval_voting_parallel_with_priority_msg() -> ApprovalVotingParallelMessage {
	let (response_tx, _response_rx) = oneshot::channel();
	let block_hash = Default::default();
	ApprovalVotingParallelMessage::ApprovedAncestor(block_hash, 0, response_tx)
}

/// Builds a `DisputeCoordinatorMessage` that the test subsystem treats as
/// normal (non-priority) traffic.
fn test_dispute_coordinator_msg() -> DisputeCoordinatorMessage {
	let (tx, _rx) = oneshot::channel();
	DisputeCoordinatorMessage::RecentDisputes(tx)
}

/// Builds a `DisputeCoordinatorMessage` that the test subsystem counts as a
/// priority message (`DetermineUndisputedChain`).
fn test_dispute_coordinator_msg_with_priority() -> DisputeCoordinatorMessage {
	let (response_tx, _response_rx) = oneshot::channel();
	let base = Default::default();
	let block_descriptions = Default::default();
	DisputeCoordinatorMessage::DetermineUndisputedChain { base, block_descriptions, tx: response_tx }
}

fn test_dispute_distribution_msg() -> DisputeDistributionMessage {
let dummy_dispute_message = UncheckedDisputeMessage {
candidate_receipt: dummy_candidate_receipt_v2(dummy_hash()),
Expand Down Expand Up @@ -1238,3 +1267,248 @@ fn context_holds_onto_message_until_enough_signals_received() {

futures::executor::block_on(test_fut);
}

// A subsystem that simulates a slow subsystem, processing messages at a rate of one per second.
// We will use this to test the prioritization of messages in the subsystems generated by orchestra.
#[derive(Clone)]
struct SlowSubsystem {
	// Number of normal (non-priority) messages this subsystem has consumed so far.
	num_normal_msgs_received: Arc<atomic::AtomicUsize>,
	// Number of priority messages this subsystem has consumed so far.
	num_prio_msgs_received: Arc<atomic::AtomicUsize>,
}

impl SlowSubsystem {
	/// Creates a `SlowSubsystem` that records the messages it consumes into the
	/// given shared counters, so the test can observe progress from outside.
	fn new(
		num_normal_msgs_received: Arc<atomic::AtomicUsize>,
		num_prio_msgs_received: Arc<atomic::AtomicUsize>,
	) -> Self {
		Self { num_normal_msgs_received, num_prio_msgs_received }
	}
}

// Trait to determine whether a message is a priority message or not; it is used by the
// SlowSubsystem to decide whether to count the message as a priority message or not.
trait IsPrioMessage {
	// Tells whether this message should be counted as a priority message.
	fn is_prio(&self) -> bool {
		// By default, messages are not priority messages; message types with
		// priority variants override this.
		false
	}
}

// Implement the IsPrioMessage trait for all message types.
// Every message type defaults to non-priority; the few types with priority
// variants override `is_prio` explicitly.
impl IsPrioMessage for CandidateValidationMessage {}
impl IsPrioMessage for CandidateBackingMessage {}
impl IsPrioMessage for ChainApiMessage {}
impl IsPrioMessage for CollationGenerationMessage {}
impl IsPrioMessage for CollatorProtocolMessage {}
impl IsPrioMessage for StatementDistributionMessage {
	fn is_prio(&self) -> bool {
		// Network bridge updates are the statement-distribution priority messages
		// in these tests.
		match self {
			StatementDistributionMessage::NetworkBridgeUpdate(_) => true,
			_ => false,
		}
	}
}
impl IsPrioMessage for ApprovalDistributionMessage {}
impl IsPrioMessage for ApprovalVotingMessage {}
impl IsPrioMessage for ApprovalVotingParallelMessage {
	fn is_prio(&self) -> bool {
		// `ApprovedAncestor` is the variant these tests send with high priority.
		match self {
			ApprovalVotingParallelMessage::ApprovedAncestor(..) => true,
			_ => false,
		}
	}
}
impl IsPrioMessage for AvailabilityDistributionMessage {}
impl IsPrioMessage for AvailabilityRecoveryMessage {}
impl IsPrioMessage for AvailabilityStoreMessage {}
impl IsPrioMessage for BitfieldDistributionMessage {
	fn is_prio(&self) -> bool {
		// `PeerConnected` network bridge updates are the bitfield-distribution
		// priority messages in these tests.
		match self {
			BitfieldDistributionMessage::NetworkBridgeUpdate(
				NetworkBridgeEvent::PeerConnected(..),
			) => true,
			_ => false,
		}
	}
}
impl IsPrioMessage for ChainSelectionMessage {}
impl IsPrioMessage for DisputeCoordinatorMessage {
	fn is_prio(&self) -> bool {
		// `DetermineUndisputedChain` is the variant these tests send with high priority.
		match self {
			DisputeCoordinatorMessage::DetermineUndisputedChain { .. } => true,
			_ => false,
		}
	}
}
impl IsPrioMessage for DisputeDistributionMessage {}
impl IsPrioMessage for GossipSupportMessage {}
impl IsPrioMessage for NetworkBridgeRxMessage {}
impl IsPrioMessage for NetworkBridgeTxMessage {}
impl IsPrioMessage for ProspectiveParachainsMessage {}
impl IsPrioMessage for ProvisionerMessage {}
impl IsPrioMessage for RuntimeApiMessage {}
impl IsPrioMessage for BitfieldSigningMessage {}
impl IsPrioMessage for PvfCheckerMessage {}

impl<C, M> Subsystem<C, SubsystemError> for SlowSubsystem
where
	C: overseer::SubsystemContext<Message = M, Signal = OverseerSignal>,
	M: Send + IsPrioMessage,
{
	/// Runs a deliberately slow message loop: one poll per second, counting each
	/// received message into the normal or priority counter, until `Conclude`.
	fn start(self, mut ctx: C) -> SpawnedSubsystem {
		SpawnedSubsystem {
			name: "counter-subsystem",
			future: Box::pin(async move {
				loop {
					// Throttle to one poll per second so that both priority and
					// normal messages have time to pile up in our queue.
					Delay::new(Duration::from_secs(1)).await;
					match ctx.try_recv().await {
						Ok(Some(FromOrchestra::Signal(OverseerSignal::Conclude))) => break,
						Ok(Some(FromOrchestra::Signal(_))) => continue,
						Ok(Some(FromOrchestra::Communication { msg })) => {
							// Bump the counter matching the message's priority class.
							let counter = if msg.is_prio() {
								&self.num_prio_msgs_received
							} else {
								&self.num_normal_msgs_received
							};
							counter.fetch_add(1, atomic::Ordering::SeqCst);
							continue
						},
						// No message available (or a receive error): yield below
						// and try again on the next iteration.
						Ok(None) | Err(_) => (),
					}
					pending!();
				}

				Ok(())
			}),
		}
	}
}

#[test]
fn overseer_all_subsystems_can_receive_their_priority_messages() {
	const NUM_NORMAL_MESSAGES: usize = 10;
	const NUM_PRIORITY_MESSAGES: usize = 4;

	// Builds `count` copies of the message produced by `make_msg`.
	fn batch(count: usize, make_msg: impl Fn() -> AllMessages) -> Vec<AllMessages> {
		(0..count).map(|_| make_msg()).collect()
	}

	// Dispute coordinator: `DetermineUndisputedChain` is the priority variant.
	overseer_check_subsystem_can_receive_their_priority_messages(
		batch(NUM_NORMAL_MESSAGES, || {
			AllMessages::DisputeCoordinator(test_dispute_coordinator_msg())
		}),
		batch(NUM_PRIORITY_MESSAGES, || {
			AllMessages::DisputeCoordinator(test_dispute_coordinator_msg_with_priority())
		}),
	);

	// Approval voting parallel: `ApprovedAncestor` is the priority variant.
	overseer_check_subsystem_can_receive_their_priority_messages(
		batch(NUM_NORMAL_MESSAGES, || {
			AllMessages::ApprovalVotingParallel(test_approval_distribution_msg().into())
		}),
		batch(NUM_PRIORITY_MESSAGES, || {
			AllMessages::ApprovalVotingParallel(test_approval_voting_parallel_with_priority_msg())
		}),
	);

	// Statement distribution: network bridge updates are the priority variant.
	overseer_check_subsystem_can_receive_their_priority_messages(
		batch(NUM_NORMAL_MESSAGES, || {
			AllMessages::StatementDistribution(test_statement_distribution_msg())
		}),
		batch(NUM_PRIORITY_MESSAGES, || {
			AllMessages::StatementDistribution(test_statement_distribution_with_priority_msg())
		}),
	);

	// Bitfield distribution: `PeerConnected` updates are the priority variant.
	overseer_check_subsystem_can_receive_their_priority_messages(
		batch(NUM_NORMAL_MESSAGES, || {
			AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())
		}),
		batch(NUM_PRIORITY_MESSAGES, || {
			AllMessages::BitfieldDistribution(test_bitfield_distribution_with_priority_msg())
		}),
	);
}

// Test that when a subsystem processes messages slowly, the priority messages are processed
// before the normal messages. This is important to ensure that the subsystem can handle priority messages.
fn overseer_check_subsystem_can_receive_their_priority_messages(
	normal_msgs: Vec<AllMessages>,
	prio_msgs: Vec<AllMessages>,
) {
	let num_normal_messages = normal_msgs.len();
	let num_prio_messages: usize = prio_msgs.len();
	let spawner = sp_core::testing::TaskExecutor::new();
	executor::block_on(async move {
		// Shared counters updated by the `SlowSubsystem` as it consumes messages.
		let msgs_received = Arc::new(atomic::AtomicUsize::new(0));
		let prio_msgs_received = Arc::new(atomic::AtomicUsize::new(0));

		let subsystem = SlowSubsystem::new(msgs_received.clone(), prio_msgs_received.clone());

		// Build an overseer where every subsystem slot is backed by the same
		// slow counting subsystem.
		let (overseer, handle) =
			one_for_all_overseer_builder(spawner, MockSupportsParachains, subsystem, None)
				.unwrap()
				.build()
				.unwrap();

		let mut handle = Handle::new(handle);
		let overseer_fut = overseer.run_inner().fuse();

		pin_mut!(overseer_fut);

		// send a signal to each subsystem
		let unpin_handle = dummy_unpin_handle(dummy_hash());
		handle
			.block_imported(BlockInfo {
				hash: Default::default(),
				parent_hash: Default::default(),
				number: Default::default(),
				unpin_handle: unpin_handle.clone(),
			})
			.await;

		// Send normal messages first; they are processed 1 per second by the SlowSubsystem,
		// so they should accumulate in the queue.
		for msg in normal_msgs {
			handle.send_msg_anon(msg).await;
		}

		// Send priority messages; these are expected to overtake the queued normal ones.
		for msg in prio_msgs {
			handle.send_msg_with_priority(msg, "test", PriorityLevel::High).await;
		}

		loop {
			match (&mut overseer_fut).timeout(Duration::from_millis(100)).await {
				// Timeout fired: the overseer is still running, inspect the counters.
				None => {
					let normal_msgs: usize = msgs_received.load(atomic::Ordering::SeqCst);
					let prio_msgs: usize = prio_msgs_received.load(atomic::Ordering::SeqCst);

					// If every normal message was consumed while priority messages are
					// still pending, prioritization did not take effect.
					assert!(
						prio_msgs == num_prio_messages || normal_msgs < num_normal_messages,
						"we should not receive all normal messages before the prio message"
					);

					// Sanity check: we must never count more messages than were sent.
					assert!(
						normal_msgs <= num_normal_messages && prio_msgs <= num_prio_messages,
						"too many messages received"
					);

					// Keep polling until both message classes have been fully consumed.
					if normal_msgs < num_normal_messages || prio_msgs < num_prio_messages {
						Delay::new(Duration::from_millis(100)).await;
					} else {
						break;
					}
				},
				// The overseer future must not complete before we stop it below.
				Some(_) => panic!("exited too early"),
			}
		}

		// Send a stop signal to each subsystem.
		handle.stop().await;

		let res = overseer_fut.await;
		assert!(res.is_ok());
	});
}
2 changes: 2 additions & 0 deletions polkadot/node/service/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ async fn test_skeleton(
) => {
tx.send(undisputed_chain.unwrap_or((target_block_number, target_block_hash))).unwrap();
});
// Check that ApprovedAncestor and DetermineUndisputedChain are sent with high priority.
assert_eq!(virtual_overseer.message_counter.with_high_priority(), 2);
}

/// Straight forward test case, where the test is not
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_8948.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: make sure dispute_coordinator/approval-voting parallel can receive priority messages
doc:
- audience: Node Dev
description: |-
https://github.com/paritytech/polkadot-sdk/pull/8834 changed relay_chain_selection to send priority messages, but did not configure
the subsystems to indicate that they can receive priority messages, via the `can_receive_priority_messages` flag.

If `can_receive_priority_messages` is not specified, orchestra falls back to the normal queue when sending a priority message,
so this resulted in the messages not being processed ahead of the others in the queue.

Fix this configuration mistake and add a test to make sure priority messages are consumed ahead of normal ones by the subsystems.
crates:
- name: polkadot-overseer
bump: patch
- name: polkadot-service
bump: patch