Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ public void configure() throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(batchCallbackDTO);
exchange.getIn().setBody(jsonString);
}).choice().when(exchangeProperty("X-CallbackURL").isNotNull()).setHeader(Exchange.HTTP_METHOD, constant("POST"))
})
// Send the callback if a URL is provided
.choice().when(exchangeProperty("X-CallbackURL").isNotNull()).setHeader(Exchange.HTTP_METHOD, constant("POST"))
.toD("${exchangeProperty.X-CallbackURL}?bridgeEndpoint=true&throwExceptionOnFailure=false")
.log(LoggingLevel.INFO, "Callback Response body: ${body}").endChoice().otherwise()
.log("Unable to send callback: callback url is null").choice().when(header(Exchange.HTTP_RESPONSE_CODE).regex("^2\\d{2}$"))
.when(exchangeProperty("X-CallbackURL").isNotNull()).log(LoggingLevel.INFO, "Callback sending was successful")
.process(exchange -> {
.log(LoggingLevel.INFO, "Callback Response body: ${body}").otherwise().log("Unable to send callback: callback url is null")
.end()
// Check the response code (runs at route level regardless of URL presence)
.choice().when(header(Exchange.HTTP_RESPONSE_CODE).regex("^2\\d{2}$"))
.log(LoggingLevel.INFO, "Callback sending was successful").process(exchange -> {
List phases = exchange.getProperty(PHASES, List.class);
exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE));
exchange.setProperty(CALLBACK_RETRY, 1);
Expand All @@ -76,13 +79,11 @@ public void configure() throws Exception {
retry++;
logger.info("Retry Left {}, Setting Callback as Failed and Retrying...", (maxRetry - retry));
exchange.setProperty(CALLBACK_RETRY, retry);

}
exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE));
exchange.setProperty(CALLBACK_SUCCESS, false);
exchange.setProperty(ERROR_DESCRIPTION, exchange.getIn().getBody(String.class));
exchange.setProperty(ERROR_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE));

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASE_COUNT;
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.RETRY;

