Skip to content

Commit ff40e04

Browse files
author
Petr Matousek
committed
passing tx-size fix, default batch size set to 10
1 parent 4dbb900 commit ff40e04

File tree

4 files changed

+12
-9
lines changed

4 files changed

+12
-9
lines changed

src/api/qpid-proton/reactor/TxReceivingClient.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ int TxReceivingClient::run(int argc, char **argv) const
315315
std::transform(recv_listen.begin(), recv_listen.end(), recv_listen.begin(), ::tolower);
316316
}
317317

318-
int recv_credit_window = -1;
318+
int recv_credit_window = 0;
319319
if(options.is_set("recv-credit-window")) {
320320
recv_credit_window = atoi(options["recv-credit-window"].c_str());
321321
}
@@ -386,16 +386,16 @@ int TxReceivingClient::run(int argc, char **argv) const
386386
recv_listen,
387387
recv_listen_port,
388388
recv_credit_window,
389-
recv_drain_after_credit_window
389+
recv_drain_after_credit_window,
390+
tx_action,
391+
tx_endloop_action
390392
);
391393

392394
if (selector != "") {
393395
handler.setSelector(selector);
394396
}
395397

396-
// TODO python defaults to 10
397-
// int tx_size = 10;
398-
int tx_size = 0;
398+
int tx_size = 10;
399399
if (options.is_set("tx-size")) {
400400
tx_size = static_cast<int> (options.get("tx-size"));
401401
}

src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,9 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
438438
s.transaction_commit();
439439
} else if (tx_action == "rollback") {
440440
s.transaction_abort();
441+
if (count == 0) {
442+
recv.connection().close();
443+
}
441444
}
442445

443446
if (tx_action == "none") {
@@ -453,17 +456,17 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
453456
if (duration_time > 0 && duration_mode == "after-receive-action-tx-action") {
454457
// TODO: not implemented yet
455458
}
456-
457459
} else if (count != 0 && processed + current_batch == count) {
458460
logger(debug) << "[on_message] Transaction attempt (endloop): " << tx_endloop_action;
459461
if (tx_endloop_action == "commit") {
460462
s.transaction_commit();
461463
} else if (tx_endloop_action == "rollback") {
462464
s.transaction_abort();
463465
} else {
464-
recv.connection().close();
466+
recv.connection().close();
465467
}
466468
}
469+
467470
}
468471

469472
void TxReceiverHandler::on_transport_close(transport &t) {

src/api/qpid-proton/reactor/handler/TxReceiverHandler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class TxReceiverHandler : public ReceiverHandler {
154154
private:
155155
typedef ReceiverHandler super;
156156

157-
int batch_size = 0;
157+
int batch_size = 10;
158158
int current_batch = 0;
159159
int processed = 0;
160160
string tx_action = "commit";

src/api/qpid-proton/reactor/handler/TxSenderHandler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class TxSenderHandler : public SenderHandler {
128128
private:
129129
typedef SenderHandler super;
130130

131-
int batch_size = 0;
131+
int batch_size = 10;
132132
int current_batch = 0;
133133
int processed = 0;
134134
string tx_action = "commit";

0 commit comments

Comments
 (0)