diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/raft/RaftAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/raft/RaftAPI.java index e8e7aa7a68..a042c56840 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/raft/RaftAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/raft/RaftAPI.java @@ -19,6 +19,7 @@ package com.baidu.hugegraph.api.raft; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,8 +41,14 @@ import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.api.API; import com.baidu.hugegraph.api.filter.StatusFilter.Status; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.store.raft.RaftAddPeerJob; import com.baidu.hugegraph.backend.store.raft.RaftGroupManager; +import com.baidu.hugegraph.backend.store.raft.RaftRemovePeerJob; import com.baidu.hugegraph.core.GraphManager; +import com.baidu.hugegraph.job.JobBuilder; +import com.baidu.hugegraph.util.DateUtil; +import com.baidu.hugegraph.util.JsonUtil; import com.baidu.hugegraph.util.Log; import com.codahale.metrics.annotation.Timed; import com.google.common.collect.ImmutableMap; @@ -144,19 +151,26 @@ public Map setLeader(@Context GraphManager manager, @Consumes(APPLICATION_JSON) @Produces(APPLICATION_JSON_WITH_CHARSET) @RolesAllowed({"admin"}) - public Map addPeer(@Context GraphManager manager, - @PathParam("graph") String graph, - @QueryParam("group") - @DefaultValue("default") - String group, - @QueryParam("endpoint") - String endpoint) { + public Map addPeer(@Context GraphManager manager, + @PathParam("graph") String graph, + @QueryParam("group") @DefaultValue("default") + String group, + @QueryParam("endpoint") String endpoint) { LOG.debug("Graph [{}] prepare to add peer: {}", graph, endpoint); HugeGraph g = graph(manager, graph); RaftGroupManager raftManager = raftGroupManager(g, group, "add_peer"); - String peerId = raftManager.addPeer(endpoint); - return ImmutableMap.of(raftManager.group(), peerId); + + JobBuilder builder = JobBuilder.of(g); + String name = String.format("raft-group-[%s]-add-peer-[%s]-at-[%s]", + raftManager.group(), endpoint, + DateUtil.now()); + Map inputs = new HashMap<>(); + inputs.put("endpoint", endpoint); + builder.name(name) + .input(JsonUtil.toJson(inputs)) + .job(new RaftAddPeerJob()); + return ImmutableMap.of("task_id", builder.schedule().id()); } @POST @@ -166,26 +180,32 @@ public Map addPeer(@Context GraphManager manager, @Consumes(APPLICATION_JSON) @Produces(APPLICATION_JSON_WITH_CHARSET) @RolesAllowed({"admin"}) - public Map removePeer(@Context GraphManager manager, - @PathParam("graph") String graph, - @QueryParam("group") - @DefaultValue("default") - String group, - @QueryParam("endpoint") - String endpoint) { + public Map removePeer(@Context GraphManager manager, + @PathParam("graph") String graph, + @QueryParam("group") + @DefaultValue("default") String group, + @QueryParam("endpoint") String endpoint) { LOG.debug("Graph [{}] prepare to remove peer: {}", graph, endpoint); HugeGraph g = graph(manager, graph); RaftGroupManager raftManager = raftGroupManager(g, group, "remove_peer"); - String peerId = raftManager.removePeer(endpoint); - return ImmutableMap.of(raftManager.group(), peerId); + JobBuilder builder = JobBuilder.of(g); + String name = String.format("raft-group-[%s]-remove-peer-[%s]-at-[%s]", + raftManager.group(), endpoint, + DateUtil.now()); + Map inputs = new HashMap<>(); + inputs.put("endpoint", endpoint); + builder.name(name) + .input(JsonUtil.toJson(inputs)) + .job(new RaftRemovePeerJob()); + return ImmutableMap.of("task_id", builder.schedule().id()); } private static RaftGroupManager raftGroupManager(HugeGraph graph, String group, String operation) { - RaftGroupManager raftManager = graph.raftGroupManager(group); + RaftGroupManager raftManager = graph.raftGroupManager(); if (raftManager == null) { throw new HugeException("Allowed %s operation only when " + "working on raft mode", operation); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java index 909209556c..4a6ab95120 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java @@ -63,6 +63,7 @@ import org.apache.tinkerpop.gremlin.structure.io.Io; import org.slf4j.Logger; +import com.alipay.remoting.rpc.RpcServer; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.auth.HugeAuthenticator.RolePerm; import com.baidu.hugegraph.auth.HugeAuthenticator.User; @@ -645,9 +646,9 @@ public void readMode(GraphReadMode readMode) { } @Override - public void waitStarted() { + public void waitReady(RpcServer rpcServer) { this.verifyAnyPermission(); - this.hugegraph.waitStarted(); + this.hugegraph.waitReady(rpcServer); } @Override @@ -694,9 +695,9 @@ public void switchAuthManager(AuthManager authManager) { } @Override - public RaftGroupManager raftGroupManager(String group) { + public RaftGroupManager raftGroupManager() { this.verifyAdminPermission(); - return this.hugegraph.raftGroupManager(group); + return this.hugegraph.raftGroupManager(); } @Override diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java index 5d56169f0e..b8e559680f 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java @@ -118,6 +118,12 @@ public void setup(HugeConfig config) { // Forced set RAFT_MODE to false when initializing backend graphConfig.setProperty(CoreOptions.RAFT_MODE.name(), "false"); } + + // Transfer `raft.group_peers` from server config to graph config + String raftGroupPeers = config.get(ServerOptions.RAFT_GROUP_PEERS); + graphConfig.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(), + raftGroupPeers); + this.graph = (HugeGraph) GraphFactory.open(graphConfig); String remoteUrl = config.get(ServerOptions.AUTH_REMOTE_URL); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java index a0c4b073fa..c132edcd22 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java @@ -177,6 +177,14 @@ public static synchronized ServerOptions instance() { nonNegativeInt(), 0); + public static final ConfigOption RAFT_GROUP_PEERS = + new ConfigOption<>( + "raft.group_peers", + "The rpc address of raft group initial peers.", + disallowEmpty(), + "127.0.0.1:8090" + ); + public static final ConfigOption ALLOW_TRACE = new ConfigOption<>( "exception.allow_trace", diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java index 7f4ccecc2b..6a4217f4d5 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java @@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; import org.slf4j.Logger; +import com.alipay.sofa.rpc.config.ServerConfig; import com.baidu.hugegraph.HugeFactory; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.auth.AuthManager; @@ -66,6 +67,7 @@ import com.baidu.hugegraph.serializer.Serializer; import com.baidu.hugegraph.server.RestServer; import com.baidu.hugegraph.task.TaskManager; +import com.baidu.hugegraph.testutil.Whitebox; import com.baidu.hugegraph.type.define.NodeRole; import com.baidu.hugegraph.util.ConfigUtil; import com.baidu.hugegraph.util.E; @@ -96,37 +98,39 @@ public GraphManager(HugeConfig conf, EventHub hub) { this.rpcClient = new RpcClientProvider(conf); this.eventHub = hub; this.conf = conf; + this.listenChanges(); + this.loadGraphs(ConfigUtil.scanGraphsDir(this.graphsDir)); + // this.installLicense(conf, ""); + + // Start RPC-Server for raft-rpc/auth-rpc/cache-notify-rpc... + this.startRpcServer(); + // Raft will load snapshot firstly then launch election and replay log - this.waitGraphsStarted(); + this.waitGraphsReady(); + this.checkBackendVersionOrExit(conf); - this.startRpcServer(); this.serverStarted(conf); + this.addMetrics(conf); } - public void loadGraphs(final Map graphConfs) { + public void loadGraphs(Map graphConfs) { for (Map.Entry conf : graphConfs.entrySet()) { String name = conf.getKey(); - String path = conf.getValue(); + String graphConfPath = conf.getValue(); HugeFactory.checkGraphName(name, "rest-server.properties"); try { - this.loadGraph(name, path); + this.loadGraph(name, graphConfPath); } catch (RuntimeException e) { - LOG.error("Graph '{}' can't be loaded: '{}'", name, path, e); + LOG.error("Graph '{}' can't be loaded: '{}'", + name, graphConfPath, e); } } } - public void waitGraphsStarted() { - this.graphs.keySet().forEach(name -> { - HugeGraph graph = this.graph(name); - graph.waitStarted(); - }); - } - public HugeGraph cloneGraph(String name, String newName, String configText) { /* @@ -289,6 +293,13 @@ private void startRpcServer() { } } + private com.alipay.remoting.rpc.RpcServer remotingRpcServer() { + ServerConfig serverConfig = Whitebox.getInternalState(this.rpcServer, + "serverConfig"); + return Whitebox.getInternalState(serverConfig.getServer(), + "remotingServer"); + } + private void destroyRpcServer() { try { this.rpcClient.destroy(); @@ -331,21 +342,41 @@ private void closeTx(final Set graphSourceNamesToCloseTxOn, }); } - private void loadGraph(String name, String path) { - final Graph graph = GraphFactory.open(path); + private void loadGraph(String name, String graphConfPath) { + HugeConfig config = new HugeConfig(graphConfPath); + + // Transfer `raft.group_peers` from server config to graph config + String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS); + config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(), + raftGroupPeers); + + Graph graph = GraphFactory.open(config); this.graphs.put(name, graph); - HugeConfig config = (HugeConfig) graph.configuration(); - config.file(path); - LOG.info("Graph '{}' was successfully configured via '{}'", name, path); + + HugeConfig graphConfig = (HugeConfig) graph.configuration(); + assert graphConfPath.equals(graphConfig.file().getPath()); + + LOG.info("Graph '{}' was successfully configured via '{}'", + name, graphConfPath); if (this.requireAuthentication() && !(graph instanceof HugeGraphAuthProxy)) { LOG.warn("You may need to support access control for '{}' with {}", - path, HugeFactoryAuthProxy.GRAPH_FACTORY); + graphConfPath, HugeFactoryAuthProxy.GRAPH_FACTORY); } } + private void waitGraphsReady() { + com.alipay.remoting.rpc.RpcServer remotingRpcServer = + this.remotingRpcServer(); + this.graphs.keySet().forEach(name -> { + HugeGraph graph = this.graph(name); + graph.waitReady(remotingRpcServer); + }); + } + private void checkBackendVersionOrExit(HugeConfig config) { + LOG.info("Check backend version"); for (String graph : this.graphs()) { // TODO: close tx from main thread HugeGraph hugegraph = this.graph(graph); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index 673671c794..caf706ee11 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.VertexProperty; +import com.alipay.remoting.rpc.RpcServer; import com.baidu.hugegraph.auth.AuthManager; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.query.Query; @@ -192,7 +193,7 @@ public interface HugeGraph extends Graph { void readMode(GraphReadMode readMode); - void waitStarted(); + void waitReady(RpcServer rpcServer); void serverStarted(Id serverId, NodeRole serverRole); @@ -227,7 +228,7 @@ public interface HugeGraph extends Graph { TaskScheduler taskScheduler(); - RaftGroupManager raftGroupManager(String group); + RaftGroupManager raftGroupManager(); void proxy(HugeGraph graph); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index f4b9691310..569d3d9e27 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -39,6 +39,7 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.slf4j.Logger; +import com.alipay.remoting.rpc.RpcServer; import com.baidu.hugegraph.analyzer.Analyzer; import com.baidu.hugegraph.analyzer.AnalyzerFactory; import com.baidu.hugegraph.auth.AuthManager; @@ -206,7 +207,7 @@ public StandardHugeGraph(HugeConfig config) { LockUtil.destroy(this.name); String message = "Failed to load backend store provider"; LOG.error("{}: {}", message, e.getMessage()); - throw new HugeException(message); + throw new HugeException(message, e); } try { @@ -310,10 +311,10 @@ public void readMode(GraphReadMode readMode) { } @Override - public void waitStarted() { + public void waitReady(RpcServer rpcServer) { // Just for trigger Tx.getOrNewTransaction, then load 3 stores this.schemaTransaction(); - this.storeProvider.waitStoreStarted(); + this.storeProvider.waitReady(rpcServer); } @Override @@ -1004,13 +1005,13 @@ public void switchAuthManager(AuthManager authManager) { } @Override - public RaftGroupManager raftGroupManager(String group) { + public RaftGroupManager raftGroupManager() { if (!(this.storeProvider instanceof RaftBackendStoreProvider)) { return null; } RaftBackendStoreProvider provider = ((RaftBackendStoreProvider) this.storeProvider); - return provider.raftNodeManager(group); + return provider.raftNodeManager(); } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java index 005d133ba4..b7e6951d64 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; +import com.alipay.remoting.rpc.RpcServer; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.store.raft.StoreSnapshotFile; @@ -94,8 +95,8 @@ public void open(String graph) { } @Override - public void waitStoreStarted() { - // pass + public void waitReady(RpcServer rpcServer) { + // passs } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendProviderFactory.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendProviderFactory.java index deac4254d8..c8d22450a0 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendProviderFactory.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendProviderFactory.java @@ -52,7 +52,7 @@ public static BackendStoreProvider open(HugeGraphParams params) { if (raftMode) { LOG.info("Opening backend store '{}' in raft mode for graph '{}'", backend, graph); - provider = new RaftBackendStoreProvider(provider, params); + provider = new RaftBackendStoreProvider(params, provider); } provider.open(graph); return provider; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java index 3d794d2c9d..2b84853267 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java @@ -19,6 +19,7 @@ package com.baidu.hugegraph.backend.store; +import com.alipay.remoting.rpc.RpcServer; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.event.EventHub; @@ -43,7 +44,7 @@ public interface BackendStoreProvider { void open(String name); - void waitStoreStarted(); + void waitReady(RpcServer rpcServer); void close(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftAddPeerJob.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftAddPeerJob.java new file mode 100644 index 0000000000..0fd1d49c37 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftAddPeerJob.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.backend.store.raft; + +import java.util.Map; + +import com.baidu.hugegraph.job.SysJob; +import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.JsonUtil; + +public class RaftAddPeerJob extends SysJob { + + public static final String TASK_TYPE = "raft_add_peer"; + + @Override + public String type() { + return TASK_TYPE; + } + + @Override + public String execute() throws Exception { + String input = this.task().input(); + E.checkArgumentNotNull(input, "The input can't be null"); + @SuppressWarnings("unchecked") + Map map = JsonUtil.fromJson(input, Map.class); + + Object value = map.get("endpoint"); + E.checkArgument(value instanceof String, + "Invalid endpoint value '%s'", value); + String endpoint = (String) value; + return this.graph().raftGroupManager().addPeer(endpoint); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java index 06ddf9e20b..8032691816 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java @@ -48,11 +48,11 @@ public class RaftBackendStore implements BackendStore { private static final Logger LOG = Log.logger(RaftBackendStore.class); private final BackendStore store; - private final RaftSharedContext context; + private final RaftContext context; private final ThreadLocal mutationBatch; private final boolean isSafeRead; - public RaftBackendStore(BackendStore store, RaftSharedContext context) { + public RaftBackendStore(BackendStore store, RaftContext context) { this.store = store; this.context = context; this.mutationBatch = new ThreadLocal<>(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java index 7e9ca5e66e..145951604f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java @@ -24,8 +24,10 @@ import org.slf4j.Logger; +import com.alipay.remoting.rpc.RpcServer; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.HugeGraphParams; +import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.backend.store.BackendStoreProvider; import com.baidu.hugegraph.backend.store.BackendStoreSystemInfo; @@ -44,22 +46,29 @@ public class RaftBackendStoreProvider implements BackendStoreProvider { private static final Logger LOG = Log.logger(RaftBackendStoreProvider.class); private final BackendStoreProvider provider; - private final RaftSharedContext context; + private final RaftContext context; + private RaftBackendStore schemaStore; private RaftBackendStore graphStore; private RaftBackendStore systemStore; - - public RaftBackendStoreProvider(BackendStoreProvider provider, - HugeGraphParams params) { + public RaftBackendStoreProvider(HugeGraphParams params, + BackendStoreProvider provider) { this.provider = provider; - this.context = new RaftSharedContext(params); this.schemaStore = null; this.graphStore = null; this.systemStore = null; + this.context = new RaftContext(params); } - public RaftGroupManager raftNodeManager(String group) { - return this.context.raftNodeManager(group); + public RaftGroupManager raftNodeManager() { + return this.context().raftNodeManager(); + } + + private RaftContext context() { + if (this.context == null) { + E.checkState(false, "Please ensure init raft context"); + } + return this.context; } private Set stores() { @@ -102,8 +111,8 @@ public synchronized BackendStore loadSchemaStore(HugeConfig config, String name) LOG.info("Init raft backend schema store"); BackendStore store = this.provider.loadSchemaStore(config, name); this.checkNonSharedStore(store); - this.schemaStore = new RaftBackendStore(store, this.context); - this.context.addStore(StoreType.SCHEMA, this.schemaStore); + this.schemaStore = new RaftBackendStore(store, this.context()); + this.context().addStore(StoreType.SCHEMA, this.schemaStore); } return this.schemaStore; } @@ -114,8 +123,8 @@ public synchronized BackendStore loadGraphStore(HugeConfig config, String name) LOG.info("Init raft backend graph store"); BackendStore store = this.provider.loadGraphStore(config, name); this.checkNonSharedStore(store); - this.graphStore = new RaftBackendStore(store, this.context); - this.context.addStore(StoreType.GRAPH, this.graphStore); + this.graphStore = new RaftBackendStore(store, this.context()); + this.context().addStore(StoreType.GRAPH, this.graphStore); } return this.graphStore; } @@ -126,8 +135,8 @@ public synchronized BackendStore loadSystemStore(HugeConfig config, String name) LOG.info("Init raft backend system store"); BackendStore store = this.provider.loadSystemStore(config, name); this.checkNonSharedStore(store); - this.systemStore = new RaftBackendStore(store, this.context); - this.context.addStore(StoreType.SYSTEM, this.systemStore); + this.systemStore = new RaftBackendStore(store, this.context()); + this.context().addStore(StoreType.SYSTEM, this.systemStore); } return this.systemStore; } @@ -138,18 +147,20 @@ public void open(String name) { } @Override - public void waitStoreStarted() { - this.context.initRaftNode(); + public void waitReady(RpcServer rpcServer) { + this.context().initRaftNode(rpcServer); LOG.info("The raft node is initialized"); - this.context.waitRaftNodeStarted(); + this.context().waitRaftNodeStarted(); LOG.info("The raft store is started"); } @Override public void close() { this.provider.close(); - this.context.close(); + if (this.context != null) { + this.context.close(); + } } @Override @@ -218,8 +229,15 @@ public void createSnapshot() { StoreCommand command = new StoreCommand(StoreType.GRAPH, StoreAction.SNAPSHOT, null); RaftStoreClosure closure = new RaftStoreClosure(command); - this.context.node().submitAndWait(command, closure); - LOG.debug("Graph '{}' has writed snapshot", this.graph()); + RaftClosure future = this.context().node().submitAndWait(command, + closure); + E.checkState(future != null, "The snapshot future can't be null"); + try { + future.waitFinished(); + LOG.debug("Graph '{}' has writed snapshot", this.graph()); + } catch (Throwable e) { + throw new BackendException("Failed to create snapshot", e); + } } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftClosure.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftClosure.java index 1808bc85b6..d81559ea4d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftClosure.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftClosure.java @@ -57,7 +57,7 @@ public Status status() { private RaftResult get() { try { - return this.future.get(RaftSharedContext.WAIT_RAFTLOG_TIMEOUT, + return this.future.get(RaftContext.WAIT_RAFTLOG_TIMEOUT, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw new BackendException("ExecutionException", e); @@ -73,6 +73,7 @@ public void complete(Status status) { } public void complete(Status status, Supplier callback) { + // This callback is called by consumer thread(like grizzly) this.future.complete(new RaftResult<>(status, callback)); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java similarity index 77% rename from hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java rename to hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java index 3176cafc8d..7197dd4614 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java @@ -42,6 +42,7 @@ import com.alipay.sofa.jraft.option.ReadOnlyOption; import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; import com.alipay.sofa.jraft.util.NamedThreadFactory; import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.baidu.hugegraph.HugeException; @@ -67,16 +68,17 @@ import com.baidu.hugegraph.util.Events; import com.baidu.hugegraph.util.Log; -public final class RaftSharedContext { +public final class RaftContext { - private static final Logger LOG = Log.logger(RaftSharedContext.class); + private static final Logger LOG = Log.logger(RaftContext.class); // unit is ms public static final int NO_TIMEOUT = -1; - public static final int POLL_INTERVAL = 3000; + public static final int POLL_INTERVAL = 5000; public static final int WAIT_RAFTLOG_TIMEOUT = 30 * 60 * 1000; - public static final int WAIT_LEADER_TIMEOUT = 5 * 60 * 1000; - public static final int BUSY_SLEEP_FACTOR = 3 * 1000; + public static final int WAIT_LEADER_TIMEOUT = 10 * 60 * 1000; + public static final int BUSY_MIN_SLEEP_FACTOR = 3 * 1000; + public static final int BUSY_MAX_SLEEP_FACTOR = 5 * 1000; public static final int WAIT_RPC_TIMEOUT = 30 * 60 * 1000; public static final int LOG_WARN_INTERVAL = 60 * 1000; @@ -85,56 +87,81 @@ public final class RaftSharedContext { // work queue size public static final int QUEUE_SIZE = CoreOptions.CPUS; - public static final long KEEP_ALIVE_SEC = 300L; - - public static final String DEFAULT_GROUP = "default"; + public static final long KEEP_ALIVE_SECOND = 300L; private final HugeGraphParams params; + + private final Configuration groupPeers; + private final String schemaStoreName; private final String graphStoreName; private final String systemStoreName; + private final RaftBackendStore[] stores; - private final RpcServer rpcServer; - @SuppressWarnings("unused") + private final ExecutorService readIndexExecutor; private final ExecutorService snapshotExecutor; private final ExecutorService backendExecutor; + private RpcServer raftRpcServer; + private PeerId endpoint; + private RaftNode raftNode; private RaftGroupManager raftGroupManager; private RpcForwarder rpcForwarder; - public RaftSharedContext(HugeGraphParams params) { + public RaftContext(HugeGraphParams params) { this.params = params; - HugeConfig config = this.config(); + + HugeConfig config = params.configuration(); + + /* + * NOTE: `raft.group_peers` option is transfered from ServerConfig + * to CoreConfig, since it's shared by all graphs. + */ + String groupPeersString = this.config().getString("raft.group_peers"); + E.checkArgument(groupPeersString != null, + "Please ensure config `raft.group_peers` in raft mode"); + this.groupPeers = new Configuration(); + if (!this.groupPeers.parse(groupPeersString)) { + throw new HugeException("Failed to parse raft.group_peers: '%s'", + groupPeersString); + } this.schemaStoreName = config.get(CoreOptions.STORE_SCHEMA); this.graphStoreName = config.get(CoreOptions.STORE_GRAPH); this.systemStoreName = config.get(CoreOptions.STORE_SYSTEM); + this.stores = new RaftBackendStore[StoreType.ALL.getNumber()]; - this.rpcServer = this.initAndStartRpcServer(); + if (config.get(CoreOptions.RAFT_SAFE_READ)) { int threads = config.get(CoreOptions.RAFT_READ_INDEX_THREADS); this.readIndexExecutor = this.createReadIndexExecutor(threads); } else { this.readIndexExecutor = null; } - if (config.get(CoreOptions.RAFT_USE_SNAPSHOT)) { - this.snapshotExecutor = this.createSnapshotExecutor(4); - } else { - this.snapshotExecutor = null; - } - int backendThreads = config.get(CoreOptions.RAFT_BACKEND_THREADS); - this.backendExecutor = this.createBackendExecutor(backendThreads); + + int threads = config.get(CoreOptions.RAFT_SNAPSHOT_THREADS); + this.snapshotExecutor = this.createSnapshotExecutor(threads); + + threads = config.get(CoreOptions.RAFT_BACKEND_THREADS); + this.backendExecutor = this.createBackendExecutor(threads); + + this.raftRpcServer = null; + this.endpoint = null; this.raftNode = null; this.raftGroupManager = null; this.rpcForwarder = null; + } + + public void initRaftNode(com.alipay.remoting.rpc.RpcServer rpcServer) { + this.raftRpcServer = this.wrapRpcServer(rpcServer); + this.endpoint = new PeerId(rpcServer.ip(), rpcServer.port()); + this.registerRpcRequestProcessors(); LOG.info("Start raft server successfully: {}", this.endpoint()); - } - public void initRaftNode() { this.raftNode = new RaftNode(this); this.rpcForwarder = new RpcForwarder(this.raftNode.node()); this.raftGroupManager = new RaftGroupManagerImpl(this); @@ -142,10 +169,8 @@ public void initRaftNode() { public void waitRaftNodeStarted() { RaftNode node = this.node(); - node.waitLeaderElected(RaftSharedContext.WAIT_LEADER_TIMEOUT); - if (node.selfIsLeader()) { - node.waitStarted(RaftSharedContext.NO_TIMEOUT); - } + node.waitLeaderElected(RaftContext.WAIT_LEADER_TIMEOUT); + node.waitRaftLogSynced(RaftContext.NO_TIMEOUT); } public void close() { @@ -163,19 +188,21 @@ public RaftNode node() { return this.raftNode; } - public RpcForwarder rpcForwarder() { + protected RpcServer rpcServer() { + return this.raftRpcServer; + } + + protected RpcForwarder rpcForwarder() { return this.rpcForwarder; } - public RaftGroupManager raftNodeManager(String group) { - E.checkArgument(DEFAULT_GROUP.equals(group), - "The group must be '%s' now, actual is '%s'", - DEFAULT_GROUP, group); + public RaftGroupManager raftNodeManager() { return this.raftGroupManager; } public String group() { - return DEFAULT_GROUP; + // Use graph name as group name + return this.params.name(); } public void addStore(StoreType type, RaftBackendStore store) { @@ -206,8 +233,6 @@ public BackendStore originStore(StoreType storeType) { public NodeOptions nodeOptions() throws IOException { HugeConfig config = this.config(); - PeerId selfId = new PeerId(); - selfId.parse(config.get(CoreOptions.RAFT_ENDPOINT)); NodeOptions nodeOptions = new NodeOptions(); nodeOptions.setEnableMetrics(false); @@ -216,7 +241,9 @@ public NodeOptions nodeOptions() throws IOException { nodeOptions.setRpcConnectTimeoutMs( config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT)); nodeOptions.setRpcDefaultTimeout( - config.get(CoreOptions.RAFT_RPC_TIMEOUT)); + 1000 * config.get(CoreOptions.RAFT_RPC_TIMEOUT)); + nodeOptions.setRpcInstallSnapshotTimeout( + 1000 * config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT)); int electionTimeout = config.get(CoreOptions.RAFT_ELECTION_TIMEOUT); nodeOptions.setElectionTimeoutMs(electionTimeout); @@ -224,14 +251,7 @@ public NodeOptions nodeOptions() throws IOException { int snapshotInterval = config.get(CoreOptions.RAFT_SNAPSHOT_INTERVAL); nodeOptions.setSnapshotIntervalSecs(snapshotInterval); - - Configuration groupPeers = new Configuration(); - String groupPeersStr = config.get(CoreOptions.RAFT_GROUP_PEERS); - if (!groupPeers.parse(groupPeersStr)) { - throw new HugeException("Failed to parse group peers %s", - groupPeersStr); - } - nodeOptions.setInitialConf(groupPeers); + nodeOptions.setInitialConf(this.groupPeers); String raftPath = config.get(CoreOptions.RAFT_PATH); String logUri = Paths.get(raftPath, "log").toString(); @@ -242,11 +262,9 @@ public NodeOptions nodeOptions() throws IOException { FileUtils.forceMkdir(new File(metaUri)); nodeOptions.setRaftMetaUri(metaUri); - if (config.get(CoreOptions.RAFT_USE_SNAPSHOT)) { - String snapshotUri = Paths.get(raftPath, "snapshot").toString(); - FileUtils.forceMkdir(new File(snapshotUri)); - nodeOptions.setSnapshotUri(snapshotUri); - } + String snapshotUri = Paths.get(raftPath, "snapshot").toString(); + FileUtils.forceMkdir(new File(snapshotUri)); + nodeOptions.setSnapshotUri(snapshotUri); RaftOptions raftOptions = nodeOptions.getRaftOptions(); /* @@ -329,22 +347,13 @@ protected void notifyCache(String action, HugeType type, List ids) { } public PeerId endpoint() { - PeerId endpoint = new PeerId(); - String endpointStr = this.config().get(CoreOptions.RAFT_ENDPOINT); - if (!endpoint.parse(endpointStr)) { - throw new HugeException("Failed to parse endpoint %s", endpointStr); - } - return endpoint; + return this.endpoint; } public boolean safeRead() { return this.config().get(CoreOptions.RAFT_SAFE_READ); } - public boolean useSnapshot() { - return this.config().get(CoreOptions.RAFT_USE_SNAPSHOT); - } - public ExecutorService snapshotExecutor() { return this.snapshotExecutor; } @@ -353,6 +362,10 @@ public ExecutorService backendExecutor() { return this.backendExecutor; } + public ExecutorService readIndexExecutor() { + return this.readIndexExecutor; + } + public GraphMode graphMode() { return this.params.mode(); } @@ -361,6 +374,7 @@ private HugeConfig config() { return this.params.configuration(); } + @SuppressWarnings("unused") private RpcServer initAndStartRpcServer() { Integer lowWaterMark = this.config().get( CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK); @@ -375,19 +389,38 @@ private RpcServer initAndStartRpcServer() { NodeManager.getInstance().addAddress(endpoint.getEndpoint()); RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer( endpoint.getEndpoint()); + LOG.info("Raft-RPC server is started successfully"); return rpcServer; } + private RpcServer wrapRpcServer(com.alipay.remoting.rpc.RpcServer rpcServer) { + // TODO: pass ServerOptions instead of CoreOptions, to share by graphs + Integer lowWaterMark = this.config().get( + CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK); + System.setProperty("bolt.channel_write_buf_low_water_mark", + String.valueOf(lowWaterMark)); + Integer highWaterMark = this.config().get( + CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK); + System.setProperty("bolt.channel_write_buf_high_water_mark", + String.valueOf(highWaterMark)); + + // Reference from RaftRpcServerFactory.createAndStartRaftRpcServer + RpcServer raftRpcServer = new BoltRpcServer(rpcServer); + RaftRpcServerFactory.addRaftRequestProcessors(raftRpcServer); + + return raftRpcServer; + } + private void shutdownRpcServer() { - this.rpcServer.shutdown(); + this.raftRpcServer.shutdown(); PeerId endpoint = this.endpoint(); NodeManager.getInstance().removeAddress(endpoint.getEndpoint()); } private void registerRpcRequestProcessors() { - this.rpcServer.registerProcessor(new StoreCommandProcessor(this)); - this.rpcServer.registerProcessor(new SetLeaderProcessor(this)); - this.rpcServer.registerProcessor(new ListPeersProcessor(this)); + this.raftRpcServer.registerProcessor(new StoreCommandProcessor(this)); + this.raftRpcServer.registerProcessor(new SetLeaderProcessor(this)); + this.raftRpcServer.registerProcessor(new ListPeersProcessor(this)); } private ExecutorService createReadIndexExecutor(int coreThreads) { @@ -421,7 +454,7 @@ private static ExecutorService newPool(int coreThreads, int maxThreads, .enableMetric(false) .coreThreads(coreThreads) .maximumThreads(maxThreads) - .keepAliveSeconds(KEEP_ALIVE_SEC) + .keepAliveSeconds(KEEP_ALIVE_SECOND) .workQueue(queue) .threadFactory(new NamedThreadFactory(name, true)) .rejectedHandler(handler) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftGroupManagerImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftGroupManagerImpl.java index d1b15b634e..a000403711 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftGroupManagerImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftGroupManagerImpl.java @@ -40,7 +40,7 @@ public class RaftGroupManagerImpl implements RaftGroupManager { private final RaftNode raftNode; private final RpcForwarder rpcForwarder; - public RaftGroupManagerImpl(RaftSharedContext context) { + public RaftGroupManagerImpl(RaftContext context) { this.group = context.group(); this.raftNode = context.node(); this.rpcForwarder = context.rpcForwarder(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java index 078b9382fc..53ad9bcbbb 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -28,7 +29,7 @@ import org.slf4j.Logger; import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.RaftServiceFactory; +import com.alipay.sofa.jraft.RaftGroupService; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.closure.ReadIndexClosure; import com.alipay.sofa.jraft.core.Replicator.ReplicatorStateListener; @@ -36,6 +37,7 @@ import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.RpcServer; import com.alipay.sofa.jraft.util.BytesUtil; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.util.LZ4Util; @@ -45,17 +47,19 @@ public final class RaftNode { private static final Logger LOG = Log.logger(RaftNode.class); - private final RaftSharedContext context; + private final RaftContext context; + private RaftGroupService raftGroupService; private final Node node; private final StoreStateMachine stateMachine; private final AtomicReference leaderInfo; private final AtomicBoolean started; private final AtomicInteger busyCounter; - public RaftNode(RaftSharedContext context) { + public RaftNode(RaftContext context) { this.context = context; this.stateMachine = new StoreStateMachine(context); try { + // Start raft node this.node = this.initRaftNode(); LOG.info("Start raft node: {}", this); } catch (IOException e) { @@ -67,7 +71,7 @@ public RaftNode(RaftSharedContext context) { this.busyCounter = new AtomicInteger(); } - protected RaftSharedContext context() { + protected RaftContext context() { return this.context; } @@ -96,16 +100,23 @@ public void onLeaderInfoChange(PeerId leaderId, boolean selfIsLeader) { public void shutdown() { LOG.info("Shutdown raft node: {}", this); this.node.shutdown(); - } - public void snapshot() { - if (!this.context.useSnapshot()) { - return; + if (this.raftGroupService != null) { + this.raftGroupService.shutdown(); + try { + this.raftGroupService.join(); + } catch (final InterruptedException e) { + throw new RaftException( + "Interrupted while shutdown raftGroupService"); + } } + } + + public RaftClosure snapshot() { RaftClosure future = new RaftClosure<>(); try { this.node().snapshot(future); - future.waitFinished(); + return future; } catch (Throwable e) { throw new BackendException("Failed to create snapshot", e); } @@ -115,7 +126,7 @@ public void readIndex(byte[] reqCtx, ReadIndexClosure done) { this.node.readIndex(reqCtx, done); } - public Object submitAndWait(StoreCommand command, RaftStoreClosure future) { + public T submitAndWait(StoreCommand command, RaftStoreClosure future) { // Submit command to raft node this.submitCommand(command, future); @@ -126,7 +137,9 @@ public Object submitAndWait(StoreCommand command, RaftStoreClosure future) { * 2.If on the follower, request command will be forwarded to the * leader, actually it has waited in forwardToLeader(). */ - return future.waitFinished(); + @SuppressWarnings("unchecked") + T result = (T) future.waitFinished(); + return result; } catch (Throwable e) { throw new BackendException("Failed to wait store command %s", e, command); @@ -136,7 +149,7 @@ public Object submitAndWait(StoreCommand command, RaftStoreClosure future) { private void submitCommand(StoreCommand command, RaftStoreClosure future) { // Wait leader elected LeaderInfo leaderInfo = this.waitLeaderElected( - RaftSharedContext.NO_TIMEOUT); + RaftContext.WAIT_LEADER_TIMEOUT); // If myself is not leader, forward to the leader if (!leaderInfo.selfIsLeader) { this.context.rpcForwarder().forwardToLeader(leaderInfo.leaderId, @@ -150,7 +163,7 @@ private void submitCommand(StoreCommand command, RaftStoreClosure future) { Task task = new Task(); // Compress data, note compress() will return a BytesBuffer ByteBuffer buffer = LZ4Util.compress(command.data(), - RaftSharedContext.BLOCK_SIZE) + RaftContext.BLOCK_SIZE) .forReadWritten() .asByteBuffer(); LOG.debug("Submit to raft node '{}', the compressed bytes of command " + @@ -166,13 +179,14 @@ protected LeaderInfo waitLeaderElected(int timeout) { if (leaderInfo.leaderId != null) { return leaderInfo; } + LOG.info("Waiting for raft group '{}' leader elected", group); long beginTime = System.currentTimeMillis(); while (leaderInfo.leaderId == null) { try { - Thread.sleep(RaftSharedContext.POLL_INTERVAL); + Thread.sleep(RaftContext.POLL_INTERVAL); } catch (InterruptedException e) { - LOG.info("Waiting for raft group '{}' election is " + - "interrupted: {}", group, e); + throw new BackendException("Interrupted while waiting for " + + "raft group '%s' election", group, e); } long consumedTime = System.currentTimeMillis() - beginTime; if (timeout > 0 && consumedTime >= timeout) { @@ -180,42 +194,38 @@ protected LeaderInfo waitLeaderElected(int timeout) { "Waiting for raft group '%s' election timeout(%sms)", group, consumedTime); } - LOG.warn("Waiting for raft group '{}' election cost {}s", - group, consumedTime / 1000.0); leaderInfo = this.leaderInfo.get(); assert leaderInfo != null; } + LOG.info("Waited for raft group '{}' leader elected successfully", group); return leaderInfo; } - protected void waitStarted(int timeout) { + protected void waitRaftLogSynced(int timeout) { String group = this.context.group(); - ReadIndexClosure readIndexClosure = new ReadIndexClosure() { - @Override - public void run(Status status, long index, byte[] reqCtx) { - RaftNode.this.started.set(status.isOk()); - } - }; + LOG.info("Waiting for raft group '{}' log synced", group); long beginTime = System.currentTimeMillis(); - while (true) { - this.node.readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure); - if (this.started.get()) { - break; - } + while (!this.started.get()) { + this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { + @Override + public void run(Status status, long index, byte[] reqCtx) { + RaftNode.this.started.set(status.isOk()); + } + }); try { - Thread.sleep(RaftSharedContext.POLL_INTERVAL); + Thread.sleep(RaftContext.POLL_INTERVAL); } catch (InterruptedException e) { - LOG.info("Waiting for heartbeat is interrupted: {}", e); + throw new BackendException("Interrupted while waiting for " + + "raft group '%s' log synced", group, e); } long consumedTime = System.currentTimeMillis() - beginTime; if (timeout > 0 && consumedTime >= timeout) { throw new BackendException( - "Waiting for raft group '%s' heartbeat timeout(%sms)", + "Waiting for raft group '%s' log synced timeout(%sms)", group, consumedTime); } - LOG.warn("Waiting for raft group '{}' heartbeat cost {}s", - group, consumedTime / 1000.0); } + LOG.info("Waited for raft group '{}' log synced successfully", group); } private void waitIfBusy() { @@ -224,8 +234,12 @@ private void waitIfBusy() { return; } // It may lead many thread sleep, but this is exactly what I want - long time = counter * RaftSharedContext.BUSY_SLEEP_FACTOR; - LOG.info("The node {} will try to sleep {} ms", this.node, time); + int delta = RaftContext.BUSY_MAX_SLEEP_FACTOR - + RaftContext.BUSY_MIN_SLEEP_FACTOR; + Random random = new Random(); + int timeout = random.nextInt(delta) + + RaftContext.BUSY_MIN_SLEEP_FACTOR; + int time = counter * timeout; try { Thread.sleep(time); } catch (InterruptedException e) { @@ -246,17 +260,25 @@ private void waitIfBusy() { private Node initRaftNode() throws IOException { NodeOptions nodeOptions = this.context.nodeOptions(); nodeOptions.setFsm(this.stateMachine); - // TODO: When support sharding, groupId needs to be bound to shard Id + /* + * TODO: the groupId is same as graph name now, when support sharding, + * groupId needs to be bound to shard Id + */ String groupId = this.context.group(); PeerId endpoint = this.context.endpoint(); + /* - * Start raft node with shared rpc server: - * return new RaftGroupService(groupId, endpoint, nodeOptions, - * this.context.rpcServer(), true) - * .start(false) + * Create RaftGroupService with shared rpc-server, then start raft node + * TODO: don't create + hold RaftGroupService and just share rpc-server + * and create Node by RaftServiceFactory.createAndInitRaftNode() */ - return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint, - nodeOptions); + RpcServer rpcServer = this.context.rpcServer(); + LOG.debug("Start raft node with endpoint '{}', initial conf [{}]", + endpoint, nodeOptions.getInitialConf()); + this.raftGroupService = new RaftGroupService(groupId, endpoint, + nodeOptions, + rpcServer, true); + return this.raftGroupService.start(false); } @Override @@ -286,7 +308,7 @@ public void onDestroyed(PeerId peer) { public void onError(PeerId peer, Status status) { long now = System.currentTimeMillis(); long interval = now - this.lastPrintTime; - if (interval >= RaftSharedContext.LOG_WARN_INTERVAL) { + if (interval >= RaftContext.LOG_WARN_INTERVAL) { LOG.warn("Replicator meet error: {}", status); this.lastPrintTime = now; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftRemovePeerJob.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftRemovePeerJob.java new file mode 100644 index 0000000000..eed62515dc --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftRemovePeerJob.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.backend.store.raft; + +import java.util.Map; + +import com.baidu.hugegraph.job.SysJob; +import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.JsonUtil; + +public class RaftRemovePeerJob extends SysJob { + + public static final String TASK_TYPE = "raft_remove_peer"; + + @Override + public String type() { + return TASK_TYPE; + } + + @Override + public String execute() throws Exception { + String input = this.task().input(); + E.checkArgumentNotNull(input, "The input can't be null"); + @SuppressWarnings("unchecked") + Map map = JsonUtil.fromJson(input, Map.class); + + Object value = map.get("endpoint"); + E.checkArgument(value instanceof String, + "Invalid endpoint value '%s'", value); + String endpoint = (String) value; + return this.graph().raftGroupManager().removePeer(endpoint); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java index 54a8b9c32b..e54fef62f8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.Checksum; import org.apache.commons.io.FileUtils; @@ -52,10 +53,11 @@ public class StoreSnapshotFile { private static final Logger LOG = Log.logger(StoreSnapshotFile.class); public static final String SNAPSHOT_DIR = "snapshot"; - private static final String TAR = ".tar"; + private static final String TAR = ".zip"; private final RaftBackendStore[] stores; private final Map dataDisks; + private final AtomicBoolean compressing; public StoreSnapshotFile(RaftBackendStore[] stores) { this.stores = stores; @@ -65,6 +67,7 @@ public StoreSnapshotFile(RaftBackendStore[] stores) { this.dataDisks.putAll(Whitebox.invoke(raftStore, "store", "reportDiskMapping")); } + this.compressing = new AtomicBoolean(false); /* * Like that: * general=/parent_path/rocksdb-data @@ -79,6 +82,14 @@ public void save(SnapshotWriter writer, Closure done, // Write snapshot to real directory Map snapshotDirMaps = this.doSnapshotSave(); executor.execute(() -> { + if (!this.compressing.compareAndSet(false, true)) { + LOG.info("Last compress task doesn't finish, skipped it"); + done.run(new Status(RaftError.EBUSY, + "Last compress task doesn't finish, " + + "skipped it")); + return; + } + try { this.compressSnapshotDir(writer, snapshotDirMaps); this.deleteSnapshotDirs(snapshotDirMaps.keySet()); @@ -88,6 +99,8 @@ public void save(SnapshotWriter writer, Closure done, done.run(new Status(RaftError.EIO, "Failed to compress snapshot, " + "error is %s", e.getMessage())); + } finally { + this.compressing.compareAndSet(true, false); } }); } catch (Throwable e) { @@ -102,15 +115,22 @@ public boolean load(SnapshotReader reader) { Set snapshotDirTars = reader.listFiles(); LOG.info("The snapshot tar files to be loaded are {}", snapshotDirTars); Set snapshotDirs = new HashSet<>(); - for (String snapshotDirTar : snapshotDirTars) { - try { + if (!this.compressing.compareAndSet(false, true)) { + LOG.info("Last decompress task doesn't finish, skipped it"); + return false; + } + + try { + for (String snapshotDirTar : snapshotDirTars) { String snapshotDir = this.decompressSnapshot(reader, snapshotDirTar); snapshotDirs.add(snapshotDir); - } catch (Throwable e) { - LOG.error("Failed to decompress snapshot tar", e); - return false; } + } catch (Throwable e) { + LOG.error("Failed to decompress snapshot tar", e); + return false; + } finally { + this.compressing.compareAndSet(true, false); } try { @@ -151,7 +171,13 @@ private void compressSnapshotDir(SnapshotWriter writer, .toString(); Checksum checksum = new CRC64(); try { - CompressUtil.compressTar(snapshotDir, outputFile, checksum); + LOG.info("Prepare to compress dir '{}' to '{}'", + snapshotDir, outputFile); + long begin = System.currentTimeMillis(); + CompressUtil.compressZip(snapshotDir, outputFile, checksum); + long end = System.currentTimeMillis(); + LOG.info("Compressed dir '{}' to '{}', took {} seconds", + snapshotDir, outputFile, (end - begin) / 1000.0F); } catch (Throwable e) { throw new RaftException( "Failed to compress snapshot, path=%s, files=%s", @@ -195,7 +221,13 @@ private String decompressSnapshot(SnapshotReader reader, Checksum checksum = new CRC64(); String archiveFile = Paths.get(reader.getPath(), snapshotDirTar) .toString(); - CompressUtil.decompressTar(archiveFile, parentPath, checksum); + LOG.info("Prepare to decompress snapshot zip '{}' to '{}'", + archiveFile, parentPath); + long begin = System.currentTimeMillis(); + CompressUtil.decompressZip(archiveFile, parentPath, checksum); + long end = System.currentTimeMillis(); + LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds", + archiveFile, parentPath, (end - begin) / 1000.0F); if (meta.hasChecksum()) { String expected = meta.getChecksum(); String actual = Long.toHexString(checksum.getValue()); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index 73b1cd7c24..d06481bf72 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -52,10 +52,10 @@ public final class StoreStateMachine extends StateMachineAdapter { private static final Logger LOG = Log.logger(StoreStateMachine.class); - private final RaftSharedContext context; + private final RaftContext context; private final StoreSnapshotFile snapshotFile; - public StoreStateMachine(RaftSharedContext context) { + public StoreStateMachine(RaftContext context) { this.context = context; this.snapshotFile = new StoreSnapshotFile(context.stores()); } @@ -72,7 +72,7 @@ private RaftNode node() { public void onApply(Iterator iter) { LOG.debug("Node role: {}", this.node().selfIsLeader() ? "leader" : "follower"); - List> futures = new ArrayList<>(); + List> futures = new ArrayList<>(64); try { // Apply all the logs while (iter.hasNext()) { @@ -130,7 +130,7 @@ private Future onApplyFollower(ByteBuffer data) { // Let the backend thread do it directly return this.context.backendExecutor().submit(() -> { BytesBuffer buffer = LZ4Util.decompress(bytes, - RaftSharedContext.BLOCK_SIZE); + RaftContext.BLOCK_SIZE); buffer.forReadWritten(); StoreType type = StoreType.valueOf(buffer.read()); StoreAction action = StoreAction.valueOf(buffer.read()); @@ -161,8 +161,7 @@ private Object applyCommand(StoreType type, StoreAction action, break; case SNAPSHOT: assert store == null; - this.node().snapshot(); - break; + return this.node().snapshot(); case BEGIN_TX: store.beginTx(); break; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/ListPeersProcessor.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/ListPeersProcessor.java index 328cf3c108..a79a075872 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/ListPeersProcessor.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/ListPeersProcessor.java @@ -24,7 +24,7 @@ import com.alipay.sofa.jraft.rpc.RpcRequestClosure; import com.alipay.sofa.jraft.rpc.RpcRequestProcessor; import com.baidu.hugegraph.backend.store.raft.RaftGroupManager; -import com.baidu.hugegraph.backend.store.raft.RaftSharedContext; +import com.baidu.hugegraph.backend.store.raft.RaftContext; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.ListPeersRequest; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.ListPeersResponse; @@ -37,9 +37,9 @@ public class ListPeersProcessor private static final Logger LOG = Log.logger(ListPeersProcessor.class); - private final RaftSharedContext context; + private final RaftContext context; - public ListPeersProcessor(RaftSharedContext context) { + public ListPeersProcessor(RaftContext context) { super(null, null); this.context = context; } @@ -48,8 +48,7 @@ public ListPeersProcessor(RaftSharedContext context) { public Message processRequest(ListPeersRequest request, RpcRequestClosure done) { LOG.debug("Processing ListPeersRequest {}", request.getClass()); - RaftGroupManager nodeManager = this.context.raftNodeManager( - RaftSharedContext.DEFAULT_GROUP); + RaftGroupManager nodeManager = this.context.raftNodeManager(); try { CommonResponse common = CommonResponse.newBuilder() .setStatus(true) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java index 2c93d113ee..2d2b41c91a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java @@ -19,8 +19,6 @@ package com.baidu.hugegraph.backend.store.raft.rpc; -import static com.baidu.hugegraph.backend.store.raft.RaftSharedContext.WAIT_RPC_TIMEOUT; - import java.util.concurrent.ExecutionException; import org.slf4j.Logger; @@ -35,6 +33,7 @@ import com.alipay.sofa.jraft.util.Endpoint; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.store.raft.RaftClosure; +import com.baidu.hugegraph.backend.store.raft.RaftContext; import com.baidu.hugegraph.backend.store.raft.RaftStoreClosure; import com.baidu.hugegraph.backend.store.raft.StoreCommand; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse; @@ -151,7 +150,7 @@ private void waitRpc(Endpoint endpoint, Message request, E.checkNotNull(endpoint, "leader endpoint"); try { this.rpcClient.invokeWithDone(endpoint, request, done, - WAIT_RPC_TIMEOUT) + RaftContext.WAIT_RPC_TIMEOUT) .get(); } catch (InterruptedException e) { throw new BackendException("Invoke rpc request was interrupted, " + diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/SetLeaderProcessor.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/SetLeaderProcessor.java index f110537ffb..56518f73f1 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/SetLeaderProcessor.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/SetLeaderProcessor.java @@ -24,7 +24,7 @@ import com.alipay.sofa.jraft.rpc.RpcRequestClosure; import com.alipay.sofa.jraft.rpc.RpcRequestProcessor; import com.baidu.hugegraph.backend.store.raft.RaftGroupManager; -import com.baidu.hugegraph.backend.store.raft.RaftSharedContext; +import com.baidu.hugegraph.backend.store.raft.RaftContext; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.SetLeaderRequest; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.SetLeaderResponse; @@ -36,9 +36,9 @@ public class SetLeaderProcessor private static final Logger LOG = Log.logger(SetLeaderProcessor.class); - private final RaftSharedContext context; + private final RaftContext context; - public SetLeaderProcessor(RaftSharedContext context) { + public SetLeaderProcessor(RaftContext context) { super(null, null); this.context = context; } @@ -47,8 +47,7 @@ public SetLeaderProcessor(RaftSharedContext context) { public Message processRequest(SetLeaderRequest request, RpcRequestClosure done) { LOG.debug("Processing SetLeaderRequest {}", request.getClass()); - RaftGroupManager nodeManager = this.context.raftNodeManager( - RaftSharedContext.DEFAULT_GROUP); + RaftGroupManager nodeManager = this.context.raftNodeManager(); try { nodeManager.setLeader(request.getEndpoint()); CommonResponse common = CommonResponse.newBuilder() diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java index 13ce2cfb8e..447f990137 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java @@ -24,7 +24,7 @@ import com.alipay.sofa.jraft.rpc.RpcRequestClosure; import com.alipay.sofa.jraft.rpc.RpcRequestProcessor; import com.baidu.hugegraph.backend.store.raft.RaftNode; -import com.baidu.hugegraph.backend.store.raft.RaftSharedContext; +import com.baidu.hugegraph.backend.store.raft.RaftContext; import com.baidu.hugegraph.backend.store.raft.RaftStoreClosure; import com.baidu.hugegraph.backend.store.raft.StoreCommand; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreAction; @@ -40,9 +40,9 @@ public class StoreCommandProcessor private static final Logger LOG = Log.logger( StoreCommandProcessor.class); - private final RaftSharedContext context; + private final RaftContext context; - public StoreCommandProcessor(RaftSharedContext context) { + public StoreCommandProcessor(RaftContext context) { super(null, null); this.context = context; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index 7563dbad04..7d6a4b1005 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -69,7 +69,7 @@ public class SchemaTransaction extends IndexableTransaction { - private SchemaIndexTransaction indexTx; + private final SchemaIndexTransaction indexTx; public SchemaTransaction(HugeGraphParams graph, BackendStore store) { super(graph, store); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java index e5be7e6347..e158a4e3f2 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java @@ -120,30 +120,6 @@ public static synchronized CoreOptions instance() { false ); - public static final ConfigOption RAFT_USE_SNAPSHOT = - new ConfigOption<>( - "raft.use_snapshot", - "Whether to use snapshot.", - disallowEmpty(), - true - ); - - public static final ConfigOption RAFT_ENDPOINT = - new ConfigOption<>( - "raft.endpoint", - "The peerid of current raft node.", - disallowEmpty(), - "127.0.0.1:8281" - ); - - public static final ConfigOption RAFT_GROUP_PEERS = - new ConfigOption<>( - "raft.group_peers", - "The peers of current raft group.", - disallowEmpty(), - "127.0.0.1:8281,127.0.0.1:8282,127.0.0.1:8283" - ); - public static final ConfigOption RAFT_PATH = new ConfigOption<>( "raft.path", @@ -179,6 +155,14 @@ public static synchronized CoreOptions instance() { 3600 ); + public static final ConfigOption RAFT_SNAPSHOT_THREADS = + new ConfigOption<>( + "raft.snapshot_threads", + "The thread number used to do snapshot.", + rangeInt(0, Integer.MAX_VALUE), + 4 + ); + public static final ConfigOption RAFT_BACKEND_THREADS = new ConfigOption<>( "raft.backend_threads", @@ -252,10 +236,19 @@ public static synchronized CoreOptions instance() { public static final ConfigOption RAFT_RPC_TIMEOUT = new ConfigOption<>( "raft.rpc_timeout", - "The rpc timeout for jraft rpc.", + "The general rpc timeout in seconds for jraft rpc.", + positiveInt(), + // jraft default value is 5s + 60 + ); + + public static final ConfigOption RAFT_INSTALL_SNAPSHOT_TIMEOUT = + new ConfigOption<>( + "raft.install_snapshot_rpc_timeout", + "The install snapshot rpc timeout in seconds for jraft rpc.", positiveInt(), - // jraft default value is 5000(ms) - 60000 + // jraft default value is 5 minutes + 10 * 60 * 60 ); public static final ConfigOption RAFT_RPC_BUF_LOW_WATER_MARK = diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/CompressUtil.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/CompressUtil.java index 2d28333147..72034a6ea3 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/CompressUtil.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/CompressUtil.java @@ -46,7 +46,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.NullOutputStream; -import com.baidu.hugegraph.backend.store.raft.RaftSharedContext; +import com.baidu.hugegraph.backend.store.raft.RaftContext; import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4BlockOutputStream; @@ -63,7 +63,7 @@ public static void compressTar(String inputDir, String outputFile, Checksum checksum) throws IOException { LZ4Factory factory = LZ4Factory.fastestInstance(); LZ4Compressor compressor = factory.fastCompressor(); - int blockSize = RaftSharedContext.BLOCK_SIZE; + int blockSize = RaftContext.BLOCK_SIZE; try (FileOutputStream fos = new FileOutputStream(outputFile); CheckedOutputStream cos = new CheckedOutputStream(fos, checksum); BufferedOutputStream bos = new BufferedOutputStream(cos); @@ -170,13 +170,20 @@ private static Path zipSlipProtect(ArchiveEntry entry, Path targetDir) * else throws exception */ Path normalizePath = targetDirResolved.normalize(); - if (!normalizePath.startsWith(targetDir)) { + if (!normalizePath.startsWith(targetDir.normalize())) { throw new IOException(String.format("Bad entry: %s", entry.getName())); } return normalizePath; } + public static void compressZip(String inputDir, String outputFile, + Checksum checksum) throws IOException { + String rootDir = Paths.get(inputDir).getParent().toString(); + String sourceDir = Paths.get(inputDir).getFileName().toString(); + compressZip(rootDir, sourceDir, outputFile, checksum); + } + public static void compressZip(String rootDir, String sourceDir, String outputFile, Checksum checksum) throws IOException { diff --git a/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties b/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties index 9dc75f7244..94af261455 100644 --- a/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties +++ b/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties @@ -25,23 +25,22 @@ serializer=binary store=hugegraph raft.mode=false -raft.safe_read=false -raft.use_snapshot=false -raft.endpoint=127.0.0.1:8281 -raft.group_peers=127.0.0.1:8281,127.0.0.1:8282,127.0.0.1:8283 raft.path=./raft-log +raft.safe_read=true raft.use_replicator_pipeline=true raft.election_timeout=10000 raft.snapshot_interval=3600 raft.backend_threads=48 raft.read_index_threads=8 +raft.snapshot_threads=4 raft.read_strategy=ReadOnlyLeaseBased raft.queue_size=16384 raft.queue_publish_timeout=60 raft.apply_batch=1 raft.rpc_threads=80 raft.rpc_connect_timeout=5000 -raft.rpc_timeout=60000 +raft.rpc_timeout=60 +raft.install_snapshot_rpc_timeout=36000 search.text_analyzer=jieba search.text_analyzer_mode=INDEX diff --git a/hugegraph-dist/src/assembly/static/conf/rest-server.properties b/hugegraph-dist/src/assembly/static/conf/rest-server.properties index 1666946dda..f7eda0010c 100644 --- a/hugegraph-dist/src/assembly/static/conf/rest-server.properties +++ b/hugegraph-dist/src/assembly/static/conf/rest-server.properties @@ -22,20 +22,22 @@ batch.max_write_threads=0 #auth.admin_token= #auth.user_tokens=[] -# rpc group configs of multi graph servers -# rpc server configs +# rpc server configs for multi graph-servers or raft-servers rpc.server_host=127.0.0.1 -rpc.server_port=8090 +rpc.server_port=8091 #rpc.server_timeout=30 # rpc client configs (like enable to keep cache consistency) -rpc.remote_url=127.0.0.1:8090 +#rpc.remote_url=127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093 #rpc.client_connect_timeout=20 #rpc.client_reconnect_period=10 #rpc.client_read_timeout=40 #rpc.client_retries=3 #rpc.client_load_balancer=consistentHash +# raft group initial peers +#raft.group_peers=127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093 + # lightweight load balancing (beta) server.id=server-1 server.role=master diff --git a/hugegraph-dist/src/assembly/travis/conf-raft1/graphs/hugegraph.properties b/hugegraph-dist/src/assembly/travis/conf-raft1/graphs/hugegraph.properties index 0f043b44dc..33150b467b 100644 --- a/hugegraph-dist/src/assembly/travis/conf-raft1/graphs/hugegraph.properties +++ b/hugegraph-dist/src/assembly/travis/conf-raft1/graphs/hugegraph.properties @@ -10,20 +10,5 @@ rocksdb.data_path=rocksdb-data-raft1 rocksdb.wal_path=rocksdb-data-raft1 raft.mode=true -raft.safe_read=true -raft.use_snapshot=false -raft.endpoint=127.0.0.1:8281 -raft.group_peers=127.0.0.1:8281,127.0.0.1:8282,127.0.0.1:8283 raft.path=rocksdb-raftlog1 -raft.use_replicator_pipeline=true -raft.election_timeout=10000 -raft.snapshot_interval=3600 -raft.backend_threads=48 -raft.read_index_threads=8 -raft.read_strategy=ReadOnlyLeaseBased -raft.queue_size=16384 -raft.queue_publish_timeout=60 -raft.apply_batch=1 -raft.rpc_threads=8 -raft.rpc_connect_timeout=5000 -raft.rpc_timeout=60000 +raft.safe_read=true diff --git a/hugegraph-dist/src/assembly/travis/conf-raft1/rest-server.properties b/hugegraph-dist/src/assembly/travis/conf-raft1/rest-server.properties index 25b6b283d3..a94463487c 100644 --- a/hugegraph-dist/src/assembly/travis/conf-raft1/rest-server.properties +++ b/hugegraph-dist/src/assembly/travis/conf-raft1/rest-server.properties @@ -7,6 +7,8 @@ rpc.server_host=127.0.0.1 rpc.server_port=8091 rpc.remote_url=127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093 +raft.group_peers=127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093 + server.id=server1 server.role=master diff --git a/hugegraph-dist/src/assembly/travis/conf-raft2/graphs/hugegraph.properties b/hugegraph-dist/src/assembly/travis/conf-raft2/graphs/hugegraph.properties index 9aaf5fdfc4..13deeae4f0 100644 --- a/hugegraph-dist/src/assembly/travis/conf-raft2/graphs/hugegraph.properties +++ b/hugegraph-dist/src/assembly/travis/conf-raft2/graphs/hugegraph.properties @@ -10,20 +10,5 @@ rocksdb.data_path=rocksdb-data-raft2 rocksdb.wal_path=rocksdb-data-raft2 raft.mode=true -raft.safe_read=true -raft.use_snapshot=false -raft.endpoint=127.0.0.1:8282 -raft.group_peers=127.0.0.1:8281,127.0.0.1:8282,127.0.0.1:8283 raft.path=rocksdb-raftlog2 -raft.use_replicator_pipeline=true -raft.election_timeout=10000 -raft.snapshot_interval=3600 -raft.backend_threads=48 -raft.read_index_threads=8 -raft.read_strategy=ReadOnlyLeaseBased -raft.queue_size=16384 -raft.queue_publish_timeout=60 -raft.apply_batch=1 -raft.rpc_threads=8 -raft.rpc_connect_timeout=5000 -raft.rpc_timeout=60000 +raft.safe_read=true diff --git a/hugegraph-dist/src/assembly/travis/conf-raft2/rest-server.properties b/hugegraph-dist/src/assembly/travis/conf-raft2/rest-server.properties index f1eb1b4584..2a02d20b14 100644 --- a/hugegraph-dist/src/assembly/travis/conf-raft2/rest-server.properties +++ b/hugegraph-dist/src/assembly/travis/conf-raft2/rest-server.properties @@ -7,6 +7,8 @@ rpc.server_host=127.0.0.1 rpc.server_port=8092 rpc.remote_url=127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093 +raft.group_peers=127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093 + server.id=server2 server.role=worker diff --git a/hugegraph-dist/src/assembly/travis/conf-raft3/graphs/hugegraph.properties b/hugegraph-dist/src/assembly/travis/conf-raft3/graphs/hugegraph.properties index 0fb5a203cd..38f859c0a4 100644 --- a/hugegraph-dist/src/assembly/travis/conf-raft3/graphs/hugegraph.properties +++ b/hugegraph-dist/src/assembly/travis/conf-raft3/graphs/hugegraph.properties @@ -10,20 +10,5 @@ rocksdb.data_path=rocksdb-data-raft3 rocksdb.wal_path=rocksdb-data-raft3 raft.mode=true -raft.safe_read=true -raft.use_snapshot=false -raft.endpoint=127.0.0.1:8283 -raft.group_peers=127.0.0.1:8281,127.0.0.1:8282,127.0.0.1:8283 raft.path=rocksdb-raftlog3 -raft.use_replicator_pipeline=true -raft.election_timeout=10000 -raft.snapshot_interval=3600 -raft.backend_threads=48 -raft.read_index_threads=8 -raft.read_strategy=ReadOnlyLeaseBased -raft.queue_size=16384 -raft.queue_publish_timeout=60 -raft.apply_batch=1 -raft.rpc_threads=8 -raft.rpc_connect_timeout=5000 -raft.rpc_timeout=60000 +raft.safe_read=true diff --git a/hugegraph-dist/src/assembly/travis/conf-raft3/rest-server.properties b/hugegraph-dist/src/assembly/travis/conf-raft3/rest-server.properties index e31d3d286f..1fa980f4e8 100644 --- a/hugegraph-dist/src/assembly/travis/conf-raft3/rest-server.properties +++ b/hugegraph-dist/src/assembly/travis/conf-raft3/rest-server.properties @@ -7,6 +7,8 @@ rpc.server_host=127.0.0.1 rpc.server_port=8093 rpc.remote_url=127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093 +raft.group_peers=127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093 + server.id=server3 server.role=worker diff --git a/hugegraph-dist/src/assembly/travis/run-api-test-for-raft.sh b/hugegraph-dist/src/assembly/travis/run-api-test-for-raft.sh index 9c58e7170b..75819ee13c 100755 --- a/hugegraph-dist/src/assembly/travis/run-api-test-for-raft.sh +++ b/hugegraph-dist/src/assembly/travis/run-api-test-for-raft.sh @@ -18,7 +18,7 @@ GREMLIN_SERVER_CONF=$SERVER_DIR/conf/gremlin-server.yaml JACOCO_PORT=36320 RAFT_TOOLS=$RAFT1_DIR/bin/raft-tools.sh -RAFT_LEADER="127.0.0.1:8281" +RAFT_LEADER="127.0.0.1:8091" mvn package -DskipTests diff --git a/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java b/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java index 476e5420ea..602f20ecdc 100644 --- a/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java +++ b/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java @@ -19,6 +19,8 @@ package com.baidu.hugegraph.dist; +import java.util.concurrent.CompletableFuture; + import org.apache.tinkerpop.gremlin.server.GremlinServer; import org.slf4j.Logger; @@ -31,14 +33,12 @@ import com.baidu.hugegraph.util.ConfigUtil; import com.baidu.hugegraph.util.Log; -import java.util.concurrent.CompletableFuture; - public class HugeGraphServer { private static final Logger LOG = Log.logger(HugeGraphServer.class); - private final GremlinServer gremlinServer; private final RestServer restServer; + private final GremlinServer gremlinServer; public static void register() { RegisterUtil.registerBackends(); @@ -56,46 +56,46 @@ public HugeGraphServer(String gremlinServerConf, String restServerConf) HugeConfig restServerConfig = new HugeConfig(restServerConf); String graphsDir = restServerConfig.get(ServerOptions.GRAPHS); EventHub hub = new EventHub("gremlin=>hub<=rest"); + try { - // Start GremlinServer - this.gremlinServer = HugeGremlinServer.start(gremlinServerConf, - graphsDir, hub); + // Start HugeRestServer + this.restServer = HugeRestServer.start(restServerConf, hub); } catch (Throwable e) { - LOG.error("HugeGremlinServer start error: ", e); - HugeFactory.shutdown(30L); + LOG.error("HugeRestServer start error: ", e); throw e; - } finally { - System.setSecurityManager(securityManager); } try { - // Start HugeRestServer - this.restServer = HugeRestServer.start(restServerConf, hub); + // Start GremlinServer + this.gremlinServer = HugeGremlinServer.start(gremlinServerConf, + graphsDir, hub); } catch (Throwable e) { - LOG.error("HugeRestServer start error: ", e); + LOG.error("HugeGremlinServer start error: ", e); try { - this.gremlinServer.stop().get(); + this.restServer.shutdown().get(); } catch (Throwable t) { - LOG.error("GremlinServer stop error: ", t); + LOG.error("HugeRestServer stop error: ", t); } HugeFactory.shutdown(30L); throw e; + } finally { + System.setSecurityManager(securityManager); } } public void stop() { try { - this.restServer.shutdown().get(); - LOG.info("HugeRestServer stopped"); + this.gremlinServer.stop().get(); + LOG.info("HugeGremlinServer stopped"); } catch (Throwable e) { - LOG.error("HugeRestServer stop error: ", e); + LOG.error("HugeGremlinServer stop error: ", e); } try { - this.gremlinServer.stop().get(); - LOG.info("HugeGremlinServer stopped"); + this.restServer.shutdown().get(); + LOG.info("HugeRestServer stopped"); } catch (Throwable e) { - LOG.error("HugeGremlinServer stop error: ", e); + LOG.error("HugeRestServer stop error: ", e); } try { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/util/CompressUtilTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/util/CompressUtilTest.java index b91f66dee1..c3b8f7c736 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/util/CompressUtilTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/util/CompressUtilTest.java @@ -50,7 +50,8 @@ public void testZipCompress() throws IOException { prepareFiles(Paths.get(rootDir, sourceDir).toString()); Checksum checksum = new CRC64(); - CompressUtil.compressZip(rootDir, sourceDir, zipFile, checksum); + CompressUtil.compressZip(Paths.get(rootDir, sourceDir).toString(), + zipFile, checksum); CompressUtil.decompressZip(zipFile, output, checksum); assertDirEquals(rootDir, output);