@@ -18,7 +18,9 @@ use async_trait::async_trait;
1818use futures:: { executor, pending, pin_mut, poll, select, stream, FutureExt } ;
1919use std:: { collections:: HashMap , sync:: atomic, task:: Poll } ;
2020
21- use polkadot_node_network_protocol:: { PeerId , UnifiedReputationChange } ;
21+ use polkadot_node_network_protocol:: {
22+ peer_set:: ValidationVersion , ObservedRole , PeerId , UnifiedReputationChange ,
23+ } ;
2224use polkadot_node_primitives:: {
2325 BlockData , CollationGenerationConfig , CollationResult , DisputeMessage , InvalidDisputeVote , PoV ,
2426 UncheckedDisputeMessage , ValidDisputeVote ,
@@ -853,10 +855,14 @@ fn test_network_bridge_event<M>() -> NetworkBridgeEvent<M> {
853855 NetworkBridgeEvent :: PeerDisconnected ( PeerId :: random ( ) )
854856}
855857
856- fn test_statement_distribution_msg ( ) -> StatementDistributionMessage {
858+ fn test_statement_distribution_with_priority_msg ( ) -> StatementDistributionMessage {
857859 StatementDistributionMessage :: NetworkBridgeUpdate ( test_network_bridge_event ( ) )
858860}
859861
862+ fn test_statement_distribution_msg ( ) -> StatementDistributionMessage {
863+ StatementDistributionMessage :: Backed ( Default :: default ( ) )
864+ }
865+
860866fn test_availability_recovery_msg ( ) -> AvailabilityRecoveryMessage {
861867 let ( sender, _) = oneshot:: channel ( ) ;
862868 AvailabilityRecoveryMessage :: RecoverAvailableData (
@@ -872,6 +878,15 @@ fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
872878 BitfieldDistributionMessage :: NetworkBridgeUpdate ( test_network_bridge_event ( ) )
873879}
874880
881+ fn test_bitfield_distribution_with_priority_msg ( ) -> BitfieldDistributionMessage {
882+ BitfieldDistributionMessage :: NetworkBridgeUpdate ( NetworkBridgeEvent :: PeerConnected (
883+ PeerId :: random ( ) ,
884+ ObservedRole :: Authority ,
885+ ValidationVersion :: V3 . into ( ) ,
886+ None ,
887+ ) )
888+ }
889+
875890fn test_provisioner_msg ( ) -> ProvisionerMessage {
876891 let ( sender, _) = oneshot:: channel ( ) ;
877892 ProvisionerMessage :: RequestInherentData ( Default :: default ( ) , sender)
@@ -912,11 +927,25 @@ fn test_approval_voting_msg() -> ApprovalVotingMessage {
912927 ApprovalVotingMessage :: ApprovedAncestor ( Default :: default ( ) , 0 , sender)
913928}
914929
930+ fn test_approval_voting_parallel_with_priority_msg ( ) -> ApprovalVotingParallelMessage {
931+ let ( sender, _) = oneshot:: channel ( ) ;
932+ ApprovalVotingParallelMessage :: ApprovedAncestor ( Default :: default ( ) , 0 , sender)
933+ }
934+
915935fn test_dispute_coordinator_msg ( ) -> DisputeCoordinatorMessage {
916936 let ( sender, _) = oneshot:: channel ( ) ;
917937 DisputeCoordinatorMessage :: RecentDisputes ( sender)
918938}
919939
940+ fn test_dispute_coordinator_msg_with_priority ( ) -> DisputeCoordinatorMessage {
941+ let ( sender, _) = oneshot:: channel ( ) ;
942+ DisputeCoordinatorMessage :: DetermineUndisputedChain {
943+ base : Default :: default ( ) ,
944+ block_descriptions : Default :: default ( ) ,
945+ tx : sender,
946+ }
947+ }
948+
920949fn test_dispute_distribution_msg ( ) -> DisputeDistributionMessage {
921950 let dummy_dispute_message = UncheckedDisputeMessage {
922951 candidate_receipt : dummy_candidate_receipt_v2 ( dummy_hash ( ) ) ,
@@ -1238,3 +1267,248 @@ fn context_holds_onto_message_until_enough_signals_received() {
12381267
12391268 futures:: executor:: block_on ( test_fut) ;
12401269}
1270+
1271+ // A subsystem that simulates a slow subsystem, processing messages at a rate of one per second.
1272+ // We will use this to test the prioritization of messages in the subsystems generated by orchestra.
1273+ #[ derive( Clone ) ]
1274+ struct SlowSubsystem {
1275+ num_normal_msgs_received : Arc < atomic:: AtomicUsize > ,
1276+ num_prio_msgs_received : Arc < atomic:: AtomicUsize > ,
1277+ }
1278+
1279+ impl SlowSubsystem {
1280+ fn new (
1281+ msgs_received : Arc < atomic:: AtomicUsize > ,
1282+ prio_msgs_received : Arc < atomic:: AtomicUsize > ,
1283+ ) -> Self {
1284+ Self { num_normal_msgs_received : msgs_received, num_prio_msgs_received : prio_msgs_received }
1285+ }
1286+ }
1287+
1288+ // Trait to determine if a message is a priority message or not, it is by the SlowSubsystem
1289+ // to determine if it should count the message as a priority message or not.
1290+ trait IsPrioMessage {
1291+ // Tells if the message is a priority message.
1292+ fn is_prio ( & self ) -> bool {
1293+ // By default, messages are not priority messages.
1294+ false
1295+ }
1296+ }
1297+
1298+ // Implement the IsPrioMessage trait for all message types.
1299+ impl IsPrioMessage for CandidateValidationMessage { }
1300+ impl IsPrioMessage for CandidateBackingMessage { }
1301+ impl IsPrioMessage for ChainApiMessage { }
1302+ impl IsPrioMessage for CollationGenerationMessage { }
1303+ impl IsPrioMessage for CollatorProtocolMessage { }
1304+ impl IsPrioMessage for StatementDistributionMessage {
1305+ fn is_prio ( & self ) -> bool {
1306+ matches ! ( self , StatementDistributionMessage :: NetworkBridgeUpdate ( _) )
1307+ }
1308+ }
1309+ impl IsPrioMessage for ApprovalDistributionMessage { }
1310+ impl IsPrioMessage for ApprovalVotingMessage { }
1311+ impl IsPrioMessage for ApprovalVotingParallelMessage {
1312+ fn is_prio ( & self ) -> bool {
1313+ matches ! ( self , ApprovalVotingParallelMessage :: ApprovedAncestor ( _, _, _) )
1314+ }
1315+ }
1316+ impl IsPrioMessage for AvailabilityDistributionMessage { }
1317+ impl IsPrioMessage for AvailabilityRecoveryMessage { }
1318+ impl IsPrioMessage for AvailabilityStoreMessage { }
1319+ impl IsPrioMessage for BitfieldDistributionMessage {
1320+ fn is_prio ( & self ) -> bool {
1321+ matches ! (
1322+ self ,
1323+ BitfieldDistributionMessage :: NetworkBridgeUpdate ( NetworkBridgeEvent :: PeerConnected (
1324+ _,
1325+ _,
1326+ _,
1327+ _
1328+ ) , )
1329+ )
1330+ }
1331+ }
1332+ impl IsPrioMessage for ChainSelectionMessage { }
1333+ impl IsPrioMessage for DisputeCoordinatorMessage {
1334+ fn is_prio ( & self ) -> bool {
1335+ matches ! ( self , DisputeCoordinatorMessage :: DetermineUndisputedChain { .. } )
1336+ }
1337+ }
1338+ impl IsPrioMessage for DisputeDistributionMessage { }
1339+ impl IsPrioMessage for GossipSupportMessage { }
1340+ impl IsPrioMessage for NetworkBridgeRxMessage { }
1341+ impl IsPrioMessage for NetworkBridgeTxMessage { }
1342+ impl IsPrioMessage for ProspectiveParachainsMessage { }
1343+ impl IsPrioMessage for ProvisionerMessage { }
1344+ impl IsPrioMessage for RuntimeApiMessage { }
1345+ impl IsPrioMessage for BitfieldSigningMessage { }
1346+ impl IsPrioMessage for PvfCheckerMessage { }
1347+
1348+ impl < C , M > Subsystem < C , SubsystemError > for SlowSubsystem
1349+ where
1350+ C : overseer:: SubsystemContext < Message = M , Signal = OverseerSignal > ,
1351+ M : Send + IsPrioMessage ,
1352+ {
1353+ fn start ( self , mut ctx : C ) -> SpawnedSubsystem {
1354+ SpawnedSubsystem {
1355+ name : "counter-subsystem" ,
1356+ future : Box :: pin ( async move {
1357+ loop {
1358+ // Simulate a slow processing subsystem to give time for both priority and
1359+ // normal messages to accumulate.
1360+ Delay :: new ( Duration :: from_secs ( 1 ) ) . await ;
1361+ match ctx. try_recv ( ) . await {
1362+ Ok ( Some ( FromOrchestra :: Signal ( OverseerSignal :: Conclude ) ) ) => break ,
1363+ Ok ( Some ( FromOrchestra :: Signal ( _) ) ) => continue ,
1364+ Ok ( Some ( FromOrchestra :: Communication { msg } ) ) => {
1365+ if msg. is_prio ( ) {
1366+ self . num_prio_msgs_received . fetch_add ( 1 , atomic:: Ordering :: SeqCst ) ;
1367+ } else {
1368+ self . num_normal_msgs_received
1369+ . fetch_add ( 1 , atomic:: Ordering :: SeqCst ) ;
1370+ }
1371+ continue
1372+ } ,
1373+ Err ( _) => ( ) ,
1374+ _ => ( ) ,
1375+ }
1376+ pending ! ( ) ;
1377+ }
1378+
1379+ Ok ( ( ) )
1380+ } ) ,
1381+ }
1382+ }
1383+ }
1384+
1385+ #[ test]
1386+ fn overseer_all_subsystems_can_receive_their_priority_messages ( ) {
1387+ const NUM_NORMAL_MESSAGES : usize = 10 ;
1388+ const NUM_PRIORITY_MESSAGES : usize = 4 ;
1389+ overseer_check_subsystem_can_receive_their_priority_messages (
1390+ ( 0 ..NUM_NORMAL_MESSAGES )
1391+ . map ( |_| AllMessages :: DisputeCoordinator ( test_dispute_coordinator_msg ( ) ) )
1392+ . collect ( ) ,
1393+ ( 0 ..NUM_PRIORITY_MESSAGES )
1394+ . map ( |_| AllMessages :: DisputeCoordinator ( test_dispute_coordinator_msg_with_priority ( ) ) )
1395+ . collect ( ) ,
1396+ ) ;
1397+
1398+ overseer_check_subsystem_can_receive_their_priority_messages (
1399+ ( 0 ..NUM_NORMAL_MESSAGES )
1400+ . map ( |_| AllMessages :: ApprovalVotingParallel ( test_approval_distribution_msg ( ) . into ( ) ) )
1401+ . collect ( ) ,
1402+ ( 0 ..NUM_PRIORITY_MESSAGES )
1403+ . map ( |_| {
1404+ AllMessages :: ApprovalVotingParallel (
1405+ test_approval_voting_parallel_with_priority_msg ( ) ,
1406+ )
1407+ } )
1408+ . collect ( ) ,
1409+ ) ;
1410+
1411+ overseer_check_subsystem_can_receive_their_priority_messages (
1412+ ( 0 ..NUM_NORMAL_MESSAGES )
1413+ . map ( |_| AllMessages :: StatementDistribution ( test_statement_distribution_msg ( ) ) )
1414+ . collect ( ) ,
1415+ ( 0 ..NUM_PRIORITY_MESSAGES )
1416+ . map ( |_| {
1417+ AllMessages :: StatementDistribution ( test_statement_distribution_with_priority_msg ( ) )
1418+ } )
1419+ . collect ( ) ,
1420+ ) ;
1421+
1422+ overseer_check_subsystem_can_receive_their_priority_messages (
1423+ ( 0 ..NUM_NORMAL_MESSAGES )
1424+ . map ( |_| AllMessages :: BitfieldDistribution ( test_bitfield_distribution_msg ( ) ) )
1425+ . collect ( ) ,
1426+ ( 0 ..NUM_PRIORITY_MESSAGES )
1427+ . map ( |_| {
1428+ AllMessages :: BitfieldDistribution ( test_bitfield_distribution_with_priority_msg ( ) )
1429+ } )
1430+ . collect ( ) ,
1431+ ) ;
1432+ }
1433+
1434+ // Test that when subsystem processes messages slow, the priority messages are processed before
1435+ // the normal messages. This is important to ensure that the subsytem can handle priority messages.
1436+ fn overseer_check_subsystem_can_receive_their_priority_messages (
1437+ normal_msgs : Vec < AllMessages > ,
1438+ prio_msgs : Vec < AllMessages > ,
1439+ ) {
1440+ let num_normal_messages = normal_msgs. len ( ) ;
1441+ let num_prio_messages: usize = prio_msgs. len ( ) ;
1442+ let spawner = sp_core:: testing:: TaskExecutor :: new ( ) ;
1443+ executor:: block_on ( async move {
1444+ let msgs_received = Arc :: new ( atomic:: AtomicUsize :: new ( 0 ) ) ;
1445+ let prio_msgs_received = Arc :: new ( atomic:: AtomicUsize :: new ( 0 ) ) ;
1446+
1447+ let subsystem = SlowSubsystem :: new ( msgs_received. clone ( ) , prio_msgs_received. clone ( ) ) ;
1448+
1449+ let ( overseer, handle) =
1450+ one_for_all_overseer_builder ( spawner, MockSupportsParachains , subsystem, None )
1451+ . unwrap ( )
1452+ . build ( )
1453+ . unwrap ( ) ;
1454+
1455+ let mut handle = Handle :: new ( handle) ;
1456+ let overseer_fut = overseer. run_inner ( ) . fuse ( ) ;
1457+
1458+ pin_mut ! ( overseer_fut) ;
1459+
1460+ // send a signal to each subsystem
1461+ let unpin_handle = dummy_unpin_handle ( dummy_hash ( ) ) ;
1462+ handle
1463+ . block_imported ( BlockInfo {
1464+ hash : Default :: default ( ) ,
1465+ parent_hash : Default :: default ( ) ,
1466+ number : Default :: default ( ) ,
1467+ unpin_handle : unpin_handle. clone ( ) ,
1468+ } )
1469+ . await ;
1470+
1471+ // Send normal messages first, they are processed 1 per second by the SlowSubsystem, so they
1472+ // should accumulated in the queue.
1473+ for msg in normal_msgs {
1474+ handle. send_msg_anon ( msg) . await ;
1475+ }
1476+
1477+ // Send priority messages.
1478+ for msg in prio_msgs {
1479+ handle. send_msg_with_priority ( msg, "test" , PriorityLevel :: High ) . await ;
1480+ }
1481+
1482+ loop {
1483+ match ( & mut overseer_fut) . timeout ( Duration :: from_millis ( 100 ) ) . await {
1484+ None => {
1485+ let normal_msgs: usize = msgs_received. load ( atomic:: Ordering :: SeqCst ) ;
1486+ let prio_msgs: usize = prio_msgs_received. load ( atomic:: Ordering :: SeqCst ) ;
1487+
1488+ assert ! (
1489+ prio_msgs == num_prio_messages || normal_msgs < num_normal_messages,
1490+ "we should not receive all normal messages before the prio message"
1491+ ) ;
1492+
1493+ assert ! (
1494+ normal_msgs <= num_normal_messages && prio_msgs <= num_prio_messages,
1495+ "too many messages received"
1496+ ) ;
1497+
1498+ if normal_msgs < num_normal_messages || prio_msgs < num_prio_messages {
1499+ Delay :: new ( Duration :: from_millis ( 100 ) ) . await ;
1500+ } else {
1501+ break ;
1502+ }
1503+ } ,
1504+ Some ( _) => panic ! ( "exited too early" ) ,
1505+ }
1506+ }
1507+
1508+ // send a stop signal to each subsystems
1509+ handle. stop ( ) . await ;
1510+
1511+ let res = overseer_fut. await ;
1512+ assert ! ( res. is_ok( ) ) ;
1513+ } ) ;
1514+ }
0 commit comments