diff --git a/src/main/java/org/mifos/processor/bulk/camel/config/CamelProperties.java b/src/main/java/org/mifos/processor/bulk/camel/config/CamelProperties.java index 665af581..f1f85017 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/config/CamelProperties.java +++ b/src/main/java/org/mifos/processor/bulk/camel/config/CamelProperties.java @@ -11,4 +11,12 @@ private CamelProperties() {} public static final String LOCAL_FILE_PATH = "localFilePath"; + public static final String SUB_BATCH_FILE_ARRAY = "subBatchFileArray"; + + public static final String SUB_BATCH_COUNT = "subBatchCount"; + + public static final String SUB_BATCH_CREATED = "subBatchCreated"; + + public static final String SERVER_SUB_BATCH_FILE_NAME_ARRAY = "serverSubBatchFileName"; + } diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/FileDownloadingRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/FileRoute.java similarity index 55% rename from src/main/java/org/mifos/processor/bulk/camel/routes/FileDownloadingRoute.java rename to src/main/java/org/mifos/processor/bulk/camel/routes/FileRoute.java index 353545ad..a204c06c 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/FileDownloadingRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/FileRoute.java @@ -13,7 +13,7 @@ import static org.mifos.processor.bulk.camel.config.CamelProperties.SERVER_FILE_NAME; @Component -public class FileDownloadingRoute extends BaseRouteBuilder{ +public class FileRoute extends BaseRouteBuilder { @Autowired @Qualifier("awsStorage") @@ -24,8 +24,14 @@ public class FileDownloadingRoute extends BaseRouteBuilder{ @Override public void configure() throws Exception { + + /** + * Downloads the file from cloud, stores in local and returns the file path + * Input the file name through exchange variable: [SERVER_FILE_NAME] + * Output the local file path through exchange variable: [LOCAL_FILE_PATH] + */ from("direct:download-file") - .id("download-file") + .id("direct:download-file") .log("Started download-file route") .process(exchange -> { String filename = exchange.getProperty(SERVER_FILE_NAME, String.class); @@ -37,5 +43,20 @@ public void configure() throws Exception { } exchange.setProperty(LOCAL_FILE_PATH, file.getAbsolutePath()); }); + + /** + * Uploads the file to cloud and returns the file name in cloud + * Input the local file path through exchange variable: [LOCAL_FILE_PATH] + * Output the server file name through exchange variable: [SERVER_FILE_NAME] + */ + from("direct:upload-file") + .id("direct:upload-file") + .log("Uploading file") + .process(exchange -> { + String filepath = exchange.getProperty(LOCAL_FILE_PATH, String.class); + String serverFileName = fileTransferService.uploadFile(new File(filepath), bucketName); + exchange.setProperty(SERVER_FILE_NAME, serverFileName); + logger.info("Uploaded file: {}", serverFileName); + }); } } diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/SplittingRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/SplittingRoute.java new file mode 100644 index 00000000..4dd6b691 --- /dev/null +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/SplittingRoute.java @@ -0,0 +1,103 @@ +package org.mifos.processor.bulk.camel.routes; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.FileWriter; +import java.util.ArrayList; +import java.util.List; + +import static org.mifos.processor.bulk.camel.config.CamelProperties.*; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.SPLITTING_FAILED; + +@Component +public class SplittingRoute extends BaseRouteBuilder { + + @Value("${config.splitting.sub-batch-size}") + private int subBatchSize; + + @Override + public void configure() throws Exception { + + /** + * Base route for starting the splitting process. Refer below routes for more info + * 1. direct:create-sub-batch-file + * 2. direct:upload-sub-batch-file + */ + from(RouteId.SPLITTING.getValue()) + .id(RouteId.SPLITTING.getValue()) + .log("Starting route " + RouteId.SPLITTING.name()) + .process(exchange -> exchange.setProperty(LOCAL_FILE_PATH, "1659424507931_ph-ee-bulk-demo-5.csv")) + .to("direct:create-sub-batch-file") + .choice() + .when(exchange -> exchange.getProperty(SUB_BATCH_CREATED, Boolean.class)) + .to("direct:upload-sub-batch-file") + .otherwise() + .log("No sub batch created, so skipping upload") + .end() + .process(exchange -> exchange.setProperty(SPLITTING_FAILED, false)); + + /** + * Creates the sub-batch CSVs + */ + from("direct:create-sub-batch-file") + .id("direct:create-sub-batch-file") + .log("Creating sub-batch file") + .process(exchange -> { + String filepath = exchange.getProperty(LOCAL_FILE_PATH, String.class); + BufferedReader reader = new BufferedReader(new FileReader(filepath)); + String header = reader.readLine() + System.lineSeparator(); + List lines = new ArrayList<>(); + String line = null; + while ((line = reader.readLine()) != null) { + lines.add(line); + } + + if (lines.size() <= subBatchSize) { + exchange.setProperty(SUB_BATCH_CREATED, false); + logger.info("Skipping creating sub batch, as batch size is less than configured sub-batch size"); + return; + } + + List subBatchFile = new ArrayList<>(); + int subBatchCount = 1; + for (int i = 0; i < lines.size(); i+=subBatchSize) { + String filename = "sub-batch-"+subBatchCount+".csv"; + FileWriter writer = new FileWriter(filename); + writer.write(header); + for (int j = i; j < Math.min(i + subBatchSize, lines.size()); j++) { + writer.write(lines.get(j) + System.lineSeparator()); + } + writer.close(); + logger.info("Created sub-batch with file name {}", filename); + subBatchFile.add(filename); + subBatchCount++; + } + exchange.setProperty(SUB_BATCH_FILE_ARRAY, subBatchFile); + exchange.setProperty(SUB_BATCH_COUNT, subBatchFile.size()); + exchange.setProperty(SUB_BATCH_CREATED, true); + exchange.setProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, new ArrayList()); + }); + + /** + * Iterate through each CSVs of sub-batches and uploads in cloud + */ + from("direct:upload-sub-batch-file") + .id("direct:upload-sub-batch-file") + .log("Starting upload of sub-batch file") + .loopDoWhile(exchange -> exchange.getProperty(SUB_BATCH_FILE_ARRAY, List.class).size() > 0) + .process(exchange -> { + List subBatchFile = exchange.getProperty(SUB_BATCH_FILE_ARRAY, List.class); + exchange.setProperty(LOCAL_FILE_PATH, subBatchFile.remove(0)); + exchange.setProperty(SUB_BATCH_FILE_ARRAY, subBatchFile); + }) + .to("direct:upload-file") + .process(exchange -> { + String serverFilename = exchange.getProperty(SERVER_FILE_NAME, String.class); + List serverSubBatchFile = exchange.getProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, List.class); + serverSubBatchFile.add(serverFilename); + exchange.setProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, serverSubBatchFile); + }); + } +} diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java b/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java index b031db71..7cfd5632 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java @@ -31,4 +31,10 @@ private ZeebeVariables() { public static final String SPLITTING_FAILED = "splittingFailed"; public static final String FORMATTING_FAILED = "formattingFailed"; public static final String MERGE_FAILED = "mergeFailed"; + + public static final String FILE_NAME = "filename"; + + public static final String REQUEST_ID = "requestId"; + + public static final String SUB_BATCHES = "subBatches"; } diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/BaseWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/BaseWorker.java index fe0d7abe..c225d24d 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/BaseWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/BaseWorker.java @@ -33,7 +33,7 @@ public abstract class BaseWorker { private int workerMaxJobs; @Autowired - private CamelContext camelContext; + protected CamelContext camelContext; @Autowired private ProducerTemplate producerTemplate; diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java index f16c02ae..06e2cf58 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java @@ -1,20 +1,56 @@ package org.mifos.processor.bulk.zeebe.worker; +import org.apache.camel.Exchange; +import org.apache.camel.support.DefaultExchange; +import org.mifos.processor.bulk.camel.routes.RouteId; +import org.springframework.stereotype.Component; + +import java.util.List; import java.util.Map; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MERGE_FAILED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.SPLITTING_FAILED; +import static org.mifos.processor.bulk.camel.config.CamelProperties.*; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.*; +@Component public class SplittingWorker extends BaseWorker { @Override public void setup() { + + /** + * This worker performs below tasks + * 1. Downloads the original CSV from cloud + * 2. Splits entire CSV into multiple CSV of sub-batches, based on configured sub-batch size. + * 3. Uploads the sub-batch CSVs to cloud + * 4. Sets zeebeVariable [SPLITTING_FAILED, SUB_BATCHES, SUB_BATCH_CREATED] + */ newWorker(Worker.SPLITTING, (client, job) -> { Map variables = job.getVariablesAsMap(); if (workerConfig.isSplittingWorkerEnabled) { variables.put(SPLITTING_FAILED, false); } + String filename = (String) variables.get(FILE_NAME); + Exchange exchange = new DefaultExchange(camelContext); + exchange.setProperty(SERVER_FILE_NAME, filename); + + try { + sendToCamelRoute(RouteId.SPLITTING, exchange); + assert !exchange.getProperty(SPLITTING_FAILED, Boolean.class); + } catch (Exception e) { + variables.put(SPLITTING_FAILED, true); + } + + Boolean subBatchCreated = exchange.getProperty(SUB_BATCH_CREATED, Boolean.class); + List serverSubBatchFile = exchange.getProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, List.class); + if (!subBatchCreated && serverSubBatchFile.isEmpty()) { + // if no sub-batches is created, insert the original filename in sub batch array + serverSubBatchFile.add(filename); + } + + variables.put(SUB_BATCHES, serverSubBatchFile); + variables.put(SUB_BATCH_CREATED, subBatchCreated); + client.newCompleteCommand(job.getKey()).variables(variables).send(); }); } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 3a9bd630..ab189bc9 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -67,7 +67,7 @@ config: enable: true splitting: enable: false - batch-size: 10 + sub-batch-size: 5 formatting: enable: true mergeback: