From ee86044569a1996fa01af9e313516835196e61ec Mon Sep 17 00:00:00 2001 From: liningrui Date: Mon, 10 Jun 2019 17:11:51 +0800 Subject: [PATCH 1/2] Support to detect connection before using if needed Fix #556 Change-Id: I622e285f812098d601061f2515d70629f1e07d88 --- .../api/filter/LoadDetectFilter.java | 4 +-- .../api/filter/LoadReleaseFilter.java | 9 +++--- .../baidu/hugegraph/config/ServerOptions.java | 2 +- .../store/cassandra/CassandraSessionPool.java | 8 +++-- .../store/cassandra/CassandraStore.java | 19 ++++++++---- .../backend/store/BackendSession.java | 20 +++++++++++++ .../backend/store/BackendSessionPool.java | 25 ++++++++++++++-- .../baidu/hugegraph/config/CoreOptions.java | 13 +++++++-- .../backend/store/hbase/HbaseSessions.java | 29 ++++++++++--------- .../backend/store/hbase/HbaseStore.java | 16 ++++++++-- .../backend/store/mysql/MysqlSessions.java | 13 +++++++-- .../backend/store/mysql/MysqlStore.java | 2 +- .../store/rocksdb/RocksDBSessions.java | 5 ++-- .../store/rocksdb/RocksDBStdSessions.java | 22 ++++++-------- .../store/rocksdbsst/RocksDBSstSessions.java | 11 ++++--- 15 files changed, 137 insertions(+), 61 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadDetectFilter.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadDetectFilter.java index 317b2ec35d..881da5a56b 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadDetectFilter.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadDetectFilter.java @@ -62,7 +62,7 @@ public class LoadDetectFilter implements ContainerRequestFilter { @Override public void filter(ContainerRequestContext context) { - if (isWhiteAPI(context)) { + if (LoadDetectFilter.isWhiteAPI(context)) { return; } @@ -93,7 +93,7 @@ public void filter(ContainerRequestContext context) { } } - private static boolean isWhiteAPI(ContainerRequestContext context) { + public static boolean isWhiteAPI(ContainerRequestContext context) { List segments = context.getUriInfo().getPathSegments(); E.checkArgument(segments.size() > 0, "Invalid request uri '%s'", context.getUriInfo().getPath()); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadReleaseFilter.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadReleaseFilter.java index c5b57941cf..71fae891ce 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadReleaseFilter.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadReleaseFilter.java @@ -19,8 +19,6 @@ package com.baidu.hugegraph.api.filter; -import java.io.IOException; - import javax.inject.Singleton; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerResponseContext; @@ -39,8 +37,11 @@ public class LoadReleaseFilter implements ContainerResponseFilter { @Override public void filter(ContainerRequestContext requestContext, - ContainerResponseContext responseContext) - throws IOException { + ContainerResponseContext responseContext) { + if (LoadDetectFilter.isWhiteAPI(requestContext)) { + return; + } + WorkLoad load = this.loadProvider.get(); load.decrementAndGet(); } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java index 2753477343..a4673b1561 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java @@ -52,7 +52,7 @@ public static synchronized ServerOptions instance() { new ConfigOption<>( "restserver.max_worker_threads", "The maxmium worker threads of rest server.", - positiveInt(), + rangeInt(2, Integer.MAX_VALUE), 2 * Runtime.getRuntime().availableProcessors() ); diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraSessionPool.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraSessionPool.java index c4351b3657..f41641ae35 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraSessionPool.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraSessionPool.java @@ -46,19 +46,21 @@ public class CassandraSessionPool extends BackendSessionPool { private Cluster cluster; private String keyspace; - public CassandraSessionPool(String keyspace, String store) { - super(keyspace + "/" + store); + public CassandraSessionPool(HugeConfig config, String keyspace, + String store) { + super(config, keyspace + "/" + store); this.cluster = null; this.keyspace = keyspace; } @Override - public synchronized void open(HugeConfig config) { + public synchronized void open() { if (this.opened()) { throw new BackendException("Please close the old SessionPool " + "before opening a new one"); } + HugeConfig config = this.config(); // Contact options String hosts = config.get(CassandraOptions.CASSANDRA_HOST); int port = config.get(CassandraOptions.CASSANDRA_PORT); diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java index e8af2c2c1b..11ff643bc7 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java @@ -61,11 +61,10 @@ public abstract class CassandraStore private final String keyspace; private final BackendStoreProvider provider; - - private final CassandraSessionPool sessions; // TODO: move to parent class private final Map tables; + private volatile CassandraSessionPool sessions; private HugeConfig conf; public CassandraStore(final BackendStoreProvider provider, @@ -77,10 +76,9 @@ public CassandraStore(final BackendStoreProvider provider, this.keyspace = keyspace; this.store = store; - - this.sessions = new CassandraSessionPool(keyspace, store); this.tables = new ConcurrentHashMap<>(); + this.sessions = null; this.conf = null; this.registerMetaHandlers(); @@ -116,9 +114,18 @@ public BackendStoreProvider provider() { @Override public void open(HugeConfig config) { LOG.debug("Store open: {}", this.store); - E.checkNotNull(config, "config"); + if (this.sessions == null) { + synchronized(this) { + if (this.sessions == null) { + this.sessions = new CassandraSessionPool(config, + this.keyspace, + this.store); + } + } + } + if (this.sessions.opened()) { // TODO: maybe we should throw an exception here instead of ignore LOG.debug("Store {} has been opened before", this.store); @@ -128,7 +135,7 @@ public void open(HugeConfig config) { this.conf = config; // Init cluster - this.sessions.open(config); + this.sessions.open(); // Init a session for current thread try { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendSession.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendSession.java index 1f905c7195..6dd01e4b7a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendSession.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendSession.java @@ -28,10 +28,26 @@ public abstract class BackendSession { private int refs; private TxState txState; + private final long created; + private long updated; public BackendSession() { this.refs = 1; this.txState = TxState.CLEAN; + this.created = System.currentTimeMillis(); + this.updated = this.created; + } + + public long created() { + return this.created; + } + + public long updated() { + return this.updated; + } + + public void update() { + this.updated = System.currentTimeMillis(); } public abstract void close(); @@ -44,6 +60,10 @@ public BackendSession() { public abstract boolean hasChanges(); + protected void reconnectIfNeeded() { + // pass + } + protected int attach() { return ++this.refs; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendSessionPool.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendSessionPool.java index 1cb8f80084..164dc39e4f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendSessionPool.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendSessionPool.java @@ -19,11 +19,13 @@ package com.baidu.hugegraph.backend.store; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; +import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.util.Log; @@ -31,16 +33,22 @@ public abstract class BackendSessionPool { private static final Logger LOG = Log.logger(BackendSessionPool.class); + private final HugeConfig config; private final String name; private final ThreadLocal threadLocalSession; private final AtomicInteger sessionCount; - public BackendSessionPool(String name) { + public BackendSessionPool(HugeConfig config, String name) { + this.config = config; this.name = name; this.threadLocalSession = new ThreadLocal<>(); this.sessionCount = new AtomicInteger(0); } + public HugeConfig config() { + return this.config; + } + public final BackendSession getOrNewSession() { BackendSession session = this.threadLocalSession.get(); if (session == null) { @@ -50,6 +58,8 @@ public final BackendSession getOrNewSession() { this.sessionCount.incrementAndGet(); LOG.debug("Now(after connect({})) session count is: {}", this, this.sessionCount.get()); + } else { + this.detectSession(session); } return session; } @@ -58,12 +68,23 @@ public BackendSession useSession() { BackendSession session = this.threadLocalSession.get(); if (session != null) { session.attach(); + this.detectSession(session); } else { session = this.getOrNewSession(); } return session; } + private void detectSession(BackendSession session) { + // Reconnect if the session idle time exceed specified value + long interval = this.config.get(CoreOptions.CONNECTION_DETECT_INTERVAL); + long now = System.currentTimeMillis(); + if (now - session.updated() > TimeUnit.SECONDS.toMillis(interval)) { + session.reconnectIfNeeded(); + } + session.update(); + } + public Pair closeSession() { BackendSession session = this.threadLocalSession.get(); if (session == null) { @@ -112,7 +133,7 @@ public String toString() { this.getClass().getSimpleName(), this.hashCode()); } - public abstract void open(HugeConfig config) throws Exception; + public abstract void open() throws Exception; protected abstract boolean opened(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java index 8623df8bb6..470e085354 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java @@ -19,12 +19,12 @@ package com.baidu.hugegraph.config; -import com.baidu.hugegraph.backend.query.Query; - import static com.baidu.hugegraph.backend.tx.GraphTransaction.COMMIT_BATCH; import static com.baidu.hugegraph.config.OptionChecker.disallowEmpty; import static com.baidu.hugegraph.config.OptionChecker.rangeInt; +import com.baidu.hugegraph.backend.query.Query; + public class CoreOptions extends OptionHolder { private CoreOptions() { @@ -115,6 +115,15 @@ public static synchronized CoreOptions instance() { 10L ); + public static final ConfigOption CONNECTION_DETECT_INTERVAL = + new ConfigOption<>( + "store.connection_detect_interval", + "The interval for detecting connection, if connection's " + + "idle time exceed this interval, detect it before using.", + rangeInt(0L, Long.MAX_VALUE), + 600L + ); + public static final ConfigOption VERTEX_DEFAULT_LABEL = new ConfigOption<>( "vertex.default_label", diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java index b66fceb34f..71239052c8 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java @@ -79,8 +79,8 @@ public class HbaseSessions extends BackendSessionPool { private final String namespace; private Connection hbase; - public HbaseSessions(String namespace, String store) { - super(namespace + "/" + store); + public HbaseSessions(HugeConfig config, String namespace, String store) { + super(config, namespace + "/" + store); this.namespace = namespace; } @@ -91,20 +91,21 @@ private Table table(String table) throws IOException { } @Override - public synchronized void open(HugeConfig conf) throws IOException { - String hosts = conf.get(HbaseOptions.HBASE_HOSTS); - int port = conf.get(HbaseOptions.HBASE_PORT); - String znodeParent = conf.get(HbaseOptions.HBASE_ZNODE_PARENT); - - Configuration config = HBaseConfiguration.create(); - config.set(HConstants.ZOOKEEPER_QUORUM, hosts); - config.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(port)); - config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); + public synchronized void open() throws IOException { + HugeConfig config = this.config(); + String hosts = config.get(HbaseOptions.HBASE_HOSTS); + int port = config.get(HbaseOptions.HBASE_PORT); + String znodeParent = config.get(HbaseOptions.HBASE_ZNODE_PARENT); + + Configuration hConfig = HBaseConfiguration.create(); + hConfig.set(HConstants.ZOOKEEPER_QUORUM, hosts); + hConfig.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(port)); + hConfig.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); // Set hbase.hconnection.threads.max 64 to avoid OOM(default value: 256) - config.setInt("hbase.hconnection.threads.max", - conf.get(HbaseOptions.HBASE_THREADS_MAX)); + hConfig.setInt("hbase.hconnection.threads.max", + config.get(HbaseOptions.HBASE_THREADS_MAX)); - this.hbase = ConnectionFactory.createConnection(config); + this.hbase = ConnectionFactory.createConnection(hConfig); } @Override diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java index 7768c31f9b..4810f32663 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java @@ -58,7 +58,7 @@ public abstract class HbaseStore extends AbstractBackendStore { private final BackendStoreProvider provider; private final Map tables; - private final HbaseSessions sessions; + private volatile HbaseSessions sessions; public HbaseStore(final BackendStoreProvider provider, final String namespace, final String store) { @@ -67,7 +67,7 @@ public HbaseStore(final BackendStoreProvider provider, this.provider = provider; this.namespace = namespace; this.store = store; - this.sessions = new HbaseSessions(namespace, store); + this.sessions = null; } protected void registerTableManager(HugeType type, HbaseTable table) { @@ -123,6 +123,16 @@ public BackendFeatures features() { public void open(HugeConfig config) { E.checkNotNull(config, "config"); + if (this.sessions == null) { + synchronized(this) { + if (this.sessions == null) { + this.sessions = new HbaseSessions(config, + this.namespace, + this.store); + } + } + } + if (this.sessions.opened()) { LOG.debug("Store {} has been opened before", this.store); this.sessions.useSession(); @@ -130,7 +140,7 @@ public void open(HugeConfig config) { } try { - this.sessions.open(config); + this.sessions.open(); } catch (IOException e) { if (!e.getMessage().contains("Column family not found")) { LOG.error("Failed to open HBase '{}'", this.store, e); diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSessions.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSessions.java index 9411ed5f0e..815d7a24cb 100644 --- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSessions.java +++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSessions.java @@ -50,7 +50,7 @@ public class MysqlSessions extends BackendSessionPool { private volatile boolean opened; public MysqlSessions(HugeConfig config, String database, String store) { - super(database + "/" + store); + super(config, database + "/" + store); this.config = config; this.database = database; this.opened = false; @@ -69,7 +69,7 @@ public String database() { * @throws SQLException if a database access error occurs */ @Override - public synchronized void open(HugeConfig config) throws Exception { + public synchronized void open() throws Exception { try (Connection conn = this.open(false)) { this.opened = true; } @@ -346,6 +346,15 @@ public boolean hasChanges() { return this.count > 0; } + @Override + protected void reconnectIfNeeded() { + try { + this.execute("SELECT 1;"); + } catch (SQLException ignored) { + // pass + } + } + public ResultSet select(String sql) throws SQLException { assert this.conn.getAutoCommit(); return this.conn.createStatement().executeQuery(sql); diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java index 3c39ddbbc6..511b33012a 100644 --- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java +++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java @@ -112,7 +112,7 @@ public synchronized void open(HugeConfig config) { LOG.debug("Store connect with database: {}", this.database); try { - this.sessions.open(config); + this.sessions.open(); } catch (Exception e) { if (!e.getMessage().startsWith("Unknown database") && !e.getMessage().endsWith("does not exist")) { diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java index bfbe332273..a55eefa90d 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java @@ -26,11 +26,12 @@ import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator; import com.baidu.hugegraph.backend.store.BackendSession; import com.baidu.hugegraph.backend.store.BackendSessionPool; +import com.baidu.hugegraph.config.HugeConfig; public abstract class RocksDBSessions extends BackendSessionPool { - public RocksDBSessions(String database, String store) { - super(database + "/" + store); + public RocksDBSessions(HugeConfig config, String database, String store) { + super(config, database + "/" + store); } public abstract Set openedTables(); diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index ed0c4f6875..0ec7b240d9 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -64,20 +64,17 @@ public class RocksDBStdSessions extends RocksDBSessions { private final Map cfs = new HashMap<>(); - private final HugeConfig conf; private final RocksDB rocksdb; private final SstFileManager sstFileManager; public RocksDBStdSessions(HugeConfig config, String dataPath, String walPath, String database, String store) throws RocksDBException { - super(database, store); - - this.conf = config; + super(config, database, store); // Init options Options options = new Options(); - RocksDBStdSessions.initOptions(this.conf, options, options, options); + RocksDBStdSessions.initOptions(config, options, options, options); options.setWalDir(walPath); this.sstFileManager = new SstFileManager(Env.getDefault()); @@ -93,8 +90,7 @@ public RocksDBStdSessions(HugeConfig config, String dataPath, public RocksDBStdSessions(HugeConfig config, String dataPath, String walPath, String database, String store, List cfNames) throws RocksDBException { - super(database, store); - this.conf = config; + super(config, database, store); // Old CFs should always be opened Set mergedCFs = this.mergeOldCFs(dataPath, cfNames); @@ -105,13 +101,13 @@ public RocksDBStdSessions(HugeConfig config, String dataPath, for (String cf : cfs) { ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf)); ColumnFamilyOptions options = cfd.getOptions(); - RocksDBStdSessions.initOptions(this.conf, null, options, options); + RocksDBStdSessions.initOptions(config, null, options, options); cfds.add(cfd); } // Init DB options DBOptions options = new DBOptions(); - RocksDBStdSessions.initOptions(this.conf, options, null, null); + RocksDBStdSessions.initOptions(config, options, null, null); options.setWalDir(walPath); this.sstFileManager = new SstFileManager(Env.getDefault()); @@ -132,7 +128,7 @@ public RocksDBStdSessions(HugeConfig config, String dataPath, } @Override - public void open(HugeConfig config) throws Exception { + public void open() throws Exception { // pass } @@ -157,7 +153,7 @@ public void createTable(String table) throws RocksDBException { // Should we use options.setCreateMissingColumnFamilies() to create CF ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(table)); ColumnFamilyOptions options = cfd.getOptions(); - initOptions(this.conf, null, options, options); + initOptions(this.config(), null, options, options); this.cfs.put(table, this.rocksdb.createColumnFamily(cfd)); ingestExternalFile(); @@ -194,7 +190,7 @@ public final Session session() { protected final Session newSession() { E.checkState(this.rocksdb.isOwningHandle(), "RocksDB has not been initialized"); - return new StdSession(this.conf); + return new StdSession(this.config()); } @Override @@ -235,7 +231,7 @@ private Set mergeOldCFs(String path, List cfNames) } private void ingestExternalFile() throws RocksDBException { - String directory = this.conf.get(RocksDBOptions.SST_PATH); + String directory = this.config().get(RocksDBOptions.SST_PATH); if (directory == null || directory.isEmpty()) { return; } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java index 38bcbbf3b6..c9599b5a96 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java @@ -46,15 +46,13 @@ public class RocksDBSstSessions extends RocksDBSessions { - private final HugeConfig conf; private final String dataPath; private final Map tables; - public RocksDBSstSessions(HugeConfig conf, String dataPath, + public RocksDBSstSessions(HugeConfig config, String dataPath, String database, String store) { - super(database, store); + super(config, database, store); - this.conf = conf; this.dataPath = dataPath; this.tables = new ConcurrentHashMap<>(); @@ -74,7 +72,7 @@ public RocksDBSstSessions(HugeConfig config, String dataPath, } @Override - public void open(HugeConfig config) throws Exception { + public void open() throws Exception { // pass } @@ -92,7 +90,8 @@ public Set openedTables() { public void createTable(String table) throws RocksDBException { EnvOptions env = new EnvOptions(); Options options = new Options(); - RocksDBStdSessions.initOptions(this.conf, options, options, options); + RocksDBStdSessions.initOptions(this.config(), options, + options, options); // NOTE: unset merge op due to SIGSEGV when cf.setMergeOperatorName() options.setMergeOperatorName("not-exist-merge-op"); SstFileWriter sst = new SstFileWriter(env, options); From 504979d1bd0da098a90b16e81c88a64aae7542de Mon Sep 17 00:00:00 2001 From: liningrui Date: Tue, 11 Jun 2019 16:19:26 +0800 Subject: [PATCH 2/2] tiny improve Change-Id: I827356fd0b35173ea2db1d99b61e03e3b56a2a78 --- .../backend/store/cassandra/CassandraStore.java | 13 ++++--------- .../com/baidu/hugegraph/config/CoreOptions.java | 6 ++++-- .../hugegraph/backend/store/hbase/HbaseStore.java | 12 +++--------- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java index 11ff643bc7..62f3a2de1e 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java @@ -64,7 +64,7 @@ public abstract class CassandraStore // TODO: move to parent class private final Map tables; - private volatile CassandraSessionPool sessions; + private CassandraSessionPool sessions; private HugeConfig conf; public CassandraStore(final BackendStoreProvider provider, @@ -112,18 +112,13 @@ public BackendStoreProvider provider() { } @Override - public void open(HugeConfig config) { + public synchronized void open(HugeConfig config) { LOG.debug("Store open: {}", this.store); E.checkNotNull(config, "config"); if (this.sessions == null) { - synchronized(this) { - if (this.sessions == null) { - this.sessions = new CassandraSessionPool(config, - this.keyspace, - this.store); - } - } + this.sessions = new CassandraSessionPool(config, this.keyspace, + this.store); } if (this.sessions.opened()) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java index 470e085354..829f65f48c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java @@ -118,8 +118,10 @@ public static synchronized CoreOptions instance() { public static final ConfigOption CONNECTION_DETECT_INTERVAL = new ConfigOption<>( "store.connection_detect_interval", - "The interval for detecting connection, if connection's " + - "idle time exceed this interval, detect it before using.", + "The interval in seconds for detecting connections, " + + "if the idle time of a connection exceeds this value, " + + "detect it and reconnect if needed before using, " + + "value 0 means detecting every time.", rangeInt(0L, Long.MAX_VALUE), 600L ); diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java index 4810f32663..62a13811c3 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java @@ -58,7 +58,7 @@ public abstract class HbaseStore extends AbstractBackendStore { private final BackendStoreProvider provider; private final Map tables; - private volatile HbaseSessions sessions; + private HbaseSessions sessions; public HbaseStore(final BackendStoreProvider provider, final String namespace, final String store) { @@ -120,17 +120,11 @@ public BackendFeatures features() { } @Override - public void open(HugeConfig config) { + public synchronized void open(HugeConfig config) { E.checkNotNull(config, "config"); if (this.sessions == null) { - synchronized(this) { - if (this.sessions == null) { - this.sessions = new HbaseSessions(config, - this.namespace, - this.store); - } - } + this.sessions = new HbaseSessions(config, this.namespace, this.store); } if (this.sessions.opened()) {