Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/main/java/org/mifos/processor/bulk/camel/routes/Routes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.mifos.processor.bulk.camel.routes;


import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.util.json.JsonArray;
import org.springframework.stereotype.Component;

@Component
public class Routes extends RouteBuilder {


@Override
public void configure() throws Exception {
routeCheckTransactions();
routeSampleTransactions();
}

private void routeCheckTransactions(){
String id = "check-transactions";
from("direct:"+id)
.id(id)
.log("Fetching transaction details")
//set request params
.to("/api/v1/batch/transactions")
.process(exchange -> {
// get response body
// check successful transactions >= x%
// set zeebe variable readyForSample = true
});
}

private void routeSampleTransactions(){
String id = "sample-transactions";
from("direct:" + id)
.id(id)
.log("Fetching transaction details")
//set request params
.to("/api/v1/batch/transactions")
.process(exchange -> {
// get response body
// sample transactions
// store the sampled transaction ids in zeebe variable
});
}
}
62 changes: 61 additions & 1 deletion src/main/java/org/mifos/processor/bulk/zeebe/ZeebeWorkers.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import io.camunda.zeebe.client.ZeebeClient;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.support.DefaultExchange;
import org.mifos.processor.bulk.file.FileTransferService;
import org.mifos.processor.bulk.schema.Transaction;
import org.slf4j.Logger;
Expand All @@ -16,9 +21,10 @@
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;

import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID;
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.*;

@Component
public class ZeebeWorkers {
Expand Down Expand Up @@ -52,9 +58,20 @@ public class ZeebeWorkers {
@Value(value = "${kafka.topic.slcb.name}")
private String slcbTopicName;

@Autowired
private CamelContext camelContext;

@Autowired
private ProducerTemplate producerTemplate;

@PostConstruct
public void setupWorkers() {
workerBulkProcessor();
workerCheckTransactions();
workerSampleTransactions();
}

private void workerBulkProcessor(){
zeebeClient.newWorker()
.jobType("bulk-processor")
.handler((client, job) -> {
Expand Down Expand Up @@ -85,6 +102,49 @@ else if (current.getPayment_mode().equals("sclb"))
.name("bulk-processor")
.maxJobsActive(workerMaxJobs)
.open();
}

private void workerCheckTransactions(){
String jobType = "check-transactions";
zeebeClient.newWorker()
.jobType(jobType)
.handler((client, job) -> {
logger.info("Job '{}' started from process '{}' with key {}", job.getType(), job.getBpmnProcessId(), job.getKey());

Map<String, Object> variables = job.getVariablesAsMap();
String batchId = (String) variables.get(BATCH_ID);

Exchange exchange = new DefaultExchange(camelContext);
exchange.setProperty("batchId", batchId);
producerTemplate.send("direct:check-transactions", exchange);

client.newCompleteCommand(job.getKey())
.send();
})
.name(jobType)
.maxJobsActive(workerMaxJobs)
.open();
}

private void workerSampleTransactions(){
String jobType = "sample-transactions";
zeebeClient.newWorker()
.jobType(jobType)
.handler((client, job) -> {
logger.info("Job '{}' started from process '{}' with key {}", job.getType(), job.getBpmnProcessId(), job.getKey());

Map<String, Object> variables = job.getVariablesAsMap();
String batchId = (String) variables.get(BATCH_ID);

Exchange exchange = new DefaultExchange(camelContext);
exchange.setProperty("batchId", batchId);
producerTemplate.send("direct:sample-transactions", exchange);

client.newCompleteCommand(job.getKey())
.send();
})
.name(jobType)
.maxJobsActive(workerMaxJobs)
.open();
}
}