Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ private CamelProperties() {}

public static final String LOCAL_FILE_PATH = "localFilePath";

public static final String SUB_BATCH_FILE_ARRAY = "subBatchFileArray";

public static final String SUB_BATCH_COUNT = "subBatchCount";

public static final String SUB_BATCH_CREATED = "subBatchCreated";

public static final String SERVER_SUB_BATCH_FILE_NAME_ARRAY = "serverSubBatchFileName";

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import static org.mifos.processor.bulk.camel.config.CamelProperties.SERVER_FILE_NAME;

@Component
public class FileDownloadingRoute extends BaseRouteBuilder{
public class FileRoute extends BaseRouteBuilder {

@Autowired
@Qualifier("awsStorage")
Expand All @@ -24,8 +24,14 @@ public class FileDownloadingRoute extends BaseRouteBuilder{

@Override
public void configure() throws Exception {

/**
* Downloads the file from cloud, stores in local and returns the file path
* Input the file name through exchange variable: [SERVER_FILE_NAME]
* Output the local file path through exchange variable: [LOCAL_FILE_PATH]
*/
from("direct:download-file")
.id("download-file")
.id("direct:download-file")
.log("Started download-file route")
.process(exchange -> {
String filename = exchange.getProperty(SERVER_FILE_NAME, String.class);
Expand All @@ -37,5 +43,20 @@ public void configure() throws Exception {
}
exchange.setProperty(LOCAL_FILE_PATH, file.getAbsolutePath());
});

/**
* Uploads the file to cloud and returns the file name in cloud
* Input the local file path through exchange variable: [LOCAL_FILE_PATH]
* Output the server file name through exchange variable: [SERVER_FILE_NAME]
*/
from("direct:upload-file")
.id("direct:upload-file")
.log("Uploading file")
.process(exchange -> {
String filepath = exchange.getProperty(LOCAL_FILE_PATH, String.class);
String serverFileName = fileTransferService.uploadFile(new File(filepath), bucketName);
exchange.setProperty(SERVER_FILE_NAME, serverFileName);
logger.info("Uploaded file: {}", serverFileName);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package org.mifos.processor.bulk.camel.routes;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.List;

import static org.mifos.processor.bulk.camel.config.CamelProperties.*;
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.SPLITTING_FAILED;

@Component
public class SplittingRoute extends BaseRouteBuilder {

@Value("${config.splitting.sub-batch-size}")
private int subBatchSize;

@Override
public void configure() throws Exception {

/**
* Base route for starting the splitting process. Refer below routes for more info
* 1. direct:create-sub-batch-file
* 2. direct:upload-sub-batch-file
*/
from(RouteId.SPLITTING.getValue())
.id(RouteId.SPLITTING.getValue())
.log("Starting route " + RouteId.SPLITTING.name())
.process(exchange -> exchange.setProperty(LOCAL_FILE_PATH, "1659424507931_ph-ee-bulk-demo-5.csv"))
.to("direct:create-sub-batch-file")
.choice()
.when(exchange -> exchange.getProperty(SUB_BATCH_CREATED, Boolean.class))
.to("direct:upload-sub-batch-file")
.otherwise()
.log("No sub batch created, so skipping upload")
.end()
.process(exchange -> exchange.setProperty(SPLITTING_FAILED, false));

/**
* Creates the sub-batch CSVs
*/
from("direct:create-sub-batch-file")
.id("direct:create-sub-batch-file")
.log("Creating sub-batch file")
.process(exchange -> {
String filepath = exchange.getProperty(LOCAL_FILE_PATH, String.class);
BufferedReader reader = new BufferedReader(new FileReader(filepath));
String header = reader.readLine() + System.lineSeparator();
List<String> lines = new ArrayList<>();
String line = null;
while ((line = reader.readLine()) != null) {
lines.add(line);
}

if (lines.size() <= subBatchSize) {
exchange.setProperty(SUB_BATCH_CREATED, false);
logger.info("Skipping creating sub batch, as batch size is less than configured sub-batch size");
return;
}

List<String> subBatchFile = new ArrayList<>();
int subBatchCount = 1;
for (int i = 0; i < lines.size(); i+=subBatchSize) {
String filename = "sub-batch-"+subBatchCount+".csv";
FileWriter writer = new FileWriter(filename);
writer.write(header);
for (int j = i; j < Math.min(i + subBatchSize, lines.size()); j++) {
writer.write(lines.get(j) + System.lineSeparator());
}
writer.close();
logger.info("Created sub-batch with file name {}", filename);
subBatchFile.add(filename);
subBatchCount++;
}
exchange.setProperty(SUB_BATCH_FILE_ARRAY, subBatchFile);
exchange.setProperty(SUB_BATCH_COUNT, subBatchFile.size());
exchange.setProperty(SUB_BATCH_CREATED, true);
exchange.setProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, new ArrayList<String>());
});

/**
* Iterate through each CSVs of sub-batches and uploads in cloud
*/
from("direct:upload-sub-batch-file")
.id("direct:upload-sub-batch-file")
.log("Starting upload of sub-batch file")
.loopDoWhile(exchange -> exchange.getProperty(SUB_BATCH_FILE_ARRAY, List.class).size() > 0)
.process(exchange -> {
List<String> subBatchFile = exchange.getProperty(SUB_BATCH_FILE_ARRAY, List.class);
exchange.setProperty(LOCAL_FILE_PATH, subBatchFile.remove(0));
exchange.setProperty(SUB_BATCH_FILE_ARRAY, subBatchFile);
})
.to("direct:upload-file")
.process(exchange -> {
String serverFilename = exchange.getProperty(SERVER_FILE_NAME, String.class);
List<String> serverSubBatchFile = exchange.getProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, List.class);
serverSubBatchFile.add(serverFilename);
exchange.setProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, serverSubBatchFile);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ private ZeebeVariables() {
public static final String SPLITTING_FAILED = "splittingFailed";
public static final String FORMATTING_FAILED = "formattingFailed";
public static final String MERGE_FAILED = "mergeFailed";

public static final String FILE_NAME = "filename";

public static final String REQUEST_ID = "requestId";

public static final String SUB_BATCHES = "subBatches";
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public abstract class BaseWorker {
private int workerMaxJobs;

@Autowired
private CamelContext camelContext;
protected CamelContext camelContext;

@Autowired
private ProducerTemplate producerTemplate;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,56 @@
package org.mifos.processor.bulk.zeebe.worker;

import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultExchange;
import org.mifos.processor.bulk.camel.routes.RouteId;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MERGE_FAILED;
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.SPLITTING_FAILED;
import static org.mifos.processor.bulk.camel.config.CamelProperties.*;
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.*;

@Component
public class SplittingWorker extends BaseWorker {

@Override
public void setup() {

/**
* This worker performs below tasks
* 1. Downloads the original CSV from cloud
* 2. Splits entire CSV into multiple CSV of sub-batches, based on configured sub-batch size.
* 3. Uploads the sub-batch CSVs to cloud
* 4. Sets zeebeVariable [SPLITTING_FAILED, SUB_BATCHES, SUB_BATCH_CREATED]
*/
newWorker(Worker.SPLITTING, (client, job) -> {
Map<String, Object> variables = job.getVariablesAsMap();
if (workerConfig.isSplittingWorkerEnabled) {
variables.put(SPLITTING_FAILED, false);
}

String filename = (String) variables.get(FILE_NAME);
Exchange exchange = new DefaultExchange(camelContext);
exchange.setProperty(SERVER_FILE_NAME, filename);

try {
sendToCamelRoute(RouteId.SPLITTING, exchange);
assert !exchange.getProperty(SPLITTING_FAILED, Boolean.class);
} catch (Exception e) {
variables.put(SPLITTING_FAILED, true);
}

Boolean subBatchCreated = exchange.getProperty(SUB_BATCH_CREATED, Boolean.class);
List<String> serverSubBatchFile = exchange.getProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, List.class);
if (!subBatchCreated && serverSubBatchFile.isEmpty()) {
// if no sub-batches is created, insert the original filename in sub batch array
serverSubBatchFile.add(filename);
}

variables.put(SUB_BATCHES, serverSubBatchFile);
variables.put(SUB_BATCH_CREATED, subBatchCreated);

client.newCompleteCommand(job.getKey()).variables(variables).send();
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ config:
enable: true
splitting:
enable: false
batch-size: 10
sub-batch-size: 5
formatting:
enable: true
mergeback:
Expand Down