From bb6f19fbbeed361f437b7033694e3449d933d3dd Mon Sep 17 00:00:00 2001 From: Tom Daly Date: Wed, 4 Mar 2026 05:15:42 +0000 Subject: [PATCH] fixes for govstack closedloop and mojaloop testing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SendCallbackRoute.java — formatting/logic cleanup SendCallbackWorker.java — added List import application.yaml — max-retry: 10, delay: 30 --- .../bulk/camel/routes/SendCallbackRoute.java | 15 +++-- .../bulk/zeebe/worker/SendCallbackWorker.java | 65 ++++++++++++------- src/main/resources/application.yaml | 4 +- 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java index 099caf2b..14b42101 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java @@ -50,12 +50,15 @@ public void configure() throws Exception { ObjectMapper objectMapper = new ObjectMapper(); String jsonString = objectMapper.writeValueAsString(batchCallbackDTO); exchange.getIn().setBody(jsonString); - }).choice().when(exchangeProperty("X-CallbackURL").isNotNull()).setHeader(Exchange.HTTP_METHOD, constant("POST")) + }) + // Send the callback if a URL is provided + .choice().when(exchangeProperty("X-CallbackURL").isNotNull()).setHeader(Exchange.HTTP_METHOD, constant("POST")) .toD("${exchangeProperty.X-CallbackURL}?bridgeEndpoint=true&throwExceptionOnFailure=false") - .log(LoggingLevel.INFO, "Callback Response body: ${body}").endChoice().otherwise() - .log("Unable to send callback: callback url is null").choice().when(header(Exchange.HTTP_RESPONSE_CODE).regex("^2\\d{2}$")) - .when(exchangeProperty("X-CallbackURL").isNotNull()).log(LoggingLevel.INFO, "Callback sending was successful") - .process(exchange -> { + .log(LoggingLevel.INFO, "Callback Response body: ${body}").otherwise().log("Unable to send callback: callback url is null") + .end() + // Check the response code (runs at route level regardless of URL presence) + .choice().when(header(Exchange.HTTP_RESPONSE_CODE).regex("^2\\d{2}$")) + .log(LoggingLevel.INFO, "Callback sending was successful").process(exchange -> { List phases = exchange.getProperty(PHASES, List.class); exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); exchange.setProperty(CALLBACK_RETRY, 1); @@ -76,13 +79,11 @@ public void configure() throws Exception { retry++; logger.info("Retry Left {}, Setting Callback as Failed and Retrying...", (maxRetry - retry)); exchange.setProperty(CALLBACK_RETRY, retry); - } exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); exchange.setProperty(CALLBACK_SUCCESS, false); exchange.setProperty(ERROR_DESCRIPTION, exchange.getIn().getBody(String.class)); exchange.setProperty(ERROR_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); - }); } diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java index 723e64b2..8f41c3be 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java @@ -16,6 +16,7 @@ import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASE_COUNT; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.RETRY; +import java.util.List; import java.util.Map; import org.apache.camel.Exchange; import org.apache.camel.support.DefaultExchange; @@ -31,29 +32,33 @@ public void setup() { logger.debug("Job '{}' started from process '{}' with key {}", job.getType(), job.getBpmnProcessId(), job.getKey()); Map variables = job.getVariablesAsMap(); - int retry = variables.getOrDefault(CALLBACK_RETRY, 0).equals(variables.get(MAX_STATUS_RETRY)) ? 0 - : (int) variables.getOrDefault(CALLBACK_RETRY, 0); Exchange exchange = new DefaultExchange(camelContext); - if (variables.get(CALLBACK_RETRY) != null && variables.get(CALLBACK_RETRY).equals(variables.get(MAX_CALLBACK_RETRY))) { - exchange.setProperty(CALLBACK_SUCCESS, false); - exchange.setProperty(CALLBACK_RESPONSE_CODE, variables.get(CALLBACK_RESPONSE_CODE)); + exchange.setProperty(MAX_CALLBACK_RETRY, variables.get(MAX_CALLBACK_RETRY)); + exchange.setProperty(CALLBACK_RETRY, variables.getOrDefault(CALLBACK_RETRY, 0)); + exchange.setProperty(CALLBACK, variables.get(CALLBACK)); + exchange.setProperty(COMPLETION_RATE, variables.get(COMPLETION_RATE)); + exchange.setProperty(PHASES, variables.get(PHASES)); + exchange.setProperty(PHASE_COUNT, variables.get(PHASE_COUNT)); + exchange.setProperty(BATCH_ID, variables.get(BATCH_ID)); + exchange.setProperty(CLIENT_CORRELATION_ID, variables.get(CLIENT_CORRELATION_ID)); + Integer maxRetry = Integer.parseInt(variables.get(MAX_STATUS_RETRY).toString()); + Integer completionRate = Integer.parseInt(variables.get(COMPLETION_RATE).toString()); + Integer completionThreshold = Integer.parseInt(variables.get(COMPLETION_THRESHOLD).toString()); + Integer statusRetry = Integer.parseInt(variables.get(RETRY).toString()); + @SuppressWarnings("unchecked") + List phaseList = (List) variables.get(PHASES); + boolean phaseReached = phaseList != null && phaseList.stream().anyMatch(p -> Integer.parseInt(p.toString()) <= completionRate); + if (statusRetry >= maxRetry || completionRate >= completionThreshold || phaseReached) { + sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); } else { - exchange = new DefaultExchange(camelContext); - exchange.setProperty(MAX_CALLBACK_RETRY, variables.get(MAX_CALLBACK_RETRY)); - exchange.setProperty(CALLBACK_RETRY, variables.getOrDefault(CALLBACK_RETRY, 0)); - exchange.setProperty(CALLBACK, variables.get(CALLBACK)); - exchange.setProperty(COMPLETION_RATE, variables.get(COMPLETION_RATE)); - exchange.setProperty(PHASES, variables.get(PHASES)); - exchange.setProperty(PHASE_COUNT, variables.get(PHASE_COUNT)); - exchange.setProperty(BATCH_ID, variables.get(BATCH_ID)); - exchange.setProperty(CLIENT_CORRELATION_ID, variables.get(CLIENT_CORRELATION_ID)); - Integer maxRetry = Integer.parseInt(variables.get(MAX_STATUS_RETRY).toString()); - Integer completionRate = Integer.parseInt(variables.get(COMPLETION_RATE).toString()); - Integer completionThreshold = Integer.parseInt(variables.get(COMPLETION_THRESHOLD).toString()); - Integer statusRetry = Integer.parseInt(variables.get(RETRY).toString()); - if (statusRetry >= maxRetry || completionRate >= completionThreshold) { - sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); + // No callback needed yet - eliminate any phases already passed and signal success + if (phaseList != null) { + final int rate = completionRate; + phaseList.removeIf(p -> Integer.parseInt(p.toString()) <= rate); + exchange.setProperty(PHASES, phaseList); } + exchange.setProperty(CALLBACK_RESPONSE_CODE, 200); + exchange.setProperty(CALLBACK_SUCCESS, true); } Boolean callbackSuccess = exchange.getProperty(CALLBACK_SUCCESS, Boolean.class); if (callbackSuccess == null || !callbackSuccess) { @@ -64,10 +69,22 @@ public void setup() { variables.put(CALLBACK_SUCCESS, true); } - variables.put(CALLBACK_RETRY, exchange.getProperty(CALLBACK_RETRY)); - variables.put(CALLBACK_RESPONSE_CODE, exchange.getProperty(CALLBACK_RESPONSE_CODE)); - variables.put(PHASE_COUNT, exchange.getProperty(PHASE_COUNT)); - variables.put(PHASES, exchange.getProperty(PHASES)); + Object callbackRetry = exchange.getProperty(CALLBACK_RETRY); + if (callbackRetry != null) { + variables.put(CALLBACK_RETRY, callbackRetry); + } + Object responseCode = exchange.getProperty(CALLBACK_RESPONSE_CODE); + if (responseCode != null) { + variables.put(CALLBACK_RESPONSE_CODE, responseCode); + } + Object phaseCount = exchange.getProperty(PHASE_COUNT); + if (phaseCount != null) { + variables.put(PHASE_COUNT, phaseCount); + } + Object phases = exchange.getProperty(PHASES); + if (phases != null) { + variables.put(PHASES, phases); + } logger.debug("Retry: {} and Response Code {}", exchange.getProperty(CALLBACK_RETRY), exchange.getProperty(CALLBACK_RESPONSE_CODE)); diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 9a33c08f..a7ac38cb 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -101,8 +101,8 @@ config: completion-threshold-check: enable: false completion-threshold: 95 # in percentage - max-retry: 3 #can be as high as 30 - delay: 2 # in seconds + max-retry: 10 #can be as high as 30 + delay: 30 # in seconds deduplication: enabled: true