diff --git a/.gitignore b/.gitignore
index 308eac312..55936c48a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -85,6 +85,7 @@ output/
tree.txt
*.versionsBackup
.flattened-pom.xml
+*.truststore
# eclipse ignore
.settings/
diff --git a/hugegraph-dist/scripts/dependency/known-dependencies.txt b/hugegraph-dist/scripts/dependency/known-dependencies.txt
index e827c1e88..0b38c41e2 100644
--- a/hugegraph-dist/scripts/dependency/known-dependencies.txt
+++ b/hugegraph-dist/scripts/dependency/known-dependencies.txt
@@ -283,6 +283,7 @@ orc-shims-1.5.8.jar
orc-shims-1.6.14.jar
ow2-asm-6.2.jar
paranamer-2.3.jar
+parboiled-core-1.1.8.jar
perfmark-api-0.23.0.jar
postgresql-42.2.6.jar
postgresql-42.4.1.jar
diff --git a/hugegraph-loader/pom.xml b/hugegraph-loader/pom.xml
index 339312e30..e3924bfde 100644
--- a/hugegraph-loader/pom.xml
+++ b/hugegraph-loader/pom.xml
@@ -52,6 +52,7 @@
42.4.1
7.2.0.jre8
1.19.0
+        <parboiled.version>1.1.8</parboiled.version>
@@ -542,6 +543,11 @@
${kafka.testcontainer.version}
test
+        <dependency>
+            <groupId>org.parboiled</groupId>
+            <artifactId>parboiled-core</artifactId>
+            <version>${parboiled.version}</version>
+        </dependency>
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
index a46ff5923..2fb9eb4aa 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
@@ -20,20 +20,40 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.loader.exception.ParseException;
+import org.apache.hugegraph.loader.progress.InputProgress;
+import org.apache.hugegraph.loader.task.GlobalExecutorManager;
import org.apache.hugegraph.loader.task.ParseTaskBuilder;
+import org.apache.hugegraph.loader.task.ParseTaskBuilder.ParseTask;
import org.apache.hugegraph.loader.task.TaskManager;
import org.apache.hugegraph.loader.util.HugeClientHolder;
import org.apache.hugegraph.loader.util.LoadUtil;
+import org.apache.hugegraph.structure.schema.SchemaLabel;
+import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.loader.util.Printer;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.structure.schema.IndexLabel;
+import org.apache.hugegraph.structure.schema.VertexLabel;
import org.slf4j.Logger;
import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.exception.ServerException;
import org.apache.hugegraph.loader.builder.Record;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.constant.ElemType;
@@ -43,6 +63,8 @@
import org.apache.hugegraph.loader.executor.GroovyExecutor;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
+import org.apache.hugegraph.loader.filter.util.SchemaManagerProxy;
+import org.apache.hugegraph.loader.filter.util.ShortIdConfig;
import org.apache.hugegraph.loader.mapping.ElementMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.mapping.LoadMapping;
@@ -50,7 +72,15 @@
import org.apache.hugegraph.loader.metrics.LoadSummary;
import org.apache.hugegraph.loader.reader.InputReader;
import org.apache.hugegraph.loader.reader.line.Line;
+import org.apache.hugegraph.loader.source.InputSource;
+import org.apache.hugegraph.loader.source.SourceType;
+import org.apache.hugegraph.loader.source.graph.GraphSource;
+import org.apache.hugegraph.structure.constant.HugeType;
+import org.apache.hugegraph.structure.schema.PropertyKey;
import org.apache.hugegraph.util.Log;
+import org.apache.hugegraph.util.JsonUtil;
+
+import com.google.common.collect.ImmutableList;
public final class HugeGraphLoader {
@@ -59,16 +89,40 @@ public final class HugeGraphLoader {
private final LoadContext context;
private final LoadMapping mapping;
private final TaskManager manager;
+ private final LoadOptions options;
+
+ public static class InputTaskItem {
+
+ public final InputReader reader;
+ public final InputStruct struct;
+ public final int structIndex;
+ public final int seqNumber;
+
+ public InputTaskItem(InputStruct struct, InputReader reader,
+ int structIndex, int seq) {
+ this.struct = struct;
+ this.reader = reader;
+ this.structIndex = structIndex;
+ this.seqNumber = seq;
+ }
+ }
public static void main(String[] args) {
HugeGraphLoader loader;
try {
loader = new HugeGraphLoader(args);
} catch (Throwable e) {
- Printer.printError("Failed to start loading", LoadUtil.targetRuntimeException(e));
- throw e;
+ Printer.printError("Failed to start loading", e);
+ System.exit(1);
+ return;
+ }
+
+ try {
+ loader.load();
+ } finally {
+ loader.shutdown();
+ GlobalExecutorManager.shutdown(loader.options.shutdownTimeout);
}
- loader.load();
}
public HugeGraphLoader(String[] args) {
@@ -77,10 +131,14 @@ public HugeGraphLoader(String[] args) {
public HugeGraphLoader(LoadOptions options) {
this(options, LoadMapping.of(options.file));
+ // Set concurrency
+ GlobalExecutorManager.setBatchThreadCount(options.batchInsertThreads);
+ GlobalExecutorManager.setSingleThreadCount(options.singleInsertThreads);
}
public HugeGraphLoader(LoadOptions options, LoadMapping mapping) {
this.context = new LoadContext(options);
+ this.options = options;
this.mapping = mapping;
this.manager = new TaskManager(this.context);
this.addShutdownHook();
@@ -97,10 +155,52 @@ public LoadContext context() {
return this.context;
}
+ private void checkGraphExists() {
+ HugeClient client = this.context.indirectClient();
+ String targetGraph = this.options.graph;
+ if (this.options.createGraph
+ && !client.graphs().listGraph().contains(targetGraph)) {
+            Map<String, Object> conf = new HashMap<>();
+ conf.put("store", targetGraph);
+ conf.put("backend", this.options.backend);
+ conf.put("serializer", this.options.serializer);
+ conf.put("task.scheduler_type", this.options.schedulerType);
+ conf.put("nickname", targetGraph);
+
+ client.graphs().createGraph(targetGraph, JsonUtil.toJson(conf));
+            LOG.info("Creating graph '{}' ...", targetGraph);
+ }
+ }
+
+ private void setGraphMode() {
+ // Set graph mode
+ // If there is a Graph data source, all Inputs must be Graph data sources
+        Supplier<Stream<InputSource>> inputsSupplier =
+                () -> this.mapping.structs().stream().filter(struct -> !struct.skip())
+                          .map(InputStruct::input);
+
+ boolean allMatch = inputsSupplier.get().allMatch(input -> SourceType.GRAPH.equals(input.type()));
+ boolean anyMatch = inputsSupplier.get().anyMatch(input -> SourceType.GRAPH.equals(input.type()));
+
+ if (anyMatch && !allMatch) {
+ throw new LoadException("All inputs must be of Graph Type");
+ }
+
+ if (allMatch || this.options.restore) {
+ this.context().setRestoreMode();
+ } else {
+ this.context().setLoadingMode();
+ }
+ }
+
public boolean load() {
+ this.options.dumpParams();
+
try {
- // Switch to loading mode
- this.context.setLoadingMode();
+ // check graph exists
+ this.checkGraphExists();
+ // set GraphMode
+ this.setGraphMode();
// Clear schema if needed
this.clearAllDataIfNeeded();
// Create schema
@@ -109,19 +209,30 @@ public boolean load() {
// Print load summary
Printer.printSummary(this.context);
} catch (Throwable t) {
- RuntimeException e = LoadUtil.targetRuntimeException(t);
- Printer.printError("Failed to load", e);
- if (this.context.options().testMode) {
- throw e;
+ this.context.occurredError();
+
+ if (t instanceof ServerException) {
+ ServerException e = (ServerException) t;
+ String logMessage =
+ "Log ServerException: \n" + e.exception() + "\n";
+ if (e.trace() != null) {
+ logMessage += StringUtils.join((List) e.trace(),
+ "\n");
+ }
+ LOG.warn(logMessage);
}
- } finally {
- this.stopThenShutdown();
+
+ throw LoadUtil.targetRuntimeException(t);
}
- return this.context.noError();
+
+ return true;
+ }
+
+ public void shutdown() {
+ this.stopThenShutdown();
}
private void clearAllDataIfNeeded() {
- LoadOptions options = this.context.options();
if (!options.clearAllData) {
return;
}
@@ -129,22 +240,28 @@ private void clearAllDataIfNeeded() {
int requestTimeout = options.timeout;
options.timeout = options.clearTimeout;
HugeClient client = HugeClientHolder.create(options);
- String message = "I'm sure to delete all data";
- LOG.info("Prepare to clear the data of graph '{}'", options.graph);
- client.graphs().clearGraph(options.graph, message);
- LOG.info("The graph '{}' has been cleared successfully", options.graph);
-
- options.timeout = requestTimeout;
- client.close();
+ try {
+ LOG.info("Prepare to clear the data of graph '{}'", options.graph);
+ client.graphs().clearGraph(options.graph, "I'm sure to delete all data");
+ LOG.info("The graph '{}' has been cleared successfully",
+ options.graph);
+ } catch (Exception e) {
+ LOG.error("Failed to clear data for graph '{}': {}", options.graph, e.getMessage(), e);
+ throw e;
+ } finally {
+ options.timeout = requestTimeout;
+ }
}
private void createSchema() {
- LoadOptions options = this.context.options();
if (!StringUtils.isEmpty(options.schema)) {
File file = FileUtils.getFile(options.schema);
HugeClient client = this.context.client();
GroovyExecutor groovyExecutor = new GroovyExecutor();
+ if (!options.shorterIDConfigs.isEmpty()) {
+ SchemaManagerProxy.proxy(client, options);
+ }
groovyExecutor.bind(Constants.GROOVY_SCHEMA, client.schema());
String script;
try {
@@ -153,11 +270,288 @@ private void createSchema() {
throw new LoadException("Failed to read schema file '%s'", e,
options.schema);
}
- groovyExecutor.execute(script, client);
+
+ if (!options.shorterIDConfigs.isEmpty()) {
+ for (ShortIdConfig config : options.shorterIDConfigs) {
+ PropertyKey propertyKey = client.schema().propertyKey(config.getIdFieldName())
+ .ifNotExist()
+ .dataType(config.getIdFieldType())
+ .build();
+ client.schema().addPropertyKey(propertyKey);
+ }
+ groovyExecutor.execute(script, client);
+            List<VertexLabel> vertexLabels = client.schema().getVertexLabels();
+ for (VertexLabel vertexLabel : vertexLabels) {
+ ShortIdConfig config;
+ if ((config = options.getShortIdConfig(vertexLabel.name())) != null) {
+ config.setLabelID(vertexLabel.id());
+ IndexLabel indexLabel = client.schema()
+ .indexLabel(config.getVertexLabel() + "By" +
+ config.getIdFieldName())
+ .onV(config.getVertexLabel())
+ .by(config.getIdFieldName())
+ .secondary()
+ .ifNotExist()
+ .build();
+ client.schema().addIndexLabel(indexLabel);
+ }
+ }
+ } else {
+ groovyExecutor.execute(script, client);
+ }
+ }
+
+ // create schema for Graph Source
+        List<InputStruct> structs = this.mapping.structs();
+ for (InputStruct struct : structs) {
+ if (SourceType.GRAPH.equals(struct.input().type())) {
+ GraphSource graphSouce = (GraphSource) struct.input();
+ if (StringUtils.isEmpty(graphSouce.getPdPeers())) {
+ graphSouce.setPdPeers(this.options.pdPeers);
+ }
+ if (StringUtils.isEmpty(graphSouce.getMetaEndPoints())) {
+ graphSouce.setMetaEndPoints(this.options.metaEndPoints);
+ }
+ if (StringUtils.isEmpty(graphSouce.getCluster())) {
+ graphSouce.setCluster(this.options.cluster);
+ }
+ if (StringUtils.isEmpty(graphSouce.getUsername())) {
+ graphSouce.setUsername(this.options.username);
+ }
+ if (StringUtils.isEmpty(graphSouce.getPassword())) {
+ graphSouce.setPassword(this.options.password);
+ }
+
+ GraphSource graphSource = (GraphSource) struct.input();
+ createGraphSourceSchema(graphSource);
+ }
}
+
this.context.updateSchemaCache();
}
+    /**
+     * Create schema in the target graph mirroring the source graph database.
+     *
+     * @param graphSource the graph-typed input source whose schema is copied
+     */
+ private void createGraphSourceSchema(GraphSource graphSource) {
+ try (HugeClient sourceClient = graphSource.createHugeClient();
+ HugeClient client = HugeClientHolder.create(this.options, false)) {
+ createGraphSourceVertexLabel(sourceClient, client, graphSource);
+ createGraphSourceEdgeLabel(sourceClient, client, graphSource);
+ createGraphSourceIndexLabel(sourceClient, client, graphSource);
+ } catch (Exception e) {
+ LOG.error("Failed to create graph source schema for {}: {}",
+ graphSource.getGraph(), e.getMessage(), e);
+ throw new LoadException("Schema creation failed", e);
+ }
+ }
+
+ // handles labels (can be used for both VertexLabel and EdgeLabel)
+ private void createGraphSourceLabels(
+ HugeClient sourceClient,
+ HugeClient targetClient,
+            List<? extends SchemaLabel> labels, // VertexLabel or EdgeLabel
+            Map<String, GraphSource.SelectedLabelDes> selectedMap,
+            Map<String, GraphSource.IgnoredLabelDes> ignoredMap,
+ boolean isVertex) {
+
+ for (SchemaLabel label : labels) {
+ if (ignoredMap.containsKey(label.name())) {
+ GraphSource.IgnoredLabelDes des
+ = ignoredMap.get(label.name());
+
+ if (des.getProperties() != null) {
+ des.getProperties()
+ .forEach((p) -> label.properties().remove(p));
+ }
+ }
+
+            Set<String> existedPKs =
+                    targetClient.schema().getPropertyKeys().stream()
+                                .map(pk -> pk.name()).collect(Collectors.toSet());
+
+ for (String pkName : label.properties()) {
+ PropertyKey pk = sourceClient.schema()
+ .getPropertyKey(pkName);
+ if (!existedPKs.contains(pk.name())) {
+ targetClient.schema().addPropertyKey(pk);
+ }
+ }
+
+ if (isVertex) {
+ if (!(label instanceof VertexLabel)) {
+ throw new IllegalArgumentException("Expected VertexLabel but got " + label.getClass());
+ }
+ targetClient.schema().addVertexLabel((VertexLabel) label);
+ } else {
+ if (!(label instanceof EdgeLabel)) {
+ throw new IllegalArgumentException("Expected EdgeLabel but got " + label.getClass());
+ }
+ targetClient.schema().addEdgeLabel((EdgeLabel) label);
+ }
+ }
+ }
+
+ private void createGraphSourceVertexLabel(HugeClient sourceClient,
+ HugeClient targetClient,
+ GraphSource graphSource) {
+
+ sourceClient.assignGraph(graphSource.getGraphSpace(),
+ graphSource.getGraph());
+
+ // Create Vertex Schema
+        List<VertexLabel> vertexLabels = new ArrayList<>();
+ if (graphSource.getSelectedVertices() != null) {
+            List<String> selectedVertexLabels =
+                    graphSource.getSelectedVertices()
+                               .stream().map((des) -> des.getLabel())
+                               .collect(Collectors.toList());
+
+ if (!CollectionUtils.isEmpty(selectedVertexLabels)) {
+ vertexLabels =
+ sourceClient.schema()
+ .getVertexLabels(selectedVertexLabels);
+ }
+ } else {
+ vertexLabels = sourceClient.schema().getVertexLabels();
+ }
+
+        Map<String, GraphSource.SelectedLabelDes> mapSelectedVertices
+                = new HashMap<>();
+ if (graphSource.getSelectedVertices() != null) {
+ for (GraphSource.SelectedLabelDes des :
+ graphSource.getSelectedVertices()) {
+ mapSelectedVertices.put(des.getLabel(), des);
+ }
+ }
+
+ for (VertexLabel label : vertexLabels) {
+ if (mapSelectedVertices.getOrDefault(label.name(),
+ null) != null) {
+                List<String> selectedProperties = mapSelectedVertices.get(
+                        label.name()).getProperties();
+
+ if (selectedProperties != null) {
+ label.properties().clear();
+ label.properties().addAll(selectedProperties);
+ }
+ }
+ }
+
+        Map<String, GraphSource.IgnoredLabelDes> mapIgnoredVertices
+                = new HashMap<>();
+ if (graphSource.getIgnoredVertices() != null) {
+ for (GraphSource.IgnoredLabelDes des :
+ graphSource.getIgnoredVertices()) {
+ mapIgnoredVertices.put(des.getLabel(), des);
+ }
+ }
+
+ createGraphSourceLabels(sourceClient, targetClient, vertexLabels, mapSelectedVertices,
+ mapIgnoredVertices, true);
+ }
+
+ private void createGraphSourceEdgeLabel(HugeClient sourceClient,
+ HugeClient targetClient,
+ GraphSource graphSource) {
+ // Create Edge Schema
+        List<EdgeLabel> edgeLabels = new ArrayList<>();
+ if (graphSource.getSelectedEdges() != null) {
+            List<String> selectedEdgeLabels =
+                    graphSource.getSelectedEdges()
+                               .stream().map((des) -> des.getLabel())
+                               .collect(Collectors.toList());
+
+ if (!CollectionUtils.isEmpty(selectedEdgeLabels)) {
+ edgeLabels =
+ sourceClient.schema()
+ .getEdgeLabels(selectedEdgeLabels);
+ }
+ } else {
+ edgeLabels = sourceClient.schema().getEdgeLabels();
+ }
+
+        Map<String, GraphSource.SelectedLabelDes> mapSelectedEdges
+                = new HashMap<>();
+ if (graphSource.getSelectedEdges() != null) {
+ for (GraphSource.SelectedLabelDes des :
+ graphSource.getSelectedEdges()) {
+ mapSelectedEdges.put(des.getLabel(), des);
+ }
+ }
+
+ for (EdgeLabel label : edgeLabels) {
+ if (mapSelectedEdges.getOrDefault(label.name(), null) != null) {
+                List<String> selectedProperties = mapSelectedEdges.get(
+                        label.name()).getProperties();
+
+ if (selectedProperties != null) {
+ label.properties().clear();
+ label.properties().addAll(selectedProperties);
+ }
+ }
+ }
+
+        Map<String, GraphSource.IgnoredLabelDes> mapIgnoredEdges
+                = new HashMap<>();
+ if (graphSource.getIgnoredEdges() != null) {
+ for (GraphSource.IgnoredLabelDes des :
+ graphSource.getIgnoredEdges()) {
+ mapIgnoredEdges.put(des.getLabel(), des);
+ }
+ }
+
+ createGraphSourceLabels(sourceClient, targetClient, edgeLabels, mapSelectedEdges,
+ mapIgnoredEdges, false);
+ }
+
+ private void createGraphSourceIndexLabel(HugeClient sourceClient,
+ HugeClient targetClient,
+ GraphSource graphSource) {
+        Set<String> existedVertexLabels
+                = targetClient.schema().getVertexLabels().stream()
+                              .map(v -> v.name()).collect(Collectors.toSet());
+
+        Set<String> existedEdgeLabels
+                = targetClient.schema().getEdgeLabels().stream()
+                              .map(v -> v.name()).collect(Collectors.toSet());
+
+        List<IndexLabel> indexLabels = sourceClient.schema()
+                                                   .getIndexLabels();
+ for (IndexLabel indexLabel : indexLabels) {
+
+ HugeType baseType = indexLabel.baseType();
+ String baseValue = indexLabel.baseValue();
+            Set<String> sourceIndexFields =
+                    new HashSet<>(indexLabel.indexFields());
+
+ if (baseType.equals(HugeType.VERTEX_LABEL) &&
+ existedVertexLabels.contains(baseValue)) {
+ // Create Vertex Index
+
+                Set<String> curFields = targetClient.schema()
+                                                    .getVertexLabel(baseValue)
+                                                    .properties();
+ if (curFields.containsAll(sourceIndexFields)) {
+ targetClient.schema().addIndexLabel(indexLabel);
+ }
+ }
+
+ if (baseType.equals(HugeType.EDGE_LABEL) &&
+ existedEdgeLabels.contains(baseValue)) {
+ // Create Edge Index
+                Set<String> curFields = targetClient.schema()
+                                                    .getEdgeLabel(baseValue)
+                                                    .properties();
+ if (curFields.containsAll(sourceIndexFields)) {
+ targetClient.schema().addIndexLabel(indexLabel);
+ }
+ }
+ }
+ }
+
private void loadInputs() {
Printer.printRealtimeProgress(this.context);
LoadOptions options = this.context.options();
@@ -200,27 +594,152 @@ private void loadInputs(List structs) {
}
}
- private void loadStructs(List structs) {
- // Load input structs one by one
+    private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+                                                 boolean scatter) {
+        ArrayList<InputTaskItem> tasks = new ArrayList<>();
+        ArrayList<InputReader> readers = new ArrayList<>();
+ int curFile = 0;
+ int curIndex = 0;
for (InputStruct struct : structs) {
- if (this.context.stopped()) {
- break;
- }
if (struct.skip()) {
continue;
}
- // Create and init InputReader, fetch next batch lines
- try (InputReader reader = InputReader.create(struct.input())) {
- // Init reader
- reader.init(this.context, struct);
- // Load data from current input mapping
- this.loadStruct(struct, reader);
+
+ // Create and init InputReader
+ try {
+ LOG.info("Start loading: '{}'", struct);
+
+ InputReader reader = InputReader.create(struct.input());
+                List<InputReader> readerList = reader.multiReaders() ?
+                                               reader.split() :
+                                               ImmutableList.of(reader);
+ readers.addAll(readerList);
+
+                LOG.info("total {} readers found in '{}'", readerList.size(), struct);
+ tasks.ensureCapacity(tasks.size() + readerList.size());
+ int seq = 0;
+ for (InputReader r : readerList) {
+ if (curFile >= this.context.options().startFile &&
+ (this.context.options().endFile == -1 ||
+ curFile < this.context.options().endFile)) {
+ // Load data from current input mapping
+ tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+ } else {
+ r.close();
+ }
+ seq += 1;
+ curFile += 1;
+ }
+ if (this.context.options().endFile != -1 &&
+ curFile >= this.context.options().endFile) {
+ break;
+ }
} catch (InitException e) {
throw new LoadException("Failed to init input reader", e);
+ } finally {
+                Set<InputReader> usedReaders = tasks.stream()
+                                                    .map(item -> item.reader)
+                                                    .collect(Collectors.toSet());
+ for (InputReader r : readers) {
+ if (!usedReaders.contains(r)) {
+ try {
+ r.close();
+ } catch (Exception ex) {
+ LOG.warn("Failed to close reader", ex);
+ }
+ }
+ }
+ }
+ curIndex += 1;
+ }
+        // Interleave tasks from different sources for scatter loading.
+        // NOTE(review): InputTaskItem above is built as (struct, r, seq, curIndex) against
+        // ctor params (structIndex, seq) — confirm the argument order is intended.
+ if (scatter) {
+ tasks.sort(Comparator.comparingInt((InputTaskItem o) -> o.structIndex)
+ .thenComparingInt(o -> o.seqNumber));
+ }
+
+ return tasks;
+ }
+
+ private void loadStructs(List structs) {
+ int parallelCount = this.context.options().parallelCount;
+ if (structs.size() == 0) {
+ return;
+ }
+ if (parallelCount <= 0) {
+ parallelCount = Math.min(structs.size(), Runtime.getRuntime().availableProcessors() * 2);
+ }
+
+ boolean scatter = this.context.options().scatterSources;
+
+ LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
+ parallelCount, structs.size(), this.context.options().startFile,
+ this.context.options().endFile,
+ scatter ? "scatter" : "sequential");
+
+ ExecutorService loadService = null;
+ try {
+ loadService = ExecutorUtil.newFixedThreadPool(parallelCount, "loader");
+            List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
+            List<CompletableFuture<Void>> loadTasks = new ArrayList<>();
+
+ if (taskItems.isEmpty()) {
+ LOG.info("No tasks to execute after filtering");
+ return;
+ }
+
+ for (InputTaskItem item : taskItems) {
+ // Init reader
+ item.reader.init(this.context, item.struct);
+ // Load data from current input mapping
+ loadTasks.add(
+ this.asyncLoadStruct(item.struct, item.reader,
+ loadService));
}
+
+ LOG.info("waiting for loading finish {}", loadTasks.size());
+ CompletableFuture.allOf(loadTasks.toArray(new CompletableFuture[0]))
+ .join();
+ } catch (CompletionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof ParseException) {
+ throw (ParseException) cause;
+ } else if (cause instanceof LoadException) {
+ throw (LoadException) cause;
+ } else if (cause != null) {
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else {
+ throw new RuntimeException(cause);
+ }
+ } else {
+ throw e;
+ }
+ } catch (Throwable t) {
+ throw t;
+ } finally {
+ // Shutdown service
+ cleanupEmptyProgress();
+ if (loadService != null) {
+ loadService.shutdownNow();
+ }
+ LOG.info("Load end");
}
}
+    private CompletableFuture<Void> asyncLoadStruct(
+            InputStruct struct, InputReader reader, ExecutorService service) {
+ return CompletableFuture.runAsync(() -> {
+ try {
+ this.loadStruct(struct, reader);
+ } catch (Throwable t) {
+ throw t;
+ } finally {
+ reader.close();
+ }
+ }, service);
+ }
+
/**
* TODO: Separate classes: ReadHandler -> ParseHandler -> InsertHandler
* Let load task worked in pipeline mode
@@ -233,7 +752,9 @@ private void loadStruct(InputStruct struct, InputReader reader) {
ParseTaskBuilder taskBuilder = new ParseTaskBuilder(this.context, struct);
final int batchSize = this.context.options().batchSize;
List lines = new ArrayList<>(batchSize);
- for (boolean finished = false; !finished;) {
+ long batchStartTime = System.currentTimeMillis();
+
+ for (boolean finished = false; !finished; ) {
if (this.context.stopped()) {
break;
}
@@ -241,7 +762,8 @@ private void loadStruct(InputStruct struct, InputReader reader) {
// Read next line from data source
if (reader.hasNext()) {
Line next = reader.next();
- if (Objects.nonNull(next)) {
+ // If the data source is kafka, there may be cases where the fetched data is null
+ if (next != null) {
lines.add(next);
metrics.increaseReadSuccess();
}
@@ -257,14 +779,18 @@ private void loadStruct(InputStruct struct, InputReader reader) {
if (reachedMaxReadLines) {
finished = true;
}
- if (lines.size() >= batchSize || finished) {
- List tasks = taskBuilder.build(lines);
- for (ParseTaskBuilder.ParseTask task : tasks) {
+ if (lines.size() >= batchSize ||
+ // Force commit within 5s, mainly affects kafka data source
+ (lines.size() > 0 &&
+ System.currentTimeMillis() > batchStartTime + 5000) ||
+ finished) {
+            List<ParseTask> tasks = taskBuilder.build(lines);
+ for (ParseTask task : tasks) {
this.executeParseTask(struct, task.mapping(), task);
}
// Confirm offset to avoid lost records
reader.confirmOffset();
- this.context.newProgress().markLoaded(struct, finished);
+ this.context.newProgress().markLoaded(struct, reader, finished);
this.handleParseFailure();
if (reachedMaxReadLines) {
@@ -272,6 +798,7 @@ private void loadStruct(InputStruct struct, InputReader reader) {
this.context.stopLoading();
}
lines = new ArrayList<>(batchSize);
+ batchStartTime = System.currentTimeMillis();
}
}
@@ -387,6 +914,11 @@ private synchronized void stopThenShutdown() {
}
}
+ private void cleanupEmptyProgress() {
+        Map<String, InputProgress> inputProgressMap = this.context.newProgress().inputProgress();
+ inputProgressMap.entrySet().removeIf(entry -> entry.getValue().loadedItems().isEmpty());
+ }
+
private static class SplitInputStructs {
private final List vertexInputStructs;
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java
index 2df3431ae..950100187 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java
@@ -25,6 +25,9 @@
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.util.E;
+
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.mapping.EdgeMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
@@ -34,12 +37,8 @@
import org.apache.hugegraph.structure.schema.EdgeLabel;
import org.apache.hugegraph.structure.schema.SchemaLabel;
import org.apache.hugegraph.structure.schema.VertexLabel;
-import org.apache.hugegraph.util.E;
-
import com.google.common.collect.ImmutableList;
-import org.apache.spark.sql.Row;
-
public class EdgeBuilder extends ElementBuilder {
private final EdgeMapping mapping;
@@ -58,8 +57,7 @@ public EdgeBuilder(LoadContext context, InputStruct struct,
this.edgeLabel = this.getEdgeLabel(this.mapping.label());
this.nonNullKeys = this.nonNullableKeys(this.edgeLabel);
if (this.edgeLabel.edgeLabelType().general()) {
- // If create a general type edge, the loader can't obtain the vertexlabel info of both ends
- // Therefore, the IdStrategy of both ends is uniformly set to CUSTOMIZE_STRING
+ // the IdStrategy of both ends is uniformly set to CUSTOMIZE_STRING
this.sourceLabel = new VertexLabel("~general");
this.targetLabel = new VertexLabel("~general");
this.sourceLabel.idStrategy(IdStrategy.CUSTOMIZE_STRING);
@@ -71,7 +69,6 @@ public EdgeBuilder(LoadContext context, InputStruct struct,
        // Ensure that the source/target id fields are matched with id strategy
this.checkIdFields(this.sourceLabel, this.mapping.sourceFields());
this.checkIdFields(this.targetLabel, this.mapping.targetFields());
-
this.vertexIdsIndex = null;
}
@@ -121,62 +118,19 @@ public List build(String[] names, Object[] values) {
return edges;
}
- @Override
- public List build(Row row) {
- String[] names = row.schema().fieldNames();
- Object[] values = new Object[row.size()];
- for (int i = 0; i < row.size(); i++) {
- values[i] = row.get(i);
- }
- if (this.vertexIdsIndex == null ||
- !Arrays.equals(this.lastNames, names)) {
- this.vertexIdsIndex = this.extractVertexIdsIndex(names);
- }
-
- this.lastNames = names;
- EdgeKVPairs kvPairs = this.newEdgeKVPairs();
- kvPairs.source.extractFromEdge(names, values, this.vertexIdsIndex.sourceIndexes);
- kvPairs.target.extractFromEdge(names, values, this.vertexIdsIndex.targetIndexes);
- kvPairs.extractProperties(names, values);
-
- List sources = kvPairs.source.buildVertices(false);
- List targets = kvPairs.target.buildVertices(false);
- if (sources.isEmpty() || targets.isEmpty()) {
- return ImmutableList.of();
- }
- E.checkArgument(sources.size() == 1 || targets.size() == 1 ||
- sources.size() == targets.size(),
- "The elements number of source and target must be: " +
- "1 to n, n to 1, n to n");
- int size = Math.max(sources.size(), targets.size());
- List edges = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- Vertex source = i < sources.size() ?
- sources.get(i) : sources.get(0);
- Vertex target = i < targets.size() ?
- targets.get(i) : targets.get(0);
- Edge edge = new Edge(this.mapping.label());
- edge.source(source);
- edge.target(target);
- // Add properties
- this.addProperties(edge, kvPairs.properties);
- this.checkNonNullableKeys(edge);
- edges.add(edge);
- }
- return edges;
- }
-
private EdgeKVPairs newEdgeKVPairs() {
EdgeKVPairs kvPairs = new EdgeKVPairs();
kvPairs.source = this.newKVPairs(this.sourceLabel,
this.mapping.unfoldSource());
+ kvPairs.source.headerCaseSensitive(this.headerCaseSensitive());
kvPairs.target = this.newKVPairs(this.targetLabel,
this.mapping.unfoldTarget());
+ kvPairs.target.headerCaseSensitive(this.headerCaseSensitive());
return kvPairs;
}
@Override
- public SchemaLabel schemaLabel() {
+ protected SchemaLabel schemaLabel() {
return this.edgeLabel;
}
@@ -199,10 +153,10 @@ private void checkIdFields(VertexLabel vertexLabel, List fields) {
} else if (vertexLabel.idStrategy().isPrimaryKey()) {
E.checkArgument(fields.size() >= 1,
"The source/target field must contains some " +
- "columns when id strategy is PrimaryKey");
+                            "columns when id strategy is PrimaryKey");
} else {
- throw new IllegalArgumentException("Unsupported AUTOMATIC id strategy " +
- "for hugegraph-loader");
+ throw new IllegalArgumentException(
+ "Unsupported AUTOMATIC id strategy for hugegraph-loader");
}
}
@@ -225,7 +179,7 @@ public void extractProperties(String[] names, Object[] values) {
continue;
}
- String key = mapping.mappingField(fieldName);
+ String key = mappingField(fieldName);
if (isIdField(fieldName) &&
!props.contains(fieldName) && !props.contains(key)) {
continue;
@@ -240,25 +194,27 @@ public void extractProperties(String[] names, Object[] values) {
private VertexIdsIndex extractVertexIdsIndex(String[] names) {
VertexIdsIndex index = new VertexIdsIndex();
index.sourceIndexes = new int[this.mapping.sourceFields().size()];
- int idx = 0;
- for (String field : this.mapping.sourceFields()) {
- for (int pos = 0; pos < names.length; pos++) {
- String name = names[pos];
- if (field.equals(name)) {
- index.sourceIndexes[idx++] = pos;
- }
- }
+        // Resolve each mapping field to its column index in the file header
+        List<String> listNames = Arrays.asList(names);
+ for (int idx = 0; idx < this.mapping.sourceFields().size(); idx++) {
+ String field = this.mapping.sourceFields().get(idx);
+ int i = listNames.indexOf(field);
+ E.checkArgument(i >= 0,
+ "mapping file error: edges.source(%s)" +
+ " not in file header([%s])", field,
+ StringUtils.joinWith(",", names));
+ index.sourceIndexes[idx] = i;
}
index.targetIndexes = new int[this.mapping.targetFields().size()];
- idx = 0;
- for (String field : this.mapping.targetFields()) {
- for (int pos = 0; pos < names.length; pos++) {
- String name = names[pos];
- if (field.equals(name)) {
- index.targetIndexes[idx++] = pos;
- }
- }
+ for (int idx = 0; idx < this.mapping.targetFields().size(); idx++) {
+ String field = this.mapping.targetFields().get(idx);
+ int i = listNames.indexOf(field);
+ E.checkArgument(i >= 0,
+ "mapping file error: edges.target(%s)" +
+ " not in file header([%s])", field,
+ StringUtils.joinWith(",", names));
+ index.targetIndexes[idx] = i;
}
return index;
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/ElementBuilder.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/ElementBuilder.java
index 7fa680776..e1d6c0818 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/ElementBuilder.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/ElementBuilder.java
@@ -21,6 +21,7 @@
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -35,26 +36,28 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.ListUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.LongEncoding;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.filter.util.ShortIdConfig;
import org.apache.hugegraph.loader.mapping.ElementMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
-import org.apache.hugegraph.loader.util.DataTypeUtil;
import org.apache.hugegraph.loader.source.InputSource;
+import org.apache.hugegraph.loader.util.DataTypeUtil;
import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.constant.DataType;
import org.apache.hugegraph.structure.constant.IdStrategy;
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.structure.schema.EdgeLabel;
import org.apache.hugegraph.structure.schema.PropertyKey;
import org.apache.hugegraph.structure.schema.SchemaLabel;
import org.apache.hugegraph.structure.schema.VertexLabel;
-import org.apache.hugegraph.util.E;
-import org.apache.hugegraph.util.LongEncoding;
-import com.google.common.collect.ImmutableList;
+import java.util.HashSet;
-import org.apache.spark.sql.Row;
+import com.google.common.collect.ImmutableList;
public abstract class ElementBuilder {
@@ -64,26 +67,51 @@ public abstract class ElementBuilder {
// NOTE: CharsetEncoder is not thread safe
private final CharsetEncoder encoder;
private final ByteBuffer buffer;
+ private LoadContext context;
+ private boolean usePrefilter;
+ private static HashSet bytesSet;
+ private static HashSet longSet;
public ElementBuilder(LoadContext context, InputStruct struct) {
this.struct = struct;
this.schema = context.schemaCache();
this.encoder = Constants.CHARSET.newEncoder();
this.buffer = ByteBuffer.allocate(Constants.VERTEX_ID_LIMIT);
+ this.context = context;
+ this.usePrefilter = this.context.options().usePrefilter;
+ if (longSet == null) {
+ synchronized (ElementBuilder.class) {
+ if (longSet == null) {
+ longSet = new HashSet<>();
+ bytesSet = new HashSet<>();
+ }
+ }
+ }
}
public abstract ElementMapping mapping();
public abstract List build(String[] names, Object[] values);
- public abstract List build(Row row);
-
- public abstract SchemaLabel schemaLabel();
+ protected abstract SchemaLabel schemaLabel();
protected abstract Collection nonNullableKeys();
protected abstract boolean isIdField(String fieldName);
+ // Whether this builder treats header field names as case-sensitive
+ protected boolean headerCaseSensitive() {
+ return this.struct.input().headerCaseSensitive();
+ }
+
+ protected boolean headerEqual(String header1, String header2) {
+ if (this.headerCaseSensitive()) {
+ return header1.equals(header2);
+ } else {
+ return header1.equalsIgnoreCase(header2);
+ }
+ }
+
@SuppressWarnings("unchecked")
protected Collection nonNullableKeys(SchemaLabel schemaLabel) {
return CollectionUtils.subtract(schemaLabel.properties(),
@@ -109,6 +137,52 @@ protected VertexKVPairs newKVPairs(VertexLabel vertexLabel,
}
}
+ protected boolean isSelectedField(String fieldName) {
+ ElementMapping mapping = this.mapping();
+ Set selectedFields = mapping.selectedFields();
+
+ if (selectedFields.isEmpty()) {
+ return true;
+ }
+
+ if (this.headerCaseSensitive()) {
+ if (selectedFields.contains(fieldName)) {
+ return true;
+ }
+ } else {
+ for (String selectedField : selectedFields) {
+ if (headerEqual(selectedField, fieldName)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ protected boolean isIgnoredField(String fieldName) {
+ ElementMapping mapping = this.mapping();
+ Set ignoredFields = mapping.ignoredFields();
+
+ if (ignoredFields.isEmpty()) {
+ return false;
+ }
+
+ if (this.headerCaseSensitive()) {
+ if (ignoredFields.contains(fieldName)) {
+ return true;
+ }
+ } else {
+ for (String ignoredField : ignoredFields) {
+ if (headerEqual(ignoredField, fieldName)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
/**
* Retain only the key-value pairs needed by the current vertex or edge
*/
@@ -117,18 +191,26 @@ protected boolean retainField(String fieldName, Object fieldValue) {
Set selectedFields = mapping.selectedFields();
Set ignoredFields = mapping.ignoredFields();
// Retain selected fields or remove ignored fields
- if (!selectedFields.isEmpty() && !selectedFields.contains(fieldName)) {
+ if (!isSelectedField(fieldName)) {
return false;
}
- if (!ignoredFields.isEmpty() && ignoredFields.contains(fieldName)) {
+ if (isIgnoredField(fieldName)) {
return false;
}
- String mappedKey = mapping.mappingField(fieldName);
+
+ String mappedKey = mappingField(fieldName);
+
Set nullableKeys = this.schemaLabel().nullableKeys();
Set