diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java index 765f8adbb..33fba83d9 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java @@ -420,6 +420,14 @@ public static synchronized ComputerOptions instance() { 10000 ); + public static final ConfigOption JOB_NAMESPACE = + new ConfigOption<>( + "job.namespace", + "The job namespace can seperate different data source.", + null, + "" + ); + public static final ConfigOption JOB_ID = new ConfigOption<>( "job.id", @@ -921,6 +929,7 @@ public static synchronized ComputerOptions instance() { ComputerOptions.BSP_ETCD_ENDPOINTS.name(), ComputerOptions.TRANSPORT_SERVER_HOST.name(), ComputerOptions.TRANSPORT_SERVER_PORT.name(), + ComputerOptions.JOB_NAMESPACE.name(), ComputerOptions.JOB_ID.name(), ComputerOptions.JOB_WORKERS_COUNT.name(), ComputerOptions.RPC_SERVER_HOST_NAME, diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java index 66a3fd938..23d01f4e2 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java @@ -17,6 +17,7 @@ package org.apache.hugegraph.computer.core.bsp; +import org.apache.commons.lang3.StringUtils; import org.apache.hugegraph.computer.core.common.exception.ComputerException; import org.apache.hugegraph.computer.core.config.ComputerOptions; import org.apache.hugegraph.computer.core.config.Config; @@ -30,6 +31,7 @@ public abstract class BspBase { private final Config config; private final String jobId; + private final String jobNamespace; private final int workerCount; private final long registerTimeout; private final long barrierOnMasterTimeout; @@ -42,6 +44,7 @@ public BspBase(Config config) { this.config = config; this.jobId = config.get(ComputerOptions.JOB_ID); + this.jobNamespace = config.get(ComputerOptions.JOB_NAMESPACE); this.workerCount = this.config.get(ComputerOptions.JOB_WORKERS_COUNT); this.registerTimeout = this.config.get( ComputerOptions.BSP_REGISTER_TIMEOUT); @@ -59,7 +62,10 @@ public BspBase(Config config) { */ private BspClient init() { BspClient bspClient = this.createBspClient(); - bspClient.init(this.jobId); + String namespace = StringUtils.isEmpty(this.jobNamespace) ? + this.constructPath(null, this.jobId) : + this.constructPath(null, this.jobNamespace, this.jobId); + bspClient.init(namespace); LOG.info("Init {} BSP connection to '{}' for job '{}'", bspClient.type(), bspClient.endpoint(), this.jobId); return bspClient; @@ -123,8 +129,10 @@ public final long logInterval() { */ protected String constructPath(BspEvent event, Object... paths) { StringBuilder sb = new StringBuilder(); - // TODO: replace event.code() with event.name() - sb.append(event.name()); + if (event != null) { + // TODO: replace event.code() with event.name() + sb.append(event.name()); + } for (Object path : paths) { sb.append("/").append(path.toString()); }