Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.baidu.hugegraph.api.raft;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -40,8 +41,14 @@
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.api.API;
import com.baidu.hugegraph.api.filter.StatusFilter.Status;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.store.raft.RaftAddPeerJob;
import com.baidu.hugegraph.backend.store.raft.RaftGroupManager;
import com.baidu.hugegraph.backend.store.raft.RaftRemovePeerJob;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.job.JobBuilder;
import com.baidu.hugegraph.util.DateUtil;
import com.baidu.hugegraph.util.JsonUtil;
import com.baidu.hugegraph.util.Log;
import com.codahale.metrics.annotation.Timed;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -144,19 +151,26 @@ public Map<String, String> setLeader(@Context GraphManager manager,
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON_WITH_CHARSET)
@RolesAllowed({"admin"})
public Map<String, String> addPeer(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("group")
@DefaultValue("default")
String group,
@QueryParam("endpoint")
String endpoint) {
public Map<String, Id> addPeer(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("group") @DefaultValue("default")
String group,
@QueryParam("endpoint") String endpoint) {
LOG.debug("Graph [{}] prepare to add peer: {}", graph, endpoint);

HugeGraph g = graph(manager, graph);
RaftGroupManager raftManager = raftGroupManager(g, group, "add_peer");
String peerId = raftManager.addPeer(endpoint);
return ImmutableMap.of(raftManager.group(), peerId);

JobBuilder<String> builder = JobBuilder.of(g);
String name = String.format("raft-group-[%s]-add-peer-[%s]-at-[%s]",
raftManager.group(), endpoint,
DateUtil.now());
Map<String, String> inputs = new HashMap<>();
inputs.put("endpoint", endpoint);
builder.name(name)
.input(JsonUtil.toJson(inputs))
.job(new RaftAddPeerJob());
return ImmutableMap.of("task_id", builder.schedule().id());
}

@POST
Expand All @@ -166,26 +180,32 @@ public Map<String, String> addPeer(@Context GraphManager manager,
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON_WITH_CHARSET)
@RolesAllowed({"admin"})
public Map<String, String> removePeer(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("group")
@DefaultValue("default")
String group,
@QueryParam("endpoint")
String endpoint) {
public Map<String, Id> removePeer(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("group")
@DefaultValue("default") String group,
@QueryParam("endpoint") String endpoint) {
LOG.debug("Graph [{}] prepare to remove peer: {}", graph, endpoint);

HugeGraph g = graph(manager, graph);
RaftGroupManager raftManager = raftGroupManager(g, group,
"remove_peer");
String peerId = raftManager.removePeer(endpoint);
return ImmutableMap.of(raftManager.group(), peerId);
JobBuilder<String> builder = JobBuilder.of(g);
String name = String.format("raft-group-[%s]-remove-peer-[%s]-at-[%s]",
raftManager.group(), endpoint,
DateUtil.now());
Map<String, String> inputs = new HashMap<>();
inputs.put("endpoint", endpoint);
builder.name(name)
.input(JsonUtil.toJson(inputs))
.job(new RaftRemovePeerJob());
return ImmutableMap.of("task_id", builder.schedule().id());
}

private static RaftGroupManager raftGroupManager(HugeGraph graph,
String group,
String operation) {
RaftGroupManager raftManager = graph.raftGroupManager(group);
RaftGroupManager raftManager = graph.raftGroupManager();
if (raftManager == null) {
throw new HugeException("Allowed %s operation only when " +
"working on raft mode", operation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.tinkerpop.gremlin.structure.io.Io;
import org.slf4j.Logger;

import com.alipay.remoting.rpc.RpcServer;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.auth.HugeAuthenticator.RolePerm;
import com.baidu.hugegraph.auth.HugeAuthenticator.User;
Expand Down Expand Up @@ -645,9 +646,9 @@ public void readMode(GraphReadMode readMode) {
}

@Override
public void waitStarted() {
public void waitReady(RpcServer rpcServer) {
this.verifyAnyPermission();
this.hugegraph.waitStarted();
this.hugegraph.waitReady(rpcServer);
}

@Override
Expand Down Expand Up @@ -694,9 +695,9 @@ public void switchAuthManager(AuthManager authManager) {
}

@Override
public RaftGroupManager raftGroupManager(String group) {
public RaftGroupManager raftGroupManager() {
this.verifyAdminPermission();
return this.hugegraph.raftGroupManager(group);
return this.hugegraph.raftGroupManager();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ public void setup(HugeConfig config) {
// Forced set RAFT_MODE to false when initializing backend
graphConfig.setProperty(CoreOptions.RAFT_MODE.name(), "false");
}

// Transfer `raft.group_peers` from server config to graph config
String raftGroupPeers = config.get(ServerOptions.RAFT_GROUP_PEERS);
graphConfig.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(),
raftGroupPeers);

this.graph = (HugeGraph) GraphFactory.open(graphConfig);

String remoteUrl = config.get(ServerOptions.AUTH_REMOTE_URL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ public static synchronized ServerOptions instance() {
nonNegativeInt(),
0);

public static final ConfigOption<String> RAFT_GROUP_PEERS =
new ConfigOption<>(
"raft.group_peers",
"The rpc address of raft group initial peers.",
disallowEmpty(),
"127.0.0.1:8090"
);

public static final ConfigOption<Boolean> ALLOW_TRACE =
new ConfigOption<>(
"exception.allow_trace",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.slf4j.Logger;

import com.alipay.sofa.rpc.config.ServerConfig;
import com.baidu.hugegraph.HugeFactory;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.auth.AuthManager;
Expand Down Expand Up @@ -66,6 +67,7 @@
import com.baidu.hugegraph.serializer.Serializer;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.task.TaskManager;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.type.define.NodeRole;
import com.baidu.hugegraph.util.ConfigUtil;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -96,37 +98,39 @@ public GraphManager(HugeConfig conf, EventHub hub) {
this.rpcClient = new RpcClientProvider(conf);
this.eventHub = hub;
this.conf = conf;

this.listenChanges();

this.loadGraphs(ConfigUtil.scanGraphsDir(this.graphsDir));

// this.installLicense(conf, "");

// Start RPC-Server for raft-rpc/auth-rpc/cache-notify-rpc...
this.startRpcServer();

// Raft will load snapshot firstly then launch election and replay log
this.waitGraphsStarted();
this.waitGraphsReady();

this.checkBackendVersionOrExit(conf);
this.startRpcServer();
this.serverStarted(conf);

this.addMetrics(conf);
}

public void loadGraphs(final Map<String, String> graphConfs) {
public void loadGraphs(Map<String, String> graphConfs) {
for (Map.Entry<String, String> conf : graphConfs.entrySet()) {
String name = conf.getKey();
String path = conf.getValue();
String graphConfPath = conf.getValue();
HugeFactory.checkGraphName(name, "rest-server.properties");
try {
this.loadGraph(name, path);
this.loadGraph(name, graphConfPath);
} catch (RuntimeException e) {
LOG.error("Graph '{}' can't be loaded: '{}'", name, path, e);
LOG.error("Graph '{}' can't be loaded: '{}'",
name, graphConfPath, e);
}
}
}

public void waitGraphsStarted() {
this.graphs.keySet().forEach(name -> {
HugeGraph graph = this.graph(name);
graph.waitStarted();
});
}

public HugeGraph cloneGraph(String name, String newName,
String configText) {
/*
Expand Down Expand Up @@ -289,6 +293,13 @@ private void startRpcServer() {
}
}

private com.alipay.remoting.rpc.RpcServer remotingRpcServer() {
ServerConfig serverConfig = Whitebox.getInternalState(this.rpcServer,
"serverConfig");
return Whitebox.getInternalState(serverConfig.getServer(),
"remotingServer");
}

private void destroyRpcServer() {
try {
this.rpcClient.destroy();
Expand Down Expand Up @@ -331,21 +342,41 @@ private void closeTx(final Set<String> graphSourceNamesToCloseTxOn,
});
}

private void loadGraph(String name, String path) {
final Graph graph = GraphFactory.open(path);
private void loadGraph(String name, String graphConfPath) {
HugeConfig config = new HugeConfig(graphConfPath);

// Transfer `raft.group_peers` from server config to graph config
String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS);
config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(),
raftGroupPeers);

Graph graph = GraphFactory.open(config);
this.graphs.put(name, graph);
HugeConfig config = (HugeConfig) graph.configuration();
config.file(path);
LOG.info("Graph '{}' was successfully configured via '{}'", name, path);

HugeConfig graphConfig = (HugeConfig) graph.configuration();
assert graphConfPath.equals(graphConfig.file().getPath());

LOG.info("Graph '{}' was successfully configured via '{}'",
name, graphConfPath);

if (this.requireAuthentication() &&
!(graph instanceof HugeGraphAuthProxy)) {
LOG.warn("You may need to support access control for '{}' with {}",
path, HugeFactoryAuthProxy.GRAPH_FACTORY);
graphConfPath, HugeFactoryAuthProxy.GRAPH_FACTORY);
}
}

private void waitGraphsReady() {
com.alipay.remoting.rpc.RpcServer remotingRpcServer =
this.remotingRpcServer();
this.graphs.keySet().forEach(name -> {
HugeGraph graph = this.graph(name);
graph.waitReady(remotingRpcServer);
});
}

private void checkBackendVersionOrExit(HugeConfig config) {
LOG.info("Check backend version");
for (String graph : this.graphs()) {
// TODO: close tx from main thread
HugeGraph hugegraph = this.graph(graph);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;

import com.alipay.remoting.rpc.RpcServer;
import com.baidu.hugegraph.auth.AuthManager;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.query.Query;
Expand Down Expand Up @@ -192,7 +193,7 @@ public interface HugeGraph extends Graph {

void readMode(GraphReadMode readMode);

void waitStarted();
void waitReady(RpcServer rpcServer);

void serverStarted(Id serverId, NodeRole serverRole);

Expand Down Expand Up @@ -227,7 +228,7 @@ public interface HugeGraph extends Graph {

TaskScheduler taskScheduler();

RaftGroupManager raftGroupManager(String group);
RaftGroupManager raftGroupManager();

void proxy(HugeGraph graph);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.slf4j.Logger;

import com.alipay.remoting.rpc.RpcServer;
import com.baidu.hugegraph.analyzer.Analyzer;
import com.baidu.hugegraph.analyzer.AnalyzerFactory;
import com.baidu.hugegraph.auth.AuthManager;
Expand Down Expand Up @@ -206,7 +207,7 @@ public StandardHugeGraph(HugeConfig config) {
LockUtil.destroy(this.name);
String message = "Failed to load backend store provider";
LOG.error("{}: {}", message, e.getMessage());
throw new HugeException(message);
throw new HugeException(message, e);
}

try {
Expand Down Expand Up @@ -310,10 +311,10 @@ public void readMode(GraphReadMode readMode) {
}

@Override
public void waitStarted() {
public void waitReady(RpcServer rpcServer) {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
this.schemaTransaction();
this.storeProvider.waitStoreStarted();
this.storeProvider.waitReady(rpcServer);
}

@Override
Expand Down Expand Up @@ -1004,13 +1005,13 @@ public void switchAuthManager(AuthManager authManager) {
}

@Override
public RaftGroupManager raftGroupManager(String group) {
public RaftGroupManager raftGroupManager() {
if (!(this.storeProvider instanceof RaftBackendStoreProvider)) {
return null;
}
RaftBackendStoreProvider provider =
((RaftBackendStoreProvider) this.storeProvider);
return provider.raftNodeManager(group);
return provider.raftNodeManager();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.slf4j.Logger;

import com.alipay.remoting.rpc.RpcServer;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.raft.StoreSnapshotFile;
Expand Down Expand Up @@ -94,8 +95,8 @@ public void open(String graph) {
}

@Override
public void waitStoreStarted() {
// pass
public void waitReady(RpcServer rpcServer) {
// passs
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static BackendStoreProvider open(HugeGraphParams params) {
if (raftMode) {
LOG.info("Opening backend store '{}' in raft mode for graph '{}'",
backend, graph);
provider = new RaftBackendStoreProvider(provider, params);
provider = new RaftBackendStoreProvider(params, provider);
}
provider.open(graph);
return provider;
Expand Down
Loading