From a88956cc5d7aad082dcc4bf6a8020cd52c7ec10c Mon Sep 17 00:00:00 2001
From: Keyur Karnik
Date: Tue, 29 Jan 2019 14:37:33 -0800
Subject: [PATCH] Made entity export threads configurable

Enables configuring number of threads for fetching entities, assets and entity members

This fixes issue USERGRID-1356
---
Review note: validateOptions() now rejects zero and negative values as well
as non-numeric ones, so the hunk sizes and diffstat differ from the commit
named in the From line.

 .../org/apache/usergrid/tools/Export.java     | 108 ++++++++++++++++--
 1 file changed, 98 insertions(+), 10 deletions(-)

diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
index b07d09d6b6..9a6794549f 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
@@ -41,6 +41,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -59,7 +60,6 @@
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
@@ -78,6 +78,7 @@
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.base.Optional;
 import com.google.common.collect.BiMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
@@ -89,7 +90,8 @@ public class Export extends ExportingToolBase {
 
     static final Logger logger = LoggerFactory.getLogger( Export.class );
 
-    public static final String LAST_ID = "lastId";
+    private static final String ENTITY_FETCHER_THREADS = "entityFetchThreads";
+    private static final String ENTITY_MEMBER_FETCHER_MULT = "entityThreadMult";
 
 
     @Autowired
@@ -100,11 +102,23 @@ public class Export extends ExportingToolBase {
     private AllEntityIdsObservable allEntityIdsObs;
     private SimpleEdge lastEdge = null;
 
+    //number of threads for fetching entity contents. Each thread will handle a batch of 1000 entity ids
+    private int entityFetcherThreads = 50;
+    //after an individual entity is fetched, the entity members like assets, connections etc need to be fetched
+    //depending on how heavy the assets/connections might be, we might need to multiply the factor so that more threads are allocated
+    //for pulling the members quickly without the queue backing up.
+    private int entityMemberFetcherMultiplier = 1;
+
+
     //TODO : Add blocking queues for these executors where appropriate
-    private ExecutorService orgAppCollParallelizer = Executors.newFixedThreadPool(3);
-    private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
-    private ExecutorService enitityMemberFetcher = Executors.newFixedThreadPool(10);
-    private ExecutorService assetsFetcher = Executors.newFixedThreadPool(10);
+    private ExecutorService orgAppCollParallelizer;
+
+    //fetches the entity content
+    private ExecutorService entityFetcher;
+    //fetches the entity members like connections etc for a given entity
+    private ExecutorService entityMemberFetcher;
+    //fetches the assets for a given entity
+    private ExecutorService assetsFetcher;
 
 
     @Override
@@ -113,13 +127,87 @@ public Options createOptions() {
 
         Options options = super.createOptions();
 
-        Option lastId = OptionBuilder.withArgName( LAST_ID ).hasArg()
-            .withDescription( "Last Entity Id to resume from" ).create( LAST_ID );
-        options.addOption( lastId);
+
+        Option entityFetcherThreads = OptionBuilder.withArgName( ENTITY_FETCHER_THREADS ).hasArg()
+            .withDescription( "Number of threads to fetch entities in parallel (defaults to 50)" ).create( ENTITY_FETCHER_THREADS );
+        options.addOption( entityFetcherThreads);
+
+        Option entityMemberFetcherMultiplier = OptionBuilder.withArgName( ENTITY_MEMBER_FETCHER_MULT ).hasArg()
+            .withDescription( "This defines the number of threads for fetching entity members like assets/collections by multiplying the number of entity fetcher threads. Defaults to 1" ).create( ENTITY_MEMBER_FETCHER_MULT );
+        options.addOption( entityMemberFetcherMultiplier);
 
 
         return options;
     }
+
+    /**
+     * Rejects thread options that are supplied but are not integers of at least 1.
+     *
+     * @param line the parsed command line
+     * @throws MissingOptionException if a supplied value is non-numeric, zero or negative
+     */
+    @Override
+    protected void validateOptions(CommandLine line) throws MissingOptionException {
+        super.validateOptions(line);
+
+        requirePositiveInt(line, ENTITY_FETCHER_THREADS,
+            "Entity fetcher threads need to be a positive integer");
+        requirePositiveInt(line, ENTITY_MEMBER_FETCHER_MULT,
+            "Entity member thread multiplier needs to be a positive integer");
+    }
+
+    //an absent option is accepted; a supplied one must parse as an integer >= 1
+    private static void requirePositiveInt(CommandLine line, String option, String message)
+            throws MissingOptionException {
+        if (!line.hasOption(option)) {
+            return;
+        }
+
+        boolean positive;
+        try {
+            positive = Integer.parseInt(line.getOptionValue(option)) > 0;
+        } catch (NumberFormatException e) {
+            positive = false;
+        }
+        if (!positive) {
+            throw new MissingOptionException(message);
+        }
+    }
+
+    /**
+     * Reads the thread options from the command line and creates the executors.
+     * Values below 1 fall back to the defaults and the multiplier is capped at 5.
+     */
+    @Override
+    protected void applyExportParams(CommandLine line) {
+
+        super.applyExportParams(line);
+
+        if (line.hasOption(ENTITY_FETCHER_THREADS)) {
+            entityFetcherThreads = Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
+
+            if (entityFetcherThreads < 1) {
+                entityFetcherThreads = 50;
+            }
+        }
+
+        if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
+            entityMemberFetcherMultiplier = Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
+
+            if (entityMemberFetcherMultiplier < 1) {
+                entityMemberFetcherMultiplier = 1;
+            }
+            if (entityMemberFetcherMultiplier > 5) {
+                entityMemberFetcherMultiplier = 5;
+            }
+        }
+
+        orgAppCollParallelizer = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("OrgAppColl-Parallelizer-%d").build());
+        entityFetcher = Executors.newFixedThreadPool(entityFetcherThreads, new ThreadFactoryBuilder().setNameFormat("Export-EntityFetcher-%d").build());
+        entityMemberFetcher = Executors.newFixedThreadPool(entityFetcherThreads * entityMemberFetcherMultiplier, new ThreadFactoryBuilder().setNameFormat("Export-EntityMemberFetcher-%d").build());
+        assetsFetcher = Executors.newFixedThreadPool(entityFetcherThreads * entityMemberFetcherMultiplier, new ThreadFactoryBuilder().setNameFormat("Export-AssetFetcher-%d").build());
+
+    }
 
     @Override
     public void runTool( CommandLine line ) throws Exception {
@@ -432,7 +520,7 @@ private void extractEntityIdsForCollection(File collectionDir, UUID applicationI
                 ConnectableObservable entityObs = Observable.just(entities)
                     .publish();
 
-                entityObs.subscribeOn(Schedulers.from(enitityMemberFetcher));
+                entityObs.subscribeOn(Schedulers.from(entityMemberFetcher));
 
                 // fetch and write connections