diff --git a/src/main/java/org/mifos/processor/bulk/ConfigurationValidator.java b/src/main/java/org/mifos/processor/bulk/ConfigurationValidator.java index 38487ced..78a98a6e 100644 --- a/src/main/java/org/mifos/processor/bulk/ConfigurationValidator.java +++ b/src/main/java/org/mifos/processor/bulk/ConfigurationValidator.java @@ -21,10 +21,10 @@ public class ConfigurationValidator { @Value("${config.ordering.field}") private String orderingField; - @Value("${config.success-threshold-check.success-threshold}") - private int successRate; + @Value("${config.completion-threshold-check.completion-threshold}") + private int completionRate; - @Value("${config.success-threshold-check.max-retry}") + @Value("${config.completion-threshold-check.max-retry}") private int maxThresholdCheckRetry; @Value("${config.formatting.standard}") @@ -38,8 +38,8 @@ private void validate() { if (workerConfig.isOrderingWorkerEnabled) { validateOrderingConfig(); } - if (workerConfig.isSuccessThresholdCheckEnabled) { - validateSuccessThresholdConfig(); + if (workerConfig.isCompletionThresholdCheckEnabled) { + validateCompletionThresholdConfig(); validateMaxRetryFromThresholdCheck(); } if (workerConfig.isFormattingWorkerEnabled) { @@ -87,13 +87,13 @@ private void validateOrderingConfig() { } // validates the success threshold related configuration - private void validateSuccessThresholdConfig() { - if (successRate <= 0 || successRate > 100) { - throw new ConfigurationValidationException("Invalid success threshold value configured (value=" + successRate + ")."); + private void validateCompletionThresholdConfig() { + if (completionRate <= 0 || completionRate > 100) { + throw new ConfigurationValidationException("Invalid completion threshold value configured (value=" + completionRate + ")."); } - if (successRate < 50) { - logger.warn("It is advised to set the success threshold greater than 50. Currently configured as {}", successRate); + if (completionRate < 50) { + logger.warn("It is advised to set the completion threshold greater than 50. Currently configured as {}", completionRate); } } diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchStatusRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchStatusRoute.java index 9d170d26..05a4682e 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchStatusRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchStatusRoute.java @@ -13,8 +13,8 @@ @Component public class BatchStatusRoute extends BaseRouteBuilder { - @Value("${config.success-threshold-check.success-threshold}") - private int successThreshold; + @Value("${config.completion-threshold-check.completion-threshold}") + private int completionThreshold; @Override public void configure() throws Exception { @@ -65,14 +65,14 @@ public void configure() throws Exception { .process(exchange -> { BatchDTO batchSummary = exchange.getIn().getBody(BatchDTO.class); - long percentage = (batchSummary.getSuccessful()/batchSummary.getTotal())*100; + long percentage =(long)(((double)batchSummary.getSuccessful()/batchSummary.getTotal())*100); - if (percentage >= successThreshold) { + if (percentage >= completionThreshold) { logger.info("Batch success threshold reached. Expected rate: {}, Actual Rate: {}", - successThreshold, percentage); + completionThreshold, percentage); } - exchange.setProperty(SUCCESS_RATE, percentage); + exchange.setProperty(COMPLETION_RATE, percentage); }) .otherwise() diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java index c39ab39b..d278414e 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java @@ -4,6 +4,7 @@ import org.apache.camel.LoggingLevel; import org.json.JSONObject; import org.mifos.processor.bulk.file.FileTransferService; +import org.mifos.processor.bulk.utility.PhaseUtils; import org.mifos.processor.bulk.utility.Utils; import org.mifos.processor.bulk.zeebe.ZeebeProcessStarter; import org.mifos.processor.bulk.zeebe.worker.WorkerConfig; @@ -16,6 +17,7 @@ import java.io.File; import java.io.FileWriter; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -43,17 +45,23 @@ public class ProcessorStartRoute extends BaseRouteBuilder { @Value("${bpmn.flows.bulk-processor}") private String workflowId; - @Value("${config.success-threshold-check.success-threshold}") - private int successThreshold; + @Value("${config.completion-threshold-check.completion-threshold}") + private int completionThreshold; - @Value("${config.success-threshold-check.max-retry}") + @Value("${config.completion-threshold-check.max-retry}") private int maxThresholdCheckRetry; - @Value("${config.success-threshold-check.delay}") + @Value("${config.completion-threshold-check.delay}") private int thresholdCheckDelay; + @Value("${callback.max-retry}") + private int maxCallbackRetry; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + @Autowired + PhaseUtils phaseUtils; + @Override public void configure() { setup(); @@ -149,9 +157,11 @@ private void setup() { logger.info("File uploaded {}", nm); //extracting and setting callback Url - String callbackUrl = exchange.getIn().getHeader("Callback-Url", String.class); + String callbackUrl = exchange.getIn().getHeader("X-Callback-URL", String.class); exchange.setProperty(CALLBACK_URL,callbackUrl); + List phases = phaseUtils.getValues(); + logger.info(phases.toString()); Map variables = new HashMap<>(); variables.put(BATCH_ID, batchId); variables.put(FILE_NAME, fileName); @@ -159,12 +169,16 @@ private void setup() { variables.put(PURPOSE, purpose); variables.put(TENANT_ID, exchange.getProperty(TENANT_NAME)); variables.put(CALLBACK_URL,callbackUrl); + variables.put(PHASES,phases); + variables.put(PHASE_COUNT,phases.size()); setConfigProperties(variables); JSONObject response = new JSONObject(); try { - String txnId = zeebeProcessStarter.startZeebeWorkflow(workflowId, "", variables); + String tenantSpecificWorkflowId = workflowId.replace("{dfspid}", + exchange.getProperty(TENANT_NAME).toString()); + String txnId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificWorkflowId, "", variables); if (txnId == null || txnId.isEmpty()) { response.put("errorCode", 500); response.put("errorDescription", "Unable to start zeebe workflow"); @@ -205,13 +219,14 @@ private Map setConfigProperties(Map variables) { variables.put(ORDERING_ENABLED, workerConfig.isOrderingWorkerEnabled); variables.put(SPLITTING_ENABLED, workerConfig.isSplittingWorkerEnabled); variables.put(FORMATTING_ENABLED, workerConfig.isFormattingWorkerEnabled); - variables.put(SUCCESS_THRESHOLD_CHECK_ENABLED, workerConfig.isSuccessThresholdCheckEnabled); + variables.put(COMPLETION_THRESHOLD_CHECK_ENABLED, workerConfig.isCompletionThresholdCheckEnabled); variables.put(MERGE_ENABLED, workerConfig.isMergeBackWorkerEnabled); variables.put(MAX_STATUS_RETRY, maxThresholdCheckRetry); - variables.put(SUCCESS_THRESHOLD, successThreshold); + variables.put(COMPLETION_THRESHOLD, completionThreshold); variables.put(THRESHOLD_DELAY, Utils.getZeebeTimerValue(thresholdCheckDelay)); variables.put(BULK_NOTIF_SUCCESS,false); variables.put(BULK_NOTIF_FAILURE,false); + variables.put(MAX_CALLBACK_RETRY,maxCallbackRetry); return variables; } diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java index c2c25af6..c87e81b2 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java @@ -2,12 +2,11 @@ import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; -import org.apache.camel.model.dataformat.JsonLibrary; -import org.apache.camel.spi.AsEndpointUri; -import org.mifos.processor.bulk.schema.BatchDTO; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.util.List; + import static org.mifos.processor.bulk.camel.config.CamelProperties.*; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.*; @@ -35,8 +34,9 @@ public void configure() throws Exception { .log("Starting route " + RouteId.SEND_CALLBACK.name()) .log("Sending callback for Batch Processing") .setBody(exchange -> { - String body = "Batch Aggregation API was complete with %: " + - exchange.getProperty(SUCCESS_RATE); + String body = String.format + ("The Batch Aggregation API was complete with : %s" + , exchange.getProperty(COMPLETION_RATE).toString()); callbackUrl = exchange.getProperty(CALLBACK_URL).toString(); return body; }) @@ -45,22 +45,31 @@ public void configure() throws Exception { .when(header("CamelHttpResponseCode").startsWith("2")) .log(LoggingLevel.INFO, "Callback sending was successful") .process(exchange -> { + List phases = (List) exchange.getProperty(PHASES); + exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn() .getHeader(Exchange.HTTP_RESPONSE_CODE)); exchange.setProperty(CALLBACK_RETRY, 1); exchange.setProperty(CALLBACK_SUCCESS, true); + eliminatePhases(exchange,phases, (Integer)exchange.getProperty(PHASE_COUNT), + (Integer)exchange.getProperty(COMPLETION_RATE)); }) .otherwise() .log(LoggingLevel.ERROR, "Callback request was unsuccessful") .process(exchange -> { - if (exchange.getProperty(CALLBACK_RETRY).equals(MAX_STATUS_RETRY)) { + if (exchange.getProperty(CALLBACK_RETRY).equals(exchange.getProperty(MAX_CALLBACK_RETRY))) { + List phases = (List) exchange.getProperty(PHASES); logger.info("Retry Exhausted, setting Callback as Failed"); + eliminatePhases(exchange,phases, (Integer)exchange.getProperty(PHASE_COUNT), + (Integer)exchange.getProperty(COMPLETION_RATE)); + exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn() + .getHeader(Exchange.HTTP_RESPONSE_CODE)); } else { int retry = (int) exchange.getProperty(CALLBACK_RETRY); retry++; logger.info("Retry Left {}, Setting Callback as Failed and Retrying...", - ((int) exchange.getProperty(MAX_STATUS_RETRY) - retry)); + ((int) exchange.getProperty(MAX_CALLBACK_RETRY) - retry)); exchange.setProperty(CALLBACK_RETRY, retry); exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn() .getHeader(Exchange.HTTP_RESPONSE_CODE)); @@ -76,5 +85,15 @@ public void configure() throws Exception { .log("Reached Simulation"); } + public void eliminatePhases(Exchange exchange, List phases, int phaseCount, int completionRate){ + int i = 0; + while(phases.size() > 0 && phases.size() > i){ + if(phases.get(i) <= completionRate) + phases.remove(i); + i++; + } + exchange.setProperty(PHASES,phases); + exchange.setProperty(PHASE_COUNT,phaseCount); + } } diff --git a/src/main/java/org/mifos/processor/bulk/utility/PhaseUtils.java b/src/main/java/org/mifos/processor/bulk/utility/PhaseUtils.java new file mode 100644 index 00000000..e711ab23 --- /dev/null +++ b/src/main/java/org/mifos/processor/bulk/utility/PhaseUtils.java @@ -0,0 +1,25 @@ +package org.mifos.processor.bulk.utility; + + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +@ConfigurationProperties(prefix = "callback_phases") +public class PhaseUtils { + + private List values; + + public List getValues() { + return values; + } + + public void setValues(List values) { + this.values = values; + + } + + +} diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java b/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java index 07312f58..2bdf2d17 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java @@ -55,7 +55,7 @@ private ZeebeVariables() { public static final String FORMATTING_ENABLED = "formattingEnabled"; - public static final String SUCCESS_THRESHOLD_CHECK_ENABLED = "successThresholdCheckEnabled"; + public static final String COMPLETION_THRESHOLD_CHECK_ENABLED = "completionThresholdCheckEnabled"; public static final String MERGE_ENABLED = "mergeEnabled"; @@ -89,9 +89,9 @@ private ZeebeVariables() { public static final String CALLBACK_RETRY = "callbackRetryCount"; - public static final String SUCCESS_THRESHOLD = "successThreshold"; + public static final String COMPLETION_THRESHOLD = "completionThreshold"; - public static final String SUCCESS_RATE = "successRate"; + public static final String COMPLETION_RATE = "completionRate"; public static final String ERROR_CODE = "errorCode"; @@ -105,7 +105,13 @@ private ZeebeVariables() { public static final String CALLBACK_URL = "callbackUrl"; + public static final String MAX_CALLBACK_RETRY = "maxCallbackRetry"; + public static final String BULK_NOTIF_SUCCESS = "isNotificationsSuccessEnabled"; public static final String BULK_NOTIF_FAILURE = "isNotificationsFailureEnabled"; + + public static final String PHASES = "phases"; + + public static final String PHASE_COUNT = "phaseCount"; } diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchStatusWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchStatusWorker.java index 4945de5c..865f4efe 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchStatusWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchStatusWorker.java @@ -17,7 +17,7 @@ public void setup() { Map variables = job.getVariablesAsMap(); int retry = (int) variables.getOrDefault(RETRY, 0); - int successRate = (int) variables.getOrDefault(SUCCESS_RATE, 0); + int successRate = (int) variables.getOrDefault(COMPLETION_RATE, 0); Exchange exchange = new DefaultExchange(camelContext); exchange.setProperty(BATCH_ID, variables.get(BATCH_ID)); @@ -27,14 +27,14 @@ public void setup() { Boolean batchStatusFailed = exchange.getProperty(BATCH_STATUS_FAILED, Boolean.class); if (batchStatusFailed == null || !batchStatusFailed) { - successRate = exchange.getProperty(SUCCESS_RATE, Long.class).intValue(); + successRate = exchange.getProperty(COMPLETION_RATE, Long.class).intValue(); } else { variables.put(ERROR_CODE, exchange.getProperty(ERROR_CODE)); variables.put(ERROR_DESCRIPTION, exchange.getProperty(ERROR_DESCRIPTION)); logger.info("Error: {}, {}", variables.get(ERROR_CODE), variables.get(ERROR_DESCRIPTION)); } - variables.put(SUCCESS_RATE, successRate); + variables.put(COMPLETION_RATE, successRate); variables.put(RETRY, ++retry); logger.info("Retry: {} and Success Rate: {}", retry, successRate); diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java index 58b8f578..bbb5ac60 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java @@ -19,13 +19,15 @@ public void setup() { newWorker(Worker.SEND_CALLBACK, (client, job) -> { Map variables = job.getVariablesAsMap(); - int retry = (int) variables.getOrDefault(CALLBACK_RETRY, 0); - + int retry = variables.getOrDefault(CALLBACK_RETRY, 0).equals(variables.get(MAX_STATUS_RETRY))?0: + (int) variables.getOrDefault(CALLBACK_RETRY, 0); Exchange exchange = new DefaultExchange(camelContext); - exchange.setProperty(MAX_STATUS_RETRY,variables.get(MAX_STATUS_RETRY)); + exchange.setProperty(MAX_CALLBACK_RETRY,variables.get(MAX_CALLBACK_RETRY)); exchange.setProperty(CALLBACK_RETRY,retry); exchange.setProperty(CALLBACK_URL,variables.get(CALLBACK_URL)); - exchange.setProperty(SUCCESS_RATE,variables.get(SUCCESS_RATE)); + exchange.setProperty(COMPLETION_RATE,variables.get(COMPLETION_RATE)); + exchange.setProperty(PHASES,variables.get(PHASES)); + exchange.setProperty(PHASE_COUNT,variables.get(PHASE_COUNT)); sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); Boolean callbackSuccess = exchange.getProperty(CALLBACK_SUCCESS, Boolean.class); @@ -39,8 +41,10 @@ public void setup() { variables.put(CALLBACK_RETRY,exchange.getProperty(CALLBACK_RETRY)); variables.put(CALLBACK_RESPONSE_CODE,exchange.getProperty(CALLBACK_RESPONSE_CODE)); + variables.put(PHASE_COUNT,exchange.getProperty(PHASE_COUNT)); + variables.put(PHASES,exchange.getProperty(PHASES)); - logger.info("Retry: {} and Response Code {}", exchange.getProperty(CALLBACK_RETRY), exchange.getProperty(CALLBACK_RESPONSE_CODE)); + logger.debug("Retry: {} and Response Code {}", exchange.getProperty(CALLBACK_RETRY), exchange.getProperty(CALLBACK_RESPONSE_CODE)); client.newCompleteCommand(job.getKey()).variables(variables).send(); }); } diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/WorkerConfig.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/WorkerConfig.java index 1fc0de7a..50ac4c58 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/WorkerConfig.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/WorkerConfig.java @@ -24,7 +24,7 @@ public class WorkerConfig { @Value("${config.mergeback.enable}") public boolean isMergeBackWorkerEnabled; - @Value("${config.success-threshold-check.enable}") - public boolean isSuccessThresholdCheckEnabled; + @Value("${config.completion-threshold-check.enable}") + public boolean isCompletionThresholdCheckEnabled; } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index f6432821..056f00ab 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -72,7 +72,7 @@ bpmn: international-remittance-payee: "international_remittance_payee_process-{dfspid}" international-remittance-payer: "international_remittance_payer_process-{dfspid}" debit-party-process: "debit_party_process-{dfspid}" - bulk-processor: "bulk_processor-ibank-usa" + bulk-processor: "bulk_processor-{dfspid}" slcb: "slcb-{dfspid}" payment-mode: @@ -106,11 +106,21 @@ config: enable: false backpressure: enable: false - success-threshold-check: + completion-threshold-check: enable: false - success-threshold: 95 # in percentage - max-retry: 4 + completion-threshold: 95 # in percentage + max-retry: 4 #can be as high as 30 delay: 60 # in seconds callback: + max-retry: 3 url: "http://httpstat.us/503" + + +callback_phases: + values: + - 20 + - 40 + - 60 + - 80 + - 100 \ No newline at end of file