3030import org .apache .qpid .protonj2 .client .Receiver ;
3131import org .apache .qpid .protonj2 .client .ReceiverOptions ;
3232import org .apache .qpid .protonj2 .client .Sender ;
33+ import org .apache .qpid .protonj2 .client .Session ;
34+ import org .apache .qpid .protonj2 .client .exceptions .ClientException ;
3335import picocli .CommandLine ;
3436
37+ import java .io .IOException ;
3538import java .net .URI ;
3639import java .nio .charset .StandardCharsets ;
3740import java .nio .file .Files ;
@@ -109,14 +112,26 @@ public class CliProtonJ2Receiver extends CliProtonJ2SenderReceiver implements Ca
109112 private Integer duration ;
110113
111114 @ CommandLine .Option (names = {"--duration-mode" }) // todo
112- private DurationMode durationMode ;
115+ private DurationModeReceiver durationMode ;
113116
114117 @ CommandLine .Option (names = {"--ssn-ack-mode" })
115118 private SsnAckMode ssnAckMode ;
116119
120+ @ CommandLine .Option (names = {"--tx-size" })
121+ private Integer txSize ;
122+
123+ @ CommandLine .Option (names = {"--tx-endloop-action" })
124+ private TxAction txEndloopAction ;
125+
126+ @ CommandLine .Option (names = {"--tx-action" })
127+ private TxAction txAction ;
128+
117129 @ CommandLine .Option (names = {"--msg-content-to-file" })
118130 private String msgContentToFile ;
119131
132+ @ CommandLine .Option (names = {"--conn-reconnect" })
133+ private String reconnectString = "false" ;
134+
120135 public CliProtonJ2Receiver () {
121136 this .messageFormatter = new ProtonJ2MessageFormatter ();
122137 }
@@ -125,8 +140,11 @@ public CliProtonJ2Receiver(ProtonJ2MessageFormatter messageFormatter) {
125140 this .messageFormatter = messageFormatter ;
126141 }
127142
143+ /**
144+ * This is the main function of the client, as called by the cli options handling library.
145+ */
128146 @ Override
129- public Integer call () throws Exception { // your business logic goes here...
147+ public Integer call () throws Exception {
130148 String prefix = "" ;
131149 if (!broker .startsWith ("amqp://" ) && !broker .startsWith ("amqps://" )) {
132150 prefix = "amqp://" ;
@@ -163,6 +181,9 @@ public Integer call() throws Exception { // your business logic goes here...
163181 }
164182
165183 final ConnectionOptions options = new ConnectionOptions ();
184+ if (stringToBool (reconnectString )) {
185+ options .reconnectEnabled (true );
186+ }
166187 options .user (connUsername );
167188 options .password (connPassword );
168189 for (AuthMechanism mech : connAuthMechanisms ) {
@@ -189,26 +210,33 @@ public Integer call() throws Exception { // your business logic goes here...
189210 }
190211 }
191212
192- try (Connection connection = client .connect (serverHost , serverPort , options )) {
213+ boolean transacted = txSize != null || txAction != null || txEndloopAction != null ;
214+
215+ try (Connection connection = client .connect (serverHost , serverPort , options );
216+ Session session = connection .openSession ()) {
193217 Receiver receiver ;
194218 if (stringToBool (durableSubscriberString )) {
195- receiver = connection .openDurableReceiver (address , durableSubscriberName , receiverOptions );
219+ receiver = session .openDurableReceiver (address , durableSubscriberName , receiverOptions );
196220 } else {
197- receiver = connection .openReceiver (address , receiverOptions );
221+ receiver = session .openReceiver (address , receiverOptions );
198222 }
199223
200224 if (stringToBool (subscriberUnsubscribeString )) {
201225 receiver .close ();
202226 return 0 ;
203227 }
204228
229+ if (transacted ) {
230+ session .beginTransaction ();
231+ }
232+
233+ int i = 0 ;
205234 double initialTimestamp = Utils .getTime ();
206- for ( int i = 0 ; i < count ; i ++ ) {
235+ while ( true ) {
207236
208- // if (durationMode == DurationMode.sleepBeforeReceive) {
209- // LOG.trace("Sleeping before receive");
210- // Utils.sleepUntilNextIteration(initialTimestamp, msgCount, duration, i + 1);
211- // }
237+ if (durationMode == DurationModeReceiver .beforeReceive ) {
238+ Utils .sleepUntilNextIteration (initialTimestamp , count , duration , i + 1 );
239+ }
212240
213241 final Delivery delivery ;
214242 if (timeout == 0 ) {
@@ -221,9 +249,8 @@ public Integer call() throws Exception { // your business logic goes here...
221249 break ;
222250 }
223251
224- if (durationMode == DurationMode .afterReceive ) {
225- // LOG.trace("Sleeping after receive");
226- Utils .sleepUntilNextIteration (initialTimestamp , count , duration , i + 1 ); // todo possibly it is i, different loop here
252+ if (durationMode == DurationModeReceiver .afterReceive ) {
253+ Utils .sleepUntilNextIteration (initialTimestamp , count , duration , i + 1 );
227254 }
228255
229256 if (processReplyTo && delivery .message ().replyTo () != null ) {
@@ -235,42 +262,51 @@ public Integer call() throws Exception { // your business logic goes here...
235262 }
236263 }
237264
238- int messageFormat = delivery .messageFormat ();
239- Message <Object > message = delivery .message ();
240-
241265 // todo, is this what we mean?
242266 if (ssnAckMode != null && ssnAckMode == SsnAckMode .client ) {
243267 delivery .accept ();
244268 }
245269
246- Map <String , Object > messageDict = messageFormatter .formatMessage (address , message , stringToBool (msgContentHashedString ));
247- if (msgContentToFile != null ) {
248- // todo?
249- Path file = Paths .get (msgContentToFile + "_" + i );
250- Files .write (file , message .body ().toString ().getBytes (StandardCharsets .UTF_8 ));
251- }
252- switch (out ) {
253- case python :
254- switch (logMsgs ) {
255- case dict :
256- messageFormatter .printMessageAsPython (messageDict );
257- break ;
258- case interop :
259- messageFormatter .printMessageAsPython (messageDict );
260- break ;
270+ outputReceivedMessage (i , delivery );
271+ i ++;
272+
273+ if (txSize != null && txSize != 0 ) {
274+ if (i % txSize == 0 ) {
275+ if (txAction != null ) {
276+ switch (txAction ) {
277+ case commit :
278+ session .commitTransaction ();
279+ break ;
280+ case rollback :
281+ session .rollbackTransaction ();
282+ break ;
283+ }
284+
285+ session .beginTransaction ();
286+
287+ if (durationMode == DurationModeReceiver .afterReceiveTxAction ) {
288+ Utils .sleepUntilNextIteration (initialTimestamp , i , duration , i + 1 );
289+ }
261290 }
291+ }
292+ }
293+
294+ if (i == count ) { // not i > count; --count=0 needs to disable the break
295+ break ;
296+ }
297+ }
298+
299+ if (txEndloopAction != null ) {
300+ switch (txEndloopAction ) {
301+ case commit :
302+ session .commitTransaction ();
262303 break ;
263- case json :
264- switch (logMsgs ) {
265- case dict :
266- messageFormatter .printMessageAsJson (messageDict );
267- break ;
268- case interop :
269- messageFormatter .printMessageAsJson (messageDict );
270- break ;
271- }
304+ case rollback :
305+ session .rollbackTransaction ();
272306 break ;
273307 }
308+ } else if (transacted ) {
309+ session .rollbackTransaction ();
274310 }
275311
276312 if (stringToBool (durableSubscriberString )) {
@@ -283,4 +319,36 @@ public Integer call() throws Exception { // your business logic goes here...
283319 return 0 ;
284320 }
285321
322+ private void outputReceivedMessage (int i , Delivery delivery ) throws ClientException , IOException {
323+ Message <Object > message = delivery .message ();
324+ int messageFormat = delivery .messageFormat ();
325+ Map <String , Object > messageDict = messageFormatter .formatMessage (address , message , stringToBool (msgContentHashedString ));
326+ if (msgContentToFile != null ) {
327+ // todo?
328+ Path file = Paths .get (msgContentToFile + "_" + i );
329+ Files .write (file , message .body ().toString ().getBytes (StandardCharsets .UTF_8 ));
330+ }
331+ switch (out ) {
332+ case python :
333+ switch (logMsgs ) {
334+ case dict :
335+ messageFormatter .printMessageAsPython (messageDict );
336+ break ;
337+ case interop :
338+ messageFormatter .printMessageAsPython (messageDict );
339+ break ;
340+ }
341+ break ;
342+ case json :
343+ switch (logMsgs ) {
344+ case dict :
345+ messageFormatter .printMessageAsJson (messageDict );
346+ break ;
347+ case interop :
348+ messageFormatter .printMessageAsJson (messageDict );
349+ break ;
350+ }
351+ break ;
352+ }
353+ }
286354}
0 commit comments