From 89d1cd5c246bd557ede5725de62c78628a62fed6 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Sat, 3 Jun 2023 21:39:17 +0800 Subject: [PATCH 1/3] feat: isolate namespace for different input data source --- .../computer/core/config/ComputerOptions.java | 9 +++++++++ .../apache/hugegraph/computer/core/bsp/BspBase.java | 11 ++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java index 765f8adbb..878ed328b 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java @@ -420,6 +420,14 @@ public static synchronized ComputerOptions instance() { 10000 ); + public static final ConfigOption JOB_NAMESPACE = + new ConfigOption<>( + "job.namespace", + "The job namespace can seperate different data source.", + null, + "default" + ); + public static final ConfigOption JOB_ID = new ConfigOption<>( "job.id", @@ -921,6 +929,7 @@ public static synchronized ComputerOptions instance() { ComputerOptions.BSP_ETCD_ENDPOINTS.name(), ComputerOptions.TRANSPORT_SERVER_HOST.name(), ComputerOptions.TRANSPORT_SERVER_PORT.name(), + ComputerOptions.JOB_NAMESPACE.name(), ComputerOptions.JOB_ID.name(), ComputerOptions.JOB_WORKERS_COUNT.name(), ComputerOptions.RPC_SERVER_HOST_NAME, diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java index 66a3fd938..319c56d66 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java @@ -30,6 +30,7 @@ public abstract class BspBase { private final Config config; private final String jobId; + private final String jobNamespace; private final int workerCount; private final long registerTimeout; private final long barrierOnMasterTimeout; @@ -42,6 +43,7 @@ public BspBase(Config config) { this.config = config; this.jobId = config.get(ComputerOptions.JOB_ID); + this.jobNamespace = config.get(ComputerOptions.JOB_NAMESPACE); this.workerCount = this.config.get(ComputerOptions.JOB_WORKERS_COUNT); this.registerTimeout = this.config.get( ComputerOptions.BSP_REGISTER_TIMEOUT); @@ -59,7 +61,8 @@ public BspBase(Config config) { */ private BspClient init() { BspClient bspClient = this.createBspClient(); - bspClient.init(this.jobId); + String namespace = this.constructPath(null, jobNamespace, this.jobId); + bspClient.init(namespace); LOG.info("Init {} BSP connection to '{}' for job '{}'", bspClient.type(), bspClient.endpoint(), this.jobId); return bspClient; @@ -123,8 +126,10 @@ public final long logInterval() { */ protected String constructPath(BspEvent event, Object... paths) { StringBuilder sb = new StringBuilder(); - // TODO: replace event.code() with event.name() - sb.append(event.name()); + if (event != null) { + // TODO: replace event.code() with event.name() + sb.append(event.name()); + } for (Object path : paths) { sb.append("/").append(path.toString()); } From a313444d25b8a89b477d12088d737dd8b7c18108 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Sun, 4 Jun 2023 21:50:52 +0800 Subject: [PATCH 2/3] set default job namespace to empty string --- .../apache/hugegraph/computer/core/config/ComputerOptions.java | 2 +- .../java/org/apache/hugegraph/computer/core/bsp/BspBase.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java index 878ed328b..33fba83d9 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java @@ -425,7 +425,7 @@ public static synchronized ComputerOptions instance() { "job.namespace", "The job namespace can seperate different data source.", null, - "default" + "" ); public static final ConfigOption JOB_ID = diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java index 319c56d66..d0abc64aa 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java @@ -61,7 +61,8 @@ public BspBase(Config config) { */ private BspClient init() { BspClient bspClient = this.createBspClient(); - String namespace = this.constructPath(null, jobNamespace, this.jobId); + String namespace = this.jobNamespace == "" ? this.constructPath(null, this.jobId) : + this.constructPath(null, this.jobNamespace, this.jobId); bspClient.init(namespace); LOG.info("Init {} BSP connection to '{}' for job '{}'", bspClient.type(), bspClient.endpoint(), this.jobId); From 7057b5d98b81fdc11fc62732a71f870856d9deaf Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 6 Jun 2023 22:50:15 +0800 Subject: [PATCH 3/3] use StringUtils.isEmpty --- .../org/apache/hugegraph/computer/core/bsp/BspBase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java index d0abc64aa..23d01f4e2 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java @@ -17,6 +17,7 @@ package org.apache.hugegraph.computer.core.bsp; +import org.apache.commons.lang3.StringUtils; import org.apache.hugegraph.computer.core.common.exception.ComputerException; import org.apache.hugegraph.computer.core.config.ComputerOptions; import org.apache.hugegraph.computer.core.config.Config; @@ -61,8 +62,9 @@ public BspBase(Config config) { */ private BspClient init() { BspClient bspClient = this.createBspClient(); - String namespace = this.jobNamespace == "" ? this.constructPath(null, this.jobId) : - this.constructPath(null, this.jobNamespace, this.jobId); + String namespace = StringUtils.isEmpty(this.jobNamespace) ? + this.constructPath(null, this.jobId) : + this.constructPath(null, this.jobNamespace, this.jobId); bspClient.init(namespace); LOG.info("Init {} BSP connection to '{}' for job '{}'", bspClient.type(), bspClient.endpoint(), this.jobId);