import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultExchange;
Expand All @@ -31,29 +32,33 @@ public void setup() {
logger.debug("Job '{}' started from process '{}' with key {}", job.getType(), job.getBpmnProcessId(), job.getKey());
Map<String, Object> variables = job.getVariablesAsMap();

int retry = variables.getOrDefault(CALLBACK_RETRY, 0).equals(variables.get(MAX_STATUS_RETRY)) ? 0
: (int) variables.getOrDefault(CALLBACK_RETRY, 0);
Exchange exchange = new DefaultExchange(camelContext);
if (variables.get(CALLBACK_RETRY) != null && variables.get(CALLBACK_RETRY).equals(variables.get(MAX_CALLBACK_RETRY))) {
exchange.setProperty(CALLBACK_SUCCESS, false);
exchange.setProperty(CALLBACK_RESPONSE_CODE, variables.get(CALLBACK_RESPONSE_CODE));
exchange.setProperty(MAX_CALLBACK_RETRY, variables.get(MAX_CALLBACK_RETRY));
exchange.setProperty(CALLBACK_RETRY, variables.getOrDefault(CALLBACK_RETRY, 0));
exchange.setProperty(CALLBACK, variables.get(CALLBACK));
exchange.setProperty(COMPLETION_RATE, variables.get(COMPLETION_RATE));
exchange.setProperty(PHASES, variables.get(PHASES));
exchange.setProperty(PHASE_COUNT, variables.get(PHASE_COUNT));
exchange.setProperty(BATCH_ID, variables.get(BATCH_ID));
exchange.setProperty(CLIENT_CORRELATION_ID, variables.get(CLIENT_CORRELATION_ID));
Integer maxRetry = Integer.parseInt(variables.get(MAX_STATUS_RETRY).toString());
Integer completionRate = Integer.parseInt(variables.get(COMPLETION_RATE).toString());
Integer completionThreshold = Integer.parseInt(variables.get(COMPLETION_THRESHOLD).toString());
Integer statusRetry = Integer.parseInt(variables.get(RETRY).toString());
@SuppressWarnings("unchecked")
List<Object> phaseList = (List<Object>) variables.get(PHASES);
boolean phaseReached = phaseList != null && phaseList.stream().anyMatch(p -> Integer.parseInt(p.toString()) <= completionRate);
if (statusRetry >= maxRetry || completionRate >= completionThreshold || phaseReached) {
sendToCamelRoute(RouteId.SEND_CALLBACK, exchange);
} else {
exchange = new DefaultExchange(camelContext);
exchange.setProperty(MAX_CALLBACK_RETRY, variables.get(MAX_CALLBACK_RETRY));
exchange.setProperty(CALLBACK_RETRY, variables.getOrDefault(CALLBACK_RETRY, 0));
exchange.setProperty(CALLBACK, variables.get(CALLBACK));
exchange.setProperty(COMPLETION_RATE, variables.get(COMPLETION_RATE));
exchange.setProperty(PHASES, variables.get(PHASES));
exchange.setProperty(PHASE_COUNT, variables.get(PHASE_COUNT));
exchange.setProperty(BATCH_ID, variables.get(BATCH_ID));
exchange.setProperty(CLIENT_CORRELATION_ID, variables.get(CLIENT_CORRELATION_ID));
Integer maxRetry = Integer.parseInt(variables.get(MAX_STATUS_RETRY).toString());
Integer completionRate = Integer.parseInt(variables.get(COMPLETION_RATE).toString());
Integer completionThreshold = Integer.parseInt(variables.get(COMPLETION_THRESHOLD).toString());
Integer statusRetry = Integer.parseInt(variables.get(RETRY).toString());
if (statusRetry >= maxRetry || completionRate >= completionThreshold) {
sendToCamelRoute(RouteId.SEND_CALLBACK, exchange);
// No callback needed yet - eliminate any phases already passed and signal success
if (phaseList != null) {
final int rate = completionRate;
phaseList.removeIf(p -> Integer.parseInt(p.toString()) <= rate);
exchange.setProperty(PHASES, phaseList);
}
exchange.setProperty(CALLBACK_RESPONSE_CODE, 200);
exchange.setProperty(CALLBACK_SUCCESS, true);
}
Boolean callbackSuccess = exchange.getProperty(CALLBACK_SUCCESS, Boolean.class);
if (callbackSuccess == null || !callbackSuccess) {
Expand All @@ -64,10 +69,22 @@ public void setup() {
variables.put(CALLBACK_SUCCESS, true);
}

variables.put(CALLBACK_RETRY, exchange.getProperty(CALLBACK_RETRY));
variables.put(CALLBACK_RESPONSE_CODE, exchange.getProperty(CALLBACK_RESPONSE_CODE));
variables.put(PHASE_COUNT, exchange.getProperty(PHASE_COUNT));
variables.put(PHASES, exchange.getProperty(PHASES));
Object callbackRetry = exchange.getProperty(CALLBACK_RETRY);
if (callbackRetry != null) {
variables.put(CALLBACK_RETRY, callbackRetry);
}
Object responseCode = exchange.getProperty(CALLBACK_RESPONSE_CODE);
if (responseCode != null) {
variables.put(CALLBACK_RESPONSE_CODE, responseCode);
}
Object phaseCount = exchange.getProperty(PHASE_COUNT);
if (phaseCount != null) {
variables.put(PHASE_COUNT, phaseCount);
}
Object phases = exchange.getProperty(PHASES);
if (phases != null) {
variables.put(PHASES, phases);
}

logger.debug("Retry: {} and Response Code {}", exchange.getProperty(CALLBACK_RETRY),
exchange.getProperty(CALLBACK_RESPONSE_CODE));
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ config:
completion-threshold-check:
enable: false
completion-threshold: 95 # in percentage
max-retry: 3 #can be as high as 30
delay: 2 # in seconds
max-retry: 10 #can be as high as 30
delay: 30 # in seconds
deduplication:
enabled: true

Expand Down