diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java index 68c978a297..e9004bc129 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java @@ -196,10 +196,14 @@ public String toString() { @Override public void run() { - assert this.status.code() < TaskStatus.RUNNING.code(); - if (this.checkDependenciesSuccess()) { - this.status(TaskStatus.RUNNING); - super.run(); + try { + assert this.status.code() < TaskStatus.RUNNING.code(); + if (this.checkDependenciesSuccess()) { + this.status(TaskStatus.RUNNING); + super.run(); + } + } catch (Throwable e) { + this.setException(e); } } @@ -338,7 +342,7 @@ protected Object[] asArray() { E.checkState(this.type != null, "Task type can't be null"); E.checkState(this.name != null, "Task name can't be null"); - List list = new ArrayList<>(24); + List list = new ArrayList<>(28); list.add(T.label); list.add(P.TASK); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java index ebf7639c15..6f0452a10d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java @@ -196,17 +196,9 @@ public void save(HugeTask task) { this.call(() -> { // Construct vertex from task HugeVertex vertex = this.tx().constructVertex(task); - // Delete the old record if exist - Iterator old = this.tx().queryVertices(vertex.id()); - if (old.hasNext()) { - HugeVertex oldV = (HugeVertex) old.next(); - assert !old.hasNext(); - if (this.tx().indexValueChanged(oldV, vertex)) { - // Only delete vertex if index value changed else override - this.tx().removeVertex(oldV); - } - } - // Do update + // TODO: delete index of old vertex + // this.tx().deleteIndex(vertex); + // Add or update task info in backend store, stale index might exist return this.tx().addVertex(vertex); }); } @@ -424,6 +416,20 @@ public boolean indexValueChanged(Vertex oldV, HugeVertex newV) { return false; } + private void deleteIndex(HugeVertex vertex) { + // Delete the old record if exist + Iterator old = this.queryVertices(vertex.id()); + if (old.hasNext()) { + HugeVertex oldV = (HugeVertex) old.next(); + assert !old.hasNext(); + if (this.indexValueChanged(oldV, vertex)) { + // Only delete vertex if index value changed else override + // TODO: just delete index instead of removing old vertex + this.removeVertex(oldV); + } + } + } + protected void initSchema() { HugeGraph graph = this.graph(); VertexLabel label = graph.schemaTransaction().getVertexLabel(TASK); diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseOptions.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseOptions.java index df3d0321ef..2937114f7c 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseOptions.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseOptions.java @@ -56,4 +56,12 @@ public static synchronized HbaseOptions instance() { rangeInt(1024, 10000), 2181 ); + + public static final ConfigOption HBASE_THREADS_MAX = + new ConfigOption<>( + "hbase.threads_max", + "The max threads num of hbase connections.", + rangeInt(1, 1000), + 64 + ); } diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java index 3f2ff14962..bcd2f969b8 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java @@ -86,6 +86,9 @@ public synchronized void open(HugeConfig conf) throws IOException { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", hosts); config.set("hbase.zookeeper.property.clientPort", String.valueOf(port)); + // Set hbase.hconnection.threads.max 64 to avoid OOM(default value: 256) + config.setInt("hbase.hconnection.threads.max", + conf.get(HbaseOptions.HBASE_THREADS_MAX)); this.hbase = ConnectionFactory.createConnection(config); }