Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/main/java/org/mifos/processor/bulk/ConfigurationValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class ConfigurationValidator {
@Value("${config.ordering.field}")
private String orderingField;

@Value("${config.success-threshold-check.success-threshold}")
private int successRate;
@Value("${config.completion-threshold-check.completion-threshold}")
private int completionRate;

@Value("${config.success-threshold-check.max-retry}")
@Value("${config.completion-threshold-check.max-retry}")
private int maxThresholdCheckRetry;

@Value("${config.formatting.standard}")
Expand All @@ -38,8 +38,8 @@ private void validate() {
if (workerConfig.isOrderingWorkerEnabled) {
validateOrderingConfig();
}
if (workerConfig.isSuccessThresholdCheckEnabled) {
validateSuccessThresholdConfig();
if (workerConfig.isCompletionThresholdCheckEnabled) {
validateCompletionThresholdConfig();
validateMaxRetryFromThresholdCheck();
}
if (workerConfig.isFormattingWorkerEnabled) {
Expand Down Expand Up @@ -87,13 +87,13 @@ private void validateOrderingConfig() {
}

// validates the success threshold related configuration
private void validateSuccessThresholdConfig() {
if (successRate <= 0 || successRate > 100) {
throw new ConfigurationValidationException("Invalid success threshold value configured (value=" + successRate + ").");
private void validateCompletionThresholdConfig() {
if (completionRate <= 0 || completionRate > 100) {
throw new ConfigurationValidationException("Invalid completion threshold value configured (value=" + completionRate + ").");
}

if (successRate < 50) {
logger.warn("It is advised to set the success threshold greater than 50. Currently configured as {}", successRate);
if (completionRate < 50) {
logger.warn("It is advised to set the completion threshold greater than 50. Currently configured as {}", completionRate);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
@Component
public class BatchStatusRoute extends BaseRouteBuilder {

@Value("${config.success-threshold-check.success-threshold}")
private int successThreshold;
@Value("${config.completion-threshold-check.completion-threshold}")
private int completionThreshold;

@Override
public void configure() throws Exception {
Expand Down Expand Up @@ -65,14 +65,14 @@ public void configure() throws Exception {
.process(exchange -> {
BatchDTO batchSummary = exchange.getIn().getBody(BatchDTO.class);

long percentage = (batchSummary.getSuccessful()/batchSummary.getTotal())*100;
long percentage =(long)(((double)batchSummary.getSuccessful()/batchSummary.getTotal())*100);

if (percentage >= successThreshold) {
if (percentage >= completionThreshold) {
logger.info("Batch success threshold reached. Expected rate: {}, Actual Rate: {}",
successThreshold, percentage);
completionThreshold, percentage);
}

exchange.setProperty(SUCCESS_RATE, percentage);
exchange.setProperty(COMPLETION_RATE, percentage);

})
.otherwise()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.camel.LoggingLevel;
import org.json.JSONObject;
import org.mifos.processor.bulk.file.FileTransferService;
import org.mifos.processor.bulk.utility.PhaseUtils;
import org.mifos.processor.bulk.utility.Utils;
import org.mifos.processor.bulk.zeebe.ZeebeProcessStarter;
import org.mifos.processor.bulk.zeebe.worker.WorkerConfig;
Expand All @@ -16,6 +17,7 @@
import java.io.File;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

Expand Down Expand Up @@ -43,17 +45,23 @@ public class ProcessorStartRoute extends BaseRouteBuilder {
@Value("${bpmn.flows.bulk-processor}")
private String workflowId;

@Value("${config.success-threshold-check.success-threshold}")
private int successThreshold;
@Value("${config.completion-threshold-check.completion-threshold}")
private int completionThreshold;

@Value("${config.success-threshold-check.max-retry}")
@Value("${config.completion-threshold-check.max-retry}")
private int maxThresholdCheckRetry;

@Value("${config.success-threshold-check.delay}")
@Value("${config.completion-threshold-check.delay}")
private int thresholdCheckDelay;

@Value("${callback.max-retry}")
private int maxCallbackRetry;

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
PhaseUtils phaseUtils;

@Override
public void configure() {
setup();
Expand Down Expand Up @@ -149,22 +157,28 @@ private void setup() {
logger.info("File uploaded {}", nm);

//extracting and setting callback Url
String callbackUrl = exchange.getIn().getHeader("Callback-Url", String.class);
String callbackUrl = exchange.getIn().getHeader("X-Callback-URL", String.class);
exchange.setProperty(CALLBACK_URL,callbackUrl);

List<Integer> phases = phaseUtils.getValues();
logger.info(phases.toString());
Map<String, Object> variables = new HashMap<>();
variables.put(BATCH_ID, batchId);
variables.put(FILE_NAME, fileName);
variables.put(REQUEST_ID, requestId);
variables.put(PURPOSE, purpose);
variables.put(TENANT_ID, exchange.getProperty(TENANT_NAME));
variables.put(CALLBACK_URL,callbackUrl);
variables.put(PHASES,phases);
variables.put(PHASE_COUNT,phases.size());
setConfigProperties(variables);

JSONObject response = new JSONObject();

try {
String txnId = zeebeProcessStarter.startZeebeWorkflow(workflowId, "", variables);
String tenantSpecificWorkflowId = workflowId.replace("{dfspid}",
exchange.getProperty(TENANT_NAME).toString());
String txnId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificWorkflowId, "", variables);
if (txnId == null || txnId.isEmpty()) {
response.put("errorCode", 500);
response.put("errorDescription", "Unable to start zeebe workflow");
Expand Down Expand Up @@ -205,13 +219,14 @@ private Map<String, Object> setConfigProperties(Map<String, Object> variables) {
variables.put(ORDERING_ENABLED, workerConfig.isOrderingWorkerEnabled);
variables.put(SPLITTING_ENABLED, workerConfig.isSplittingWorkerEnabled);
variables.put(FORMATTING_ENABLED, workerConfig.isFormattingWorkerEnabled);
variables.put(SUCCESS_THRESHOLD_CHECK_ENABLED, workerConfig.isSuccessThresholdCheckEnabled);
variables.put(COMPLETION_THRESHOLD_CHECK_ENABLED, workerConfig.isCompletionThresholdCheckEnabled);
variables.put(MERGE_ENABLED, workerConfig.isMergeBackWorkerEnabled);
variables.put(MAX_STATUS_RETRY, maxThresholdCheckRetry);
variables.put(SUCCESS_THRESHOLD, successThreshold);
variables.put(COMPLETION_THRESHOLD, completionThreshold);
variables.put(THRESHOLD_DELAY, Utils.getZeebeTimerValue(thresholdCheckDelay));
variables.put(BULK_NOTIF_SUCCESS,false);
variables.put(BULK_NOTIF_FAILURE,false);
variables.put(MAX_CALLBACK_RETRY,maxCallbackRetry);

return variables;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.spi.AsEndpointUri;
import org.mifos.processor.bulk.schema.BatchDTO;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.List;

import static org.mifos.processor.bulk.camel.config.CamelProperties.*;
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.*;

Expand Down Expand Up @@ -35,8 +34,9 @@ public void configure() throws Exception {
.log("Starting route " + RouteId.SEND_CALLBACK.name())
.log("Sending callback for Batch Processing")
.setBody(exchange -> {
String body = "Batch Aggregation API was complete with %: " +
exchange.getProperty(SUCCESS_RATE);
String body = String.format
("The Batch Aggregation API was complete with : %s"
, exchange.getProperty(COMPLETION_RATE).toString());
callbackUrl = exchange.getProperty(CALLBACK_URL).toString();
return body;
})
Expand All @@ -45,22 +45,31 @@ public void configure() throws Exception {
.when(header("CamelHttpResponseCode").startsWith("2"))
.log(LoggingLevel.INFO, "Callback sending was successful")
.process(exchange -> {
List<Integer> phases = (List<Integer>) exchange.getProperty(PHASES);

exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn()
.getHeader(Exchange.HTTP_RESPONSE_CODE));
exchange.setProperty(CALLBACK_RETRY, 1);
exchange.setProperty(CALLBACK_SUCCESS, true);
eliminatePhases(exchange,phases, (Integer)exchange.getProperty(PHASE_COUNT),
(Integer)exchange.getProperty(COMPLETION_RATE));

})
.otherwise()
.log(LoggingLevel.ERROR, "Callback request was unsuccessful")
.process(exchange -> {
if (exchange.getProperty(CALLBACK_RETRY).equals(MAX_STATUS_RETRY)) {
if (exchange.getProperty(CALLBACK_RETRY).equals(exchange.getProperty(MAX_CALLBACK_RETRY))) {
List<Integer> phases = (List<Integer>) exchange.getProperty(PHASES);
logger.info("Retry Exhausted, setting Callback as Failed");
eliminatePhases(exchange,phases, (Integer)exchange.getProperty(PHASE_COUNT),
(Integer)exchange.getProperty(COMPLETION_RATE));
exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn()
.getHeader(Exchange.HTTP_RESPONSE_CODE));
} else {
int retry = (int) exchange.getProperty(CALLBACK_RETRY);
retry++;
logger.info("Retry Left {}, Setting Callback as Failed and Retrying...",
((int) exchange.getProperty(MAX_STATUS_RETRY) - retry));
((int) exchange.getProperty(MAX_CALLBACK_RETRY) - retry));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use Integer or avoid typecasting

exchange.setProperty(CALLBACK_RETRY, retry);
exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn()
.getHeader(Exchange.HTTP_RESPONSE_CODE));
Expand All @@ -76,5 +85,15 @@ public void configure() throws Exception {
.log("Reached Simulation");
}

public void eliminatePhases(Exchange exchange, List<Integer> phases, int phaseCount, int completionRate){
int i = 0;
while(phases.size() > 0 && phases.size() > i){
if(phases.get(i) <= completionRate)
phases.remove(i);
i++;
}
exchange.setProperty(PHASES,phases);
exchange.setProperty(PHASE_COUNT,phaseCount);
}

}
25 changes: 25 additions & 0 deletions src/main/java/org/mifos/processor/bulk/utility/PhaseUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.mifos.processor.bulk.utility;


import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@ConfigurationProperties(prefix = "callback_phases")
public class PhaseUtils {

private List<Integer> values;

public List<Integer> getValues() {
return values;
}

public void setValues(List<Integer> values) {
this.values = values;

}


}
12 changes: 9 additions & 3 deletions src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private ZeebeVariables() {

public static final String FORMATTING_ENABLED = "formattingEnabled";

public static final String SUCCESS_THRESHOLD_CHECK_ENABLED = "successThresholdCheckEnabled";
public static final String COMPLETION_THRESHOLD_CHECK_ENABLED = "completionThresholdCheckEnabled";

public static final String MERGE_ENABLED = "mergeEnabled";

Expand Down Expand Up @@ -89,9 +89,9 @@ private ZeebeVariables() {

public static final String CALLBACK_RETRY = "callbackRetryCount";

public static final String SUCCESS_THRESHOLD = "successThreshold";
public static final String COMPLETION_THRESHOLD = "completionThreshold";

public static final String SUCCESS_RATE = "successRate";
public static final String COMPLETION_RATE = "completionRate";

public static final String ERROR_CODE = "errorCode";

Expand All @@ -105,7 +105,13 @@ private ZeebeVariables() {

public static final String CALLBACK_URL = "callbackUrl";

public static final String MAX_CALLBACK_RETRY = "maxCallbackRetry";

public static final String BULK_NOTIF_SUCCESS = "isNotificationsSuccessEnabled";

public static final String BULK_NOTIF_FAILURE = "isNotificationsFailureEnabled";

public static final String PHASES = "phases";

public static final String PHASE_COUNT = "phaseCount";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public void setup() {
Map<String, Object> variables = job.getVariablesAsMap();

int retry = (int) variables.getOrDefault(RETRY, 0);
int successRate = (int) variables.getOrDefault(SUCCESS_RATE, 0);
int successRate = (int) variables.getOrDefault(COMPLETION_RATE, 0);

Exchange exchange = new DefaultExchange(camelContext);
exchange.setProperty(BATCH_ID, variables.get(BATCH_ID));
Expand All @@ -27,14 +27,14 @@ public void setup() {

Boolean batchStatusFailed = exchange.getProperty(BATCH_STATUS_FAILED, Boolean.class);
if (batchStatusFailed == null || !batchStatusFailed) {
successRate = exchange.getProperty(SUCCESS_RATE, Long.class).intValue();
successRate = exchange.getProperty(COMPLETION_RATE, Long.class).intValue();
} else {
variables.put(ERROR_CODE, exchange.getProperty(ERROR_CODE));
variables.put(ERROR_DESCRIPTION, exchange.getProperty(ERROR_DESCRIPTION));
logger.info("Error: {}, {}", variables.get(ERROR_CODE), variables.get(ERROR_DESCRIPTION));
}

variables.put(SUCCESS_RATE, successRate);
variables.put(COMPLETION_RATE, successRate);
variables.put(RETRY, ++retry);

logger.info("Retry: {} and Success Rate: {}", retry, successRate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ public void setup() {
newWorker(Worker.SEND_CALLBACK, (client, job) -> {
Map<String, Object> variables = job.getVariablesAsMap();

int retry = (int) variables.getOrDefault(CALLBACK_RETRY, 0);

int retry = variables.getOrDefault(CALLBACK_RETRY, 0).equals(variables.get(MAX_STATUS_RETRY))?0:
(int) variables.getOrDefault(CALLBACK_RETRY, 0);
Exchange exchange = new DefaultExchange(camelContext);
exchange.setProperty(MAX_STATUS_RETRY,variables.get(MAX_STATUS_RETRY));
exchange.setProperty(MAX_CALLBACK_RETRY,variables.get(MAX_CALLBACK_RETRY));
exchange.setProperty(CALLBACK_RETRY,retry);
exchange.setProperty(CALLBACK_URL,variables.get(CALLBACK_URL));
exchange.setProperty(SUCCESS_RATE,variables.get(SUCCESS_RATE));
exchange.setProperty(COMPLETION_RATE,variables.get(COMPLETION_RATE));
exchange.setProperty(PHASES,variables.get(PHASES));
exchange.setProperty(PHASE_COUNT,variables.get(PHASE_COUNT));
sendToCamelRoute(RouteId.SEND_CALLBACK, exchange);

Boolean callbackSuccess = exchange.getProperty(CALLBACK_SUCCESS, Boolean.class);
Expand All @@ -39,8 +41,10 @@ public void setup() {

variables.put(CALLBACK_RETRY,exchange.getProperty(CALLBACK_RETRY));
variables.put(CALLBACK_RESPONSE_CODE,exchange.getProperty(CALLBACK_RESPONSE_CODE));
variables.put(PHASE_COUNT,exchange.getProperty(PHASE_COUNT));
variables.put(PHASES,exchange.getProperty(PHASES));

logger.info("Retry: {} and Response Code {}", exchange.getProperty(CALLBACK_RETRY), exchange.getProperty(CALLBACK_RESPONSE_CODE));
logger.debug("Retry: {} and Response Code {}", exchange.getProperty(CALLBACK_RETRY), exchange.getProperty(CALLBACK_RESPONSE_CODE));
client.newCompleteCommand(job.getKey()).variables(variables).send();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class WorkerConfig {
@Value("${config.mergeback.enable}")
public boolean isMergeBackWorkerEnabled;

@Value("${config.success-threshold-check.enable}")
public boolean isSuccessThresholdCheckEnabled;
@Value("${config.completion-threshold-check.enable}")
public boolean isCompletionThresholdCheckEnabled;

}
Loading