Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@

package com.baidu.hugegraph.backend.store.rocksdb;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.rocksdb.RocksDBException;

import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import com.baidu.hugegraph.backend.store.BackendSession;
import com.baidu.hugegraph.backend.store.BackendSessionPool;
Expand All @@ -48,21 +44,6 @@ public RocksDBSessions(String database, String store) {
@Override
public abstract Session session();

public String wrapPath(String path) {
return wrapPath(path, this.store);
}

public static String wrapPath(String path, String store) {
// Ensure the `path` exists
try {
FileUtils.forceMkdir(FileUtils.getFile(path));
} catch (IOException e) {
throw new BackendException(e.getMessage(), e);
}
// Join with store type
return Paths.get(path, store).toString();
}

/**
* Session for RocksDB
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ public class RocksDBStdSessions extends RocksDBSessions {
private final HugeConfig conf;
private final RocksDB rocksdb;

public RocksDBStdSessions(HugeConfig config, String database, String store)
public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store)
throws RocksDBException {
super(database, store);

this.conf = config;

String dataPath = wrapPath(this.conf.get(RocksDBOptions.DATA_PATH));
String walPath = wrapPath(this.conf.get(RocksDBOptions.WAL_PATH));

// Init options
Options options = new Options();
RocksDBStdSessions.initOptions(this.conf, options, options, options);
Expand All @@ -83,14 +81,12 @@ public RocksDBStdSessions(HugeConfig config, String database, String store)
this.rocksdb = RocksDB.open(options, dataPath);
}

public RocksDBStdSessions(HugeConfig config, String database, String store,
public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store,
List<String> cfNames) throws RocksDBException {
super(database, store);
this.conf = config;

String dataPath = wrapPath(this.conf.get(RocksDBOptions.DATA_PATH));
String walPath = wrapPath(this.conf.get(RocksDBOptions.WAL_PATH));

// Old CFs should always be opened
List<String> cfs = this.mergeOldCFs(dataPath, cfNames);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package com.baidu.hugegraph.backend.store.rocksdb;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -29,6 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.io.FileUtils;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;

Expand Down Expand Up @@ -129,9 +132,10 @@ public synchronized void open(HugeConfig config) {
}

// Open base disk
String dataPath = config.get(RocksDBOptions.DATA_PATH);
dataPath = RocksDBSessions.wrapPath(dataPath, this.store);
this.sessions = this.open(config, dataPath, this.tableNames());
String dataPath = this.wrapPath(config.get(RocksDBOptions.DATA_PATH));
String walPath = this.wrapPath(config.get(RocksDBOptions.WAL_PATH));

this.sessions = this.open(config, dataPath, walPath, this.tableNames());

// Open tables with optimized disk
List<String> disks = config.get(RocksDBOptions.DATA_DISKS);
Expand All @@ -140,18 +144,19 @@ public synchronized void open(HugeConfig config) {
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String table = this.table(e.getKey()).table();
String disk = e.getValue();
this.open(config, disk, Arrays.asList(table));
this.open(config, disk, disk, Arrays.asList(table));
}
}
}

protected RocksDBSessions open(HugeConfig config, String dataPath,
List<String> tableNames) {
String walPath, List<String> tableNames) {
LOG.info("Opening RocksDB with data path: {}", dataPath);

RocksDBSessions sessions = null;
try {
sessions = this.openSessionPool(config, tableNames);
sessions = this.openSessionPool(config, dataPath,
walPath, tableNames);
} catch (RocksDBException e) {
if (dbs.containsKey(dataPath)) {
if (e.getMessage().contains("No locks available")) {
Expand All @@ -162,7 +167,8 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
try {
// Will open old CFs(of other keyspace)
final List<String> none = ImmutableList.of();
sessions = this.openSessionPool(config, none);
sessions = this.openSessionPool(config, dataPath,
walPath, none);
} catch (RocksDBException e1) {
// Let it throw later
e = e1;
Expand All @@ -174,7 +180,8 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
"try to init CF later", dataPath, this.database);
try {
// Only open default CF, won't open old CFs
sessions = this.openSessionPool(config, null);
sessions = this.openSessionPool(config, dataPath,
walPath, null);
} catch (RocksDBException e1) {
LOG.error("Failed to open RocksDB with default CF", e1);
}
Expand All @@ -198,12 +205,15 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
}

protected RocksDBSessions openSessionPool(HugeConfig config,
String dataPath, String walPath,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBStdSessions(config, this.database, this.store);
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store);
} else {
return new RocksDBStdSessions(config, this.database, this.store,
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store,
tableNames);
}
}
Expand Down Expand Up @@ -401,7 +411,7 @@ private void parseTableDiskMapping(List<String> disks) {
String store = pair[0].trim();
HugeType table = HugeType.valueOf(pair[1].trim().toUpperCase());
if (this.store.equals(store)) {
path = RocksDBSessions.wrapPath(path, this.store);
path = this.wrapPath(path);
this.tableDiskMapping.put(table, path);
}
}
Expand All @@ -414,6 +424,17 @@ private static RocksDBSessions db(String disk) {
return db;
}

protected String wrapPath(String path) {
// Ensure the `path` exists
try {
FileUtils.forceMkdir(FileUtils.getFile(path));
} catch (IOException e) {
throw new BackendException(e.getMessage(), e);
}
// Join with store type
return Paths.get(path, this.store).toString();
}

/***************************** Store defines *****************************/

public static class RocksDBSchemaStore extends RocksDBStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -38,7 +34,6 @@

import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBOptions;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBStdSessions;
import com.baidu.hugegraph.config.HugeConfig;
Expand All @@ -51,11 +46,12 @@ public class RocksDBSstSessions extends RocksDBSessions {
private final String dataPath;
private final Map<String, SstFileWriter> tables;

public RocksDBSstSessions(HugeConfig conf, String database, String store) {
public RocksDBSstSessions(HugeConfig conf, String dataPath,
String database, String store) {
super(database, store);

this.conf = conf;
this.dataPath = this.wrapPath(this.conf.get(RocksDBOptions.DATA_PATH));
this.dataPath = dataPath;
this.tables = new ConcurrentHashMap<>();

File path = new File(this.dataPath);
Expand All @@ -64,9 +60,10 @@ public RocksDBSstSessions(HugeConfig conf, String database, String store) {
}
}

public RocksDBSstSessions(HugeConfig config, String database, String store,
public RocksDBSstSessions(HugeConfig config, String dataPath,
String database, String store,
List<String> tableNames) throws RocksDBException {
this(config, database, store);
this(config, dataPath, database, store);
for (String table : tableNames) {
this.createTable(table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ public RocksDBSstStore(final BackendStoreProvider provider,

@Override
protected RocksDBSessions openSessionPool(HugeConfig config,
String dataPath, String walPath,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBSstSessions(config, this.database(),
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store());
} else {
return new RocksDBSstSessions(config, this.database(),
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store(), tableNames);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.mockito.Mockito;
import org.rocksdb.RocksDBException;

import com.baidu.hugegraph.backend.store.rocksdb.RocksDBOptions;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBStdSessions;
import com.baidu.hugegraph.config.HugeConfig;
Expand Down Expand Up @@ -117,9 +116,8 @@ private static RocksDBSessions open(String table) throws RocksDBException {
Configuration conf = Mockito.mock(PropertiesConfiguration.class);
Mockito.when(conf.getKeys()).thenReturn(Collections.emptyIterator());
HugeConfig config = new HugeConfig(conf);
config.setProperty(RocksDBOptions.DATA_PATH.name(), DB_PATH);
config.setProperty(RocksDBOptions.WAL_PATH.name(), DB_PATH);
RocksDBSessions rocks = new RocksDBStdSessions(config, "db", "store");
RocksDBSessions rocks = new RocksDBStdSessions(config, DB_PATH, DB_PATH,
"db", "store");
rocks.createTable(table);
return rocks;
}
Expand Down