1111
1212def add_cluster (root , suffix , num_hosts ):
1313 cluster = root .add_netzone_star (f"cluster{ suffix } " )
14- cluster .set_gateway (cluster .add_router (f"router { suffix } -router" ))
14+ cluster .set_gateway (cluster .add_router (f"cluster { suffix } -router" ))
1515 backbone = cluster .add_link (f"backbone{ suffix } " , "100Gbps" ).set_latency ("100us" )
1616 for h in range (num_hosts ):
1717 host = cluster .add_host (f"host-{ h } { suffix } " , "1Gf" )
@@ -29,6 +29,7 @@ def setup_platform():
2929 internet = root .add_link ("internet" , "500Mbps" ).set_latency ("1ms" )
3030 prod_cluster = add_cluster (root , ".prod" , 16 )
3131 cons_cluster = add_cluster (root , ".cons" , 4 )
32+ root .add_route (prod_cluster , cons_cluster , [internet ])
3233 root .seal ()
3334
3435 DTL .create ()
@@ -40,7 +41,7 @@ def run_test_single_pub_single_sub_same_cluster():
4041 def pub_test_actor ():
4142 dtl = DTL .connect ()
4243 stream = dtl .add_stream ("my-output" ).set_engine_type (DTLEngine .Type .Staging ).set_transport_method (Transport .Method .MQ )
43- this_actor .info ("Create a 2D-array variable with 20kx20k double" );
44+ this_actor .info ("Create a 2D-array variable with 20kx20k double" )
4445 var = stream .define_variable ("var" , (20000 , 20000 ), (0 , 0 ), (20000 , 20000 ), ctypes .sizeof (ctypes .c_double ))
4546 this_actor .info ("Open the stream" )
4647 engine = stream .open ("my-output" , Stream .Mode .Publish )
@@ -92,9 +93,119 @@ def sub_test_actor():
9293
9394 e .run ()
9495
96+ def run_test_multiple_pub_single_sub_message_queue ():
97+ e = setup_platform ()
98+
99+ def pub_test_actor (id ):
100+ dtl = DTL .connect ()
101+ stream = dtl .add_stream ("my-output" ).set_engine_type (DTLEngine .Type .Staging ).set_transport_method (Transport .Method .MQ )
102+ this_actor .info ("Create a 2D-array variable with 10kx10k double, publishers own 3/4 and 1/4 (along 2nd dimension)" )
103+ var = stream .define_variable ("var" , (10000 , 10000 ), (0 , 2500 * 3 * id ), (10000 , 7500 - (5000 * id )), ctypes .sizeof (ctypes .c_double ))
104+ engine = stream .open ("my-output" , Stream .Mode .Publish )
105+ this_actor .info ("Wait for all publishers to have opened the stream" )
106+ this_actor .sleep_for (.5 )
107+
108+ this_actor .info ("Start a transaction" )
109+ engine .begin_transaction ()
110+ engine .put (var , var .local_size )
111+ this_actor .info ("End the transaction" )
112+ engine .end_transaction ()
113+
114+ this_actor .sleep_for (.5 )
115+ this_actor .info ("Close the engine" )
116+ engine .close ()
117+ this_actor .info ("Disconnect from the DTL" )
118+ DTL .disconnect ()
119+
120+ def sub_test_actor (id ):
121+ dtl = DTL .connect ()
122+ stream = dtl .add_stream ("my-output" )
123+ engine = stream .open ("my-output" , Stream .Mode .Subscribe )
124+ var_sub = stream .inquire_variable ("var" )
125+
126+
127+ this_actor .info ("Set a selection for 'var_sub': get 3/4 and 1/4 of the first dimension respectively" )
128+ var_sub .set_selection ((2500 * 3 * id , 0 ), (7500 - (5000 * id ), 10000 ))
129+
130+ this_actor .info ("Start a transaction" )
131+ engine .begin_transaction ()
132+ this_actor .info ("Get a subset of the Variable 'var' from the DTL" )
133+ engine .get (var_sub )
134+ this_actor .info ("End the transaction" )
135+ engine .end_transaction ()
136+
137+ this_actor .info ("Check local size of var_sub" )
138+ assert var_sub .local_size == 8. * 10000 * 10000 * (3 - 2 * id ) / 4
139+ this_actor .info ("Close the engine" )
140+ engine .close ()
141+ this_actor .info ("Disconnect from the DTL" )
142+ DTL .disconnect ()
143+
144+ for i in range (2 ):
145+ Host .by_name (f"host-{ i } .prod" ).add_actor (f"PubTestActor{ i } " , pub_test_actor , i )
146+ Host .by_name (f"host-{ i } .cons" ).add_actor (f"SubTestActor{ i } " , sub_test_actor , i )
147+
148+ e .run ()
149+
150+ def run_test_multiple_pub_single_sub_mailbox ():
151+ e = setup_platform ()
152+
153+ def pub_test_actor (id ):
154+ dtl = DTL .connect ()
155+ stream = dtl .add_stream ("my-output" ).set_engine_type (DTLEngine .Type .Staging ).set_transport_method (Transport .Method .Mailbox )
156+ this_actor .info ("Create a 2D-array variable with 10kx10k double, publishers own 3/4 and 1/4 (along 2nd dimension)" )
157+ var = stream .define_variable ("var" , (10000 , 10000 ), (0 , 2500 * 3 * id ), (10000 , 7500 - (5000 * id )), ctypes .sizeof (ctypes .c_double ))
158+ engine = stream .open ("my-output" , Stream .Mode .Publish )
159+ this_actor .info ("Wait for all publishers to have opened the stream" )
160+ this_actor .sleep_for (.5 )
161+
162+ this_actor .info ("Start a transaction" )
163+ engine .begin_transaction ()
164+ engine .put (var , var .local_size )
165+ this_actor .info ("End the transaction" )
166+ engine .end_transaction ()
167+
168+ this_actor .sleep_for (.5 )
169+ this_actor .info ("Close the engine" )
170+ engine .close ()
171+ this_actor .info ("Disconnect from the DTL" )
172+ DTL .disconnect ()
173+
174+ def sub_test_actor (id ):
175+ dtl = DTL .connect ()
176+ stream = dtl .add_stream ("my-output" )
177+ engine = stream .open ("my-output" , Stream .Mode .Subscribe )
178+ var_sub = stream .inquire_variable ("var" )
179+
180+
181+ this_actor .info ("Set a selection for 'var_sub': get 3/4 and 1/4 of the first dimension respectively" )
182+ var_sub .set_selection ((2500 * 3 * id , 0 ), (7500 - (5000 * id ), 10000 ))
183+
184+ this_actor .info ("Start a transaction" )
185+ engine .begin_transaction ()
186+ this_actor .info ("Get a subset of the Variable 'var' from the DTL" )
187+ engine .get (var_sub )
188+ this_actor .info ("End the transaction" )
189+ engine .end_transaction ()
190+
191+ this_actor .info ("Check local size of var_sub" )
192+ assert var_sub .local_size == 8. * 10000 * 10000 * (3 - 2 * id ) / 4
193+ this_actor .info ("Close the engine" )
194+ engine .close ()
195+ this_actor .info ("Disconnect from the DTL" )
196+ DTL .disconnect ()
197+
198+ for i in range (2 ):
199+ Host .by_name (f"host-{ i } .prod" ).add_actor (f"PubTestActor{ i } " , pub_test_actor , i )
200+ Host .by_name (f"host-{ i } .cons" ).add_actor (f"SubTestActor{ i } " , sub_test_actor , i )
201+
202+ e .run ()
203+
95204if __name__ == '__main__' :
96205 tests = [
97- run_test_single_pub_single_sub_same_cluster
206+ run_test_single_pub_single_sub_same_cluster ,
207+ run_test_multiple_pub_single_sub_message_queue ,
208+ run_test_multiple_pub_single_sub_mailbox
98209 ]
99210
100211 for test in tests :
0 commit comments