Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class LoadDetectFilter implements ContainerRequestFilter {

@Override
public void filter(ContainerRequestContext context) {
if (isWhiteAPI(context)) {
if (LoadDetectFilter.isWhiteAPI(context)) {
return;
}

Expand Down Expand Up @@ -93,7 +93,7 @@ public void filter(ContainerRequestContext context) {
}
}

private static boolean isWhiteAPI(ContainerRequestContext context) {
public static boolean isWhiteAPI(ContainerRequestContext context) {
List<PathSegment> segments = context.getUriInfo().getPathSegments();
E.checkArgument(segments.size() > 0, "Invalid request uri '%s'",
context.getUriInfo().getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package com.baidu.hugegraph.api.filter;

import java.io.IOException;

import javax.inject.Singleton;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
Expand All @@ -39,8 +37,11 @@ public class LoadReleaseFilter implements ContainerResponseFilter {

@Override
public void filter(ContainerRequestContext requestContext,
ContainerResponseContext responseContext)
throws IOException {
ContainerResponseContext responseContext) {
if (LoadDetectFilter.isWhiteAPI(requestContext)) {
return;
}

WorkLoad load = this.loadProvider.get();
load.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static synchronized ServerOptions instance() {
new ConfigOption<>(
"restserver.max_worker_threads",
"The maxmium worker threads of rest server.",
positiveInt(),
rangeInt(2, Integer.MAX_VALUE),
2 * Runtime.getRuntime().availableProcessors()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,21 @@ public class CassandraSessionPool extends BackendSessionPool {
private Cluster cluster;
private String keyspace;

public CassandraSessionPool(String keyspace, String store) {
super(keyspace + "/" + store);
public CassandraSessionPool(HugeConfig config, String keyspace,
String store) {
super(config, keyspace + "/" + store);
this.cluster = null;
this.keyspace = keyspace;
}

@Override
public synchronized void open(HugeConfig config) {
public synchronized void open() {
if (this.opened()) {
throw new BackendException("Please close the old SessionPool " +
"before opening a new one");
}

HugeConfig config = this.config();
// Contact options
String hosts = config.get(CassandraOptions.CASSANDRA_HOST);
int port = config.get(CassandraOptions.CASSANDRA_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ public abstract class CassandraStore
private final String keyspace;

private final BackendStoreProvider provider;

private final CassandraSessionPool sessions;
// TODO: move to parent class
private final Map<HugeType, CassandraTable> tables;

private CassandraSessionPool sessions;
private HugeConfig conf;

public CassandraStore(final BackendStoreProvider provider,
Expand All @@ -77,10 +76,9 @@ public CassandraStore(final BackendStoreProvider provider,

this.keyspace = keyspace;
this.store = store;

this.sessions = new CassandraSessionPool(keyspace, store);
this.tables = new ConcurrentHashMap<>();

this.sessions = null;
this.conf = null;

this.registerMetaHandlers();
Expand Down Expand Up @@ -114,11 +112,15 @@ public BackendStoreProvider provider() {
}

@Override
public void open(HugeConfig config) {
public synchronized void open(HugeConfig config) {
LOG.debug("Store open: {}", this.store);

E.checkNotNull(config, "config");

if (this.sessions == null) {
this.sessions = new CassandraSessionPool(config, this.keyspace,
this.store);
}

if (this.sessions.opened()) {
// TODO: maybe we should throw an exception here instead of ignore
LOG.debug("Store {} has been opened before", this.store);
Expand All @@ -128,7 +130,7 @@ public void open(HugeConfig config) {
this.conf = config;

// Init cluster
this.sessions.open(config);
this.sessions.open();

// Init a session for current thread
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,26 @@ public abstract class BackendSession {

private int refs;
private TxState txState;
private final long created;
private long updated;

public BackendSession() {
this.refs = 1;
this.txState = TxState.CLEAN;
this.created = System.currentTimeMillis();
this.updated = this.created;
}

public long created() {
return this.created;
}

public long updated() {
return this.updated;
}

public void update() {
this.updated = System.currentTimeMillis();
}

public abstract void close();
Expand All @@ -44,6 +60,10 @@ public BackendSession() {

public abstract boolean hasChanges();

protected void reconnectIfNeeded() {
// pass
}

protected int attach() {
return ++this.refs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,36 @@

package com.baidu.hugegraph.backend.store;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.util.Log;

public abstract class BackendSessionPool {

private static final Logger LOG = Log.logger(BackendSessionPool.class);

private final HugeConfig config;
private final String name;
private final ThreadLocal<BackendSession> threadLocalSession;
private final AtomicInteger sessionCount;

public BackendSessionPool(String name) {
public BackendSessionPool(HugeConfig config, String name) {
this.config = config;
this.name = name;
this.threadLocalSession = new ThreadLocal<>();
this.sessionCount = new AtomicInteger(0);
}

public HugeConfig config() {
return this.config;
}

public final BackendSession getOrNewSession() {
BackendSession session = this.threadLocalSession.get();
if (session == null) {
Expand All @@ -50,6 +58,8 @@ public final BackendSession getOrNewSession() {
this.sessionCount.incrementAndGet();
LOG.debug("Now(after connect({})) session count is: {}",
this, this.sessionCount.get());
} else {
this.detectSession(session);
}
return session;
}
Expand All @@ -58,12 +68,23 @@ public BackendSession useSession() {
BackendSession session = this.threadLocalSession.get();
if (session != null) {
session.attach();
this.detectSession(session);
} else {
session = this.getOrNewSession();
}
return session;
}

private void detectSession(BackendSession session) {
// Reconnect if the session idle time exceed specified value
long interval = this.config.get(CoreOptions.CONNECTION_DETECT_INTERVAL);
long now = System.currentTimeMillis();
if (now - session.updated() > TimeUnit.SECONDS.toMillis(interval)) {
session.reconnectIfNeeded();
}
session.update();
}

public Pair<Integer, Integer> closeSession() {
BackendSession session = this.threadLocalSession.get();
if (session == null) {
Expand Down Expand Up @@ -112,7 +133,7 @@ public String toString() {
this.getClass().getSimpleName(), this.hashCode());
}

public abstract void open(HugeConfig config) throws Exception;
public abstract void open() throws Exception;

protected abstract boolean opened();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

package com.baidu.hugegraph.config;

import com.baidu.hugegraph.backend.query.Query;

import static com.baidu.hugegraph.backend.tx.GraphTransaction.COMMIT_BATCH;
import static com.baidu.hugegraph.config.OptionChecker.disallowEmpty;
import static com.baidu.hugegraph.config.OptionChecker.rangeInt;

import com.baidu.hugegraph.backend.query.Query;

public class CoreOptions extends OptionHolder {

private CoreOptions() {
Expand Down Expand Up @@ -115,6 +115,17 @@ public static synchronized CoreOptions instance() {
10L
);

public static final ConfigOption<Long> CONNECTION_DETECT_INTERVAL =
new ConfigOption<>(
"store.connection_detect_interval",
"The interval in seconds for detecting connections, " +
"if the idle time of a connection exceeds this value, " +
"detect it and reconnect if needed before using, " +
"value 0 means detecting every time.",
rangeInt(0L, Long.MAX_VALUE),
600L
);

public static final ConfigOption<String> VERTEX_DEFAULT_LABEL =
new ConfigOption<>(
"vertex.default_label",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public class HbaseSessions extends BackendSessionPool {
private final String namespace;
private Connection hbase;

public HbaseSessions(String namespace, String store) {
super(namespace + "/" + store);
public HbaseSessions(HugeConfig config, String namespace, String store) {
super(config, namespace + "/" + store);
this.namespace = namespace;
}

Expand All @@ -91,20 +91,21 @@ private Table table(String table) throws IOException {
}

@Override
public synchronized void open(HugeConfig conf) throws IOException {
String hosts = conf.get(HbaseOptions.HBASE_HOSTS);
int port = conf.get(HbaseOptions.HBASE_PORT);
String znodeParent = conf.get(HbaseOptions.HBASE_ZNODE_PARENT);

Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, hosts);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(port));
config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
public synchronized void open() throws IOException {
HugeConfig config = this.config();
String hosts = config.get(HbaseOptions.HBASE_HOSTS);
int port = config.get(HbaseOptions.HBASE_PORT);
String znodeParent = config.get(HbaseOptions.HBASE_ZNODE_PARENT);

Configuration hConfig = HBaseConfiguration.create();
hConfig.set(HConstants.ZOOKEEPER_QUORUM, hosts);
hConfig.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(port));
hConfig.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
// Set hbase.hconnection.threads.max 64 to avoid OOM(default value: 256)
config.setInt("hbase.hconnection.threads.max",
conf.get(HbaseOptions.HBASE_THREADS_MAX));
hConfig.setInt("hbase.hconnection.threads.max",
config.get(HbaseOptions.HBASE_THREADS_MAX));

this.hbase = ConnectionFactory.createConnection(config);
this.hbase = ConnectionFactory.createConnection(hConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class HbaseStore extends AbstractBackendStore<Session> {
private final BackendStoreProvider provider;
private final Map<HugeType, HbaseTable> tables;

private final HbaseSessions sessions;
private HbaseSessions sessions;

public HbaseStore(final BackendStoreProvider provider,
final String namespace, final String store) {
Expand All @@ -67,7 +67,7 @@ public HbaseStore(final BackendStoreProvider provider,
this.provider = provider;
this.namespace = namespace;
this.store = store;
this.sessions = new HbaseSessions(namespace, store);
this.sessions = null;
}

protected void registerTableManager(HugeType type, HbaseTable table) {
Expand Down Expand Up @@ -120,17 +120,21 @@ public BackendFeatures features() {
}

@Override
public void open(HugeConfig config) {
public synchronized void open(HugeConfig config) {
E.checkNotNull(config, "config");

if (this.sessions == null) {
this.sessions = new HbaseSessions(config, this.namespace, this.store);
}

if (this.sessions.opened()) {
LOG.debug("Store {} has been opened before", this.store);
this.sessions.useSession();
return;
}

try {
this.sessions.open(config);
this.sessions.open();
} catch (IOException e) {
if (!e.getMessage().contains("Column family not found")) {
LOG.error("Failed to open HBase '{}'", this.store, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class MysqlSessions extends BackendSessionPool {
private volatile boolean opened;

public MysqlSessions(HugeConfig config, String database, String store) {
super(database + "/" + store);
super(config, database + "/" + store);
this.config = config;
this.database = database;
this.opened = false;
Expand All @@ -69,7 +69,7 @@ public String database() {
* @throws SQLException if a database access error occurs
*/
@Override
public synchronized void open(HugeConfig config) throws Exception {
public synchronized void open() throws Exception {
try (Connection conn = this.open(false)) {
this.opened = true;
}
Expand Down Expand Up @@ -346,6 +346,15 @@ public boolean hasChanges() {
return this.count > 0;
}

@Override
protected void reconnectIfNeeded() {
try {
this.execute("SELECT 1;");
} catch (SQLException ignored) {
// pass
}
}

public ResultSet select(String sql) throws SQLException {
assert this.conn.getAutoCommit();
return this.conn.createStatement().executeQuery(sql);
Expand Down
Loading